WebSockets

Real-time communication using Cloudflare’s WebSocket Hibernation API with Durable Objects.

Prerequisites: OVERVIEW.md, durable-objects.md


WebSockets enable real-time, bidirectional communication between clients and the server. z0 uses WebSockets where polling would be inefficient or too slow for latency-sensitive data.

| Scenario | Use WebSockets | Use Polling |
| --- | --- | --- |
| Real-time dashboards | Yes - live call status, budget updates | No |
| Live call status updates | Yes - call starts, ends, transfers | No |
| Notifications | Yes - alerts, budget warnings | No |
| Infrequent data fetches | No | Yes - every 30+ seconds is fine |
| Simple read-only data | No | Yes - caching works well |
| Client without WebSocket support | No | Yes - fallback option |

Rule of thumb: If the user expects to see changes within 1-2 seconds, use WebSockets. If 30-second delays are acceptable, polling is simpler.


Cloudflare’s WebSocket Hibernation API is the preferred approach for WebSocket connections in Workers. It allows Durable Objects to handle WebSocket connections without consuming resources while idle.

| Traditional WebSockets | Hibernating WebSockets |
| --- | --- |
| DO stays active while connection is open | DO hibernates between messages |
| Billed for entire connection duration | Billed only when processing messages |
| Memory consumed continuously | Memory released during hibernation |
| Scales poorly with idle connections | Scales to thousands of idle connections |

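Cost implication: With hibernation you pay for the time the DO spends handling messages, not for holding connections open. A dashboard with 1000 connected users that receives a broadcast every 10 seconds wakes the DO as often as one with 100 users; only the fan-out loop inside each wake-up is longer.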
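To make the billing intuition concrete, here is a rough back-of-the-envelope model. It is only a sketch: it assumes the DO is active solely while it handles a broadcast and that each broadcast takes a fixed handling time. The names `LoadProfile`, `broadcastsPerHour`, and `activeMsPerBroadcast` are illustrative, and actual pricing is not modeled.

```typescript
// Rough model only: compares how long a DO stays active per hour
// with and without hibernation. All names here are illustrative.
interface LoadProfile {
  broadcastsPerHour: number;    // how often the DO wakes to fan out an update
  activeMsPerBroadcast: number; // assumed handling time per wake-up
}

const SECONDS_PER_HOUR = 3600;

// Without hibernation the DO stays active for as long as a connection is open.
function activeSecondsWithoutHibernation(): number {
  return SECONDS_PER_HOUR;
}

// With hibernation the DO is assumed to be active only while handling messages.
function activeSecondsWithHibernation(profile: LoadProfile): number {
  const seconds = (profile.broadcastsPerHour * profile.activeMsPerBroadcast) / 1000;
  return Math.min(seconds, SECONDS_PER_HOUR);
}

// One broadcast every 10 seconds (360 per hour), assuming 5 ms of work each:
// 360 * 5 ms = 1.8 seconds of active time per hour.
const dashboard: LoadProfile = { broadcastsPerHour: 360, activeMsPerBroadcast: 5 };
const hibernating = activeSecondsWithHibernation(dashboard);
```

Under these assumptions the number of connected users does not appear in the formula at all; it only influences how long each broadcast takes.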

1. Client connects via WebSocket
   → Worker upgrades connection
   → Worker forwards to Durable Object
2. DO accepts connection with acceptWebSocket()
   → Connection is now "hibernatable"
   → DO can hibernate (evict from memory)
3. Message arrives from client
   → DO wakes if hibernating
   → webSocketMessage() handler runs
   → DO can hibernate again
4. Server sends message
   → DO wakes if hibernating
   → Sends via WebSocket
   → DO can hibernate again
5. Connection closes
   → webSocketClose() handler runs
   → Resources released
// Inside Durable Object class
export class RealtimeDO extends DurableObject {
// Accept and register WebSocket for hibernation
async fetch(request: Request): Promise<Response> {
const upgradeHeader = request.headers.get('Upgrade');
if (upgradeHeader !== 'websocket') {
return new Response('Expected WebSocket', { status: 426 });
}
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
// Accept with hibernation - this is the key API
this.ctx.acceptWebSocket(server, ['tag1', 'tag2']); // Tags are optional
return new Response(null, {
status: 101,
webSocket: client,
});
}
// Called when a message arrives (DO wakes from hibernation if needed)
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const data = JSON.parse(message as string);
switch (data.type) {
case 'subscribe':
await this.handleSubscribe(ws, data);
break;
case 'unsubscribe':
await this.handleUnsubscribe(ws, data);
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;
}
}
// Called when connection closes
async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
// Clean up subscriptions for this connection
await this.removeSubscriptions(ws);
}
// Called on connection error
async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
console.error('WebSocket error:', error);
await this.removeSubscriptions(ws);
}
}
// Get all connected WebSockets
const sockets = this.ctx.getWebSockets();
// Get WebSockets by tag
const dashboardSockets = this.ctx.getWebSockets('dashboard');
const tenantSockets = this.ctx.getWebSockets(`tenant:${tenantId}`);
// Broadcast to all
for (const ws of sockets) {
ws.send(JSON.stringify(update));
}

Durable Objects are the natural home for WebSocket connections in z0. Each DO instance runs single-threaded, so connection management code never runs in parallel with itself, which avoids most race conditions.

┌─────────────────────────────────────────────────────────────┐
│ Worker (Stateless) │
│ - Authenticates request │
│ - Routes to appropriate DO │
│ - Upgrades HTTP to WebSocket │
└─────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ RealtimeDO (Per Tenant or Channel) │
│ - Accepts WebSocket connections │
│ - Tracks subscriptions │
│ - Receives events from other DOs │
│ - Fans out updates to connected clients │
└─────────────────────────────────────────────────────────────┘

When an event occurs (call starts, budget changes), the source DO notifies the realtime DO, which fans out to all subscribed clients.

// In AssetDO - when a call starts
async recordCallStart(call: Call): Promise<void> {
// Record the fact
await this.appendFact({
type: 'invocation',
subtype: 'call_started',
data: call,
});
// Notify the realtime DO for this tenant
const realtimeDO = this.env.REALTIME_DO.get(
this.env.REALTIME_DO.idFromName(`tenant:${this.tenantId}`)
);
await realtimeDO.fetch('https://internal/broadcast', {
method: 'POST',
body: JSON.stringify({
event: 'call:started',
data: {
call_id: call.id,
asset_id: this.entityId,
caller: call.caller,
started_at: Date.now(), // Matches the 'call:started' event shape
},
}),
});
}
// In RealtimeDO - broadcast to subscribed clients
async handleBroadcast(request: Request): Promise<Response> {
const { event, data } = await request.json();
// Get WebSockets subscribed to this event type
const sockets = this.ctx.getWebSockets(`event:${event}`);
const message = JSON.stringify({
type: 'event',
event,
data,
timestamp: Date.now(),
});
for (const ws of sockets) {
try {
ws.send(message);
} catch (error) {
// Connection may have closed
console.error('Failed to send to WebSocket:', error);
}
}
return new Response('OK');
}
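The fan-out loop above can be factored into a small helper that reports how many sends succeeded. This is a minimal sketch, not part of the z0 codebase: `Sendable`, `FanOutResult`, and `fanOut` are hypothetical names, and `Sendable` stands in for the subset of the WebSocket interface the loop needs.

```typescript
// Minimal shape of a socket for this sketch; a real WebSocket has more members.
interface Sendable {
  send(message: string): void;
}

interface FanOutResult {
  sent: number;
  failed: number;
}

// Serialize once, then send to every socket. A failing socket is counted
// and skipped so it cannot abort delivery to the remaining sockets.
function fanOut(sockets: Iterable<Sendable>, payload: unknown): FanOutResult {
  const message = JSON.stringify(payload);
  const result: FanOutResult = { sent: 0, failed: 0 };
  for (const ws of sockets) {
    try {
      ws.send(message);
      result.sent++;
    } catch {
      result.failed++;
    }
  }
  return result;
}
```

Serializing the payload once, outside the loop, keeps the per-socket cost to a single `send` call.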

When a client connects, they need the current state before receiving updates.

async handleSubscribe(ws: WebSocket, data: SubscribeMessage): Promise<void> {
const { channel, lastEventId } = data;
// Tags are fixed at accept time, so track subscriptions in the socket's
// attachment instead, which survives hibernation
const attachment = (ws.deserializeAttachment() ?? {}) as { channels?: string[] };
const channels = new Set(attachment.channels ?? []);
channels.add(channel);
ws.serializeAttachment({ ...attachment, channels: [...channels] });
// Send current state snapshot
const currentState = await this.getCurrentState(channel);
ws.send(JSON.stringify({
type: 'snapshot',
channel,
data: currentState,
lastEventId: currentState.lastEventId,
}));
// If client had a lastEventId, send missed events
if (lastEventId) {
const missedEvents = await this.getEventsSince(channel, lastEventId);
for (const event of missedEvents) {
ws.send(JSON.stringify({
type: 'event',
event: event.type,
data: event.data,
eventId: event.id,
}));
}
}
}
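The `getEventsSince` call above relies on some store of recent events. One possible shape is a bounded buffer, sketched below under stated assumptions: `ReplayBuffer` and `BufferedEvent` are hypothetical names, the capacity is arbitrary, and the buffer lives in memory only, so a production version would need to persist it.

```typescript
interface BufferedEvent {
  id: string;
  event: string;
  data: unknown;
}

// Keeps the most recent `capacity` events so reconnecting clients can catch up.
class ReplayBuffer {
  private events: BufferedEvent[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  push(event: BufferedEvent): void {
    this.events.push(event);
    if (this.events.length > this.capacity) {
      this.events.shift(); // drop the oldest event
    }
  }

  // Events strictly after lastEventId. If the ID is unknown (for example,
  // it was already dropped), the whole buffer is returned.
  since(lastEventId: string): BufferedEvent[] {
    const index = this.events.findIndex((e) => e.id === lastEventId);
    return index === -1 ? [...this.events] : this.events.slice(index + 1);
  }
}
```

Returning the whole buffer for an unknown ID favors delivering too much over too little; a client that applies events idempotently can tolerate the duplicates.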

Real-time visibility into active calls across the account.

// Events the dashboard subscribes to:
interface CallDashboardEvents {
'call:started': {
call_id: string;
asset_id: string;
caller: string;
started_at: number;
};
'call:status_changed': {
call_id: string;
status: 'ringing' | 'in_progress' | 'transferring' | 'on_hold';
duration_seconds: number;
};
'call:ended': {
call_id: string;
outcome: 'completed' | 'missed' | 'voicemail' | 'failed';
duration_seconds: number;
charge_amount?: number;
};
}
// Client-side subscription
socket.send(JSON.stringify({
type: 'subscribe',
channel: 'calls',
data: {
  filters: {
    asset_ids: ['ast_001', 'ast_002'], // Optional: filter to specific assets
  },
},
}));
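On the client, the call events above can be folded into a map of active calls. The reducer below is a sketch rather than z0's actual dashboard logic: `ActiveCall`, `CallEvent`, and `applyCallEvent` are hypothetical names, and it assumes a newly started call begins in the `ringing` state.

```typescript
type CallStatus = 'ringing' | 'in_progress' | 'transferring' | 'on_hold';
type CallOutcome = 'completed' | 'missed' | 'voicemail' | 'failed';

interface ActiveCall {
  call_id: string;
  asset_id: string;
  caller: string;
  status: CallStatus;
}

type CallEvent =
  | { event: 'call:started'; data: { call_id: string; asset_id: string; caller: string; started_at: number } }
  | { event: 'call:status_changed'; data: { call_id: string; status: CallStatus; duration_seconds: number } }
  | { event: 'call:ended'; data: { call_id: string; outcome: CallOutcome; duration_seconds: number } };

// Returns a new map; the input map is left untouched.
function applyCallEvent(active: ReadonlyMap<string, ActiveCall>, msg: CallEvent): Map<string, ActiveCall> {
  const next = new Map(active);
  switch (msg.event) {
    case 'call:started': {
      const { call_id, asset_id, caller } = msg.data;
      // Assumption: a call that just started is ringing
      next.set(call_id, { call_id, asset_id, caller, status: 'ringing' });
      break;
    }
    case 'call:status_changed': {
      const call = next.get(msg.data.call_id);
      if (call) {
        next.set(call.call_id, { ...call, status: msg.data.status });
      }
      break;
    }
    case 'call:ended':
      next.delete(msg.data.call_id);
      break;
  }
  return next;
}
```

Because the function never mutates its input, it can be used directly inside a React state updater.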

Real-time notifications when budgets approach or exceed limits.

// Events for budget monitoring:
interface BudgetAlertEvents {
'budget:threshold_approaching': {
account_id: string;
budget_type: 'daily' | 'monthly' | 'total';
current_spend: number;
limit: number;
percentage: number; // 80%, 90%, etc.
};
'budget:exceeded': {
account_id: string;
budget_type: 'daily' | 'monthly' | 'total';
current_spend: number;
limit: number;
calls_blocked: number; // Calls rejected due to budget
};
'budget:reset': {
account_id: string;
budget_type: 'daily' | 'monthly';
new_limit: number;
};
}
// In AccountDO - after charging
async processCharge(charge: Charge): Promise<void> {
  await this.appendFact({
    type: 'charge',
    data: charge,
  });
  const budgetState = await this.updateBudgetState(charge);
  // Notify once, for the highest threshold newly crossed by this charge,
  // so a jump from 70% to 96% sends one alert instead of three.
  // Assumes notifyBudgetThreshold() records lastThresholdNotified.
  for (const threshold of [0.95, 0.9, 0.8]) {
    if (budgetState.percentage >= threshold &&
        budgetState.lastThresholdNotified < threshold) {
      await this.notifyBudgetThreshold(budgetState, threshold);
      break;
    }
  }
  if (budgetState.percentage >= 1.0) {
    await this.notifyBudgetExceeded(budgetState);
  }
}
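The threshold check is easier to test when it is isolated as a pure function. The sketch below is one way to do that; `thresholdToNotify` is a hypothetical helper, and it assumes thresholds are fractions of the limit (0.8 for 80%), as in the handler above.

```typescript
// Returns the highest threshold that the current spend has reached and that
// has not been notified yet, or null if no new threshold was crossed.
function thresholdToNotify(
  percentage: number,
  lastNotified: number,
  thresholds: readonly number[] = [0.8, 0.9, 0.95],
): number | null {
  let highest: number | null = null;
  for (const t of thresholds) {
    if (percentage >= t && t > lastNotified && (highest === null || t > highest)) {
      highest = t;
    }
  }
  return highest;
}
```

Returning only the highest crossed threshold means a single large charge produces one alert rather than one per threshold.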

Live feed of agent decisions and actions for monitoring autonomous operations.

// Events for agent activity:
interface AgentActivityEvents {
'agent:decision': {
agent_id: string;
decision_type: 'route' | 'qualify' | 'respond' | 'escalate';
confidence: number;
reasoning: string;
action_taken: string;
};
'agent:action': {
agent_id: string;
action: 'called_tool' | 'sent_message' | 'created_record';
tool_name?: string;
success: boolean;
};
'agent:escalation': {
agent_id: string;
reason: 'low_confidence' | 'policy_violation' | 'user_request';
context: Record<string, unknown>;
};
}

All WebSocket messages use a consistent JSON format.

// Client to Server messages
interface ClientMessage {
type: 'subscribe' | 'unsubscribe' | 'ping' | 'auth';
id?: string; // Optional request ID for correlation
channel?: string;
data?: Record<string, unknown>;
}
// Server to Client messages
interface ServerMessage {
type: 'event' | 'snapshot' | 'error' | 'pong' | 'subscribed' | 'unsubscribed' | 'authenticated';
id?: string; // Correlation ID if responding to a request
event?: string;
channel?: string;
data?: Record<string, unknown>;
eventId?: string; // Present on events; clients use it to resume after a disconnect
timestamp: number;
}
// Error message
interface ErrorMessage {
type: 'error';
id?: string;
code: string;
message: string;
details?: Record<string, unknown>;
}
// Subscribe to a channel
{
type: 'subscribe',
id: 'req_001',
channel: 'calls',
data: {
filters: { asset_ids: ['ast_001'] },
lastEventId: 'evt_123' // For resuming after disconnect
}
}
// Server confirms subscription
{
type: 'subscribed',
id: 'req_001',
channel: 'calls',
timestamp: 1705500000000
}
// Unsubscribe from a channel
{
type: 'unsubscribe',
channel: 'calls'
}
// Event pushed from server
{
type: 'event',
event: 'call:started',
channel: 'calls',
data: {
call_id: 'call_001',
asset_id: 'ast_001',
caller: '+15551234567'
},
timestamp: 1705500001000
}
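Since every frame follows this format, incoming client messages can be validated before they reach a handler. The parser below is a minimal sketch: `parseClientMessage` and its helpers are hypothetical names, and the rule that `subscribe` and `unsubscribe` require a `channel` is an assumption drawn from the examples above.

```typescript
type ClientMessageType = 'subscribe' | 'unsubscribe' | 'ping' | 'auth';

interface ParsedClientMessage {
  type: ClientMessageType;
  id?: string;
  channel?: string;
  data?: Record<string, unknown>;
}

const CLIENT_TYPES: readonly string[] = ['subscribe', 'unsubscribe', 'ping', 'auth'];

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

function isClientType(value: unknown): value is ClientMessageType {
  return typeof value === 'string' && CLIENT_TYPES.includes(value);
}

// Returns the parsed message, or null if the frame is not a valid client message.
// Optional fields with an unexpected type are dropped rather than rejected.
function parseClientMessage(raw: string): ParsedClientMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (!isRecord(value)) return null;

  const type = value.type;
  if (!isClientType(type)) return null;

  const msg: ParsedClientMessage = { type };
  const id = value.id;
  if (typeof id === 'string') msg.id = id;
  const channel = value.channel;
  if (typeof channel === 'string') msg.channel = channel;
  const data = value.data;
  if (isRecord(data)) msg.data = data;

  // Assumption: subscribe and unsubscribe are meaningless without a channel
  if ((type === 'subscribe' || type === 'unsubscribe') && msg.channel === undefined) {
    return null;
  }
  return msg;
}
```

A handler can then reply with an error message whenever the parser returns `null`, instead of letting `JSON.parse` throw.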

Authenticate before subscribing to channels.

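// Option 1: Auth in initial HTTP request (preferred where the client can set headers).
// The browser WebSocket constructor only accepts (url, protocols) and cannot send
// custom headers; this form works in clients such as the Node.js `ws` package.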
const socket = new WebSocket('wss://api.web1.co/realtime', {
headers: {
'Authorization': `Bearer ${apiKey}`,
},
});
// Option 2: Auth message after connection
socket.onopen = () => {
socket.send(JSON.stringify({
type: 'auth',
data: {
token: apiKey,
},
}));
};
// Server validates and responds
async webSocketMessage(ws: WebSocket, message: string): Promise<void> {
const msg = JSON.parse(message);
if (msg.type === 'auth') {
const auth = await this.validateToken(msg.data.token);
if (auth) {
// Store auth context with connection
ws.serializeAttachment({ tenantId: auth.tenantId, userId: auth.userId });
ws.send(JSON.stringify({ type: 'authenticated', timestamp: Date.now() }));
} else {
ws.send(JSON.stringify({ type: 'error', code: 'unauthorized', message: 'Invalid token' }));
ws.close(4001, 'Unauthorized');
}
return;
}
// Check authentication on subsequent messages
const attachment = ws.deserializeAttachment();
if (!attachment?.tenantId) {
ws.send(JSON.stringify({ type: 'error', code: 'not_authenticated', message: 'Authenticate first' }));
return;
}
// Process authenticated message
await this.handleAuthenticatedMessage(ws, msg, attachment);
}

Each tenant gets their own realtime DO to isolate load.

// Worker routes to tenant-specific DO
async function handleWebSocketUpgrade(
request: Request,
env: Env,
tenantId: string
): Promise<Response> {
const doId = env.REALTIME_DO.idFromName(`tenant:${tenantId}`);
const stub = env.REALTIME_DO.get(doId);
return stub.fetch(request);
}

For high-volume tenants, further shard by channel:

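// Shard by channel type
const channelDoId = env.REALTIME_DO.idFromName(`tenant:${tenantId}:channel:${channelType}`);
// Or shard by a hash of a per-connection key (here the user ID) for even distribution.
// Hashing the tenant ID would place all of a tenant's connections on the same shard.
// Broadcasts must then be sent to every shard of the tenant.
const shardId = hashString(userId) % NUM_SHARDS;
const shardDoId = env.REALTIME_DO.idFromName(`tenant:${tenantId}:shard:${shardId}`);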
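The hash helper in the snippet above is left undefined. A minimal sketch of one option follows, using a 32-bit FNV-1a hash; `fnv1a`, `shardName`, and `allShardNames` are hypothetical names, not part of z0 or the Workers API. Note that it hashes a per-connection key rather than the tenant ID, since a hash of the tenant ID alone would map every connection of that tenant to the same shard.

```typescript
// 32-bit FNV-1a over UTF-16 code units; sufficient for picking a shard,
// not suitable for anything security-sensitive.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Name of the shard that should hold a given connection.
function shardName(tenantId: string, connectionKey: string, numShards: number): string {
  const shardId = fnv1a(connectionKey) % numShards;
  return `tenant:${tenantId}:shard:${shardId}`;
}

// Names of every shard for a tenant, for broadcasts that must reach all of them.
function allShardNames(tenantId: string, numShards: number): string[] {
  return Array.from({ length: numShards }, (_, i) => `tenant:${tenantId}:shard:${i}`);
}
```

Each returned name would be passed to `idFromName()` to obtain the DO ID. The trade-off of sharding within a tenant is that every broadcast has to be delivered to all of that tenant's shards.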
// Broadcast to all subscribers of a channel
async broadcast(channel: string, event: string, data: unknown): Promise<void> {
const sockets = this.ctx.getWebSockets(`channel:${channel}`);
const message = JSON.stringify({ type: 'event', event, channel, data, timestamp: Date.now() });
for (const ws of sockets) {
ws.send(message);
}
}
// Targeted message to specific connection
async sendToUser(userId: string, event: string, data: unknown): Promise<void> {
const sockets = this.ctx.getWebSockets(`user:${userId}`);
const message = JSON.stringify({ type: 'event', event, data, timestamp: Date.now() });
for (const ws of sockets) {
ws.send(message);
}
}
// Filtered broadcast (check each connection)
async broadcastFiltered(
channel: string,
event: string,
data: unknown,
filter: (attachment: unknown) => boolean
): Promise<void> {
const sockets = this.ctx.getWebSockets(`channel:${channel}`);
const message = JSON.stringify({ type: 'event', event, channel, data, timestamp: Date.now() });
for (const ws of sockets) {
const attachment = ws.deserializeAttachment();
if (filter(attachment)) {
ws.send(message);
}
}
}

Clients should handle disconnection gracefully with exponential backoff.

// Client-side reconnection logic
class ReconnectingWebSocket {
  private socket: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private baseDelay = 1000; // 1 second
  private lastEventId: string | null = null;
  private subscriptions: Set<string> = new Set();

  constructor(
    private url: string,
    private token: string,
    private onMessage: (data: unknown) => void,
  ) {}

  connect(): void {
    this.socket = new WebSocket(this.url);
    this.socket.onopen = () => {
      this.reconnectAttempts = 0;
      this.authenticate();
      this.resubscribe();
    };
    this.socket.onclose = (event) => {
      // Reconnect only after an unclean close, up to the attempt limit
      if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
        this.scheduleReconnect();
      }
    };
    this.socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      // Remember the latest event so the server can replay what was missed
      if (data.eventId) {
        this.lastEventId = data.eventId;
      }
      this.onMessage(data);
    };
  }

  private authenticate(): void {
    this.socket?.send(JSON.stringify({ type: 'auth', data: { token: this.token } }));
  }

  private scheduleReconnect(): void {
    // Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of random jitter
    const delay = this.baseDelay * Math.pow(2, this.reconnectAttempts);
    const jitter = Math.random() * 1000;
    setTimeout(() => {
      this.reconnectAttempts++;
      this.connect();
    }, delay + jitter);
  }

  private resubscribe(): void {
    for (const channel of this.subscriptions) {
      this.socket?.send(JSON.stringify({
        type: 'subscribe',
        channel,
        data: { lastEventId: this.lastEventId },
      }));
    }
  }
}
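The delay calculation can be pulled out into a pure function, which makes the backoff curve easy to test. This is a sketch with two additions that the class above does not have, both assumptions: an upper bound on the delay (`maxDelayMs`) and an injectable random source. `BackoffOptions` and `backoffDelay` are hypothetical names.

```typescript
interface BackoffOptions {
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number;  // upper bound on the exponential part (an addition in this sketch)
  jitterMs: number;    // maximum random jitter added on top
}

// Exponential backoff with a cap and additive jitter.
// `random` is injectable so the result can be made deterministic in tests.
function backoffDelay(
  attempt: number,
  options: BackoffOptions,
  random: () => number = Math.random,
): number {
  const exponential = options.baseDelayMs * Math.pow(2, attempt);
  const capped = Math.min(exponential, options.maxDelayMs);
  return capped + random() * options.jitterMs;
}
```

Without a cap, the tenth attempt with a one-second base would wait over eight minutes (2^9 seconds); a cap keeps late retries within a bounded interval. Jitter spreads out clients that all lost their connection at the same moment.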

src/durable-objects/realtime.ts
import { DurableObject } from 'cloudflare:workers';
interface Env {
REALTIME_DO: DurableObjectNamespace;
}
interface ClientAttachment {
tenantId: string;
userId?: string;
subscribedChannels: string[];
}
export class RealtimeDO extends DurableObject {
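// NOTE: in-memory state like this buffer is lost when the DO hibernates or is evicted.
// Persist events to this.ctx.storage if replay must survive hibernation.
private eventBuffer: Map<string, Array<{ id: string; event: string; data: unknown; timestamp: number }>> = new Map();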
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
// Internal broadcast endpoint
if (url.pathname === '/broadcast') {
return this.handleBroadcast(request);
}
// WebSocket upgrade
const upgradeHeader = request.headers.get('Upgrade');
if (upgradeHeader !== 'websocket') {
return new Response('Expected WebSocket', { status: 426 });
}
// Authenticate from header
const authHeader = request.headers.get('Authorization');
const auth = await this.authenticateFromHeader(authHeader);
if (!auth) {
return new Response('Unauthorized', { status: 401 });
}
// Create WebSocket pair
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
// Accept with hibernation, tag with tenant
this.ctx.acceptWebSocket(server, [`tenant:${auth.tenantId}`]);
// Attach auth context
server.serializeAttachment({
tenantId: auth.tenantId,
userId: auth.userId,
subscribedChannels: [],
} as ClientAttachment);
return new Response(null, {
status: 101,
webSocket: client,
});
}
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const attachment = ws.deserializeAttachment() as ClientAttachment;
if (!attachment?.tenantId) {
ws.close(4001, 'Not authenticated');
return;
}
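let msg: any;
try {
  // Binary frames are decoded as UTF-8 before parsing
  msg = JSON.parse(typeof message === 'string' ? message : new TextDecoder().decode(message));
} catch {
  ws.send(JSON.stringify({ type: 'error', code: 'invalid_json', message: 'Message must be valid JSON' }));
  return;
}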
switch (msg.type) {
case 'subscribe':
await this.handleSubscribe(ws, attachment, msg);
break;
case 'unsubscribe':
await this.handleUnsubscribe(ws, attachment, msg);
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
break;
default:
ws.send(JSON.stringify({
type: 'error',
code: 'unknown_message_type',
message: `Unknown message type: ${msg.type}`,
}));
}
}
async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
const attachment = ws.deserializeAttachment() as ClientAttachment;
console.log(`WebSocket closed: tenant=${attachment?.tenantId}, code=${code}, clean=${wasClean}`);
}
async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
console.error('WebSocket error:', error);
}
private async handleSubscribe(
ws: WebSocket,
attachment: ClientAttachment,
msg: { channel: string; data?: { lastEventId?: string; filters?: Record<string, unknown> } }
): Promise<void> {
const { channel } = msg;
// Validate channel access
if (!this.canAccessChannel(attachment.tenantId, channel)) {
ws.send(JSON.stringify({
type: 'error',
code: 'forbidden',
message: `Cannot access channel: ${channel}`,
}));
return;
}
// Update attachment with subscription (ignore repeated subscribes)
if (!attachment.subscribedChannels.includes(channel)) {
  attachment.subscribedChannels.push(channel);
}
ws.serializeAttachment(attachment);
// Send current state snapshot
const snapshot = await this.getChannelSnapshot(channel);
ws.send(JSON.stringify({
type: 'snapshot',
channel,
data: snapshot.data,
lastEventId: snapshot.lastEventId,
timestamp: Date.now(),
}));
// Send missed events if resuming
if (msg.data?.lastEventId) {
const missedEvents = await this.getEventsSince(channel, msg.data.lastEventId);
for (const event of missedEvents) {
ws.send(JSON.stringify({
type: 'event',
event: event.event,
channel,
data: event.data,
eventId: event.id,
timestamp: event.timestamp,
}));
}
}
ws.send(JSON.stringify({
type: 'subscribed',
channel,
timestamp: Date.now(),
}));
}
private async handleUnsubscribe(
ws: WebSocket,
attachment: ClientAttachment,
msg: { channel: string }
): Promise<void> {
attachment.subscribedChannels = attachment.subscribedChannels.filter(c => c !== msg.channel);
ws.serializeAttachment(attachment);
ws.send(JSON.stringify({
type: 'unsubscribed',
channel: msg.channel,
timestamp: Date.now(),
}));
}
private async handleBroadcast(request: Request): Promise<Response> {
const { channel, event, data } = await request.json() as {
channel: string;
event: string;
data: unknown;
};
const eventId = `evt_${Date.now()}_${Math.random().toString(36).slice(2)}`;
const timestamp = Date.now();
// Buffer event for replay
if (!this.eventBuffer.has(channel)) {
this.eventBuffer.set(channel, []);
}
const buffer = this.eventBuffer.get(channel)!;
buffer.push({ id: eventId, event, data, timestamp });
// Keep last 100 events per channel
if (buffer.length > 100) {
buffer.shift();
}
// Broadcast to subscribed connections
const sockets = this.ctx.getWebSockets();
const message = JSON.stringify({
type: 'event',
event,
channel,
data,
eventId,
timestamp,
});
let sent = 0;
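for (const ws of sockets) {
  const attachment = ws.deserializeAttachment() as ClientAttachment;
  if (attachment?.subscribedChannels.includes(channel)) {
    try {
      ws.send(message);
      sent++;
    } catch (error) {
      // Connection may have closed; keep delivering to the others
      console.error('Failed to send to WebSocket:', error);
    }
  }
}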
return Response.json({ sent, eventId });
}
private async authenticateFromHeader(authHeader: string | null): Promise<{ tenantId: string; userId?: string } | null> {
if (!authHeader?.startsWith('Bearer ')) {
return null;
}
const token = authHeader.slice(7);
// Validate token and extract tenant/user
// Implementation depends on auth system
return { tenantId: 'tenant_001' }; // Placeholder
}
private canAccessChannel(tenantId: string, channel: string): boolean {
// Implement channel access control
return true;
}
private async getChannelSnapshot(channel: string): Promise<{ data: unknown; lastEventId: string }> {
// Fetch current state for channel
const buffer = this.eventBuffer.get(channel) || [];
return {
data: {},
lastEventId: buffer.length > 0 ? buffer[buffer.length - 1].id : '',
};
}
private async getEventsSince(channel: string, lastEventId: string): Promise<Array<{ id: string; event: string; data: unknown; timestamp: number }>> {
const buffer = this.eventBuffer.get(channel) || [];
const index = buffer.findIndex(e => e.id === lastEventId);
if (index === -1) {
return buffer; // Return all if lastEventId not found
}
return buffer.slice(index + 1);
}
}
src/lib/websocket-client.ts
type MessageHandler = (data: unknown) => void;
interface WebSocketClientOptions {
url: string;
token: string;
onConnect?: () => void;
onDisconnect?: (wasClean: boolean) => void;
onError?: (error: Error) => void;
}
export class WebSocketClient {
private socket: WebSocket | null = null;
private handlers: Map<string, Set<MessageHandler>> = new Map();
private subscriptions: Map<string, { lastEventId?: string }> = new Map();
private reconnectAttempts = 0;
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
private authenticated = false;
constructor(private options: WebSocketClientOptions) {}
connect(): void {
if (this.socket?.readyState === WebSocket.OPEN) {
return;
}
this.socket = new WebSocket(this.options.url);
this.socket.onopen = () => {
this.reconnectAttempts = 0;
this.authenticate();
};
this.socket.onclose = (event) => {
this.authenticated = false;
this.options.onDisconnect?.(event.wasClean);
if (!event.wasClean) {
this.scheduleReconnect();
}
};
this.socket.onerror = () => {
this.options.onError?.(new Error('WebSocket error'));
};
this.socket.onmessage = (event) => {
this.handleMessage(JSON.parse(event.data));
};
}
disconnect(): void {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.socket?.close(1000, 'Client disconnect');
this.socket = null;
}
subscribe(channel: string, handler: MessageHandler, lastEventId?: string): () => void {
// Register handler
if (!this.handlers.has(channel)) {
this.handlers.set(channel, new Set());
}
this.handlers.get(channel)!.add(handler);
// Track subscription
this.subscriptions.set(channel, { lastEventId });
// Send subscribe if connected
if (this.authenticated) {
this.send({
type: 'subscribe',
channel,
data: { lastEventId },
});
}
// Return unsubscribe function
return () => {
this.handlers.get(channel)?.delete(handler);
if (this.handlers.get(channel)?.size === 0) {
this.handlers.delete(channel);
this.subscriptions.delete(channel);
this.send({ type: 'unsubscribe', channel });
}
};
}
private authenticate(): void {
this.send({
type: 'auth',
data: { token: this.options.token },
});
}
private handleMessage(msg: Record<string, unknown>): void {
switch (msg.type) {
case 'authenticated':
this.authenticated = true;
this.options.onConnect?.();
this.resubscribeAll();
break;
case 'event':
case 'snapshot': {
  const channel = msg.channel as string;
  const handlers = this.handlers.get(channel);
  if (handlers) {
    for (const handler of handlers) {
      handler(msg);
    }
  }
  // Track the last event ID (snapshots carry it as lastEventId)
  const lastEventId = (msg.eventId ?? msg.lastEventId) as string | undefined;
  if (lastEventId && this.subscriptions.has(channel)) {
    this.subscriptions.get(channel)!.lastEventId = lastEventId;
  }
  break;
}
case 'error':
console.error('WebSocket error:', msg);
break;
case 'pong':
// Heartbeat response
break;
}
}
private resubscribeAll(): void {
for (const [channel, { lastEventId }] of this.subscriptions) {
this.send({
type: 'subscribe',
channel,
data: { lastEventId },
});
}
}
private send(data: unknown): void {
if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(data));
}
}
private scheduleReconnect(): void {
const maxAttempts = 10;
if (this.reconnectAttempts >= maxAttempts) {
this.options.onError?.(new Error('Max reconnection attempts reached'));
return;
}
const baseDelay = 1000;
const delay = baseDelay * Math.pow(2, this.reconnectAttempts) + Math.random() * 1000;
this.reconnectTimeout = setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, delay);
}
}
src/hooks/useRealtimeChannel.ts
import { useEffect, useState, useRef } from 'react';
import { WebSocketClient } from '../lib/websocket-client';
interface UseRealtimeChannelOptions {
url: string;
token: string;
channel: string;
enabled?: boolean;
}
interface ChannelState<T> {
data: T | null;
isConnected: boolean;
isLoading: boolean;
error: Error | null;
}
export function useRealtimeChannel<T>({
url,
token,
channel,
enabled = true,
}: UseRealtimeChannelOptions): ChannelState<T> {
const [state, setState] = useState<ChannelState<T>>({
data: null,
isConnected: false,
isLoading: true,
error: null,
});
const clientRef = useRef<WebSocketClient | null>(null);
useEffect(() => {
if (!enabled) {
return;
}
const client = new WebSocketClient({
url,
token,
onConnect: () => {
setState(s => ({ ...s, isConnected: true, error: null }));
},
onDisconnect: (wasClean) => {
setState(s => ({ ...s, isConnected: false }));
if (!wasClean) {
setState(s => ({ ...s, error: new Error('Connection lost') }));
}
},
onError: (error) => {
setState(s => ({ ...s, error }));
},
});
clientRef.current = client;
client.connect();
const unsubscribe = client.subscribe(channel, (msg: any) => {
if (msg.type === 'snapshot') {
setState(s => ({ ...s, data: msg.data, isLoading: false }));
} else if (msg.type === 'event') {
// Merge event into current state
setState(s => ({
...s,
data: mergeEvent(s.data, msg.event, msg.data),
}));
}
});
return () => {
unsubscribe();
client.disconnect();
};
}, [url, token, channel, enabled]);
return state;
}
// Example: merge function for call dashboard
function mergeEvent<T>(currentData: T | null, event: string, eventData: any): T {
if (!currentData) {
return eventData as T;
}
// Implement event-specific merge logic
// This depends on your data structure
return {
...currentData,
...eventData,
};
}
// Usage in component
function CallDashboard() {
const { data, isConnected, isLoading, error } = useRealtimeChannel<{
activeCalls: Call[];
recentCalls: Call[];
}>({
url: 'wss://api.web1.co/realtime',
token: apiKey,
channel: 'calls',
});
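// Check for errors first: a connection that fails before the first snapshot
// leaves isLoading true, which would otherwise hide the error behind the spinner
if (error) {
  return <ErrorMessage error={error} />;
}
if (isLoading) {
  return <LoadingSpinner />;
}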
return (
<div>
<ConnectionStatus connected={isConnected} />
<ActiveCallsList calls={data?.activeCalls || []} />
<RecentCallsList calls={data?.recentCalls || []} />
</div>
);
}

| Limit | Value | Mitigation |
| --- | --- | --- |
| Connections per DO | 32,768 | Shard by tenant/channel |
| WebSocket message size | 1 MB | Paginate large payloads |
| Hibernation duration | Unlimited | N/A |
| Messages per second per DO | ~10,000 | Shard high-volume channels |

| Factor | Recommendation |
| --- | --- |
| Message frequency | Batch rapid updates (e.g., debounce to 100ms) |
| Payload size | Keep messages small; send IDs, not full objects |
| Fan-out | For 1000+ subscribers, consider pub/sub patterns |
| State snapshots | Paginate large initial payloads |
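As an illustration of the batching recommendation, the sketch below coalesces rapid updates per key and flushes them together after a short window. `UpdateBatcher` is a hypothetical helper and the 100 ms default simply mirrors the figure in the table. One caveat to verify against current Cloudflare documentation: a pending `setTimeout` inside a Durable Object may keep it from hibernating, so this pattern may fit better on the producing side or in the client.

```typescript
// Collects updates for a short window and flushes them as one batch.
// A later update with the same key replaces the earlier one.
class UpdateBatcher<T> {
  private pending = new Map<string, T>();
  private timer: ReturnType<typeof setTimeout> | null = null;
  private readonly flushFn: (updates: T[]) => void;
  private readonly windowMs: number;

  constructor(flushFn: (updates: T[]) => void, windowMs = 100) {
    this.flushFn = flushFn;
    this.windowMs = windowMs;
  }

  add(key: string, update: T): void {
    this.pending.set(key, update);
    // Start the window on the first update; later updates join the same batch
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.windowMs);
    }
  }

  // Sends everything collected so far; safe to call when nothing is pending.
  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.size === 0) return;
    const updates = Array.from(this.pending.values());
    this.pending.clear();
    this.flushFn(updates);
  }
}
```

Keying updates (for example by `call_id`) means a call whose status changes three times within the window produces a single message carrying the latest status.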

| Concept | z0 Implementation |
| --- | --- |
| When to use | Real-time dashboards, live status, notifications |
| Hibernation API | acceptWebSocket() for cost-efficient idle connections |
| DO coordination | Per-tenant RealtimeDO manages connections and broadcasts |
| Message protocol | JSON with type, event, channel, data, timestamp |
| Authentication | Token in header or auth message after connect |
| Scaling | Shard by tenant, filter broadcasts, buffer for replay |
| Client handling | Reconnect with backoff, track lastEventId for resume |

WebSockets with Durable Objects enable z0 to provide real-time visibility into economic activity as it happens, from call status to budget alerts to agent decisions.