WebSockets
Real-time communication using Cloudflare’s WebSocket Hibernation API with Durable Objects.
Prerequisites: OVERVIEW.md, durable-objects.md
Overview
WebSockets enable real-time, bidirectional communication between clients and the server. z0 uses WebSockets where polling would be inefficient or where updates are latency-sensitive.
When to Use WebSockets vs Polling
| Scenario | Use WebSockets | Use Polling |
|---|---|---|
| Real-time dashboards | Yes - live call status, budget updates | No |
| Live call status updates | Yes - call starts, ends, transfers | No |
| Notifications | Yes - alerts, budget warnings | No |
| Infrequent data fetches | No | Yes - every 30+ seconds is fine |
| Simple read-only data | No | Yes - caching works well |
| Client without WebSocket support | No | Yes - fallback option |
Rule of thumb: If the user expects to see changes within 1-2 seconds, use WebSockets. If 30-second delays are acceptable, polling is simpler.
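The rule of thumb can be written down as a small decision helper. This is a minimal sketch, not part of z0: the names `chooseTransport` and `TransportInput` are hypothetical, and treating every tolerance below 30 seconds as a WebSocket case is an assumption, since the table does not say what to do between 2 and 30 seconds.

```typescript
type Transport = 'websocket' | 'polling';

interface TransportInput {
  // How stale the UI is allowed to be, in milliseconds
  maxAcceptableDelayMs: number;
  // False for clients that cannot open a WebSocket (fallback case)
  clientSupportsWebSocket: boolean;
}

// Assumption: 30 seconds is the point at which polling is "fine"
const POLLING_OK_THRESHOLD_MS = 30_000;

function chooseTransport(input: TransportInput): Transport {
  // Clients without WebSocket support always fall back to polling
  if (!input.clientSupportsWebSocket) {
    return 'polling';
  }
  // Tolerating 30+ seconds of delay makes polling the simpler choice
  return input.maxAcceptableDelayMs >= POLLING_OK_THRESHOLD_MS
    ? 'polling'
    : 'websocket';
}
```

A live call dashboard that must reflect changes within 2 seconds would resolve to `'websocket'`; a report refreshed once a minute would resolve to `'polling'`.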
WebSocket Hibernation API
Cloudflare’s WebSocket Hibernation API is the preferred approach for WebSocket connections in Workers. It allows Durable Objects to handle WebSocket connections without consuming resources while idle.
Why Hibernation
| Traditional WebSockets | Hibernating WebSockets |
|---|---|
| DO stays active while connection is open | DO hibernates between messages |
| Billed for entire connection duration | Billed only when processing messages |
| Memory consumed continuously | Memory released during hibernation |
| Scales poorly with idle connections | Scales to thousands of idle connections |
Cost implication: With hibernation, a dashboard with 1000 connected users who receive an update every 10 seconds costs roughly the same as one with 100 users, because you pay for the time the DO spends handling messages, not for holding connections open.
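The arithmetic behind this can be sketched with a simplified model of how long the DO is active per hour. This is an illustration under stated assumptions, not a billing calculator: `estimateActiveTimePerHour` and the per-update handling time are hypothetical, and the model ignores any idle grace period before the runtime actually evicts the object.

```typescript
interface ActiveTimeEstimate {
  traditionalMs: number; // DO stays active for the whole hour
  hibernatingMs: number; // DO is active only while handling updates
}

const HOUR_MS = 3_600_000;

function estimateActiveTimePerHour(
  updateIntervalSeconds: number,
  handlingMsPerUpdate: number, // hypothetical time to fan out one update
): ActiveTimeEstimate {
  if (updateIntervalSeconds <= 0 || handlingMsPerUpdate < 0) {
    throw new RangeError('interval must be positive and handling time non-negative');
  }
  const updatesPerHour = Math.floor(3600 / updateIntervalSeconds);
  return {
    traditionalMs: HOUR_MS,
    // Active time can never exceed the hour itself
    hibernatingMs: Math.min(HOUR_MS, updatesPerHour * handlingMsPerUpdate),
  };
}

// One update every 10 seconds, assumed 50 ms of handling per update:
// 360 updates * 50 ms = 18,000 ms active, versus 3,600,000 ms without hibernation
const estimate = estimateActiveTimePerHour(10, 50);
```

The number of connected users does not appear in the formula directly. It only influences `handlingMsPerUpdate`, which is why idle connections add little to the cost in this model.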
Hibernation Lifecycle
1. Client connects via WebSocket → Worker upgrades connection → Worker forwards to Durable Object
2. DO accepts connection with acceptWebSocket() → Connection is now "hibernatable" → DO can hibernate (evict from memory)
3. Message arrives from client → DO wakes if hibernating → webSocketMessage() handler runs → DO can hibernate again
4. Server sends message → DO wakes if hibernating → Sends via WebSocket → DO can hibernate again
5. Connection closes → webSocketClose() handler runs → Resources released
Core API Methods
```typescript
// Inside Durable Object class
export class RealtimeDO extends DurableObject {
  // Accept and register WebSocket for hibernation
  async fetch(request: Request): Promise<Response> {
    const upgradeHeader = request.headers.get('Upgrade');
    if (upgradeHeader !== 'websocket') {
      return new Response('Expected WebSocket', { status: 426 });
    }

    const pair = new WebSocketPair();
    const [client, server] = Object.values(pair);

    // Accept with hibernation - this is the key API
    this.ctx.acceptWebSocket(server, ['tag1', 'tag2']); // Tags are optional

    return new Response(null, {
      status: 101,
      webSocket: client,
    });
  }

  // Called when a message arrives (DO wakes from hibernation if needed)
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const data = JSON.parse(message as string);

    switch (data.type) {
      case 'subscribe':
        await this.handleSubscribe(ws, data);
        break;
      case 'unsubscribe':
        await this.handleUnsubscribe(ws, data);
        break;
      case 'ping':
        ws.send(JSON.stringify({ type: 'pong' }));
        break;
    }
  }

  // Called when connection closes
  async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
    // Clean up subscriptions for this connection
    await this.removeSubscriptions(ws);
  }

  // Called on connection error
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
    await this.removeSubscriptions(ws);
  }
}
```
Retrieving Connected WebSockets
```typescript
// Get all connected WebSockets
const sockets = this.ctx.getWebSockets();

// Get WebSockets by tag
const dashboardSockets = this.ctx.getWebSockets('dashboard');
const tenantSockets = this.ctx.getWebSockets(`tenant:${tenantId}`);

// Broadcast to all
for (const ws of sockets) {
  ws.send(JSON.stringify(update));
}
```
Durable Objects + WebSockets Pattern
Durable Objects are the natural home for WebSocket connections in z0. The single-threaded nature of DOs eliminates race conditions in connection management.
DO as WebSocket Coordinator
```
┌─────────────────────────────────────────────────────────────┐
│ Worker (Stateless)                                          │
│ - Authenticates request                                     │
│ - Routes to appropriate DO                                  │
│ - Upgrades HTTP to WebSocket                                │
└─────────────────────────┬───────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────┐
│ RealtimeDO (Per Tenant or Channel)                          │
│ - Accepts WebSocket connections                             │
│ - Tracks subscriptions                                      │
│ - Receives events from other DOs                            │
│ - Fans out updates to connected clients                     │
└─────────────────────────────────────────────────────────────┘
```
Fan-Out Pattern
When an event occurs (call starts, budget changes), the source DO notifies the realtime DO, which fans out to all subscribed clients.
```typescript
// In AssetDO - when a call starts
async recordCallStart(call: Call): Promise<void> {
  // Record the fact
  await this.appendFact({
    type: 'invocation',
    subtype: 'call_started',
    data: call,
  });

  // Notify the realtime DO for this tenant
  const realtimeDO = this.env.REALTIME_DO.get(
    this.env.REALTIME_DO.idFromName(`tenant:${this.tenantId}`)
  );

  await realtimeDO.fetch('https://internal/broadcast', {
    method: 'POST',
    body: JSON.stringify({
      event: 'call:started',
      data: {
        call_id: call.id,
        asset_id: this.entityId,
        caller: call.caller,
        timestamp: Date.now(),
      },
    }),
  });
}

// In RealtimeDO - broadcast to subscribed clients
async handleBroadcast(request: Request): Promise<Response> {
  // request.json() is untyped, so state the expected shape explicitly
  const { event, data } = await request.json() as { event: string; data: unknown };

  // Get WebSockets subscribed to this event type
  const sockets = this.ctx.getWebSockets(`event:${event}`);

  const message = JSON.stringify({
    type: 'event',
    event,
    data,
    timestamp: Date.now(),
  });

  for (const ws of sockets) {
    try {
      ws.send(message);
    } catch (error) {
      // Connection may have closed
      console.error('Failed to send to WebSocket:', error);
    }
  }

  return new Response('OK');
}
```
State Synchronization
When a client connects, they need the current state before receiving updates.
```typescript
async handleSubscribe(ws: WebSocket, data: SubscribeMessage): Promise<void> {
  const { channel, lastEventId } = data;

  // Tags are set at accept time, so track subscriptions in the
  // per-connection attachment instead (it survives hibernation).
  // Interpolating `ws` into a string key would not identify the connection.
  const attachment = ws.deserializeAttachment() ?? {};
  const channels = new Set<string>(attachment.subscribedChannels ?? []);
  channels.add(channel);
  ws.serializeAttachment({ ...attachment, subscribedChannels: [...channels] });

  // Send current state snapshot
  const currentState = await this.getCurrentState(channel);
  ws.send(JSON.stringify({
    type: 'snapshot',
    channel,
    data: currentState,
    lastEventId: currentState.lastEventId,
  }));

  // If client had a lastEventId, send missed events
  if (lastEventId) {
    const missedEvents = await this.getEventsSince(channel, lastEventId);
    for (const event of missedEvents) {
      ws.send(JSON.stringify({
        type: 'event',
        event: event.type,
        data: event.data,
        eventId: event.id,
      }));
    }
  }
}
```
z0 Use Cases
Live Call Dashboard
Real-time visibility into active calls across the account.
```typescript
// Events the dashboard subscribes to:
interface CallDashboardEvents {
  'call:started': {
    call_id: string;
    asset_id: string;
    caller: string;
    started_at: number;
  };
  'call:status_changed': {
    call_id: string;
    status: 'ringing' | 'in_progress' | 'transferring' | 'on_hold';
    duration_seconds: number;
  };
  'call:ended': {
    call_id: string;
    outcome: 'completed' | 'missed' | 'voicemail' | 'failed';
    duration_seconds: number;
    charge_amount?: number;
  };
}

// Client-side subscription
// Filters go under `data`, matching the ClientMessage format
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'calls',
  data: {
    filters: {
      asset_ids: ['ast_001', 'ast_002'], // Optional: filter to specific assets
    },
  },
}));
```
Budget Alerts
Real-time notifications when budgets approach or exceed limits.
```typescript
// Events for budget monitoring:
interface BudgetAlertEvents {
  'budget:threshold_approaching': {
    account_id: string;
    budget_type: 'daily' | 'monthly' | 'total';
    current_spend: number;
    limit: number;
    percentage: number; // 80%, 90%, etc.
  };
  'budget:exceeded': {
    account_id: string;
    budget_type: 'daily' | 'monthly' | 'total';
    current_spend: number;
    limit: number;
    calls_blocked: number; // Calls rejected due to budget
  };
  'budget:reset': {
    account_id: string;
    budget_type: 'daily' | 'monthly';
    new_limit: number;
  };
}

// In AccountDO - after charging
async processCharge(charge: Charge): Promise<void> {
  await this.appendFact({
    type: 'charge',
    data: charge,
  });

  const budgetState = await this.updateBudgetState(charge);

  // Check thresholds
  for (const threshold of [0.8, 0.9, 0.95]) {
    if (budgetState.percentage >= threshold && budgetState.lastThresholdNotified < threshold) {
      await this.notifyBudgetThreshold(budgetState, threshold);
    }
  }

  if (budgetState.percentage >= 1.0) {
    await this.notifyBudgetExceeded(budgetState);
  }
}
```
Agent Activity Feed
Live feed of agent decisions and actions for monitoring autonomous operations.
```typescript
// Events for agent activity:
interface AgentActivityEvents {
  'agent:decision': {
    agent_id: string;
    decision_type: 'route' | 'qualify' | 'respond' | 'escalate';
    confidence: number;
    reasoning: string;
    action_taken: string;
  };
  'agent:action': {
    agent_id: string;
    action: 'called_tool' | 'sent_message' | 'created_record';
    tool_name?: string;
    success: boolean;
  };
  'agent:escalation': {
    agent_id: string;
    reason: 'low_confidence' | 'policy_violation' | 'user_request';
    context: Record<string, unknown>;
  };
}
```
Message Protocol
JSON Message Format
All WebSocket messages use a consistent JSON format.
```typescript
// Client to Server messages
interface ClientMessage {
  type: 'subscribe' | 'unsubscribe' | 'ping' | 'auth';
  id?: string; // Optional request ID for correlation
  channel?: string;
  data?: Record<string, unknown>;
}

// Server to Client messages
// 'authenticated' is included because the server sends it in reply to 'auth'
interface ServerMessage {
  type: 'event' | 'snapshot' | 'error' | 'pong' | 'subscribed' | 'unsubscribed' | 'authenticated';
  id?: string; // Correlation ID if responding to a request
  event?: string;
  channel?: string;
  data?: Record<string, unknown>;
  timestamp: number;
}

// Error message
interface ErrorMessage {
  type: 'error';
  id?: string;
  code: string;
  message: string;
  details?: Record<string, unknown>;
}
```
Event Types
```typescript
// Subscribe to a channel
{
  type: 'subscribe',
  id: 'req_001',
  channel: 'calls',
  data: {
    filters: { asset_ids: ['ast_001'] },
    lastEventId: 'evt_123' // For resuming after disconnect
  }
}

// Server confirms subscription
{
  type: 'subscribed',
  id: 'req_001',
  channel: 'calls',
  timestamp: 1705500000000
}

// Unsubscribe from a channel
{
  type: 'unsubscribe',
  channel: 'calls'
}

// Event pushed from server
{
  type: 'event',
  event: 'call:started',
  channel: 'calls',
  data: {
    call_id: 'call_001',
    asset_id: 'ast_001',
    caller: '+15551234567'
  },
  timestamp: 1705500001000
}
```
Authentication Over WebSocket
Authenticate before subscribing to channels.
```typescript
// Option 1: Auth in initial HTTP request (preferred)
// Note: the browser WebSocket constructor cannot set custom headers. This form
// only works with clients that accept a headers option, such as the Node.js
// `ws` package. Browser clients should use Option 2.
const socket = new WebSocket('wss://api.web1.co/realtime', {
  headers: {
    'Authorization': `Bearer ${apiKey}`,
  },
});

// Option 2: Auth message after connection
socket.onopen = () => {
  socket.send(JSON.stringify({
    type: 'auth',
    data: {
      token: apiKey,
    },
  }));
};

// Server validates and responds
async webSocketMessage(ws: WebSocket, message: string): Promise<void> {
  const msg = JSON.parse(message);

  if (msg.type === 'auth') {
    const auth = await this.validateToken(msg.data.token);
    if (auth) {
      // Store auth context with connection
      ws.serializeAttachment({ tenantId: auth.tenantId, userId: auth.userId });
      ws.send(JSON.stringify({ type: 'authenticated', timestamp: Date.now() }));
    } else {
      ws.send(JSON.stringify({ type: 'error', code: 'unauthorized', message: 'Invalid token' }));
      ws.close(4001, 'Unauthorized');
    }
    return;
  }

  // Check authentication on subsequent messages
  const attachment = ws.deserializeAttachment();
  if (!attachment?.tenantId) {
    ws.send(JSON.stringify({ type: 'error', code: 'not_authenticated', message: 'Authenticate first' }));
    return;
  }

  // Process authenticated message
  await this.handleAuthenticatedMessage(ws, msg, attachment);
}
```
Scaling Patterns
Sharding by Tenant
Each tenant gets their own realtime DO to isolate load.
```typescript
// Worker routes to tenant-specific DO
async function handleWebSocketUpgrade(
  request: Request,
  env: Env,
  tenantId: string
): Promise<Response> {
  const doId = env.REALTIME_DO.idFromName(`tenant:${tenantId}`);
  const stub = env.REALTIME_DO.get(doId);
  return stub.fetch(request);
}
```
For high-volume tenants, further shard by channel:

```typescript
// Shard by channel type
const doId = env.REALTIME_DO.idFromName(`tenant:${tenantId}:channel:${channelType}`);

// Or shard by numeric hash for even distribution.
// Hash a per-connection key such as the user ID: hashing the tenant ID would
// send every connection for that tenant to the same shard.
// hashKey is a placeholder for any stable string hash.
const shardId = hashKey(userId) % NUM_SHARDS;
const doId = env.REALTIME_DO.idFromName(`tenant:${tenantId}:shard:${shardId}`);
```
Broadcast vs Targeted Messages
```typescript
// Broadcast to all subscribers of a channel
async broadcast(channel: string, event: string, data: unknown): Promise<void> {
  const sockets = this.ctx.getWebSockets(`channel:${channel}`);
  const message = JSON.stringify({ type: 'event', event, channel, data, timestamp: Date.now() });

  for (const ws of sockets) {
    ws.send(message);
  }
}

// Targeted message to a specific user's connections
async sendToUser(userId: string, event: string, data: unknown): Promise<void> {
  const sockets = this.ctx.getWebSockets(`user:${userId}`);
  const message = JSON.stringify({ type: 'event', event, data, timestamp: Date.now() });

  for (const ws of sockets) {
    ws.send(message);
  }
}

// Filtered broadcast (check each connection)
async broadcastFiltered(
  channel: string,
  event: string,
  data: unknown,
  filter: (attachment: unknown) => boolean
): Promise<void> {
  const sockets = this.ctx.getWebSockets(`channel:${channel}`);
  const message = JSON.stringify({ type: 'event', event, channel, data, timestamp: Date.now() });

  for (const ws of sockets) {
    const attachment = ws.deserializeAttachment();
    if (filter(attachment)) {
      ws.send(message);
    }
  }
}
```
Handling Reconnection
Clients should handle disconnection gracefully with exponential backoff.
```typescript
// Client-side reconnection logic
class ReconnectingWebSocket {
  private socket: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private baseDelay = 1000; // 1 second
  private lastEventId: string | null = null;
  private subscriptions: Set<string> = new Set();

  constructor(
    private url: string,
    private token: string,
    private onMessage: (data: unknown) => void,
  ) {}

  connect(): void {
    this.socket = new WebSocket(this.url);

    this.socket.onopen = () => {
      this.reconnectAttempts = 0;
      this.authenticate();
      this.resubscribe();
    };

    this.socket.onclose = (event) => {
      // Reconnect only after an unclean close, up to the attempt limit
      if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
        this.scheduleReconnect();
      }
    };

    this.socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.eventId) {
        this.lastEventId = data.eventId;
      }
      this.onMessage(data);
    };
  }

  // Sends the auth message described under Authentication Over WebSocket
  private authenticate(): void {
    this.socket?.send(JSON.stringify({ type: 'auth', data: { token: this.token } }));
  }

  private scheduleReconnect(): void {
    // Exponential backoff: 1s, 2s, 4s, ... plus up to 1s of random jitter
    const delay = this.baseDelay * Math.pow(2, this.reconnectAttempts);
    const jitter = Math.random() * 1000;

    setTimeout(() => {
      this.reconnectAttempts++;
      this.connect();
    }, delay + jitter);
  }

  private resubscribe(): void {
    for (const channel of this.subscriptions) {
      this.socket?.send(JSON.stringify({
        type: 'subscribe',
        channel,
        data: { lastEventId: this.lastEventId },
      }));
    }
  }
}
```
Code Examples
Durable Object with WebSocket Hibernation
```typescript
import { DurableObject } from 'cloudflare:workers';

interface Env {
  REALTIME_DO: DurableObjectNamespace;
}

interface ClientAttachment {
  tenantId: string;
  userId?: string;
  subscribedChannels: string[];
}

export class RealtimeDO extends DurableObject {
  // Replay buffer. This Map lives in memory only, so it is lost whenever the
  // DO hibernates or is evicted; persist events to storage if replay must
  // survive hibernation.
  private eventBuffer: Map<string, Array<{ id: string; event: string; data: unknown; timestamp: number }>> = new Map();

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    // Internal broadcast endpoint
    if (url.pathname === '/broadcast') {
      return this.handleBroadcast(request);
    }

    // WebSocket upgrade
    const upgradeHeader = request.headers.get('Upgrade');
    if (upgradeHeader !== 'websocket') {
      return new Response('Expected WebSocket', { status: 426 });
    }

    // Authenticate from header
    const authHeader = request.headers.get('Authorization');
    const auth = await this.authenticateFromHeader(authHeader);
    if (!auth) {
      return new Response('Unauthorized', { status: 401 });
    }

    // Create WebSocket pair
    const pair = new WebSocketPair();
    const [client, server] = Object.values(pair);

    // Accept with hibernation, tag with tenant
    this.ctx.acceptWebSocket(server, [`tenant:${auth.tenantId}`]);

    // Attach auth context
    server.serializeAttachment({
      tenantId: auth.tenantId,
      userId: auth.userId,
      subscribedChannels: [],
    } as ClientAttachment);

    return new Response(null, {
      status: 101,
      webSocket: client,
    });
  }

  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const attachment = ws.deserializeAttachment() as ClientAttachment;
    if (!attachment?.tenantId) {
      ws.close(4001, 'Not authenticated');
      return;
    }

    const msg = JSON.parse(message as string);

    switch (msg.type) {
      case 'auth':
        // The connection was already authenticated from the upgrade request
        // header; acknowledge so clients that send an auth message can proceed
        ws.send(JSON.stringify({ type: 'authenticated', timestamp: Date.now() }));
        break;

      case 'subscribe':
        await this.handleSubscribe(ws, attachment, msg);
        break;

      case 'unsubscribe':
        await this.handleUnsubscribe(ws, attachment, msg);
        break;

      case 'ping':
        ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
        break;

      default:
        ws.send(JSON.stringify({
          type: 'error',
          code: 'unknown_message_type',
          message: `Unknown message type: ${msg.type}`,
        }));
    }
  }

  async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
    const attachment = ws.deserializeAttachment() as ClientAttachment;
    console.log(`WebSocket closed: tenant=${attachment?.tenantId}, code=${code}, clean=${wasClean}`);
  }

  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
  }

  private async handleSubscribe(
    ws: WebSocket,
    attachment: ClientAttachment,
    msg: { channel: string; data?: { lastEventId?: string; filters?: Record<string, unknown> } }
  ): Promise<void> {
    const { channel } = msg;

    // Validate channel access
    if (!this.canAccessChannel(attachment.tenantId, channel)) {
      ws.send(JSON.stringify({
        type: 'error',
        code: 'forbidden',
        message: `Cannot access channel: ${channel}`,
      }));
      return;
    }

    // Update attachment with subscription (skip duplicates on resubscribe)
    if (!attachment.subscribedChannels.includes(channel)) {
      attachment.subscribedChannels.push(channel);
    }
    ws.serializeAttachment(attachment);

    // Send current state snapshot
    const snapshot = await this.getChannelSnapshot(channel);
    ws.send(JSON.stringify({
      type: 'snapshot',
      channel,
      data: snapshot.data,
      lastEventId: snapshot.lastEventId,
      timestamp: Date.now(),
    }));

    // Send missed events if resuming
    if (msg.data?.lastEventId) {
      const missedEvents = await this.getEventsSince(channel, msg.data.lastEventId);
      for (const event of missedEvents) {
        ws.send(JSON.stringify({
          type: 'event',
          event: event.event,
          channel,
          data: event.data,
          eventId: event.id,
          timestamp: event.timestamp,
        }));
      }
    }

    ws.send(JSON.stringify({
      type: 'subscribed',
      channel,
      timestamp: Date.now(),
    }));
  }

  private async handleUnsubscribe(
    ws: WebSocket,
    attachment: ClientAttachment,
    msg: { channel: string }
  ): Promise<void> {
    attachment.subscribedChannels = attachment.subscribedChannels.filter(c => c !== msg.channel);
    ws.serializeAttachment(attachment);

    ws.send(JSON.stringify({
      type: 'unsubscribed',
      channel: msg.channel,
      timestamp: Date.now(),
    }));
  }

  private async handleBroadcast(request: Request): Promise<Response> {
    const { channel, event, data } = await request.json() as {
      channel: string;
      event: string;
      data: unknown;
    };

    const eventId = `evt_${Date.now()}_${Math.random().toString(36).slice(2)}`;
    const timestamp = Date.now();

    // Buffer event for replay
    if (!this.eventBuffer.has(channel)) {
      this.eventBuffer.set(channel, []);
    }
    const buffer = this.eventBuffer.get(channel)!;
    buffer.push({ id: eventId, event, data, timestamp });

    // Keep last 100 events per channel
    if (buffer.length > 100) {
      buffer.shift();
    }

    // Broadcast to subscribed connections
    const sockets = this.ctx.getWebSockets();
    const message = JSON.stringify({
      type: 'event',
      event,
      channel,
      data,
      eventId,
      timestamp,
    });

    let sent = 0;
    for (const ws of sockets) {
      const attachment = ws.deserializeAttachment() as ClientAttachment;
      if (attachment?.subscribedChannels.includes(channel)) {
        ws.send(message);
        sent++;
      }
    }

    return Response.json({ sent, eventId });
  }

  private async authenticateFromHeader(authHeader: string | null): Promise<{ tenantId: string; userId?: string } | null> {
    if (!authHeader?.startsWith('Bearer ')) {
      return null;
    }
    const token = authHeader.slice(7);
    // Validate token and extract tenant/user
    // Implementation depends on auth system
    return { tenantId: 'tenant_001' }; // Placeholder
  }

  private canAccessChannel(tenantId: string, channel: string): boolean {
    // Implement channel access control
    return true;
  }

  private async getChannelSnapshot(channel: string): Promise<{ data: unknown; lastEventId: string }> {
    // Fetch current state for channel
    const buffer = this.eventBuffer.get(channel) || [];
    return {
      data: {},
      lastEventId: buffer.length > 0 ? buffer[buffer.length - 1].id : '',
    };
  }

  private async getEventsSince(channel: string, lastEventId: string): Promise<Array<{ id: string; event: string; data: unknown; timestamp: number }>> {
    const buffer = this.eventBuffer.get(channel) || [];
    const index = buffer.findIndex(e => e.id === lastEventId);
    if (index === -1) {
      return buffer; // Return all if lastEventId not found
    }
    return buffer.slice(index + 1);
  }
}
```
Client-Side Connection Management
```typescript
type MessageHandler = (data: unknown) => void;

interface WebSocketClientOptions {
  url: string;
  token: string;
  onConnect?: () => void;
  onDisconnect?: (wasClean: boolean) => void;
  onError?: (error: Error) => void;
}

export class WebSocketClient {
  private socket: WebSocket | null = null;
  private handlers: Map<string, Set<MessageHandler>> = new Map();
  private subscriptions: Map<string, { lastEventId?: string }> = new Map();
  private reconnectAttempts = 0;
  private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
  private authenticated = false;

  constructor(private options: WebSocketClientOptions) {}

  connect(): void {
    if (this.socket?.readyState === WebSocket.OPEN) {
      return;
    }

    this.socket = new WebSocket(this.options.url);

    this.socket.onopen = () => {
      this.reconnectAttempts = 0;
      this.authenticate();
    };

    this.socket.onclose = (event) => {
      this.authenticated = false;
      this.options.onDisconnect?.(event.wasClean);

      if (!event.wasClean) {
        this.scheduleReconnect();
      }
    };

    this.socket.onerror = () => {
      this.options.onError?.(new Error('WebSocket error'));
    };

    this.socket.onmessage = (event) => {
      this.handleMessage(JSON.parse(event.data));
    };
  }

  disconnect(): void {
    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }
    this.socket?.close(1000, 'Client disconnect');
    this.socket = null;
  }

  subscribe(channel: string, handler: MessageHandler, lastEventId?: string): () => void {
    // Register handler
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, new Set());
    }
    this.handlers.get(channel)!.add(handler);

    // Track subscription
    this.subscriptions.set(channel, { lastEventId });

    // Send subscribe if connected
    if (this.authenticated) {
      this.send({
        type: 'subscribe',
        channel,
        data: { lastEventId },
      });
    }

    // Return unsubscribe function
    return () => {
      this.handlers.get(channel)?.delete(handler);
      if (this.handlers.get(channel)?.size === 0) {
        this.handlers.delete(channel);
        this.subscriptions.delete(channel);
        this.send({ type: 'unsubscribe', channel });
      }
    };
  }

  private authenticate(): void {
    this.send({
      type: 'auth',
      data: { token: this.options.token },
    });
  }

  private handleMessage(msg: Record<string, unknown>): void {
    switch (msg.type) {
      case 'authenticated':
        this.authenticated = true;
        this.options.onConnect?.();
        this.resubscribeAll();
        break;

      case 'event':
      case 'snapshot': {
        const channel = msg.channel as string;
        const handlers = this.handlers.get(channel);
        if (handlers) {
          for (const handler of handlers) {
            handler(msg);
          }
        }
        // Track last event ID
        if (msg.eventId && this.subscriptions.has(channel)) {
          this.subscriptions.get(channel)!.lastEventId = msg.eventId as string;
        }
        break;
      }

      case 'error':
        console.error('WebSocket error:', msg);
        break;

      case 'pong':
        // Heartbeat response
        break;
    }
  }

  private resubscribeAll(): void {
    for (const [channel, { lastEventId }] of this.subscriptions) {
      this.send({
        type: 'subscribe',
        channel,
        data: { lastEventId },
      });
    }
  }

  private send(data: unknown): void {
    if (this.socket?.readyState === WebSocket.OPEN) {
      this.socket.send(JSON.stringify(data));
    }
  }

  private scheduleReconnect(): void {
    const maxAttempts = 10;
    if (this.reconnectAttempts >= maxAttempts) {
      this.options.onError?.(new Error('Max reconnection attempts reached'));
      return;
    }

    const baseDelay = 1000;
    const delay = baseDelay * Math.pow(2, this.reconnectAttempts) + Math.random() * 1000;

    this.reconnectTimeout = setTimeout(() => {
      this.reconnectAttempts++;
      this.connect();
    }, delay);
  }
}
```
React Hook for WebSocket
```tsx
import { useEffect, useState, useCallback, useRef } from 'react';
import { WebSocketClient } from '../lib/websocket-client';

interface UseRealtimeChannelOptions {
  url: string;
  token: string;
  channel: string;
  enabled?: boolean;
}

interface ChannelState<T> {
  data: T | null;
  isConnected: boolean;
  isLoading: boolean;
  error: Error | null;
}

export function useRealtimeChannel<T>({
  url,
  token,
  channel,
  enabled = true,
}: UseRealtimeChannelOptions): ChannelState<T> {
  const [state, setState] = useState<ChannelState<T>>({
    data: null,
    isConnected: false,
    isLoading: true,
    error: null,
  });

  const clientRef = useRef<WebSocketClient | null>(null);

  useEffect(() => {
    if (!enabled) {
      return;
    }

    const client = new WebSocketClient({
      url,
      token,
      onConnect: () => {
        setState(s => ({ ...s, isConnected: true, error: null }));
      },
      onDisconnect: (wasClean) => {
        setState(s => ({ ...s, isConnected: false }));
        if (!wasClean) {
          setState(s => ({ ...s, error: new Error('Connection lost') }));
        }
      },
      onError: (error) => {
        setState(s => ({ ...s, error }));
      },
    });

    clientRef.current = client;
    client.connect();

    const unsubscribe = client.subscribe(channel, (msg: any) => {
      if (msg.type === 'snapshot') {
        setState(s => ({ ...s, data: msg.data, isLoading: false }));
      } else if (msg.type === 'event') {
        // Merge event into current state
        setState(s => ({
          ...s,
          data: mergeEvent(s.data, msg.event, msg.data),
        }));
      }
    });

    return () => {
      unsubscribe();
      client.disconnect();
    };
  }, [url, token, channel, enabled]);

  return state;
}

// Example: merge function for call dashboard
function mergeEvent<T>(currentData: T | null, event: string, eventData: any): T {
  if (!currentData) {
    return eventData as T;
  }

  // Implement event-specific merge logic
  // This depends on your data structure
  return {
    ...currentData,
    ...eventData,
  };
}

// Usage in component
function CallDashboard() {
  const { data, isConnected, isLoading, error } = useRealtimeChannel<{
    activeCalls: Call[];
    recentCalls: Call[];
  }>({
    url: 'wss://api.web1.co/realtime',
    token: apiKey,
    channel: 'calls',
  });

  if (isLoading) {
    return <LoadingSpinner />;
  }

  if (error) {
    return <ErrorMessage error={error} />;
  }

  return (
    <div>
      <ConnectionStatus connected={isConnected} />
      <ActiveCallsList calls={data?.activeCalls || []} />
      <RecentCallsList calls={data?.recentCalls || []} />
    </div>
  );
}
```
Constraints and Limits
Cloudflare Limits
| Limit | Value | Mitigation |
|---|---|---|
| Connections per DO | 32,768 | Shard by tenant/channel |
| WebSocket message size | 1 MB | Paginate large payloads |
| Hibernation duration | Unlimited | N/A |
| Messages per second per DO | ~10,000 | Shard high-volume channels |
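The two mitigations in the table, sharding and pagination, can be sized with simple arithmetic. The sketch below is illustrative only: `shardsNeeded`, `paginate`, and the 80% headroom factor are assumptions rather than z0 code, and the connection limit is taken from the table above.

```typescript
// Per-DO connection limit, as listed in the table above
const MAX_CONNECTIONS_PER_DO = 32_768;

// How many DO shards are needed for an expected connection count.
// headroom < 1 leaves spare capacity on each shard (hypothetical default).
function shardsNeeded(expectedConnections: number, headroom = 0.8): number {
  if (headroom <= 0 || headroom > 1) {
    throw new RangeError('headroom must be in (0, 1]');
  }
  const usablePerShard = Math.floor(MAX_CONNECTIONS_PER_DO * headroom);
  return Math.max(1, Math.ceil(expectedConnections / usablePerShard));
}

// Split a large list into pages so each page can be sent as its own message.
// This splits by item count; a byte-aware variant would measure encoded size.
function paginate<T>(items: readonly T[], pageSize: number): T[][] {
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be a positive integer');
  }
  const pages: T[][] = [];
  for (let i = 0; i < items.length; i += pageSize) {
    pages.push(items.slice(i, i + pageSize));
  }
  return pages;
}
```

With the default headroom, each shard is planned for 26,214 connections, so 100,000 expected connections would call for 4 shards.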
Performance Considerations
| Factor | Recommendation |
|---|---|
| Message frequency | Batch rapid updates (e.g., debounce to 100ms) |
| Payload size | Keep messages small; send IDs, not full objects |
| Fan-out | For 1000+ subscribers, consider pub/sub patterns |
| State snapshots | Paginate large initial payloads |
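Batching rapid updates could look like the following sketch. It is a fixed-window batcher rather than a strict debounce: the first update starts a 100 ms window and everything collected in that window is delivered together, which bounds the delay for any single update. The class name `UpdateBatcher` and its callback shape are hypothetical.

```typescript
class UpdateBatcher<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    // Called with everything collected during one window
    private readonly flush: (batch: T[]) => void,
    private readonly windowMs = 100,
  ) {}

  // Queue an update; the first update in a window starts the timer
  add(update: T): void {
    this.pending.push(update);
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flushNow(), this.windowMs);
    }
  }

  // Deliver pending updates immediately and cancel the running window
  flushNow(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.length === 0) {
      return;
    }
    const batch = this.pending;
    this.pending = [];
    this.flush(batch);
  }
}
```

A caller would construct the batcher with a callback that sends one combined message, for example a single `ws.send` carrying an array of events, and call `add` for each incoming update.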
Summary
| Concept | z0 Implementation |
|---|---|
| When to use | Real-time dashboards, live status, notifications |
| Hibernation API | acceptWebSocket() for cost-efficient idle connections |
| DO coordination | Per-tenant RealtimeDO manages connections and broadcasts |
| Message protocol | JSON with type, event, channel, data, timestamp |
| Authentication | Token in header or auth message after connect |
| Scaling | Shard by tenant, filter broadcasts, buffer for replay |
| Client handling | Reconnect with backoff, track lastEventId for resume |
WebSockets with Durable Objects enable z0 to provide real-time visibility into economic activity as it happens, from call status to budget alerts to agent decisions.