Overview
Triggers define how and when a step executes. Motia supports five trigger types:
- HTTP - REST API endpoints
- Queue - Event-driven messaging
- Cron - Time-based scheduling
- State - State change reactions
- Stream - Real-time data stream events
A step can declare multiple triggers, so the same handler can be invoked in more than one way.
HTTP Triggers
HTTP triggers expose steps as REST API endpoints.
Basic HTTP Trigger
import { http, type Handlers } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'CreateTodo',
  triggers: [
    http('POST', '/todo', {
      bodySchema: z.object({
        description: z.string(),
        dueDate: z.string().optional(),
      }),
      responseSchema: {
        200: z.object({ id: z.string(), description: z.string() }),
        400: z.object({ error: z.string() }),
      },
    }),
  ],
}

export const handler: Handlers<typeof config> = async ({ request }, ctx) => {
  const { description, dueDate } = request.body

  // Reject requests with an empty description
  if (!description) {
    return { status: 400, body: { error: 'Description is required' } }
  }

  const todoId = `todo-${Date.now()}`
  return { status: 200, body: { id: todoId, description } }
}
HTTP Trigger Options
http(method: ApiRouteMethod, path: string, options?: {
  bodySchema?: ZodSchema | JsonSchema,
  responseSchema?: Record<number, Schema>,
  queryParams?: readonly QueryParam[],
  middleware?: readonly ApiMiddleware[],
})
Supported Methods: GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD
Path Parameters: Use :param syntax for dynamic routes:
http('GET', '/users/:userId/posts/:postId')
// Access in handler
const { userId, postId } = request.pathParams
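To make the mapping from a `:param` pattern to `request.pathParams` concrete, here is a minimal, framework-independent sketch. `matchPath` is a hypothetical helper written for this example only; it is not Motia's router and does not claim to reflect how Motia matches routes internally.

```typescript
// Hypothetical helper: matches a URL path against a ':param' pattern.
// Returns the extracted parameters, or null when the path does not match.
function matchPath(pattern: string, path: string): Record<string, string> | null {
  const patternParts = pattern.split('/').filter(Boolean)
  const pathParts = path.split('/').filter(Boolean)
  if (patternParts.length !== pathParts.length) return null

  const params: Record<string, string> = {}
  for (let i = 0; i < patternParts.length; i++) {
    const expected = patternParts[i]
    const actual = pathParts[i]
    if (expected.charAt(0) === ':') {
      // Dynamic segment: capture the value under the parameter name
      params[expected.slice(1)] = decodeURIComponent(actual)
    } else if (expected !== actual) {
      // Static segments must match exactly
      return null
    }
  }
  return params
}

const matched = matchPath('/users/:userId/posts/:postId', '/users/42/posts/7')
console.log(matched) // { userId: '42', postId: '7' }
```

Note that captured values are always strings, so a numeric ID still needs to be parsed before it is used as a number.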
Query Parameters:
http('GET', '/search', {
  queryParams: [
    { name: 'q', description: 'Search query' },
    { name: 'limit', description: 'Results limit' },
  ],
})
// Access in handler
const { q, limit } = request.queryParams
Reference: motia-js/packages/motia/src/triggers.ts:29-37
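Query parameters such as `limit` above come from the URL as text, so a handler usually needs to parse and bound them before use. The sketch below shows one way to do that. It assumes each value is a string, or a string array when the parameter is repeated; that shape is an assumption, not something this page confirms. `parseLimit` and its defaults are hypothetical.

```typescript
// Assumption: a query parameter value is a string, or a string[] when repeated.
type QueryValue = string | string[] | undefined

// Hypothetical helper: reads a positive integer 'limit', falling back to a
// default when the value is missing or invalid, and capping it at a maximum.
function parseLimit(value: QueryValue, fallback = 20, max = 100): number {
  const raw = Array.isArray(value) ? value[0] : value
  const parsed = parseInt(raw ?? '', 10)
  if (isNaN(parsed) || parsed < 1) return fallback
  return Math.min(parsed, max)
}

console.log(parseLimit('50'))      // 50
console.log(parseLimit('abc'))     // 20
console.log(parseLimit(['500']))   // 100
console.log(parseLimit(undefined)) // 20
```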
Queue Triggers
Queue triggers subscribe to event topics for asynchronous, event-driven processing.
Basic Queue Trigger
import { queue, type Handlers } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'ProcessOrder',
  triggers: [
    queue('order.created', {
      input: z.object({
        orderId: z.string(),
        amount: z.number(),
        items: z.array(z.string()),
      }),
    }),
  ],
  enqueues: ['order.processed'],
}

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Processing order', { orderId: input.orderId })

  // Process the order (processOrderLogic is an application-defined helper, not shown here)
  await processOrderLogic(input)

  // Enqueue the next event
  await ctx.enqueue({
    topic: 'order.processed',
    data: { orderId: input.orderId, status: 'completed' },
  })
}
Queue Configuration
queue(topic: string, options?: {
  input?: ZodSchema | JsonSchema,
  infrastructure?: {
    queue?: {
      type: 'fifo' | 'standard',
      maxRetries: number,
      visibilityTimeout: number,
      delaySeconds: number,
    },
  },
})
Queue Types:
- Standard: High throughput, at-least-once delivery, best-effort ordering
- FIFO: Exactly-once processing, strict ordering, lower throughput
Message Group ID: For FIFO queues, use messageGroupId to partition messages:
await ctx.enqueue({
  topic: 'order.processing',
  data: { orderId: '123' },
  messageGroupId: 'customer-456', // Orders from the same customer are processed in order
})
Reference: motia-js/packages/motia/src/types.ts:194-201
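The effect of `messageGroupId` on FIFO ordering can be pictured as splitting one message sequence into independent per-group lanes. The sketch below is a conceptual illustration only: `partitionByGroup` and the `Message` type are hypothetical, and nothing here describes how Motia or its queue backend is implemented.

```typescript
type Message = { messageGroupId: string; data: { orderId: string } }

// Hypothetical illustration: split a sequence of messages into per-group lanes.
// Order is preserved within a lane; lanes are independent of each other.
function partitionByGroup(messages: Message[]): Map<string, Message[]> {
  const lanes = new Map<string, Message[]>()
  for (const message of messages) {
    const lane = lanes.get(message.messageGroupId) ?? []
    lane.push(message)
    lanes.set(message.messageGroupId, lane)
  }
  return lanes
}

const lanes = partitionByGroup([
  { messageGroupId: 'customer-456', data: { orderId: '1' } },
  { messageGroupId: 'customer-789', data: { orderId: '2' } },
  { messageGroupId: 'customer-456', data: { orderId: '3' } },
])

const customer456 = (lanes.get('customer-456') ?? []).map((m) => m.data.orderId)
console.log(customer456) // [ '1', '3' ]
```

Choosing a fine-grained group ID (per customer rather than one global group) keeps ordering where it matters while letting unrelated messages proceed independently.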
Cron Triggers
Cron triggers execute steps on a schedule using cron expressions.
Basic Cron Trigger
import { cron, type Handlers } from 'motia'

export const config = {
  name: 'DailyReport',
  triggers: [
    cron('0 9 * * *'), // Every day at 9:00 AM
  ],
  enqueues: ['report.generated'],
}

export const handler: Handlers<typeof config> = async (_input, ctx) => {
  ctx.logger.info('Generating daily report')

  // generateReport is an application-defined helper, not shown here
  const report = await generateReport()

  await ctx.enqueue({
    topic: 'report.generated',
    data: { reportId: report.id, timestamp: new Date().toISOString() },
  })
}
Motia uses an extended cron syntax with optional seconds and year fields:
* * * * * * *
│ │ │ │ │ │ └─ Year (optional)
│ │ │ │ │ └─── Day of week (0-7, 0 and 7 = Sunday)
│ │ │ │ └───── Month (1-12)
│ │ │ └─────── Day of month (1-31)
│ │ └───────── Hour (0-23)
│ └─────────── Minute (0-59)
└───────────── Second (0-59, optional)
Common Patterns:
cron('* * * * *') // Every minute
cron('0 * * * *') // Every hour
cron('0 0 * * *') // Daily at midnight
cron('0 0 * * 0') // Weekly on Sunday
cron('0 0 1 * *') // Monthly on 1st
cron('*/5 * * * *') // Every 5 minutes
cron('0 9,17 * * 1-5') // 9 AM and 5 PM, Monday-Friday
Reference: motia-js/playground/steps/cronExample/handlePeriodicJob.step.ts:8
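To show how the operators in the common patterns above (`*`, `*/n`, comma lists, and ranges) are read, here is a small sketch that tests a single cron field against a value. `matchesCronField` is a hypothetical helper for illustration; it is not Motia's cron parser and deliberately ignores details such as the optional seconds and year fields or treating 7 as Sunday.

```typescript
// Hypothetical helper: tests one cron field (e.g. '*/5', '9,17', '1-5') against a value.
// Supports '*', steps on '*', comma lists, ranges, and plain numbers only.
function matchesCronField(field: string, value: number): boolean {
  return field.split(',').some((part) => {
    if (part === '*') return true
    if (part.indexOf('*/') === 0) {
      const step = parseInt(part.slice(2), 10)
      return step > 0 && value % step === 0
    }
    if (part.indexOf('-') !== -1) {
      const [start, end] = part.split('-').map((n) => parseInt(n, 10))
      return value >= start && value <= end
    }
    return parseInt(part, 10) === value
  })
}

// '0 9,17 * * 1-5': minute 0, hour 9 or 17, Monday through Friday
const [minute, hour, , , dayOfWeek] = '0 9,17 * * 1-5'.split(' ')
const firesAt = (m: number, h: number, dow: number): boolean =>
  matchesCronField(minute, m) && matchesCronField(hour, h) && matchesCronField(dayOfWeek, dow)

console.log(firesAt(0, 17, 3))           // true  (Wednesday 17:00)
console.log(firesAt(0, 17, 6))           // false (Saturday)
console.log(matchesCronField('*/5', 10)) // true
```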
State Triggers
State triggers react to changes in the state management system.
Basic State Trigger
import { state, type Handlers } from 'motia'

export const config = {
  name: 'OnUserStatusChange',
  triggers: [
    state({
      // Only react to changes in the 'users' group
      condition: (input, ctx) => input.group_id === 'users',
    }),
  ],
  enqueues: ['user.status.changed'],
}

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('User state changed', {
    groupId: input.group_id,
    itemId: input.item_id,
    oldValue: input.old_value,
    newValue: input.new_value,
  })

  await ctx.enqueue({
    topic: 'user.status.changed',
    data: {
      userId: input.item_id,
      oldStatus: input.old_value?.status,
      newStatus: input.new_value?.status,
    },
  })
}
State triggers receive the following input structure:
type StateTriggerInput<T> = {
  type: 'state'
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}
Reference: motia-js/packages/motia/src/types.ts:113-119, motia-py/playground/steps/state_example/on_state_change_step.py
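Because `old_value` and `new_value` are both optional in the state trigger input, code that compares them has to tolerate either being absent. The sketch below shows a condition-style predicate that fires only when a user's status actually changed. The `User` type and `statusChanged` function are hypothetical, and the input type is copied locally so the example stands alone.

```typescript
// Local copy of the state trigger input shape, so the sketch is self-contained.
type StateTriggerInput<T> = {
  type: 'state'
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}

// Hypothetical value type stored in the 'users' group.
type User = { status: string }

// Predicate sketch: true only for the 'users' group and only when the status differs.
// Optional chaining covers the cases where either value is missing.
function statusChanged(input: StateTriggerInput<User>): boolean {
  return input.group_id === 'users' && input.old_value?.status !== input.new_value?.status
}

const change: StateTriggerInput<User> = {
  type: 'state',
  group_id: 'users',
  item_id: 'u1',
  old_value: { status: 'active' },
  new_value: { status: 'suspended' },
}

console.log(statusChanged(change))                                   // true
console.log(statusChanged({ ...change, new_value: change.old_value })) // false
```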
Stream Triggers
Stream triggers react to real-time events from data streams.
Basic Stream Trigger
import { stream, type Handlers } from 'motia'

export const config = {
  name: 'OnTodoStreamEvent',
  triggers: [
    stream('todo', {
      groupId: 'inbox', // Optional: filter by group
      itemId: undefined, // Optional: filter by specific item
    }),
  ],
  enqueues: ['todo.processed'],
}

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Todo stream event', {
    streamName: input.streamName,
    groupId: input.groupId,
    itemId: input.id,
    eventType: input.event.type,
  })

  // Branch on the kind of stream event
  if (input.event.type === 'create') {
    ctx.logger.info('New todo created', { id: input.id })
  } else if (input.event.type === 'update') {
    ctx.logger.info('Todo updated', { id: input.id })
  } else if (input.event.type === 'delete') {
    ctx.logger.info('Todo deleted', { id: input.id })
  }

  await ctx.enqueue({
    topic: 'todo.processed',
    data: { todoId: input.id, eventType: input.event.type },
  })
}
Stream triggers receive the following input structure:
type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }
Reference: motia-js/packages/motia/src/types.ts:121-133, motia-py/playground/steps/stream_example/on_todo_event_step.py
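Since `StreamEvent<T>` is a discriminated union on `type`, a `switch` with a `never` check lets the TypeScript compiler flag any event kind that is left unhandled. The sketch below copies the union locally so it stands alone; the `Todo` type and `describeEvent` function are hypothetical.

```typescript
// Local copy of the event union, so the sketch is self-contained.
type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }

// Hypothetical payload type for the 'todo' stream.
type Todo = { description: string }

function describeEvent(event: StreamEvent<Todo>): string {
  switch (event.type) {
    case 'create':
      return `created: ${event.data.description}`
    case 'update':
      return `updated: ${event.data.description}`
    case 'delete':
      return `deleted: ${event.data.description}`
    default: {
      // If a new variant is added to StreamEvent, this assignment stops compiling.
      const unreachable: never = event
      throw new Error(`Unhandled event: ${JSON.stringify(unreachable)}`)
    }
  }
}

console.log(describeEvent({ type: 'create', data: { description: 'Buy milk' } })) // created: Buy milk
```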
Trigger Conditions
All trigger types support optional conditions to filter invocations:
queue('order.created', {
  condition: (input, ctx) => {
    // Only process high-value orders
    return input.amount > 1000
  },
})

http('POST', '/orders', {
  condition: (input, ctx) => {
    // Only process for verified users
    return input.body.user.verified === true
  },
})
Conditions can be async:
state({
  condition: async (input, ctx) => {
    const user = await ctx.state.get('users', input.item_id)
    return user?.role === 'admin'
  },
})
Reference: motia-js/playground/steps/multi-trigger-example.step.ts:4-16
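A caller can treat synchronous and asynchronous conditions uniformly, because awaiting a plain boolean simply yields that boolean. The sketch below illustrates the idea with hypothetical `Condition` and `shouldInvoke` definitions and a `Map` standing in for a state lookup; it is not Motia's internal dispatch code, and Motia's real condition signature may differ.

```typescript
// Hypothetical types for this sketch only.
type Condition<I, C> = (input: I, ctx: C) => boolean | Promise<boolean>

// One code path for both kinds of condition: await works on values and promises alike.
async function shouldInvoke<I, C>(
  condition: Condition<I, C> | undefined,
  input: I,
  ctx: C,
): Promise<boolean> {
  if (!condition) return true // no condition: always invoke
  return await condition(input, ctx)
}

// Synchronous condition, mirroring the high-value order filter above
const isHighValue: Condition<{ amount: number }, unknown> = (input) => input.amount > 1000

// Asynchronous condition backed by a stand-in lookup (a Map instead of ctx.state)
type RoleCtx = { roles: Map<string, string> }
const isAdmin: Condition<{ item_id: string }, RoleCtx> = async (input, ctx) =>
  ctx.roles.get(input.item_id) === 'admin'

const roleCtx: RoleCtx = { roles: new Map([['u1', 'admin']]) }

shouldInvoke(isHighValue, { amount: 1500 }, undefined).then((ok) => console.log(ok)) // true
shouldInvoke(isAdmin, { item_id: 'u2' }, roleCtx).then((ok) => console.log(ok))      // false
```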
Trigger Registration Protocol
The III Engine uses a WebSocket protocol for trigger registration:
Message::RegisterTrigger {
    id: String,
    trigger_type: String,
    function_id: String,
    config: Value,
}

Message::TriggerRegistrationResult {
    id: String,
    trigger_type: String,
    function_id: String,
    error: Option<ErrorBody>,
}
Reference: engine/src/protocol.rs:39-51
Best Practices
Choose the right trigger type
- Use HTTP for synchronous requests requiring immediate responses
- Use Queue for asynchronous, decoupled event processing
- Use Cron for scheduled tasks
- Use State for reactive data change handling
- Use Stream for real-time data synchronization
Queue triggers may retry on failure. Ensure handlers are idempotent and can safely process the same message multiple times.
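As a minimal sketch of what idempotency means in practice, the handler below records which orders it has already handled and skips redeliveries. Everything here is hypothetical: `handleOrder`, the `OrderMessage` type, and the in-memory `Set` are illustrative, and a real handler would need durable storage so the record survives restarts.

```typescript
type OrderMessage = { orderId: string; amount: number }

// Hypothetical in-memory record of handled orders (not durable; illustration only).
const processedOrders = new Set<string>()
let chargedTotal = 0

function handleOrder(message: OrderMessage): 'processed' | 'skipped' {
  // Duplicate delivery: the side effect already happened, so do nothing.
  if (processedOrders.has(message.orderId)) return 'skipped'

  chargedTotal += message.amount // the side effect that must not run twice
  processedOrders.add(message.orderId)
  return 'processed'
}

const orderMessage: OrderMessage = { orderId: 'order-1', amount: 50 }
console.log(handleOrder(orderMessage)) // processed
console.log(handleOrder(orderMessage)) // skipped (simulated redelivery)
console.log(chargedTotal)              // 50
```

Keying the check on a stable business identifier such as the order ID, rather than on delivery metadata, is what makes the redelivered message recognizable.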
Conditions filter invocations at the trigger level, reducing unnecessary executions and improving performance.
Handle errors appropriately
- HTTP triggers: Return appropriate status codes (400, 500, etc.)
- Queue triggers: Throw errors so the message is retried and, once retries are exhausted, can be routed to a dead-letter queue (DLQ)
- Cron triggers: Log errors and consider alerting mechanisms
Next Steps
- Context API: Learn about ctx.enqueue(), ctx.state, and more
- HTTP Triggers: Add middleware to HTTP triggers
- State Management: Work with state triggers
- Streams: Work with stream triggers