CQRS & Event Sourcing Introduction - Scalable Architecture Design

2025.12.02

What is CQRS

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates data reading (Query) from writing (Command).

Traditional CRUD Architecture:

flowchart TB
    subgraph Traditional["Traditional CRUD"]
        C1["Client"]
        C1 -->|"CRUD Operations"| Model["Single Data Model<br/>(Read + Write)"]
        Model --> DB1["Database"]
    end

CQRS Architecture:

flowchart TB
    subgraph CQRS["CQRS Architecture"]
        C2["Client"] -->|"Command"| CM["Command Model<br/>(Write)"]
        C3["Client"] -->|"Query"| QM["Query Model<br/>(Read)"]
        CM --> WS["Write Store<br/>(Normalized)"]
        QM --> RS["Read Store<br/>(Denormalized)"]
        WS -->|"Sync"| RS
    end

When to Adopt CQRS

| Use Case | Description |
| --- | --- |
| Asymmetric read/write load | Reads far exceed writes (social networks, e-commerce sites) |
| Complex domain logic | Apply complex business rules on writes |
| Different optimizations needed | Transactions for writes, caching for reads |
| Multiple read views | Display the same data in different formats |
| Audit/history requirements | Need to maintain a complete change history |

Basic CQRS Implementation

Command Side (Write)

// commands/types.ts
/**
 * Base shape for every command handled on the write side.
 * Concrete commands extend this interface and narrow `type` and `payload`.
 */
export interface Command {
  /** Discriminator naming the command; concrete commands narrow it to a string literal. */
  type: string;
  /** Command-specific data; typed as `unknown` here and narrowed by each concrete command. */
  payload: unknown;
  /** Context carried alongside the payload. */
  metadata: {
    // presumably the user issuing the command — confirm against callers
    userId: string;
    // presumably the time the command was issued — confirm against callers
    timestamp: Date;
    // presumably used to trace a request across commands and events — confirm
    correlationId: string;
  };
}

/**
 * Command requesting creation of a new order.
 * Narrows {@link Command} to the `'CreateOrder'` type and its order payload.
 */
export interface CreateOrderCommand extends Command {
  type: 'CreateOrder';
  payload: {
    /** Customer the order is placed for. */
    customerId: string;
    /** Line items of the order. */
    items: {
      productId: string;
      quantity: number;
      unitPrice: number;
    }[];
    /** Destination address for the order. */
    shippingAddress: Address;
  };
}
// commands/handlers/order-command-handler.ts
/**
 * Handles `CreateOrderCommand`: builds the order, validates it, saves it
 * through the repository, and then publishes the order's uncommitted events.
 */
@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  /**
   * Creates, persists, and announces a new order.
   *
   * @param command - Command whose payload supplies the customer, items, and shipping address.
   * @returns The id of the created order.
   */
  async execute(command: CreateOrderCommand): Promise<string> {
    const { customerId, items, shippingAddress } = command.payload;

    // Domain logic lives in the aggregate factory.
    const order = Order.create({ customerId, items, shippingAddress });

    // Presumably throws when the order is invalid — confirm against Order.validate().
    order.validate();

    await this.orderRepository.save(order);

    // NOTE(review): save and publish are two separate awaits; if publish rejects,
    // the order has already been saved.
    const pendingEvents = order.getUncommittedEvents();
    await this.eventPublisher.publish(pendingEvents);

    return order.id;
  }
}

Query Side (Read)

// queries/types.ts
/**
 * Base shape for every query handled on the read side.
 */
export interface Query {
  /** Discriminator naming the query; concrete queries narrow it to a string literal. */
  type: string;
}

/**
 * Query for a single order, identified by its id.
 */
export interface GetOrderByIdQuery extends Query {
  type: 'GetOrderById';
  /** Id of the order to look up. */
  orderId: string;
}

/**
 * Query for a customer's orders, one page at a time.
 */
export interface GetOrdersByCustomerQuery extends Query {
  type: 'GetOrdersByCustomer';
  /** Customer whose orders are requested. */
  customerId: string;
  /** Paging parameters. */
  pagination: {
    // NOTE(review): whether `page` is 0- or 1-based is not shown here — confirm
    page: number;
    // presumably the maximum number of orders per page — confirm
    limit: number;
  };
}
// queries/handlers/order-query-handler.ts
// Read-only DTO
/**
 * Shape returned by the order detail query.
 * Carries display-ready fields (customer and product names) rather than ids.
 */
interface OrderDetailDto {
  id: string;
  customerName: string;
  /** Line items with display names. */
  items: {
    productName: string;
    quantity: number;
    totalPrice: number;
  }[];
  status: string;
  totalAmount: number;
  createdAt: Date;
}

/**
 * Handles `GetOrderByIdQuery` by delegating to the read repository.
 */
@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
  constructor(private readonly readRepository: OrderReadRepository) {}

  /**
   * Looks up one order in the read store.
   *
   * @param query - Query carrying the id of the order to fetch.
   * @returns Whatever the read repository's `findById` resolves to for that id
   *          (typed as the order detail DTO or `null`).
   */
  async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
    const { orderId } = query;

    // The read side only goes through the read repository.
    return this.readRepository.findById(orderId);
  }
}

Event Sourcing

Event Sourcing is a pattern that records state changes as “events” and derives current state from the accumulation of events.

Traditional State Storage:

| Order #123 | Value |
| --- | --- |
| status | "shipped" ← Only the latest state is stored |
| total | 15000 |

Event Sourcing:

flowchart TB
    subgraph ES["Event Stream for Order #123"]
        E1["[1] OrderCreated<br/>items: [...], total: 10000"]
        E2["[2] ItemAdded<br/>productId: P456, qty: 2"]
        E3["[3] OrderConfirmed<br/>confirmedAt: ..."]
        E4["[4] PaymentReceived<br/>amount: 15000"]
        E5["[5] OrderShipped<br/>trackingNo: TRK123"]
        E1 --> E2 --> E3 --> E4 --> E5
    end
    E5 --> State["Current state = Apply events [1]-[5] in order"]

Domain Event Definition

// domain/events/order-events.ts
/**
 * Base shape for every domain event.
 * Concrete events extend this interface and narrow `eventType` and `payload`.
 */
export interface DomainEvent {
  /** Identifier of this event. */
  eventId: string;
  /** Discriminator naming the event; concrete events narrow it to a string literal. */
  eventType: string;
  /** Id of the aggregate the event belongs to. */
  aggregateId: string;
  /** Kind of aggregate the event belongs to. */
  aggregateType: string;
  // presumably the event's position within its stream — confirm against the event store
  version: number;
  // presumably when the event occurred — confirm against the producer
  timestamp: Date;
  /** Event-specific data; typed as `unknown` here and narrowed by each concrete event. */
  payload: unknown;
  /** Optional tracing context. */
  metadata: {
    userId?: string;
    correlationId?: string;
    // presumably the id of the command or event that caused this one — confirm
    causationId?: string;
  };
}

/**
 * Event for a created order, carrying the order's initial contents.
 */
export interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  payload: {
    customerId: string;
    items: OrderItem[];
    shippingAddress: Address;
    // NOTE(review): currency and unit (e.g. minor units) are not shown here — confirm
    totalAmount: number;
  };
}

/**
 * Event for a shipped order, carrying the shipment details.
 */
export interface OrderShippedEvent extends DomainEvent {
  eventType: 'OrderShipped';
  payload: {
    shippedAt: Date;
    trackingNumber: string;
    carrier: string;
  };
}

Event Store Implementation

// infrastructure/event-store.ts
/**
 * Event store backed by an `events` table.
 * Events are appended per stream, guarded by an optimistic version check.
 */
@Injectable()
export class EventStore {
  constructor(private readonly pool: Pool) {}

  /**
   * Appends events to a stream inside a single transaction.
   *
   * @param streamId - Stream the events are appended to.
   * @param events - Events to insert, in order; each is stored with its own `version`.
   * @param expectedVersion - Version the caller believes the stream is at
   *        (`0` for a stream that has no events yet).
   * @throws ConcurrencyError when the stream's current version differs from `expectedVersion`.
   *         Any database error is rethrown after the transaction is rolled back.
   */
  async appendEvents(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number,
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

      // Optimistic locking: version check.
      // NOTE(review): this SELECT-then-INSERT check does not by itself stop two
      // concurrent transactions from both passing it; presumably the table has a
      // UNIQUE (stream_id, version) constraint as the real guard — confirm.
      const versionCheck = await client.query(
        `SELECT MAX(version) as current_version
         FROM events
         WHERE stream_id = $1`,
        [streamId],
      );

      // MAX() over an empty stream yields NULL, treated as version 0.
      // The driver may hand back 64-bit/numeric columns as strings, so coerce
      // before the strict comparison below.
      const currentVersion = Number(
        versionCheck.rows[0]?.current_version ?? 0,
      );

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but found ${currentVersion}`,
        );
      }

      // Append events one at a time so they are inserted in the given order.
      for (const event of events) {
        await client.query(
          `INSERT INTO events (
            event_id, stream_id, event_type, version,
            payload, metadata, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
          [
            event.eventId,
            streamId,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ],
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch {
        // A failed ROLLBACK (e.g. broken connection) must not replace the
        // original error, which is the one the caller needs to see.
      }
      throw error;
    } finally {
      client.release();
    }
  }
}

Read Model Synchronization (Projections)

flowchart LR
    subgraph EventStore["Event Store"]
        E1["Event 1"]
        E2["Event 2"]
        E3["Event 3"]
        E4["..."]
    end

    EventStore --> Proj["Projector<br/>(Event Handler)"]

    Proj --> RD1["Read DB 1<br/>(Order List)"]
    Proj --> RD2["Read DB 2<br/>(Sales Agg)"]

Features:

  • Async processing (eventual consistency)
  • Can generate multiple read models
  • Rebuildable (replay events)
// projections/order-list-projection.ts
/**
 * Projection that keeps the order-list read model in step with order events.
 */
@Injectable()
export class OrderListProjection {
  constructor(private readonly readRepository: OrderReadRepository) {}

  /**
   * Inserts a read-model row for a newly created order, with status `'pending'`.
   *
   * @param event - The `OrderCreated` event to project.
   */
  @OnEvent('OrderCreated')
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const { aggregateId, payload, timestamp } = event;

    // NOTE(review): getCustomerName is not declared on this class in the code
    // shown; it has to be provided before this handler can run.
    const customerName = await this.getCustomerName(payload.customerId);

    await this.readRepository.insert({
      id: aggregateId,
      customerId: payload.customerId,
      customerName,
      status: 'pending',
      totalAmount: payload.totalAmount,
      itemCount: payload.items.length,
      createdAt: timestamp,
      updatedAt: timestamp,
    });
  }

  /**
   * Marks the order's read-model row as shipped and records the shipment details.
   *
   * @param event - The `OrderShipped` event to project.
   */
  @OnEvent('OrderShipped')
  async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
    const { shippedAt, trackingNumber } = event.payload;

    await this.readRepository.update(event.aggregateId, {
      status: 'shipped',
      shippedAt,
      trackingNumber,
      updatedAt: event.timestamp,
    });
  }
}

Saga Pattern

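A pattern for managing a business transaction that spans multiple aggregates: each step reacts to an event by issuing the next command, and a failure triggers a compensating command that undoes the earlier steps.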

// sagas/order-saga.ts
/**
 * Saga that turns order-related events into follow-up inventory commands.
 */
@Injectable()
export class OrderSaga {
  /**
   * Maps each `OrderCreatedEvent` in the stream to an inventory reservation
   * command for that order's items.
   */
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(OrderCreatedEvent),
      map(
        ({ aggregateId, payload }) =>
          new ReserveInventoryCommand({
            orderId: aggregateId,
            items: payload.items,
          }),
      ),
    );

  /**
   * Compensating step: maps each `PaymentFailedEvent` in the stream to a
   * command that releases the inventory held for the order.
   */
  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(PaymentFailedEvent),
      map(
        ({ payload }) =>
          new ReleaseInventoryCommand({
            orderId: payload.orderId,
          }),
      ),
    );
}

Implementation Considerations

| Challenge | Solution |
| --- | --- |
| Eventual consistency | Appropriate UI feedback, optimistic updates |
| Event schema evolution | Event versioning, upcasting |
| Projection rebuilding | Snapshots, parallel processing |
| Debugging complexity | Correlation IDs, detailed logging, event visualization tools |
| Initial adoption cost | Gradual introduction, start with critical domains |
← Back to list