Overview

This tutorial demonstrates how to build a message queue system using Motia’s pub/sub pattern. You’ll create an order processing system that accepts orders via HTTP, processes them asynchronously, and sends notifications.

What you’ll learn:
  • Pub/sub messaging patterns
  • Queue-triggered Steps
  • Chaining Steps with enqueue
  • Event-driven architecture
  • Error handling in async workflows
  • Processing pipelines
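
The pub/sub idea behind these topics can be sketched without any framework. The following is a minimal in-memory illustration, not Motia's implementation: `TinyBus`, `subscribe`, `enqueue`, and `drain` are hypothetical names invented for this sketch.

```typescript
// Minimal in-memory pub/sub sketch. NOT Motia's implementation:
// TinyBus and its methods are hypothetical names used only for illustration.
type Handler = (data: unknown) => void

class TinyBus {
  private handlers = new Map<string, Handler[]>()
  private pending: Array<{ topic: string; data: unknown }> = []

  // Register a handler for a topic (a queue-triggered Step declares this in its config).
  subscribe(topic: string, handler: Handler): void {
    const list = this.handlers.get(topic) ?? []
    list.push(handler)
    this.handlers.set(topic, list)
  }

  // Publishing only records the message; the publisher does not wait for handlers.
  enqueue(topic: string, data: unknown): void {
    this.pending.push({ topic, data })
  }

  // Deliver queued messages in FIFO order, including ones enqueued by handlers.
  drain(): void {
    while (this.pending.length > 0) {
      const message = this.pending.shift()!
      for (const handler of this.handlers.get(message.topic) ?? []) {
        handler(message.data)
      }
    }
  }
}

const bus = new TinyBus()
const seen: string[] = []

bus.subscribe('process-order', (data) => {
  seen.push(`processed:${String(data)}`)
  bus.enqueue('send-notification', data) // chaining: one handler enqueues the next
})
bus.subscribe('send-notification', (data) => {
  seen.push(`notified:${String(data)}`)
})

bus.enqueue('process-order', 'order-1')
bus.drain()
```

The publisher and the subscribers only share a topic name, which is the same decoupling the Steps in this tutorial rely on.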

Prerequisites

Before starting, make sure you have:
  • Node.js version 19 or higher
  • Completed the Hello World tutorial
  • Understanding of asynchronous processing

Use Case: Order Processing System

We’ll build a system that:
  1. Accepts orders via HTTP API
  2. Validates and processes orders in the background
  3. Updates inventory
  4. Sends confirmation notifications
  5. Handles errors gracefully

Project Setup

1. Create project

mkdir order-processing
cd order-processing
npm init -y

2. Install dependencies

npm install motia zod
npm install -D typescript @types/node
Update package.json:
{
  "type": "module",
  "scripts": {
    "dev": "iii",
    "build": "motia build"
  }
}

3. Create structure

mkdir -p steps/orders

Building the System

Step 1: Define Data Types

Create steps/orders/types.ts:
import { z } from 'zod'

export const orderItemSchema = z.object({
  productId: z.string(),
  quantity: z.number().positive(),
  price: z.number().positive(),
})

export const orderSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  items: z.array(orderItemSchema),
  total: z.number(),
  status: z.enum(['pending', 'processing', 'completed', 'failed']),
  createdAt: z.string(),
  processedAt: z.string().optional(),
})

export const processOrderEventSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  items: z.array(orderItemSchema),
  total: z.number(),
  timestamp: z.string(),
})

export const notificationEventSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  status: z.string(),
  message: z.string(),
})

export type Order = z.infer<typeof orderSchema>
export type OrderItem = z.infer<typeof orderItemSchema>
export type ProcessOrderEvent = z.infer<typeof processOrderEventSchema>
export type NotificationEvent = z.infer<typeof notificationEventSchema>
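
For readers new to `z.infer`: the exported types are ordinary structural TypeScript types. As a dependency-free sketch, the hand-written interface below mirrors what `OrderItem` infers to, and `isOrderItem` is a hypothetical guard that only approximates the three checks in `orderItemSchema`. In the tutorial itself, zod performs this validation.

```typescript
// Hand-written equivalent of `z.infer<typeof orderItemSchema>`.
interface OrderItemShape {
  productId: string
  quantity: number
  price: number
}

// Hypothetical guard approximating orderItemSchema: a string id plus
// strictly positive numeric quantity and price.
function isOrderItem(value: unknown): value is OrderItemShape {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v.productId === 'string' &&
    typeof v.quantity === 'number' &&
    v.quantity > 0 &&
    typeof v.price === 'number' &&
    v.price > 0
  )
}

const validItem = isOrderItem({ productId: 'prod-1', quantity: 2, price: 19.99 })
const zeroQuantity = isOrderItem({ productId: 'prod-1', quantity: 0, price: 19.99 })
```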

Step 2: Create Order API

Create steps/orders/create-order.step.ts:
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'
import { orderItemSchema, orderSchema } from './types'

export const config = {
  name: 'CreateOrder',
  description: 'Accepts new orders and enqueues them for processing',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        customerId: z.string(),
        items: z.array(orderItemSchema).min(1),
      }),
      responseSchema: {
        200: orderSchema,
        400: z.object({ error: z.string() }),
      },
    },
  ],
  enqueues: ['process-order'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { enqueue, logger, state }
) => {
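  const { customerId, items } = request.body || {}

  // Guard against a missing or empty body so the reduce below cannot throw
  if (!customerId || !items?.length) {
    return {
      status: 400,
      body: { error: 'customerId and at least one item are required' },
    }
  }

  logger.info('Received new order', { customerId, itemCount: items.length })

  // Calculate total
  const total = items.reduce((sum, item) => sum + item.price * item.quantity, 0)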

  // Generate order ID
  const orderId = `order-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`
  const timestamp = new Date().toISOString()

  // Create order record
  const order = {
    orderId,
    customerId,
    items,
    total,
    status: 'pending' as const,
    createdAt: timestamp,
  }

  // Store order in state
  await state.set('orders', orderId, order)

  // Enqueue for background processing
  await enqueue({
    topic: 'process-order',
    data: {
      orderId,
      customerId,
      items,
      total,
      timestamp,
    },
  })

  logger.info('Order created and enqueued', { orderId })

  return {
    status: 200,
    body: order,
  }
}
Key concepts:
  • Immediate response: API returns immediately while processing happens in background
  • State storage: Order is persisted before enqueueing
  • Enqueue: Publishes event to process-order topic
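
One detail in the handler worth a second look is the total calculation: summing `price * quantity` in floating point can accumulate rounding error for decimal prices. A common workaround, sketched here under the assumptions that prices have two decimal places and quantities are whole numbers, is to sum in integer cents. `orderTotal` is a hypothetical helper, not part of the tutorial code.

```typescript
interface PricedItem {
  price: number
  quantity: number
}

// Sum line totals in integer cents, then convert back to currency units.
// Assumes two decimal places and whole-number quantities.
function orderTotal(items: PricedItem[]): number {
  const cents = items.reduce(
    (sum, item) => sum + Math.round(item.price * 100) * item.quantity,
    0
  )
  return cents / 100
}

// The items from the sample request later in this tutorial.
const total = orderTotal([
  { price: 19.99, quantity: 2 },
  { price: 49.99, quantity: 1 },
])
```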

Step 3: Process Orders

Create steps/orders/process-order.step.ts:
import { type Handlers, type StepConfig } from 'motia'
import { processOrderEventSchema } from './types'

export const config = {
  name: 'ProcessOrder',
  description: 'Processes orders in the background',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'queue',
      topic: 'process-order',
      input: processOrderEventSchema,
    },
  ],
  enqueues: ['update-inventory', 'send-notification'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, logger, state }
) => {
  const { orderId, customerId, items, total } = input

  logger.info('Processing order', { orderId, customerId })

  try {
    // Simulate order validation
    await new Promise((resolve) => setTimeout(resolve, 100))

    // Update order status to processing
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'processing' },
    ])

    // Enqueue inventory update
    await enqueue({
      topic: 'update-inventory',
      data: {
        orderId,
        items,
        timestamp: new Date().toISOString(),
      },
    })

    // Mark the order completed. This runs once the inventory event is enqueued,
    // not after UpdateInventory has finished.
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'completed' },
      { type: 'set', path: 'processedAt', value: new Date().toISOString() },
    ])

    // Enqueue notification
    await enqueue({
      topic: 'send-notification',
      data: {
        orderId,
        customerId,
        status: 'completed',
        message: `Your order ${orderId} has been processed successfully. Total: $${total}`,
      },
    })

    logger.info('Order processed successfully', { orderId })
  } catch (error) {
    logger.error('Order processing failed', { orderId, error })

    // Update order status to failed
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'failed' },
    ])

    // Send failure notification
    await enqueue({
      topic: 'send-notification',
      data: {
        orderId,
        customerId,
        status: 'failed',
        message: `Order ${orderId} processing failed. Please contact support.`,
      },
    })
  }
}
Key concepts:
  • Chained events: Enqueues multiple follow-up tasks
  • Error handling: Catches failures and triggers error workflow
  • State updates: Tracks order progress
  • Fan-out pattern: Single event triggers multiple downstream events
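
The status updates in this handler follow an implicit lifecycle. As a hedged sketch, the table below encodes the intended transitions between the four values of the `status` enum. `allowedTransitions` and `canTransition` are hypothetical names, and the transition rules are an assumption read off the happy path and the catch block rather than something Motia enforces.

```typescript
type OrderStatus = 'pending' | 'processing' | 'completed' | 'failed'

// Assumed lifecycle: an order moves forward or fails, and terminal states stay put.
const allowedTransitions: Record<OrderStatus, OrderStatus[]> = {
  pending: ['processing', 'failed'],
  processing: ['completed', 'failed'],
  completed: [],
  failed: [],
}

// Returns true when moving from `from` to `to` is part of the assumed lifecycle.
function canTransition(from: OrderStatus, to: OrderStatus): boolean {
  return allowedTransitions[from].includes(to)
}

const startProcessing = canTransition('pending', 'processing')
const failAfterCompleted = canTransition('completed', 'failed')
```

A check like this would flag one edge case in the handler: if the final notification enqueue throws, the catch block sets `failed` on an order that was already marked `completed`.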

Step 4: Update Inventory

Create steps/orders/update-inventory.step.ts:
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'
import { orderItemSchema } from './types'

const inventoryEventSchema = z.object({
  orderId: z.string(),
  items: z.array(orderItemSchema),
  timestamp: z.string(),
})

export const config = {
  name: 'UpdateInventory',
  description: 'Updates inventory based on order items',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'queue',
      topic: 'update-inventory',
      input: inventoryEventSchema,
    },
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state }
) => {
  const { orderId, items } = input

  logger.info('Updating inventory', { orderId, itemCount: items.length })

  for (const item of items) {
    // Get current inventory
    const inventory = await state.get('inventory', item.productId)
    const currentStock = inventory?.stock || 0

    // Decrement stock
    const newStock = Math.max(0, currentStock - item.quantity)

    await state.set('inventory', item.productId, {
      productId: item.productId,
      stock: newStock,
      lastUpdated: new Date().toISOString(),
    })

    logger.info('Inventory updated', {
      productId: item.productId,
      previousStock: currentStock,
      newStock,
      quantity: item.quantity,
    })
  }

  logger.info('Inventory update completed', { orderId })
}
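
Note that `Math.max(0, ...)` clamps stock at zero, so an order for more units than are available succeeds silently. A small sketch of how the shortfall could be surfaced instead, with `decrementStock` and `StockChange` as hypothetical names that are not part of the tutorial:

```typescript
interface StockChange {
  newStock: number
  shortfall: number // units requested beyond what was available
}

// Same clamping as the handler, but the missing quantity is reported to the caller.
function decrementStock(currentStock: number, quantity: number): StockChange {
  const newStock = Math.max(0, currentStock - quantity)
  const shortfall = Math.max(0, quantity - currentStock)
  return { newStock, shortfall }
}

const enough = decrementStock(5, 2)
const oversold = decrementStock(1, 3)
```

A non-zero `shortfall` could then be logged or used to fail the order, depending on how strict the inventory rules need to be.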

Step 5: Send Notifications

Create steps/orders/send-notification.step.ts:
import { type Handlers, type StepConfig } from 'motia'
import { notificationEventSchema } from './types'

export const config = {
  name: 'SendNotification',
  description: 'Sends order notifications to customers',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'queue',
      topic: 'send-notification',
      input: notificationEventSchema,
    },
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state }
) => {
  const { orderId, customerId, status, message } = input

  logger.info('Sending notification', { orderId, customerId, status })

  // In production, integrate with email/SMS service
  // For demo, just log and store in state

  const notification = {
    id: `notif-${orderId}-${Date.now()}`, // include orderId so same-millisecond notifications do not collide
    orderId,
    customerId,
    status,
    message,
    sentAt: new Date().toISOString(),
  }

  await state.set('notifications', notification.id, notification)

  logger.info('Notification sent', { 
    notificationId: notification.id, 
    orderId, 
    customerId 
  })

  // Simulate email/SMS sending
  console.log(`\n📧 NOTIFICATION to customer ${customerId}:`, message, '\n')
}
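
When the time comes to integrate a real email or SMS service, one option is to hide the delivery mechanism behind a small interface. This is a sketch under assumptions: `NotificationSender` and `InMemorySender` are hypothetical names, and no specific provider API is implied.

```typescript
interface OrderNotification {
  orderId: string
  customerId: string
  status: string
  message: string
}

// Hypothetical abstraction: the handler would depend on this interface,
// and production code would supply an email or SMS implementation.
interface NotificationSender {
  send(notification: OrderNotification): Promise<void>
}

// In-memory implementation for local runs and tests; it only records messages.
class InMemorySender implements NotificationSender {
  readonly sent: OrderNotification[] = []

  async send(notification: OrderNotification): Promise<void> {
    this.sent.push(notification)
  }
}

const sender = new InMemorySender()
void sender.send({
  orderId: 'order-1',
  customerId: 'customer-123',
  status: 'completed',
  message: 'Your order order-1 has been processed successfully.',
})
```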

Step 6: Get Order Status

Create steps/orders/get-order.step.ts:
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'
import { orderSchema } from './types'

export const config = {
  name: 'GetOrder',
  description: 'Retrieves order status',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'http',
      method: 'GET',
      path: '/orders/:orderId',
      responseSchema: {
        200: orderSchema,
        404: z.object({ error: z.string() }),
      },
    },
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, state }
) => {
  const { orderId } = request.pathParams || {}

  logger.info('Fetching order', { orderId })

  const order = await state.get('orders', orderId)

  if (!order) {
    return {
      status: 404,
      body: { error: 'Order not found' },
    }
  }

  return {
    status: 200,
    body: order,
  }
}

Running the Application

1. Start the server

npm run dev

2. Create an order

curl -X POST http://localhost:3000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "customer-123",
    "items": [
      {"productId": "prod-1", "quantity": 2, "price": 19.99},
      {"productId": "prod-2", "quantity": 1, "price": 49.99}
    ]
  }'
Response:
{
  "orderId": "order-1709568000000-abc123",
  "customerId": "customer-123",
  "items": [...],
  "total": 89.97,
  "status": "pending",
  "createdAt": "2026-03-04T10:00:00.000Z"
}

3. Check order status

Watch the logs to see the processing pipeline:
[CreateOrder] Order created and enqueued
[ProcessOrder] Processing order
[UpdateInventory] Updating inventory
[SendNotification] Sending notification
📧 NOTIFICATION: Your order has been processed successfully
Query the order:
curl http://localhost:3000/orders/order-1709568000000-abc123

Flow Diagram

HTTP POST /orders
       ↓
  CreateOrder (API)
       ↓
  [enqueue: process-order]
       ↓
  ProcessOrder (Queue)
       │
       ├─→ [enqueue: update-inventory] → UpdateInventory
       └─→ [enqueue: send-notification] → SendNotification
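
The same topology can be written down as data, which makes it easy to check which Steps sit downstream of another. This is an illustrative sketch only: `StepNode`, `flow`, and `downstream` are hypothetical names, and the entries are copied from the `triggers` and `enqueues` fields of the Step configs in this tutorial.

```typescript
interface StepNode {
  name: string
  listensTo: string | null // queue topic, or null for HTTP-triggered Steps
  enqueues: string[]
}

const flow: StepNode[] = [
  { name: 'CreateOrder', listensTo: null, enqueues: ['process-order'] },
  { name: 'ProcessOrder', listensTo: 'process-order', enqueues: ['update-inventory', 'send-notification'] },
  { name: 'UpdateInventory', listensTo: 'update-inventory', enqueues: [] },
  { name: 'SendNotification', listensTo: 'send-notification', enqueues: [] },
  { name: 'GetOrder', listensTo: null, enqueues: [] },
]

// Breadth-first walk: follow each enqueued topic to the Steps that listen on it.
function downstream(stepName: string): string[] {
  const result: string[] = []
  const queue = [stepName]
  while (queue.length > 0) {
    const current = flow.find((step) => step.name === queue[0])
    queue.shift()
    if (!current) continue
    for (const topic of current.enqueues) {
      for (const next of flow) {
        if (next.listensTo === topic && !result.includes(next.name)) {
          result.push(next.name)
          queue.push(next.name)
        }
      }
    }
  }
  return result
}

const afterCreate = downstream('CreateOrder')
```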

Advanced Patterns

Parallel Processing

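Modify process-order.step.ts to enqueue the inventory update and the notification concurrently instead of one after the other. Here inventoryData and notificationData stand for the payloads built in Step 3: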
// Enqueue both in parallel
await Promise.all([
  enqueue({ topic: 'update-inventory', data: inventoryData }),
  enqueue({ topic: 'send-notification', data: notificationData }),
])
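
A fuller version of this fragment might look like the following sketch. The `Enqueue` type is an assumed shape for the `enqueue` function from the handler context, and `buildInventoryData`, `buildNotificationData`, and `fanOut` are hypothetical helpers that reproduce the payloads from Step 3.

```typescript
interface OrderItem { productId: string; quantity: number; price: number }
interface ProcessedOrder { orderId: string; customerId: string; items: OrderItem[]; total: number }

// Assumed shape of the `enqueue` function from the handler context.
type Enqueue = (message: { topic: string; data: unknown }) => Promise<void>

function buildInventoryData(order: ProcessedOrder, now: Date) {
  return { orderId: order.orderId, items: order.items, timestamp: now.toISOString() }
}

function buildNotificationData(order: ProcessedOrder) {
  return {
    orderId: order.orderId,
    customerId: order.customerId,
    status: 'completed',
    message: `Your order ${order.orderId} has been processed successfully. Total: $${order.total}`,
  }
}

async function fanOut(enqueue: Enqueue, order: ProcessedOrder): Promise<void> {
  // Both enqueue calls start before either one is awaited.
  await Promise.all([
    enqueue({ topic: 'update-inventory', data: buildInventoryData(order, new Date()) }),
    enqueue({ topic: 'send-notification', data: buildNotificationData(order) }),
  ])
}

// Usage with a fake enqueue that records topics instead of publishing.
const topics: string[] = []
const fakeEnqueue: Enqueue = async (message) => {
  topics.push(message.topic)
}
const sampleOrder: ProcessedOrder = {
  orderId: 'order-1',
  customerId: 'customer-123',
  items: [{ productId: 'prod-1', quantity: 2, price: 19.99 }],
  total: 39.98,
}
void fanOut(fakeEnqueue, sampleOrder)
```

Keep in mind that `Promise.all` rejects as soon as one call fails, even though the other message may already have been published, so the catch block cannot assume that nothing was enqueued.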

Retry Logic

Add retry configuration to queue triggers:
triggers: [
  {
    type: 'queue',
    topic: 'process-order',
    input: processOrderEventSchema,
    retry: {
      maxAttempts: 3,
      backoff: 'exponential',
    },
  },
]
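
To give a feel for what exponential backoff means in practice, here is a generic sketch of the delay calculation. The base delay, the cap, and the function name `backoffDelayMs` are assumptions chosen for illustration; the actual schedule Motia uses is not stated here.

```typescript
// Generic exponential backoff: the delay doubles with each attempt, up to a cap.
// baseMs and maxMs are illustrative defaults, not values taken from Motia.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1))
}

// Delays before retrying after attempts 1, 2, and 3.
const delays = [1, 2, 3].map((attempt) => backoffDelayMs(attempt))
```

With these assumed defaults, three attempts would wait 1, 2, and 4 seconds, which spaces retries out while a downstream dependency recovers.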

Dead Letter Queue

Handle permanently failed events:
triggers: [
  {
    type: 'queue',
    topic: 'process-order',
    input: processOrderEventSchema,
    deadLetterQueue: 'failed-orders',
  },
]
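
Conceptually, a dead letter queue receives messages whose handler has failed on every allowed attempt. The in-memory sketch below shows that routing decision. `deliver` and `DeadLetter` are hypothetical names, and the code illustrates the pattern rather than how Motia implements it.

```typescript
interface DeadLetter<T> {
  data: T
  attempts: number
}

// Try the handler up to maxAttempts times; on final failure, park the message
// in deadLetters (standing in for a topic such as 'failed-orders').
function deliver<T>(
  data: T,
  handler: (data: T) => void,
  maxAttempts: number,
  deadLetters: DeadLetter<T>[]
): boolean {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(data)
      return true
    } catch {
      // Swallow the error and fall through to the next attempt.
    }
  }
  deadLetters.push({ data, attempts: maxAttempts })
  return false
}

const deadLetters: DeadLetter<string>[] = []

// A handler that fails twice and then succeeds is delivered on the third attempt.
let calls = 0
const flaky = (): void => {
  calls += 1
  if (calls < 3) throw new Error('temporary failure')
}
const deliveredFlaky = deliver('order-1', flaky, 3, deadLetters)

// A handler that always fails ends up in the dead letter list.
const deliveredBroken = deliver(
  'order-2',
  () => {
    throw new Error('always fails')
  },
  3,
  deadLetters
)
```

A separate Step subscribed to the dead letter topic can then alert an operator or store the payload for manual replay.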

Next Steps

  • Scheduled Tasks: Add periodic jobs to your system
  • Queue Reference: Learn advanced queue features
  • Background Jobs Guide: Best practices for background processing
  • Workflows Guide: Advanced workflow patterns