# ProjectionEngine

Fact aggregation system for building materialized views from event streams.
Prerequisites: yaml-manifests.md
## Overview

The ProjectionEngine processes facts and generates aggregations based on declarative YAML configuration. It transforms raw event streams into queryable metrics, supporting:
- 6 aggregation functions - count, sum, avg, min, max, count_distinct
- Time windows - bucket by minute, hour, day, or month
- Multi-dimensional analysis - groupBy multiple fields
- Materialization - store results in CachedState for instant retrieval
ProjectionEngine is ideal for:
- Analytics dashboards - page views, sessions, conversions
- Usage tracking - API calls, compute minutes, storage bytes
- Business metrics - revenue, orders, active users
- Operational monitoring - error rates, latency percentiles
```ts
import { parseProjectionConfig, ProjectionEngine } from '@z0-app/sdk';

const yaml = `
source: api_request
factTypes: [request_completed]
timeWindow: hour
groupBy: [endpoint, status]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
`;

const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);
const results = engine.process(facts);
// => { "2026-03-12T10:/api/v1:200": { count: 42, duration_ms_avg: 120.5 } }
```

## Configuration
ProjectionEngine uses YAML configuration parsed via `parseProjectionConfig()`, or can be constructed from a versioned `Config<ProjectionConfigSettings>` object for full z0 Config pattern integration.
### Signature

```ts
function parseProjectionConfig(yaml: string): ProjectionConfig

interface ProjectionConfig {
  source: string;              // Source entity type
  factTypes: string[];         // Fact types to include
  timeWindow?: TimeWindow;     // Optional time bucketing
  groupBy?: string[];          // Optional grouping fields
  aggregations: Aggregation[]; // Aggregations to compute
  materialize: boolean;        // Store in CachedState?
  storageVersion?: number;     // Storage format version (1=blob, 2=sparse)
}

interface ProjectionConfigSettings {
  source: string;              // Source entity type
  factTypes: string[];         // Fact types to include
  timeWindow?: TimeWindow;     // Optional time bucketing
  groupBy?: string[];          // Optional grouping fields
  aggregations: Aggregation[]; // Aggregations to compute
  materialize: boolean;        // Store in CachedState?
  storageVersion: number;      // Storage format version (1=blob, 2=sparse)
  rollup?: RollupConfig;       // Optional auto-rollup configuration
}

type TimeWindow = 'minute' | 'hour' | 'day' | 'month';

interface Aggregation {
  field: string; // Field to aggregate (use 'count' for count function)
  function: AggregationFunction;
}

type AggregationFunction = 'count' | 'sum' | 'avg' | 'min' | 'max' | 'count_distinct';
```

## Error Handling
```ts
import {
  parseProjectionConfig,
  ConfigParseError,
  ConfigValidationError,
} from '@z0-app/sdk';

try {
  const config = parseProjectionConfig(yamlString);
} catch (err) {
  if (err instanceof ConfigParseError) {
    console.error('Invalid YAML syntax:', err.message);
  } else if (err instanceof ConfigValidationError) {
    console.error('Config validation failed:', err.message);
    console.error('Issues:', err.issues); // Array of specific errors
  }
}
```

## Storage Formats (v0.10.0+)
### Sparse Storage

ProjectionEngine supports two storage formats, controlled by the `storageVersion` field:
- `storageVersion: 1` (default, blob format) - All buckets stored as a single JSON blob
- `storageVersion: 2` (sparse format) - Each bucket stored as a separate key (recommended for large datasets)
Sparse storage benefits:
- Efficient time-range queries - Load only buckets in range, not entire dataset
- Incremental updates - Update single bucket without reading/writing all data
- Lower memory usage - Process one bucket at a time
- Better for high-cardinality groupBy - Scale to millions of buckets
### Bucket Key Format

When using sparse storage (`storageVersion: 2`), buckets are stored with this key pattern:
```text
{projectionKey}:{timeWindow}:{bucketKey}
{projectionKey}:{timeWindow}:{bucketKey}:{groupBy[0]}:{groupBy[1]}:...
```

Examples:

```ts
// Time window only
"daily_usage:day:2026-03-12"

// Time window + single groupBy
"api_metrics:hour:2026-03-12T10:US"

// Time window + multiple groupBy
"orders:day:2026-03-12:product_category=electronics:payment_method=credit"
```

### Enabling Sparse Storage
Section titled “Enabling Sparse Storage”source: api_requestfactTypes: [completed]timeWindow: daygroupBy: [data.country]aggregations: - field: count function: countmaterialize: truestorageVersion: 2 # Enable sparse storageconst config = parseProjectionConfig(yaml);const engine = new ProjectionEngine(config);
// Each day+country bucket stored separatelyconst result = engine.process(facts, { cachedStateManager, projectionKey: 'api_usage',});
// Storage keys created:// - api_usage:day:2026-03-12:US// - api_usage:day:2026-03-12:UK// - api_usage:day:2026-03-13:USMigration Between Storage Versions
When `storageVersion` changes in config, ProjectionEngine automatically migrates data and emits migration facts for an audit trail:
```ts
import { PROJECTION_FACT_TYPES } from '@z0-app/sdk';

// Facts emitted during migration:
// - projection.migration_started
// - projection.migration_completed (with duration and bucket count)
// - projection.migration_failed (on error, original data intact)
```

Migration is triggered when:
- Config `storageVersion` changes (e.g., 1 → 2)
- Engine processes facts with the new config version
- CachedStateManager detects a version mismatch
Migration safety:
- Original data preserved until migration succeeds
- Atomic switch to new format
- Automatic rollback on failure
## Aggregation Functions

ProjectionEngine supports six aggregation functions:
### count

Count the number of facts in each bucket.

```yaml
aggregations:
  - field: count
    function: count
```

Result key: `count`
Example:

```ts
const config = parseProjectionConfig(`
source: pageview
factTypes: [viewed]
aggregations:
  - field: count
    function: count
`);
const engine = new ProjectionEngine(config);
const result = engine.process(facts);
// => { count: 42 }
```

### sum

Sum numeric values across facts.
```yaml
aggregations:
  - field: data.amount
    function: sum
```

Result key: `{field}_sum` (e.g., `amount_sum`)
Example:

```ts
// Facts: [{ data: { amount: 100 } }, { data: { amount: 250 } }]
// Result: { amount_sum: 350 }
```

### avg

Calculate the mean of numeric values.
```yaml
aggregations:
  - field: data.duration_ms
    function: avg
```

Result key: `{field}_avg` (e.g., `duration_ms_avg`)
Example:

```ts
// Facts: [{ data: { duration_ms: 100 } }, { data: { duration_ms: 200 } }]
// Result: { duration_ms_avg: 150 }
```

### min

Find the minimum numeric value.
```yaml
aggregations:
  - field: data.response_time
    function: min
```

Result key: `{field}_min` (e.g., `response_time_min`)
Example:

```ts
// Facts: [{ data: { response_time: 50 } }, { data: { response_time: 120 } }]
// Result: { response_time_min: 50 }
```

### max

Find the maximum numeric value.
```yaml
aggregations:
  - field: data.response_time
    function: max
```

Result key: `{field}_max` (e.g., `response_time_max`)
Example:

```ts
// Facts: [{ data: { response_time: 50 } }, { data: { response_time: 120 } }]
// Result: { response_time_max: 120 }
```

### count_distinct

Count unique values (uses a Set internally).
```yaml
aggregations:
  - field: data.user_id
    function: count_distinct
```

Result key: `{field}_distinct` (e.g., `user_id_distinct`)
Example:

```ts
// Facts: [
//   { data: { user_id: 'u1' } },
//   { data: { user_id: 'u1' } },
//   { data: { user_id: 'u2' } }
// ]
// Result: { user_id_distinct: 2 }
```

## Time Windows

Time windows bucket facts by timestamp for temporal analysis.
### Available Windows

| Window | Format | Example |
|---|---|---|
| minute | YYYY-MM-DDTHH:mm | 2026-03-12T10:15 |
| hour | YYYY-MM-DDTHH | 2026-03-12T10 |
| day | YYYY-MM-DD | 2026-03-12 |
| month | YYYY-MM | 2026-03 |
All timestamps use UTC.
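The window formats above are plain UTC ISO-8601 prefixes, so a bucket key can be derived by slicing an ISO timestamp. A minimal illustrative sketch (the `bucketKey` helper and its approach are assumptions, not the SDK's actual implementation):

```typescript
// Illustrative only: derive a time-window bucket key from a UTC timestamp
// by slicing the ISO-8601 string to the documented prefix length.
type TimeWindow = 'minute' | 'hour' | 'day' | 'month';

function bucketKey(timestampMs: number, window: TimeWindow): string {
  const iso = new Date(timestampMs).toISOString(); // "2026-03-12T10:15:00.000Z"
  const sliceLen = { month: 7, day: 10, hour: 13, minute: 16 }[window];
  return iso.slice(0, sliceLen);
}

bucketKey(Date.UTC(2026, 2, 12, 10, 15), 'hour');   // => "2026-03-12T10"
bucketKey(Date.UTC(2026, 2, 12, 10, 15), 'minute'); // => "2026-03-12T10:15"
```

Note that `Date.UTC` takes a 0-based month index, which is why the examples throughout this page pass `2` for March.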
### Configuration

```yaml
source: pageview
factTypes: [viewed]
timeWindow: hour # Bucket by hour
aggregations:
  - field: count
    function: count
```

### Example: Hourly Page Views
```ts
const config = parseProjectionConfig(`
source: pageview
factTypes: [viewed]
timeWindow: hour
aggregations:
  - field: count
    function: count
`);

const facts = [
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 15), data: {} },
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 45), data: {} },
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 11, 5), data: {} },
];

const engine = new ProjectionEngine(config);
const result = engine.process(facts);

console.log(result);
// Output:
// {
//   "2026-03-12T10": { count: 2 },
//   "2026-03-12T11": { count: 1 }
// }
```

## Multi-Dimensional Aggregation with groupBy
The `groupBy` field enables dimensional analysis by grouping facts by one or more fields.
### Configuration

```yaml
source: api_request
factTypes: [completed]
groupBy: [data.endpoint, data.status]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
```

### Example: API Metrics by Endpoint and Status
```ts
const config = parseProjectionConfig(`
source: api_request
factTypes: [completed]
groupBy: [data.endpoint, data.status]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
`);

const facts = [
  { type: 'completed', data: { endpoint: '/api/v1', status: 200, duration_ms: 100 } },
  { type: 'completed', data: { endpoint: '/api/v1', status: 200, duration_ms: 150 } },
  { type: 'completed', data: { endpoint: '/api/v1', status: 500, duration_ms: 2000 } },
  { type: 'completed', data: { endpoint: '/api/v2', status: 200, duration_ms: 80 } },
];

const engine = new ProjectionEngine(config);
const result = engine.process(facts);

console.log(result);
// Output:
// {
//   "/api/v1:200": { count: 2, duration_ms_avg: 125 },
//   "/api/v1:500": { count: 1, duration_ms_avg: 2000 },
//   "/api/v2:200": { count: 1, duration_ms_avg: 80 }
// }
```

### Combining Time Windows and groupBy
```yaml
source: api_request
factTypes: [completed]
timeWindow: day
groupBy: [data.country]
aggregations:
  - field: count
    function: count
```

Bucket key format: `{timeWindow}:{groupBy[0]}:{groupBy[1]}:...`
Example result:

```ts
{
  "2026-03-12:US": { count: 42 },
  "2026-03-12:UK": { count: 18 },
  "2026-03-13:US": { count: 55 }
}
```
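Because a combined bucket key ends with the groupBy values, results in this shape are easy to post-process on the client. A hedged sketch (the `sumByGroup` helper is hypothetical, not part of the SDK) that totals one metric for a single group across all day buckets:

```typescript
// Hypothetical helper: total a metric across all time buckets for one
// groupBy value, given a day-window result keyed like "YYYY-MM-DD:group".
function sumByGroup(
  result: Record<string, Record<string, number>>,
  group: string,
  metric: string
): number {
  return Object.entries(result)
    .filter(([key]) => key.endsWith(`:${group}`))
    .reduce((total, [, bucket]) => total + (bucket[metric] ?? 0), 0);
}

const result = {
  '2026-03-12:US': { count: 42 },
  '2026-03-12:UK': { count: 18 },
  '2026-03-13:US': { count: 55 },
};

sumByGroup(result, 'US', 'count'); // => 97 (42 + 55)
```

Note the `endsWith` check is only unambiguous for windows whose keys contain no extra colons (day, month); minute-window keys would need a more careful parse.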
## Materialization in CachedState

When `materialize: true`, ProjectionEngine stores aggregation results in CachedState for instant retrieval.
### Why Materialize?

- Fast queries - Pre-computed results, no fact scanning
- Incremental updates - Process only new facts
- Consistency - Tracks `factsThroughId` to detect gaps
### Configuration

```yaml
source: usage
factTypes: [api_call, compute_used]
timeWindow: day
aggregations:
  - field: count
    function: count
materialize: true # Store in CachedState
```

### Usage with CachedStateManager
```ts
import { ProjectionEngine, parseProjectionConfig } from '@z0-app/sdk';
import type { CachedStateManager } from '@z0-app/sdk';

const config = parseProjectionConfig(`
source: usage
factTypes: [api_call]
timeWindow: day
aggregations:
  - field: count
    function: count
materialize: true
`);

const engine = new ProjectionEngine(config);

// Process facts and materialize
const result = engine.process(facts, {
  cachedStateManager,           // CachedStateManager instance
  projectionKey: 'daily_usage', // Key for storing in CachedState
  factsThroughId: 'fact_xyz',   // Latest fact ID processed
});

// Later, retrieve from CachedState without recomputing
const cached = cachedStateManager.get('daily_usage');
console.log(cached);
// => {
//   value: { "2026-03-12": { count: 42 } },
//   factsThroughId: "fact_xyz"
// }
```

### Incremental Updates
When new facts arrive, only process facts after `factsThroughId`:
```ts
// Initial computation
const result1 = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: 'fact_100',
});

// Later, new facts arrive (fact_101, fact_102, ...)
const newFacts = facts.filter(f => f.id > 'fact_100');
const result2 = engine.process(newFacts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: 'fact_150',
});
// CachedState now updated with new aggregations
```

## Accessing Nested Fields
Use dot notation to access nested fields in fact data:
```ts
const config = parseProjectionConfig(`
source: order
factTypes: [placed]
aggregations:
  - field: data.payment.amount
    function: sum
  - field: data.customer.country
    function: count_distinct
`);

// Facts:
// [
//   { type: 'placed', data: { payment: { amount: 100 }, customer: { country: 'US' } } },
//   { type: 'placed', data: { payment: { amount: 250 }, customer: { country: 'UK' } } }
// ]
//
// Result:
// {
//   payment_amount_sum: 350,
//   customer_country_distinct: 2
// }
```
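Dot-notation access presumably walks the object one path segment at a time, yielding `undefined` when a segment is missing. A self-contained sketch of that idea (the `getField` helper is illustrative, not the SDK's actual resolver):

```typescript
// Illustrative sketch of dot-path field access (not the SDK implementation).
// Walks each path segment; returns undefined if any segment is missing.
function getField(obj: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>(
    (current, segment) =>
      current != null && typeof current === 'object'
        ? (current as Record<string, unknown>)[segment]
        : undefined,
    obj
  );
}

const fact = { type: 'placed', data: { payment: { amount: 100 } } };
getField(fact, 'data.payment.amount'); // => 100
getField(fact, 'data.missing.field');  // => undefined
```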
## Complete Examples

### Example 1: Analytics Dashboard

Track page views and engagement by hour:
```yaml
source: pageview
factTypes: [viewed, engaged]
timeWindow: hour
groupBy: [data.page_type]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
```

```ts
const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);

const facts = [
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 15), data: { page_type: 'homepage', duration_ms: 5000 } },
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 30), data: { page_type: 'homepage', duration_ms: 3000 } },
  { type: 'engaged', timestamp: Date.UTC(2026, 2, 12, 10, 45), data: { page_type: 'product', duration_ms: 12000 } },
];

const result = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'hourly_analytics',
  factsThroughId: facts[facts.length - 1].id,
});

console.log(result);
// {
//   "2026-03-12T10:homepage": { count: 2, duration_ms_avg: 4000 },
//   "2026-03-12T10:product": { count: 1, duration_ms_avg: 12000 }
// }
```

### Example 2: Usage Metering
Calculate daily usage per customer:
```yaml
source: usage
factTypes: [api_call, compute_used, storage_used]
timeWindow: day
groupBy: [data.customer_id]
aggregations:
  - field: count
    function: count
  - field: data.compute_ms
    function: sum
  - field: data.storage_bytes
    function: max
materialize: true
```

```ts
const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);

const facts = [
  { type: 'api_call', timestamp: Date.UTC(2026, 2, 12, 8, 0), data: { customer_id: 'c1', compute_ms: 100 } },
  { type: 'api_call', timestamp: Date.UTC(2026, 2, 12, 14, 0), data: { customer_id: 'c1', compute_ms: 200 } },
  { type: 'storage_used', timestamp: Date.UTC(2026, 2, 12, 20, 0), data: { customer_id: 'c1', storage_bytes: 1024 } },
];

const result = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: facts[facts.length - 1].id,
});

console.log(result);
// {
//   "2026-03-12:c1": {
//     count: 3,
//     compute_ms_sum: 300,
//     storage_bytes_max: 1024
//   }
// }
```

### Example 3: Error Rate Monitoring
Track errors by endpoint and calculate percentages:
```yaml
source: api_request
factTypes: [completed]
timeWindow: minute
groupBy: [data.endpoint, data.status_category]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
  - field: data.duration_ms
    function: max
materialize: false
```

```ts
const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);

const facts = [
  { type: 'completed', timestamp: Date.UTC(2026, 2, 12, 10, 15, 5), data: { endpoint: '/api/users', status_category: '2xx', duration_ms: 50 } },
  { type: 'completed', timestamp: Date.UTC(2026, 2, 12, 10, 15, 10), data: { endpoint: '/api/users', status_category: '2xx', duration_ms: 75 } },
  { type: 'completed', timestamp: Date.UTC(2026, 2, 12, 10, 15, 20), data: { endpoint: '/api/users', status_category: '5xx', duration_ms: 2000 } },
];

const result = engine.process(facts);

console.log(result);
// {
//   "2026-03-12T10:15:/api/users:2xx": { count: 2, duration_ms_avg: 62.5, duration_ms_max: 75 },
//   "2026-03-12T10:15:/api/users:5xx": { count: 1, duration_ms_avg: 2000, duration_ms_max: 2000 }
// }

// Calculate error rate
const success = result["2026-03-12T10:15:/api/users:2xx"].count;
const error = result["2026-03-12T10:15:/api/users:5xx"].count;
const errorRate = error / (success + error);
console.log(`Error rate: ${(errorRate * 100).toFixed(2)}%`); // "33.33%"
```

## Time Range Queries (v0.10.0+)
Query specific time ranges without loading the entire projection dataset.
### query() Method

```ts
engine.query<T>(
  projectionKey: string,
  options?: TimeRangeQueryOptions
): Promise<Record<string, T>>

interface TimeRangeQueryOptions {
  from?: number;       // Start timestamp (inclusive)
  to?: number;         // End timestamp (inclusive)
  aggregate?: boolean; // Aggregate all buckets into single result
  mode?: QueryMode;    // 'materialized' (default) or 'compute'
}
```

Performance: O(k) where k = number of buckets in range (not O(n) total buckets)
### Examples

Query a specific date range:
```ts
const engine = new ProjectionEngine(config);

// Get March 12-14 data only
const result = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 12, 0, 0, 0),
  to: Date.UTC(2026, 2, 14, 23, 59, 59),
});

// Returns only buckets in range:
// {
//   "2026-03-12:US": { count: 42 },
//   "2026-03-13:US": { count: 55 },
//   "2026-03-14:US": { count: 38 }
// }
```

Open-ended ranges:
```ts
// All data from March 12 onwards
const fromMarch = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 12),
});

// All data up to March 14
const throughMarch = await engine.query('daily_usage', {
  to: Date.UTC(2026, 2, 14, 23, 59, 59),
});
```

Aggregate across a range:
```ts
// Single aggregated result for entire week
const weekTotal = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 10),
  to: Date.UTC(2026, 2, 16, 23, 59, 59),
  aggregate: true,
});

// Returns:
// {
//   "aggregated": {
//     count: 300,            // sum of all counts
//     duration_ms_avg: 125.5 // weighted average
//   }
// }
```

## Query Modes (v0.10.0+)
ProjectionEngine supports two query modes for different use cases:
### Materialized Mode (Default)

Reads from stored CachedState. Fast, but requires prior materialization.
```ts
const result = await engine.query('daily_usage', {
  mode: 'materialized', // default
});
```

Use when:
- Data is already materialized
- Query performance is critical
- You want to read pre-computed results
### Compute Mode

Computes directly from source facts without touching storage. No side effects.
```ts
const result = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 12),
  to: Date.UTC(2026, 2, 14),
  mode: 'compute',
});
```

Use when:
- Running ad-hoc queries on historical data
- Testing projections before materializing
- You don’t want to modify cached state
- Debugging or validation
Characteristics:
- No reads from CachedState
- No writes to CachedState
- Computes aggregations on-demand from facts
- Can be combined with time range filtering
## Incremental Processing (v0.10.0+)

Process individual facts with O(1) complexity instead of reprocessing all data.
### processIncremental() Method

```ts
engine.processIncremental(
  fact: Fact,
  options?: ProjectionProcessOptions
): Promise<void>
```

Benefits:
- O(1) complexity - Updates only affected bucket
- Idempotent - Safe to process same fact multiple times
- Real-time updates - Process facts as they arrive
- Correct aggregations - Maintains running sums/counts for avg, count_distinct
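The last point works because an average can be maintained incrementally as a running sum plus count, rather than recomputed from all facts. A simplified sketch of the idea (the bucket shape here is assumed for illustration, not the SDK's actual storage format):

```typescript
// Assumed, simplified bucket state: avg is kept as running sum + count so a
// single fact can update the bucket in O(1). Not the SDK's storage format.
interface BucketState {
  count: number;
  durationSum: number;
}

function applyFact(bucket: BucketState, durationMs: number): BucketState {
  return { count: bucket.count + 1, durationSum: bucket.durationSum + durationMs };
}

function durationAvg(bucket: BucketState): number {
  return bucket.count === 0 ? 0 : bucket.durationSum / bucket.count;
}

let bucket: BucketState = { count: 0, durationSum: 0 };
bucket = applyFact(bucket, 100);
bucket = applyFact(bucket, 200);
durationAvg(bucket); // => 150
```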
### Example

```ts
const config = parseProjectionConfig(`
source: api_request
factTypes: [completed]
timeWindow: hour
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
storageVersion: 2
`);

const engine = new ProjectionEngine(config);

// Process individual fact incrementally
await engine.processIncremental(
  {
    id: 'fact_123',
    type: 'completed',
    timestamp: Date.UTC(2026, 2, 12, 10, 15),
    data: { duration_ms: 150 },
  },
  {
    cachedStateManager,
    projectionKey: 'api_metrics',
  }
);

// Only the 2026-03-12T10 bucket is read, updated, and written
// No need to load or reprocess other hours
```

### Idempotency
`processIncremental` tracks processed fact IDs to prevent double-counting:
```ts
// Process same fact twice - second call is a no-op
await engine.processIncremental(fact, options);
await engine.processIncremental(fact, options); // No effect
```

### Out-of-Order Facts
Late-arriving facts update historical buckets correctly:
```ts
// Process facts out of order
await engine.processIncremental(factFromMarch14, options); // Creates Mar 14 bucket
await engine.processIncremental(factFromMarch12, options); // Creates Mar 12 bucket
await engine.processIncremental(factFromMarch13, options); // Creates Mar 13 bucket

// Each bucket maintained independently
```

## Auto-Rollups (v0.10.0+)
Automatically aggregate fine-grained buckets into coarser time windows (minute → hour → day → month).
### Configuration

```yaml
source: api_request
factTypes: [completed]
timeWindow: minute
aggregations:
  - field: count
    function: count
materialize: true
rollup:
  enabled: true
  retention:
    minute: 7  # Keep 7 days of minute buckets
    hour: 90   # Keep 90 days of hour buckets
    day: 730   # Keep 2 years of day buckets
    month: 0   # Keep month buckets forever
```

### Rollup Hierarchy
```text
minute (60 buckets) → hour (24 buckets) → day (28-31 buckets) → month
```

Rollup triggers:
- Minute → Hour: After 60 minutes of data in an hour
- Hour → Day: At day boundary (24 hours complete)
- Day → Month: At month boundary (all days in month complete)
### Aggregation Behavior

| Function | Rollup Method |
|---|---|
| count | Sum child counts |
| sum | Sum child sums |
| avg | Weighted average (using counts) |
| min | Min of child mins |
| max | Max of child maxes |
| count_distinct | HLL approximation or disabled* |
*`count_distinct` either uses HyperLogLog for an approximate rollup or is skipped, with the limitation documented.
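The weighted-average rule for `avg` can be made concrete with a small sketch (the child-bucket shape is assumed for illustration): each child contributes its mean weighted by its count, which reproduces the true mean of the underlying facts.

```typescript
// Illustrative rollup of child buckets into a parent avg, weighted by count.
// The child-bucket shape is assumed here, not the SDK's storage format.
interface ChildBucket {
  count: number;
  duration_ms_avg: number;
}

function rollupAvg(children: ChildBucket[]): { count: number; duration_ms_avg: number } {
  const count = children.reduce((n, c) => n + c.count, 0);
  const weightedSum = children.reduce((s, c) => s + c.count * c.duration_ms_avg, 0);
  return { count, duration_ms_avg: count === 0 ? 0 : weightedSum / count };
}

// Two minute buckets rolled into their hour:
const rolled = rollupAvg([
  { count: 2, duration_ms_avg: 62.5 },
  { count: 1, duration_ms_avg: 2000 },
]);
// => count: 3, duration_ms_avg ≈ 708.33 (i.e., (2*62.5 + 1*2000) / 3)
```

A naive average of the child averages (1031.25 here) would be wrong; weighting by count is what makes the rollup exact.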
### Retention Policy

After rollup, fine-grained buckets older than the retention period are deleted:
```yaml
# Configuration with 7-day minute retention
rollup:
  enabled: true
  retention:
    minute: 7 # After 7 days, minute buckets rolled up and deleted
```

```text
Timeline:
Day 0-7: Minute buckets + hour buckets exist
Day 8+:  Only hour buckets (minute buckets deleted after rollup)
```

A retention value of 0 means keep forever (no cleanup).
### Example

```ts
const config = parseProjectionConfig(`
source: pageview
factTypes: [viewed]
timeWindow: minute
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
storageVersion: 2
rollup:
  enabled: true
  retention:
    minute: 1
    hour: 7
    day: 90
`);

const engine = new ProjectionEngine(config);

// Process a week's worth of facts at minute granularity
await engine.process(weekOfFacts, {
  cachedStateManager,
  projectionKey: 'pageviews',
});

// Storage state after rollup:
// - Last 24 hours: minute + hour buckets
// - Last 7 days: hour + day buckets
// - Last 90 days: day buckets
// - Older: month buckets
```

## Best Practices
### 1. Choose Appropriate Time Windows

- minute - Real-time monitoring (heavy write load)
- hour - Operational dashboards
- day - Usage metering, billing
- month - Business analytics, long-term trends
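To make the trade-off concrete, here is a rough back-of-the-envelope estimate (illustrative figures, ignoring leap years) of buckets produced per year for a single groupBy combination:

```typescript
// Approximate bucket counts per year for one groupBy combination.
const BUCKETS_PER_YEAR: Record<string, number> = {
  minute: 365 * 24 * 60, // 525,600
  hour: 365 * 24,        // 8,760
  day: 365,
  month: 12,
};

// With 100 groupBy combinations, minute granularity means roughly
// 52.5 million buckets per year - hence the "heavy write load" caveat.
const minuteBuckets = BUCKETS_PER_YEAR.minute * 100; // 52,560,000
```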
### 2. Limit groupBy Dimensions

Each unique combination creates a bucket. Avoid high-cardinality fields:
```yaml
# ❌ BAD - Creates millions of buckets
groupBy: [data.user_id, data.session_id]

# ✅ GOOD - Bounded cardinality
groupBy: [data.country, data.plan_type]
```

### 3. Materialize Heavy Aggregations
If computing the projection is expensive (many facts, complex aggregations), set `materialize: true`.
### 4. Use Incremental Processing

For materialized projections, only process new facts:
```ts
// Track last processed fact ID
const lastFactId = cachedStateManager.get('daily_usage')?.factsThroughId;

// Get new facts only
const newFacts = facts.filter(f => f.id > lastFactId);

// Incremental update
engine.process(newFacts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: newFacts[newFacts.length - 1].id,
});
```

### 5. Validate Configs at Boot Time
Section titled “5. Validate Configs at Boot Time”import { parseProjectionConfig, ConfigValidationError } from '@z0-app/sdk';
const projectionYamls = ['daily_usage.yaml', 'error_rates.yaml'];
for (const yamlFile of projectionYamls) { try { const yaml = fs.readFileSync(yamlFile, 'utf-8'); parseProjectionConfig(yaml); // Validate } catch (err) { if (err instanceof ConfigValidationError) { console.error(`Invalid projection config in ${yamlFile}:`); err.issues.forEach(issue => console.error(` - ${issue}`)); process.exit(1); } }}6. Use Config Pattern for Versioning
Wrap ProjectionConfig in z0's Config pattern for versioned configuration:
```ts
import type { Config } from '@z0-app/sdk';
import { ProjectionConfigSettings, ProjectionEngine } from '@z0-app/sdk';

// Store as versioned config
const projectionConfig: Config<ProjectionConfigSettings> = {
  id: 'proj_daily_usage',
  type: 'projection',
  scope: 'platform',
  version: 1,
  tenant_id: 'system',
  settings: {
    source: 'api_request',
    factTypes: ['completed'],
    timeWindow: 'day',
    aggregations: [{ field: 'count', function: 'count' }],
    materialize: true,
    storageVersion: 2,
  },
  effective_at: Date.now(),
};

// Construct engine from Config
const engine = new ProjectionEngine(projectionConfig);

// Config version tracked in metadata
const result = engine.process(facts, options);
// result._meta.config_version === 1
```

Benefits:
- Audit trail of configuration changes
- Rollback to previous versions
- Track which config version produced which results
### 7. Use Sparse Storage for Large Datasets

Enable `storageVersion: 2` when:
- The dataset spans many time buckets (months or years of data)
- groupBy fields have high cardinality
- You need efficient time-range queries
- Memory is constrained during processing
```yaml
storageVersion: 2 # Sparse storage
```

### 8. Monitor Migration Facts
Track projection migrations using `PROJECTION_FACT_TYPES`:
```ts
import { PROJECTION_FACT_TYPES } from '@z0-app/sdk';

// Listen for migration events
factManager.on('fact', (fact) => {
  if (fact.type === PROJECTION_FACT_TYPES.MIGRATION_STARTED) {
    console.log('Migration started:', fact.data);
  }
  if (fact.type === PROJECTION_FACT_TYPES.MIGRATION_COMPLETED) {
    console.log('Migration completed in', fact.data.duration_ms, 'ms');
  }
});
```

## Summary
| Feature | Description | Example |
|---|---|---|
| Aggregation Functions | count, sum, avg, min, max, count_distinct | { field: 'data.amount', function: 'sum' } |
| Time Windows | minute, hour, day, month | timeWindow: hour |
| Multi-Dimensional | groupBy multiple fields | groupBy: [data.country, data.plan] |
| Sparse Storage (v0.10.0+) | Each bucket as separate key | storageVersion: 2 |
| Time Range Queries (v0.10.0+) | Query specific date ranges | query(key, { from, to }) |
| Incremental Processing (v0.10.0+) | O(1) per-fact updates | processIncremental(fact) |
| Query Modes (v0.10.0+) | Materialized or compute | mode: 'compute' |
| Auto-Rollups (v0.10.0+) | Minute→hour→day→month | rollup: { enabled: true } |
| Materialization | Store in CachedState | materialize: true |
| Config Pattern (v0.10.0+) | Versioned configuration | Config<ProjectionConfigSettings> |
| Migration Facts (v0.10.0+) | Audit trail for changes | PROJECTION_FACT_TYPES |
| Nested Fields | Dot notation access | field: 'data.payment.amount' |
ProjectionEngine transforms raw event streams into queryable metrics using declarative YAML configuration. v0.10.0 adds sparse storage for efficiency, incremental processing for real-time updates, time range queries for targeted analysis, and auto-rollups for long-term data retention.