Ping Pong Events
This is an example of a Client communicating with two Services, each of which emits its own events. It also shows how a Client can dynamically start and stop listening to events from a specific Service at runtime.
In this example, the Ping service advertises “ping” events and the Pong service advertises “pong” events. We listen to one service at a time, and switch services after receiving an event.
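The switching behaviour described above can be sketched without the SDK or a message broker. This is only an illustrative simulation: the names `other_service` and `simulate_listening` are hypothetical and are not part of intersect_sdk. It assumes that an event is delivered only from the service the client currently listens to.

```python
def other_service(current: str) -> str:
    """Return the service to listen to after receiving an event from `current`."""
    return 'pong' if current == 'ping' else 'ping'


def simulate_listening(max_events: int, first: str = 'ping') -> list[str]:
    """Simulate the order in which events are received (hypothetical helper)."""
    received: list[str] = []
    listening_to = first
    while len(received) < max_events:
        # Only the currently subscribed service can deliver an event.
        received.append(listening_to)
        # After each event, swap the subscription to the other service.
        listening_to = other_service(listening_to)
    return received
```

Calling `simulate_listening(4)` yields the same alternating sequence that the real client prints.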
To run these locally, you must first make sure that you have all necessary backing services running; see the Installation and usage page for details on how to do this.
Afterwards, you can start each service in its own terminal, wait a brief moment, then run the client in another terminal.
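If you would rather not guess when the backing broker is ready, a small standard-library check can poll its port first. This is a sketch under assumptions: `wait_for_port` is a hypothetical helper, and the host `localhost` is assumed (the examples on this page only specify port 1883). It only confirms that something accepts TCP connections, not that the broker is fully initialized.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, poll_interval: float = 0.25) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError if nothing is listening yet.
            with socket.create_connection((host, port), timeout=poll_interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(poll_interval)
```

For example, `wait_for_port('localhost', 1883)` could be called before starting the services.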
Service walkthrough
There are two Services, but their code does not significantly differ:
Ping Service:
"""Simple service which regularly emits an event."""

import logging
import threading
import time
from typing import ClassVar

from intersect_sdk import IntersectEventDefinition

from .service_runner import P_ngBaseCapabilityImplementation, run_service

logging.basicConfig(level=logging.INFO)


class PingCapabilityImplementation(P_ngBaseCapabilityImplementation):
    """Basic capability definition, very similar to the other capability except for the type of event it emits."""

    intersect_sdk_capability_name = 'ping'
    intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
        'ping': IntersectEventDefinition(event_type=str),
    }

    def after_service_startup(self) -> None:
        """Called after service startup."""
        self.counter_thread = threading.Thread(
            target=self.ping_event,
            daemon=True,
            name='counter_thread',
        )
        self.counter_thread.start()

    def ping_event(self) -> None:
        """Send out a ping event every 2 seconds."""
        while True:
            time.sleep(2.0)
            self.intersect_sdk_emit_event('ping', 'ping')


if __name__ == '__main__':
    run_service(PingCapabilityImplementation())
Pong Service:
"""Simple service which regularly emits an event."""

import logging
import threading
import time
from typing import ClassVar

from intersect_sdk import IntersectEventDefinition

from .service_runner import P_ngBaseCapabilityImplementation, run_service

logging.basicConfig(level=logging.INFO)


class PongCapabilityImplementation(P_ngBaseCapabilityImplementation):
    """Basic capability definition, very similar to the other capability except for the type of event it emits."""

    intersect_sdk_capability_name = 'pong'
    intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
        'pong': IntersectEventDefinition(event_type=str),
    }

    def after_service_startup(self) -> None:
        """Called after service startup."""
        self.counter_thread = threading.Thread(
            target=self.pong_event,
            daemon=True,
            name='counter_thread',
        )
        self.counter_thread.start()

    def pong_event(self) -> None:
        """Send out a pong event every 2 seconds."""
        while True:
            time.sleep(2.0)
            self.intersect_sdk_emit_event('pong', 'pong')


if __name__ == '__main__':
    run_service(PongCapabilityImplementation())
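Both services use the same pattern: a daemon thread that sleeps, then emits. The sketch below isolates that pattern from the SDK so it can be run on its own. It is a variation rather than the services' actual code: `PeriodicEmitter` is a hypothetical class, the `emit` callable stands in for `intersect_sdk_emit_event`, and the `threading.Event` used to stop the loop is an addition that the services above do not have (they loop until the process exits).

```python
import threading
from typing import Callable


class PeriodicEmitter:
    """Call `emit(payload)` every `interval` seconds on a daemon thread (hypothetical helper)."""

    def __init__(self, emit: Callable[[str], None], payload: str, interval: float) -> None:
        self._emit = emit
        self._payload = payload
        self._interval = interval
        self._stop_requested = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True, name='emitter_thread')

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        """Ask the loop to finish and wait for the thread to exit."""
        self._stop_requested.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait returns False when the timeout elapses and True once stop() is called,
        # so this loop emits once per interval until a stop is requested.
        while not self._stop_requested.wait(self._interval):
            self._emit(self._payload)
```

Using `Event.wait` in place of `time.sleep` lets the loop end promptly when `stop()` is called, which can be convenient in tests.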
Both of these Services share some common code:
"""Common functionality between all of the Service classes."""

import logging
from abc import ABC, abstractmethod

from intersect_sdk import (
    HierarchyConfig,
    IntersectBaseCapabilityImplementation,
    IntersectService,
    IntersectServiceConfig,
    default_intersect_lifecycle_loop,
)

logger = logging.getLogger(__name__)


class P_ngBaseCapabilityImplementation(IntersectBaseCapabilityImplementation, ABC):  # noqa: N801
    """Common interface definitions, no implementations here."""

    @abstractmethod
    def after_service_startup(self) -> None:
        """Common function implemented by the various P-NG capabilities after the service starts up."""


def run_service(capability: P_ngBaseCapabilityImplementation) -> None:
    """The idea behind the two services is that each one will emit a unique event.

    The interesting configuration mostly happens in the Client, look at that one for details.
    """
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }
    service_name = capability.__class__.__name__[:4].lower()
    config = IntersectServiceConfig(
        hierarchy=HierarchyConfig(
            organization='p-ng-organization',
            facility='p-ng-facility',
            system='p-ng-system',
            subsystem='p-ng-subsystem',
            service=f'{service_name}-service',
        ),
        status_interval=30.0,
        **from_config_file,
    )
    service = IntersectService([capability], config)
    logger.info('Starting %s_service, use Ctrl+C to exit.', service_name)

    """
    Here, we provide the after_service_startup function on the capability as a post startup callback.

    This ensures we don't emit any events until we've actually started up our Service.
    """
    default_intersect_lifecycle_loop(
        service, post_startup_callback=capability.after_service_startup
    )
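One detail in `run_service` is easy to miss: the service name comes from the first four characters of the capability's class name. The sketch below reproduces only that derivation, using stand-in classes instead of real capabilities. `derive_service_name` and `hierarchy_string` are hypothetical helpers, and joining the hierarchy parts with dots is assumed from the string the client builds in `get_event_message_params`.

```python
class PingCapabilityImplementation:
    """Stand-in with the same class name as the real Ping capability."""


class PongCapabilityImplementation:
    """Stand-in with the same class name as the real Pong capability."""


def derive_service_name(capability: object) -> str:
    """Mirror run_service: first four characters of the class name, lowercased."""
    return capability.__class__.__name__[:4].lower()


def hierarchy_string(service_name: str) -> str:
    """Build the dotted hierarchy the client is expected to address (assumed format)."""
    parts = [
        'p-ng-organization',
        'p-ng-facility',
        'p-ng-system',
        'p-ng-subsystem',
        f'{service_name}-service',
    ]
    return '.'.join(parts)
```

Because the name is sliced from the class name, renaming a capability class so that it no longer starts with `Ping` or `Pong` would change the service name, and the client's hierarchy string would stop matching.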
Client walkthrough
import logging

from intersect_sdk import (
    INTERSECT_RESPONSE_VALUE,
    IntersectClient,
    IntersectClientCallback,
    IntersectClientConfig,
    IntersectEventMessageParams,
    default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)


def get_event_message_params(png: str) -> IntersectEventMessageParams:
    """Generate the event information we'll handle based on the value of 'png'.

    We always want to pass in a new object; don't mutate a persistent object.
    """
    return IntersectEventMessageParams(
        hierarchy=f'p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem.{png}-service',
        capability_name=png,
        event_name=png,
    )


class SampleOrchestrator:
    """Basic orchestrator with an event callback.

    The event callback switches between listening to 'ping' and 'pong' events; it never listens to both simultaneously.
    """

    MAX_EVENTS_TO_PROCESS = 4

    def __init__(self) -> None:
        """Straightforward constructor, just initializes the instance attribute which counts events."""
        self.events_encountered = 0

    def event_callback(
        self,
        _source: str,
        _capability_name: str,
        event_name: str,
        payload: INTERSECT_RESPONSE_VALUE,
    ) -> IntersectClientCallback:
        """Handles events from two Services, one at a time.

        With this handler, the order of instructions goes like this:

        1. Listen for a ping event.
        2. On ping event - Start listening for pong events and stop listening for ping events.
        3. On pong event - Start listening for ping events and stop listening for pong events.
        4. Repeat steps 2-3 until we have processed the maximum number of events we want to.
        """
        self.events_encountered += 1
        print(payload)
        if self.events_encountered == self.MAX_EVENTS_TO_PROCESS:
            # raising from the callback is how this example stops the client after the final event
            raise Exception

        # In this case, we can check any of the source, the capability_name, or the event_name. For maximum robustness, you should check all three values.
        if event_name == 'ping':
            return IntersectClientCallback(
                services_to_start_listening_for_events=[get_event_message_params('pong')],
                services_to_stop_listening_for_events=[get_event_message_params('ping')],
            )
        return IntersectClientCallback(
            services_to_start_listening_for_events=[get_event_message_params('ping')],
            services_to_stop_listening_for_events=[get_event_message_params('pong')],
        )


if __name__ == '__main__':
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }

    # we initially listen only for ping service events
    config = IntersectClientConfig(
        initial_message_event_config=IntersectClientCallback(
            services_to_start_listening_for_events=[
                get_event_message_params('ping'),
            ]
        ),
        **from_config_file,
    )
    orchestrator = SampleOrchestrator()
    client = IntersectClient(
        config=config,
        event_callback=orchestrator.event_callback,
    )
    default_intersect_lifecycle_loop(
        client,
    )
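The comment in `event_callback` recommends checking the source, the capability name, and the event name together. One possible way to do that is sketched below as a plain function with no SDK dependency. `identify_event` is a hypothetical helper, and the sketch assumes that the `source` argument arrives as the same dotted hierarchy string the client passes to `IntersectEventMessageParams`; verify that against your SDK version before relying on it.

```python
from typing import Optional

# Shared prefix of both services' hierarchies, taken from the examples on this page.
HIERARCHY_PREFIX = 'p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem'
KNOWN_EVENTS = ('ping', 'pong')


def identify_event(source: str, capability_name: str, event_name: str) -> Optional[str]:
    """Return 'ping' or 'pong' only if all three values agree, otherwise None."""
    for png in KNOWN_EVENTS:
        if (
            source == f'{HIERARCHY_PREFIX}.{png}-service'
            and capability_name == png
            and event_name == png
        ):
            return png
    return None
```

A callback could call this first and ignore (or log) any event for which it returns `None`, instead of treating every non-ping event as a pong.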
Running it yourself: Using a Python environment
# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems
# First, start up the ping service in one terminal
python -m examples.3_ping_pong_events.ping_service
# Next, start up the pong service in a second terminal
python -m examples.3_ping_pong_events.pong_service
# Finally, run the client in a third terminal
python -m examples.3_ping_pong_events.ping_pong_client
After a few seconds, the output from ping_pong_client will look something like this:
ping
pong
ping
pong
The client will exit automatically; you will have to use Ctrl+C in each terminal running a service process.
You can also run the client again without stopping the services; because the client always starts by listening for ping events, the output should follow the same alternating pattern.
Running it yourself: Using Docker
First, you will need to build the latest INTERSECT-SDK image; run the following command from the repository root:
docker build -t intersect-sdk .
Then you can run the examples like this:
# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems
# First, start up the ping service in one terminal
docker run --rm -it --name intersect-ping-service --network host intersect-sdk python -m examples.3_ping_pong_events.ping_service
# Next, start up the pong service in a second terminal
docker run --rm -it --name intersect-pong-service --network host intersect-sdk python -m examples.3_ping_pong_events.pong_service
# Finally, run the client in a third terminal
docker run --rm -it --name intersect-client --network host intersect-sdk python -m examples.3_ping_pong_events.ping_pong_client
After several seconds, the output from ping_pong_client will look something like this:
ping
pong
ping
pong
The client will exit automatically; you will have to use Ctrl+C in each terminal running a service process.
You can also run the client again without stopping the services; because the client always starts by listening for ping events, the output should follow the same alternating pattern.
(If you ran the service containers in detached mode (-d), you can instead run docker stop intersect-ping-service intersect-pong-service.)