Overview
The Motia class is the centralized runtime manager for Motia applications. It manages steps, streams, and authentication handlers.
Class: Motia
from motia import Motia
motia = Motia()
Constructor
Creates a new Motia runtime instance.
Methods
add_stream
def add_stream(
self,
config: StreamConfig,
file_path: str
) -> Stream[Any]
Add a stream to the runtime.
Stream configuration object with name, schema, and lifecycle handlers.
Path to the file where the stream is defined.
The created stream instance.
set_authenticate
def set_authenticate(
self,
handler: Callable[..., Awaitable[StreamAuthResult | bool]]
) -> None
Set the global stream authentication handler.
handler
Callable[[StreamAuthInput, FlowContext], Awaitable[StreamAuthResult | bool]]
required
Async function that authenticates stream connections. Returns StreamAuthResult or a boolean.
add_step
def add_step(
self,
config: StepDefinition | StepConfig | dict[str, Any],
step_path: str,
handler: Callable[..., Awaitable[Any]] | None = None,
file_path: str | None = None
) -> None
Add a step to the runtime and register with III engine.
config
StepDefinition | StepConfig | dict
required
Step configuration. Can be a StepDefinition (from step() or multi_trigger_step()), a StepConfig object, or a dictionary.
Identifier path for the step.
handler
Callable[..., Awaitable[Any]] | None
Optional async handler function. Required if config is not a StepDefinition.
Optional file path where the step is defined. Defaults to step_path.
The handler is automatically extracted from StepDefinition. For StepConfig or dict configs, you must provide the handler argument.
initialize
async def initialize(self) -> None
Initialize the runtime and register bridge functions. This method:
- Registers the global authentication handler (if set)
- Sets up stream join/leave handlers
- Registers step endpoints with the III engine
Returns nothing. Must be called before the runtime can process events.
Properties
Dictionary of registered streams, keyed by stream name.
Usage Examples
Basic Setup
from motia import Motia, step, StepConfig
from motia.triggers import http
motia = Motia()
# Define a step
my_step = step(
StepConfig(
name="my-step",
triggers=[http("GET", "/hello")]
),
handler=async def handle(req, ctx):
return {"status": 200, "body": {"message": "Hello"}}
)
# Register the step
motia.add_step(my_step, "my-step")
# Initialize
await motia.initialize()
With Streams
from motia import Motia
from motia.types_stream import StreamConfig
motia = Motia()
# Add a stream
stream_config = StreamConfig(name="chat", description="Chat stream")
chat_stream = motia.add_stream(stream_config, "streams.py")
# Set authentication
async def authenticate(input_data, ctx):
token = input_data.headers.get("authorization")
if token == "valid-token":
return {"authorized": True, "context": {"user_id": "123"}}
return {"authorized": False}
motia.set_authenticate(authenticate)
await motia.initialize()
- Step - Creating step definitions
- Context - Flow context for handlers
- Triggers - Trigger configuration