Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Workflows
Motia enables you to build complex workflows by orchestrating multiple steps through events, state, and streams. Workflows can execute in parallel, sequentially, or conditionally based on your business logic.
Event-Driven Orchestration
Steps communicate through events using the `enqueue` function:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Shape of the POST /orders request body; referenced below as the HTTP trigger's bodySchema.
const orderSchema = z.object({
email: z.string(),
productId: z.string(),
quantity: z.number(),
})
// Step definition for 'CreateOrder': a single HTTP trigger on POST /orders,
// two declared enqueue topics, and membership in the 'order-workflow' flow.
export const config = {
name: 'CreateOrder',
triggers: [{ type: 'http', method: 'POST', path: '/orders', bodySchema: orderSchema }],
// Topics the handler enqueues: 'order.process' and 'notification.send'.
enqueues: ['order.process', 'notification.send'],
flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * HTTP handler for the `CreateOrder` step.
 *
 * Builds an order from the request body, stores it in the `orders` state
 * group under its id, enqueues `order.process` and `notification.send`, and
 * answers with the stored order.
 *
 * @param input - Trigger input; only `request.body` is read.
 * @param ctx - Step context; uses `enqueue`, `state`, and `logger`.
 * @returns `{ status: 201, body: { order } }`.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state, logger }) => {
  const order = {
    // Spread the client payload first so the server-assigned fields below
    // always win: a request must not be able to pick its own id, status, or
    // creation timestamp.
    ...request.body,
    id: crypto.randomUUID(),
    status: 'created',
    createdAt: new Date().toISOString(),
  }
  await state.set('orders', order.id, order)
  logger.info('Order created', { orderId: order.id })

  // Trigger downstream steps
  await enqueue({
    topic: 'order.process',
    data: { orderId: order.id },
  })
  await enqueue({
    topic: 'notification.send',
    data: {
      email: order.email,
      template: 'order-confirmation',
      data: order,
    },
  })

  return {
    status: 201,
    body: { order },
  }
}
Sequential Workflows
Chain steps together for sequential processing:
// Step 1: Validate Payment
/**
 * Step definition for payment validation: consumes the `order.process`
 * queue topic and declares `inventory.reserve` as its only enqueue topic.
 */
export const validatePaymentConfig = {
  name: 'ValidatePayment',
  triggers: [{ type: 'queue', topic: 'order.process' }],
  enqueues: ['inventory.reserve'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

/**
 * Records the payment outcome on the order and, on success, hands the order
 * off to inventory reservation.
 *
 * @param input - Queue payload; `orderId` is read from it.
 * @param ctx - Step context; uses `enqueue`, `state`, and `logger`.
 */
export const validatePaymentHandler: Handlers<typeof validatePaymentConfig> = async (
  input,
  { enqueue, state, logger }
) => {
  const { orderId } = input
  const order = await state.get('orders', orderId)
  if (!order) {
    // Stop here rather than writing payment fields onto a record that does
    // not exist and forwarding a dangling id to the next step.
    logger.warn('Order not found, skipping payment validation', { orderId })
    return
  }
  logger.info('Validating payment', { orderId })

  // Simulate payment validation
  const paymentValid = true // Call payment provider
  if (paymentValid) {
    await state.update('orders', orderId, [
      { type: 'set', path: 'paymentStatus', value: 'validated' },
    ])
    // Continue to next step
    await enqueue({
      topic: 'inventory.reserve',
      data: { orderId },
    })
  } else {
    // Failure ends the chain: the status is recorded and nothing is enqueued
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'payment-failed' },
    ])
  }
}
// Step 2: Reserve Inventory
/**
 * Step definition for inventory reservation: consumes `inventory.reserve`
 * and declares `shipping.create` as its only enqueue topic.
 */
