Service¶
Persistent entities registered with INTERSECT are Services. If you want to create your own orchestrator or run a temporary script, see the intersect_sdk.client module instead.
The Service is at the heart of a client-developed SDK. It builds on a user-defined capability (which can extend from many capabilities) and automatically integrates that capability into INTERSECT’s message- and schema-based ecosystem.
Users do not need to interact with the service other than through its constructor and the lifecycle “startup” and “shutdown” methods.
The service automatically handles all system-level interfaces (e.g. status broadcasts). User-level interfaces are all handled on the same messaging channel, so users should not need to configure message brokers or the data layer beyond defining credentials in their “IntersectServiceConfig” object.
Most useful definitions and typings will be found in the service_definitions module.
- final class intersect_sdk.service.IntersectService(capabilities: list[IntersectBaseCapabilityImplementation], config: IntersectServiceConfig)¶
This is the core gateway class for a user’s application into INTERSECT.
A service receives messages from a source, then sends a response message to that same source. On a scientific domain level, this can be expressed through a simple functional style: you take in inputs, you return your output.
- The service automatically integrates all of the following components:
The user-defined capabilities
Any message brokers
Any core INTERSECT data layers
- What it does NOT do:
Deal with any custom messaging logic (e.g. Pyro logic or an internal ZeroMQ system); this should be defined at the capability level.
Deal with any application logic; this should be handled by the capabilities.
Users should generally not need to interact with objects of this class outside of the constructor and the following functions: startup(), add_startup_messages(), shutdown(), add_shutdown_messages(), create_external_request(). It’s advisable not to mutate the service object yourself, though you can freely log out properties for debugging purposes.
- Note: The ONLY stable methods are:
the constructor
startup()
add_startup_messages()
shutdown()
add_shutdown_messages()
is_connected()
considered_unrecoverable()
forbid_keys()
allow_keys()
allow_all_functions()
get_blocked_keys()
create_external_request()
No other functions or parameters are guaranteed to remain stable.
- add_shutdown_messages(messages: list[tuple[IntersectDirectMessageParams, Callable[[str, str, bool, list[list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | dict[str, list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | str | bool | int | float | None | bytes], None] | None]]) None¶
Add request messages to send out to various microservices on shutdown.
- Params:
- messages: list of tuples. The first value in each tuple is the message you want to send out on shutdown; the second value is the callback function for handling the response from the other Service.
- add_startup_messages(messages: list[tuple[IntersectDirectMessageParams, Callable[[str, str, bool, list[list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | dict[str, list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | str | bool | int | float | None | bytes], None] | None]]) None¶
Add request messages to send out to various microservices when this service starts.
- Params:
- messages: list of tuples. The first value in each tuple is the message you want to send out on startup; the second value is the callback function for handling the response from the other Service.
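As a rough illustration of the list-of-tuples shape described above, the sketch below builds one message with a response callback and one without. The `DirectMessageParams` dataclass is a hypothetical stand-in for `IntersectDirectMessageParams`; its field names, the destination strings, the operation names, and the callback parameter names are all assumptions. Only the callback's argument types (`str, str, bool, <JSON value>`) come from the signature.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Optional


# Hypothetical stand-in for IntersectDirectMessageParams; the field names
# are assumptions made for this sketch.
@dataclass
class DirectMessageParams:
    destination: str
    operation: str
    payload: Any


# Matches the documented shape Callable[[str, str, bool, <JSON value>], None].
ResponseCallback = Callable[[str, str, bool, Any], None]

received: list = []


def on_reply(source: str, operation: str, has_error: bool, payload: Any) -> None:
    """Record each reply; a real handler would act on the payload."""
    # Parameter names are assumptions; only the types come from the signature.
    received.append((source, operation, has_error, payload))


def build_startup_messages() -> list[tuple[DirectMessageParams, Optional[ResponseCallback]]]:
    """Return (message, callback) tuples; a callback of None ignores the reply."""
    return [
        # Destination and operation names below are made up for illustration.
        (DirectMessageParams("org.facility.system.subsystem.other-service", "Example.ping", None), on_reply),
        (DirectMessageParams("org.facility.system.subsystem.logger", "Example.announce", "starting"), None),
    ]


messages = build_startup_messages()
```

With a real service object, a list like this would be passed as `service.add_startup_messages(messages)` before calling `startup()`.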
- allow_all_functions() Self¶
Allow every function established in the service, and send out an appropriate message.
If you want to only allow certain functions, use “service.allow_keys()”
- allow_keys(keys: set[str]) Self¶
Allow all functions annotated with any key in “keys”, and send out an appropriate message.
NOTE: if the function has multiple keys, the function will only be “allowed” if all keys which have been forbidden are now allowed. If you want to bulk allow everything, use “service.allow_all_functions()”
- Params:
keys: keys of functions you want to allow
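The multi-key rule in the note above can be modelled with plain sets. This is a toy model and not the SDK's implementation: `KeyGate`, its `is_allowed` helper, and the function names in the usage lines are hypothetical. It assumes a function counts as blocked while any of its keys is in the blocked set, and that functions with no keys are never blocked.

```python
from __future__ import annotations


class KeyGate:
    """Toy model of key-based blocking; not the SDK's implementation."""

    def __init__(self, function_keys: dict[str, set[str]]) -> None:
        # Maps each function name to the keys it is annotated with.
        self._function_keys = function_keys
        self._blocked: set[str] = set()

    def forbid_keys(self, keys: set[str]) -> "KeyGate":
        self._blocked |= keys
        return self

    def allow_keys(self, keys: set[str]) -> "KeyGate":
        self._blocked -= keys
        return self

    def allow_all_functions(self) -> "KeyGate":
        self._blocked.clear()
        return self

    def get_blocked_keys(self) -> set[str]:
        # Return a copy so callers cannot mutate the internal set.
        return self._blocked.copy()

    def is_allowed(self, function_name: str) -> bool:
        # Allowed only if none of the function's keys is currently blocked.
        return self._function_keys[function_name].isdisjoint(self._blocked)


gate = KeyGate({
    "move_arm": {"motion", "hardware"},
    "read_temp": {"hardware"},
    "ping": set(),
})
gate.forbid_keys({"motion", "hardware"}).allow_keys({"motion"})
```

After these calls, `move_arm` is still blocked in this model because its `hardware` key remains forbidden, while `ping` has no keys and is never blocked.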
- block_all_functions() Self¶
Block every function which _can_ be blocked in the service, and send out an appropriate message.
Note that this does NOT disconnect from INTERSECT, and will not block functions which have no markings.
- final considered_unrecoverable() bool¶
Check if any broker is considered to be in an unrecoverable state.
- Returns:
True if we can’t recover, False otherwise
- create_external_request(request: IntersectDirectMessageParams, response_handler: Callable[[str, str, bool, list[list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | dict[str, list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | str | bool | int | float | None | bytes], None] | None = None, timeout: float = 300.0) UUID¶
Create an external request that we’ll send to a different Service.
- Params:
request: the request we want to send out, encapsulated as an IntersectDirectMessageParams object
response_handler: optional callback for how we want to handle the response from this request.
timeout: optional value for how long we should wait on the request, in seconds (default: 300 seconds)
- Returns:
generated RequestID associated with your request
- Raises:
- pydantic.ValidationError - if the request parameter isn't valid
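A minimal sketch of the call shape, under stated assumptions: `StubService` is a hypothetical stand-in that only mimics the documented parameters and `UUID` return value, the request is a plain dict rather than a real `IntersectDirectMessageParams`, and the meaning given to the callback's `bool` argument (an error flag) is an assumption.

```python
from __future__ import annotations

import uuid
from typing import Any, Callable, Optional

Handler = Callable[[str, str, bool, Any], None]


class StubService:
    """Hypothetical stand-in; it only mimics the documented call shape."""

    def __init__(self) -> None:
        self.sent: dict[uuid.UUID, tuple[Any, Optional[Handler], float]] = {}

    def create_external_request(
        self,
        request: Any,
        response_handler: Optional[Handler] = None,
        timeout: float = 300.0,
    ) -> uuid.UUID:
        # Generate an ID and remember what was asked for.
        request_id = uuid.uuid4()
        self.sent[request_id] = (request, response_handler, timeout)
        return request_id


results: list[str] = []


def handle_reply(source: str, operation: str, has_error: bool, payload: Any) -> None:
    # Assumption: the bool argument flags an error response.
    if has_error:
        results.append(f"error from {source}")
    else:
        results.append(f"ok from {source}: {payload}")


service = StubService()
# Override the default 300-second timeout and keep the returned ID for logging.
request_id = service.create_external_request(
    {"operation": "Example.ping"}, handle_reply, timeout=30.0
)
```

Keeping the returned ID lets an application tie its own log entries to a specific outgoing request.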
- forbid_keys(keys: set[str]) Self¶
Block all functions annotated with any key in “keys”, and send out an appropriate message.
NOTE: if you want to bulk forbid everything, you may want to call service.shutdown() to disconnect entirely from INTERSECT.
- Params:
keys: keys of functions you want to block
- get_blocked_keys() set[str]¶
Returns a set of the function keys which indicate the function should be blocked.
Note that this returns a shallow copy, as the inner reference should NOT be mutated externally.
- final is_connected() bool¶
Check if we’re currently connected to the INTERSECT brokers.
- Returns:
True if we are currently connected to INTERSECT, False if not
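One way an application might combine `is_connected()` and `considered_unrecoverable()` is a polling helper like the sketch below. The helper `wait_until_connected`, its parameters, and the `StubService` class are hypothetical. The helper relies only on the two documented methods, so a real service object could be passed in place of the stub.

```python
import time
from typing import Callable


class StubService:
    """Hypothetical stand-in: reports connected after a set number of polls."""

    def __init__(self, polls_until_connected: int, unrecoverable: bool = False) -> None:
        self._remaining = polls_until_connected
        self._unrecoverable = unrecoverable

    def is_connected(self) -> bool:
        self._remaining -= 1
        return self._remaining <= 0

    def considered_unrecoverable(self) -> bool:
        return self._unrecoverable


def wait_until_connected(
    service,
    timeout: float = 10.0,
    poll_interval: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until connected; return False on timeout, raise if unrecoverable."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if service.considered_unrecoverable():
            raise RuntimeError("broker connection is unrecoverable")
        if service.is_connected():
            return True
        sleep(poll_interval)
    return False
```

Raising on the unrecoverable state, instead of waiting out the timeout, lets the caller stop early and decide whether to exit or rebuild the service.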
- register_event(service: HierarchyConfig, capability_name: str, event_name: str, response_handler: Callable[[str, str, str, list[list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | dict[str, list[INTERSECT_JSON_VALUE] | dict[str, INTERSECT_JSON_VALUE] | str | bool | int | float | None] | str | bool | int | float | None | bytes], IntersectClientCallback | None]) None¶
Begin subscribing to events from a different Service.
- Params:
service: HierarchyConfig of the service we want to talk to
capability_name: name of capability which will fire off the event
event_name: name of event to subscribe to
response_handler: callback for how to handle the reception of an event
- final shutdown(reason: str | None = None) Self¶
This function disconnects the service from all INTERSECT systems. It does NOT otherwise drop anything else from memory.
You should call this function whenever your application is terminating. You may also call this function if you need to temporarily disconnect from INTERSECT systems, although this use case is relatively niche.
Call this function yourself only if you write your own lifecycle functions. The default INTERSECT lifecycle loop will call it once it breaks out of the main lifecycle loop.
- Params:
reason: an optional description you may provide as to why the service is shutting down.
- final startup() Self¶
This function connects the service to all INTERSECT systems.
You will need to call this function at least once in your application’s lifecycle. You will also need to call it again if you call shutdown() on the service, and want to restart the INTERSECT connection without killing your application’s process.
Call this function yourself only if you write your own lifecycle functions. The default INTERSECT lifecycle loop will call it prior to starting the main lifecycle loop.
NOTE: any functions which were manually blocked will continue to be blocked if you restart the service; call “service.allow_all_functions()” to unblock all functions.
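For applications that manage their own lifecycle, the startup and shutdown pairing might be sketched as follows. `StubService` is a hypothetical stand-in that records calls, and `run_with_service` is an illustrative helper that is not part of the SDK. The sketch assumes only the documented `startup()` and `shutdown(reason=...)` methods.

```python
class StubService:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self):
        self.events = []

    def startup(self):
        self.events.append("startup")
        return self

    def shutdown(self, reason=None):
        self.events.append(("shutdown", reason))
        return self


def run_with_service(service, work):
    """Connect, run the application's work, and always disconnect afterwards."""
    reason = "application finished"
    service.startup()
    try:
        return work()
    except Exception as exc:
        # Pass the failure along as the shutdown reason, then re-raise.
        reason = f"application error: {exc}"
        raise
    finally:
        service.shutdown(reason=reason)
```

The `finally` block ensures the disconnect happens even when the work raises, which matches the guidance to call `shutdown()` whenever the application is terminating.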