Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The Flow Context (ctx) is passed as the second argument to every step handler. It provides utilities for:
  • Enqueuing events to other steps
  • Managing state with key-value storage
  • Logging structured logs
  • Accessing streams for real-time data
  • Trigger information about how the step was invoked

Context Structure

interface FlowContext<TEnqueueData = never, TInput = unknown> {
  enqueue: Enqueuer<TEnqueueData>
  traceId: string
  state: InternalStateManager
  logger: Logger
  streams: Streams
  trigger: TriggerInfo
  
  // Type guards
  is: {
    queue: (input: TInput) => boolean
    http: (input: TInput) => boolean
    cron: (input: TInput) => boolean
    state: (input: TInput) => boolean
    stream: (input: TInput) => boolean
  }
  
  // Utilities
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult>
}
Reference: motia-js/packages/motia/src/types.ts:58-92, motia-py/packages/motia/src/motia/types.py:36-114

ctx.enqueue()

Enqueue events to trigger other steps asynchronously.

Basic Usage

// Enqueue a single event
await ctx.enqueue({
  topic: 'order.created',
  data: {
    orderId: '12345',
    amount: 99.99,
    items: ['item1', 'item2'],
  },
})

// For FIFO queues, specify message group ID
await ctx.enqueue({
  topic: 'order.processing',
  data: { orderId: '12345' },
  messageGroupId: 'customer-456', // Ensures ordering per customer
})

Type Safety

Enqueue topics are type-checked against the step’s enqueues configuration:
export const config = {
  name: 'ProcessOrder',
  enqueues: ['order.processed', 'order.failed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  // ✅ Type-safe - 'order.processed' is in enqueues array
  await ctx.enqueue({ topic: 'order.processed', data: { orderId: '123' } })
  
  // ❌ Type error - 'invalid.topic' not in enqueues array
  await ctx.enqueue({ topic: 'invalid.topic', data: {} })
}
Reference: motia-js/packages/motia/src/types.ts:28-29, motia-js/playground/steps/hello/hello-api.step.ts:31-39

ctx.state

Access the distributed key-value state management system.

State Operations

// Set a value
await ctx.state.set('users', 'user-123', {
  id: 'user-123',
  name: 'Alice',
  email: 'alice@example.com',
})

// Get a value
const user = await ctx.state.get<User>('users', 'user-123')

// Update a value (JSON Patch operations)
await ctx.state.update('users', 'user-123', [
  { op: 'replace', path: '/name', value: 'Alice Smith' },
  { op: 'add', path: '/age', value: 30 },
])

// Delete a value
await ctx.state.delete('users', 'user-123')

// List all values in a group
const allUsers = await ctx.state.list<User>('users')

// Clear entire group
await ctx.state.clear('users')

State Structure

State is organized hierarchically:
group_id (e.g., "users", "orders", "sessions")
  └── item_id (e.g., "user-123", "order-456")
        └── value (any JSON-serializable data)

State Triggers

State changes automatically fire state triggers:
// This set operation...
await ctx.state.set('users', 'user-123', { name: 'Alice', status: 'active' })

// ...triggers steps with state triggers:
export const config = {
  name: 'OnUserChange',
  triggers: [
    state({
      condition: (input) => input.group_id === 'users',
    }),
  ],
}

export const handler = async (input, ctx) => {
  // input.group_id === 'users'
  // input.item_id === 'user-123'
  // input.new_value === { name: 'Alice', status: 'active' }
  // input.old_value === (previous value or undefined)
}
Reference: motia-js/packages/motia/src/types.ts:19-26, motia-js/playground/steps/multi-trigger-example.step.ts:79, motia-py/packages/motia/src/motia/types.py:21-31

ctx.logger

Structured logging with automatic trace correlation.

Logging Levels

ctx.logger.debug('Debug message', { details: 'low-level info' })
ctx.logger.info('Info message', { userId: '123' })
ctx.logger.warn('Warning message', { reason: 'rate limit approaching' })
ctx.logger.error('Error message', { error: err.message, stack: err.stack })

Automatic Trace Correlation

All logs are automatically tagged with:
  • traceId: Unique identifier for the entire workflow
  • spanId: Identifier for the current operation
  • trigger metadata: Type, path, topic, etc.
Logs are ingested via OpenTelemetry and viewable in the Motia Console. Reference: motia-js/packages/motia/src/types.ts:62, engine/src/engine/mod.rs:67-75

ctx.streams

Access real-time data streams for collaborative applications.

Stream Operations

// Set a stream item
const result = await ctx.streams.todo.set('inbox', todoId, {
  id: todoId,
  description: 'Buy groceries',
  createdAt: new Date().toISOString(),
})

// Get a stream item
const todo = await ctx.streams.todo.get('inbox', todoId)

// Update a stream item
await ctx.streams.todo.update('inbox', todoId, [
  { op: 'replace', path: '/description', value: 'Buy groceries and cook dinner' },
])

// Delete a stream item
await ctx.streams.todo.delete('inbox', todoId)

// List items in a group
const allTodos = await ctx.streams.todo.list('inbox')

Streams vs State

FeatureStreamsState
Use CaseReal-time collaboration, UI syncBackend data storage
PersistenceOptional (configurable)Always persisted
WebSocket UpdatesYes (live updates to clients)No
Access ControlPer-stream authGlobal
TriggersStream triggersState triggers
Reference: motia-js/playground/steps/todo/create-todo.step.ts:47, engine/src/modules/stream/mod.rs

ctx.trigger

Information about how the step was triggered.

Trigger Info Structure

type TriggerInfo = {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number  // Index in triggers array
  
  // HTTP-specific
  path?: string
  method?: string
  
  // Queue-specific
  topic?: string
  
  // Cron-specific
  expression?: string
}

Usage Example

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Step triggered', {
    triggerType: ctx.trigger.type,
    triggerPath: ctx.trigger.path,
    triggerTopic: ctx.trigger.topic,
  })
  
  if (ctx.trigger.type === 'http') {
    ctx.logger.info(`HTTP ${ctx.trigger.method} ${ctx.trigger.path}`)
  } else if (ctx.trigger.type === 'queue') {
    ctx.logger.info(`Queue topic: ${ctx.trigger.topic}`)
  }
}
Reference: motia-js/packages/motia/src/types.ts:98-105