export const reserveInventoryConfig = {
  name: 'ReserveInventory',
  triggers: [{ type: 'queue', topic: 'inventory.reserve' }],
  enqueues: ['shipping.create'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

/**
 * Marks the order's inventory as reserved and hands the order off to
 * shipment creation. When the reservation does not succeed, nothing is
 * written and nothing is enqueued.
 *
 * @param input - Queue payload; `orderId` is read from it.
 * @param ctx - Step context; uses `enqueue`, `state`, and `logger`.
 */
export const reserveInventoryHandler: Handlers<typeof reserveInventoryConfig> = async (
  input,
  { enqueue, state, logger }
) => {
  const { orderId } = input
  const order = await state.get('orders', orderId)
  if (!order) {
    // Stop here rather than writing inventory fields onto a record that does
    // not exist and forwarding a dangling id to shipping.
    logger.warn('Order not found, skipping inventory reservation', { orderId })
    return
  }
  logger.info('Reserving inventory', { orderId })

  // Reserve inventory
  const reserved = true // Check inventory system
  if (reserved) {
    await state.update('orders', orderId, [
      { type: 'set', path: 'inventoryStatus', value: 'reserved' },
    ])
    // Continue to shipping
    await enqueue({
      topic: 'shipping.create',
      data: { orderId },
    })
  }
}
// Step 3: Create Shipment
/**
 * Step definition for shipment creation: consumes `shipping.create`.
 * No enqueue topics are declared, so this is where the chain ends.
 */
export const createShipmentConfig = {
  name: 'CreateShipment',
  triggers: [{ type: 'queue', topic: 'shipping.create' }],
  flows: ['order-workflow'],
} as const satisfies StepConfig

/**
 * Flags the order as shipped and stamps the shipping time on it.
 *
 * @param input - Queue payload; `orderId` is read from it.
 * @param ctx - Step context; uses `state` and `logger`.
 */
export const createShipmentHandler: Handlers<typeof createShipmentConfig> = async (input, ctx) => {
  const { state, logger } = ctx
  const orderId = input.orderId

  logger.info('Creating shipment', { orderId })

  // Both fields are written in a single update call
  await state.update('orders', orderId, [
    { type: 'set', path: 'status', value: 'shipped' },
    { type: 'set', path: 'shippedAt', value: new Date().toISOString() },
  ])

  logger.info('Order workflow completed', { orderId })
}
Parallel Execution
Execute multiple steps in parallel for better performance:
import type { Handlers, StepConfig } from 'motia'
/**
 * Step definition for the fan-out entry point: one HTTP trigger on
 * POST /analyze/:userId and three declared `fetch.*` enqueue topics.
 */
export const config = {
  name: 'ParallelDataGathering',
  triggers: [{ type: 'http', method: 'POST', path: '/analyze/:userId' }],
  enqueues: ['fetch.profile', 'fetch.activity', 'fetch.preferences'],
  flows: ['data-aggregation'],
} as const satisfies StepConfig

/**
 * Creates an analysis record and fans out three fetch jobs concurrently.
 *
 * @param input - Trigger input; `request.params.userId` is read from it.
 * @param ctx - Step context; uses `enqueue` and `state`.
 * @returns `{ status: 202, body: { analysisId, status: 'gathering' } }`.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state }) => {
  const { userId } = request.params
  const analysisId = crypto.randomUUID()

  // Initialize aggregation state
  await state.set('analysis', analysisId, {
    userId,
    status: 'gathering',
    results: {},
    // Seed the counter that the fetch steps increment and the aggregation
    // trigger compares against, so the first increment has a defined base.
    completedTasks: 0,
    startedAt: Date.now(),
  })

  // Trigger parallel data fetching; all three enqueue calls are awaited together
  await Promise.all([
    enqueue({
      topic: 'fetch.profile',
      data: { analysisId, userId },
    }),
    enqueue({
      topic: 'fetch.activity',
      data: { analysisId, userId },
    }),
    enqueue({
      topic: 'fetch.preferences',
      data: { analysisId, userId },
    }),
  ])

  return {
    status: 202,
    body: { analysisId, status: 'gathering' },
  }
}
Parallel Merge Pattern
import type { Handlers, StepConfig } from 'motia'
// Individual data fetcher
/**
 * Step definition for the profile fetcher: consumes `fetch.profile` and
 * declares `analysis.aggregate` as its enqueue topic.
 */
export const fetchProfileConfig = {
  name: 'FetchProfile',
  triggers: [{ type: 'queue', topic: 'fetch.profile' }],
  enqueues: ['analysis.aggregate'],
  flows: ['data-aggregation'],
} as const satisfies StepConfig

/**
 * Writes the profile result into the shared analysis record, bumps the
 * completed-task counter, and enqueues `analysis.aggregate`.
 *
 * @param input - Queue payload; `analysisId` and `userId` are read from it.
 * @param ctx - Step context; uses `enqueue` and `state`.
 */
export const fetchProfileHandler: Handlers<typeof fetchProfileConfig> = async (input, ctx) => {
  const { enqueue, state } = ctx
  // `userId` is not used by the placeholder lookup below
  const { analysisId, userId } = input

  // Placeholder standing in for a database read
  const profile = { name: 'John Doe', age: 30 }

  // The partial result and the counter bump go out in one update call
  await state.update('analysis', analysisId, [
    { type: 'set', path: 'results.profile', value: profile },
    { type: 'increment', path: 'completedTasks', by: 1 },
  ])

  // Announce completion on the aggregate topic
  await enqueue({ topic: 'analysis.aggregate', data: { analysisId } })
}
// Aggregation step (triggered by state)
/**
 * Step definition for the aggregator: a single state trigger scoped to
 * `analysis`, gated by a condition on the incoming event.
 */
export const aggregateConfig = {
  name: 'AggregateResults',
  triggers: [
    {
      type: 'state',
      scope: 'analysis',
      // Pass only update events whose data reports exactly three completed tasks
      condition: (input) =>
        input.event.type === 'update' && input.event.data.completedTasks === 3,
    },
  ],
  flows: ['data-aggregation'],
} as const satisfies StepConfig

/**
 * Logs the gathered results with the elapsed time and marks the analysis
 * as completed.
 *
 * @param input - Trigger input; `id` and `event.data` are read from it.
 * @param ctx - Step context; uses `state` and `logger`.
 */
export const aggregateHandler: Handlers<typeof aggregateConfig> = async (input, ctx) => {
  const { state, logger } = ctx
  const { id: analysisId, event } = input
  const { startedAt, results } = event.data

  // Elapsed milliseconds since the record's `startedAt`
  const duration = Date.now() - startedAt

  logger.info('All parallel tasks completed', { analysisId, duration, results })

  await state.update('analysis', analysisId, [
    { type: 'set', path: 'status', value: 'completed' },
    { type: 'set', path: 'completedAt', value: Date.now() },
  ])
}
Conditional Workflows
Route workflows based on conditions:
import type { Handlers, StepConfig } from 'motia'
/**
 * Step definition for transaction routing: consumes `transaction.created`.
 * Three enqueue topics are declared; the handler below emits only
 * `fraud.check` or `payment.process`.
 */
export const config = {
  name: 'ProcessTransaction',
  triggers: [{ type: 'queue', topic: 'transaction.created' }],
  enqueues: ['fraud.check', 'payment.process', 'transaction.reject'],
  flows: ['payment-workflow'],
} as const satisfies StepConfig

/**
 * Routes a transaction by amount: above 10000 it goes to a fraud check,
 * otherwise straight to payment processing. Either way the transaction is
 * first stored in the `transactions` state group with the matching status.
 *
 * @param input - Queue payload; `transactionId`, `amount`, `userId` are read.
 * @param ctx - Step context; uses `enqueue`, `state`, and `logger`.
 */
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { enqueue, state, logger } = ctx
  const { transactionId, amount, userId } = input

  logger.info('Processing transaction', { transactionId, amount })

  const needsFraudCheck = amount > 10000

  if (!needsFraudCheck) {
    // Standard path: hand off to payment processing
    logger.info('Standard transaction, processing payment', { transactionId })
    await state.set('transactions', transactionId, {
      id: transactionId,
      status: 'processing',
      amount,
      userId,
    })
    await enqueue({ topic: 'payment.process', data: { transactionId } })
    return
  }

  // High-value path: the fraud check receives the full transaction details
  logger.info('High-value transaction, initiating fraud check', { transactionId })
  await state.set('transactions', transactionId, {
    id: transactionId,
    status: 'fraud-check',
    amount,
    userId,
  })
  await enqueue({ topic: 'fraud.check', data: { transactionId, amount, userId } })
}
Multi-Path Routing
/**
 * Forwards the whole input to one topic chosen from `orderType`, with a
 * priority split for standard orders and a manual-review fallback for any
 * type not listed.
 *
 * @param input - Payload; `orderType`, `priority`, and `value` are read.
 * @param ctx - Step context; uses `enqueue` and `logger`.
 */
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { enqueue, logger } = ctx
  const { orderType, priority, value } = input

  if (orderType === 'express') {
    await enqueue({ topic: 'express.fulfillment', data: input })
    return
  }

  if (orderType === 'standard') {
    // Standard orders jump the queue when flagged high priority or worth over 1000
    const expedite = priority === 'high' || value > 1000
    if (expedite) {
      await enqueue({ topic: 'priority.queue', data: input })
    } else {
      await enqueue({ topic: 'standard.queue', data: input })
    }
    return
  }

  if (orderType === 'backorder') {
    await enqueue({ topic: 'inventory.notify', data: input })
    return
  }

  // Anything else is logged and sent to manual review
  logger.warn('Unknown order type', { orderType })
  await enqueue({ topic: 'manual.review', data: input })
}
Multi-Trigger Steps
Create versatile steps that respond to multiple trigger types:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Order payload shape; used below both as the HTTP trigger's bodySchema and
// as the queue trigger's input schema.
const orderSchema = z.object({
amount: z.number(),
description: z.string(),
})
// Step definition for 'ProcessOrder' with three triggers of different types
// (http, queue, cron), one declared enqueue topic, and the 'order-workflow' flow.
export const config = {
name: 'ProcessOrder',
triggers: [
// API trigger for manual orders
{
type: 'http',
method: 'POST',
path: '/orders/manual',
bodySchema: orderSchema,
},
// Queue trigger for automated orders
{
type: 'queue',
topic: 'order.created',
input: orderSchema,
},
// Cron trigger for batch processing
{
type: 'cron',
expression: '0 2 * * *', // Daily at 2 AM
},
],
// The only topic this step enqueues, from every trigger path.
enqueues: ['order.processed'],
flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * Single handler serving all three triggers of `ProcessOrder`; `ctx.match`
 * is given one callback per trigger type.
 *
 * - `http`: stores the request body as an order, enqueues `order.processed`,
 *   and answers 200.
 * - `queue`: stores the queue payload as an order and enqueues
 *   `order.processed`.
 * - `cron`: enqueues `order.processed` once for each entry listed under
 *   `pending-orders`.
 *
 * @param _ - Raw trigger input; unused here, each callback receives its own.
 * @param ctx - Step context; uses `match`, `state`, `enqueue`, and `logger`.
 * @returns Whatever `ctx.match` returns.
 */
export const handler: Handlers<typeof config> = async (_, ctx) => {
  const orderId = crypto.randomUUID()

  return ctx.match({
    http: async ({ request }) => {
      const order = {
        // Payload first: the stored `id` must equal the state key `orderId`,
        // so the generated id and other server-set fields always win.
        ...request.body,
        id: orderId,
        source: 'manual-api',
        createdAt: new Date().toISOString(),
      }
      await ctx.state.set('orders', orderId, order)
      await ctx.enqueue({ topic: 'order.processed', data: { orderId } })
      return {
        status: 200,
        body: { message: 'Order processed', orderId },
      }
    },

    queue: async (queueInput) => {
      const order = {
        // Same ordering as the http path, for the same reason
        ...queueInput,
        id: orderId,
        source: 'event',
        createdAt: new Date().toISOString(),
      }
      await ctx.state.set('orders', orderId, order)
      await ctx.enqueue({ topic: 'order.processed', data: { orderId } })
    },

    cron: async () => {
      ctx.logger.info('Processing batch orders')
      const pendingOrders = await ctx.state.list<{ id: string }>('pending-orders')
      // Enqueued one at a time, in list order
      for (const order of pendingOrders) {
        await ctx.enqueue({
          topic: 'order.processed',
          data: { orderId: order.id },
        })
      }
      ctx.logger.info('Batch processing complete', { count: pendingOrders.length })
    },
  })
}
Long-Running Workflows
Handle workflows that span hours or days:
import type { Handlers, StepConfig } from 'motia'
// Start long-running process
/**
 * Step definition for opening an approval: one HTTP trigger on
 * POST /approvals and `approval.reminder` as the declared enqueue topic.
 */
export const startConfig = {
  name: 'StartApprovalProcess',
  triggers: [{ type: 'http', method: 'POST', path: '/approvals' }],
  enqueues: ['approval.reminder'],
  flows: ['approval-workflow'],
} as const satisfies StepConfig

/**
 * Stores a new pending approval and schedules a reminder 24 hours out.
 *
 * @param input - Trigger input; only `request.body` is read.
 * @param ctx - Step context; uses `enqueue` and `state`.
 * @returns `{ status: 202, body: { approvalId, status: 'pending' } }`.
 */
export const startHandler: Handlers<typeof startConfig> = async (
  { request },
  { enqueue, state }
) => {
  const approvalId = crypto.randomUUID()

  await state.set('approvals', approvalId, {
    // The trigger declares no body schema, so the payload is spread first:
    // a caller must not be able to overwrite the id, pre-set the status, or
    // back-date the request.
    ...request.body,
    id: approvalId,
    status: 'pending',
    requestedAt: new Date().toISOString(),
  })

  // Schedule reminder for 24 hours later
  await enqueue({
    topic: 'approval.reminder',
    data: { approvalId },
    delay: 24 * 60 * 60 * 1000, // 24 hours
  })

  return { status: 202, body: { approvalId, status: 'pending' } }
}
// Reminder step (triggered after delay)
/**
 * Step definition for the reminder: consumes `approval.reminder` and
 * declares `notification.send` as its enqueue topic.
 */
export const reminderConfig = {
  name: 'ApprovalReminder',
  triggers: [{ type: 'queue', topic: 'approval.reminder' }],
  enqueues: ['notification.send'],
  flows: ['approval-workflow'],
} as const satisfies StepConfig

/**
 * Sends a reminder notification, but only while the approval is still
 * pending; a missing record or any other status makes this a no-op.
 *
 * @param input - Queue payload; `approvalId` is read from it.
 * @param ctx - Step context; uses `state` and `enqueue`.
 */
export const reminderHandler: Handlers<typeof reminderConfig> = async (input, ctx) => {
  const { state, enqueue } = ctx
  const approval = await state.get('approvals', input.approvalId)

  // Nothing to remind about unless the approval is still waiting
  if (approval?.status !== 'pending') {
    return
  }

  const reminder = {
    email: approval.approver,
    template: 'approval-reminder',
    data: approval,
  }
  await enqueue({ topic: 'notification.send', data: reminder })
}
Error Handling & Retries
Motia automatically retries failed steps (configurable in `config.yaml`):
/**
 * Posts the input to an external API and rethrows on any failure, so the
 * error propagates out of the step after being logged.
 *
 * @param input - Payload; serialized as the JSON request body.
 * @param ctx - Step context; uses `logger`.
 * @throws The network error from `fetch`, or `Error('API error: <status>')`
 *   when the response is not ok.
 */
export const handler: Handlers<typeof config> = async (input, { logger }) => {
  try {
    // Attempt external API call
    const result = await fetch('https://api.example.com/process', {
      method: 'POST',
      // Declare the payload type; a string body alone is not labeled as JSON
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(input),
    })
    if (!result.ok) {
      // fetch resolves on HTTP error statuses, so turn them into failures here
      throw new Error(`API error: ${result.status}`)
    }
    logger.info('Processing successful')
  } catch (error) {
    logger.error('Processing failed, will retry', { error })
    // Throw to trigger automatic retry
    throw error
  }
}
Dead Letter Queue
/**
 * Step definition for processing with a dead-letter path: consumes
 * `data.process` and declares `data.failed` as its enqueue topic.
 */
export const config = {
  name: 'ProcessWithDLQ',
  triggers: [{ type: 'queue', topic: 'data.process' }],
  enqueues: ['data.failed'],
  flows: ['data-processing'],
} as const satisfies StepConfig

/**
 * Runs `processData` on the input. On failure the error is rethrown until
 * `input.retryCount` reaches 3; from then on the failure is recorded under
 * `failed-jobs` and published on `data.failed`, and the error is swallowed.
 *
 * NOTE(review): nothing in this handler increments `retryCount`; it is only
 * read from the input, so it presumably has to be supplied from outside.
 *
 * @param input - Payload passed to `processData`; `retryCount` is read.
 * @param ctx - Step context; uses `logger`, `enqueue`, and `state`.
 */
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { logger, enqueue, state } = ctx

  try {
    // Process data
    await processData(input)
  } catch (error) {
    const attempts = input.retryCount || 0
    const exhausted = attempts >= 3

    if (!exhausted) {
      // Still under the limit: let the failure propagate
      throw error
    }

    // Limit reached: park the job instead of failing again
    logger.error('Max retries exceeded, sending to DLQ', { error, input })
    const reason = String(error)
    await state.set('failed-jobs', crypto.randomUUID(), {
      input,
      error: reason,
      failedAt: new Date().toISOString(),
    })
    await enqueue({ topic: 'data.failed', data: { input, error: reason } })
  }
}
Workflow Monitoring
Track workflow progress in real-time:
/**
 * Starts a tracked workflow: writes the full record to the `workflows`
 * state group and a compact progress entry to the `workflows` stream.
 *
 * @param input - Trigger input; `request` is destructured but not read.
 * @param ctx - Step context; uses `state` and `streams`.
 * @returns `{ status: 202, body: { workflowId } }`.
 */
