Overview

The @motiadev/stream-client package provides a client-side API for subscribing to real-time data streams. It connects through a pluggable socket adapter (typically backed by a WebSocket) and exposes reactive subscriptions to individual items and to groups of items.

Installation

npm install @motiadev/stream-client

Classes

Stream

The main client class for managing stream connections and subscriptions.
class Stream {
  constructor(adapterFactory: SocketAdapterFactory)
  
  subscribeItem<TData extends { id: string }>(
    streamName: string,
    groupId: string,
    id: string
  ): StreamItemSubscription<TData>
  
  subscribeGroup<TData extends { id: string }>(
    streamName: string,
    groupId: string,
    sortKey?: keyof TData
  ): StreamGroupSubscription<TData>
  
  close(): void
}
constructor
function
Creates a new Stream client instance.
adapterFactory
SocketAdapterFactory
required
Factory function that creates socket adapter instances.
type SocketAdapterFactory = () => SocketAdapter

Methods

subscribeItem
function
Subscribe to a specific item in a stream.
streamName
string
required
The name of the stream.
groupId
string
required
The group identifier.
id
string
required
The item identifier.
return
StreamItemSubscription<TData>
A subscription object for the specific item.
subscribeGroup
function
Subscribe to all items in a group.
streamName
string
required
The name of the stream.
groupId
string
required
The group identifier.
sortKey
keyof TData
Optional property of TData to sort the group's items by.
return
StreamGroupSubscription<TData>
A subscription object for the group.
close
function
Closes the stream connection and cleans up all subscriptions.

StreamItemSubscription

Subscription to a single item in a stream.
class StreamItemSubscription<TData extends { id: string }> {
  getState(): TData | null
  addChangeListener(listener: Listener<TData | null>): void
  removeChangeListener(listener: Listener<TData | null>): void
  onEvent(type: string, listener: CustomEventListener): void
  offEvent(type: string, listener: CustomEventListener): void
  onClose(listener: () => void): void
  close(): void
}
getState
function
Returns the current state of the item.
return
TData | null
The current item data, or null if the item was deleted or has not loaded yet.
addChangeListener
function
Adds a listener that is called whenever the item changes.
listener
Listener<TData | null>
required
Callback function receiving the updated item state.
type Listener<TData> = (state: TData | null) => void
removeChangeListener
function
Removes a previously added change listener.
listener
Listener<TData | null>
required
The listener to remove.
onEvent
function
Adds a listener for custom events.
type
string
required
The custom event type to listen for.
listener
CustomEventListener
required
Callback function for the event.
type CustomEventListener<TData> = (event: TData) => void
offEvent
function
Removes a custom event listener.
type
string
required
The event type.
listener
CustomEventListener
required
The listener to remove.
onClose
function
Registers a callback that runs when the subscription is closed.
listener
() => void
required
Callback function.
close
function
Closes the subscription and stops receiving updates.
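
Because getState can return null before the item has loaded, callers sometimes want to wait for the first real value. The following is a minimal sketch of one way to do that with the documented listener methods. The helper name waitForItem and the ItemSubscriptionLike interface are hypothetical; they are not part of the package, and the interface only mirrors the three methods the helper needs.

```typescript
// Structural subset of the documented StreamItemSubscription methods,
// declared locally so the sketch is self-contained (hypothetical name).
interface ItemSubscriptionLike<TData> {
  getState(): TData | null
  addChangeListener(listener: (state: TData | null) => void): void
  removeChangeListener(listener: (state: TData | null) => void): void
}

// Hypothetical helper: resolves with the first non-null item state.
function waitForItem<TData>(sub: ItemSubscriptionLike<TData>): Promise<TData> {
  return new Promise<TData>((resolve) => {
    // If the item is already loaded, resolve right away.
    const current = sub.getState()
    if (current !== null) {
      resolve(current)
      return
    }
    // Otherwise wait for the first non-null change, then detach the listener.
    const listener = (state: TData | null): void => {
      if (state !== null) {
        sub.removeChangeListener(listener)
        resolve(state)
      }
    }
    sub.addChangeListener(listener)
  })
}
```

The listener removes itself once it has resolved, so the helper does not keep a reference to the subscription after the first value arrives. The promise never settles if the item is never delivered, so a real caller may want to pair it with a timeout.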

StreamGroupSubscription

Subscription to all items in a group.
class StreamGroupSubscription<TData extends { id: string }> {
  getState(): TData[]
  addChangeListener(listener: Listener<TData[]>): void
  removeChangeListener(listener: Listener<TData[]>): void
  onEvent(type: string, listener: CustomEventListener): void
  offEvent(type: string, listener: CustomEventListener): void
  onClose(listener: () => void): void
  close(): void
}
getState
function
Returns the current state of all items in the group.
return
TData[]
Array of all items in the group.
addChangeListener
function
Adds a listener that is called whenever any item in the group changes.
listener
Listener<TData[]>
required
Callback function receiving the updated group state.
removeChangeListener
function
Removes a previously added change listener.
listener
Listener<TData[]>
required
The listener to remove.
The remaining methods (onEvent, offEvent, onClose, and close) work the same as on StreamItemSubscription.

Types

BaseMessage

type BaseMessage = {
  streamName: string
  groupId: string
  id?: string
  timestamp: number
}

StreamEvent

type StreamEvent<TData extends { id: string }> =
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }
  | { type: 'event'; event: CustomEvent }

ItemStreamEvent

type ItemStreamEvent<TData extends { id: string }> = 
  | StreamEvent<TData> 
  | { type: 'sync'; data: TData }

GroupStreamEvent

type GroupStreamEvent<TData extends { id: string }> = 
  | StreamEvent<TData> 
  | { type: 'sync'; data: TData[] }
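
To make the event types above more concrete, here is a sketch of how a group's item array could be derived from a sequence of events. This is an illustration under stated assumptions, not the library's implementation: the function names applyGroupEvent and compareValues are hypothetical, the handling of a duplicate 'create' is a guess, and the sort order the package applies for sortKey is not documented here.

```typescript
type Identified = { id: string }

// Event shapes copied from GroupStreamEvent, leaving out custom events.
type GroupDataEvent<TData extends Identified> =
  | { type: 'sync'; data: TData[] }
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }

// Assumed ordering: numeric when both values are numbers, string order otherwise.
function compareValues(x: unknown, y: unknown): number {
  if (typeof x === 'number' && typeof y === 'number') return x - y
  return String(x).localeCompare(String(y))
}

// Returns a new array for every event; the input state is never mutated.
function reduceEvent<TData extends Identified>(
  state: TData[],
  event: GroupDataEvent<TData>,
): TData[] {
  switch (event.type) {
    case 'sync':
      // A sync carries the full group, so it replaces the local state.
      return [...event.data]
    case 'create': {
      const created = event.data
      // Assumption: a create for an existing id replaces it instead of duplicating.
      return [...state.filter((item) => item.id !== created.id), created]
    }
    case 'update': {
      const updated = event.data
      return state.map((item) => (item.id === updated.id ? updated : item))
    }
    case 'delete': {
      const removed = event.data
      return state.filter((item) => item.id !== removed.id)
    }
  }
}

// Hypothetical reducer: applies one event, then sorts by sortKey if given.
function applyGroupEvent<TData extends Identified>(
  state: TData[],
  event: GroupDataEvent<TData>,
  sortKey?: keyof TData,
): TData[] {
  const next = reduceEvent(state, event)
  if (sortKey === undefined) return next
  return next.sort((a, b) => compareValues(a[sortKey], b[sortKey]))
}
```

Keeping the reducer pure makes it easy to test event handling without a socket connection.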

CustomEvent

type CustomEvent = {
  type: string
  data: any
}
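
Since data is typed as any, it can help to validate the payload before using it. The sketch below narrows the payload with a type guard. AlertPayload, isAlertPayload, and describeEvent are hypothetical names, and the payload shape is invented for illustration. The local type is called StreamCustomEvent (with data declared as unknown) only to avoid clashing with the browser's global CustomEvent type.

```typescript
// Mirrors the CustomEvent type above; data is unknown so it must be checked.
type StreamCustomEvent = { type: string; data: unknown }

// Hypothetical payload shape for a 'new-notification' event.
interface AlertPayload {
  title: string
  read: boolean
}

// Type guard: true only when value has the fields AlertPayload requires.
function isAlertPayload(value: unknown): value is AlertPayload {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return typeof candidate.title === 'string' && typeof candidate.read === 'boolean'
}

// Formats known events and reports everything else as unhandled.
function describeEvent(event: StreamCustomEvent): string {
  if (event.type === 'new-notification' && isAlertPayload(event.data)) {
    return `${event.data.title} (${event.data.read ? 'read' : 'unread'})`
  }
  return `unhandled event: ${event.type}`
}
```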

SocketAdapter

interface SocketAdapter {
  send(message: string): void
  onMessage(callback: (message: string) => void): void
  onOpen(callback: () => void): void
  onClose(callback: () => void): void
  close(): void
  isOpen(): boolean
}

SocketAdapterFactory

type SocketAdapterFactory = () => SocketAdapter
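
Because the client only depends on the SocketAdapter interface, an adapter does not have to wrap a real socket. As a sketch, the in-memory adapter below could stand in for a WebSocket in unit tests. InMemorySocketAdapter, simulateOpen, simulateMessage, and inMemoryFactory are hypothetical names; the interface is repeated so the example is self-contained. The messages the Stream client actually sends and expects are not described in this reference, so the sketch makes no assumptions about their format.

```typescript
interface SocketAdapter {
  send(message: string): void
  onMessage(callback: (message: string) => void): void
  onOpen(callback: () => void): void
  onClose(callback: () => void): void
  close(): void
  isOpen(): boolean
}

type SocketAdapterFactory = () => SocketAdapter

// Hypothetical test double: records outgoing messages and lets a test
// trigger the open and message callbacks by hand.
class InMemorySocketAdapter implements SocketAdapter {
  readonly sent: string[] = []
  private opened = false
  private messageCallbacks: Array<(message: string) => void> = []
  private openCallbacks: Array<() => void> = []
  private closeCallbacks: Array<() => void> = []

  send(message: string): void {
    this.sent.push(message)
  }
  onMessage(callback: (message: string) => void): void {
    this.messageCallbacks.push(callback)
  }
  onOpen(callback: () => void): void {
    this.openCallbacks.push(callback)
  }
  onClose(callback: () => void): void {
    this.closeCallbacks.push(callback)
  }
  close(): void {
    // Closing twice only notifies listeners once.
    if (!this.opened) return
    this.opened = false
    this.closeCallbacks.forEach((callback) => callback())
  }
  isOpen(): boolean {
    return this.opened
  }

  // Test-only helpers; these are not part of SocketAdapter.
  simulateOpen(): void {
    this.opened = true
    this.openCallbacks.forEach((callback) => callback())
  }
  simulateMessage(message: string): void {
    this.messageCallbacks.forEach((callback) => callback(message))
  }
}

// Keeps every created adapter so a test can reach it later.
const createdAdapters: InMemorySocketAdapter[] = []
const inMemoryFactory: SocketAdapterFactory = () => {
  const adapter = new InMemorySocketAdapter()
  createdAdapters.push(adapter)
  return adapter
}
```

A test could pass inMemoryFactory to new Stream(...) and then inspect createdAdapters[0].sent to see what the client wrote.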

Usage Example

Basic Item Subscription

import { Stream } from '@motiadev/stream-client'

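// Create a socket adapter factory (example using the browser WebSocket API)
const adapterFactory = () => {
  const ws = new WebSocket('wss://your-server.com/stream')
  return {
    send: (message: string) => ws.send(message),
    // addEventListener keeps earlier callbacks registered if a method is called more than once
    onMessage: (callback: (message: string) => void) =>
      ws.addEventListener('message', (e) => callback(String(e.data))),
    onOpen: (callback: () => void) => ws.addEventListener('open', () => callback()),
    onClose: (callback: () => void) => ws.addEventListener('close', () => callback()),
    close: () => ws.close(),
    isOpen: () => ws.readyState === WebSocket.OPEN,
  }
}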

// Create stream client
const stream = new Stream(adapterFactory)

// Subscribe to a specific item
const subscription = stream.subscribeItem<{ id: string; message: string }>(
  'chat',
  'room-123',
  'message-456'
)

// Listen for changes
subscription.addChangeListener((item) => {
  if (item) {
    console.log('Message updated:', item.message)
  } else {
    console.log('Message deleted')
  }
})

// Get current state
const currentState = subscription.getState()

// Clean up
subscription.close()

Group Subscription with Sorting

interface Message {
  id: string
  text: string
  timestamp: number
}

// Subscribe to all messages in a room
// (assumes `stream` is a Stream client created as in Basic Item Subscription)
const groupSub = stream.subscribeGroup<Message>(
  'chat',
  'room-123',
  'timestamp' // Sort by timestamp
)

// Listen for any changes in the group
groupSub.addChangeListener((messages) => {
  console.log(`${messages.length} messages in room`)
  messages.forEach(msg => {
    console.log(`${msg.timestamp}: ${msg.text}`)
  })
})

// Get all current messages
const allMessages = groupSub.getState()

Custom Events

const subscription = stream.subscribeItem('notifications', 'user-123', 'alerts')

// Listen for custom events
subscription.onEvent('new-notification', (data) => {
  console.log('New notification:', data)
})

// To remove a listener later, keep a reference to the same handler function
const handler = (data: unknown) => console.log(data)
subscription.onEvent('custom-event', handler)
subscription.offEvent('custom-event', handler)

React Integration Example

import { useEffect, useState } from 'react'
import { Stream } from '@motiadev/stream-client'

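// Assumes `adapterFactory` is defined as in Basic Item Subscription
function ChatRoom({ roomId }) {
  const [messages, setMessages] = useState([])
  
  useEffect(() => {
    const stream = new Stream(adapterFactory)
    const subscription = stream.subscribeGroup('chat', roomId, 'timestamp')
    
    // The Listener type allows null, so fall back to an empty list before rendering
    subscription.addChangeListener((items) => setMessages(items ?? []))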
    
    return () => {
      subscription.close()
      stream.close()
    }
  }, [roomId])
  
  return (
    <div>
      {messages.map(msg => (
        <div key={msg.id}>{msg.text}</div>
      ))}
    </div>
  )
}