Overview
The FlowContext provides access to state management, logging, queue operations, and stream utilities within step handlers.
Types
FlowContext
The main context object passed to step handlers.
/**
 * The main context object passed to step handlers.
 *
 * @typeParam TEnqueueData - Forwarded to `Enqueuer` and `MatchHandlers`; defaults to `never`.
 * @typeParam TInput - Forwarded to `TypeGuards`, `ExtractDataPayload` and `MatchHandlers`;
 *   defaults to `unknown`.
 */
interface FlowContext<TEnqueueData = never, TInput = unknown> {
  /** Function to enqueue messages to topics. */
  enqueue: Enqueuer<TEnqueueData>
  /** Unique identifier for tracing the current execution. */
  traceId: string
  /** State management interface for persistent storage. */
  state: InternalStateManager
  /** Logger instance for structured logging. */
  logger: Logger
  /** Dictionary of configured streams accessible by name. */
  streams: Streams
  /** Information about the trigger that activated this step. */
  trigger: TriggerInfo
  /** Type guard functions to check trigger types. */
  is: TypeGuards<TInput>
  /** Extracts the data payload from input regardless of trigger type. */
  getData: () => ExtractDataPayload<TInput>
  /**
   * Pattern matching for handling different trigger types.
   * The returned promise resolves to `TResult | undefined`.
   */
  match: <TResult = any>(
    handlers: MatchHandlers<TInput, TEnqueueData, TResult>
  ) => Promise<TResult | undefined>
}
Properties
enqueue
Function to enqueue messages to topics.
type Enqueuer<TData> = (event: TData) => Promise<void>
Usage:
await ctx.enqueue({ topic: 'process-order', data: orderData })
traceId
Unique identifier for tracing the current execution.
state
State management interface for persistent storage.
get
<T>(groupId: string, key: string) => Promise<T | null>
Retrieve a value from state.
set
<T>(groupId: string, key: string, value: T) => Promise<StreamSetResult<T> | null>
Store a value in state.
update
<T>(groupId: string, key: string, ops: UpdateOp[]) => Promise<StreamSetResult<T> | null>
Update a value using operations.
delete
<T>(groupId: string, key: string) => Promise<T | null>
Delete a value from state.
list
<T>(groupId: string) => Promise<T[]>
List all values in a group.
clear
(groupId: string) => Promise<void>
Clear all values in a group.
logger
Logger instance for structured logging.
ctx.logger.info('Processing order', { orderId: '123' })
ctx.logger.error('Failed to process', { error })
streams
Dictionary of configured streams accessible by name.
const chatStream = ctx.streams.chat
await chatStream.set(groupId, itemId, data)
trigger
Information about the trigger that activated this step.
type TriggerInfo = {
type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
index?: number
path?: string
method?: string
topic?: string
expression?: string
}
is
Type guard functions to check trigger types.
{
queue: (input: TInput) => input is ExtractQueueInput<TInput>
http: (input: TInput) => input is ExtractApiInput<TInput>
cron: (input: TInput) => input is never
state: (input: TInput) => input is ExtractStateInput<TInput>
stream: (input: TInput) => input is ExtractStreamInput<TInput>
}
Usage:
if (ctx.is.http(input)) {
// input is typed as HTTP request
const body = input.request.body
}
getData
() => ExtractDataPayload<TInput>
Extracts the data payload from input regardless of trigger type.
- For HTTP triggers: returns `request.body`
- For queue triggers: returns the data directly
- For cron triggers: returns `undefined`
const orderData = ctx.getData() // Works for both queue and HTTP
match
Pattern matching for handling different trigger types.
return ctx.match({
http: async (request) => {
return { status: 200, body: { success: true } }
},
queue: async (data) => {
await processQueueData(data)
},
cron: async () => {
await runScheduledTask()
}
})
ApiRequest
/**
 * Shape of an HTTP request: path parameters, query parameters, body and headers.
 *
 * @typeParam TBody - Type of `body`; defaults to `unknown`.
 */
interface ApiRequest<TBody = unknown> {
  /** Path parameters keyed by name; each value is a single string. */
  pathParams: Record<string, string>
  /** Query parameters keyed by name; a value is either one string or an array of strings. */
  queryParams: Record<string, string | string[]>
  /** Request body, typed by `TBody`. */
  body: TBody
  /** Headers keyed by name; a value is either one string or an array of strings. */
  headers: Record<string, string | string[]>
}
MotiaHttpArgs
/**
 * Pairs an HTTP request with its response object.
 * NOTE(review): presumably this is the input handed to HTTP-triggered steps
 * (the `ctx.is.http` example reads `input.request.body`) — confirm.
 *
 * @typeParam TBody - Type of `request.body`; defaults to `unknown`.
 */
