Counting
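This is a slightly more complex example of communicating through the INTERSECT SDK. It shows how to manage state on the service, how to define more complex entrypoints on the service, and how to send multiple messages from the client.
The counting_service capability class has three entrypoints: start_count, stop_count, and reset_count. Each entrypoint returns a structured response: start_count and stop_count return the current state wrapped with a success flag, while reset_count returns the state as it was before the reset. reset_count also takes a boolean that specifies whether the counter should start counting again after it is reset.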
To run these locally, you must first make sure that you have all necessary backing services running; see the Installation and usage page for details on how to do this.
Afterwards, you can start the service in one terminal, wait a brief moment, then finally run the client in another terminal.
Service walkthrough
import logging
import threading
import time
from dataclasses import dataclass
from typing import Annotated

from pydantic import BaseModel, Field

from intersect_sdk import (
    HierarchyConfig,
    IntersectBaseCapabilityImplementation,
    IntersectService,
    IntersectServiceConfig,
    default_intersect_lifecycle_loop,
    intersect_message,
    intersect_status,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CountingServiceCapabilityImplementationState(BaseModel):
    """We can't just use any class to represent state. This class either needs to extend Pydantic's BaseModel class, or be a dataclass. Both the Python standard library's dataclass and Pydantic's dataclass are valid."""

    count: Annotated[int, Field(default=0, ge=0)]
    """
    Generic integer state - increases/decreases as needed

    Note the annotations - this advertises to the schema that the default value is 0,
    and that the value should always be greater than or equal to 0.
    """
    counting: bool = False
    """
    True if the count thread is running, False if it's not
    """

@dataclass
class CountingServiceCapabilityImplementationResponse:
    """This class is used as a reply to messages which may not do anything.

    It's also an example of using a dataclass instead of Pydantic's BaseModel.
    """

    state: CountingServiceCapabilityImplementationState
    """
    We wrap the state in the response
    """
    success: bool
    """
    If true: message caused a change. If false: it did not.
    """

class CountingServiceCapabilityImplementation(IntersectBaseCapabilityImplementation):
    """This example is meant to showcase that your implementation is able to track state if you want it to.

    Please note that this is not an especially robust implementation: if
    the service gets two messages at the same time, it may manage to create
    two threads at once.
    """

    intersect_sdk_capability_name = 'CountingExample'

    def __init__(self) -> None:
        """Constructors are never exposed to INTERSECT.

        You are free to provide whatever parameters you like to the constructor, and
        do whatever you like in the constructor. In this instance, we just initialize our state.
        """
        super().__init__()
        self.state = CountingServiceCapabilityImplementationState()
        self.counter_thread: threading.Thread | None = None

    @intersect_status()
    def status(self) -> CountingServiceCapabilityImplementationState:
        """Basic status function which communicates our current state.

        We set the status interval in the configuration to be 30 seconds long - if you
        run the service without the client, and set the log level
        of intersect-sdk to DEBUG, then you'll be able to see the message
        every 30 seconds in your terminal. (By default, this value is 5 minutes.)
        """
        return self.state

    @intersect_message()
    def start_count(self) -> CountingServiceCapabilityImplementationResponse:
        """Start the counter (potentially from any number). "Fails" if the counter is already running.

        Returns:
          A CountingServiceCapabilityImplementationResponse object. The success value will be:
            True - if counter was started successfully
            False - if counter was already running and this was called
        """
        if self.state.counting:
            return CountingServiceCapabilityImplementationResponse(
                state=self.state,
                success=False,
            )
        self.state.counting = True
        self.counter_thread = threading.Thread(
            target=self._run_count,
            daemon=True,
            name='counter_thread',
        )
        self.counter_thread.start()
        return CountingServiceCapabilityImplementationResponse(
            state=self.state,
            success=True,
        )

    @intersect_message()
    def stop_count(self) -> CountingServiceCapabilityImplementationResponse:
        """Stop the counter. "Fails" if the counter is not running.

        Returns:
          A CountingServiceCapabilityImplementationResponse object. The success value will be:
            True - if counter was stopped successfully
            False - if counter was already not running and this was called
        """
        if not self.state.counting:
            return CountingServiceCapabilityImplementationResponse(
                state=self.state,
                success=False,
            )
        self.state.counting = False
        # _run_count checks the flag once per second, so join() may block for up to a second
        self.counter_thread.join()
        self.counter_thread = None
        return CountingServiceCapabilityImplementationResponse(
            state=self.state,
            success=True,
        )

    @intersect_message()
    def reset_count(self, start_again: bool) -> CountingServiceCapabilityImplementationState:
        """Set the counter back to 0.

        Params:
          start_again: if True, start the counter again; if False, the
            counter will remain off.

        Returns:
          the state BEFORE the counter was reset
        """
        original_state = self.state.model_copy()
        if self.state.counting:
            self.stop_count()
        self.state.count = 0
        if start_again:
            self.start_count()
        return original_state

    def _run_count(self) -> None:
        """This is an example of a function which will NOT be exposed to INTERSECT.

        This is just the thread which increments the counter.
        """
        while self.state.counting:
            self.state.count += 1
            time.sleep(1.0)

if __name__ == '__main__':
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }
    config = IntersectServiceConfig(
        hierarchy=HierarchyConfig(
            organization='counting-organization',
            facility='counting-facility',
            system='counting-system',
            subsystem='counting-subsystem',
            service='counting-service',
        ),
        status_interval=30.0,
        **from_config_file,
    )
    capability = CountingServiceCapabilityImplementation()
    service = IntersectService([capability], config)
    logger.info('Starting counting_service, use Ctrl+C to exit.')
    default_intersect_lifecycle_loop(
        service,
    )
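The capability's docstring points out that two simultaneous messages could create two counter threads. The following is a minimal sketch of one way to close that gap, using only the standard library. It is not how the example service is implemented, and SafeCounter and its method names are hypothetical, not part of the INTERSECT SDK. A threading.Lock serializes start and stop, and a threading.Event replaces the polled boolean so that stopping does not have to wait out a full sleep interval.

```python
import threading
from typing import Optional


class SafeCounter:
    """Hypothetical stand-in for the counting logic; not part of intersect_sdk."""

    def __init__(self, tick_seconds: float = 1.0) -> None:
        self.count = 0
        self._tick_seconds = tick_seconds
        self._lock = threading.Lock()  # serializes start() and stop()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> bool:
        """Return True if a thread was started, False if one was already running."""
        with self._lock:
            if self._thread is not None:
                return False
            self._stop_event.clear()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()
            return True

    def stop(self) -> bool:
        """Return True if a running thread was stopped, False if none was running."""
        with self._lock:
            if self._thread is None:
                return False
            self._stop_event.set()
            self._thread.join()
            self._thread = None
            return True

    def _run(self) -> None:
        while not self._stop_event.is_set():
            self.count += 1
            # wait() returns as soon as the event is set, or after the timeout
            self._stop_event.wait(self._tick_seconds)
```

Because the check for an existing thread and the creation of a new one happen under the same lock, only one of several concurrent start() calls can succeed. Whether this fits a real capability depends on how the SDK dispatches messages, which this sketch does not assume anything about.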
Client walkthrough
import json
import logging
import time

from intersect_sdk import (
    INTERSECT_RESPONSE_VALUE,
    IntersectClient,
    IntersectClientCallback,
    IntersectClientConfig,
    IntersectDirectMessageParams,
    default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)

class SampleOrchestrator:
    """This class contains the callback function.

    It uses a class because we want to modify our own state from the callback function.

    State is managed through a message stack. We initialize a request-reply-request-reply... chain with the Service,
    and the chain ends once we've popped all messages from our message stack.
    """

    def __init__(self) -> None:
        """Basic constructor for the orchestrator class, call before creating the IntersectClient.

        As the only thing exposed to the Client is the callback function, the orchestrator class may otherwise be
        created and managed as the SDK developer sees fit.

        The messages are initialized in the order they are sent for readability purposes.
        Each entry in the message stack is a tuple: the message, and the time to wait before sending it.

        All approximations in comments come from the first run against the service, but the values can potentially vary
        if you run the client multiple times!
        """
        self.message_stack = [
            # wait 5 seconds before stopping the counter. "Count" in response will be approx. 6
            (
                IntersectDirectMessageParams(
                    destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
                    operation='CountingExample.stop_count',
                    payload=None,
                ),
                5.0,
            ),
            # start the counter up again - it will not be 0 at this point! "Count" in response will be approx. 7
            (
                IntersectDirectMessageParams(
                    destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
                    operation='CountingExample.start_count',
                    payload=None,
                ),
                1.0,
            ),
            # reset the counter, but have it immediately start running again. "Count" in response will be approx. 10
            (
                IntersectDirectMessageParams(
                    destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
                    operation='CountingExample.reset_count',
                    payload=True,
                ),
                3.0,
            ),
            # reset the counter, but don't have it run again. "Count" in response will be approx. 6
            (
                IntersectDirectMessageParams(
                    destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
                    operation='CountingExample.reset_count',
                    payload=False,
                ),
                5.0,
            ),
            # start the counter back up. "Count" in response will be approx. 1
            (
                IntersectDirectMessageParams(
                    destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
                    operation='CountingExample.start_count',
                    payload=None,
                ),
                3.0,
            ),
            # finally, stop the counter one last time. "Count" in response will be approx. 4
            (
                IntersectDirectMessageParams(
                    destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
                    operation='CountingExample.stop_count',
                    payload=None,
                ),
                3.0,
            ),
            # if you ran through all the messages, the counter value should be in the range of 3-6.
        ]
        # reverse the list so that pop() returns the messages in the order written above
        self.message_stack.reverse()

    def client_callback(
        self, source: str, operation: str, _has_error: bool, payload: INTERSECT_RESPONSE_VALUE
    ) -> IntersectClientCallback:
        """This simply prints the response from the Service to your console.

        In this case, we only send one message at a time, and will not send another message until we get a response back from the Service.
        If we have additional messages in our stack, we'll wait a little bit (based on the float value in the stack) before we
        send the next message to the Service.

        When we've exhausted our message stack, we just raise an Exception to break out of the pub/sub loop.

        Params:
          source: the source of the response message. In this case it will always be from the counting_service.
          operation: the name of the function we called in the original message. For example, since we call "start_count" in our first request,
            our first response will have "start_count" as the value.
          _has_error: Boolean value which represents an error. Since there is never an error in this example, it will always be "False".
          payload: Value of the response from the Service. The typing of the payload varies, based on the operation called and whether or not
            _has_error was set to "True". In this case, since we do not have an error, we can defer to the operation's response type. This response type
            will be a dictionary resembling either "CountingServiceCapabilityImplementationState" or "CountingServiceCapabilityImplementationResponse",
            depending on the operation called.

            Note that the payload will always be a deserialized Python object, but the types are fairly limited: str, bool, float, int, None, List[T], and Dict[str, T]
            are the only types the payload can have. "T" in this case can be any of the 7 types just mentioned. Since both response types from the Service are
            classes*, this will be represented as a Dict[str, ...].

            *The only exception to the "Class gets deserialized to a dictionary" rule is if the class inherits from Python's NamedTuple builtin, in which case
            it will be deserialized as a List.
        """
        print('Source:', json.dumps(source))
        print('Operation:', json.dumps(operation))
        print('Payload:', json.dumps(payload))
        print()
        if not self.message_stack:
            # break out of pub/sub loop
            raise Exception

        message, wait_time = self.message_stack.pop()
        time.sleep(wait_time)
        return IntersectClientCallback(messages_to_send=[message])

if __name__ == '__main__':
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }

    # The counter will start after the initial message.
    # If the service is already active and counting, this may do nothing.
    initial_messages = [
        IntersectDirectMessageParams(
            destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
            operation='CountingExample.start_count',
            payload=None,
        )
    ]
    config = IntersectClientConfig(
        initial_message_event_config=IntersectClientCallback(messages_to_send=initial_messages),
        **from_config_file,
    )
    orchestrator = SampleOrchestrator()
    client = IntersectClient(
        config=config,
        user_callback=orchestrator.client_callback,
    )
    default_intersect_lifecycle_loop(
        client,
    )
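The request-reply chain driven by the message stack can be illustrated without a broker. The sketch below is a simplified model, not the SDK's behavior: ChainOrchestrator, run_chain, and the echoing service function are all hypothetical, plain strings stand in for IntersectDirectMessageParams, and the callback returns None at the end of the chain instead of raising an Exception. It shows why the list is reversed before use: list.pop() removes the last element, so reversing first lets the steps be written in send order.

```python
from typing import Callable, List, Optional, Tuple

Step = Tuple[str, float]  # (operation name, seconds to wait before sending)


class ChainOrchestrator:
    """Hypothetical, broker-free stand-in for SampleOrchestrator."""

    def __init__(self, steps: List[Step]) -> None:
        # Steps are written in send order, then reversed so that pop()
        # yields them first-to-last.
        self.message_stack = list(steps)
        self.message_stack.reverse()
        self.replies: List[str] = []

    def callback(self, reply: str) -> Optional[Step]:
        """Record a reply and return the next step, or None when the stack is empty."""
        self.replies.append(reply)
        if not self.message_stack:
            return None
        return self.message_stack.pop()


def run_chain(
    orchestrator: ChainOrchestrator,
    initial_operation: str,
    service: Callable[[str], str],
) -> List[str]:
    """Send one message per reply and return the operations in the order sent."""
    sent = [initial_operation]
    next_step = orchestrator.callback(service(initial_operation))
    while next_step is not None:
        operation, _wait = next_step  # a real client would sleep for _wait seconds here
        sent.append(operation)
        next_step = orchestrator.callback(service(operation))
    return sent


steps = [('stop_count', 5.0), ('start_count', 1.0), ('reset_count', 3.0)]
sent = run_chain(ChainOrchestrator(steps), 'start_count', lambda op: f'reply to {op}')
print(sent)  # ['start_count', 'stop_count', 'start_count', 'reset_count']
```

Each reply triggers exactly one new request, so the number of replies recorded always equals the number of messages sent.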
Running it yourself: Using a Python environment
# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems
# First, run the service
python -m examples.2_counting.counting_service
# Next, run the client in a separate terminal
python -m examples.2_counting.counting_client
After several seconds, the output from counting_client will look something like this:
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: start_count
Payload: {'state': {'count': 1, 'counting': True}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: stop_count
Payload: {'state': {'count': 6, 'counting': False}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: start_count
Payload: {'state': {'count': 7, 'counting': True}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: reset_count
Payload: {'count': 10, 'counting': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: reset_count
Payload: {'count': 6, 'counting': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: start_count
Payload: {'state': {'count': 1, 'counting': True}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: stop_count
Payload: {'state': {'count': 4, 'counting': False}, 'success': True}
The client will exit automatically; you will have to use Ctrl+C on the terminal running the service process.
You can also run the client again without killing the service. Because the service keeps its state between client runs, the counter resumes from its last value and the output will differ from the first run.
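The output above contains two payload shapes: start_count and stop_count return the state wrapped together with a success flag, while reset_count returns the bare state. A client that wants to handle both uniformly could normalize them first. The following is a minimal sketch using only the standard library; CounterSnapshot and parse_payload are hypothetical names and not part of the INTERSECT SDK.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class CounterSnapshot:
    """Hypothetical client-side view of a counting_service reply."""

    count: int
    counting: bool
    success: Optional[bool]  # None when the operation returned bare state


def parse_payload(payload: Dict[str, Any]) -> CounterSnapshot:
    """Normalize either payload shape into a CounterSnapshot."""
    if 'state' in payload:
        # start_count / stop_count: {'state': {...}, 'success': bool}
        state = payload['state']
        success = payload['success']
    else:
        # reset_count: {'count': int, 'counting': bool}
        state = payload
        success = None
    return CounterSnapshot(
        count=state['count'],
        counting=state['counting'],
        success=success,
    )


wrapped = parse_payload({'state': {'count': 6, 'counting': False}, 'success': True})
bare = parse_payload({'count': 10, 'counting': True})
print(wrapped.count, wrapped.success)  # 6 True
print(bare.count, bare.success)  # 10 None
```

This sketch assumes the reply is not an error; a real callback would check the error flag before parsing.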
Running it yourself: Using Docker
First, you will need to build the latest INTERSECT-SDK image; run the following command from the repository root:
docker build -t intersect-sdk .
Then you can run the examples like this:
# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems
# First, run the service
docker run --rm -it --name intersect-service --network host intersect-sdk python -m examples.2_counting.counting_service
# Next, run the client in a separate terminal
docker run --rm -it --name intersect-client --network host intersect-sdk python -m examples.2_counting.counting_client
After several seconds, the output from counting_client will look something like this:
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: start_count
Payload: {'state': {'count': 1, 'counting': True}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: stop_count
Payload: {'state': {'count': 6, 'counting': False}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: start_count
Payload: {'state': {'count': 7, 'counting': True}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: reset_count
Payload: {'count': 10, 'counting': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: reset_count
Payload: {'count': 6, 'counting': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: start_count
Payload: {'state': {'count': 1, 'counting': True}, 'success': True}
Source: counting-organization.counting-facility.counting-system.counting-subsystem.counting-service
Operation: stop_count
Payload: {'state': {'count': 4, 'counting': False}, 'success': True}
The client will exit automatically; you will have to use Ctrl+C on the terminal running the service process. (If you ran the service container in detached mode (-d), you can instead run docker stop intersect-service.)
You can also run the client again without killing the service. Because the service keeps its state between client runs, the counter resumes from its last value and the output will differ from the first run.