Overview

The Motia class is the centralized runtime manager for Motia applications. It manages steps, streams, and authentication handlers.

Class: Motia

from motia import Motia

motia = Motia()

Constructor

__init__
def __init__(self) -> None
Creates a new Motia runtime instance.

Methods

add_stream

def add_stream(
    self,
    config: StreamConfig,
    file_path: str
) -> Stream[Any]
Add a stream to the runtime.
config
StreamConfig
required
Stream configuration object with name, schema, and lifecycle handlers.
file_path
str
required
Path to the file where the stream is defined.
return
Stream[Any]
The created stream instance.

set_authenticate

def set_authenticate(
    self,
    handler: Callable[..., Awaitable[StreamAuthResult | bool]]
) -> None
Set the global stream authentication handler.
handler
Callable[[StreamAuthInput, FlowContext], Awaitable[StreamAuthResult | bool]]
required
Async function that authenticates stream connections. Returns StreamAuthResult or a boolean.
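As a minimal sketch of the handler shape, the following uses a stand-in FakeAuthInput class in place of StreamAuthInput and returns a plain boolean. Only a headers mapping is assumed, mirroring the usage example on this page. The class name and the token value are hypothetical, and the sketch assumes that True accepts the connection and False rejects it.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeAuthInput:
    """Hypothetical stand-in for StreamAuthInput; only `headers` is assumed."""
    headers: dict[str, str] = field(default_factory=dict)


async def authenticate(input_data, ctx) -> bool:
    # Boolean form of the result (assumed: True accepts, False rejects).
    token = input_data.headers.get("authorization")
    return token == "valid-token"  # hypothetical token check


# Exercise the handler directly, without a running Motia instance.
accepted = asyncio.run(
    authenticate(FakeAuthInput({"authorization": "valid-token"}), None)
)
rejected = asyncio.run(authenticate(FakeAuthInput(), None))
```

With a real runtime, a handler of this shape would be passed to motia.set_authenticate(authenticate).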

add_step

def add_step(
    self,
    config: StepDefinition | StepConfig | dict[str, Any],
    step_path: str,
    handler: Callable[..., Awaitable[Any]] | None = None,
    file_path: str | None = None
) -> None
Add a step to the runtime and register it with the III engine.
config
StepDefinition | StepConfig | dict
required
Step configuration. Can be a StepDefinition (from step() or multi_trigger_step()), a StepConfig object, or a dictionary.
step_path
str
required
Identifier path for the step.
handler
Callable[..., Awaitable[Any]] | None
Optional async handler function. Required if config is not a StepDefinition.
file_path
str | None
Optional file path where the step is defined. Defaults to step_path.
The handler is automatically extracted from StepDefinition. For StepConfig or dict configs, you must provide the handler argument.
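The handler and file_path rules above can be modeled in a few lines. This is a sketch of the documented behavior only, not Motia's implementation: FakeStepDefinition and resolve_step are hypothetical names, and letting the definition's handler win when both are supplied is an assumption, since the text does not say what happens in that case.

```python
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[..., Awaitable[Any]]


@dataclass
class FakeStepDefinition:
    """Hypothetical stand-in for StepDefinition: a config bundled with its handler."""
    config: dict[str, Any]
    handler: Handler


def resolve_step(
    config: Any,
    step_path: str,
    handler: Optional[Handler] = None,
    file_path: Optional[str] = None,
) -> tuple[Handler, str]:
    """Model of the documented rules; not Motia's actual implementation."""
    if isinstance(config, FakeStepDefinition):
        # The definition carries its own handler (assumed to take precedence).
        handler = config.handler
    elif handler is None:
        # StepConfig or dict configs have no handler attached.
        raise ValueError(f"handler is required for step {step_path!r}")
    # file_path falls back to step_path when omitted.
    return handler, file_path or step_path


async def hello(req, ctx):
    return {"status": 200}


resolved_handler, resolved_path = resolve_step(
    FakeStepDefinition({"name": "my-step"}, hello), "my-step"
)
```

Calling resolve_step({"name": "other"}, "other") without a handler raises ValueError in this model, which corresponds to the case where the handler argument is required.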

initialize

async def initialize(self) -> None
Initialize the runtime and register bridge functions. This method:
  • Registers the global authentication handler (if set)
  • Sets up stream join/leave handlers
  • Registers step endpoints with the III engine
return
None
Returns nothing. Must be called before the runtime can process events.
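Because initialize is a coroutine, it has to be awaited from inside an event loop. One common way to do that from a synchronous entry point is asyncio.run, sketched here with a hypothetical FakeRuntime class standing in for Motia so the snippet runs on its own. Registering before initializing is an assumption drawn from the order used in the usage examples.

```python
import asyncio


class FakeRuntime:
    """Hypothetical stand-in exposing the same async initialize() shape."""

    def __init__(self) -> None:
        self.initialized = False

    async def initialize(self) -> None:
        self.initialized = True


async def main() -> FakeRuntime:
    runtime = FakeRuntime()
    # Register steps, streams, and the auth handler here, before initializing.
    await runtime.initialize()
    return runtime


# asyncio.run creates the event loop, runs main() to completion, and closes it.
runtime = asyncio.run(main())
```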

Properties

streams
dict[str, Stream[Any]]
Dictionary of registered streams, keyed by stream name.

Usage Examples

Basic Setup

from motia import Motia, step, StepConfig
from motia.triggers import http

motia = Motia()

# Define the handler as a regular async function
async def handle(req, ctx):
    return {"status": 200, "body": {"message": "Hello"}}

# Define a step
my_step = step(
    StepConfig(
        name="my-step",
        triggers=[http("GET", "/hello")]
    ),
    handler=handle
)

# Register the step
motia.add_step(my_step, "my-step")

# Initialize (must run inside an async function or event loop)
await motia.initialize()

With Streams

from motia import Motia
from motia.types_stream import StreamConfig

motia = Motia()

# Add a stream
stream_config = StreamConfig(name="chat", description="Chat stream")
chat_stream = motia.add_stream(stream_config, "streams.py")

# Set authentication
async def authenticate(input_data, ctx):
    token = input_data.headers.get("authorization")
    if token == "valid-token":
        return {"authorized": True, "context": {"user_id": "123"}}
    return {"authorized": False}

motia.set_authenticate(authenticate)

await motia.initialize()

See Also

  • Step - Creating step definitions
  • Context - Flow context for handlers
  • Triggers - Trigger configuration