Documentation Index Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Motia provides built-in support for real-time data streaming using Server-Sent Events (SSE) and WebSocket-like streams. This enables you to push updates to clients as they happen.
Server-Sent Events (SSE)
SSE is perfect for one-way server-to-client streaming, like live updates, progress notifications, or streaming AI responses.
Create an SSE endpoint
steps/sse-example.step.ts
import { type Handlers , http , type StepConfig } from 'motia'
export const config = {
name: 'SSE Example' ,
description: 'Stream data to clients using Server-Sent Events' ,
flows: [ 'streaming' ],
triggers: [ http ( 'POST' , '/stream' )],
enqueues: [],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async (
{ request , response },
{ logger }
) => {
logger . info ( 'SSE stream requested' )
// Set up SSE headers
response . status ( 200 )
response . headers ({
'content-type' : 'text/event-stream' ,
'cache-control' : 'no-cache' ,
connection: 'keep-alive' ,
})
// Parse request data
const chunks : string [] = []
for await ( const chunk of request . requestBody . stream ) {
chunks . push ( Buffer . from ( chunk ). toString ( 'utf-8' ))
}
const body = chunks . join ( '' ). trim ()
const params = new URLSearchParams ( body )
const data = Object . fromEntries ( params )
logger . info ( 'Processing SSE request' , { data })
// Generate and stream items
const items = generateItems ( data )
for ( const item of items ) {
// Send SSE event
response . stream . write (
`event: item \n data: ${ JSON . stringify ( item ) } \n\n `
)
// Simulate processing delay
await sleep ( 300 + Math . random () * 700 )
}
// Send completion event
response . stream . write (
`event: done \n data: ${ JSON . stringify ({ total: items . length }) } \n\n `
)
response . close ()
}
function sleep ( ms : number ) {
return new Promise (( resolve ) => setTimeout ( resolve , ms ))
}
function generateItems ( data : Record < string , string >) {
const count = 5 + Math . floor ( Math . random () * 6 )
return Array . from ({ length: count }, ( _ , i ) => ({
id: `item- ${ Date . now () } - ${ i } ` ,
data: data ,
timestamp: new Date (). toISOString (),
}))
}
Connect from the client
const eventSource = new EventSource ( 'http://localhost:3000/stream' )
eventSource . addEventListener ( 'item' , ( event ) => {
const item = JSON . parse ( event . data )
console . log ( 'Received item:' , item )
// Update UI with new item
})
eventSource . addEventListener ( 'done' , ( event ) => {
const summary = JSON . parse ( event . data )
console . log ( 'Stream complete:' , summary )
eventSource . close ()
})
eventSource . onerror = ( error ) => {
console . error ( 'SSE error:' , error )
eventSource . close ()
}
SSE with Python
You can also build SSE endpoints using Motia’s Python SDK:
import asyncio
import json
import random
from typing import Any
from motia import MotiaHttpArgs, FlowContext, http
config = {
"name" : "SSE Example" ,
"description" : "Stream data using Server-Sent Events" ,
"flows" : [ "streaming" ],
"triggers" : [http( "POST" , "/stream" )],
"enqueues" : [],
}
async def handler ( args : MotiaHttpArgs[Any], ctx : FlowContext[Any]) -> None :
request = args.request
response = args.response
ctx.logger.info( "SSE stream requested" )
# Set up SSE headers
await response.status( 200 )
await response.headers({
"content-type" : "text/event-stream" ,
"cache-control" : "no-cache" ,
"connection" : "keep-alive" ,
})
# Read request body
raw_chunks: list[ str ] = []
async for chunk in request.request_body.stream:
if isinstance (chunk, bytes ):
raw_chunks.append(chunk.decode( "utf-8" , errors = "replace" ))
else :
raw_chunks.append( str (chunk))
# Generate items
items = _generate_items()
# Stream items
for item in items:
data = json.dumps(item)
response.writer.stream.write(
f "event: item \n data: { data } \n\n " .encode( "utf-8" )
)
await asyncio.sleep( 0.3 + random.random() * 0.7 )
# Send completion
done_data = json.dumps({ "total" : len (items)})
response.writer.stream.write(
f "event: done \n data: { done_data } \n\n " .encode( "utf-8" )
)
response.close()
def _generate_items () -> list[dict[ str , Any]]:
return [
{
"id" : f "item- { i } " ,
"value" : random.randint( 1 , 100 ),
}
for i in range ( 10 )
]
Streaming AI responses
SSE is ideal for streaming LLM responses to clients:
import { type Handlers , http , type StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'AI Chat Stream' ,
description: 'Stream AI responses to clients' ,
flows: [ 'ai' ],
triggers: [
http ( 'POST' , '/chat' , {
bodySchema: z . object ({
message: z . string (),
conversationId: z . string (). optional (),
}),
}),
],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async (
{ request , response },
{ logger }
) => {
const { message , conversationId } = request . body
logger . info ( 'AI chat requested' , { message , conversationId })
// Set up SSE
response . status ( 200 )
response . headers ({
'content-type' : 'text/event-stream' ,
'cache-control' : 'no-cache' ,
connection: 'keep-alive' ,
})
try {
// Call your LLM (example with OpenAI)
const stream = await openai . chat . completions . create ({
model: 'gpt-4' ,
messages: [{ role: 'user' , content: message }],
stream: true ,
})
for await ( const chunk of stream ) {
const content = chunk . choices [ 0 ]?. delta ?. content || ''
if ( content ) {
response . stream . write (
`event: token \n data: ${ JSON . stringify ({ content }) } \n\n `
)
}
}
response . stream . write (
`event: done \n data: ${ JSON . stringify ({ finished: true }) } \n\n `
)
} catch ( error ) {
logger . error ( 'AI streaming failed' , { error })
response . stream . write (
`event: error \n data: ${ JSON . stringify ({ error: 'Stream failed' }) } \n\n `
)
}
response . close ()
}
Real-time state streams
Motia provides built-in state streams for real-time data synchronization:
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const todoSchema = z . object ({
id: z . string (),
description: z . string (),
createdAt: z . string (),
completedAt: z . string (). optional (),
})
export const config : StreamConfig = {
baseConfig: { storageType: 'default' },
name: 'todo' ,
schema: todoSchema ,
onJoin : async ( subscription , context , authContext ) => {
// Track active subscribers
await context . streams . inbox . update ( 'watching' , subscription . groupId , [
{ type: 'increment' , path: 'watching' , by: 1 },
])
context . logger . info ( 'Client joined stream' , {
subscription ,
authContext ,
})
return { unauthorized: false }
},
onLeave : async ( subscription , context , authContext ) => {
// Update subscriber count
await context . streams . inbox . update ( 'watching' , subscription . groupId , [
{ type: 'decrement' , path: 'watching' , by: 1 },
])
context . logger . info ( 'Client left stream' , { subscription })
},
}
export type Todo = z . infer < typeof todoSchema >
Use the stream in your steps:
steps/create-todo.step.ts
import { type Handlers , http , type StepConfig } from 'motia'
import { z } from 'zod'
import type { Todo } from './todo.stream'
export const config = {
name: 'CreateTodo' ,
flows: [ 'todo-app' ],
triggers: [
http ( 'POST' , '/todo' , {
bodySchema: z . object ({
description: z . string ()
}),
}),
],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async (
{ request },
{ logger , streams , state }
) => {
const { description } = request . body
const todoId = `todo- ${ Date . now () } `
const newTodo : Todo = {
id: todoId ,
description ,
createdAt: new Date (). toISOString (),
}
// Update stream - all connected clients receive this update
await streams . todo . set ( 'inbox' , todoId , newTodo )
// Also persist to state
await state . set ( 'todos' , todoId , newTodo )
logger . info ( 'Todo created and streamed' , { todoId })
return { status: 200 , body: newTodo }
}
Progress updates
Stream progress updates for long-running operations:
export const handler : Handlers < typeof config > = async (
{ response },
{ logger }
) => {
response . status ( 200 )
response . headers ({
'content-type' : 'text/event-stream' ,
'cache-control' : 'no-cache' ,
connection: 'keep-alive' ,
})
const steps = [
{ name: 'Initializing' , duration: 1000 },
{ name: 'Processing data' , duration: 3000 },
{ name: 'Generating report' , duration: 2000 },
{ name: 'Finalizing' , duration: 1000 },
]
for ( let i = 0 ; i < steps . length ; i ++ ) {
const step = steps [ i ]
const progress = (( i + 1 ) / steps . length ) * 100
response . stream . write (
`event: progress \n data: ${ JSON . stringify ({
step: step . name ,
progress: Math . round ( progress ) ,
current: i + 1 ,
total: steps . length ,
}) } \n\n `
)
await sleep ( step . duration )
}
response . stream . write (
`event: complete \n data: ${ JSON . stringify ({ success: true }) } \n\n `
)
response . close ()
}
Best practices
Set proper headers Always set content-type: text/event-stream and cache-control: no-cache for SSE.
Handle client disconnects Check connection status periodically. Clean up resources when clients disconnect.
Send heartbeats Send periodic ping events to keep connections alive and detect disconnects.
Limit concurrent streams Monitor and limit the number of concurrent streaming connections per client.
Next steps
AI Integration Build AI-powered workflows with streaming responses
Multi-language Mix TypeScript and Python in one project