Integrating with FastAPI

This example showcases how to make your FastAPI Server an INTERSECT Service as well. These ideas can be used with any other framework. The idea is to integrate INTERSECT with FastAPI, not the other way around.

Service walkthrough

The below code is an example of what you can initialize with uvicorn. FastAPI routes and the INTERSECT capability should be able to interact via a two-way data binding. You can use the FastAPI lifespan events to startup and shutdown the INTERSECT Service. You can attach the INTERSECT Capability to the global FastAPI state, and interact with the Capability from FastAPI routes.

"""Main file to start backend server.

This file shows how to implement an INTERSECT Service inside of a FastAPI server.
This is a fairly limited example which shows off a basic request/response handler from INTERSECT, and
also allows you to emit an event from an HTTP POST request.


"""

import logging
import typing
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, Response

from intersect_sdk import (
    IntersectBaseCapabilityImplementation,
    IntersectEventDefinition,
    IntersectService,
    IntersectServiceConfig,
    intersect_message,
)

from .shared import (
    CAPABILITY_NAME,
    EVENT_NAME,
    MockInputType,
    MockOutputType,
    do_mock_compute,
    get_service_hierarchy,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('fastapi_intersect_asgi_app_server')

################## INTERSECT stuff ####################


class FastAPICapability(IntersectBaseCapabilityImplementation):
    """The INTERSECT-SDK Capability.

    In addition to being added to the INTERSECT Service, this can be accessed via FastAPI via global state.
    """

    intersect_sdk_capability_name = CAPABILITY_NAME
    intersect_sdk_events: typing.ClassVar[dict[str, IntersectEventDefinition]] = {
        EVENT_NAME: IntersectEventDefinition(event_type=MockOutputType),
    }

    @intersect_message
    def intersect_request(self, param: MockInputType) -> MockOutputType:
        """Simple response generation."""
        output = do_mock_compute(param)
        output.message = 'Origin from INTERSECT'
        # returns response over INTERSECT
        return output


################## FASTAPI CORE #####################


@asynccontextmanager
async def fastapi_lifespan(app: FastAPI) -> typing.AsyncGenerator[None, None]:
    """This is the application lifecycle function that you attach to the FastAPI object, use this instead of default_intersect_lifecycle_loop.

    Startup occurs before the 'yield', cleanup after the 'yield'.
    """
    # On startup
    logger.info('Initializing app')

    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 5672,
                'protocol': 'amqp0.9.1',
            },
        ],
    }

    capability = FastAPICapability()
    service = IntersectService(
        [capability],
        IntersectServiceConfig(
            hierarchy=get_service_hierarchy(),
            status_interval=30.0,
            **from_config_file,
        ),
    )
    app.state.capability = capability
    service.startup()

    logger.info('App initialized')

    yield

    # On cleanup
    logger.info('Shutting down gracefully')

    service.shutdown('Shutdown request by user')

    logger.info('Graceful shutdown complete')


app = FastAPI(
    debug=True,
    lifespan=fastapi_lifespan,
)

###################### FASTAPI ROUTES ####################################


@app.post('/')
async def publish_event(request: Request, input_value: MockInputType) -> Response:
    """The FastAPI HTTP POST endpoint, this also publishes an INTERSECT event before returning the response. The response will be received prior to the event."""
    output = do_mock_compute(input_value)
    output.message = 'Origin from FastAPI - INTERSECT'
    # send output back over INTERSECT
    request.app.state.capability.intersect_sdk_emit_event(EVENT_NAME, output)
    output.message = 'Origin from FastAPI - HTTP response'
    return output

Client walkthrough

In this example, the Client is contacting the Service both via the FastAPI POST endpoint (which additionally sends an INTERSECT event) and the request/response mechanism of INTERSECT. Note that in the code’s order of execution, you will first get the request/response back from INTERSECT, then the HTTP response value, and finally the event value from INTERSECT.

