What Is CQRS
CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates the model used to write data (commands) from the model used to read it (queries).
Traditional CRUD Architecture
flowchart TB
subgraph CRUD["Traditional CRUD Architecture"]
C1["Client"]
C1 -->|CRUD operations| DM["Single Data Model<br/>(Read + Write)<br/>← Same model for reads and writes"]
DM --> DB1["Database"]
end
CQRS Architecture
flowchart TB
subgraph CQRS["CQRS Architecture"]
C2["Client"]
C3["Client"]
C2 -->|Command| CM["Command Model<br/>(Write)"]
C3 -->|Query| QM["Query Model<br/>(Read)"]
CM --> WS["Write Store<br/>(Normalized)"]
QM --> RS["Read Store<br/>(Denormalized)"]
WS -->|Synchronization| RS
end
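To make the diagram concrete, here is a minimal in-memory sketch of the same split. All names (`writeStore`, `readStore`, `addOrderLine`, `getOrderSummary`, `syncReadModel`) are hypothetical, and the synchronization step runs inline for brevity; in a real system it would typically be asynchronous.

```typescript
// Write side: normalized rows, one per order line.
interface OrderLineRow {
  orderId: string;
  productId: string;
  quantity: number;
  unitPrice: number;
}

// Read side: one denormalized summary per order, ready to display.
interface OrderSummary {
  orderId: string;
  lineCount: number;
  totalAmount: number;
}

const writeStore: OrderLineRow[] = [];
const readStore = new Map<string, OrderSummary>();

// Synchronization step (the "Write Store -> Read Store" arrow):
// recompute the summary of one order from its normalized rows.
function syncReadModel(orderId: string): void {
  const lines = writeStore.filter((row) => row.orderId === orderId);
  readStore.set(orderId, {
    orderId,
    lineCount: lines.length,
    totalAmount: lines.reduce((sum, l) => sum + l.quantity * l.unitPrice, 0),
  });
}

// Command: validates and changes state, returns nothing to display.
function addOrderLine(row: OrderLineRow): void {
  if (row.quantity <= 0) {
    throw new Error('quantity must be positive');
  }
  writeStore.push(row);
  syncReadModel(row.orderId);
}

// Query: reads the denormalized view and never touches the write store.
function getOrderSummary(orderId: string): OrderSummary | undefined {
  return readStore.get(orderId);
}

addOrderLine({ orderId: 'o1', productId: 'p1', quantity: 2, unitPrice: 500 });
addOrderLine({ orderId: 'o1', productId: 'p2', quantity: 1, unitPrice: 250 });
const summary = getOrderSummary('o1');
```

The query never computes totals itself; that work is paid once on the write path, which is the trade-off the read store exists to make.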
When to Adopt CQRS
| Use Case | Description |
|---|---|
| Asymmetric read/write load | Reads far outnumber writes (social networks, e-commerce) |
| Complex domain logic | Complex business rules must be enforced on writes |
| Different optimizations needed | Transactions for writes, caching for reads |
| Multiple read views | The same data is displayed in different formats |
| Audit/history requirements | A history of every change must be kept |
Basic CQRS Implementation
Command Side (Write)
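// commands/types.ts
// Commands are classes rather than interfaces: @CommandHandler() needs a
// runtime value, and interfaces are erased at compile time.
import { Address } from '../domain/value-objects/address'; // assumed location, not shown in this article

export interface CommandMetadata {
  userId: string;
  timestamp: Date;
  correlationId: string;
}

export interface Command {
  readonly type: string;
  readonly payload: unknown;
  readonly metadata: CommandMetadata;
}

export class CreateOrderCommand implements Command {
  readonly type = 'CreateOrder';

  constructor(
    public readonly payload: {
      customerId: string;
      items: Array<{
        productId: string;
        quantity: number;
        unitPrice: number;
      }>;
      shippingAddress: Address;
    },
    public readonly metadata: CommandMetadata,
  ) {}
}

export class CancelOrderCommand implements Command {
  readonly type = 'CancelOrder';

  constructor(
    public readonly payload: {
      orderId: string;
      reason: string;
    },
    public readonly metadata: CommandMetadata,
  ) {}
}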
// commands/handlers/order-command-handler.ts
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateOrderCommand, CancelOrderCommand } from '../types';
import { OrderRepository } from '../../domain/repositories/order-repository';
import { Order } from '../../domain/entities/order';
import { OrderNotFoundError } from '../../domain/errors'; // assumed location, not shown in this article
import { EventPublisher } from '../../infrastructure/event-publisher';
@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
// Run the domain logic
const order = Order.create({
customerId: command.payload.customerId,
items: command.payload.items,
shippingAddress: command.payload.shippingAddress,
});
// Validate
order.validate();
// Persist
await this.orderRepository.save(order);
// Publish the events. If this fails after the save succeeded, the read side
// never sees them; a transactional outbox is a common remedy.
await this.eventPublisher.publish(order.getUncommittedEvents());
return order.id;
}
}
@Injectable()
@CommandHandler(CancelOrderCommand)
export class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CancelOrderCommand): Promise<void> {
const order = await this.orderRepository.findById(command.payload.orderId);
if (!order) {
throw new OrderNotFoundError(command.payload.orderId);
}
// Domain logic: the aggregate decides whether cancellation is allowed
order.cancel(command.payload.reason);
await this.orderRepository.save(order);
await this.eventPublisher.publish(order.getUncommittedEvents());
}
}
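The handlers above rely on a command bus to route each command to its single handler. The following is a minimal sketch of that routing idea, assuming a hypothetical `SimpleCommandBus` keyed by the command's `type` string; it is not how `@nestjs/cqrs` is implemented internally, only an illustration of the dispatch step.

```typescript
interface Command {
  type: string;
  payload: unknown;
}

type Handler = (command: Command) => Promise<unknown>;

class SimpleCommandBus {
  private readonly handlers = new Map<string, Handler>();

  // A command type has exactly one handler; a second registration is an error.
  register(type: string, handler: Handler): void {
    if (this.handlers.has(type)) {
      throw new Error(`Handler already registered for ${type}`);
    }
    this.handlers.set(type, handler);
  }

  // Look up the handler by command type and delegate to it.
  async execute(command: Command): Promise<unknown> {
    const handler = this.handlers.get(command.type);
    if (!handler) {
      throw new Error(`No handler registered for ${command.type}`);
    }
    return handler(command);
  }
}

const bus = new SimpleCommandBus();
const cancelled: string[] = [];

bus.register('CancelOrder', async (command) => {
  const { orderId } = command.payload as { orderId: string };
  cancelled.push(orderId);
});

// Returns a promise that settles once the handler has finished.
const pending = bus.execute({ type: 'CancelOrder', payload: { orderId: 'o1' } });
```

The one-handler-per-command rule is what distinguishes a command bus from an event bus, where any number of subscribers may react to the same message.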
Query Side (Read)
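// queries/types.ts
// Queries are classes rather than interfaces: @QueryHandler() needs a
// runtime value, and interfaces are erased at compile time.
export interface Query {
  readonly type: string;
}

export class GetOrderByIdQuery implements Query {
  readonly type = 'GetOrderById';

  constructor(public readonly orderId: string) {}
}

export class GetOrdersByCustomerQuery implements Query {
  readonly type = 'GetOrdersByCustomer';

  constructor(
    public readonly customerId: string,
    public readonly pagination: { page: number; limit: number },
    public readonly filters?: {
      status?: OrderStatus[];
      dateRange?: { from: Date; to: Date };
    },
  ) {}
}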
// queries/handlers/order-query-handler.ts
import { Injectable } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderByIdQuery, GetOrdersByCustomerQuery } from '../types';
import { OrderReadRepository } from '../../infrastructure/read-repositories';
// Read-only DTO
interface OrderDetailDto {
id: string;
customerName: string;
items: Array<{
productName: string;
quantity: number;
totalPrice: number;
}>;
status: string;
totalAmount: number;
createdAt: Date;
}
@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
// Fetch directly from the read-optimized store
return this.readRepository.findById(query.orderId);
}
}
@Injectable()
@QueryHandler(GetOrdersByCustomerQuery)
export class GetOrdersByCustomerHandler
implements IQueryHandler<GetOrdersByCustomerQuery>
{
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrdersByCustomerQuery): Promise<{
orders: OrderDetailDto[];
total: number;
hasMore: boolean;
}> {
return this.readRepository.findByCustomer(
query.customerId,
query.pagination,
query.filters,
);
}
}
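The `findByCustomer` call above returns `orders`, `total`, and `hasMore`, but the repository itself is not shown. As a rough sketch of how those three values relate, here is an in-memory stand-in; the `OrderRow` shape, the 1-based `page` convention, and the restriction to a status filter are all assumptions made for illustration.

```typescript
interface OrderRow {
  id: string;
  customerId: string;
  status: string;
}

interface Page<T> {
  orders: T[];
  total: number;
  hasMore: boolean;
}

// In-memory stand-in for a read repository. `page` is assumed to be 1-based.
function findByCustomer(
  rows: OrderRow[],
  customerId: string,
  pagination: { page: number; limit: number },
  filters?: { status?: string[] },
): Page<OrderRow> {
  const matching = rows.filter(
    (row) =>
      row.customerId === customerId &&
      (!filters?.status || filters.status.includes(row.status)),
  );
  const offset = (pagination.page - 1) * pagination.limit;
  const orders = matching.slice(offset, offset + pagination.limit);
  return {
    orders,
    total: matching.length, // count of all matches, not just this page
    hasMore: offset + orders.length < matching.length,
  };
}

const rows: OrderRow[] = [
  { id: 'o1', customerId: 'c1', status: 'pending' },
  { id: 'o2', customerId: 'c1', status: 'shipped' },
  { id: 'o3', customerId: 'c1', status: 'pending' },
  { id: 'o4', customerId: 'c2', status: 'pending' },
];

const firstPage = findByCustomer(rows, 'c1', { page: 1, limit: 2 });
const secondPage = findByCustomer(rows, 'c1', { page: 2, limit: 2 });
const pendingOnly = findByCustomer(rows, 'c1', { page: 1, limit: 10 }, { status: ['pending'] });
```

Because the read store is already shaped for this screen, the query handler can stay a thin pass-through with no domain logic of its own.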
Event Sourcing
Event Sourcing is a pattern that records every state change as an event and derives the current state by replaying the accumulated events in order.
Traditional state storage:
flowchart LR
subgraph Traditional["Traditional State Storage"]
O["Order #123<br/>status: 'shipped' ← Only the latest state is stored<br/>total: 15000"]
end
Event Sourcing:
flowchart TB
subgraph ES["Event Stream for Order #123"]
E1["[1] OrderCreated<br/>items: [...], total: 10000"]
E2["[2] ItemAdded<br/>productId: 'P456', qty: 2"]
E3["[3] OrderConfirmed<br/>confirmedAt: '...'"]
E4["[4] PaymentReceived<br/>amount: 15000"]
E5["[5] OrderShipped<br/>trackingNo: 'TRK123'"]
E1 --> E2 --> E3 --> E4 --> E5
end
E5 --> State["Current state = apply events [1] through [5] in order"]
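"Apply the events in order" is a left fold over the stream. The sketch below replays the five events from the diagram with a pure `evolve` function. The event and state shapes are simplified, and the `unitPrice` of 2500 and the intermediate statuses `confirmed` and `paid` are assumptions chosen so the totals match the diagram.

```typescript
type OrderEvent =
  | { type: 'OrderCreated'; total: number }
  | { type: 'ItemAdded'; productId: string; qty: number; unitPrice: number }
  | { type: 'OrderConfirmed' }
  | { type: 'PaymentReceived'; amount: number }
  | { type: 'OrderShipped'; trackingNo: string };

interface OrderState {
  status: 'none' | 'pending' | 'confirmed' | 'paid' | 'shipped';
  total: number;
  trackingNo?: string;
}

// Pure function: previous state + one event -> next state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { status: 'pending', total: event.total };
    case 'ItemAdded':
      return { ...state, total: state.total + event.qty * event.unitPrice };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed' };
    case 'PaymentReceived':
      return { ...state, status: 'paid' };
    case 'OrderShipped':
      return { ...state, status: 'shipped', trackingNo: event.trackingNo };
  }
}

const stream: OrderEvent[] = [
  { type: 'OrderCreated', total: 10000 },
  { type: 'ItemAdded', productId: 'P456', qty: 2, unitPrice: 2500 }, // unitPrice is assumed
  { type: 'OrderConfirmed' },
  { type: 'PaymentReceived', amount: 15000 },
  { type: 'OrderShipped', trackingNo: 'TRK123' },
];

const initial: OrderState = { status: 'none', total: 0 };

// Current state: fold over the whole stream.
const current = stream.reduce(evolve, initial);

// State at any earlier point: fold over a prefix of the stream.
const afterThree = stream.slice(0, 3).reduce(evolve, initial);
```

Folding over a prefix is what makes "what did this order look like after event 3?" answerable, which a store holding only the latest state cannot do.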
Domain Event Definitions
// domain/events/order-events.ts
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
payload: unknown;
metadata: {
userId?: string;
correlationId?: string;
causationId?: string;
};
}
export interface OrderCreatedEvent extends DomainEvent {
eventType: 'OrderCreated';
payload: {
customerId: string;
items: OrderItem[];
shippingAddress: Address;
totalAmount: number;
};
}
export interface OrderItemAddedEvent extends DomainEvent {
eventType: 'OrderItemAdded';
payload: {
item: OrderItem;
newTotal: number;
};
}
export interface OrderShippedEvent extends DomainEvent {
eventType: 'OrderShipped';
payload: {
shippedAt: Date;
trackingNumber: string;
carrier: string;
};
}
export interface OrderCancelledEvent extends DomainEvent {
eventType: 'OrderCancelled';
payload: {
reason: string;
cancelledAt: Date;
refundAmount: number;
};
}
Event-Sourced Aggregate
// domain/aggregates/order.ts
import { AggregateRoot } from '../base/aggregate-root';
import {
DomainEvent,
OrderCreatedEvent,
OrderItemAddedEvent,
OrderShippedEvent,
OrderCancelledEvent,
} from '../events/order-events';
export class Order extends AggregateRoot {
private _customerId: string;
private _items: OrderItem[] = [];
private _status: OrderStatus = 'draft';
private _totalAmount: number = 0;
private _shippingAddress: Address;
// Rebuild state by replaying stored events
static fromEvents(events: DomainEvent[]): Order {
const order = new Order();
events.forEach((event) => order.apply(event, false));
return order;
}
// Command: create order
static create(params: CreateOrderParams): Order {
const order = new Order();
const event: OrderCreatedEvent = {
eventId: generateId(),
eventType: 'OrderCreated',
aggregateId: generateId(),
aggregateType: 'Order',
version: 1,
timestamp: new Date(),
payload: {
customerId: params.customerId,
items: params.items,
shippingAddress: params.shippingAddress,
totalAmount: calculateTotal(params.items),
},
metadata: {},
};
order.apply(event, true);
return order;
}
// Command: add item
addItem(item: OrderItem): void {
if (this._status !== 'draft' && this._status !== 'pending') {
throw new InvalidOperationError('Cannot add items to this order');
}
const event: OrderItemAddedEvent = {
eventId: generateId(),
eventType: 'OrderItemAdded',
aggregateId: this.id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
payload: {
item,
newTotal: this._totalAmount + item.quantity * item.unitPrice,
},
metadata: {},
};
this.apply(event, true);
}
// Command: ship
ship(trackingNumber: string, carrier: string): void {
if (this._status !== 'paid') {
throw new InvalidOperationError('Order must be paid before shipping');
}
const event: OrderShippedEvent = {
eventId: generateId(),
eventType: 'OrderShipped',
aggregateId: this.id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
payload: {
shippedAt: new Date(),
trackingNumber,
carrier,
},
metadata: {},
};
this.apply(event, true);
}
// Event handler: update state
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated': {
const e = event as OrderCreatedEvent;
this._id = e.aggregateId;
this._customerId = e.payload.customerId;
this._items = [...e.payload.items]; // copy, so later push() calls do not mutate the event payload
this._shippingAddress = e.payload.shippingAddress;
this._totalAmount = e.payload.totalAmount;
this._status = 'pending';
break;
}
case 'OrderItemAdded': {
const e = event as OrderItemAddedEvent;
this._items.push(e.payload.item);
this._totalAmount = e.payload.newTotal;
break;
}
case 'OrderShipped': {
this._status = 'shipped';
break;
}
case 'OrderCancelled': {
this._status = 'cancelled';
break;
}
}
}
}
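The `Order` aggregate extends an `AggregateRoot` imported from `../base/aggregate-root`, which is not shown. The following sketch is one plausible shape for such a base class, inferred only from how `Order` uses it (`apply(event, isNew)`, `when`, `id`, `version`, `getUncommittedEvents`). The `markEventsAsCommitted` method and the `Counter` demo aggregate are hypothetical additions for illustration.

```typescript
interface DomainEvent {
  eventType: string;
  aggregateId: string;
  version: number;
}

abstract class AggregateRoot {
  protected _id = '';
  private _version = 0;
  private _uncommitted: DomainEvent[] = [];

  get id(): string {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  // Subclasses mutate their own state here, and nowhere else.
  protected abstract when(event: DomainEvent): void;

  // isNew = true for events raised by a command, false when replaying history.
  protected apply(event: DomainEvent, isNew: boolean): void {
    this.when(event);
    this._version = event.version;
    if (isNew) {
      this._uncommitted.push(event);
    }
  }

  // Events raised since loading that still have to be written to the store.
  getUncommittedEvents(): DomainEvent[] {
    return [...this._uncommitted];
  }

  // Hypothetical helper: call after the events were persisted successfully.
  markEventsAsCommitted(): void {
    this._uncommitted = [];
  }
}

// Tiny demo aggregate, unrelated to orders, to exercise the base class.
class Counter extends AggregateRoot {
  private _count = 0;

  get count(): number {
    return this._count;
  }

  static fromEvents(events: DomainEvent[]): Counter {
    const counter = new Counter();
    events.forEach((event) => counter.apply(event, false));
    return counter;
  }

  increment(): void {
    this.apply(
      { eventType: 'Incremented', aggregateId: 'c1', version: this.version + 1 },
      true,
    );
  }

  protected when(event: DomainEvent): void {
    if (event.eventType === 'Incremented') {
      this._id = event.aggregateId;
      this._count += 1;
    }
  }
}

const counter = new Counter();
counter.increment();
counter.increment();
const raised = counter.getUncommittedEvents();
const restored = Counter.fromEvents(raised);
```

Routing both new and replayed events through the same `when` method is what guarantees that a restored aggregate ends up in the same state as the one that raised the events.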
Event Store Implementation
// infrastructure/event-store.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
import { DomainEvent } from '../domain/events/order-events';
@Injectable()
export class EventStore {
constructor(private readonly pool: Pool) {}
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number,
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
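// Optimistic concurrency check: compare the stream's current version with the caller's expected version.
// Two concurrent transactions can both pass this check, so the events table should also
// enforce UNIQUE (stream_id, version) to make the second insert fail.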
const versionCheck = await client.query(
`SELECT MAX(version) as current_version
FROM events
WHERE stream_id = $1`,
[streamId],
);
const currentVersion = Number(versionCheck.rows[0]?.current_version ?? 0); // MAX() is NULL for an empty stream; BIGINT values arrive as strings
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`,
);
}
// Append the events
for (const event of events) {
await client.query(
`INSERT INTO events (
event_id, stream_id, event_type, version,
payload, metadata, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
streamId,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
],
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(
streamId: string,
fromVersion: number = 0,
): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE stream_id = $1 AND version > $2
ORDER BY version ASC`,
[streamId, fromVersion],
);
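return result.rows.map((row) => this.mapToEvent(row)); // arrow function keeps `this` bound; mapToEvent (row to DomainEvent) is not shown here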
}
}
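To see the expected-version check in isolation, here is a small in-memory sketch of the same optimistic concurrency rule, without PostgreSQL. `InMemoryEventStore`, `StoredEvent`, and this local `ConcurrencyError` are hypothetical stand-ins; the current version is computed as the maximum stored version, mirroring the `MAX(version)` query above.

```typescript
interface StoredEvent {
  eventType: string;
  version: number;
  payload: unknown;
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  append(streamId: string, events: StoredEvent[], expectedVersion: number): void {
    const stream = this.streams.get(streamId) ?? [];
    // Same idea as SELECT MAX(version): 0 means the stream is empty.
    const currentVersion = stream.reduce((max, e) => Math.max(max, e.version), 0);
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`,
      );
    }
    this.streams.set(streamId, [...stream, ...events]);
  }

  getEvents(streamId: string, fromVersion = 0): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter((e) => e.version > fromVersion);
  }
}

const store = new InMemoryEventStore();
store.append('order-123', [{ eventType: 'OrderCreated', version: 1, payload: {} }], 0);

// Two writers both loaded the stream at version 1. The first one wins.
store.append('order-123', [{ eventType: 'OrderItemAdded', version: 2, payload: {} }], 1);

let conflictDetected = false;
try {
  // The second writer still expects version 1 and must be rejected.
  store.append('order-123', [{ eventType: 'OrderCancelled', version: 2, payload: {} }], 1);
} catch (error) {
  conflictDetected = error instanceof Error && error.name === 'ConcurrencyError';
}
```

On a conflict, the usual response is to reload the aggregate from the stream and retry the command against the fresh state, or to report the conflict to the caller.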
Read Model Synchronization (Projection)
flowchart TB
subgraph EventStore["Event Store"]
E1["Event 1"]
E2["Event 2"]
E3["Event 3"]
E4["..."]
end
P["Projector<br/>(Event Handler)"]
subgraph ReadDBs["Read Databases"]
RD1["Read DB 1<br/>(Order List)"]
RD2["Read DB 2<br/>(Sales Aggregation)"]
end
EventStore --> P
P --> RD1
P --> RD2
Characteristics:
- Asynchronous processing (eventual consistency)
- Multiple read models can be generated from the same events
- Rebuildable (by replaying events)
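// projections/order-list-projection.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
OrderCreatedEvent,
OrderShippedEvent,
OrderCancelledEvent,
} from '../domain/events/order-events';
import { OrderReadRepository } from '../infrastructure/read-repositories';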
@Injectable()
export class OrderListProjection {
constructor(private readonly readRepository: OrderReadRepository) {}
@OnEvent('OrderCreated')
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readRepository.insert({
id: event.aggregateId,
customerId: event.payload.customerId,
customerName: await this.getCustomerName(event.payload.customerId),
status: 'pending',
totalAmount: event.payload.totalAmount,
itemCount: event.payload.items.length,
createdAt: event.timestamp,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderShipped')
async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'shipped',
shippedAt: event.payload.shippedAt,
trackingNumber: event.payload.trackingNumber,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderCancelled')
async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'cancelled',
cancelledAt: event.payload.cancelledAt,
cancellationReason: event.payload.reason,
updatedAt: event.timestamp,
});
}
}
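Two properties listed for projections, rebuildability and tolerance of asynchronous delivery, can be sketched without any framework. The example below keeps a `lastVersion` per row so a redelivered event is ignored, and rebuilds the whole read model by replaying the log. `OrderListReadModel`, its row shape, and the `lastVersion` column are assumptions for illustration, not part of the projection above.

```typescript
interface OrderEvent {
  eventType: 'OrderCreated' | 'OrderShipped' | 'OrderCancelled';
  aggregateId: string;
  version: number;
}

interface OrderListRow {
  id: string;
  status: string;
  lastVersion: number; // highest event version already applied to this row
}

class OrderListReadModel {
  readonly rows = new Map<string, OrderListRow>();

  handle(event: OrderEvent): void {
    const row = this.rows.get(event.aggregateId);
    // Idempotency guard: skip events that were already applied.
    if (row && event.version <= row.lastVersion) {
      return;
    }
    switch (event.eventType) {
      case 'OrderCreated':
        this.rows.set(event.aggregateId, {
          id: event.aggregateId,
          status: 'pending',
          lastVersion: event.version,
        });
        break;
      case 'OrderShipped':
      case 'OrderCancelled':
        if (row) {
          row.status = event.eventType === 'OrderShipped' ? 'shipped' : 'cancelled';
          row.lastVersion = event.version;
        }
        break;
    }
  }

  // Rebuild from scratch by replaying the full event log in order.
  rebuild(log: OrderEvent[]): void {
    this.rows.clear();
    log.forEach((event) => this.handle(event));
  }
}

const shipped: OrderEvent = { eventType: 'OrderShipped', aggregateId: 'o1', version: 2 };
const log: OrderEvent[] = [
  { eventType: 'OrderCreated', aggregateId: 'o1', version: 1 },
  { eventType: 'OrderCreated', aggregateId: 'o2', version: 1 },
  shipped,
  { eventType: 'OrderCancelled', aggregateId: 'o2', version: 2 },
];

const liveModel = new OrderListReadModel();
log.forEach((event) => liveModel.handle(event));
liveModel.handle(shipped); // duplicate delivery, ignored by the guard

const rebuiltModel = new OrderListReadModel();
rebuiltModel.rebuild(log);
```

Since the read model is derived data, dropping it and replaying the log is also the simplest way to roll out a changed projection schema.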
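Saga Pattern
A pattern for managing transactions that span multiple aggregates. Each step is a local transaction, and a failure triggers compensating actions that undo the steps already completed.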
// sagas/order-saga.ts
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@Injectable()
export class OrderSaga {
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(OrderCreatedEvent),
map((event) => {
// Issue the inventory reservation command
return new ReserveInventoryCommand({
orderId: event.aggregateId,
items: event.payload.items,
});
}),
);
};
@Saga()
inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(InventoryReservedEvent),
map((event) => {
// Issue the payment processing command
return new ProcessPaymentCommand({
orderId: event.payload.orderId,
amount: event.payload.totalAmount,
});
}),
);
};
@Saga()
paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(PaymentFailedEvent),
map((event) => {
// Compensating transaction: release the reserved inventory
return new ReleaseInventoryCommand({
orderId: event.payload.orderId,
});
}),
);
};
}
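Stripped of RxJS and decorators, the saga above is a function from events to follow-up commands. The sketch below expresses that mapping in plain TypeScript so the happy path and the compensation path can be traced by hand. The flattened event and command shapes are simplified, and the `PaymentSucceeded` event and the extra `CancelOrder` compensation step are assumptions added for illustration.

```typescript
type SagaEvent =
  | { type: 'OrderCreated'; orderId: string; totalAmount: number }
  | { type: 'InventoryReserved'; orderId: string; totalAmount: number }
  | { type: 'PaymentFailed'; orderId: string }
  | { type: 'PaymentSucceeded'; orderId: string };

type SagaCommand =
  | { type: 'ReserveInventory'; orderId: string }
  | { type: 'ProcessPayment'; orderId: string; amount: number }
  | { type: 'ReleaseInventory'; orderId: string }
  | { type: 'CancelOrder'; orderId: string; reason: string };

// Maps each event to the commands that should follow it.
function react(event: SagaEvent): SagaCommand[] {
  switch (event.type) {
    case 'OrderCreated':
      return [{ type: 'ReserveInventory', orderId: event.orderId }];
    case 'InventoryReserved':
      return [{ type: 'ProcessPayment', orderId: event.orderId, amount: event.totalAmount }];
    case 'PaymentFailed':
      // Compensation: undo the completed steps, most recent first.
      return [
        { type: 'ReleaseInventory', orderId: event.orderId },
        { type: 'CancelOrder', orderId: event.orderId, reason: 'payment failed' },
      ];
    case 'PaymentSucceeded':
      return []; // nothing left to do
  }
}

// Trace a run in which the payment step fails.
const history: SagaEvent[] = [
  { type: 'OrderCreated', orderId: 'o1', totalAmount: 15000 },
  { type: 'InventoryReserved', orderId: 'o1', totalAmount: 15000 },
  { type: 'PaymentFailed', orderId: 'o1' },
];

const issued: SagaCommand[] = [];
for (const event of history) {
  issued.push(...react(event));
}
const issuedTypes = issued.map((command) => command.type).join(',');
```

Unlike a database rollback, compensation is itself a series of commands that can fail, so compensating handlers are usually written to be idempotent and safe to retry.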
Implementation Considerations
| Challenge | Solution |
|---|---|
| Eventual consistency | Appropriate UI feedback, optimistic updates |
| Event schema evolution | Event versioning, upcasting |
| Rebuilding projections | Snapshots, parallel processing |
| Debugging complexity | Correlation IDs, detailed logs, event visualization tools |
| Upfront implementation cost | Gradual adoption, starting with the most important domains |
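Of the solutions in the table, upcasting is probably the least self-explanatory: stored events are never rewritten, and old versions are converted to the newest shape when they are read. Below is a minimal sketch, assuming a hypothetical `schemaVersion` field and an invented change in which a free-text `address` became a structured `shippingAddress`.

```typescript
// Version 1 of the event stored the address as a single string.
interface OrderCreatedV1 {
  eventType: 'OrderCreated';
  schemaVersion: 1;
  payload: { customerId: string; address: string };
}

// Version 2 stores structured fields instead.
interface OrderCreatedV2 {
  eventType: 'OrderCreated';
  schemaVersion: 2;
  payload: {
    customerId: string;
    shippingAddress: { line1: string; country: string };
  };
}

type StoredOrderCreated = OrderCreatedV1 | OrderCreatedV2;

// Upcaster: runs at read time, so aggregates and projections only see V2.
function upcast(event: StoredOrderCreated): OrderCreatedV2 {
  if (event.schemaVersion === 2) {
    return event; // already current
  }
  return {
    eventType: 'OrderCreated',
    schemaVersion: 2,
    payload: {
      customerId: event.payload.customerId,
      // V1 carried no country, so a placeholder default is used.
      shippingAddress: { line1: event.payload.address, country: 'unknown' },
    },
  };
}

const oldEvent: OrderCreatedV1 = {
  eventType: 'OrderCreated',
  schemaVersion: 1,
  payload: { customerId: 'c1', address: '1 Main St' },
};

const upcasted = upcast(oldEvent);
const unchanged = upcast(upcasted);
```

Keeping the conversion in one place means event handlers need only a single code path, at the cost of choosing defaults for information the old events never recorded.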