ctx.traceId

Unique identifier for distributed tracing across the workflow.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Log the trace ID
  ctx.logger.info('Processing request', { traceId: ctx.traceId })
  
  // Pass to external services for correlation
  await fetch('https://api.example.com/process', {
    headers: {
      'X-Trace-Id': ctx.traceId,
    },
  })
}
The trace ID follows W3C Trace Context format and is automatically propagated across function calls and enqueued events. Reference: motia-js/packages/motia/src/types.ts:60, engine/src/protocol.rs:75-80

Type Guards (ctx.is)

Check which trigger type invoked the step.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  if (ctx.is.http(input)) {
    // TypeScript narrows input to MotiaHttpArgs
    const body = input.request.body
    return { status: 200, body: { message: 'OK' } }
  }
  
  if (ctx.is.queue(input)) {
    // TypeScript narrows input to queue payload
    ctx.logger.info('Processing queue event', input)
  }
  
  if (ctx.is.cron(input)) {
    // input is undefined for cron triggers
    ctx.logger.info('Cron trigger fired')
  }
}
Reference: motia-js/packages/motia/src/types.ts:66-72

ctx.getData()

Extract the data payload regardless of trigger type.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Works for both HTTP (extracts request.body) and queue triggers (returns data directly)
  const data = ctx.getData()
  
  // Process data uniformly
  const result = await processData(data)
  
  if (ctx.is.http(input)) {
    return { status: 200, body: result }
  }
}
Reference: motia-js/packages/motia/src/types.ts:74-88, motia-py/packages/motia/src/motia/types.py:69-83

ctx.match()

Handle multiple trigger types with pattern matching.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      ctx.logger.info('HTTP request', { body: request.body })
      return { status: 200, body: { message: 'OK' } }
    },
    
    queue: async (data) => {
      ctx.logger.info('Queue event', data)
      await processQueueEvent(data)
    },
    
    cron: async () => {
      ctx.logger.info('Cron triggered')
      await runScheduledTask()
    },
    
    state: async (stateInput) => {
      ctx.logger.info('State changed', {
        group: stateInput.group_id,
        item: stateInput.item_id,
      })
    },
    
    stream: async (streamInput) => {
      ctx.logger.info('Stream event', {
        stream: streamInput.streamName,
        event: streamInput.event.type,
      })
    },
    
    default: async (input) => {
      ctx.logger.warn('Unknown trigger type', { input })
    },
  })
}
Reference: motia-js/packages/motia/src/types.ts:91, motia-js/playground/steps/multi-trigger-example.step.ts:70-152, motia-py/packages/motia/src/motia/types.py:84-113

Best Practices

Always include relevant context in logs. Avoid plain string messages:
// ❌ Bad
ctx.logger.info('Order processed')

// ✅ Good
ctx.logger.info('Order processed', { orderId, amount, status })
Declare all topics in the enqueues array for compile-time validation:
export const config = {
  enqueues: ['order.created', 'order.failed'],
} as const satisfies StepConfig
Use meaningful group IDs for state organization:
await ctx.state.set('users', userId, userData)      // ✅ Good
await ctx.state.set('user-data', userId, userData)  // ❌ Redundant
Wrap state, stream, and enqueue operations in try-catch blocks:
try {
  await ctx.state.set('users', userId, data)
} catch (err) {
  ctx.logger.error('Failed to save user', { error: err.message })
  throw err
}

Next Steps

State Management

Deep dive into state operations

Streams

Learn about real-time streams

Observability

Explore logs, traces, and metrics

Workflows

Organize steps into workflows