What is CQRS
CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates data reading (Query) from writing (Command).
Traditional CRUD Architecture:
flowchart TB
subgraph Traditional["Traditional CRUD"]
C1["Client"]
C1 -->|"CRUD Operations"| Model["Single Data Model<br/>(Read + Write)"]
Model --> DB1["Database"]
end
CQRS Architecture:
flowchart TB
subgraph CQRS["CQRS Architecture"]
C2["Client"] -->|"Command"| CM["Command Model<br/>(Write)"]
C3["Client"] -->|"Query"| QM["Query Model<br/>(Read)"]
CM --> WS["Write Store<br/>(Normalized)"]
QM --> RS["Read Store<br/>(Denormalized)"]
WS -->|"Sync"| RS
end
When to Adopt CQRS
| Use Case | Description |
|---|---|
| Asymmetric read/write load | Reads far exceed writes (SNS, EC sites) |
| Complex domain logic | Apply complex business rules on writes |
| Different optimizations needed | Transactions for writes, caching for reads |
| Multiple read views | Display same data in different formats |
| Audit/history requirements | Need to maintain complete change history |
Basic CQRS Implementation
Command Side (Write)
// commands/types.ts
export interface Command {
type: string;
payload: unknown;
metadata: {
userId: string;
timestamp: Date;
correlationId: string;
};
}
export interface CreateOrderCommand extends Command {
type: 'CreateOrder';
payload: {
customerId: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
shippingAddress: Address;
};
}
// commands/handlers/order-command-handler.ts
@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
// Execute domain logic
const order = Order.create({
customerId: command.payload.customerId,
items: command.payload.items,
shippingAddress: command.payload.shippingAddress,
});
// Validation
order.validate();
// Persist
await this.orderRepository.save(order);
// Publish events
await this.eventPublisher.publish(order.getUncommittedEvents());
return order.id;
}
}
Query Side (Read)
// queries/types.ts
export interface Query {
type: string;
}
export interface GetOrderByIdQuery extends Query {
type: 'GetOrderById';
orderId: string;
}
export interface GetOrdersByCustomerQuery extends Query {
type: 'GetOrdersByCustomer';
customerId: string;
pagination: {
page: number;
limit: number;
};
}
// queries/handlers/order-query-handler.ts
// Read-only DTO
interface OrderDetailDto {
id: string;
customerName: string;
items: Array<{
productName: string;
quantity: number;
totalPrice: number;
}>;
status: string;
totalAmount: number;
createdAt: Date;
}
@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
// Get directly from read-optimized store
return this.readRepository.findById(query.orderId);
}
}
Event Sourcing
Event Sourcing is a pattern that records state changes as “events” and derives current state from the accumulation of events.
Traditional State Storage:
| Order #123 | Value |
|---|---|
| status | ”shipped” ← Only latest state stored |
| total | 15000 |
Event Sourcing:
flowchart TB
subgraph ES["Event Stream for Order #123"]
E1["[1] OrderCreated<br/>items: [...], total: 10000"]
E2["[2] ItemAdded<br/>productId: P456, qty: 2"]
E3["[3] OrderConfirmed<br/>confirmedAt: ..."]
E4["[4] PaymentReceived<br/>amount: 15000"]
E5["[5] OrderShipped<br/>trackingNo: TRK123"]
E1 --> E2 --> E3 --> E4 --> E5
end
E5 --> State["Current state = Apply events [1]-[5] in order"]
Domain Event Definition
// domain/events/order-events.ts
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
payload: unknown;
metadata: {
userId?: string;
correlationId?: string;
causationId?: string;
};
}
export interface OrderCreatedEvent extends DomainEvent {
eventType: 'OrderCreated';
payload: {
customerId: string;
items: OrderItem[];
shippingAddress: Address;
totalAmount: number;
};
}
export interface OrderShippedEvent extends DomainEvent {
eventType: 'OrderShipped';
payload: {
shippedAt: Date;
trackingNumber: string;
carrier: string;
};
}
Event Store Implementation
// infrastructure/event-store.ts
@Injectable()
export class EventStore {
constructor(private readonly pool: Pool) {}
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number,
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Optimistic locking: version check
const versionCheck = await client.query(
`SELECT MAX(version) as current_version
FROM events
WHERE stream_id = $1`,
[streamId],
);
const currentVersion = versionCheck.rows[0]?.current_version ?? 0;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`,
);
}
// Append events
for (const event of events) {
await client.query(
`INSERT INTO events (
event_id, stream_id, event_type, version,
payload, metadata, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
streamId,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
],
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
Read Model Synchronization (Projections)
flowchart LR
subgraph EventStore["Event Store"]
E1["Event 1"]
E2["Event 2"]
E3["Event 3"]
E4["..."]
end
EventStore --> Proj["Projector<br/>(Event Handler)"]
Proj --> RD1["Read DB 1<br/>(Order List)"]
Proj --> RD2["Read DB 2<br/>(Sales Agg)"]
Features:
- Async processing (eventual consistency)
- Can generate multiple read models
- Rebuildable (replay events)
// projections/order-list-projection.ts
@Injectable()
export class OrderListProjection {
constructor(private readonly readRepository: OrderReadRepository) {}
@OnEvent('OrderCreated')
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readRepository.insert({
id: event.aggregateId,
customerId: event.payload.customerId,
customerName: await this.getCustomerName(event.payload.customerId),
status: 'pending',
totalAmount: event.payload.totalAmount,
itemCount: event.payload.items.length,
createdAt: event.timestamp,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderShipped')
async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'shipped',
shippedAt: event.payload.shippedAt,
trackingNumber: event.payload.trackingNumber,
updatedAt: event.timestamp,
});
}
}
Saga Pattern
A pattern for managing transactions that span multiple aggregates.
// sagas/order-saga.ts
@Injectable()
export class OrderSaga {
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(OrderCreatedEvent),
map((event) => {
// Issue inventory reservation command
return new ReserveInventoryCommand({
orderId: event.aggregateId,
items: event.payload.items,
});
}),
);
};
@Saga()
paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(PaymentFailedEvent),
map((event) => {
// Compensating transaction: release inventory
return new ReleaseInventoryCommand({
orderId: event.payload.orderId,
});
}),
);
};
}
Implementation Considerations
| Challenge | Solution |
|---|---|
| Eventual consistency | Appropriate UI feedback, optimistic updates |
| Event schema evolution | Event versioning, upcasting |
| Projection rebuilding | Snapshots, parallel processing |
| Debugging complexity | Correlation IDs, detailed logging, event visualization tools |
| Initial adoption cost | Gradual introduction, start with critical domains |