export const handler: Handlers<typeof config> = async ({ request }, { state, streams }) => {
  const workflowId = crypto.randomUUID()

  // Initialize workflow state
  await state.set('workflows', workflowId, {
    id: workflowId,
    status: 'running',
    steps: {
      step1: 'pending',
      step2: 'pending',
      step3: 'pending',
    },
    startedAt: Date.now(),
  })

  // Also set in stream for real-time updates
  await streams.workflows.set('active', workflowId, {
    id: workflowId,
    status: 'running',
    progress: 0,
  })

  return { status: 202, body: { workflowId } }
}
/**
 * Records one finished step: marks it completed in the workflow's state
 * record and advances the streamed progress value by 33.
 *
 * NOTE(review): with three steps the progress tops out at 99, not 100 —
 * confirm whether that is intended.
 *
 * @param input - Payload; `workflowId` and `stepName` are read from it.
 * @param ctx - Step context; uses `state` and `streams`.
 */
export const stepHandler: Handlers<typeof stepConfig> = async (input, ctx) => {
  const { state, streams } = ctx
  const workflowId = input.workflowId
  const stepName = input.stepName

  // Durable record: flip this step's entry to completed
  await state.update('workflows', workflowId, [
    { type: 'set', path: `steps.${stepName}`, value: 'completed' },
  ])

  // Live view: bump the progress value in the 'active' stream group
  await streams.workflows.update('active', workflowId, [
    { type: 'increment', path: 'progress', by: 33 },
  ])
}
Best Practices
1. Design for Idempotency
/**
 * Processes an order at most once: a marker in the `processed` state group
 * short-circuits repeat deliveries of the same `orderId`.
 *
 * @param input - Payload; `orderId` is read from it.
 * @param ctx - Step context; uses `state` and `logger`.
 */