interface MotiaHttpArgs<TBody = unknown> {
  /** The incoming request, with its body typed by `TBody`. */
  request: MotiaHttpRequest<TBody>
  /** The response object paired with this request. */
  response: MotiaHttpResponse
}
/**
 * HTTP request shape: the same four fields as `ApiRequest`, plus `method` and `requestBody`.
 *
 * @typeParam TBody - Type of `body`; defaults to `unknown`.
 */
interface MotiaHttpRequest<TBody = unknown> {
  /** Path parameters keyed by name; each value is a single string. */
  pathParams: Record<string, string>
  /** Query parameters keyed by name; a value is either one string or an array of strings. */
  queryParams: Record<string, string | string[]>
  /** Request body, typed by `TBody`. */
  body: TBody
  /** Headers keyed by name; a value is either one string or an array of strings. */
  headers: Record<string, string | string[]>
  /** HTTP method, as a plain string. */
  method: string
  /** NOTE(review): presumably a reader over the raw request body — confirm against `ChannelReader`. */
  requestBody: ChannelReader
}
/**
 * Response object exposing a status setter, a headers setter, a writable stream and `close`.
 * NOTE(review): call-order requirements (e.g. status before writing) are not shown here — confirm.
 */
interface MotiaHttpResponse {
  /** Takes the numeric status code; returns nothing. */
  status: (statusCode: number) => void
  /** Takes response headers as a name-to-string map; returns nothing. */
  headers: (headers: Record<string, string>) => void
  /** Node.js writable stream; presumably carries the response body — TODO confirm. */
  stream: NodeJS.WritableStream
  /** Takes no arguments and returns nothing; presumably ends the response — TODO confirm. */
  close: () => void
}
/**
 * Input shape tagged with `type: 'state'`.
 *
 * @typeParam T - Type of the stored value carried in `old_value` / `new_value`.
 */
type StateTriggerInput<T> = {
  /** Literal tag identifying this input as a state trigger. */
  type: 'state'
  /** Group identifier (snake_case here, unlike the camelCase `groupId` used elsewhere on this page). */
  group_id: string
  /** Item identifier within the group. */
  item_id: string
  /** Optional previous value. NOTE(review): presumably absent when the item was just created — confirm. */
  old_value?: T
  /** Optional new value. NOTE(review): presumably absent when the item was deleted — confirm. */
  new_value?: T
}
/**
 * Input shape tagged with `type: 'stream'`.
 *
 * @typeParam T - Type of the data carried by the wrapped `StreamEvent`.
 */
type StreamTriggerInput<T> = {
  /** Literal tag identifying this input as a stream trigger. */
  type: 'stream'
  /** Numeric timestamp. NOTE(review): unit (ms vs s) is not shown here — confirm. */
  timestamp: number
  /** Name of the stream. */
  streamName: string
  /** Group identifier. */
  groupId: string
  /** Identifier string; presumably the stream item id — TODO confirm. */
  id: string
  /** The create / update / delete event and its data. */
  event: StreamEvent<T>
}
/**
 * Discriminated union of stream events, tagged by `type`.
 * All three variants ('create', 'update', 'delete') carry a `data` field of type `TData`.
 */
type StreamEvent<TData> =
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }
Response Types
ApiResponse
/**
 * Response object shape: a status, optional headers and a body.
 *
 * @typeParam TStatus - Numeric status type; defaults to `number`, and can be narrowed to a literal.
 * @typeParam TBody - Type of `body`; defaults to `any`.
 */
type ApiResponse<TStatus extends number = number, TBody = any> = {
  /** Status code, typed by `TStatus`. */
  status: TStatus
  /** Optional response headers as a name-to-string map. */
  headers?: Record<string, string>
  /** Response body, typed by `TBody`. */
  body: TBody
}
Match Handlers
MatchHandlers
/**
 * Handler map accepted by `match`. Every handler is optional.
 *
 * The `queue` and `cron` handlers resolve to `void`; `http`, `state` and `stream`
 * resolve to `TResult`; `default` may also resolve to `undefined`.
 *
 * @typeParam TInput - Input type from which the per-trigger input types are extracted.
 * @typeParam TEnqueueData - Accepted as a type parameter but not referenced by any member below.
 * @typeParam TResult - Result type of the value-returning handlers.
 */
