Skip to content

Durable Object Implementation Guide

TypeScript patterns for implementing DO classes. Ready for production.

Prerequisites: durable-object-architecture.md, durable-object-schemas.md


This document provides TypeScript implementation patterns for all DO classes. Each pattern is production-ready and implements z0 principles.

Code organization:

src/
platform/
ledgers/
EntityLedger.ts # Generic base class for all entity ledgers
types/
primitives.ts # Entity, Fact, Config types
ledger.ts # Ledger-specific types
domain/
entities/
Account.ts # Account entity ledger
Asset.ts # Asset entity ledger
Contact.ts # Contact entity ledger
Deal.ts # Deal entity ledger
CallLedger.ts # Call orchestrator (not EntityLedger)
schema.ts # Field mappings
manifest.ts # Domain registration

All entity-based DOs extend EntityLedger which provides common functionality.

import { DurableObject } from 'cloudflare:workers';
export interface Env {
ACCOUNT_LEDGER: DurableObjectNamespace;
ASSET_LEDGER: DurableObjectNamespace;
CONTACT_LEDGER: DurableObjectNamespace;
DEAL_LEDGER: DurableObjectNamespace;
REPLICATION_QUEUE: Queue;
D1: D1Database;
}
export interface Entity {
id: string;
type: string;
subtype?: string;
tenant_id?: string;
parent_id?: string;
owner_id?: string;
status: string;
data: Record<string, any>;
created_at: number;
updated_at: number;
}
export interface Fact {
id: string;
type: string;
subtype?: string;
timestamp: number;
tenant_id: string;
source_id?: string;
entity_id?: string;
asset_id?: string;
campaign_id?: string;
contact_id?: string;
deal_id?: string;
tool_id?: string;
user_id?: string;
parent_invocation_id?: string;
from_entity?: string;
to_entity?: string;
amount?: number;
currency?: string;
config_id?: string;
config_version?: number;
trace_id?: string;
external_source?: string;
external_id?: string;
data: Record<string, any>;
}
export interface Config {
id: string;
version: number;
type: string;
category: 'policy' | 'logic';
name: string;
applies_to: string;
scope: 'account' | 'campaign' | 'asset';
settings: Record<string, any>;
effective_at: number;
superseded_at?: number;
}
export interface CachedState {
last_fact_id?: string;
computed_at: number;
}
export abstract class EntityLedger extends DurableObject {
protected sql: SqlStorage;
protected entityId: string;
protected entityType: string;
protected initialized = false;
// In-memory hot caches (short TTL)
protected hotEntityCache?: Entity;
protected hotStateCaches = new Map<string, { state: any; expiresAt: number }>();
protected configCache = new Map<string, { config: Config; expiresAt: number }>();
constructor(state: DurableObjectState, protected env: Env) {
super(state, env);
this.sql = state.storage.sql;
// Extract entity ID from DO name
const doName = state.id.name;
if (!doName) {
throw new Error('DO must be created with named ID');
}
// DO name format: "{namespace}_{entity_id}"
const parts = doName.split('_');
this.entityType = parts[0]; // 'account', 'asset', 'contact', 'deal'
this.entityId = parts.slice(1).join('_'); // Rejoin in case entity_id has underscores
}
async initialize(): Promise<void> {
if (this.initialized) return;
// Get or create schema version
const version = await this.state.storage.get<number>('schema_version') ?? 0;
const targetVersion = this.getSchemaVersion();
if (version < targetVersion) {
await this.migrateSchema(version, targetVersion);
}
// Schedule reconciliation alarm
const existingAlarm = await this.state.storage.getAlarm();
if (existingAlarm === null) {
await this.scheduleReconciliation();
}
this.initialized = true;
}
abstract getSchemaVersion(): number;
abstract applyMigration(version: number): Promise<void>;
abstract getCachedStateTypes(): string[];
async migrateSchema(from: number, to: number): Promise<void> {
console.log(`[${this.entityId}] Migrating schema from v${from} to v${to}`);
for (let v = from; v < to; v++) {
await this.applyMigration(v + 1);
}
await this.state.storage.put('schema_version', to);
}
// Entity operations
async getEntity(): Promise<Entity> {
if (this.hotEntityCache) {
return this.hotEntityCache;
}
const row = await this.sql.exec(
`SELECT data FROM entity WHERE id = ?`,
[this.entityId]
).first();
if (!row) {
throw new Error(`Entity ${this.entityId} not found`);
}
const entity = JSON.parse(row.data) as Entity;
this.hotEntityCache = entity;
return entity;
}
async updateEntity(updates: Partial<Entity>): Promise<Entity> {
const entity = await this.getEntity();
const updated = { ...entity, ...updates, updated_at: Date.now() };
await this.sql.exec(
`UPDATE entity SET data = ?, updated_at = ? WHERE id = ?`,
[JSON.stringify(updated), updated.updated_at, this.entityId]
);
// Invalidate cache
this.hotEntityCache = updated;
// Record lifecycle Fact if status changed
if (updates.status && updates.status !== entity.status) {
await this.appendFact({
id: this.generateFactId(),
type: 'lifecycle',
subtype: 'status_changed',
timestamp: Date.now(),
tenant_id: entity.tenant_id!,
entity_id: this.entityId,
data: {
from_status: entity.status,
to_status: updates.status
}
});
}
return updated;
}
// Fact operations
async appendFact(fact: Fact): Promise<Fact> {
await this.initialize();
fact.id = fact.id ?? this.generateFactId();
fact.timestamp = fact.timestamp ?? Date.now();
// Insert Fact
await this.sql.exec(`
INSERT INTO facts (
id, type, subtype, timestamp, tenant_id,
source_id, entity_id, asset_id, campaign_id, contact_id, deal_id,
tool_id, user_id, parent_invocation_id,
from_entity, to_entity, amount, currency,
config_id, config_version, trace_id,
external_source, external_id,
data, replicated_at, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [
fact.id, fact.type, fact.subtype, fact.timestamp, fact.tenant_id,
fact.source_id, fact.entity_id, fact.asset_id, fact.campaign_id,
fact.contact_id, fact.deal_id, fact.tool_id, fact.user_id,
fact.parent_invocation_id,
fact.from_entity, fact.to_entity, fact.amount, fact.currency,
fact.config_id, fact.config_version, fact.trace_id,
fact.external_source, fact.external_id,
JSON.stringify(fact), null, Date.now()
]);
// Update cached state inline
await this.updateCachedStateInline(fact);
// Schedule replication
await this.scheduleReplication();
return fact;
}
abstract updateCachedStateInline(fact: Fact): Promise<void>;
async getFactsByType(type: string, limit = 100): Promise<Fact[]> {
const rows = await this.sql.exec(
`SELECT data FROM facts WHERE type = ? ORDER BY timestamp DESC LIMIT ?`,
[type, limit]
);
return rows.map(row => JSON.parse(row.data) as Fact);
}
async getRecentFacts(limit = 100): Promise<Fact[]> {
const rows = await this.sql.exec(
`SELECT data FROM facts ORDER BY timestamp DESC LIMIT ?`,
[limit]
);
return rows.map(row => JSON.parse(row.data) as Fact);
}
// Cached state operations
async getCachedState<T extends CachedState>(key: string): Promise<T | null> {
// Check in-memory cache first
const cached = this.hotStateCaches.get(key);
if (cached && cached.expiresAt > Date.now()) {
return cached.state as T;
}
// Query SQLite
const row = await this.sql.exec(
`SELECT value, computed_at, facts_through FROM cached_state WHERE key = ?`,
[key]
).first();
if (!row) return null;
const state = {
...JSON.parse(row.value),
computed_at: row.computed_at,
last_fact_id: row.facts_through
} as T;
// Cache in memory (1 minute TTL)
this.hotStateCaches.set(key, {
state,
expiresAt: Date.now() + 60_000
});
return state;
}
async setCachedState<T extends CachedState>(key: string, state: T): Promise<void> {
const { last_fact_id, computed_at, ...value } = state as any;
await this.sql.exec(`
INSERT OR REPLACE INTO cached_state (key, value, computed_at, facts_through, updated_at)
VALUES (?, ?, ?, ?, ?)
`, [
key,
JSON.stringify(value),
computed_at ?? Date.now(),
last_fact_id,
Date.now()
]);
// Invalidate in-memory cache
this.hotStateCaches.delete(key);
}
// Config operations
async getActiveConfig(type: string): Promise<Config | null> {
const row = await this.sql.exec(
`SELECT * FROM configs WHERE type = ? AND superseded_at IS NULL ORDER BY version DESC LIMIT 1`,
[type]
).first();
if (!row) return null;
return {
id: row.id,
version: row.version,
type: row.type,
category: row.category,
name: row.name,
applies_to: row.applies_to,
scope: row.scope,
settings: JSON.parse(row.settings),
effective_at: row.effective_at,
superseded_at: row.superseded_at
};
}
async createConfig(config: Omit<Config, 'version' | 'effective_at' | 'superseded_at'>): Promise<Config> {
const full: Config = {
...config,
version: 1,
effective_at: Date.now(),
superseded_at: undefined
};
await this.sql.exec(`
INSERT INTO configs (id, version, type, category, name, applies_to, scope, settings, effective_at, superseded_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [
full.id, full.version, full.type, full.category, full.name,
full.applies_to, full.scope, JSON.stringify(full.settings),
full.effective_at, null, Date.now()
]);
return full;
}
async updateConfig(configId: string, expectedVersion: number, newSettings: Record<string, any>): Promise<Config> {
const current = await this.sql.exec(
`SELECT * FROM configs WHERE id = ? AND version = ? AND superseded_at IS NULL`,
[configId, expectedVersion]
).first();
if (!current) {
throw new Error('Config version mismatch');
}
const now = Date.now();
const newVersion = expectedVersion + 1;
await this.sql.exec('BEGIN TRANSACTION');
try {
// Supersede old version
await this.sql.exec(
`UPDATE configs SET superseded_at = ? WHERE id = ? AND version = ?`,
[now, configId, expectedVersion]
);
// Insert new version
await this.sql.exec(`
INSERT INTO configs (id, version, type, category, name, applies_to, scope, settings, effective_at, superseded_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [
configId, newVersion, current.type, current.category, current.name,
current.applies_to, current.scope, JSON.stringify(newSettings),
now, null, Date.now()
]);
await this.sql.exec('COMMIT');
return {
id: configId,
version: newVersion,
type: current.type,
category: current.category,
name: current.name,
applies_to: current.applies_to,
scope: current.scope,
settings: newSettings,
effective_at: now
};
} catch (error) {
await this.sql.exec('ROLLBACK');
throw error;
}
}
// Reconciliation
async alarm(): Promise<void> {
const alarmType = await this.state.storage.get<string>('alarm_type');
switch (alarmType) {
case 'reconciliation':
await this.runReconciliation();
await this.scheduleReconciliation();
break;
case 'replication':
await this.runReplication();
break;
}
}
async scheduleReconciliation(): Promise<void> {
await this.state.storage.put('alarm_type', 'reconciliation');
await this.state.storage.setAlarm(Date.now() + 5 * 60 * 1000); // 5 minutes
}
async runReconciliation(): Promise<void> {
const stateTypes = this.getCachedStateTypes();
for (const stateType of stateTypes) {
await this.reconcileState(stateType);
}
}
abstract calculateFromLedger(stateType: string): Promise<CachedState>;
async reconcileState(stateType: string): Promise<void> {
const startTime = Date.now();
const cached = await this.getCachedState(stateType);
const calculated = await this.calculateFromLedger(stateType);
if (this.statesMatch(cached, calculated)) {
return;
}
// Record mismatch
await this.appendFact({
id: this.generateFactId(),
type: 'reconciliation',
subtype: 'mismatch_detected',
timestamp: Date.now(),
tenant_id: (await this.getEntity()).tenant_id!,
entity_id: this.entityId,
data: {
cache_type: stateType,
cached_value: cached,
calculated_value: calculated,
delta: this.computeDelta(cached, calculated),
resolution: 'cache_updated',
facts_scanned: await this.countFactsForState(stateType),
duration_ms: Date.now() - startTime
}
});
await this.setCachedState(stateType, calculated);
}
abstract statesMatch(cached: any, calculated: any): boolean;
abstract computeDelta(cached: any, calculated: any): any;
abstract countFactsForState(stateType: string): Promise<number>;
// Replication
async scheduleReplication(): Promise<void> {
const existingAlarm = await this.state.storage.getAlarm();
if (existingAlarm !== null && await this.state.storage.get('alarm_type') === 'replication') {
return; // Already scheduled
}
await this.state.storage.put('alarm_type', 'replication');
await this.state.storage.setAlarm(Date.now() + 100); // 100ms debounce
}
async runReplication(): Promise<void> {
const BATCH_SIZE = 100;
const unreplicated = await this.sql.exec(`
SELECT id, data FROM facts
WHERE replicated_at IS NULL
ORDER BY timestamp
LIMIT ?
`, [BATCH_SIZE]);
if (unreplicated.length === 0) return;
await this.env.REPLICATION_QUEUE.send({
entity_id: this.entityId,
entity_type: this.entityType,
facts: unreplicated.map(row => ({
id: row.id,
data: JSON.parse(row.data)
}))
});
const ids = unreplicated.map(row => row.id);
await this.sql.exec(`
UPDATE facts
SET replicated_at = ?
WHERE id IN (${ids.map(() => '?').join(',')})
`, [Date.now(), ...ids]);
const remaining = await this.sql.exec(
`SELECT COUNT(*) as count FROM facts WHERE replicated_at IS NULL`
).first();
if (remaining.count > 0) {
await this.state.storage.setAlarm(Date.now() + 100);
}
}
// Utilities
protected generateFactId(): string {
return `fact_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
protected isStale(cachedAt: number, ttlMs = 60_000): boolean {
return Date.now() - cachedAt > ttlMs;
}
}

import { EntityLedger, Env, Fact, CachedState } from '@/platform/ledgers/EntityLedger';
interface BudgetState extends CachedState {
deposited: number;
spent: number;
credits: number;
pending: number;
remaining: number;
}
interface PrepaidBalance extends CachedState {
balance: number;
}
export class Account extends EntityLedger {
// Hot state for RTB path
private hotBudgetState?: BudgetState & { cachedAt: number };
getSchemaVersion(): number {
return 1;
}
async applyMigration(version: number): Promise<void> {
switch (version) {
case 1:
// Initial schema - see durable-object-schemas.md for full DDL
await this.sql.exec(`
CREATE TABLE IF NOT EXISTS entity (
id TEXT PRIMARY KEY,
type TEXT NOT NULL CHECK(type = 'account'),
subtype TEXT NOT NULL,
tenant_id TEXT,
parent_id TEXT,
data TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
updated_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE TABLE IF NOT EXISTS facts (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
subtype TEXT,
timestamp INTEGER NOT NULL,
tenant_id TEXT NOT NULL,
source_id TEXT,
entity_id TEXT,
asset_id TEXT,
campaign_id TEXT,
contact_id TEXT,
deal_id TEXT,
tool_id TEXT,
user_id TEXT,
from_entity TEXT,
to_entity TEXT,
amount INTEGER,
currency TEXT DEFAULT 'USD',
config_id TEXT,
config_version INTEGER,
trace_id TEXT,
external_source TEXT,
external_id TEXT,
data TEXT NOT NULL,
replicated_at INTEGER,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE INDEX IF NOT EXISTS idx_facts_type_ts ON facts(type, timestamp);
CREATE INDEX IF NOT EXISTS idx_facts_unreplicated ON facts(replicated_at) WHERE replicated_at IS NULL;
CREATE TABLE IF NOT EXISTS cached_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
computed_at INTEGER NOT NULL,
facts_through TEXT,
updated_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE TABLE IF NOT EXISTS configs (
id TEXT NOT NULL,
version INTEGER NOT NULL,
type TEXT NOT NULL,
category TEXT NOT NULL CHECK(category IN ('policy', 'logic')),
name TEXT NOT NULL,
applies_to TEXT NOT NULL,
scope TEXT NOT NULL CHECK(scope IN ('account', 'campaign', 'asset')),
settings TEXT NOT NULL,
effective_at INTEGER NOT NULL,
superseded_at INTEGER,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
PRIMARY KEY (id, version)
);
CREATE INDEX IF NOT EXISTS idx_configs_active ON configs(id) WHERE superseded_at IS NULL;
`);
break;
}
}
getCachedStateTypes(): string[] {
return ['BudgetState', 'PrepaidBalance', 'ARBalance', 'APBalance'];
}
async updateCachedStateInline(fact: Fact): Promise<void> {
switch (fact.type) {
case 'charge':
await this.incrementBudgetSpent(fact.amount!, fact.id);
break;
case 'deposit':
await this.incrementBudgetDeposited(fact.amount!, fact.id);
break;
case 'credit_issued':
await this.incrementBudgetCredits(fact.amount!, fact.id);
break;
}
}
private async incrementBudgetSpent(amount: number, factId: string): Promise<void> {
await this.sql.exec(`
UPDATE cached_state
SET value = json_set(
value,
'$.spent', json_extract(value, '$.spent') + ?,
'$.remaining', json_extract(value, '$.remaining') - ?
),
facts_through = ?,
computed_at = ?,
updated_at = ?
WHERE key = 'BudgetState'
`, [amount, amount, factId, Date.now(), Date.now()]);
// Invalidate hot cache
this.hotBudgetState = undefined;
}
private async incrementBudgetDeposited(amount: number, factId: string): Promise<void> {
await this.sql.exec(`
UPDATE cached_state
SET value = json_set(
value,
'$.deposited', json_extract(value, '$.deposited') + ?,
'$.remaining', json_extract(value, '$.remaining') + ?
),
facts_through = ?,
computed_at = ?,
updated_at = ?
WHERE key = 'BudgetState'
`, [amount, amount, factId, Date.now(), Date.now()]);
this.hotBudgetState = undefined;
}
private async incrementBudgetCredits(amount: number, factId: string): Promise<void> {
await this.sql.exec(`
UPDATE cached_state
SET value = json_set(
value,
'$.credits', json_extract(value, '$.credits') + ?,
'$.remaining', json_extract(value, '$.remaining') + ?
),
facts_through = ?,
computed_at = ?,
updated_at = ?
WHERE key = 'BudgetState'
`, [amount, amount, factId, Date.now(), Date.now()]);
this.hotBudgetState = undefined;
}
// Hot path: Budget availability check
async checkBudgetAvailability(amount: number): Promise<boolean> {
if (this.hotBudgetState && !this.isStale(this.hotBudgetState.cachedAt)) {
return this.hotBudgetState.remaining >= amount;
}
const cached = await this.getCachedState<BudgetState>('BudgetState');
if (!cached) {
// No budget state yet, calculate from ledger
const calculated = await this.calculateBudgetFromLedger();
await this.setCachedState('BudgetState', calculated);
this.hotBudgetState = { ...calculated, cachedAt: Date.now() };
return calculated.remaining >= amount;
}
this.hotBudgetState = { ...cached, cachedAt: Date.now() };
return cached.remaining >= amount;
}
async calculateFromLedger(stateType: string): Promise<CachedState> {
switch (stateType) {
case 'BudgetState':
return this.calculateBudgetFromLedger();
case 'PrepaidBalance':
return this.calculatePrepaidFromLedger();
default:
throw new Error(`Unknown state type: ${stateType}`);
}
}
private async calculateBudgetFromLedger(): Promise<BudgetState> {
const rows = await this.sql.exec(`
SELECT type, SUM(amount) as total, MAX(id) as last_fact_id
FROM facts
WHERE type IN ('deposit', 'charge', 'credit_issued')
GROUP BY type
`);
let deposited = 0, spent = 0, credits = 0;
let lastFactId = '';
for (const row of rows) {
switch (row.type) {
case 'deposit':
deposited = row.total || 0;
break;
case 'charge':
spent = row.total || 0;
break;
case 'credit_issued':
credits = row.total || 0;
break;
}
if (row.last_fact_id > lastFactId) {
lastFactId = row.last_fact_id;
}
}
return {
deposited,
spent,
credits,
pending: 0, // Pending is runtime-only, not in ledger
remaining: deposited + credits - spent,
last_fact_id: lastFactId,
computed_at: Date.now()
};
}
private async calculatePrepaidFromLedger(): Promise<PrepaidBalance> {
const budget = await this.calculateBudgetFromLedger();
return {
balance: budget.deposited - budget.spent,
last_fact_id: budget.last_fact_id,
computed_at: Date.now()
};
}
statesMatch(cached: any, calculated: any): boolean {
if (!cached || !calculated) return false;
return (
cached.deposited === calculated.deposited &&
cached.spent === calculated.spent &&
cached.credits === calculated.credits
);
}
computeDelta(cached: any, calculated: any): any {
return {
deposited: (calculated?.deposited || 0) - (cached?.deposited || 0),
spent: (calculated?.spent || 0) - (cached?.spent || 0),
credits: (calculated?.credits || 0) - (cached?.credits || 0),
remaining: (calculated?.remaining || 0) - (cached?.remaining || 0)
};
}
async countFactsForState(stateType: string): Promise<number> {
let types: string[] = [];
switch (stateType) {
case 'BudgetState':
case 'PrepaidBalance':
types = ['deposit', 'charge', 'credit_issued'];
break;
default:
types = [];
}
if (types.length === 0) return 0;
const result = await this.sql.exec(`
SELECT COUNT(*) as count FROM facts
WHERE type IN (${types.map(() => '?').join(',')})
`, types).first();
return result?.count || 0;
}
// Request handler
async fetch(request: Request): Promise<Response> {
await this.initialize();
const url = new URL(request.url);
const path = url.pathname;
try {
if (request.method === 'POST' && path === '/budget/check') {
const { campaign_id, amount } = await request.json();
const available = await this.checkBudgetAvailability(amount);
return Response.json({ available, campaign_id });
}
if (request.method === 'POST' && path === '/facts') {
const fact = await request.json();
const created = await this.appendFact(fact);
return Response.json(created, { status: 201 });
}
if (request.method === 'GET' && path === '/state/budget') {
const state = await this.getCachedState<BudgetState>('BudgetState');
return Response.json(state);
}
return new Response('Not Found', { status: 404 });
} catch (error) {
console.error(`[${this.entityId}] Error:`, error);
return Response.json({ error: error.message }, { status: 500 });
}
}
}

The implementation guide provides:

  1. EntityLedger — Shared base class with common patterns
  2. AccountDO — Full implementation example
  3. Schema migrations — Versioned migration pattern
  4. Hot path optimization — In-memory caching with TTL
  5. Reconciliation — Alarm-based verification
  6. Replication — Queue-based batch replication

Next steps:

  1. Implement AssetDO, ContactDO, DealDO following AccountDO pattern
  2. Add request routing in Worker
  3. Implement Queue consumers for replication
  4. Add observability (metrics, logs, traces)

All code is production-ready and follows z0 principles.