Message queues are a foundational technology that enables asynchronous communication between components in distributed systems. A well-designed messaging system provides scalability, reliability, and a decoupled architecture.
Basic Concepts of Message Queues
Synchronous vs. Asynchronous Processing
flowchart TB
subgraph Sync["Synchronous Processing"]
C1["Client"] --> S1["Server"] --> D1["DB"]
D1 --> S1 --> C1
Note1["Waits until everything completes<br/>Response time: 500ms+"]
end
subgraph Async["Asynchronous Processing (with a Message Queue)"]
C2["Client"] --> S2["Server"]
S2 --> C2
S2 --> Q["Queue"]
Q --> W["Consumer<br/>(Worker)"]
W --> D2["DB"]
Note2["Immediate response (50ms)"]
end
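To make the contrast concrete, here is a minimal TypeScript sketch of the two approaches. The `Db` and `Queue` interfaces are hypothetical placeholders rather than a real broker API; the point is only that the asynchronous handler responds as soon as the job is enqueued, while a worker finishes the slow write later.

```typescript
// Hypothetical stand-ins for a database and a message broker.
interface Db {
  saveOrder(order: object): Promise<void>; // slow write (hundreds of ms)
}
interface Queue {
  enqueue(job: object): Promise<void>; // fast enqueue (a few ms)
}

// Synchronous: the client waits for the entire write to finish.
async function handleOrderSync(db: Db, order: object): Promise<string> {
  await db.saveOrder(order);
  return 'created';
}

// Asynchronous: hand the work to a queue and respond immediately;
// a separate consumer (worker) performs the write.
async function handleOrderAsync(queue: Queue, order: object): Promise<string> {
  await queue.enqueue({ type: 'order.created', order });
  return 'accepted';
}
```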
Messaging Patterns
flowchart LR
subgraph P2P["1. Point-to-Point (Queue)"]
Prod["Producer"] --> Queue["Queue"] --> Cons["Consumer"]
end
subgraph PubSub["2. Publish/Subscribe (Topic)"]
Pub["Publisher"] --> Topic["Topic"]
Topic --> SubA["Subscriber A"]
Topic --> SubB["Subscriber B"]
Topic --> SubC["Subscriber C"]
end
subgraph ReqRep["3. Request/Reply"]
Client["Client"] --> ReqQ["Request Queue"] --> Server["Server"]
Server --> RepQ["Reply Queue"] --> Client
end
- Point-to-Point: each message is delivered to exactly one consumer; well suited to load balancing work across workers
- Publish/Subscribe: each message is delivered to every subscriber; well suited to event notification
- Request/Reply: a pattern resembling asynchronous RPC
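As a rough sketch (not tied to any concrete broker API; the interface names are hypothetical), the three patterns differ mainly in the shape of their send/receive contracts:

```typescript
// Point-to-Point: competing consumers; each message goes to exactly one receiver.
interface PointToPointQueue<T> {
  send(message: T): Promise<void>;
  receive(): Promise<T | null>;
}

// Publish/Subscribe: every subscriber receives its own copy of each event.
interface Topic<T> {
  publish(event: T): Promise<void>;
  subscribe(handler: (event: T) => Promise<void>): void;
}

// Request/Reply: the reply is correlated with the request, typically via a
// correlation ID and a dedicated reply queue.
interface RequestReplyChannel<Req, Res> {
  request(payload: Req, correlationId: string): Promise<Res>;
}
```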
Major Message Queue Systems
System Comparison
| Feature | RabbitMQ | Apache Kafka | Amazon SQS | Redis Streams |
|---|---|---|---|---|
| Protocol | AMQP | Custom binary protocol | HTTP/HTTPS | Redis protocol (RESP) |
| Message retention | Deleted after consumption | Configurable (time/size based) | Up to 14 days | Configurable |
| Ordering guarantee | Per queue | Per partition | FIFO queues available | Per stream |
| Throughput | Medium | Very high | High | High |
| Latency | Low | Medium | Medium | Very low |
| Typical use | Task distribution | Event streaming | Cloud-native workloads | Real-time processing |
RabbitMQ
// RabbitMQ with amqplib
import * as amqp from 'amqplib';
import crypto from 'node:crypto'; // for crypto.randomUUID()
// Connection management class
class RabbitMQConnection {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect(url: string): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Prefetch setting (number of messages processed concurrently)
await this.channel.prefetch(10);
}
async setupExchangeAndQueue(
exchange: string,
queue: string,
routingKey: string
): Promise<void> {
if (!this.channel) throw new Error('Not connected');
// Declare the exchange
await this.channel.assertExchange(exchange, 'topic', {
durable: true,
});
// Declare the queue
await this.channel.assertQueue(queue, {
durable: true,
deadLetterExchange: `${exchange}.dlx`, // dead-letter exchange for failed messages
messageTtl: 86400000, // 24 hours
});
// Binding
await this.channel.bindQueue(queue, exchange, routingKey);
}
async publish(
exchange: string,
routingKey: string,
message: object
): Promise<boolean> {
if (!this.channel) throw new Error('Not connected');
return this.channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
{
persistent: true, // persist the message to disk
contentType: 'application/json',
timestamp: Date.now(),
messageId: crypto.randomUUID(),
}
);
}
async consume(
queue: string,
handler: (msg: amqp.Message) => Promise<void>
): Promise<void> {
if (!this.channel) throw new Error('Not connected');
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
await handler(msg);
this.channel!.ack(msg); // ack on success
} catch (error) {
console.error('Message processing failed:', error);
// Check the retry count carried in the message headers
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// Retry with a delay, then ack the original so it is not also dead-lettered
await this.publishWithDelay(
msg.fields.exchange,
msg.fields.routingKey,
msg.content,
retryCount
);
this.channel!.ack(msg);
} else {
this.channel!.nack(msg, false, false); // route to the DLQ
}
}
});
}
private async publishWithDelay(
exchange: string,
routingKey: string,
content: Buffer,
retryCount: number
): Promise<void> {
const delay = Math.pow(2, retryCount) * 1000; // exponential backoff
// Assumes a delayed-message exchange (e.g. the rabbitmq_delayed_message_exchange plugin) exists
this.channel!.publish(
`${exchange}.delayed`,
routingKey,
content,
{
headers: {
'x-delay': delay,
'x-retry-count': retryCount,
},
}
);
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
// Usage example
const rabbit = new RabbitMQConnection();
await rabbit.connect('amqp://localhost');
await rabbit.setupExchangeAndQueue(
'orders',
'order-processing',
'order.created'
);
// Producer
await rabbit.publish('orders', 'order.created', {
orderId: '12345',
userId: 'user-1',
items: [{ productId: 'prod-1', quantity: 2 }],
});
// Consumer
await rabbit.consume('order-processing', async (msg) => {
const order = JSON.parse(msg.content.toString());
console.log('Processing order:', order.orderId);
// Order processing logic
});
Apache Kafka
// Kafka with kafkajs
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
import crypto from 'node:crypto'; // for crypto.randomUUID()
// Kafka connection configuration
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
ssl: true,
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
},
retry: {
initialRetryTime: 100,
retries: 8,
},
});
// Producer
class KafkaProducer {
private producer: Producer;
constructor() {
this.producer = kafka.producer({
idempotent: true, // idempotence guarantee
maxInFlightRequests: 5,
transactionalId: 'my-transactional-id', // required for transactions
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async sendBatch(
topic: string,
messages: Array<{ key: string; value: object }>
): Promise<void> {
await this.producer.send({
topic,
messages: messages.map(({ key, value }) => ({
key,
value: JSON.stringify(value),
headers: {
'correlation-id': crypto.randomUUID(),
timestamp: Date.now().toString(),
},
})),
acks: -1, // wait until the write is acknowledged by all in-sync replicas
timeout: 30000,
});
}
// Transactional send
async sendWithTransaction(
messages: Array<{ topic: string; key: string; value: object }>
): Promise<void> {
const transaction = await this.producer.transaction();
try {
for (const { topic, key, value } of messages) {
await transaction.send({
topic,
messages: [{ key, value: JSON.stringify(value) }],
});
}
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
}
// Consumer
class KafkaConsumer {
private consumer: Consumer;
constructor(groupId: string) {
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
});
}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.subscribe({
topics,
fromBeginning: false,
});
}
async run(
handler: (payload: EachMessagePayload) => Promise<void>
): Promise<void> {
await this.consumer.run({
eachMessage: async (payload) => {
const { topic, partition, message } = payload;
console.log({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
});
try {
await handler(payload);
} catch (error) {
console.error('Message processing failed:', error);
// Error handling (send to a DLQ, etc.)
}
},
});
}
async disconnect(): Promise<void> {
await this.consumer.disconnect();
}
}
// Exactly-Once Semantics (EOS): consume, transform, and produce within a single transaction
class KafkaEOSProcessor {
private producer: Producer;
private consumer: Consumer;
constructor(private groupId: string) {
// A transactional, idempotent producer is required for EOS
this.producer = kafka.producer({
transactionalId: 'eos-processor',
idempotent: true,
maxInFlightRequests: 1,
});
this.consumer = kafka.consumer({ groupId });
}
async processWithEOS(
inputTopic: string,
outputTopic: string,
transform: (value: any) => any
): Promise<void> {
await this.consumer.subscribe({ topics: [inputTopic] });
await this.consumer.run({
autoCommit: false, // offsets are committed through the transaction via sendOffsets
eachBatch: async ({ batch, resolveOffset }) => {
const transaction = await this.producer.transaction();
try {
for (const message of batch.messages) {
const value = JSON.parse(message.value!.toString());
const transformed = transform(value);
await transaction.send({
topic: outputTopic,
messages: [{ value: JSON.stringify(transformed) }],
});
resolveOffset(message.offset);
}
await transaction.sendOffsets({
consumerGroupId: this.groupId,
topics: [{
topic: inputTopic,
partitions: [{
partition: batch.partition,
offset: (Number(batch.lastOffset()) + 1).toString(), // commit the next offset to read
}],
}],
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
},
});
}
}
Amazon SQS
// AWS SQS with @aws-sdk/client-sqs
import {
SQSClient,
SendMessageCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
SendMessageBatchCommand,
Message,
} from '@aws-sdk/client-sqs';
class SQSQueue {
private client: SQSClient;
private queueUrl: string;
constructor(queueUrl: string, region: string = 'ap-northeast-1') {
this.client = new SQSClient({ region });
this.queueUrl = queueUrl;
}
async sendMessage(
body: object,
options: {
delaySeconds?: number;
messageGroupId?: string; // FIFO queues only
deduplicationId?: string; // FIFO queues only
} = {}
): Promise<string> {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(body),
DelaySeconds: options.delaySeconds,
MessageGroupId: options.messageGroupId,
MessageDeduplicationId: options.deduplicationId,
MessageAttributes: {
Timestamp: {
DataType: 'Number',
StringValue: Date.now().toString(),
},
},
});
const response = await this.client.send(command);
return response.MessageId!;
}
async sendBatch(
messages: Array<{ id: string; body: object; delaySeconds?: number }>
): Promise<void> {
// SQS allows at most 10 messages per batch
const chunks = this.chunkArray(messages, 10);
for (const chunk of chunks) {
const command = new SendMessageBatchCommand({
QueueUrl: this.queueUrl,
Entries: chunk.map(({ id, body, delaySeconds }) => ({
Id: id,
MessageBody: JSON.stringify(body),
DelaySeconds: delaySeconds,
})),
});
await this.client.send(command);
}
}
async receiveMessages(maxMessages: number = 10): Promise<Message[]> {
const command = new ReceiveMessageCommand({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: Math.min(maxMessages, 10),
WaitTimeSeconds: 20, // Long polling
VisibilityTimeout: 30,
AttributeNames: ['All'],
MessageAttributeNames: ['All'],
});
const response = await this.client.send(command);
return response.Messages || [];
}
async deleteMessage(receiptHandle: string): Promise<void> {
const command = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
});
await this.client.send(command);
}
// Polling loop
async startPolling(
handler: (message: Message) => Promise<void>,
options: { concurrency?: number } = {}
): Promise<void> {
const { concurrency = 5 } = options;
while (true) {
const messages = await this.receiveMessages(concurrency);
if (messages.length === 0) continue;
await Promise.all(
messages.map(async (message) => {
try {
await handler(message);
await this.deleteMessage(message.ReceiptHandle!);
} catch (error) {
console.error('Message processing failed:', error);
// The message becomes visible again automatically after the visibility timeout
}
})
);
}
}
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
}
// Lambda integration (SQS trigger)
export const handler = async (event: {
Records: Array<{
body: string;
receiptHandle: string;
messageId: string;
}>;
}): Promise<{ batchItemFailures: Array<{ itemIdentifier: string }> }> => {
const failures: string[] = [];
await Promise.all(
event.Records.map(async (record) => {
try {
const body = JSON.parse(record.body);
await processMessage(body); // application-specific handler, assumed to be defined elsewhere
} catch (error) {
console.error(`Failed to process ${record.messageId}:`, error);
failures.push(record.messageId);
}
})
);
// Report partial batch failures (requires ReportBatchItemFailures on the event source mapping)
return {
batchItemFailures: failures.map((id) => ({ itemIdentifier: id })),
};
};
Redis Streams
// Redis Streams with ioredis
import Redis from 'ioredis';
class RedisStreamQueue {
private redis: Redis;
private consumerGroup: string;
private consumerName: string;
constructor(
redisUrl: string,
consumerGroup: string,
consumerName: string
) {
this.redis = new Redis(redisUrl);
this.consumerGroup = consumerGroup;
this.consumerName = consumerName;
}
// Create the consumer group
async createConsumerGroup(stream: string): Promise<void> {
try {
await this.redis.xgroup(
'CREATE',
stream,
this.consumerGroup,
'0',
'MKSTREAM'
);
} catch (error: any) {
if (!error.message.includes('BUSYGROUP')) {
throw error;
}
// The group already exists; ignore
}
}
// Add a message
async addMessage(
stream: string,
data: Record<string, string>,
maxLen?: number
): Promise<string> {
const args: (string | number)[] = [stream];
if (maxLen) {
args.push('MAXLEN', '~', maxLen); // approximate length cap
}
args.push('*'); // auto-generate the entry ID
// Flatten the data into field/value pairs
for (const [key, value] of Object.entries(data)) {
args.push(key, value);
}
return (await this.redis.xadd(...args)) as string;
}
// Read messages (consumer group)
async readMessages(
stream: string,
count: number = 10,
blockMs: number = 5000
): Promise<Array<{ id: string; data: Record<string, string> }>> {
const result = await this.redis.xreadgroup(
'GROUP',
this.consumerGroup,
this.consumerName,
'COUNT',
count,
'BLOCK',
blockMs,
'STREAMS',
stream,
'>' // only messages never delivered to this group
);
if (!result) return [];
const messages: Array<{ id: string; data: Record<string, string> }> = [];
for (const [, entries] of result) {
for (const [id, fields] of entries) {
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
messages.push({ id, data });
}
}
return messages;
}
// Acknowledge successful processing
async acknowledgeMessage(stream: string, messageId: string): Promise<void> {
await this.redis.xack(stream, this.consumerGroup, messageId);
}
// Reclaim and reprocess unacknowledged messages
async claimPendingMessages(
stream: string,
minIdleMs: number = 60000,
count: number = 10
): Promise<Array<{ id: string; data: Record<string, string> }>> {
// Fetch pending entries
const pending = await this.redis.xpending(
stream,
this.consumerGroup,
'-',
'+',
count
);
const messages: Array<{ id: string; data: Record<string, string> }> = [];
for (const [id, , idleTime] of pending) {
if (idleTime >= minIdleMs) {
// Claim the entry for this consumer
const claimed = await this.redis.xclaim(
stream,
this.consumerGroup,
this.consumerName,
minIdleMs,
id
);
if (claimed.length > 0) {
const [, fields] = claimed[0];
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
messages.push({ id, data });
}
}
}
return messages;
}
// Message processing loop
async startProcessing(
stream: string,
handler: (message: { id: string; data: Record<string, string> }) => Promise<void>
): Promise<void> {
await this.createConsumerGroup(stream);
while (true) {
// Process new messages
const messages = await this.readMessages(stream);
for (const message of messages) {
try {
await handler(message);
await this.acknowledgeMessage(stream, message.id);
} catch (error) {
console.error(`Failed to process message ${message.id}:`, error);
// The message remains in the pending entries list
}
}
// Reclaim and reprocess stale pending messages
const pendingMessages = await this.claimPendingMessages(stream);
for (const message of pendingMessages) {
try {
await handler(message);
await this.acknowledgeMessage(stream, message.id);
} catch (error) {
console.error(`Failed to reprocess message ${message.id}:`, error);
}
}
}
}
async close(): Promise<void> {
await this.redis.quit();
}
}
Design Patterns and Best Practices
Message Idempotency
// Duplicate elimination via an idempotency key
interface IdempotentMessage {
idempotencyKey: string;
payload: unknown;
timestamp: number;
}
class IdempotentProcessor {
private processedKeys: Map<string, { result: unknown; expiry: number }> = new Map();
private ttlMs: number = 24 * 60 * 60 * 1000; // 24 hours
async process<T>(
message: IdempotentMessage,
handler: (payload: unknown) => Promise<T>
): Promise<T> {
const { idempotencyKey, payload } = message;
// Check whether it has already been processed
const cached = this.processedKeys.get(idempotencyKey);
if (cached && cached.expiry > Date.now()) {
console.log(`Returning cached result for ${idempotencyKey}`);
return cached.result as T;
}
// Run the handler
const result = await handler(payload);
// Cache the result
this.processedKeys.set(idempotencyKey, {
result,
expiry: Date.now() + this.ttlMs,
});
return result;
}
// Redis-backed version (for distributed environments)
async processWithRedis<T>(
redis: Redis,
message: IdempotentMessage,
handler: (payload: unknown) => Promise<T>
): Promise<T> {
const { idempotencyKey, payload } = message;
const cacheKey = `idempotency:${idempotencyKey}`;
// Check whether it has already been processed
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached) as T;
}
// Mutual exclusion (processing lock)
const lockKey = `lock:${idempotencyKey}`;
const acquired = await redis.set(lockKey, '1', 'EX', 30, 'NX');
if (!acquired) {
// Another worker is processing it; wait for the result
await this.waitForResult(redis, cacheKey);
const result = await redis.get(cacheKey);
return JSON.parse(result!) as T;
}
try {
const result = await handler(payload);
// Cache the result
await redis.setex(cacheKey, this.ttlMs / 1000, JSON.stringify(result));
return result;
} finally {
await redis.del(lockKey);
}
}
private async waitForResult(
redis: Redis,
key: string,
maxWaitMs: number = 30000
): Promise<void> {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const result = await redis.get(key);
if (result) return;
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new Error('Timeout waiting for result');
}
}
Dead Letter Queue (DLQ)
flowchart TB
MQ["Main Queue"] --> Consumer["Consumer"]
Consumer --> Success{"Result"}
Success -->|Success| Complete["Done"]
Success -->|Failure| Retry{"Retries < 3?"}
Retry -->|YES| Requeue["Requeue<br/>(with delay)"]
Requeue --> MQ
Retry -->|NO| DLQ["Dead Letter Queue"]
DLQ --> Alert["Monitoring/Alerting<br/>Manual intervention"]
style DLQ fill:#f99,stroke:#f00
style Alert fill:#ff9,stroke:#f90
// Example DLQ implementation (MessageQueue and Message here are illustrative interfaces assumed to be defined elsewhere)
interface DeadLetterMessage {
originalMessage: unknown;
error: string;
failedAt: string;
retryCount: number;
originalQueue: string;
}
class DeadLetterQueueHandler {
constructor(
private mainQueue: MessageQueue,
private dlq: MessageQueue
) {}
async processWithDLQ<T>(
message: Message,
handler: (msg: Message) => Promise<T>,
maxRetries: number = 3
): Promise<T | null> {
const retryCount = this.getRetryCount(message);
try {
return await handler(message);
} catch (error) {
if (retryCount < maxRetries) {
// Retry
await this.requeueWithDelay(message, retryCount + 1);
} else {
// Send to the DLQ
await this.sendToDLQ(message, error as Error, retryCount);
}
return null;
}
}
private getRetryCount(message: Message): number {
return message.headers?.['x-retry-count'] || 0;
}
private async requeueWithDelay(
message: Message,
retryCount: number
): Promise<void> {
const delay = Math.min(
1000 * Math.pow(2, retryCount), // exponential backoff
60000 // cap at 1 minute
);
await this.mainQueue.publish({
...message,
headers: {
...message.headers,
'x-retry-count': retryCount,
'x-retry-delay': delay,
},
}, { delayMs: delay });
}
private async sendToDLQ(
message: Message,
error: Error,
retryCount: number
): Promise<void> {
const dlqMessage: DeadLetterMessage = {
originalMessage: message,
error: error.message,
failedAt: new Date().toISOString(),
retryCount,
originalQueue: this.mainQueue.name,
};
await this.dlq.publish(dlqMessage);
// Send an alert
await this.sendAlert(dlqMessage);
}
private async sendAlert(message: DeadLetterMessage): Promise<void> {
console.error('Message sent to DLQ:', {
error: message.error,
retryCount: message.retryCount,
originalQueue: message.originalQueue,
});
// Notify Slack/PagerDuty, etc.
}
}
Ordering Guarantees
// Ordering guarantee via a partition key
class OrderedMessageProcessor {
private processingMap: Map<string, Promise<void>> = new Map();
async processInOrder<T>(
partitionKey: string,
handler: () => Promise<T>
): Promise<T> {
// Wait for any in-flight processing for the same partition key
const previousPromise = this.processingMap.get(partitionKey) || Promise.resolve();
const currentPromise = previousPromise.then(handler);
// Store a settled chain so one failure does not block later messages for the same key
this.processingMap.set(partitionKey, currentPromise.then(() => {}, () => {}));
try {
return await currentPromise;
} finally {
// Clean up the map entry after processing
if (this.processingMap.get(partitionKey) === currentPromise) {
this.processingMap.delete(partitionKey);
}
}
}
}
// Usage example
const processor = new OrderedMessageProcessor();
// Use the user ID as the partition key:
// messages from the same user are processed in order
await processor.processInOrder(message.userId, async () => {
await processUserMessage(message);
});
Monitoring and Observability
// Metrics collection
interface QueueMetrics {
messagesPublished: number;
messagesConsumed: number;
messagesFailed: number;
processingLatencyMs: number[];
queueDepth: number;
}
class MetricsCollector {
private metrics: QueueMetrics = {
messagesPublished: 0,
messagesConsumed: 0,
messagesFailed: 0,
processingLatencyMs: [],
queueDepth: 0,
};
recordPublish(): void {
this.metrics.messagesPublished++;
}
recordConsume(latencyMs: number): void {
this.metrics.messagesConsumed++;
this.metrics.processingLatencyMs.push(latencyMs);
}
recordFailure(): void {
this.metrics.messagesFailed++;
}
updateQueueDepth(depth: number): void {
this.metrics.queueDepth = depth;
}
getStats(): {
throughput: number;
errorRate: number;
avgLatency: number;
p99Latency: number;
} {
const total = this.metrics.messagesConsumed + this.metrics.messagesFailed;
return {
throughput: this.metrics.messagesConsumed,
errorRate: total > 0 ? this.metrics.messagesFailed / total : 0,
avgLatency: this.calculateAverage(this.metrics.processingLatencyMs),
p99Latency: this.calculatePercentile(this.metrics.processingLatencyMs, 99),
};
}
private calculateAverage(values: number[]): number {
if (values.length === 0) return 0;
return values.reduce((a, b) => a + b, 0) / values.length;
}
private calculatePercentile(values: number[], percentile: number): number {
if (values.length === 0) return 0;
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil((percentile / 100) * sorted.length) - 1;
return sorted[index];
}
}
Summary
Message queues are a foundational technology for distributed systems.
Selection Criteria
| Requirement | Recommended System |
|---|---|
| Simple task queue | RabbitMQ, SQS |
| Event streaming | Kafka |
| Real-time processing | Redis Streams |
| Serverless | SQS + Lambda |
| Log aggregation | Kafka |
Design Considerations
- Ensure idempotency: prevent duplicate processing
- Ordering guarantees: only where necessary, scoped to a partition
- Error handling: a DLQ plus a retry strategy
- Monitoring: throughput, latency, and error rate
Choosing and designing the right message queue makes it possible to build scalable, reliable systems.
Reference Links
- RabbitMQ Documentation
- Apache Kafka Documentation
- Amazon SQS Developer Guide
- Redis Streams Introduction