CQRS and Event Sourcing: Building Scalable Event-Driven Systems
In distributed systems and microservices, two patterns are widely used to build scalable, maintainable, and auditable applications: CQRS (Command Query Responsibility Segregation) and Event Sourcing. Each pattern is useful on its own, and combining them addresses several recurring problems: scaling reads and writes independently, keeping a complete history of changes, and serving each query from a model shaped for it.
Introduction to CQRS
Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read and write operations into different models. This separation allows each model to be optimized for its specific use case, leading to better performance, scalability, and maintainability.
Core Principles of CQRS
- Separation of Concerns: Commands (writes) and queries (reads) are handled by different models
- Optimized Data Models: Each side can use the most appropriate data structure
- Independent Scaling: Read and write sides can scale independently
- Eventual Consistency: Read models are typically updated asynchronously, so they can briefly lag behind the write model
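A minimal sketch of the principles above, assuming in-memory maps in place of real databases. All names (`handlePlaceOrder`, `syncReadModel`, `getOrderSummary`) are hypothetical, and the read model is updated inline for brevity, whereas in practice it is often updated asynchronously.

```typescript
// Hypothetical names throughout; Maps stand in for the write and read databases.
type PlaceOrder = { orderId: string; customerName: string; prices: number[] };

interface OrderRow { orderId: string; customerName: string; prices: number[] }
interface OrderSummary { orderId: string; customerName: string; total: number; itemCount: number }

const writeDb = new Map<string, OrderRow>();    // write model
const readDb = new Map<string, OrderSummary>(); // denormalized read model

// Command side: validates and changes state; it returns no query data.
function handlePlaceOrder(cmd: PlaceOrder): void {
  if (cmd.prices.length === 0) throw new Error("Order must contain at least one item");
  if (writeDb.has(cmd.orderId)) throw new Error(`Order ${cmd.orderId} already exists`);
  const row: OrderRow = { ...cmd };
  writeDb.set(row.orderId, row);
  syncReadModel(row); // inline here for brevity
}

// Synchronization: derives the query-friendly shape from the write model.
function syncReadModel(row: OrderRow): void {
  readDb.set(row.orderId, {
    orderId: row.orderId,
    customerName: row.customerName,
    total: row.prices.reduce((sum, p) => sum + p, 0),
    itemCount: row.prices.length,
  });
}

// Query side: reads only from the read model and never mutates state.
function getOrderSummary(orderId: string): OrderSummary | undefined {
  return readDb.get(orderId);
}
```

The query function never touches `writeDb`, which is what lets the two sides use different storage and scale separately.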
CQRS Architecture Overview
graph TB
subgraph "Client Layer"
UI[User Interface]
API[API Gateway]
end
subgraph "CQRS Pattern"
subgraph "Command Side"
CMD[Commands] --> CV[Command Validation]
CV --> CH[Command Handler]
CH --> DM[Domain Model]
DM --> AR[Aggregate Root]
AR --> WDB[(Write Database)]
end
subgraph "Query Side"
Q[Queries] --> QV[Query Validation]
QV --> QH[Query Handler]
QH --> RM[Read Models]
RM --> RDB[(Read Database)]
end
end
UI --> API
API --> CMD
API --> Q
WDB -.-> SYNC[Synchronization]
SYNC -.-> RDB
style CMD fill:#e74c3c,stroke:#c0392b,color:#fff
style Q fill:#3498db,stroke:#2980b9,color:#fff
Benefits of CQRS
- Performance Optimization: Tailor each model for its specific workload
- Scalability: Scale read and write operations independently
- Flexibility: Use different technologies for different concerns
- Security: Apply different security policies to commands and queries
- Simplified Queries: Denormalized read models make queries simpler and faster
Event Sourcing Fundamentals
Event Sourcing is a pattern where the state of an application is determined by a sequence of events. Instead of storing just the current state, Event Sourcing stores all changes to the application state as a sequence of immutable events.
Core Concepts
- Events as Source of Truth: Every state change is captured as an event
- Immutability: Events are never modified or deleted
- Event Replay: Current state can be rebuilt by replaying events
- Temporal Queries: Query the state at any point in time
- Audit Trail: Complete history of all changes
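Event replay and temporal queries can be illustrated with a small fold over an event list. This is a sketch under assumed names (`AccountEvent`, `currentBalance`, `balanceAt`), using a bank account rather than the order domain to keep the arithmetic obvious.

```typescript
// Hypothetical event shape for illustration only.
type AccountEvent =
  | { type: "Deposited"; amount: number; at: Date }
  | { type: "Withdrawn"; amount: number; at: Date };

// Pure function: applies one event to the running balance.
function applyEvent(balance: number, e: AccountEvent): number {
  switch (e.type) {
    case "Deposited":
      return balance + e.amount;
    case "Withdrawn":
      return balance - e.amount;
  }
}

// Event replay: current state is the fold of every event from the start.
function currentBalance(events: AccountEvent[]): number {
  return events.reduce<number>((balance, e) => applyEvent(balance, e), 0);
}

// Temporal query: replay only the events recorded up to a point in time.
function balanceAt(events: AccountEvent[], pointInTime: Date): number {
  return events
    .filter(e => e.at.getTime() <= pointInTime.getTime())
    .reduce<number>((balance, e) => applyEvent(balance, e), 0);
}

const ledger: AccountEvent[] = [
  { type: "Deposited", amount: 100, at: new Date("2024-01-01T00:00:00Z") },
  { type: "Withdrawn", amount: 30, at: new Date("2024-02-01T00:00:00Z") },
  { type: "Deposited", amount: 5, at: new Date("2024-03-01T00:00:00Z") },
];
```

Because the events are never changed, asking for the balance in mid-February needs no extra storage: it is the same fold over a shorter prefix of the list.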
Event Sourcing Flow
graph LR
subgraph "Event Sourcing Flow"
CMD[Command] --> DM[Domain Model]
DM --> E1[Event 1]
DM --> E2[Event 2]
DM --> E3[Event 3]
E1 --> ES[(Event Store)]
E2 --> ES
E3 --> ES
ES --> ER[Event Replay]
ER --> CS[Current State]
ES --> EB[Event Bus]
EB --> P1[Projection 1]
EB --> P2[Projection 2]
EB --> P3[Projection 3]
end
style ES fill:#f39c12,stroke:#d68910,color:#fff
style EB fill:#9b59b6,stroke:#8e44ad,color:#fff
Event Structure
interface DomainEvent {
aggregateId: string; // The entity this event belongs to
eventType: string; // Type of the event
eventVersion: number; // Version for schema evolution
eventData: object; // The actual event payload
timestamp: Date; // When the event occurred
userId?: string; // Who triggered the event
correlationId?: string; // For tracking across services
causationId?: string; // What caused this event
}
// Example Events
class OrderCreatedEvent implements DomainEvent {
aggregateId: string;
eventType = "OrderCreated";
eventVersion = 1;
eventData: {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
totalAmount: number;
};
timestamp: Date;
userId: string;
}
class PaymentProcessedEvent implements DomainEvent {
aggregateId: string;
eventType = "PaymentProcessed";
eventVersion = 1;
eventData: {
orderId: string;
paymentId: string;
amount: number;
method: "credit_card" | "paypal" | "bank_transfer";
transactionId: string;
};
timestamp: Date;
userId: string;
}
How CQRS and Event Sourcing Work Together
The combination of CQRS and Event Sourcing creates a powerful architecture where:
- Commands generate Events that are stored in the Event Store
- Events are the source of truth for the write model
- Projections consume events to build Read Models
- Queries are served from optimized Read Models
Combined Architecture
graph TB
subgraph "Command Flow"
C[Command] --> CH[Command Handler]
CH --> A[Aggregate]
A --> E[Events]
E --> ES[(Event Store)]
end
subgraph "Event Processing"
ES --> EB[Event Bus]
EB --> EP1[Email Projection]
EB --> EP2[Report Projection]
EB --> EP3[Search Projection]
EB --> EP4[Analytics Projection]
end
subgraph "Query Flow"
Q[Query] --> QH[Query Handler]
QH --> RM1[(Email Read Model)]
QH --> RM2[(Report Read Model)]
QH --> RM3[(Search Read Model)]
QH --> RM4[(Analytics Read Model)]
end
EP1 --> RM1
EP2 --> RM2
EP3 --> RM3
EP4 --> RM4
style C fill:#e74c3c,stroke:#c0392b,color:#fff
style Q fill:#3498db,stroke:#2980b9,color:#fff
style ES fill:#f39c12,stroke:#d68910,color:#fff
style EB fill:#9b59b6,stroke:#8e44ad,color:#fff
Implementation Example
// Command Handler with Event Sourcing
class OrderCommandHandler {
constructor(
private eventStore: EventStore,
private eventBus: EventBus
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// Load aggregate from events
const events = await this.eventStore.getEvents(command.orderId);
const order = Order.fromEvents(events);
// Execute business logic
order.create(command.customerId, command.items);
// Get new events
const newEvents = order.getUncommittedEvents();
// Append events to the event store; order.version is the expected stream
// version, so a concurrent writer triggers a ConcurrencyError
await this.eventStore.appendEvents(command.orderId, newEvents, order.version);
// Publish events for projections. This is not atomic with the append, so a
// crash between the two steps needs a catch-up or outbox mechanism
await this.eventBus.publishBatch(newEvents);
}
}
// Projection Handler
class OrderProjectionHandler {
constructor(private readModelDb: Database) {}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case "OrderCreated":
await this.handleOrderCreated(event as OrderCreatedEvent);
break;
case "OrderShipped":
await this.handleOrderShipped(event);
break;
case "OrderDelivered":
await this.handleOrderDelivered(event);
break;
}
}
private async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
// Create denormalized read model
await this.readModelDb.insert("order_summaries", {
order_id: event.eventData.orderId,
customer_id: event.eventData.customerId,
customer_name: await this.getCustomerName(event.eventData.customerId),
total_amount: event.eventData.totalAmount,
status: "CREATED",
created_at: event.timestamp,
items_count: event.eventData.items.length,
});
}
}
Event Store Implementation
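The Event Store is the heart of an Event Sourcing system. It must be reliable and performant, and it must support the access patterns the rest of the system depends on: appending events to a stream, reading a stream back in order, and subscribing to new events.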
Event Store Structure
graph TB
subgraph "Event Store Architecture"
subgraph "Storage Layer"
ES[(Event Stream)]
SS[(Snapshot Store)]
IS[(Index Store)]
end
subgraph "API Layer"
WA[Write API]
RA[Read API]
SA[Subscription API]
end
subgraph "Processing Layer"
SP[Snapshot Processor]
IP[Index Processor]
PP[Projection Processor]
end
WA --> ES
RA --> ES
RA --> SS
SA --> ES
ES --> SP
SP --> SS
ES --> IP
IP --> IS
ES --> PP
end
style ES fill:#f39c12,stroke:#d68910,color:#fff
style SS fill:#e67e22,stroke:#d35400,color:#fff
style IS fill:#e67e22,stroke:#d35400,color:#fff
EventStore Implementation
class EventStore {
private db: Database;
private snapshots: SnapshotStore;
private subscriptions: Map<string, EventHandler[]> = new Map();
constructor(database: Database, snapshotStore: SnapshotStore) {
this.db = database;
this.snapshots = snapshotStore;
}
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
await this.db.transaction(async trx => {
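// Check for concurrency conflicts. A unique constraint on
// (stream_id, stream_version) is still needed, because two transactions
// can both pass this check before either one commits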
const currentVersion = await this.getCurrentVersion(streamId, trx);
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion} but was ${currentVersion}`
);
}
// Prepare events for storage
const eventRecords = events.map((event, index) => ({
stream_id: streamId,
event_id: uuid(),
event_type: event.eventType,
event_version: event.eventVersion,
event_data: JSON.stringify(event.eventData),
event_metadata: JSON.stringify({
userId: event.userId,
correlationId: event.correlationId,
causationId: event.causationId,
}),
stream_version: currentVersion + index + 1,
created_at: event.timestamp || new Date(),
}));
// Insert events
await trx("events").insert(eventRecords);
// Update stream metadata (Knex-style upsert; assumes a unique key on stream_id)
await trx("streams")
.insert({
stream_id: streamId,
version: currentVersion + events.length,
updated_at: new Date(),
})
.onConflict("stream_id")
.merge();
});
// Publish to subscribers
await this.publishToSubscribers(streamId, events);
// Check if snapshot is needed
await this.checkSnapshot(streamId);
}
async getEvents(
streamId: string,
fromVersion: number = 0,
toVersion?: number
): Promise<DomainEvent[]> {
// Try to load from snapshot first
const snapshot = await this.snapshots.getLatest(streamId);
const startVersion = snapshot ? snapshot.version + 1 : fromVersion;
// Load events
let query = this.db("events")
.where("stream_id", streamId)
.where("stream_version", ">=", startVersion)
.orderBy("stream_version");
if (toVersion !== undefined) {
query = query.where("stream_version", "<=", toVersion);
}
const records = await query;
// Deserialize events
const events = records.map(record => this.deserializeEvent(record));
// If we have a snapshot, prepend it to events
if (snapshot && snapshot.data) {
events.unshift(snapshot.data);
}
return events;
}
async createSnapshot(streamId: string): Promise<void> {
const events = await this.getEvents(streamId);
const aggregate = await this.rebuildAggregate(streamId, events);
await this.snapshots.save({
streamId,
version: aggregate.version,
data: aggregate.toSnapshot(),
createdAt: new Date(),
});
}
subscribe(eventType: string, handler: EventHandler): void {
if (!this.subscriptions.has(eventType)) {
this.subscriptions.set(eventType, []);
}
this.subscriptions.get(eventType)!.push(handler);
}
private async publishToSubscribers(
streamId: string,
events: DomainEvent[]
): Promise<void> {
for (const event of events) {
const handlers = this.subscriptions.get(event.eventType) || [];
await Promise.all(
handlers.map(handler => handler(event).catch(console.error))
);
}
}
}
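The expected-version check in `appendEvents` is the core of optimistic concurrency. The sketch below isolates it in a hypothetical in-memory store (`InMemoryEventStore` is not part of the code above), so the conflict behavior can be seen without a database.

```typescript
// Hypothetical in-memory store; a Map stands in for the events table.
interface NewEvent { eventType: string; data: unknown }
interface StoredEvent extends NewEvent { streamId: string; streamVersion: number }

class ConcurrencyError extends Error {}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  // Appends only if the stream is still at the version the caller loaded.
  // Returns the new stream version.
  append(streamId: string, events: NewEvent[], expectedVersion: number): number {
    const stream = this.streams.get(streamId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion} but was ${stream.length}`
      );
    }
    events.forEach((e, i) =>
      stream.push({
        streamId,
        streamVersion: expectedVersion + i + 1,
        eventType: e.eventType,
        data: e.data,
      })
    );
    this.streams.set(streamId, stream);
    return stream.length;
  }

  // Returns a copy so callers cannot mutate stored history.
  read(streamId: string): StoredEvent[] {
    return [...(this.streams.get(streamId) ?? [])];
  }
}
```

If two writers both load the stream at version 1, the first append succeeds and the second fails with a `ConcurrencyError`. The losing writer is expected to reload the stream and retry its command.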
Using Apache Kafka as Event Store
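class KafkaEventStore implements EventStore {
private producer: KafkaProducer;
private admin: KafkaAdmin;
private consumers: Map<string, KafkaConsumer> = new Map();
// kafkaConfig and partitionCount are kept as fields because
// subscribeToStream() and getPartition() read them later
constructor(
private kafkaConfig: KafkaConfig,
private partitionCount: number
) {
this.producer = new KafkaProducer(kafkaConfig);
this.admin = new KafkaAdmin(kafkaConfig);
}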
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
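// NOTE: Kafka has no built-in expected-version check, so expectedVersion is
// only used to number the events here; conflicts must be detected elsewhere
// Ensure topic exists
await this.ensureTopic(streamId);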
// Prepare messages
const messages = events.map((event, index) => ({
key: streamId,
value: JSON.stringify(event),
headers: {
"event-type": event.eventType,
"event-version": event.eventVersion.toString(),
"stream-version": (expectedVersion + index + 1).toString(),
timestamp: event.timestamp.toISOString(),
},
partition: this.getPartition(streamId),
}));
// Send to Kafka
await this.producer.sendBatch({
topic: this.getTopicName(streamId),
messages,
});
}
async subscribeToStream(
streamId: string,
handler: EventHandler,
options: SubscriptionOptions = {}
): Promise<void> {
const consumer = new KafkaConsumer({
groupId: options.consumerGroup || `${streamId}-consumer`,
...this.kafkaConfig,
});
await consumer.subscribe({
topic: this.getTopicName(streamId),
fromBeginning: options.fromBeginning ?? true,
});
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
await handler(event);
},
});
this.consumers.set(streamId, consumer);
}
private getTopicName(streamId: string): string {
return `events.${streamId}`;
}
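private getPartition(streamId: string): number {
// Hash the stream ID so every event of a stream lands on the same partition;
// Math.abs keeps a negative hashCode from producing a negative index
return Math.abs(hashCode(streamId)) % this.partitionCount;
}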
}
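The `hashCode` helper used by `getPartition` is not shown above. One possible shape is sketched here, assuming an FNV-1a style hash over UTF-16 code units; `fnv1a` and `partitionFor` are hypothetical names, and any stable hash would serve the same purpose.

```typescript
// FNV-1a style 32-bit hash over UTF-16 code units (an assumption for this
// sketch; the original does not say which hash it uses).
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Maps a stream ID to a partition index in [0, partitionCount).
function partitionFor(streamId: string, partitionCount: number): number {
  if (!Number.isInteger(partitionCount) || partitionCount <= 0) {
    throw new RangeError("partitionCount must be a positive integer");
  }
  return fnv1a(streamId) % partitionCount;
}
```

The same stream ID always maps to the same partition, which keeps the events of one aggregate in order. The mapping changes if the partition count changes, so this is plain modulo hashing rather than consistent hashing. Kafka clients generally hash the message key themselves when no partition is given, so an explicit function like this is only needed when partitions are assigned by hand.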
Projection and Read Model Strategies
Projections are the bridge between the event-sourced write model and the query-optimized read models. They consume events and update read models accordingly.
Projection Mechanism
graph LR
subgraph "Event Flow"
ES[(Event Store)] --> EB[Event Bus]
EB --> PH1[Order Summary Projection]
EB --> PH2[Customer View Projection]
EB --> PH3[Inventory Projection]
EB --> PH4[Financial Report Projection]
end
subgraph "Read Models"
PH1 --> RM1[(Order Summaries)]
PH2 --> RM2[(Customer Views)]
PH3 --> RM3[(Inventory Status)]
PH4 --> RM4[(Financial Reports)]
end
subgraph "Query Services"
QS1[Order Query Service] --> RM1
QS2[Customer Query Service] --> RM2
QS3[Inventory Query Service] --> RM3
QS4[Reporting Service] --> RM4
end
style ES fill:#f39c12,stroke:#d68910,color:#fff
style EB fill:#9b59b6,stroke:#8e44ad,color:#fff
Projection Implementation Strategies
// Base Projection Handler
abstract class ProjectionHandler {
protected readDb: Database;
protected checkpointStore: CheckpointStore;
protected projectionName: string;
constructor(
readDb: Database,
checkpointStore: CheckpointStore,
projectionName: string
) {
this.readDb = readDb;
this.checkpointStore = checkpointStore;
this.projectionName = projectionName;
}
async handleEvent(event: DomainEvent): Promise<void> {
try {
await this.readDb.transaction(async trx => {
// Process the event
await this.processEvent(event, trx);
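// Update checkpoint. NOTE: eventVersion is the schema version in DomainEvent;
// a production checkpoint should record the event's position in the stream
// and be written through trx so it commits atomically with the read model
await this.checkpointStore.updateCheckpoint(
this.projectionName,
event.aggregateId,
event.eventVersion
);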
});
} catch (error) {
// Handle idempotency - if event already processed, skip
if (this.isAlreadyProcessed(error)) {
return;
}
throw error;
}
}
abstract processEvent(event: DomainEvent, trx: Transaction): Promise<void>;
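protected isAlreadyProcessed(error: unknown): boolean {
// Relies on the database's unique-constraint message; checking the driver's
// error code is more robust where one is available
return error instanceof Error && error.message.includes("duplicate key");
}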
}
// Specialized Projection for Order Summaries
class OrderSummaryProjection extends ProjectionHandler {
async processEvent(event: DomainEvent, trx: Transaction): Promise<void> {
switch (event.eventType) {
case "OrderCreated":
await this.handleOrderCreated(event as OrderCreatedEvent, trx);
break;
case "OrderItemAdded":
await this.handleOrderItemAdded(event as OrderItemAddedEvent, trx);
break;
case "OrderShipped":
await this.handleOrderShipped(event as OrderShippedEvent, trx);
break;
case "OrderDelivered":
await this.handleOrderDelivered(event as OrderDeliveredEvent, trx);
break;
case "OrderCancelled":
await this.handleOrderCancelled(event as OrderCancelledEvent, trx);
break;
}
}
private async handleOrderCreated(
event: OrderCreatedEvent,
trx: Transaction
): Promise<void> {
const customerInfo = await this.getCustomerInfo(event.eventData.customerId);
await trx("order_summaries").insert({
order_id: event.eventData.orderId,
customer_id: event.eventData.customerId,
customer_name: customerInfo.name,
customer_email: customerInfo.email,
total_amount: event.eventData.totalAmount,
currency: event.eventData.currency,
status: "CREATED",
items_count: event.eventData.items.length,
created_at: event.timestamp,
updated_at: event.timestamp,
});
// Also update customer statistics
await trx("customer_statistics")
.where("customer_id", event.eventData.customerId)
.increment("total_orders", 1)
.increment("lifetime_value", event.eventData.totalAmount);
}
}
// Materialized View Projection
class MaterializedViewProjection extends ProjectionHandler {
private viewDefinition: ViewDefinition;
constructor(
readDb: Database,
checkpointStore: CheckpointStore,
viewDefinition: ViewDefinition
) {
super(readDb, checkpointStore, viewDefinition.name);
this.viewDefinition = viewDefinition;
}
async processEvent(event: DomainEvent, trx: Transaction): Promise<void> {
// Check if this event affects the view
if (!this.viewDefinition.eventTypes.includes(event.eventType)) {
return;
}
// Execute the view update query
const updateQuery = this.viewDefinition.generateUpdateQuery(event);
await trx.raw(updateQuery);
// Refresh materialized view if needed
if (this.viewDefinition.refreshStrategy === "immediate") {
await trx.raw(
`REFRESH MATERIALIZED VIEW ${this.viewDefinition.viewName}`
);
}
}
}
// Async Projection with Retry Logic
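// Declared abstract because processEvent() is left to concrete projections
abstract class ResilientProjection extends ProjectionHandler {
constructor(
readDb: Database,
checkpointStore: CheckpointStore,
projectionName: string,
private eventStore: EventStore,
private retryPolicy: RetryPolicy
) {
super(readDb, checkpointStore, projectionName);
}
async handleEvent(event: DomainEvent): Promise<void> {
await this.retryPolicy.execute(async () => {
await super.handleEvent(event);
});
}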
async rebuild(fromEventVersion: number = 0): Promise<void> {
// Get all events from the event store
const eventStream = await this.eventStore.streamEvents({
fromVersion: fromEventVersion,
batchSize: 1000,
});
// Process in batches
for await (const batch of eventStream) {
await this.readDb.transaction(async trx => {
for (const event of batch) {
await this.processEvent(event, trx);
}
});
// Update checkpoint after each batch
const lastEvent = batch[batch.length - 1];
await this.checkpointStore.updateCheckpoint(
this.projectionName,
"rebuild",
lastEvent.eventVersion
);
}
}
}
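Idempotency can also come from a position-based checkpoint instead of a duplicate-key error. The sketch below assumes each event carries a monotonically increasing `position` in the stream, which the `DomainEvent` interface above does not define. `SequencedEvent` and `OrderStatsProjection` are hypothetical names.

```typescript
// Assumption: every event carries its position in a single ordered stream.
interface SequencedEvent {
  position: number;
  eventType: "OrderCreated" | "OrderShipped";
}

class OrderStatsProjection {
  private checkpoint = 0; // position of the last event applied
  created = 0;
  shipped = 0;

  // Returns true if the event was applied, false if it was skipped.
  handle(e: SequencedEvent): boolean {
    // Duplicate or replayed delivery: already reflected in the counters.
    if (e.position <= this.checkpoint) return false;
    switch (e.eventType) {
      case "OrderCreated":
        this.created++;
        break;
      case "OrderShipped":
        this.shipped++;
        break;
    }
    // With a real database, this write belongs in the same transaction
    // as the read model update.
    this.checkpoint = e.position;
    return true;
  }

  get lastPosition(): number {
    return this.checkpoint;
  }
}
```

Without the position check, a redelivered `OrderCreated` event would be counted twice. With it, at-least-once delivery produces the same counters as exactly-once delivery, provided events arrive in order.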
Read Model Optimization Strategies
// Denormalized Read Model
interface OrderSummaryReadModel {
// Order fields
orderId: string;
orderNumber: string;
orderDate: Date;
status: OrderStatus;
totalAmount: number;
currency: string;
// Denormalized customer data
customerId: string;
customerName: string;
customerEmail: string;
customerPhone: string;
customerTier: "bronze" | "silver" | "gold" | "platinum";
// Denormalized shipping data
shippingAddress: Address;
shippingMethod: string;
estimatedDelivery: Date;
trackingNumber?: string;
// Aggregated data
itemsCount: number;
uniqueProductsCount: number;
totalWeight: number;
// Computed fields
isExpressShipping: boolean;
isPriorityCustomer: boolean;
requiresSpecialHandling: boolean;
}
// Cached Read Model Service
class CachedReadModelService {
private cache: RedisCache;
private db: Database;
private ttl: number = 300; // 5 minutes
async getOrderSummary(orderId: string): Promise<OrderSummaryReadModel> {
// Try cache first
const cached = await this.cache.get<OrderSummaryReadModel>(
`order:${orderId}`
);
if (cached) return cached;
// Load from database
const order = await this.db("order_summaries")
.where("order_id", orderId)
.first();
if (!order) {
throw new NotFoundError(`Order ${orderId} not found`);
}
// Transform to read model
const readModel = this.mapToReadModel(order);
// Cache for future requests
await this.cache.set(`order:${orderId}`, readModel, this.ttl);
return readModel;
}
async searchOrders(
criteria: SearchCriteria
): Promise<OrderSummaryReadModel[]> {
// Build optimized query using indexes
let query = this.db("order_summaries");
if (criteria.customerId) {
query = query.where("customer_id", criteria.customerId);
}
if (criteria.status) {
query = query.where("status", criteria.status);
}
if (criteria.dateFrom) {
query = query.where("order_date", ">=", criteria.dateFrom);
}
if (criteria.dateTo) {
query = query.where("order_date", "<=", criteria.dateTo);
}
// Use covering index for performance
query = query.select([
"order_id",
"order_number",
"customer_name",
"total_amount",
"status",
"order_date",
]);
const results = await query.limit(criteria.limit || 100);
// Arrow function keeps `this` bound inside mapToReadModel
return results.map(row => this.mapToReadModel(row));
}
}
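The cached service above relies on a TTL alone, so a cached order can be stale for up to five minutes after a projection updates it. One common complement is to invalidate the cache entry when the projection writes. This is a sketch with hypothetical names (`CachedOrderReads`, `onOrderShipped`) and Maps in place of Redis and the read database.

```typescript
interface OrderSummary { orderId: string; status: string }

class CachedOrderReads {
  private cache = new Map<string, OrderSummary>();
  loads = 0; // counts reads that reached the backing store

  constructor(private readDb: Map<string, OrderSummary>) {}

  // Cache-aside read: try the cache, fall back to the read database.
  get(orderId: string): OrderSummary | undefined {
    const hit = this.cache.get(orderId);
    if (hit) return hit;
    this.loads++;
    const row = this.readDb.get(orderId);
    if (row) this.cache.set(orderId, row);
    return row;
  }

  // Called by the projection after it updates the row for this order.
  invalidate(orderId: string): void {
    this.cache.delete(orderId);
  }
}

// Projection step: update the read model first, then drop the stale entry.
function onOrderShipped(
  readDb: Map<string, OrderSummary>,
  reads: CachedOrderReads,
  orderId: string
): void {
  const row = readDb.get(orderId);
  if (!row) return;
  readDb.set(orderId, { ...row, status: "SHIPPED" });
  reads.invalidate(orderId);
}
```

Updating the read model before invalidating means the next read repopulates the cache with the new row rather than the old one.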
Event Versioning and Schema Evolution
As systems evolve, event schemas need to change. Proper versioning strategies ensure backward compatibility and smooth migrations.
Event Versioning Strategies
graph TB
subgraph "Event Evolution"
V1[Event v1] --> U1[Upcaster v1→v2]
U1 --> V2[Event v2]
V2 --> U2[Upcaster v2→v3]
U2 --> V3[Event v3]
subgraph "Handlers"
H1[Handler v1]
H2[Handler v2]
H3[Handler v3]
end
V3 --> H3
V3 --> H2
V3 --> H1
end
style V1 fill:#3498db,stroke:#2980b9,color:#fff
style V2 fill:#3498db,stroke:#2980b9,color:#fff
style V3 fill:#3498db,stroke:#2980b9,color:#fff
Implementing Event Versioning
// Event Version Registry
class EventVersionRegistry {
private versions: Map<string, EventVersionInfo[]> = new Map();
private upcasters: Map<string, EventUpcaster[]> = new Map();
registerEventVersion(
eventType: string,
version: number,
schema: JsonSchema,
upcaster?: EventUpcaster
): void {
if (!this.versions.has(eventType)) {
this.versions.set(eventType, []);
this.upcasters.set(eventType, []);
}
this.versions.get(eventType)!.push({
version,
schema,
registeredAt: new Date(),
});
if (upcaster) {
this.upcasters.get(eventType)!.push(upcaster);
}
}
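upcastEvent(event: DomainEvent): DomainEvent {
// Sort by fromVersion so the chain v1→v2→v3 is applied in order,
// regardless of the order in which upcasters were registered
const upcasters = [...(this.upcasters.get(event.eventType) || [])].sort(
(a, b) => a.fromVersion - b.fromVersion
);
let currentEvent = event;
// Apply all upcasters in sequence
for (const upcaster of upcasters) {
if (upcaster.fromVersion === currentEvent.eventVersion) {
currentEvent = upcaster.upcast(currentEvent);
}
}
return currentEvent;
}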
validateEvent(event: DomainEvent): ValidationResult {
const versions = this.versions.get(event.eventType);
if (!versions) {
return { valid: false, error: "Unknown event type" };
}
const versionInfo = versions.find(v => v.version === event.eventVersion);
if (!versionInfo) {
return { valid: false, error: "Unknown event version" };
}
// Validate against JSON schema
const validator = new JsonSchemaValidator(versionInfo.schema);
return validator.validate(event.eventData);
}
}
// Event Upcaster Implementation
interface EventUpcaster {
fromVersion: number;
toVersion: number;
upcast(event: DomainEvent): DomainEvent;
}
class OrderCreatedEventUpcasterV1ToV2 implements EventUpcaster {
fromVersion = 1;
toVersion = 2;
upcast(event: DomainEvent): DomainEvent {
const v1Data = event.eventData as OrderCreatedEventV1;
// Transform v1 to v2 format
const v2Data: OrderCreatedEventV2 = {
orderId: v1Data.orderId,
customerId: v1Data.customerId,
items: v1Data.items.map(item => ({
...item,
// Add new field with default value
taxRate: 0.0,
// Calculate new field from existing data
totalPrice: item.quantity * item.price,
})),
// Split address into structured format
shippingAddress: this.parseAddress(v1Data.shippingAddress),
totalAmount: v1Data.totalAmount,
// Add new required field
currency: "USD",
};
return {
...event,
eventVersion: 2,
eventData: v2Data,
};
}
private parseAddress(addressString: string): Address {
// Parse string address into structured format
const parts = addressString.split(", ");
return {
street: parts[0] || "",
city: parts[1] || "",
state: parts[2] || "",
zipCode: parts[3] || "",
country: parts[4] || "USA",
};
}
}
// Backward Compatible Event Handler
class BackwardCompatibleEventHandler {
private registry: EventVersionRegistry;
async handleEvent(event: DomainEvent): Promise<void> {
// Upcast to latest version
const upcastedEvent = this.registry.upcastEvent(event);
// Validate the event
const validation = this.registry.validateEvent(upcastedEvent);
if (!validation.valid) {
throw new ValidationError(validation.error);
}
// Process with version-specific logic
await this.processEventByVersion(upcastedEvent);
}
private async processEventByVersion(event: DomainEvent): Promise<void> {
// Handle different versions if needed
switch (event.eventVersion) {
case 3:
await this.handleV3Event(event);
break;
case 2:
await this.handleV2Event(event);
break;
case 1:
// Should not reach here after upcasting
await this.handleV1Event(event);
break;
default:
throw new Error(`Unsupported event version: ${event.eventVersion}`);
}
}
}
// Schema Evolution Best Practices
class EventSchemaEvolution {
// Adding new fields - backward compatible
static addFieldExample(): void {
// Old event
interface ProductAddedV1 {
productId: string;
quantity: number;
price: number;
}
// New event with optional field
interface ProductAddedV2 {
productId: string;
quantity: number;
price: number;
discount?: number; // Optional for backward compatibility
}
}
// Renaming fields - requires upcaster
static renameFieldExample(): EventUpcaster {
return {
fromVersion: 1,
toVersion: 2,
upcast: event => {
// Destructure so the old field is actually dropped, not set to undefined
const { userId, ...rest } = event.eventData as { userId: string };
return {
...event,
eventVersion: 2,
eventData: { ...rest, customerId: userId }, // Rename userId to customerId
};
},
};
}
// Changing field types - requires careful handling
static changeTypeExample(): EventUpcaster {
return {
fromVersion: 1,
toVersion: 2,
upcast: event => {
// Pull out the fields being converted so the old `tag` does not linger
const { amount, tag, ...rest } = event.eventData as {
amount: string;
tag: string;
};
return {
...event,
eventVersion: 2,
eventData: {
...rest,
// Convert string to number
amount: parseFloat(amount),
// Convert single value to array
tags: [tag],
},
};
},
};
}
}
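For a compact, runnable view of the upcasting chain, the sketch below applies two upcasters to a stored v1 event. The types and the `upcastToLatest` helper are simplified stand-ins for the registry above, and the payload fields are illustrative.

```typescript
// Simplified stand-ins for DomainEvent and EventUpcaster.
interface VersionedEvent {
  eventType: string;
  eventVersion: number;
  eventData: Record<string, unknown>;
}

interface Upcaster {
  fromVersion: number;
  upcast(e: VersionedEvent): VersionedEvent;
}

// Registered out of order on purpose, to show that ordering is handled.
const orderCreatedUpcasters: Upcaster[] = [
  {
    // v2 to v3: add a required field with a default value
    fromVersion: 2,
    upcast: e => ({
      ...e,
      eventVersion: 3,
      eventData: { ...e.eventData, currency: "USD" },
    }),
  },
  {
    // v1 to v2: rename userId to customerId
    fromVersion: 1,
    upcast: e => {
      const { userId, ...rest } = e.eventData;
      return { ...e, eventVersion: 2, eventData: { ...rest, customerId: userId } };
    },
  },
];

// Sorts by fromVersion, then applies each upcaster whose fromVersion matches
// the event's current version.
function upcastToLatest(e: VersionedEvent, chain: Upcaster[]): VersionedEvent {
  const ordered = [...chain].sort((a, b) => a.fromVersion - b.fromVersion);
  return ordered.reduce<VersionedEvent>(
    (current, u) => (u.fromVersion === current.eventVersion ? u.upcast(current) : current),
    e
  );
}
```

The stored event is never rewritten. Upcasting happens on read, so handlers only need to understand the latest version.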
Practical Examples
Let’s look at complete implementations using different technologies.
Example 1: E-Commerce Order Management with EventStore
// Domain Model
class Order {
id: string; // read by the repository when appending events
version: number = 0; // number of committed events; passed as expectedVersion
private uncommittedEvents: DomainEvent[] = [];
private status: OrderStatus = "DRAFT";
private items: OrderItem[] = [];
private customerId: string;
private totalAmount: number = 0;
static fromEvents(events: DomainEvent[]): Order {
const order = new Order();
events.forEach(event => order.apply(event, false));
return order;
}
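create(customerId: string, items: OrderItem[]): void {
// NOTE: this.id must already be assigned before create() is called, for
// example by a factory that receives the order ID from the command;
// fromEvents() on an empty stream leaves it unset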
if (this.status !== "DRAFT") {
throw new Error("Order already created");
}
this.raiseEvent(
new OrderCreatedEvent({
orderId: this.id,
customerId,
items,
totalAmount: this.calculateTotal(items),
createdAt: new Date(),
})
);
}
ship(trackingNumber: string): void {
if (this.status !== "PAID") {
throw new Error("Order must be paid before shipping");
}
this.raiseEvent(
new OrderShippedEvent({
orderId: this.id,
trackingNumber,
shippedAt: new Date(),
})
);
}
private raiseEvent(event: DomainEvent): void {
this.uncommittedEvents.push(event);
this.apply(event, true);
}
private apply(event: DomainEvent, isNew: boolean): void {
switch (event.eventType) {
case "OrderCreated":
this.applyOrderCreated(event as OrderCreatedEvent);
break;
case "OrderShipped":
this.applyOrderShipped(event as OrderShippedEvent);
break;
}
if (!isNew) {
this.version++;
}
}
private applyOrderCreated(event: OrderCreatedEvent): void {
this.id = event.eventData.orderId;
this.customerId = event.eventData.customerId;
this.items = event.eventData.items;
this.totalAmount = event.eventData.totalAmount;
this.status = "CREATED";
}
private applyOrderShipped(event: OrderShippedEvent): void {
this.status = "SHIPPED";
}
getUncommittedEvents(): DomainEvent[] {
return this.uncommittedEvents;
}
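markEventsAsCommitted(): void {
// Advance the version so a later save() on this instance passes the
// correct expectedVersion
this.version += this.uncommittedEvents.length;
this.uncommittedEvents = [];
}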
}
// EventStore Integration
class EventStoreOrderRepository {
private eventStore: EventStore;
constructor(eventStore: EventStore) {
this.eventStore = eventStore;
}
async save(order: Order): Promise<void> {
const events = order.getUncommittedEvents();
if (events.length === 0) return;
await this.eventStore.appendEvents(order.id, events, order.version);
order.markEventsAsCommitted();
}
async getById(orderId: string): Promise<Order> {
const events = await this.eventStore.getEvents(orderId);
if (events.length === 0) {
throw new NotFoundError(`Order ${orderId} not found`);
}
return Order.fromEvents(events);
}
}
// Query Side Implementation
class OrderQueryService {
private readDb: Database;
async getOrderSummary(orderId: string): Promise<OrderSummaryDto> {
const summary = await this.readDb("order_summaries")
.where("order_id", orderId)
.first();
if (!summary) {
throw new NotFoundError(`Order ${orderId} not found`);
}
return {
orderId: summary.order_id,
orderNumber: summary.order_number,
customerName: summary.customer_name,
totalAmount: summary.total_amount,
status: summary.status,
createdAt: summary.created_at,
items: await this.getOrderItems(orderId),
};
}
async getCustomerOrders(
customerId: string,
options: PaginationOptions
): Promise<PagedResult<OrderSummaryDto>> {
const query = this.readDb("order_summaries")
.where("customer_id", customerId)
.orderBy("created_at", "desc")
.limit(options.pageSize)
.offset(options.page * options.pageSize);
const [results, totalCount] = await Promise.all([
query,
this.readDb("order_summaries")
.where("customer_id", customerId)
.count("* as count")
.first(),
]);
return {
items: results.map(row => this.mapToDto(row)),
// Some drivers return COUNT(*) as a string, so convert explicitly
totalCount: Number(totalCount?.count ?? 0),
page: options.page,
pageSize: options.pageSize,
};
}
}
Example 2: Financial Transaction System with Kafka
// Event Definitions
interface TransactionEvent extends DomainEvent {
accountId: string;
transactionId: string;
}
class MoneyDepositedEvent implements TransactionEvent {
aggregateId: string; // required by DomainEvent; typically the account ID
eventType = "MoneyDeposited";
eventVersion = 1;
accountId: string;
transactionId: string;
eventData: {
amount: number;
currency: string;
source: string;
reference: string;
};
timestamp: Date;
}
class MoneyWithdrawnEvent implements TransactionEvent {
aggregateId: string; // required by DomainEvent; typically the account ID
eventType = "MoneyWithdrawn";
eventVersion = 1;
accountId: string;
transactionId: string;
eventData: {
amount: number;
currency: string;
destination: string;
reference: string;
};
timestamp: Date;
}
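// Kafka-based Event Store and Projection
// Imports assume the KafkaJS client, whose API this code matches
import { Kafka, Producer, Consumer, KafkaConfig } from "kafkajs";
import * as crypto from "crypto";
import { v4 as uuid } from "uuid";
class KafkaFinancialEventProcessor {
private kafka: Kafka;
private producer: Producer;
private consumers: Map<string, Consumer> = new Map();
constructor(kafkaConfig: KafkaConfig) {
this.kafka = new Kafka(kafkaConfig);
this.producer = this.kafka.producer();
}
// The producer must be connected before the first send
async connect(): Promise<void> {
await this.producer.connect();
}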
async publishTransactionEvent(event: TransactionEvent): Promise<void> {
await this.producer.send({
topic: "financial-transactions",
messages: [
{
key: event.accountId,
value: JSON.stringify(event),
headers: {
"event-type": event.eventType,
"event-version": event.eventVersion.toString(),
"correlation-id": event.correlationId || uuid(),
},
partition: this.getPartition(event.accountId),
},
],
});
}
async subscribeToProjection(
projectionName: string,
handler: ProjectionHandler
): Promise<void> {
const consumer = this.kafka.consumer({
groupId: `projection-${projectionName}`,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
await consumer.connect();
await consumer.subscribe({
topic: "financial-transactions",
fromBeginning: false,
});
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
try {
// Process event in projection
await handler.handleEvent(event);
// Commit offset only after successful processing
await consumer.commitOffsets([
{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString(),
},
]);
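} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
console.error(`Projection error: ${reason}`);
// The offset is not committed here, but the consumer still moves on to the
// next message, and committing that one skips this event. Add retry logic
// or a dead letter queue before relying on this in production
}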
},
});
this.consumers.set(projectionName, consumer);
}
private getPartition(accountId: string): number {
// Hash-based partition assignment: the same account always maps to the same
// partition, which preserves per-account ordering
const hash = crypto.createHash("sha256").update(accountId).digest();
return hash.readUInt32BE(0) % 10; // Assuming 10 partitions
}
}
// Account Balance Projection
class AccountBalanceProjection extends ProjectionHandler {
async processEvent(event: DomainEvent, trx: Transaction): Promise<void> {
const txEvent = event as TransactionEvent;
switch (event.eventType) {
case "MoneyDeposited":
await this.handleDeposit(txEvent as MoneyDepositedEvent, trx);
break;
case "MoneyWithdrawn":
await this.handleWithdrawal(txEvent as MoneyWithdrawnEvent, trx);
break;
}
}
private async handleDeposit(
event: MoneyDepositedEvent,
trx: Transaction
): Promise<void> {
// Update account balance
await trx.raw(
`
INSERT INTO account_balances (account_id, balance, currency, last_updated)
VALUES (?, ?, ?, ?)
ON CONFLICT (account_id) DO UPDATE
SET balance = account_balances.balance + EXCLUDED.balance,
last_updated = EXCLUDED.last_updated
`,
[
event.accountId,
event.eventData.amount,
event.eventData.currency,
event.timestamp,
]
);
// Record transaction
await trx("transactions").insert({
transaction_id: event.transactionId,
account_id: event.accountId,
type: "DEPOSIT",
amount: event.eventData.amount,
currency: event.eventData.currency,
reference: event.eventData.reference,
created_at: event.timestamp,
});
}
}
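The consumer above leaves retry and dead-letter handling as a comment. A minimal sketch of that step follows, assuming a synchronous handler and an in-memory array as the dead letter queue. `processWithRetry` and `DeadLetter` are hypothetical names; a real consumer would await the handler and back off between attempts.

```typescript
// Record kept for events that could not be processed.
interface DeadLetter<E> {
  event: E;
  error: string;
  attempts: number;
}

// Tries the handler up to maxAttempts times. On final failure the event is
// parked in deadLetters instead of blocking the stream.
// Returns true if the handler eventually succeeded.
function processWithRetry<E>(
  event: E,
  handler: (e: E) => void,
  maxAttempts: number,
  deadLetters: DeadLetter<E>[]
): boolean {
  let lastError = "";
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(event);
      return true;
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  deadLetters.push({ event, error: lastError, attempts: maxAttempts });
  return false;
}
```

Under this scheme the offset can be committed whether the function returns `true` or `false`, since a failed event has been recorded for later inspection rather than lost. Whether parking an event is acceptable depends on the projection: a balance projection may need to stop instead of skipping a deposit.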
Example 3: Complete CQRS + Event Sourcing Architecture
graph TB
subgraph "API Layer"
REST[REST API]
GQL[GraphQL API]
WS[WebSocket API]
end
subgraph "Application Layer"
CMD[Command Bus]
QRY[Query Bus]
SAGA[Saga Orchestrator]
end
subgraph "Domain Layer"
AGG[Aggregates]
DOM[Domain Services]
EVT[Domain Events]
end
subgraph "Infrastructure Layer"
subgraph "Write Side"
ES[(Event Store)]
SN[(Snapshot Store)]
end
subgraph "Read Side"
PG[(PostgreSQL)]
REDIS[(Redis Cache)]
ELASTIC[(Elasticsearch)]
end
KAFKA[Apache Kafka]
end
subgraph "Projection Layer"
PP[Projection Processor]
PM[Projection Manager]
end
REST --> CMD
REST --> QRY
GQL --> CMD
GQL --> QRY
WS --> EVT
CMD --> AGG
AGG --> ES
AGG --> EVT
EVT --> KAFKA
KAFKA --> PP
PP --> PG
PP --> REDIS
PP --> ELASTIC
QRY --> PG
QRY --> REDIS
QRY --> ELASTIC
ES --> SN
style ES fill:#f39c12,stroke:#d68910,color:#fff
style KAFKA fill:#9b59b6,stroke:#8e44ad,color:#fff
style CMD fill:#e74c3c,stroke:#c0392b,color:#fff
style QRY fill:#3498db,stroke:#2980b9,color:#fff
Best Practices and Common Pitfalls
Best Practices
- Event Design
  - Make events immutable
  - Include all necessary information
  - Use past tense for event names
  - Version events from the start
  - Keep events small and focused
- Projection Design
  - Make projections idempotent
  - Design for eventual consistency
  - Use appropriate data stores for each projection
  - Monitor projection lag
  - Implement catch-up mechanisms
- Performance Optimization
  - Use snapshots for long event streams
  - Implement proper indexing strategies
  - Cache frequently accessed read models
  - Partition event streams by aggregate
  - Use bulk operations for projections
- Error Handling
  - Implement retry mechanisms
  - Use dead letter queues
  - Log all errors with context
  - Monitor system health
  - Plan for disaster recovery
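The snapshot practice listed under Performance Optimization can be sketched as follows: state is restored from the latest snapshot and only the events after it are replayed. The counter aggregate and the names `rehydrate` and `takeSnapshot` are hypothetical, chosen to keep the arithmetic visible.

```typescript
interface CounterEvent { version: number; delta: number }
interface Snapshot { version: number; value: number }

// Rebuilds state from an optional snapshot plus the events after it.
// `replayed` reports how many events actually had to be applied.
function rehydrate(
  events: CounterEvent[],
  snapshot?: Snapshot
): { value: number; version: number; replayed: number } {
  let value = snapshot?.value ?? 0;
  let version = snapshot?.version ?? 0;
  let replayed = 0;
  for (const e of events) {
    if (e.version <= version) continue; // already folded into the snapshot
    value += e.delta;
    version = e.version;
    replayed++;
  }
  return { value, version, replayed };
}

// A snapshot is the replayed state at some version, stored for later reuse.
function takeSnapshot(events: CounterEvent[]): Snapshot {
  const { value, version } = rehydrate(events);
  return { value, version };
}
```

The result with and without a snapshot must be identical. The snapshot only reduces how many events are replayed, which is why it can be discarded and rebuilt at any time.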
Common Pitfalls
- Over-Engineering
  - Not every system needs CQRS + Event Sourcing
  - Start simple and evolve
  - Consider complexity vs benefits
- Event Granularity
  - Too fine-grained events create noise
  - Too coarse-grained events lose information
  - Find the right balance
- Projection Complexity
  - Avoid complex business logic in projections
  - Keep projections simple and focused
  - Don’t try to maintain consistency across projections
- Storage Costs
  - Event stores can grow large
  - Plan for data retention policies
  - Consider archiving old events
- Debugging Challenges
  - Distributed systems are hard to debug
  - Implement comprehensive logging
  - Use correlation IDs
  - Build debugging tools
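Correlation IDs, listed under Debugging Challenges, follow a simple propagation rule in one common convention: every event in a flow shares the correlation ID of the message that started it, and each event's causation ID points at its direct parent. The sketch below assumes each event has its own `eventId`, which the `DomainEvent` interface above does not define. The helper names and the counter-based ID generator are hypothetical.

```typescript
interface EventMeta {
  eventId: string;
  correlationId: string; // same for every event in one flow
  causationId?: string;  // eventId of the direct parent, if any
}

let idCounter = 0;
// Stand-in for a UUID generator, so the sketch has no dependencies.
function nextId(): string {
  idCounter++;
  return `evt-${idCounter}`;
}

// First message of a flow: it correlates with itself and has no cause.
function startOfFlow(): EventMeta {
  const eventId = nextId();
  return { eventId, correlationId: eventId };
}

// Any follow-up message: inherits the correlation ID, records its parent.
function causedBy(parent: EventMeta): EventMeta {
  return {
    eventId: nextId(),
    correlationId: parent.correlationId,
    causationId: parent.eventId,
  };
}
```

Filtering logs by `correlationId` returns every event in the flow, and following `causationId` links reconstructs the order in which they triggered one another.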
Conclusion
CQRS and Event Sourcing are powerful patterns that, when combined, provide a robust foundation for building scalable, maintainable, and auditable distributed systems. While they introduce complexity, the benefits often outweigh the costs for systems that require:
- Complete audit trails
- Temporal queries
- High scalability
- Complex business logic
- Multiple read models
- Event-driven integration
The key to success is understanding when and how to apply these patterns, starting simple, and evolving the architecture as requirements grow. With proper implementation, monitoring, and tooling, CQRS and Event Sourcing can transform how we build modern distributed systems.
Remember that these patterns are not silver bullets - they’re tools in your architectural toolkit. Use them wisely, and they’ll serve you well in building the next generation of scalable, event-driven applications.
This guide covered the fundamentals and advanced concepts of CQRS and Event Sourcing. For hands-on implementation, check out the example repositories and consider starting with a small proof of concept before applying these patterns to production systems.