Durable Object Implementation Guide
TypeScript patterns for implementing DO classes. Ready for production.
Prerequisites: durable-object-architecture.md, durable-object-schemas.md
Overview
Section titled “Overview”This document provides TypeScript implementation patterns for all DO classes. Each pattern is production-ready and implements z0 principles.
Code organization:
src/ platform/ ledgers/ EntityLedger.ts # Generic base class for all entity ledgers types/ primitives.ts # Entity, Fact, Config types ledger.ts # Ledger-specific types domain/ entities/ Account.ts # Account entity ledger Asset.ts # Asset entity ledger Contact.ts # Contact entity ledger Deal.ts # Deal entity ledger CallLedger.ts # Call orchestrator (not EntityLedger) schema.ts # Field mappings manifest.ts # Domain registrationEntityLedger Base Class
Section titled “EntityLedger Base Class”All entity-based DOs extend EntityLedger which provides common functionality.
EntityLedger.ts
Section titled “EntityLedger.ts”import { DurableObject } from 'cloudflare:workers';
export interface Env { ACCOUNT_LEDGER: DurableObjectNamespace; ASSET_LEDGER: DurableObjectNamespace; CONTACT_LEDGER: DurableObjectNamespace; DEAL_LEDGER: DurableObjectNamespace; REPLICATION_QUEUE: Queue; D1: D1Database;}
export interface Entity { id: string; type: string; subtype?: string; tenant_id?: string; parent_id?: string; owner_id?: string; status: string; data: Record<string, any>; created_at: number; updated_at: number;}
export interface Fact { id: string; type: string; subtype?: string; timestamp: number; tenant_id: string; source_id?: string; entity_id?: string; asset_id?: string; campaign_id?: string; contact_id?: string; deal_id?: string; tool_id?: string; user_id?: string; parent_invocation_id?: string; from_entity?: string; to_entity?: string; amount?: number; currency?: string; config_id?: string; config_version?: number; trace_id?: string; external_source?: string; external_id?: string; data: Record<string, any>;}
export interface Config { id: string; version: number; type: string; category: 'policy' | 'logic'; name: string; applies_to: string; scope: 'account' | 'campaign' | 'asset'; settings: Record<string, any>; effective_at: number; superseded_at?: number;}
export interface CachedState { last_fact_id?: string; computed_at: number;}
export abstract class EntityLedger extends DurableObject { protected sql: SqlStorage; protected entityId: string; protected entityType: string; protected initialized = false;
// In-memory hot caches (short TTL) protected hotEntityCache?: Entity; protected hotStateCaches = new Map<string, { state: any; expiresAt: number }>(); protected configCache = new Map<string, { config: Config; expiresAt: number }>();
constructor(state: DurableObjectState, protected env: Env) { super(state, env); this.sql = state.storage.sql;
// Extract entity ID from DO name const doName = state.id.name; if (!doName) { throw new Error('DO must be created with named ID'); }
// DO name format: "{namespace}_{entity_id}" const parts = doName.split('_'); this.entityType = parts[0]; // 'account', 'asset', 'contact', 'deal' this.entityId = parts.slice(1).join('_'); // Rejoin in case entity_id has underscores }
async initialize(): Promise<void> { if (this.initialized) return;
// Get or create schema version const version = await this.state.storage.get<number>('schema_version') ?? 0; const targetVersion = this.getSchemaVersion();
if (version < targetVersion) { await this.migrateSchema(version, targetVersion); }
// Schedule reconciliation alarm const existingAlarm = await this.state.storage.getAlarm(); if (existingAlarm === null) { await this.scheduleReconciliation(); }
this.initialized = true; }
abstract getSchemaVersion(): number; abstract applyMigration(version: number): Promise<void>; abstract getCachedStateTypes(): string[];
async migrateSchema(from: number, to: number): Promise<void> { console.log(`[${this.entityId}] Migrating schema from v${from} to v${to}`);
for (let v = from; v < to; v++) { await this.applyMigration(v + 1); }
await this.state.storage.put('schema_version', to); }
// Entity operations async getEntity(): Promise<Entity> { if (this.hotEntityCache) { return this.hotEntityCache; }
const row = await this.sql.exec( `SELECT data FROM entity WHERE id = ?`, [this.entityId] ).first();
if (!row) { throw new Error(`Entity ${this.entityId} not found`); }
const entity = JSON.parse(row.data) as Entity; this.hotEntityCache = entity; return entity; }
async updateEntity(updates: Partial<Entity>): Promise<Entity> { const entity = await this.getEntity(); const updated = { ...entity, ...updates, updated_at: Date.now() };
await this.sql.exec( `UPDATE entity SET data = ?, updated_at = ? WHERE id = ?`, [JSON.stringify(updated), updated.updated_at, this.entityId] );
// Invalidate cache this.hotEntityCache = updated;
// Record lifecycle Fact if status changed if (updates.status && updates.status !== entity.status) { await this.appendFact({ id: this.generateFactId(), type: 'lifecycle', subtype: 'status_changed', timestamp: Date.now(), tenant_id: entity.tenant_id!, entity_id: this.entityId, data: { from_status: entity.status, to_status: updates.status } }); }
return updated; }
// Fact operations async appendFact(fact: Fact): Promise<Fact> { await this.initialize();
fact.id = fact.id ?? this.generateFactId(); fact.timestamp = fact.timestamp ?? Date.now();
// Insert Fact await this.sql.exec(` INSERT INTO facts ( id, type, subtype, timestamp, tenant_id, source_id, entity_id, asset_id, campaign_id, contact_id, deal_id, tool_id, user_id, parent_invocation_id, from_entity, to_entity, amount, currency, config_id, config_version, trace_id, external_source, external_id, data, replicated_at, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, [ fact.id, fact.type, fact.subtype, fact.timestamp, fact.tenant_id, fact.source_id, fact.entity_id, fact.asset_id, fact.campaign_id, fact.contact_id, fact.deal_id, fact.tool_id, fact.user_id, fact.parent_invocation_id, fact.from_entity, fact.to_entity, fact.amount, fact.currency, fact.config_id, fact.config_version, fact.trace_id, fact.external_source, fact.external_id, JSON.stringify(fact), null, Date.now() ]);
// Update cached state inline await this.updateCachedStateInline(fact);
// Schedule replication await this.scheduleReplication();
return fact; }
abstract updateCachedStateInline(fact: Fact): Promise<void>;
async getFactsByType(type: string, limit = 100): Promise<Fact[]> { const rows = await this.sql.exec( `SELECT data FROM facts WHERE type = ? ORDER BY timestamp DESC LIMIT ?`, [type, limit] );
return rows.map(row => JSON.parse(row.data) as Fact); }
async getRecentFacts(limit = 100): Promise<Fact[]> { const rows = await this.sql.exec( `SELECT data FROM facts ORDER BY timestamp DESC LIMIT ?`, [limit] );
return rows.map(row => JSON.parse(row.data) as Fact); }
// Cached state operations async getCachedState<T extends CachedState>(key: string): Promise<T | null> { // Check in-memory cache first const cached = this.hotStateCaches.get(key); if (cached && cached.expiresAt > Date.now()) { return cached.state as T; }
// Query SQLite const row = await this.sql.exec( `SELECT value, computed_at, facts_through FROM cached_state WHERE key = ?`, [key] ).first();
if (!row) return null;
const state = { ...JSON.parse(row.value), computed_at: row.computed_at, last_fact_id: row.facts_through } as T;
// Cache in memory (1 minute TTL) this.hotStateCaches.set(key, { state, expiresAt: Date.now() + 60_000 });
return state; }
async setCachedState<T extends CachedState>(key: string, state: T): Promise<void> { const { last_fact_id, computed_at, ...value } = state as any;
await this.sql.exec(` INSERT OR REPLACE INTO cached_state (key, value, computed_at, facts_through, updated_at) VALUES (?, ?, ?, ?, ?) `, [ key, JSON.stringify(value), computed_at ?? Date.now(), last_fact_id, Date.now() ]);
// Invalidate in-memory cache this.hotStateCaches.delete(key); }
// Config operations async getActiveConfig(type: string): Promise<Config | null> { const row = await this.sql.exec( `SELECT * FROM configs WHERE type = ? AND superseded_at IS NULL ORDER BY version DESC LIMIT 1`, [type] ).first();
if (!row) return null;
return { id: row.id, version: row.version, type: row.type, category: row.category, name: row.name, applies_to: row.applies_to, scope: row.scope, settings: JSON.parse(row.settings), effective_at: row.effective_at, superseded_at: row.superseded_at }; }
async createConfig(config: Omit<Config, 'version' | 'effective_at' | 'superseded_at'>): Promise<Config> { const full: Config = { ...config, version: 1, effective_at: Date.now(), superseded_at: undefined };
await this.sql.exec(` INSERT INTO configs (id, version, type, category, name, applies_to, scope, settings, effective_at, superseded_at, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, [ full.id, full.version, full.type, full.category, full.name, full.applies_to, full.scope, JSON.stringify(full.settings), full.effective_at, null, Date.now() ]);
return full; }
async updateConfig(configId: string, expectedVersion: number, newSettings: Record<string, any>): Promise<Config> { const current = await this.sql.exec( `SELECT * FROM configs WHERE id = ? AND version = ? AND superseded_at IS NULL`, [configId, expectedVersion] ).first();
if (!current) { throw new Error('Config version mismatch'); }
const now = Date.now(); const newVersion = expectedVersion + 1;
await this.sql.exec('BEGIN TRANSACTION');
try { // Supersede old version await this.sql.exec( `UPDATE configs SET superseded_at = ? WHERE id = ? AND version = ?`, [now, configId, expectedVersion] );
// Insert new version await this.sql.exec(` INSERT INTO configs (id, version, type, category, name, applies_to, scope, settings, effective_at, superseded_at, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, [ configId, newVersion, current.type, current.category, current.name, current.applies_to, current.scope, JSON.stringify(newSettings), now, null, Date.now() ]);
await this.sql.exec('COMMIT');
return { id: configId, version: newVersion, type: current.type, category: current.category, name: current.name, applies_to: current.applies_to, scope: current.scope, settings: newSettings, effective_at: now }; } catch (error) { await this.sql.exec('ROLLBACK'); throw error; } }
// Reconciliation async alarm(): Promise<void> { const alarmType = await this.state.storage.get<string>('alarm_type');
switch (alarmType) { case 'reconciliation': await this.runReconciliation(); await this.scheduleReconciliation(); break; case 'replication': await this.runReplication(); break; } }
async scheduleReconciliation(): Promise<void> { await this.state.storage.put('alarm_type', 'reconciliation'); await this.state.storage.setAlarm(Date.now() + 5 * 60 * 1000); // 5 minutes }
async runReconciliation(): Promise<void> { const stateTypes = this.getCachedStateTypes();
for (const stateType of stateTypes) { await this.reconcileState(stateType); } }
abstract calculateFromLedger(stateType: string): Promise<CachedState>;
async reconcileState(stateType: string): Promise<void> { const startTime = Date.now();
const cached = await this.getCachedState(stateType); const calculated = await this.calculateFromLedger(stateType);
if (this.statesMatch(cached, calculated)) { return; }
// Record mismatch await this.appendFact({ id: this.generateFactId(), type: 'reconciliation', subtype: 'mismatch_detected', timestamp: Date.now(), tenant_id: (await this.getEntity()).tenant_id!, entity_id: this.entityId, data: { cache_type: stateType, cached_value: cached, calculated_value: calculated, delta: this.computeDelta(cached, calculated), resolution: 'cache_updated', facts_scanned: await this.countFactsForState(stateType), duration_ms: Date.now() - startTime } });
await this.setCachedState(stateType, calculated); }
abstract statesMatch(cached: any, calculated: any): boolean; abstract computeDelta(cached: any, calculated: any): any; abstract countFactsForState(stateType: string): Promise<number>;
// Replication async scheduleReplication(): Promise<void> { const existingAlarm = await this.state.storage.getAlarm(); if (existingAlarm !== null && await this.state.storage.get('alarm_type') === 'replication') { return; // Already scheduled }
await this.state.storage.put('alarm_type', 'replication'); await this.state.storage.setAlarm(Date.now() + 100); // 100ms debounce }
async runReplication(): Promise<void> { const BATCH_SIZE = 100;
const unreplicated = await this.sql.exec(` SELECT id, data FROM facts WHERE replicated_at IS NULL ORDER BY timestamp LIMIT ? `, [BATCH_SIZE]);
if (unreplicated.length === 0) return;
await this.env.REPLICATION_QUEUE.send({ entity_id: this.entityId, entity_type: this.entityType, facts: unreplicated.map(row => ({ id: row.id, data: JSON.parse(row.data) })) });
const ids = unreplicated.map(row => row.id); await this.sql.exec(` UPDATE facts SET replicated_at = ? WHERE id IN (${ids.map(() => '?').join(',')}) `, [Date.now(), ...ids]);
const remaining = await this.sql.exec( `SELECT COUNT(*) as count FROM facts WHERE replicated_at IS NULL` ).first();
if (remaining.count > 0) { await this.state.storage.setAlarm(Date.now() + 100); } }
// Utilities protected generateFactId(): string { return `fact_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; }
protected isStale(cachedAt: number, ttlMs = 60_000): boolean { return Date.now() - cachedAt > ttlMs; }}AccountDO Implementation
Section titled “AccountDO Implementation”import { EntityLedger, Env, Fact, CachedState } from '@/platform/ledgers/EntityLedger';
interface BudgetState extends CachedState { deposited: number; spent: number; credits: number; pending: number; remaining: number;}
interface PrepaidBalance extends CachedState { balance: number;}
export class Account extends EntityLedger { // Hot state for RTB path private hotBudgetState?: BudgetState & { cachedAt: number };
getSchemaVersion(): number { return 1; }
async applyMigration(version: number): Promise<void> { switch (version) { case 1: // Initial schema - see durable-object-schemas.md for full DDL await this.sql.exec(` CREATE TABLE IF NOT EXISTS entity ( id TEXT PRIMARY KEY, type TEXT NOT NULL CHECK(type = 'account'), subtype TEXT NOT NULL, tenant_id TEXT, parent_id TEXT, data TEXT NOT NULL, created_at INTEGER NOT NULL DEFAULT (unixepoch()), updated_at INTEGER NOT NULL DEFAULT (unixepoch()) );
CREATE TABLE IF NOT EXISTS facts ( id TEXT PRIMARY KEY, type TEXT NOT NULL, subtype TEXT, timestamp INTEGER NOT NULL, tenant_id TEXT NOT NULL, source_id TEXT, entity_id TEXT, asset_id TEXT, campaign_id TEXT, contact_id TEXT, deal_id TEXT, tool_id TEXT, user_id TEXT, from_entity TEXT, to_entity TEXT, amount INTEGER, currency TEXT DEFAULT 'USD', config_id TEXT, config_version INTEGER, trace_id TEXT, external_source TEXT, external_id TEXT, data TEXT NOT NULL, replicated_at INTEGER, created_at INTEGER NOT NULL DEFAULT (unixepoch()) );
CREATE INDEX IF NOT EXISTS idx_facts_type_ts ON facts(type, timestamp); CREATE INDEX IF NOT EXISTS idx_facts_unreplicated ON facts(replicated_at) WHERE replicated_at IS NULL;
CREATE TABLE IF NOT EXISTS cached_state ( key TEXT PRIMARY KEY, value TEXT NOT NULL, computed_at INTEGER NOT NULL, facts_through TEXT, updated_at INTEGER NOT NULL DEFAULT (unixepoch()) );
CREATE TABLE IF NOT EXISTS configs ( id TEXT NOT NULL, version INTEGER NOT NULL, type TEXT NOT NULL, category TEXT NOT NULL CHECK(category IN ('policy', 'logic')), name TEXT NOT NULL, applies_to TEXT NOT NULL, scope TEXT NOT NULL CHECK(scope IN ('account', 'campaign', 'asset')), settings TEXT NOT NULL, effective_at INTEGER NOT NULL, superseded_at INTEGER, created_at INTEGER NOT NULL DEFAULT (unixepoch()), PRIMARY KEY (id, version) );
CREATE INDEX IF NOT EXISTS idx_configs_active ON configs(id) WHERE superseded_at IS NULL; `); break; } }
getCachedStateTypes(): string[] { return ['BudgetState', 'PrepaidBalance', 'ARBalance', 'APBalance']; }
async updateCachedStateInline(fact: Fact): Promise<void> { switch (fact.type) { case 'charge': await this.incrementBudgetSpent(fact.amount!, fact.id); break; case 'deposit': await this.incrementBudgetDeposited(fact.amount!, fact.id); break; case 'credit_issued': await this.incrementBudgetCredits(fact.amount!, fact.id); break; } }
private async incrementBudgetSpent(amount: number, factId: string): Promise<void> { await this.sql.exec(` UPDATE cached_state SET value = json_set( value, '$.spent', json_extract(value, '$.spent') + ?, '$.remaining', json_extract(value, '$.remaining') - ? ), facts_through = ?, computed_at = ?, updated_at = ? WHERE key = 'BudgetState' `, [amount, amount, factId, Date.now(), Date.now()]);
// Invalidate hot cache this.hotBudgetState = undefined; }
private async incrementBudgetDeposited(amount: number, factId: string): Promise<void> { await this.sql.exec(` UPDATE cached_state SET value = json_set( value, '$.deposited', json_extract(value, '$.deposited') + ?, '$.remaining', json_extract(value, '$.remaining') + ? ), facts_through = ?, computed_at = ?, updated_at = ? WHERE key = 'BudgetState' `, [amount, amount, factId, Date.now(), Date.now()]);
this.hotBudgetState = undefined; }
private async incrementBudgetCredits(amount: number, factId: string): Promise<void> { await this.sql.exec(` UPDATE cached_state SET value = json_set( value, '$.credits', json_extract(value, '$.credits') + ?, '$.remaining', json_extract(value, '$.remaining') + ? ), facts_through = ?, computed_at = ?, updated_at = ? WHERE key = 'BudgetState' `, [amount, amount, factId, Date.now(), Date.now()]);
this.hotBudgetState = undefined; }
// Hot path: Budget availability check async checkBudgetAvailability(amount: number): Promise<boolean> { if (this.hotBudgetState && !this.isStale(this.hotBudgetState.cachedAt)) { return this.hotBudgetState.remaining >= amount; }
const cached = await this.getCachedState<BudgetState>('BudgetState'); if (!cached) { // No budget state yet, calculate from ledger const calculated = await this.calculateBudgetFromLedger(); await this.setCachedState('BudgetState', calculated); this.hotBudgetState = { ...calculated, cachedAt: Date.now() }; return calculated.remaining >= amount; }
this.hotBudgetState = { ...cached, cachedAt: Date.now() }; return cached.remaining >= amount; }
async calculateFromLedger(stateType: string): Promise<CachedState> { switch (stateType) { case 'BudgetState': return this.calculateBudgetFromLedger(); case 'PrepaidBalance': return this.calculatePrepaidFromLedger(); default: throw new Error(`Unknown state type: ${stateType}`); } }
private async calculateBudgetFromLedger(): Promise<BudgetState> { const rows = await this.sql.exec(` SELECT type, SUM(amount) as total, MAX(id) as last_fact_id FROM facts WHERE type IN ('deposit', 'charge', 'credit_issued') GROUP BY type `);
let deposited = 0, spent = 0, credits = 0; let lastFactId = '';
for (const row of rows) { switch (row.type) { case 'deposit': deposited = row.total || 0; break; case 'charge': spent = row.total || 0; break; case 'credit_issued': credits = row.total || 0; break; } if (row.last_fact_id > lastFactId) { lastFactId = row.last_fact_id; } }
return { deposited, spent, credits, pending: 0, // Pending is runtime-only, not in ledger remaining: deposited + credits - spent, last_fact_id: lastFactId, computed_at: Date.now() }; }
private async calculatePrepaidFromLedger(): Promise<PrepaidBalance> { const budget = await this.calculateBudgetFromLedger(); return { balance: budget.deposited - budget.spent, last_fact_id: budget.last_fact_id, computed_at: Date.now() }; }
statesMatch(cached: any, calculated: any): boolean { if (!cached || !calculated) return false; return ( cached.deposited === calculated.deposited && cached.spent === calculated.spent && cached.credits === calculated.credits ); }
computeDelta(cached: any, calculated: any): any { return { deposited: (calculated?.deposited || 0) - (cached?.deposited || 0), spent: (calculated?.spent || 0) - (cached?.spent || 0), credits: (calculated?.credits || 0) - (cached?.credits || 0), remaining: (calculated?.remaining || 0) - (cached?.remaining || 0) }; }
async countFactsForState(stateType: string): Promise<number> { let types: string[] = [];
switch (stateType) { case 'BudgetState': case 'PrepaidBalance': types = ['deposit', 'charge', 'credit_issued']; break; default: types = []; }
if (types.length === 0) return 0;
const result = await this.sql.exec(` SELECT COUNT(*) as count FROM facts WHERE type IN (${types.map(() => '?').join(',')}) `, types).first();
return result?.count || 0; }
// Request handler async fetch(request: Request): Promise<Response> { await this.initialize();
const url = new URL(request.url); const path = url.pathname;
try { if (request.method === 'POST' && path === '/budget/check') { const { campaign_id, amount } = await request.json(); const available = await this.checkBudgetAvailability(amount); return Response.json({ available, campaign_id }); }
if (request.method === 'POST' && path === '/facts') { const fact = await request.json(); const created = await this.appendFact(fact); return Response.json(created, { status: 201 }); }
if (request.method === 'GET' && path === '/state/budget') { const state = await this.getCachedState<BudgetState>('BudgetState'); return Response.json(state); }
return new Response('Not Found', { status: 404 }); } catch (error) { console.error(`[${this.entityId}] Error:`, error); return Response.json({ error: error.message }, { status: 500 }); } }}Summary
Section titled “Summary”The implementation guide provides:
- EntityLedger — Shared base class with common patterns
- AccountDO — Full implementation example
- Schema migrations — Versioned migration pattern
- Hot path optimization — In-memory caching with TTL
- Reconciliation — Alarm-based verification
- Replication — Queue-based batch replication
Next steps:
- Implement AssetDO, ContactDO, DealDO following AccountDO pattern
- Add request routing in Worker
- Implement Queue consumers for replication
- Add observability (metrics, logs, traces)
All code is production-ready and follows z0 principles.