Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The III Engine is the high-performance Rust core that orchestrates Motia applications. It provides:
- Function invocation - Call handlers across workers
- Trigger management - Register and fire triggers
- Queue processing - Asynchronous event routing
- State management - Distributed key-value storage
- Stream synchronization - Real-time data streams
- WebSocket protocol - Communication between workers and engine
- Telemetry - OpenTelemetry-based observability
Architecture
Core Components
Engine Struct
The main engine orchestrator:
pub struct Engine {
pub worker_registry: Arc<WorkerRegistry>,
pub functions: Arc<FunctionsRegistry>,
pub trigger_registry: Arc<TriggerRegistry>,
pub service_registry: Arc<ServicesRegistry>,
pub invocations: Arc<InvocationHandler>,
pub channel_manager: Arc<ChannelManager>,
}
Reference: engine/src/engine/mod.rs:142-150
Engine Trait
Defines the core engine operations:
pub trait EngineTrait: Send + Sync {
async fn call(
&self,
function_id: &str,
input: impl Serialize + Send,
) -> Result<Option<Value>, ErrorBody>;
async fn register_trigger_type(&self, trigger_type: TriggerType);
fn register_function(
&self,
request: RegisterFunctionRequest,
handler: Box<dyn FunctionHandler + Send + Sync>,
);
}
Reference: engine/src/engine/mod.rs:121-140
WebSocket Protocol
Workers communicate with the engine using a JSON-based WebSocket protocol.
Message Types
pub enum Message {
// Trigger management
RegisterTriggerType { id: String, description: String },
RegisterTrigger { id: String, trigger_type: String, function_id: String, config: Value },
UnregisterTrigger { id: String, trigger_type: Option<String> },
TriggerRegistrationResult { id: String, trigger_type: String, function_id: String, error: Option<ErrorBody> },
// Function management
RegisterFunction { id: String, description: Option<String>, request_format: Option<Value>, response_format: Option<Value>, metadata: Option<Value>, invocation: Option<HttpInvocationRef> },
UnregisterFunction { id: String },
// Invocation
InvokeFunction { invocation_id: Option<Uuid>, function_id: String, data: Value, traceparent: Option<String>, baggage: Option<String> },
InvocationResult { invocation_id: Uuid, function_id: String, result: Option<Value>, error: Option<ErrorBody>, traceparent: Option<String>, baggage: Option<String> },
// Service management
RegisterService { id: String, name: String, description: Option<String> },
// Health
Ping,
Pong,
WorkerRegistered { worker_id: String },
}
Reference: engine/src/protocol.rs:33-107
Binary Frames
The engine supports binary frames with magic prefixes for telemetry:
- OTLP (
OTLP prefix) - Trace spans
- MTRC (
MTRC prefix) - Metrics
- LOGS (
LOGS prefix) - Log records
const OTLP_WS_PREFIX: &[u8] = b"OTLP";
const MTRC_WS_PREFIX: &[u8] = b"MTRC";
const LOGS_WS_PREFIX: &[u8] = b"LOGS";
Reference: engine/src/engine/mod.rs:35-39
Connection Lifecycle
Connection Established
Worker opens WebSocket connection to engine: Worker Registration
Engine assigns worker ID and sends confirmation:{
"type": "workerregistered",
"worker_id": "550e8400-e29b-41d4-a716-446655440000"
}
Function Registration
Worker registers its functions:{
"type": "registerfunction",
"id": "CreateTodo",
"description": "Create a new todo item",
"request_format": { "type": "object", ... },
"response_format": { "type": "object", ... }
}
Trigger Registration
Worker registers triggers for its functions:{
"type": "registertrigger",
"id": "http:POST:/todo",
"trigger_type": "http",
"function_id": "CreateTodo",
"config": { "path": "/todo", "method": "POST" }
}
Invocation
Engine invokes function when trigger fires:{
"type": "invokefunction",
"invocation_id": "123e4567-e89b-12d3-a456-426614174000",
"function_id": "CreateTodo",
"data": { "description": "Buy groceries" },
"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
}
Result
Worker sends invocation result back:{
"type": "invocationresult",
"invocation_id": "123e4567-e89b-12d3-a456-426614174000",
"function_id": "CreateTodo",
"result": { "id": "todo-123", "description": "Buy groceries" },
"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
}
Cleanup
When worker disconnects:
- Unregister all functions
- Unregister all triggers
- Halt pending invocations
- Fire
worker_disconnected trigger
Reference: engine/src/engine/mod.rs:645-799
Module System
The engine uses a modular architecture where each module provides specific functionality.
Module Trait
#[async_trait::async_trait]
pub trait Module: Send + Sync {
fn id(&self) -> &str;
fn name(&self) -> &str;
async fn initialize(&self) -> anyhow::Result<()>;
async fn shutdown(&self) -> anyhow::Result<()>;
}
Available Modules
Queue Module
Manages event-driven messaging with adapters for Redis, RabbitMQ, etc.Location: engine/src/modules/queue/
State Module
Provides distributed key-value storage with state change triggers.Location: engine/src/modules/state/
Stream Module
Enables real-time data synchronization via WebSocket connections.Location: engine/src/modules/stream/
HTTP Module
Exposes REST API endpoints and handles HTTP function invocations.Location: engine/src/modules/rest_api/
Cron Module
Schedules and executes time-based triggers.Location: engine/src/modules/cron/
Observability Module
Collects and exports traces, metrics, and logs.Location: engine/src/modules/observability/
Reference: engine/src/lib.rs:20-38
Queue Module
Handles asynchronous event processing.
Queue Adapter Trait
#[async_trait::async_trait]
pub trait QueueAdapter: Send + Sync + 'static {
async fn enqueue(
&self,
topic: &str,
data: Value,
traceparent: Option<String>,
baggage: Option<String>,
);
async fn subscribe(
&self,
topic: &str,
id: &str,
function_id: &str,
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
);
async fn unsubscribe(&self, topic: &str, id: &str);
async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>;
async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>;
}
Reference: engine/src/modules/queue/mod.rs:19-39
Redis Adapter
Default queue implementation using Redis:
- Standard queues: Redis lists for FIFO processing
- FIFO queues: Redis sorted sets with message group partitioning
- Dead-letter queues: Failed messages stored for retry
State Module
Provides distributed state management.
State Operations
- set: Store or update a value
- get: Retrieve a value
- update: Apply JSON Patch operations
- delete: Remove a value
- list: List all values in a group
- clear: Clear all values in a group
State Triggers
State changes automatically fire triggers:
// When state changes...
state.set("users", "user-123", user_data).await;
// Triggers are fired...
engine.fire_triggers("state", json!({
"type": "state",
"group_id": "users",
"item_id": "user-123",
"old_value": old_value,
"new_value": user_data,
})).await;
Reference: engine/src/modules/state/mod.rs
Stream Module
Enables real-time data synchronization.
Stream Operations
- set: Create or update stream item
- get: Retrieve stream item
- update: Apply JSON Patch operations
- delete: Remove stream item
- list: List items in a group
Stream Events
Stream operations emit events to subscribed clients:
pub enum StreamEvent {
Create { data: Value },
Update { data: Value },
Delete { data: Value },
}
WebSocket Synchronization
Clients subscribe to streams via WebSocket and receive real-time updates:
{
"type": "stream",
"timestamp": 1634567890000,
"streamName": "todo",
"groupId": "inbox",
"id": "todo-123",
"event": {
"type": "create",
"data": { "description": "Buy groceries" }
}
}
Reference: engine/src/modules/stream/mod.rs
Invocation Handler
Manages function invocations with distributed tracing.
Invocation Flow
Create Invocation
Generate unique invocation ID:let invocation_id = Uuid::new_v4();
Extract Trace Context
Parse W3C Trace Context headers:let traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
let baggage = "key1=value1,key2=value2";
Send InvokeFunction
Send message to worker:Message::InvokeFunction {
invocation_id: Some(invocation_id),
function_id: function_id.to_string(),
data: input,
traceparent,
baggage,
}
Wait for Result
Block until worker responds:let (tx, rx) = oneshot::channel();
invocations.insert(invocation_id, tx);
let result = rx.await?;
Return Result
Return result or error to caller:match result {
Ok(value) => Ok(value),
Err(error) => Err(error),
}
Reference: engine/src/engine/mod.rs:177-220
Telemetry System
The engine includes comprehensive observability via OpenTelemetry.
Trace Propagation
Traces follow W3C Trace Context specification:
// Extract from incoming request
let traceparent = extract_traceparent_from_headers(headers);
let baggage = extract_baggage_from_headers(headers);
// Inject into outgoing request
let ctx = tracing::Span::current().context();
let traceparent = inject_traceparent_from_context(&ctx);
let baggage = inject_baggage_from_context(&ctx);
Reference: engine/src/telemetry.rs
Span Creation
let span = tracing::info_span!(
"handle_invocation",
otel.name = %format!("handle_invocation {}", function_id),
worker_id = %worker.id,
function_id = %function_id,
invocation_id = ?invocation_id,
otel.kind = "server",
otel.status_code = tracing::field::Empty,
)
.with_parent_headers(traceparent.as_deref(), baggage.as_deref());
Reference: engine/src/engine/mod.rs:315-324
Metrics Collection
The engine tracks:
- Function registrations
- Trigger registrations
- Invocation counts
- Error rates
- Latencies
Worker Registry
Tracks connected workers and their capabilities.
Worker Struct
pub struct Worker {
pub id: Uuid,
pub channel: mpsc::Sender<Outbound>,
pub invocations: Arc<RwLock<HashSet<Uuid>>>,
pub function_ids: Arc<RwLock<HashSet<String>>>,
pub external_function_ids: Arc<RwLock<HashSet<String>>>,
}
Worker Lifecycle
- Connection: Worker connects, receives ID
- Registration: Worker registers functions and triggers
- Ready: Worker processes invocations
- Disconnection: Worker disconnects, cleanup occurs
Function Registry
Stores registered functions and their handlers.
pub struct Function {
pub handler: Arc<dyn Fn(Option<Uuid>, Value) -> BoxFuture<'static, FunctionResult<Option<Value>, ErrorBody>> + Send + Sync>,
pub _function_id: String,
pub _description: Option<String>,
pub request_format: Option<Value>,
pub response_format: Option<Value>,
pub metadata: Option<Value>,
}
Trigger Registry
Manages trigger registrations and firing.
pub struct Trigger {
pub id: String,
pub trigger_type: String,
pub function_id: String,
pub config: Value,
pub worker_id: Option<Uuid>,
}
Fire Triggers
When a trigger fires, the engine invokes all matching functions:
pub async fn fire_triggers(&self, trigger_type: &str, data: Value) {
let triggers: Vec<Trigger> = self
.trigger_registry
.triggers
.iter()
.filter(|entry| entry.value().trigger_type == trigger_type)
.map(|entry| entry.value().clone())
.collect();
for trigger in triggers {
let function_id = trigger.function_id.clone();
tokio::spawn(async move {
self.call(&function_id, data.clone()).await
});
}
}
Reference: engine/src/engine/mod.rs:616-643
- Connection pooling reduces overhead
- Binary frames for telemetry reduce serialization cost
- Async I/O prevents blocking
- Local invocations: ~1-5ms
- Cross-worker invocations: ~5-20ms (depends on network)
- Queue processing: ~10-50ms (depends on adapter)
- 10,000+ invocations/second per engine
- Horizontally scalable with multiple engines
- Queue adapters support millions of messages/day
- Engine: ~50-100MB RAM baseline
- Per worker: ~20-50MB RAM
- CPU: Scales with workload
Best Practices
Run the engine as a separate process from workers for isolation and scalability.
Reuse WebSocket connections across invocations to reduce overhead.
Use WebSocket compression for large payloads.
Track invocation latency, error rates, and queue depths.
Choose appropriate queue/state/stream adapters for your workload (Redis, RabbitMQ, PostgreSQL).
Next Steps
Self-Hosting
Deploy your own III Engine
Engine Modules
Configure queue, state, and stream adapters
Observability
Monitor engine performance
Configuration
Configure resource limits