Skip to main content

Overview

FlowContext is passed as the second argument to all step handlers. It provides access to enqueue functionality, state management, streams, logging, and trigger information.

Class: FlowContext[TEnqueueData]

from motia import FlowContext

async def handler(input_data, ctx: FlowContext):
    """Forward the received input to the "next" topic."""
    # The context carries enqueue alongside state, streams, logging, and trigger info.
    event = {"topic": "next", "data": input_data}
    await ctx.enqueue(event)
Generic type parameter TEnqueueData represents the type of data being enqueued.

Properties

enqueue
Callable[[Any], Awaitable[None]]
Function to enqueue events to other steps. Takes event data and returns an awaitable that must be awaited.
trace_id
str
Unique identifier for tracing this execution across steps.
state
StateManager
State manager for persisting and retrieving scoped state. See StateManager.
logger
Logger
Logger instance for structured logging.
streams
dict[str, Stream[Any]]
Dictionary of available streams, keyed by stream name.
trigger
TriggerInfo
Information about the trigger that fired this handler.
input_value
Any
The raw input value received by the handler.

Methods

is_queue

def is_queue(self) -> bool
Returns True if the trigger is a queue trigger.

is_api

def is_api(self) -> bool
Returns True if the trigger is an HTTP/API request.

is_cron

def is_cron(self) -> bool
Returns True if the trigger is a cron event.

is_state

def is_state(self) -> bool
Returns True if the trigger is a state change.

is_stream

def is_stream(self) -> bool
Returns True if the trigger is a stream event.

get_data

def get_data(self) -> Any
Extract the data payload from the input, regardless of trigger type.
return
Any
  • For HTTP triggers: returns request.body
  • For queue triggers: returns the queue data directly
  • For cron triggers: returns None
  • For state/stream triggers: returns the input value

match

async def match(self, handlers: dict[str, Callable]) -> Any
Match and execute handlers based on trigger type. Useful for multi-trigger steps.
handlers
dict[str, Callable]
required
Dictionary mapping trigger types to handler functions:
  • "queue": async (input) -> Any
  • "http" or "api": async (request) -> ApiResponse
  • "cron": async () -> Any
  • "state": async (input) -> Any
  • "stream": async (input) -> Any
  • "default": async (input) -> Any (fallback)
return
Any
The result from the matched handler.

Class: StateManager

Manages scoped state for your application.

Methods

get

async def get(self, scope: str, key: str) -> Any | None
Get a value from the state.
scope
str
required
The scope/group identifier.
key
str
required
The item key within the scope.
return
Any | None
The stored value, or None if not found.

set

async def set(self, scope: str, key: str, value: Any) -> Any
Set a value in the state.
scope
str
required
The scope/group identifier.
key
str
required
The item key within the scope.
value
Any
required
The value to store.
return
Any
The stored value.

update

async def update(self, scope: str, key: str, ops: list[dict[str, Any]]) -> Any
Update a value using update operations.
scope
str
required
The scope/group identifier.
key
str
required
The item key within the scope.
ops
list[dict[str, Any]]
required
List of update operations to apply.
return
Any
The updated value.

delete

async def delete(self, scope: str, key: str) -> Any | None
Delete a value from the state.
scope
str
required
The scope/group identifier.
key
str
required
The item key to delete.
return
Any | None
The deleted value, or None if not found.

list

async def list(self, scope: str) -> list[Any]
List all items in a scope.
scope
str
required
The scope/group identifier.
return
list[Any]
List of all items in the scope.

list_groups

async def list_groups(self) -> list[str]
List all scope IDs.
return
list[str]
List of all scope identifiers.

clear

async def clear(self, scope: str) -> None
Clear all items in a scope.
scope
str
required
The scope/group identifier to clear.

Class: Stream[TData]

Manages distributed stream state.

Methods

get

async def get(self, group_id: str, item_id: str) -> TData | None
Get an item from the stream.
group_id
str
required
The group identifier.
item_id
str
required
The item identifier.
return
TData | None
The stream item, or None if not found.

