Overview
The Flow Context (ctx) is passed as the second argument to every step handler. It provides utilities for:
Enqueuing events to other steps
Managing state with key-value storage
Writing structured logs
Accessing streams for real-time data
Inspecting trigger information about how the step was invoked
Context Structure
interface FlowContext<TEnqueueData = never, TInput = unknown> {
  enqueue: Enqueuer<TEnqueueData>
  traceId: string
  state: InternalStateManager
  logger: Logger
  streams: Streams
  trigger: TriggerInfo

  // Type guards
  is: {
    queue: (input: TInput) => boolean
    http: (input: TInput) => boolean
    cron: (input: TInput) => boolean
    state: (input: TInput) => boolean
    stream: (input: TInput) => boolean
  }

  // Utilities
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult>
}
Reference: motia-js/packages/motia/src/types.ts:58-92, motia-py/packages/motia/src/motia/types.py:36-114
ctx.enqueue()
Enqueue events to trigger other steps asynchronously.
Basic Usage
// Enqueue a single event
await ctx.enqueue({
  topic: 'order.created',
  data: {
    orderId: '12345',
    amount: 99.99,
    items: ['item1', 'item2'],
  },
})

// For FIFO queues, specify a message group ID
await ctx.enqueue({
  topic: 'order.processing',
  data: { orderId: '12345' },
  messageGroupId: 'customer-456', // Ensures ordering per customer
})
Type Safety
Enqueue topics are type-checked against the step’s enqueues configuration:
export const config = {
  name: 'ProcessOrder',
  enqueues: ['order.processed', 'order.failed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  // ✅ Type-safe - 'order.processed' is in the enqueues array
  await ctx.enqueue({ topic: 'order.processed', data: { orderId: '123' } })

  // ❌ Type error - 'invalid.topic' is not in the enqueues array
  await ctx.enqueue({ topic: 'invalid.topic', data: {} })
}
Reference: motia-js/packages/motia/src/types.ts:28-29, motia-js/playground/steps/hello/hello-api.step.ts:31-39
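To illustrate how a topic list declared with `as const` can drive compile-time checking, here is a minimal, self-contained sketch. It does not use Motia's actual `Enqueuer` type: `EnqueueEvent`, `createTypedEnqueuer`, and the `recorded` sink are hypothetical names introduced only for this example, and the runtime check is an assumption about how an undeclared topic might be rejected.

```typescript
// Hypothetical event shape, modelled on the ctx.enqueue() calls shown above.
type EnqueueEvent<TTopic extends string> = {
  topic: TTopic
  data: unknown
  messageGroupId?: string
}

// Hypothetical factory: the returned function only accepts declared topics.
function createTypedEnqueuer<TTopics extends readonly string[]>(
  topics: TTopics,
  sink: EnqueueEvent<TTopics[number]>[],
) {
  const allowed = new Set<string>(topics)
  return (evt: EnqueueEvent<TTopics[number]>): Promise<void> => {
    // Runtime guard for callers that bypass the type system.
    if (!allowed.has(evt.topic)) {
      return Promise.reject(new Error(`Topic not declared: ${evt.topic}`))
    }
    sink.push(evt)
    return Promise.resolve()
  }
}

// `as const` keeps the literal topic names instead of widening to string[].
const declaredTopics = ['order.processed', 'order.failed'] as const
const recorded: EnqueueEvent<(typeof declaredTopics)[number]>[] = []
const enqueueSketch = createTypedEnqueuer(declaredTopics, recorded)

void enqueueSketch({ topic: 'order.processed', data: { orderId: '123' } })
// enqueueSketch({ topic: 'invalid.topic', data: {} }) // would fail to compile
```

Without `as const`, the array would widen to `string[]` and every topic string would be accepted, which is why the step config above is declared `as const satisfies StepConfig`.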
ctx.state
Access the distributed key-value state management system.
State Operations
// Set a value
await ctx.state.set('users', 'user-123', {
  id: 'user-123',
  name: 'Alice',
  email: 'alice@example.com',
})

// Get a value
const user = await ctx.state.get<User>('users', 'user-123')

// Update a value (JSON Patch operations)
await ctx.state.update('users', 'user-123', [
  { op: 'replace', path: '/name', value: 'Alice Smith' },
  { op: 'add', path: '/age', value: 30 },
])

// Delete a value
await ctx.state.delete('users', 'user-123')

// List all values in a group
const allUsers = await ctx.state.list<User>('users')

// Clear an entire group
await ctx.state.clear('users')
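The `update` call above takes JSON Patch operations. As a rough illustration of what the two operations used there do to a stored value, here is a small sketch that applies `add` and `replace` to plain objects. It is not Motia's implementation: `applyPatchSketch`, `PatchOp`, and `JsonObject` are hypothetical names, and the sketch deliberately ignores arrays and the other JSON Patch operations (`remove`, `move`, `copy`, `test`).

```typescript
// Only the two operations used in the example above are modelled.
type PatchOp =
  | { op: 'add'; path: string; value: unknown }
  | { op: 'replace'; path: string; value: unknown }

type JsonObject = { [key: string]: unknown }

function applyPatchSketch(doc: JsonObject, ops: PatchOp[]): JsonObject {
  // Work on a deep copy so the input document is left untouched.
  const result = JSON.parse(JSON.stringify(doc)) as JsonObject

  for (const patch of ops) {
    // Paths are JSON Pointers: '/a/b' -> ['a', 'b'], with ~1 and ~0 escapes.
    const keys = patch.path
      .split('/')
      .slice(1)
      .map((key) => key.replace(/~1/g, '/').replace(/~0/g, '~'))
    const last = keys.pop()
    if (last === undefined) {
      throw new Error('Whole-document paths are not supported in this sketch')
    }

    // Walk down to the object that owns the final key.
    let target: JsonObject = result
    for (const key of keys) {
      const next = target[key]
      if (typeof next !== 'object' || next === null || Array.isArray(next)) {
        throw new Error(`Path not found: ${patch.path}`)
      }
      target = next as JsonObject
    }

    // 'replace' requires an existing member; 'add' creates or overwrites it.
    if (patch.op === 'replace' && !(last in target)) {
      throw new Error(`Cannot replace missing path: ${patch.path}`)
    }
    target[last] = patch.value
  }
  return result
}

const patchedUser = applyPatchSketch(
  { id: 'user-123', name: 'Alice', email: 'alice@example.com' },
  [
    { op: 'replace', path: '/name', value: 'Alice Smith' },
    { op: 'add', path: '/age', value: 30 },
  ],
)
```

After the call, `patchedUser` holds the original `id` and `email`, the new `name`, and the added `age`.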
State Structure
State is organized hierarchically:
group_id (e.g., "users", "orders", "sessions")
└── item_id (e.g., "user-123", "order-456")
    └── value (any JSON-serializable data)
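The group/item hierarchy can be pictured as a map of maps. The sketch below is an in-memory stand-in, not the distributed store itself: `InMemoryStateSketch` is a hypothetical class, its methods are synchronous for brevity (the real `ctx.state` methods are awaited), and returning `undefined` for a missing item is an assumption.

```typescript
// Hypothetical in-memory model: group_id -> item_id -> value.
class InMemoryStateSketch {
  private readonly groups = new Map<string, Map<string, unknown>>()

  set<T>(groupId: string, itemId: string, value: T): void {
    let group = this.groups.get(groupId)
    if (!group) {
      group = new Map<string, unknown>()
      this.groups.set(groupId, group)
    }
    group.set(itemId, value)
  }

  get<T>(groupId: string, itemId: string): T | undefined {
    return this.groups.get(groupId)?.get(itemId) as T | undefined
  }

  delete(groupId: string, itemId: string): void {
    this.groups.get(groupId)?.delete(itemId)
  }

  // Returns every value stored under one group.
  list<T>(groupId: string): T[] {
    return Array.from(this.groups.get(groupId)?.values() ?? []) as T[]
  }

  // Removes the whole group, including all of its items.
  clear(groupId: string): void {
    this.groups.delete(groupId)
  }
}

const stateSketch = new InMemoryStateSketch()
stateSketch.set('users', 'user-123', { name: 'Alice' })
stateSketch.set('orders', 'order-456', { total: 99.99 })
```

Because item IDs are scoped to their group, `'user-123'` under `'users'` is unrelated to any item with the same ID under `'orders'`.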
State Triggers
State changes automatically fire state triggers:
// This set operation...
await ctx.state.set('users', 'user-123', { name: 'Alice', status: 'active' })

// ...triggers steps with state triggers:
export const config = {
  name: 'OnUserChange',
  triggers: [
    state({
      condition: (input) => input.group_id === 'users',
    }),
  ],
}

export const handler = async (input, ctx) => {
  // input.group_id === 'users'
  // input.item_id === 'user-123'
  // input.new_value === { name: 'Alice', status: 'active' }
  // input.old_value === (previous value or undefined)
}
Reference: motia-js/packages/motia/src/types.ts:19-26, motia-js/playground/steps/multi-trigger-example.step.ts:79, motia-py/packages/motia/src/motia/types.py:21-31
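One way to think about the `condition` function is as a filter over state change events. The sketch below shows that idea in isolation; it is not the engine's dispatch logic. `StateChangeSketch`, `StateTriggerSketch`, and `stepsToFire` are hypothetical names, the field names mirror the handler comments above, and treating a trigger without a condition as always matching is an assumption.

```typescript
// Field names follow the handler comments above.
type StateChangeSketch = {
  group_id: string
  item_id: string
  old_value?: unknown
  new_value?: unknown
}

type StateTriggerSketch = {
  stepName: string
  // Assumption: a trigger without a condition matches every change.
  condition?: (input: StateChangeSketch) => boolean
}

// Returns the names of the steps whose condition accepts the change.
function stepsToFire(
  registered: StateTriggerSketch[],
  change: StateChangeSketch,
): string[] {
  return registered
    .filter((trigger) => trigger.condition === undefined || trigger.condition(change))
    .map((trigger) => trigger.stepName)
}

const registeredTriggers: StateTriggerSketch[] = [
  { stepName: 'OnUserChange', condition: (input) => input.group_id === 'users' },
  { stepName: 'OnOrderChange', condition: (input) => input.group_id === 'orders' },
]

const firedSteps = stepsToFire(registeredTriggers, {
  group_id: 'users',
  item_id: 'user-123',
  new_value: { name: 'Alice', status: 'active' },
})
```

Here only `OnUserChange` is selected, because the change belongs to the `users` group.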
ctx.logger
Structured logging with automatic trace correlation.
Logging Levels
ctx.logger.debug('Debug message', { details: 'low-level info' })
ctx.logger.info('Info message', { userId: '123' })
ctx.logger.warn('Warning message', { reason: 'rate limit approaching' })
ctx.logger.error('Error message', { error: err.message, stack: err.stack })
Automatic Trace Correlation
All logs are automatically tagged with:
traceId: Unique identifier for the entire workflow
spanId: Identifier for the current operation
Trigger metadata: type, path, topic, etc.
Logs are ingested via OpenTelemetry and viewable in the Motia Console.
Reference: motia-js/packages/motia/src/types.ts:62, engine/src/engine/mod.rs:67-75
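The idea behind automatic correlation is that the logger is created per invocation and already knows the trace and span IDs, so handlers only supply the message and fields. The following sketch shows that pattern with an in-memory sink. `createLoggerSketch`, `LogRecordSketch`, and the sample IDs are hypothetical, and the real logger exports through OpenTelemetry rather than pushing to an array.

```typescript
type LogLevelSketch = 'debug' | 'info' | 'warn' | 'error'

interface LogRecordSketch {
  level: LogLevelSketch
  message: string
  traceId: string
  spanId: string
  fields: Record<string, unknown>
}

// Hypothetical factory: binds the correlation IDs once per invocation.
function createLoggerSketch(
  traceId: string,
  spanId: string,
  sink: LogRecordSketch[],
) {
  const write =
    (level: LogLevelSketch) =>
    (message: string, fields: Record<string, unknown> = {}): void => {
      // Every record carries the IDs without the caller passing them.
      sink.push({ level, message, traceId, spanId, fields })
    }
  return {
    debug: write('debug'),
    info: write('info'),
    warn: write('warn'),
    error: write('error'),
  }
}

const capturedLogs: LogRecordSketch[] = []
const loggerSketch = createLoggerSketch('trace-abc', 'span-1', capturedLogs)
loggerSketch.info('Order processed', { orderId: '12345' })
```

The handler-side call looks the same as `ctx.logger.info(...)` above, while the stored record also contains `traceId` and `spanId`.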
ctx.streams
Access real-time data streams for collaborative applications.
Stream Operations
// Set a stream item
const result = await ctx.streams.todo.set('inbox', todoId, {
  id: todoId,
  description: 'Buy groceries',
  createdAt: new Date().toISOString(),
})

// Get a stream item
const todo = await ctx.streams.todo.get('inbox', todoId)

// Update a stream item
await ctx.streams.todo.update('inbox', todoId, [
  { op: 'replace', path: '/description', value: 'Buy groceries and cook dinner' },
])

// Delete a stream item
await ctx.streams.todo.delete('inbox', todoId)

// List items in a group
const allTodos = await ctx.streams.todo.list('inbox')
Streams vs State
| Feature | Streams | State |
| --- | --- | --- |
| Use Case | Real-time collaboration, UI sync | Backend data storage |
| Persistence | Optional (configurable) | Always persisted |
| WebSocket Updates | Yes (live updates to clients) | No |
| Access Control | Per-stream auth | Global |
| Triggers | Stream triggers | State triggers |
Reference: motia-js/playground/steps/todo/create-todo.step.ts:47, engine/src/modules/stream/mod.rs
ctx.trigger
Information about how the step was triggered.
Trigger Info Structure
type TriggerInfo = {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number // Index in triggers array

  // HTTP-specific
  path?: string
  method?: string

  // Queue-specific
  topic?: string

  // Cron-specific
  expression?: string
}
Usage Example
export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Step triggered', {
    triggerType: ctx.trigger.type,
    triggerPath: ctx.trigger.path,
    triggerTopic: ctx.trigger.topic,
  })

  if (ctx.trigger.type === 'http') {
    ctx.logger.info(`HTTP ${ctx.trigger.method} ${ctx.trigger.path}`)
  } else if (ctx.trigger.type === 'queue') {
    ctx.logger.info(`Queue topic: ${ctx.trigger.topic}`)
  }
}
Reference: motia-js/packages/motia/src/types.ts:98-105
ctx.traceId
Unique identifier for distributed tracing across the workflow.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Log the trace ID
  ctx.logger.info('Processing request', { traceId: ctx.traceId })

  // Pass to external services for correlation
  await fetch('https://api.example.com/process', {
    headers: {
      'X-Trace-Id': ctx.traceId,
    },
  })
}
The trace ID follows W3C Trace Context format and is automatically propagated across function calls and enqueued events.
Reference: motia-js/packages/motia/src/types.ts:60, engine/src/protocol.rs:75-80
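For services that expect the standard W3C `traceparent` header rather than a custom `X-Trace-Id`, the header can be assembled from a trace ID and a span ID. The sketch below follows the W3C Trace Context layout (`version-traceid-parentid-flags`). It assumes `ctx.traceId` is the 32-character lowercase hex form, which this page does not state explicitly; `isW3cTraceId` and `buildTraceparent` are hypothetical helpers, and the sample IDs are illustrative values, not Motia output.

```typescript
const TRACE_ID_PATTERN = /^[0-9a-f]{32}$/
const SPAN_ID_PATTERN = /^[0-9a-f]{16}$/

// A W3C trace-id is 32 lowercase hex characters and must not be all zeros.
function isW3cTraceId(value: string): boolean {
  return TRACE_ID_PATTERN.test(value) && value !== '0'.repeat(32)
}

// Builds a version-00 traceparent value: 00-<trace-id>-<parent-id>-<flags>.
function buildTraceparent(
  traceId: string,
  parentSpanId: string,
  sampled: boolean,
): string {
  if (!isW3cTraceId(traceId)) {
    throw new Error(`Not a W3C trace-id: ${traceId}`)
  }
  if (!SPAN_ID_PATTERN.test(parentSpanId) || parentSpanId === '0'.repeat(16)) {
    throw new Error(`Not a W3C parent-id: ${parentSpanId}`)
  }
  return `00-${traceId}-${parentSpanId}-${sampled ? '01' : '00'}`
}

// Illustrative IDs only.
const traceparentHeader = buildTraceparent(
  '4bf92f3577b34da6a3ce929d0e0e4736',
  '00f067aa0ba902b7',
  true,
)
```

If `ctx.traceId` turns out to use a different representation, `isW3cTraceId` returns `false` and the custom header shown above remains the safer option.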
Type Guards (ctx.is)
Check which trigger type invoked the step.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  if (ctx.is.http(input)) {
    // TypeScript narrows input to MotiaHttpArgs
    const body = input.request.body
    return { status: 200, body: { message: 'OK' } }
  }

  if (ctx.is.queue(input)) {
    // TypeScript narrows input to queue payload
    ctx.logger.info('Processing queue event', input)
  }

  if (ctx.is.cron(input)) {
    // input is undefined for cron triggers
    ctx.logger.info('Cron trigger fired')
  }
}
Reference: motia-js/packages/motia/src/types.ts:66-72
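Narrowing of this kind relies on TypeScript type predicates (`input is X`). The sketch below shows the mechanism on its own, with hypothetical input shapes and a hypothetical `isHttpSketch` guard. It inspects the shape of the input, which is an assumption made to keep the example self-contained; the real guards may instead rely on the trigger type.

```typescript
// Hypothetical input shapes for a step with HTTP, queue, and cron triggers.
type HttpInputSketch = { request: { body: unknown } }
type QueueInputSketch = { orderId: string }
type StepInputSketch = HttpInputSketch | QueueInputSketch | undefined

// The `input is HttpInputSketch` return type is what enables narrowing.
function isHttpSketch(input: StepInputSketch): input is HttpInputSketch {
  return typeof input === 'object' && input !== null && 'request' in input
}

function describeInput(input: StepInputSketch): string {
  if (isHttpSketch(input)) {
    // Narrowed: input.request is available here.
    return `http:${JSON.stringify(input.request.body)}`
  }
  if (input === undefined) {
    // Matches the note above that cron triggers carry no input.
    return 'cron'
  }
  // Narrowed to the remaining member of the union.
  return `queue:${input.orderId}`
}

const describedHttp = describeInput({ request: { body: { ok: true } } })
const describedQueue = describeInput({ orderId: '12345' })
const describedCron = describeInput(undefined)
```

A guard declared as returning plain `boolean` would still work at runtime, but the compiler would not narrow `input` inside the `if` branches.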
ctx.getData()
Extract the data payload regardless of trigger type.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Works for both HTTP (extracts request.body) and queue triggers (returns data directly)
  const data = ctx.getData()

  // Process data uniformly
  const result = await processData(data)

  if (ctx.is.http(input)) {
    return { status: 200, body: result }
  }
}
Reference: motia-js/packages/motia/src/types.ts:74-88, motia-py/packages/motia/src/motia/types.py:69-83
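The extraction rule described in the comment above (HTTP yields `request.body`, queue yields the data directly) can be sketched as a small function. `extractPayloadSketch` and `PayloadTriggerSketch` are hypothetical names, only three trigger types are modelled, and returning `undefined` for cron is an assumption based on cron triggers having no input.

```typescript
type PayloadTriggerSketch = 'http' | 'queue' | 'cron'

// Hypothetical stand-in for the payload extraction described above.
function extractPayloadSketch(
  triggerType: PayloadTriggerSketch,
  input: unknown,
): unknown {
  if (triggerType === 'http') {
    // HTTP inputs wrap the payload in request.body.
    return (input as { request: { body: unknown } }).request.body
  }
  if (triggerType === 'cron') {
    // Assumption: cron triggers carry no payload.
    return undefined
  }
  // Queue inputs are already the payload.
  return input
}

const orderPayload = { orderId: '12345' }
const fromHttp = extractPayloadSketch('http', { request: { body: orderPayload } })
const fromQueue = extractPayloadSketch('queue', orderPayload)
```

Both calls yield the same object, which is what lets the handler above process HTTP and queue invocations through a single code path.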
ctx.match()
Handle multiple trigger types with pattern matching.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      ctx.logger.info('HTTP request', { body: request.body })
      return { status: 200, body: { message: 'OK' } }
    },

    queue: async (data) => {
      ctx.logger.info('Queue event', data)
      await processQueueEvent(data)
    },

    cron: async () => {
      ctx.logger.info('Cron triggered')
      await runScheduledTask()
    },

    state: async (stateInput) => {
      ctx.logger.info('State changed', {
        group: stateInput.group_id,
        item: stateInput.item_id,
      })
    },

    stream: async (streamInput) => {
      ctx.logger.info('Stream event', {
        stream: streamInput.streamName,
        event: streamInput.event.type,
      })
    },

    default: async (input) => {
      ctx.logger.warn('Unknown trigger type', { input })
    },
  })
}
Reference: motia-js/packages/motia/src/types.ts:91, motia-js/playground/steps/multi-trigger-example.step.ts:70-152, motia-py/packages/motia/src/motia/types.py:84-113
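Conceptually, `match` picks the handler keyed by the current trigger type and falls back to `default` when none is registered. The sketch below shows that dispatch rule only. `matchSketch` and its types are hypothetical, the function is synchronous for brevity (the real `ctx.match` returns a Promise), and throwing when neither a specific nor a default handler exists is an assumption.

```typescript
type MatchTriggerSketch = 'http' | 'queue' | 'cron' | 'state' | 'stream'

type MatchHandlersSketch<TResult> = Partial<
  Record<MatchTriggerSketch, (input: unknown) => TResult>
> & {
  default?: (input: unknown) => TResult
}

// Dispatches to the handler for the trigger type, else to `default`.
function matchSketch<TResult>(
  triggerType: MatchTriggerSketch,
  input: unknown,
  handlers: MatchHandlersSketch<TResult>,
): TResult {
  const handler = handlers[triggerType] ?? handlers.default
  if (!handler) {
    // Assumption: an unhandled trigger type is treated as an error.
    throw new Error(`No handler for trigger type: ${triggerType}`)
  }
  return handler(input)
}

const sketchHandlers: MatchHandlersSketch<string> = {
  http: () => 'handled http',
  queue: () => 'handled queue',
  default: () => 'handled by default',
}

const httpOutcome = matchSketch('http', { request: { body: {} } }, sketchHandlers)
const cronOutcome = matchSketch('cron', undefined, sketchHandlers)
```

Compared with a chain of `ctx.is` checks, this keeps each trigger's logic in its own function and makes the fallback explicit.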
Best Practices
Always include relevant context in logs. Avoid plain string messages:
// ❌ Bad
ctx.logger.info('Order processed')
// ✅ Good
ctx.logger.info('Order processed', { orderId, amount, status })
Declare all topics in the enqueues array for compile-time validation:
export const config = {
  enqueues: ['order.created', 'order.failed'],
} as const satisfies StepConfig
Use meaningful group IDs for state organization:
await ctx.state.set('users', userId, userData) // ✅ Good
await ctx.state.set('user-data', userId, userData) // ❌ Redundant
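Handle errors in async operations
Wrap state, stream, and enqueue operations in try-catch blocks:
try {
  await ctx.state.set('users', userId, data)
} catch (err) {
  // err is typed as unknown under strict TypeScript, so narrow it before reading .message
  const message = err instanceof Error ? err.message : String(err)
  ctx.logger.error('Failed to save user', { error: message })
  throw err
}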
Next Steps
State Management: Deep dive into state operations
Streams: Learn about real-time streams
Observability: Explore logs, traces, and metrics
Workflows: Organize steps into workflows