Ping Pong Events

This example shows a Client communicating with two Services, each of which emits its own events. It also shows how a Client can dynamically start and stop listening to events from a specific Service at runtime.

In this example, the Ping service advertises “ping” events and the Pong service advertises “pong” events. We listen to one service at a time, and switch services after receiving an event.
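The listen-then-switch behaviour described above can be modelled without the SDK at all. The following is a minimal sketch in plain Python, not the SDK's implementation: the function name `simulate` and the idea of feeding it a pre-built list of event names are assumptions made purely for illustration.

```python
"""Plain-Python model of the listen/switch pattern; no INTERSECT-SDK calls are used."""


def simulate(event_stream, max_events=4):
    """Return the events a client would handle if it listened to one service at a time.

    `event_stream` is a hypothetical, already-ordered list of event names
    ('ping' or 'pong') standing in for what the two services emit.
    """
    listening_to = 'ping'  # the client starts out listening only for ping events
    handled = []
    for name in event_stream:
        if name != listening_to:
            # The client is not listening to this service right now, so it never sees the event.
            continue
        handled.append(name)
        if len(handled) == max_events:
            break
        # Switch services after every event that was handled.
        listening_to = 'pong' if name == 'ping' else 'ping'
    return handled


if __name__ == '__main__':
    print(simulate(['pong', 'ping', 'ping', 'pong', 'pong', 'ping', 'pong']))
```

Events from the service that is not currently being listened to are simply skipped, which is why the handled events always alternate regardless of the order in which the two services happen to emit.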

To run these locally, you must first make sure that you have all necessary backing services running; see the Installation and usage page for details on how to do this.

Afterwards, start each service in its own terminal, wait a brief moment, then run the client in another terminal.

Service walkthrough

There are two Services, but their code does not significantly differ:

Ping Service:

"""Simple service which regularly emits an event."""

import logging
import threading
import time
from typing import ClassVar

from intersect_sdk import IntersectEventDefinition

from .service_runner import P_ngBaseCapabilityImplementation, run_service

logging.basicConfig(level=logging.INFO)


class PingCapabilityImplementation(P_ngBaseCapabilityImplementation):
    """Basic capability definition, very similar to the other capability except for the type of event it emits."""

    intersect_sdk_capability_name = 'ping'
    intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
        'ping': IntersectEventDefinition(event_type=str),
    }

    def after_service_startup(self) -> None:
        """Called after service startup."""
        self.counter_thread = threading.Thread(
            target=self.ping_event,
            daemon=True,
            name='counter_thread',
        )
        self.counter_thread.start()

    def ping_event(self) -> None:
        """Send out a ping event every 2 seconds."""
        while True:
            time.sleep(2.0)
            self.intersect_sdk_emit_event('ping', 'ping')


if __name__ == '__main__':
    run_service(PingCapabilityImplementation())

Pong Service:

"""Simple service which regularly emits an event."""

import logging
import threading
import time
from typing import ClassVar

from intersect_sdk import IntersectEventDefinition

from .service_runner import P_ngBaseCapabilityImplementation, run_service

logging.basicConfig(level=logging.INFO)


class PongCapabilityImplementation(P_ngBaseCapabilityImplementation):
    """Basic capability definition, very similar to the other capability except for the type of event it emits."""

    intersect_sdk_capability_name = 'pong'
    intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
        'pong': IntersectEventDefinition(event_type=str),
    }

    def after_service_startup(self) -> None:
        """Called after service startup."""
        self.counter_thread = threading.Thread(
            target=self.pong_event,
            daemon=True,
            name='counter_thread',
        )
        self.counter_thread.start()

    def pong_event(self) -> None:
        """Send out a pong event every 2 seconds."""
        while True:
            time.sleep(2.0)
            self.intersect_sdk_emit_event('pong', 'pong')


if __name__ == '__main__':
    run_service(PongCapabilityImplementation())
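Both services use the same pattern: a daemon thread that sleeps, emits, and repeats. As a rough standard-library sketch of that pattern (not part of the example's code), the class below replaces `intersect_sdk_emit_event` with an arbitrary callable and adds a `threading.Event` so the loop can be stopped cleanly. The names `PeriodicEmitter` and `demo` are hypothetical.

```python
import threading
import time


class PeriodicEmitter:
    """Calls emit(value) every `interval` seconds on a daemon thread until stopped."""

    def __init__(self, emit, value, interval):
        self._emit = emit          # stand-in for the SDK's event-emitting method
        self._value = value
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._run,
            daemon=True,
            name='emitter_thread',
        )

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        # Event.wait returns False when the timeout elapses and True once stop() is called,
        # so this loop emits once per interval and exits promptly on stop().
        while not self._stop.wait(self._interval):
            self._emit(self._value)


def demo():
    """Collect the values emitted over a short period."""
    received = []
    emitter = PeriodicEmitter(received.append, 'ping', interval=0.01)
    emitter.start()
    time.sleep(0.2)
    emitter.stop()
    return received
```

The services in this example do not need the stop flag, because the thread is a daemon and ends when the process does; the flag is only useful if you want to stop emitting while the process keeps running.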

Both of these Services share some common code:

"""Common functionality between all of the Service classes."""

import logging
from abc import ABC, abstractmethod

from intersect_sdk import (
    HierarchyConfig,
    IntersectBaseCapabilityImplementation,
    IntersectService,
    IntersectServiceConfig,
    default_intersect_lifecycle_loop,
)

logger = logging.getLogger(__name__)


class P_ngBaseCapabilityImplementation(IntersectBaseCapabilityImplementation, ABC):  # noqa: N801
    """Common interface definitions, no implementations here."""

    @abstractmethod
    def after_service_startup(self) -> None:
        """Common function implemented by the various P-NG capabilities after the service starts up."""


def run_service(capability: P_ngBaseCapabilityImplementation) -> None:
    """The idea behind the two services is that each one will emit a unique event.

    The interesting configuration mostly happens in the Client; look at that one for details.
    """
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }
    service_name = capability.__class__.__name__[:4].lower()
    config = IntersectServiceConfig(
        hierarchy=HierarchyConfig(
            organization='p-ng-organization',
            facility='p-ng-facility',
            system='p-ng-system',
            subsystem='p-ng-subsystem',
            service=f'{service_name}-service',
        ),
        status_interval=30.0,
        **from_config_file,
    )
    service = IntersectService([capability], config)
    logger.info('Starting %s_service, use Ctrl+C to exit.', service_name)

    """
    Here, we provide the after_service_startup function on the capability as a post startup callback.

    This ensures we don't emit any events until we've actually started up our Service.
    """
    default_intersect_lifecycle_loop(
        service, post_startup_callback=capability.after_service_startup
    )
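The shared runner derives the service name from the first four characters of the capability's class name, and the client later addresses the service with a dotted hierarchy string. The sketch below shows how the two line up. It uses empty stand-in classes rather than real SDK capabilities, and the helper names `service_name_for` and `hierarchy_string` are hypothetical; the dotted format is taken from the client code in this example.

```python
class PingCapabilityImplementation:
    """Stand-in for the real capability class; only the class name matters here."""


class PongCapabilityImplementation:
    """Stand-in for the real capability class; only the class name matters here."""


def service_name_for(capability):
    """Mirror the runner: the first four characters of the class name, lowercased."""
    return capability.__class__.__name__[:4].lower()


def hierarchy_string(service_name):
    """Join the hierarchy fields in the order the client's dotted string uses."""
    return '.'.join(
        [
            'p-ng-organization',
            'p-ng-facility',
            'p-ng-system',
            'p-ng-subsystem',
            f'{service_name}-service',
        ]
    )


if __name__ == '__main__':
    for capability in (PingCapabilityImplementation(), PongCapabilityImplementation()):
        print(hierarchy_string(service_name_for(capability)))
```

Because the name comes from slicing the class name, renaming a capability class would silently change the service part of the hierarchy, and the client's subscription string would need to change with it.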

Client walkthrough

import logging

from intersect_sdk import (
    INTERSECT_RESPONSE_VALUE,
    IntersectClient,
    IntersectClientCallback,
    IntersectClientConfig,
    IntersectEventMessageParams,
    default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)


def get_event_message_params(png: str) -> IntersectEventMessageParams:
    """Generate the event information we'll handle based on the value of 'png'.

    We always want to pass in a new object; don't mutate a persistent object.
    """
    return IntersectEventMessageParams(
        hierarchy=f'p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem.{png}-service',
        capability_name=png,
        event_name=png,
    )


class SampleOrchestrator:
    """Basic orchestrator with an event callback.

    The event callback switches between listening to 'ping' and 'pong' events; it never listens to both simultaneously.
    """

    MAX_EVENTS_TO_PROCESS = 4

    def __init__(self) -> None:
        """Straightforward constructor, just initializes the instance variable which counts events."""
        self.events_encountered = 0

    def event_callback(
        self,
        _source: str,
        _capability_name: str,
        event_name: str,
        payload: INTERSECT_RESPONSE_VALUE,
    ) -> IntersectClientCallback:
        """Handles events from two Services, one Service at a time.

        With this handler, the order of instructions goes like this:

        1. Listen for a ping event.
        2. On ping event - Start listening for pong events and stop listening for ping events.
        3. On pong event - Start listening for ping events and stop listening for pong events.
        4. Repeat steps 2-3 until we have processed the maximum number of events we want to.
        """
        self.events_encountered += 1
        print(payload)
        if self.events_encountered == self.MAX_EVENTS_TO_PROCESS:
            # Raising from the callback ends the client's run once enough events have been handled.
            raise Exception

        # In this case, we can check any of the source, the capability_name, or the event_name. For maximum robustness, you should check all three values.
        if event_name == 'ping':
            return IntersectClientCallback(
                services_to_start_listening_for_events=[get_event_message_params('pong')],
                services_to_stop_listening_for_events=[get_event_message_params('ping')],
            )
        return IntersectClientCallback(
            services_to_start_listening_for_events=[get_event_message_params('ping')],
            services_to_stop_listening_for_events=[get_event_message_params('pong')],
        )


if __name__ == '__main__':
    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 1883,
                'protocol': 'mqtt5.0',
            },
        ],
    }

    # We initially listen only for ping service events.
    config = IntersectClientConfig(
        initial_message_event_config=IntersectClientCallback(
            services_to_start_listening_for_events=[
                get_event_message_params('ping'),
            ]
        ),
        **from_config_file,
    )
    orchestrator = SampleOrchestrator()
    client = IntersectClient(
        config=config,
        event_callback=orchestrator.event_callback,
    )
    default_intersect_lifecycle_loop(
        client,
    )
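The comment in the callback recommends checking the source, the capability name, and the event name together. A minimal sketch of such a check follows. The helper `is_expected_event` is hypothetical, and it assumes that `source` arrives as the same dotted hierarchy string the client used when subscribing; verify that assumption against the SDK before relying on it.

```python
# Assumed to match the hierarchy used by get_event_message_params in the client above.
HIERARCHY_PREFIX = 'p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem'


def is_expected_event(source, capability_name, event_name, png):
    """Return True only if all three identifiers match the service named by `png`.

    `png` is 'ping' or 'pong', as in the client's get_event_message_params helper.
    """
    return (
        source == f'{HIERARCHY_PREFIX}.{png}-service'
        and capability_name == png
        and event_name == png
    )


if __name__ == '__main__':
    ping_source = f'{HIERARCHY_PREFIX}.ping-service'
    print(is_expected_event(ping_source, 'ping', 'ping', 'ping'))
    print(is_expected_event(ping_source, 'ping', 'pong', 'ping'))
```

Inside the callback, a check like this could replace the bare `event_name == 'ping'` comparison, so that an event with the right name but from an unexpected service or capability is not mistaken for the one being waited on.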

Running it yourself: Using a Python environment

# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems

# First, start up the ping service in one terminal
python -m examples.3_ping_pong_events.ping_service

# Next, start up the pong service in a second terminal
python -m examples.3_ping_pong_events.pong_service

# Finally, run the client in a third terminal
python -m examples.3_ping_pong_events.ping_pong_client

After a few seconds, the output from ping_pong_client will look something like this:

ping
pong
ping
pong

The client will exit automatically; you will have to use Ctrl+C in each terminal running a service process.

You can also run the client again without stopping the services; the output will look different from the last time.

Running it yourself: Using Docker

First, you will need to build the latest INTERSECT-SDK image; run the following command from the repository root:

docker build -t intersect-sdk .

Then you can run the examples like this:

# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems

# First, start up the ping service in one terminal
# (each running container needs its own --name)
docker run --rm -it --name intersect-ping-service --network host intersect-sdk python -m examples.3_ping_pong_events.ping_service

# Next, start up the pong service in a second terminal
docker run --rm -it --name intersect-pong-service --network host intersect-sdk python -m examples.3_ping_pong_events.pong_service

# Finally, run the client in a third terminal
docker run --rm -it --name intersect-client --network host intersect-sdk python -m examples.3_ping_pong_events.ping_pong_client

After several seconds, the output from ping_pong_client will look something like this:

ping
pong
ping
pong

The client will exit automatically; you will have to use Ctrl+C in each terminal running a service process.

You can also run the client again without stopping the services; the output will look different from the last time.

(If you ran a container in detached mode (-d), you can instead stop it with docker stop followed by that container's name.)