"""Client for the FastAPI example.

This Client will submit one request over INTERSECT and one request over HTTP. Both requests will receive responses in INTERSECT.
"""

import argparse
import logging
import time
import urllib.request

from intersect_sdk import (
    INTERSECT_RESPONSE_VALUE,
    IntersectClient,
    IntersectClientCallback,
    IntersectClientConfig,
    IntersectDirectMessageParams,
    IntersectEventMessageParams,
    default_intersect_lifecycle_loop,
)

from .app.shared import (
    CAPABILITY_NAME,
    EVENT_NAME,
    MockInputType,
    MockOutputType,
    get_service_hierarchy,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

server_hierarchy = get_service_hierarchy()


class SampleOrchestrator:
    """Simple orchestrator."""

    def __init__(self) -> None:
        """Initialize a message counter to 0, for interruption later."""
        self.event_counter = 0

    def message_callback(
        self,
        _source: str,
        _operation: str,
        _has_error: bool,
        payload: INTERSECT_RESPONSE_VALUE,
    ) -> None:
        """Received from the INTERSECT message."""
        validated_response = MockOutputType(**payload)
        print(validated_response.message)
        self.event_counter += 1
        if self.event_counter >= 2:
            raise Exception('Client loop ended')  # noqa: EM101, TRY003
        # send no additional messages

    def event_callback(
        self,
        _source: str,
        _capability_name: str,
        _event_name: str,
        payload: INTERSECT_RESPONSE_VALUE,
    ) -> None:
        """Received by the INTERSECT event, this is triggered by the FastAPI request."""
        validated_response = MockOutputType(**payload)
        print(validated_response.message)
        self.event_counter += 1
        if self.event_counter >= 2:
            raise Exception('Client loop ended')  # noqa: TRY003, EM101
        # send no additional messages


def generate_mock_input() -> MockInputType:
    """Dummy request body generator."""
    return MockInputType(param_1=13.37, param_2=42069, param_3=42)


class Poster:
    """Wrapper class to be able to POST a message after startup."""

    def __init__(self, server_uri: str, wait_time: int = 0) -> None:
        """Server URI determined from CLI args. Wait_time parameter is only used to ensure ordering of output in tests, it delays the HTTP POST but not the INTERSECT request/response."""
        self.server_uri = server_uri
        self.wait_time = wait_time

    def publish_one_message(self) -> None:
        """Called once by internal INTERSECT logic. This is called AFTER making the HTTP request."""
        data = generate_mock_input().model_dump_json().encode()
        if self.wait_time:
            time.sleep(self.wait_time)
        request = urllib.request.Request(  # noqa: S310 (validate the URI more carefully in a real world example)
            self.server_uri, data, headers={'Content-Type': 'application/json'}
        )
        with urllib.request.urlopen(request) as req:  # noqa: S310
            print(MockOutputType.model_validate_json(req.read()).message)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--server-scheme', default='http')
    parser.add_argument('--server-host', default='127.0.0.1')
    parser.add_argument('--server-port', type=int, default=8000)
    parser.add_argument('--wait-time', type=int, default=0)
    args = parser.parse_args()

    server_uri = f'{args.server_scheme}://{args.server_host}:{args.server_port}'
    poster = Poster(server_uri, args.wait_time)

    from_config_file = {
        'brokers': [
            {
                'username': 'intersect_username',
                'password': 'intersect_password',
                'port': 5672,
                'protocol': 'amqp0.9.1',
            },
        ],
    }

    # Listen for an event on the exposed service
    events = [
        IntersectEventMessageParams(
            hierarchy=server_hierarchy.hierarchy_string('.'),
            capability_name=CAPABILITY_NAME,
            event_name=EVENT_NAME,
        )
    ]
    initial_messages = [
        IntersectDirectMessageParams(
            destination=server_hierarchy.hierarchy_string('.'),
            operation=f'{CAPABILITY_NAME}.intersect_request',
            payload=generate_mock_input(),
        )
    ]
    config = IntersectClientConfig(
        initial_message_event_config=IntersectClientCallback(
            services_to_start_listening_for_events=events,
            messages_to_send=initial_messages,
        ),
        **from_config_file,
    )
    orchestrator = SampleOrchestrator()
    client = IntersectClient(
        config=config,
        user_callback=orchestrator.message_callback,
        event_callback=orchestrator.event_callback,
    )

    # we should get the response from the initial request/response INTERSECT endpoint back before we get the event message or the HTTP response back
    default_intersect_lifecycle_loop(
        client,
        post_startup_callback=poster.publish_one_message,
    )

Shared code

import datetime

from pydantic import BaseModel

from intersect_sdk import HierarchyConfig

CAPABILITY_NAME = 'fastapi_sample_capability'
EVENT_NAME = 'fastapi_event'


class MockInputType(BaseModel):
    """mock data type with expanded out fields."""

    param_1: float
    param_2: int
    param_3: int


class MockOutputType(BaseModel):
    """mock output data type."""

    computed_value: float
    parse_time: datetime.datetime = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
    message: str = ''


def do_mock_compute(input_value: MockInputType) -> MockOutputType:
    """Generic computation."""
    return MockOutputType(
        computed_value=(input_value.param_1 + input_value.param_2 * input_value.param_3),
        parse_time=datetime.datetime.now(datetime.timezone.utc),
    )


def get_service_hierarchy() -> HierarchyConfig:
    """Shared hierarchy."""
    return HierarchyConfig(
        organization='fastapi-organization',
        facility='fastapi-facility',
        system='fastapi-system',
        subsystem='fastapi-subsystem',
        service='fastapi-service',
    )

Running it yourself: Using a Python environment

# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems

# Start the service in your first terminal
# Runs on port 8000 and 127.0.0.1 by default, change this with --port <PORT> and --host <HOST> respectively
python -m examples.5_fastapi_example.fastapi_service

# Finally, run the client in a second terminal
# By default this assumes the service runs on port 8000 on host 127.0.0.1, change this with --server-port <PORT> and --server-host <HOST> respectively
python -m examples.5_fastapi_example.fastapi_client

After a few seconds, the output from fastapi_client will look something like this:

Origin from INTERSECT
Origin from FastAPI - HTTP response
Origin from FastAPI - INTERSECT

The ordering of the lines may vary.

The client will exit automatically; you will have to use Ctrl+C on the terminal running the service process.

Running it yourself: Using Docker

First, you will need to build the latest INTERSECT-SDK image; run the following command from the repository root:

docker build -t intersect-sdk .

Then you can run the examples like this:

# to suppress progress messages and only show stdout, you can add
# 2>/dev/null
# to the end of your command on UNIX systems

# Start the service in your first terminal
# Runs on port 8000 and 127.0.0.1 by default, change this with --port <PORT> and --host <HOST> respectively (if running in Docker, need to use 0.0.0.0 host)
docker run --rm -it --name intersect-service --network host intersect-sdk python -m examples.5_fastapi_example.fastapi_service --host 0.0.0.0

# Finally, run the client in a second terminal
# By default this assumes the service runs on port 8000 on host 127.0.0.1, change this with --server-port <PORT> and --server-host <HOST> respectively (need to use 0.0.0.0 host in Docker)
docker run --rm -it --name intersect-client --network host intersect-sdk python -m examples.5_fastapi_example.fastapi_client --server-host 0.0.0.0

After several seconds, the output from fastapi_client will look something like this:

Origin from INTERSECT
Origin from FastAPI - HTTP response
Origin from FastAPI - INTERSECT

The ordering of the lines may vary.

The client will exit automatically; you will have to use Ctrl+C on the terminal running the service process.

(If you ran Docker in detached mode (-d), you can instead run docker stop intersect-client.)