Counting With Events
This is an example of a Service which doesn’t accept requests from clients, but will continually broadcast an event. Clients can subscribe to these events.
In this example, the Service will send out an “increment_counter” event containing a simple integer value every two seconds. Each value is three times the previously emitted value, so the sequence starts at 3 and continues 9, 27, 81, and so on.
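The sequence of emitted values can be reproduced without the SDK. The sketch below is only an illustration: `emitted_values` is a hypothetical helper, not part of intersect_sdk, and it assumes the counter starts at 1 and is tripled before each emission, as in the service walkthrough.

```python
from itertools import islice
from typing import Iterator


def emitted_values(start: int = 1, factor: int = 3) -> Iterator[int]:
    """Yield the values the service would emit, in order (hypothetical helper)."""
    counter = start
    while True:
        counter *= factor  # the counter is multiplied before each emission
        yield counter


first_five = list(islice(emitted_values(), 5))
print(first_five)  # [3, 9, 27, 81, 243]
```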
To run this example locally, you must first make sure that you have all necessary backing services running; see the Installation and usage page for details on how to do this.
Afterwards, you can start the service in one terminal, wait a brief moment, then finally run the client in another terminal.
Service walkthrough
import logging
import threading
import time
from typing import ClassVar

from intersect_sdk import (
    HierarchyConfig,
    IntersectBaseCapabilityImplementation,
    IntersectEventDefinition,
    IntersectService,
    IntersectServiceConfig,
    default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CountingServiceCapabilityImplementation(IntersectBaseCapabilityImplementation):
    """This example is meant to showcase a simple event emitter.

    This service does not have any endpoints, but simply fires off a single event every two seconds.
    """

    intersect_sdk_capability_name = 'CountingExample'
    intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
        'increment_counter': IntersectEventDefinition(event_type=int),
    }

    def after_service_startup(self) -> None:
        """This is a 'post-initialization' method.

        We want to call it AFTER we start up the Service. Until that point, we DON'T want to emit any events.
        """
        self.counter = 1
        self.counter_thread = threading.Thread(
            target=self.increment_counter_function,
            daemon=True,
            name='counter_thread',
        )
        self.counter_thread.start()

    def increment_counter_function(self) -> None:
        """This is the event thread which continually emits count events.

        Every 2 seconds, we fire off a new 'increment_counter' event; each time, we fire off a value three times the previous one.
        We have to configure our event on the intersect_sdk_events class variable. Since 'increment_counter' is emitting an integer value,
        we need to specify the emission type on the class variable. Failure to register the event and its type will mean that the event won't
        be emitted.
        """
        while True:
            time.sleep(2.0)
            self.counter *= 3
            self.intersect_sdk_emit_event('increment_counter', self.counter)


if __name__ == '__main__':
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }
    config = IntersectServiceConfig(
        hierarchy=HierarchyConfig(
            organization='counting-organization',
            facility='counting-facility',
            system='counting-system',
            subsystem='counting-subsystem',
            service='counting-service',
        ),
        status_interval=30.0,
        **from_config_file,
    )
    capability = CountingServiceCapabilityImplementation()
    service = IntersectService([capability], config)
    logger.info('Starting counting_service, use Ctrl+C to exit.')

    # Here, we provide the after_service_startup function on the capability as a post startup callback.
    # This ensures we don't emit any events until we've actually started up our Service.
    default_intersect_lifecycle_loop(
        service, post_startup_callback=capability.after_service_startup
    )
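The background-thread pattern used by the service can be tried out in isolation. The following is a minimal sketch under stated assumptions, not the SDK's implementation: `CounterEmitter` and the `emit` callable are hypothetical stand-ins for the capability class and its emit call, and a `threading.Event` replaces `time.sleep` so the loop can be stopped cleanly (the service above instead relies on a daemon thread that ends with the process).

```python
import threading
from typing import Callable, List


class CounterEmitter:
    """Hypothetical stand-in for the capability: emits a growing counter from a thread."""

    def __init__(self, emit: Callable[[str, int], None], interval: float) -> None:
        self._emit = emit  # stand-in for the SDK's emit call
        self._interval = interval
        self._stop = threading.Event()
        self.counter = 1
        self._thread = threading.Thread(
            target=self._run, daemon=True, name='counter_thread'
        )

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() has been called
        while not self._stop.wait(self._interval):
            self.counter *= 3
            self._emit('increment_counter', self.counter)


received: List[int] = []
got_three = threading.Event()


def record(event_name: str, value: int) -> None:
    """Collect emitted values and signal once three have arrived."""
    received.append(value)
    if len(received) == 3:
        got_three.set()


emitter = CounterEmitter(record, interval=0.01)  # short interval, for demonstration only
emitter.start()
got_three.wait(timeout=5.0)
emitter.stop()
print(received[:3])  # [3, 9, 27]
```

Starting the thread only from an explicit `start()` call mirrors the role of `after_service_startup` above: nothing is emitted until the caller decides the service is ready.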
Client walkthrough
import logging

from intersect_sdk import (
    INTERSECT_RESPONSE_VALUE,
    IntersectClient,
    IntersectClientCallback,
    IntersectClientConfig,
    IntersectEventMessageParams,
    default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)


class SampleOrchestrator:
    """This class contains the callback function.

    It uses a class because we want to modify our own state from the callback function.
    State is managed through counting the events we receive - we'll process a certain number of events, then stop.
    """

    MAX_EVENTS_TO_PROCESS = 3

    def __init__(self) -> None:
        """Straightforward constructor, initializes an instance variable we modify on getting an event."""
        self.events_encountered = 0

    def event_callback(
        self,
        _source: str,
        _capability_name: str,
        _event_name: str,
        payload: INTERSECT_RESPONSE_VALUE,
    ) -> None:
        """Handles events from the Counting Service.

        We want to process 3 events and then stop processing events. With each event, we'll print out the emitted event.
        In this case, since we don't send out any messages or switch the events we listen to, we can just return None.
        If we DID want to modify this, we would return an IntersectClientCallback object.

        Params:
          - _source: the source of the event (in this instance, it will always be the counting service)
          - _capability_name: the name of the capability from the service which emitted the event. In this case it will always be 'CountingExample'.
          - _event_name: the name of the event. In this case it will always be 'increment_counter'.
          - payload: the actual value of the emitted event. In this case it will always be an integer (3, 9, 27, 81, ...)
        """
        # this check isn't necessary in this instance, but given that events are completely asynchronous
        # it can be good to verify that we only handle the intended events
        if self.events_encountered >= self.MAX_EVENTS_TO_PROCESS:
            return
        self.events_encountered += 1
        print(payload)
        # once the last intended event has been handled, raise to stop processing events
        if self.events_encountered == self.MAX_EVENTS_TO_PROCESS:
            raise Exception


if __name__ == '__main__':
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }

    # start listening to events from the counting service
    events = [
        IntersectEventMessageParams(
            hierarchy='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
            capability_name='CountingExample',
            event_name='increment_counter',
        )
    ]
    config = IntersectClientConfig(
        initial_message_event_config=IntersectClientCallback(
            services_to_start_listening_for_events=events,
        ),
        **from_config_file,
    )
    orchestrator = SampleOrchestrator()
    client = IntersectClient(
        config=config,
        event_callback=orchestrator.event_callback,
    )
    default_intersect_lifecycle_loop(
        client,
    )
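The counting logic in the callback can be exercised without a broker by calling it directly. The sketch below is an assumption-laden illustration rather than SDK code: `EventCounter` is a hypothetical stand-in for `SampleOrchestrator`, it records payloads in a list instead of printing them, and it raises `RuntimeError` where the example raises a bare `Exception`.

```python
from typing import List


class EventCounter:
    """Hypothetical stand-in for SampleOrchestrator that records payloads."""

    MAX_EVENTS_TO_PROCESS = 3

    def __init__(self) -> None:
        self.events_encountered = 0
        self.seen: List[int] = []

    def event_callback(
        self, source: str, capability_name: str, event_name: str, payload: int
    ) -> None:
        # ignore anything that arrives after the limit has been reached
        if self.events_encountered >= self.MAX_EVENTS_TO_PROCESS:
            return
        self.events_encountered += 1
        self.seen.append(payload)
        # signal the caller to stop once the last intended event is handled
        if self.events_encountered == self.MAX_EVENTS_TO_PROCESS:
            raise RuntimeError('processed all intended events')


counter = EventCounter()
stopped = False
for value in (3, 9, 27, 81):  # simulated payloads; 81 should never be handled
    try:
        counter.event_callback('counting-service', 'CountingExample', 'increment_counter', value)
    except RuntimeError:
        stopped = True
        break

print(counter.seen)  # [3, 9, 27]
```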
Running it yourself: Using a Python environment
# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems
# First, run the service
python -m examples.2_counting_events.counting_service
# Next, run the client in a separate terminal
python -m examples.2_counting_events.counting_client
After a few seconds, the output from counting_client will look something like this (the values will be higher if you waited longer between starting the service and starting the client):
3
9
27
The client will exit automatically; you will have to use Ctrl+C on the terminal running the service process.
You can also run the client again while NOT killing the service; because the service keeps counting in the meantime, the client will print later, larger values than it did the first time.
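To see why a later run prints larger values, the expected output can be estimated from how long the service has been emitting. This is a rough sketch only: `values_seen` is a hypothetical helper, and it assumes emissions happen exactly every two seconds after the service's counter thread starts and ignores network and startup latency.

```python
import math
from typing import List


def values_seen(delay_seconds: float, interval: float = 2.0, count: int = 3) -> List[int]:
    """Estimate what a client prints if it subscribes delay_seconds after emission begins.

    Idealized: the k-th emission (value 3**k) happens at k * interval seconds.
    """
    first_index = math.floor(delay_seconds / interval) + 1  # next emission after the delay
    return [3 ** k for k in range(first_index, first_index + count)]


print(values_seen(0.0))  # [3, 9, 27]
print(values_seen(5.0))  # [27, 81, 243]
```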
Running it yourself: Using Docker
First, you will need to build the latest INTERSECT-SDK image; run the following command from the repository root:
docker build -t intersect-sdk .
Then you can run the examples like this:
# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems
# First, run the service
docker run --rm -it --name intersect-service --network host intersect-sdk python -m examples.2_counting_events.counting_service
# Next, run the client in a separate terminal
docker run --rm -it --name intersect-client --network host intersect-sdk python -m examples.2_counting_events.counting_client
After several seconds, the output from counting_client will look something like this (the values will be higher if you waited longer between starting the service and starting the client):
3
9
27
The client will exit automatically; you will have to use Ctrl+C on the terminal running the service process. (If you ran the service container in detached mode (-d), you can instead run docker stop intersect-service.)
You can also run the client again while NOT killing the service; because the service keeps counting in the meantime, the client will print later, larger values than it did the first time.