type MatchHandlers<TInput, TEnqueueData, TResult> = {
  /** Handler for queue input; produces no result. */
  queue?: (input: ExtractQueueInput<TInput>) => Promise<void>
  /** Handler for HTTP input; resolves to `TResult`. */
  http?: (request: ExtractApiInput<TInput>) => Promise<TResult>
  /** Handler for cron; takes no input and produces no result. */
  cron?: () => Promise<void>
  /** Handler for state input; resolves to `TResult`. */
  state?: (input: ExtractStateInput<TInput>) => Promise<TResult>
  /** Handler for stream input; resolves to `TResult`. */
  stream?: (input: ExtractStreamInput<TInput>) => Promise<TResult>
  /** NOTE(review): presumably the fallback when no specific handler applies — confirm. */
  default?: (input: TInput) => Promise<TResult | undefined>
}
Stream Configuration
StreamConfig
/**
 * Stream configuration: a name, a schema, a base config and optional
 * `onJoin` / `onLeave` hooks.
 */
interface StreamConfig {
  /** Name of the stream. */
  name: string
  /** Schema for the stream. NOTE(review): presumably describes the stream's items — confirm. */
  schema: StepSchemaInput
  /** Base configuration; the only storage type shown here is `'default'`. */
  baseConfig: { storageType: 'default' }
  /**
   * Optional hook that receives the subscription, a `FlowContext` and an optional
   * auth context, and may return its `StreamJoinResult` synchronously or as a promise.
   * NOTE(review): presumably invoked when a client subscribes — confirm.
   */
  onJoin?: (
    subscription: StreamSubscription,
    context: FlowContext,
    authContext?: StreamContext
  ) => PromiseOrValue<StreamJoinResult>
  /**
   * Optional hook with the same parameters as `onJoin`; produces no result.
   * NOTE(review): presumably invoked when a client unsubscribes — confirm.
   */
  onLeave?: (
    subscription: StreamSubscription,
    context: FlowContext,
    authContext?: StreamContext
  ) => PromiseOrValue<void>
}
/**
 * Input shape with headers, path, query parameters and an address.
 * NOTE(review): presumably what a stream authentication function receives — confirm.
 */
interface StreamAuthInput {
  /** Headers keyed by name; each value is a single string. */
  headers: Record<string, string>
  /** Path, as a plain string. */
  path: string
  /** Query parameters keyed by name; every value is an array of strings. */
  queryParams: Record<string, string[]>
  /** Address string; presumably the remote client address — TODO confirm. */
  addr: string
}
Usage Example
import { step, http, queue } from 'motia'
import { z } from 'zod'
// The HTTP body and the queue message carry the same order payload,
// so both triggers validate against a single schema definition.
const orderSchema = z.object({ orderId: z.string(), items: z.array(z.string()) })

/**
 * Example step reachable through `POST /orders` or the `process-orders` queue.
 * It stores the order in state, logs it, writes a notification to a stream,
 * then branches on the trigger type: HTTP callers receive a 200 response,
 * queue deliveries enqueue a `send-confirmation` message.
 */
export default step(
  {
    name: 'order-handler',
    triggers: [
      http('POST', '/orders', { bodySchema: orderSchema }),
      queue('process-orders', { input: orderSchema }),
    ],
    enqueues: ['send-confirmation'],
  },
  async (input, ctx) => {
    // getData() yields the payload regardless of which trigger fired.
    const order = ctx.getData()
    const { orderId } = order

    // Persist the order under the 'orders' state group.
    await ctx.state.set('orders', orderId, order)

    // Structured log entry.
    ctx.logger.info('Processing order', { orderId })

    // Write a notification item to the 'notifications' stream.
    await ctx.streams.notifications.set('user-123', 'order-update', {
      message: 'Order received',
    })

    // Trigger-specific handling.
    return ctx.match({
      http: async (_request) => ({ status: 200, body: { success: true } }),
      queue: async (payload) => {
        await ctx.enqueue({
          topic: 'send-confirmation',
          data: { orderId: payload.orderId },
        })
      },
    })
  }
)