export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
  const { orderId } = input

  // Check if already processed
  const existing = await state.get('processed', orderId)
  if (existing) {
    // `logger` must come from the context; without it this branch would
    // throw a ReferenceError instead of skipping.
    logger.info('Already processed, skipping', { orderId })
    return
  }

  // Process order
  await processOrder(orderId)

  // Mark as processed only after the work succeeded
  await state.set('processed', orderId, { processedAt: Date.now() })
}
2. Use Meaningful Topic Names
// Good: Clear, hierarchical
// Each name is built from dot-separated segments that say what the message
// is about and what happened.
await enqueue({ topic: 'user.created', data })
await enqueue({ topic: 'order.payment.validated', data })
await enqueue({ topic: 'inventory.stock.low', data })
// Avoid: Vague, flat
// A single generic word gives no hint about the subject or the event.
await enqueue({ topic: 'process', data })
await enqueue({ topic: 'data', data })
3. Document Workflow Flows
// Excerpt of a step definition showing the fields that document a workflow:
// a free-text description, the flows the step belongs to, and the topics it enqueues.
export const config = {
name: 'ProcessOrder',
description: 'Main order processing step. Validates payment, reserves inventory, creates shipment.',
flows: ['order-workflow'],
enqueues: ['payment.validate', 'inventory.reserve'],
// ... (remaining fields omitted from this excerpt)
}
Next Steps
- Learn about State Management for workflow data
- Explore Streaming for real-time workflow updates
- Check out Observability for workflow monitoring