set

async def set(self, group_id: str, item_id: str, data: TData) -> Any
Set an item in the stream.
group_id
str
required
The group identifier.
item_id
str
required
The item identifier.
data
TData
required
The data to store.
return
Any
Result of the set operation.

delete

async def delete(self, group_id: str, item_id: str) -> None
Delete an item from the stream.

list

async def list(self, group_id: str) -> list[TData]
List all items in a group.
group_id
str
required
The group identifier.
return
list[TData]
List of all items in the group.

update

async def update(self, group_id: str, item_id: str, ops: list[dict[str, Any]]) -> Any
Update an item using update operations.

list_groups

async def list_groups(self) -> list[str]
List all group IDs for the stream.

Type: TriggerInfo

Information about the trigger that fired.

Fields

type
'http' | 'queue' | 'cron' | 'state' | 'stream'
The trigger type.
index
int | None
Index of the trigger in the step’s trigger list.
path
str | None
HTTP path (API triggers only).
method
str | None
HTTP method (API triggers only).
topic
str | None
Queue topic (queue triggers only).
expression
str | None
Cron expression (cron triggers only).

Usage Examples

Using State

async def handler(data, ctx: FlowContext):
    """Bump a request counter kept in the "global" scope, then list that scope."""
    scope, key = "global", "request-count"

    # Read the current count; a missing (or otherwise falsy) value counts as 0.
    previous = await ctx.state.get(scope, key)
    counter = previous or 0

    # Store the incremented count back under the same scope and key.
    await ctx.state.set(scope, key, counter + 1)

    # Fetch every item stored in the scope.
    items = await ctx.state.list(scope)

Using Streams

async def handler(data, ctx: FlowContext):
    """Write one chat message into a room, then read back the room's messages."""
    room = "room-123"
    chat_stream = ctx.streams["chat"]

    # Store the message under its group (room) and item (message) identifiers.
    message = {"text": "Hello", "user": "alice"}
    await chat_stream.set(group_id=room, item_id="msg-456", data=message)

    # Retrieve every item stored for that room.
    messages = await chat_stream.list(group_id=room)

Enqueuing Events

async def handler(data, ctx: FlowContext):
    """Process the incoming data and enqueue the result on the "next-step" topic."""
    # Run the processing step and wrap its result in the outgoing event.
    event = {"topic": "next-step", "data": process(data)}

    # Enqueue the event for the next step.
    await ctx.enqueue(event)

Multi-Trigger Routing

async def handler(input_data, ctx: FlowContext):
    """Route the invocation to a per-trigger coroutine via ``ctx.match``.

    ``async def`` is a statement and cannot appear inside a dict literal, so
    each per-trigger handler is defined as a nested function first and then
    passed to ``ctx.match`` by name.

    Returns:
        Whatever the matched handler returns.
    """

    async def handle_http(req):
        # HTTP trigger: echo the request body back in the response.
        return {"status": 200, "body": {"data": req.body}}

    async def handle_queue(data):
        # Queue trigger: forward the payload to the "processed" topic.
        await ctx.enqueue({"topic": "processed", "data": data})

    async def handle_cron():
        # Cron trigger: takes no input.
        await cleanup_old_data()

    async def handle_default(data):
        # Fallback for trigger types without a dedicated handler.
        ctx.logger.info(f"Unhandled trigger: {ctx.trigger.type}")

    return await ctx.match({
        "http": handle_http,
        "queue": handle_queue,
        "cron": handle_cron,
        "default": handle_default,
    })

Checking Trigger Type

async def handler(input_data, ctx: FlowContext):
    """Branch on the trigger type using the context's ``is_*`` helpers."""
    if ctx.is_api():
        # HTTP request: respond immediately.
        return {"status": 200, "body": {"ok": True}}

    elif ctx.is_queue():
        # Queue message: extract the payload and process it.
        await process_queue_data(ctx.get_data())

    elif ctx.is_cron():
        # Scheduled task: nothing to read from the input.
        await run_periodic_task()