Message queues enable asynchronous communication between components in distributed systems. A well-designed messaging system improves scalability and reliability and keeps components loosely coupled.
Basic Concepts of Message Queues
Synchronous vs Asynchronous Processing
Synchronous Processing:
flowchart LR
C["Client"] <-->|"Waits for entire processing<br/>Response time: 500ms+"| S["Server"]
S <--> DB["DB"]
Asynchronous Processing (using message queue):
flowchart TB
C["Client"] -->|"Request"| S["Server"]
S -->|"Immediate response (50ms)"| C
S --> Q["Queue"]
Q --> W["Consumer (Worker)"]
W --> DB["DB"]
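The difference between the two flows can be sketched in memory, without a real broker. This is only an illustration under stated assumptions: `InMemoryJobQueue`, `acceptRequest`, and `drain` are hypothetical names, and an array stands in for the queue.

```typescript
// Minimal in-memory sketch; a real system would use a broker instead of an array.
type Job = { id: number; payload: string };

class InMemoryJobQueue {
  private jobs: Job[] = [];
  private nextId = 1;
  readonly processed: string[] = [];

  // "Server" side: enqueue and return at once, without doing the slow work.
  acceptRequest(payload: string): { accepted: true; jobId: number } {
    const job: Job = { id: this.nextId++, payload };
    this.jobs.push(job);
    return { accepted: true, jobId: job.id };
  }

  // "Worker" side: runs later, independently of the request path.
  drain(work: (job: Job) => string): number {
    let count = 0;
    while (this.jobs.length > 0) {
      const job = this.jobs.shift()!;
      this.processed.push(work(job));
      count++;
    }
    return count;
  }

  // Number of jobs still waiting to be processed.
  get depth(): number {
    return this.jobs.length;
  }
}
```

The client only waits for `acceptRequest`; the time spent in `drain` is paid by the worker, which is where the shorter response time in the diagram comes from.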
Messaging Patterns
1. Point-to-Point (Queue):
flowchart LR
P["Producer"] --> Q["Queue"] --> C["Consumer"]
- One message is received by only one Consumer
- Suitable for load balancing
2. Publish/Subscribe (Topic):
flowchart LR
Pub["Publisher"] --> T["Topic"]
T --> A["Subscriber A"]
T --> B["Subscriber B"]
T --> C["Subscriber C"]
- One message is received by multiple Subscribers
- Suitable for event notifications
3. Request/Reply:
flowchart LR
C["Client"] --> RQ["Request Queue"] --> S["Server"]
S --> RepQ["Reply Queue"] --> C
- RPC-like pattern over queues; replies are matched to requests with a correlation ID
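The delivery semantics of the first two patterns can be contrasted with a small in-memory sketch. `PointToPointQueue` and `Topic` are hypothetical classes written for this illustration, and round-robin is just one possible way to pick the single receiving consumer.

```typescript
type Handler = (message: string) => void;

// Point-to-point: each message goes to exactly one consumer (round-robin here).
class PointToPointQueue {
  private consumers: Handler[] = [];
  private next = 0;

  addConsumer(handler: Handler): void {
    this.consumers.push(handler);
  }

  send(message: string): void {
    if (this.consumers.length === 0) throw new Error('No consumers registered');
    const consumer = this.consumers[this.next % this.consumers.length];
    this.next++;
    consumer(message);
  }
}

// Publish/subscribe: every subscriber receives its own copy of each message.
class Topic {
  private subscribers: Handler[] = [];

  subscribe(handler: Handler): void {
    this.subscribers.push(handler);
  }

  publish(message: string): void {
    for (const subscriber of this.subscribers) subscriber(message);
  }
}
```

With two consumers on the queue, three messages are split between them; with two subscribers on the topic, both see all three.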
Major Message Queue Systems
System Comparison
| Feature | RabbitMQ | Apache Kafka | Amazon SQS | Redis Streams |
|---|---|---|---|---|
| Protocol | AMQP 0-9-1 | Custom binary protocol over TCP | HTTP/HTTPS | RESP (Redis protocol) |
| Message Retention | Deleted after acknowledgment | Retained per policy (time/size), even after consumption | Up to 14 days | Configurable |
| Order Guarantee | Per queue | Per partition | FIFO queues only | Per stream |
| Throughput | Medium | Very high | High | High |
| Latency | Low | Medium | Medium | Very low |
| Use Case | Task distribution | Event streaming | Cloud-native | Real-time |
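"Per partition" ordering in the table relies on messages with the same key always landing in the same partition. A minimal sketch of that mapping follows. The hash is an FNV-1a style function chosen only for illustration; it is an assumption of this example and not the partitioner any particular broker or client uses, and `partitionFor` is a hypothetical helper.

```typescript
// Simple, stable string hash (FNV-1a style, 32-bit, over UTF-16 code units).
// Illustrative only: real clients ship their own partitioners.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep as unsigned 32-bit
  }
  return hash >>> 0;
}

// Maps a key to a partition index in [0, partitionCount).
function partitionFor(key: string, partitionCount: number): number {
  if (!Number.isInteger(partitionCount) || partitionCount <= 0) {
    throw new RangeError('partitionCount must be a positive integer');
  }
  return fnv1a(key) % partitionCount;
}
```

Because the mapping is deterministic, all messages for `user-1` share one partition and keep their relative order; changing `partitionCount` changes the mapping, which is why repartitioning can break ordering assumptions.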
RabbitMQ
// RabbitMQ with amqplib
import amqp from 'amqplib';
import * as crypto from 'node:crypto'; // For crypto.randomUUID()
// Connection management class
class RabbitMQConnection {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect(url: string): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Prefetch: cap on unacknowledged messages in flight per consumer
await this.channel.prefetch(10);
}
async setupExchangeAndQueue(
exchange: string,
queue: string,
routingKey: string
): Promise<void> {
if (!this.channel) throw new Error('Not connected');
// Declare Exchange
await this.channel.assertExchange(exchange, 'topic', {
durable: true,
});
// Declare Queue
await this.channel.assertQueue(queue, {
durable: true,
deadLetterExchange: `${exchange}.dlx`, // Dead letter exchange (must be declared and bound to a DLQ separately)
messageTtl: 86400000, // 24 hours
});
// Binding
await this.channel.bindQueue(queue, exchange, routingKey);
}
async publish(
exchange: string,
routingKey: string,
message: object
): Promise<boolean> {
if (!this.channel) throw new Error('Not connected');
return this.channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
{
persistent: true, // Persistence
contentType: 'application/json',
timestamp: Date.now(),
messageId: crypto.randomUUID(),
}
);
}
async consume(
queue: string,
handler: (msg: amqp.Message) => Promise<void>
): Promise<void> {
if (!this.channel) throw new Error('Not connected');
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
await handler(msg);
this.channel!.ack(msg); // Ack on success
} catch (error) {
console.error('Message processing failed:', error);
// Check retry count
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// Retry (with delay), then ack the original so it is not dead-lettered as well
await this.publishWithDelay(
msg.fields.exchange,
msg.fields.routingKey,
msg.content,
retryCount
);
this.channel!.ack(msg);
} else {
this.channel!.nack(msg, false, false); // Retries exhausted: send to DLQ
}
}
});
}
private async publishWithDelay(
exchange: string,
routingKey: string,
content: Buffer,
retryCount: number
): Promise<void> {
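const delay = Math.pow(2, retryCount) * 1000; // Exponential backoff: 2s, 4s, ...
// Assumes `${exchange}.delayed` is an x-delayed-message exchange
// (rabbitmq_delayed_message_exchange plugin) bound to the target queue
this.channel!.publish(
`${exchange}.delayed`,
routingKey,
content,
{
persistent: true, // Keep retried messages durable as well
headers: {
'x-delay': delay,
'x-retry-count': retryCount,
},
}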
);
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
// Usage example
const rabbit = new RabbitMQConnection();
await rabbit.connect('amqp://localhost');
await rabbit.setupExchangeAndQueue(
'orders',
'order-processing',
'order.created'
);
// Producer
await rabbit.publish('orders', 'order.created', {
orderId: '12345',
userId: 'user-1',
items: [{ productId: 'prod-1', quantity: 2 }],
});
// Consumer
await rabbit.consume('order-processing', async (msg) => {
const order = JSON.parse(msg.content.toString());
console.log('Processing order:', order.orderId);
// Order processing logic
});
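The example above binds the queue to a `topic` exchange with the exact key `order.created`. Topic exchanges also accept wildcard binding patterns, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The matcher below is a rough sketch of those rules for experimentation; `topicMatches` is a hypothetical helper, not part of amqplib, and the broker performs this matching itself.

```typescript
// Returns true if a routing key matches a topic-exchange binding pattern.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' may absorb zero or more of the remaining words.
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  };

  return match(0, 0);
}
```

For instance, a binding of `order.*` would receive `order.created` and `order.cancelled` but not `order.created.eu`, while `order.#` would receive all three.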
Apache Kafka
// Kafka with kafkajs
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
import * as crypto from 'node:crypto'; // For crypto.randomUUID()
// Kafka connection configuration
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
ssl: true,
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
},
retry: {
initialRetryTime: 100,
retries: 8,
},
});
// Producer
class KafkaProducer {
private producer: Producer;
constructor() {
this.producer = kafka.producer({
idempotent: true, // Idempotency guarantee
maxInFlightRequests: 1, // KafkaJS documents 1 in-flight request for its exactly-once setup
transactionalId: 'my-transactional-id', // For transactions
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async sendBatch(
topic: string,
messages: Array<{ key: string; value: object }>
): Promise<void> {
await this.producer.send({
topic,
messages: messages.map(({ key, value }) => ({
key,
value: JSON.stringify(value),
headers: {
'correlation-id': crypto.randomUUID(),
timestamp: Date.now().toString(),
},
})),
acks: -1, // Wait for write completion to all replicas
timeout: 30000,
});
}
// Send with transaction
async sendWithTransaction(
messages: Array<{ topic: string; key: string; value: object }>
): Promise<void> {
const transaction = await this.producer.transaction();
try {
for (const { topic, key, value } of messages) {
await transaction.send({
topic,
messages: [{ key, value: JSON.stringify(value) }],
});
}
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
}
// Consumer
class KafkaConsumer {
private consumer: Consumer;
constructor(groupId: string) {
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
});
}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.subscribe({
topics,
fromBeginning: false,
});
}
async run(
handler: (payload: EachMessagePayload) => Promise<void>
): Promise<void> {
await this.consumer.run({
eachMessage: async (payload) => {
const { topic, partition, message } = payload;
console.log({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
});
try {
await handler(payload);
} catch (error) {
console.error('Message processing failed:', error);
// Error handling (e.g., send to DLQ)
}
},
});
}
async disconnect(): Promise<void> {
await this.consumer.disconnect();
}
}
// Exactly-Once Semantics (EOS) Implementation
class KafkaEOSProcessor {
constructor(
private producer: Producer, // Must be created with transactionalId and idempotent: true
private consumer: Consumer,
private groupId: string // Must match the consumer's groupId
) {}
async processWithEOS(
inputTopic: string,
outputTopic: string,
transform: (value: any) => any
): Promise<void> {
await this.consumer.subscribe({ topics: [inputTopic] });
await this.consumer.run({
autoCommit: false, // Offsets are committed only through the transaction below
eachBatch: async ({ batch, resolveOffset }) => {
const transaction = await this.producer.transaction();
try {
for (const message of batch.messages) {
const value = JSON.parse(message.value!.toString());
const transformed = transform(value);
await transaction.send({
topic: outputTopic,
messages: [{ value: JSON.stringify(transformed) }],
});
resolveOffset(message.offset);
}
await transaction.sendOffsets({
consumerGroupId: this.groupId,
topics: [{
topic: inputTopic,
partitions: [{
partition: batch.partition,
// Commit the NEXT offset to read (last processed + 1)
offset: (Number(batch.lastOffset()) + 1).toString(),
}],
}],
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
},
});
}
}
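A detail that is easy to get wrong when committing offsets by hand: the committed offset conventionally denotes the next record to read, so it is the last processed offset plus one. The in-memory sketch below illustrates that convention only; `PartitionLog`, `poll`, and `commitProcessed` are hypothetical names and do not mirror the kafkajs API.

```typescript
type LogRecord = { offset: number; value: string };

// A single partition modeled as an append-only array with per-group positions.
class PartitionLog {
  private records: string[] = [];
  private committed = new Map<string, number>(); // groupId -> next offset to read

  // Appends a record and returns its offset.
  append(value: string): number {
    this.records.push(value);
    return this.records.length - 1;
  }

  // Returns up to `max` records starting at the group's committed offset.
  poll(groupId: string, max: number): LogRecord[] {
    const start = this.committed.get(groupId) ?? 0;
    return this.records
      .slice(start, start + max)
      .map((value, i) => ({ offset: start + i, value }));
  }

  // Stores the offset of the NEXT record to read: last processed + 1.
  commitProcessed(groupId: string, lastProcessedOffset: number): void {
    this.committed.set(groupId, lastProcessedOffset + 1);
  }
}
```

Committing the last processed offset itself, without the `+ 1`, would cause that record to be delivered again after a restart. Each consumer group keeps its own position, so a second group still starts from offset 0.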
Amazon SQS
// AWS SQS with @aws-sdk/client-sqs
import {
SQSClient,
SendMessageCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
SendMessageBatchCommand,
Message,
} from '@aws-sdk/client-sqs';
class SQSQueue {
private client: SQSClient;
private queueUrl: string;
constructor(queueUrl: string, region: string = 'ap-northeast-1') {
this.client = new SQSClient({ region });
this.queueUrl = queueUrl;
}
async sendMessage(
body: object,
options: {
delaySeconds?: number;
messageGroupId?: string; // For FIFO
deduplicationId?: string; // For FIFO
} = {}
): Promise<string> {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(body),
DelaySeconds: options.delaySeconds,
MessageGroupId: options.messageGroupId,
MessageDeduplicationId: options.deduplicationId,
MessageAttributes: {
Timestamp: {
DataType: 'Number',
StringValue: Date.now().toString(),
},
},
});
const response = await this.client.send(command);
return response.MessageId!;
}
async sendBatch(
messages: Array<{ id: string; body: object; delaySeconds?: number }>
): Promise<void> {
// SQS allows max 10 messages per batch
const chunks = this.chunkArray(messages, 10);
for (const chunk of chunks) {
const command = new SendMessageBatchCommand({
QueueUrl: this.queueUrl,
Entries: chunk.map(({ id, body, delaySeconds }) => ({
Id: id,
MessageBody: JSON.stringify(body),
DelaySeconds: delaySeconds,
})),
});
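const response = await this.client.send(command);
// A batch call can succeed overall while individual entries fail
if (response.Failed?.length) {
throw new Error(`Failed to send ${response.Failed.length} message(s) in batch`);
}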
}
}
async receiveMessages(maxMessages: number = 10): Promise<Message[]> {
const command = new ReceiveMessageCommand({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: Math.min(maxMessages, 10),
WaitTimeSeconds: 20, // Long polling
VisibilityTimeout: 30,
AttributeNames: ['All'],
MessageAttributeNames: ['All'],
});
const response = await this.client.send(command);
return response.Messages || [];
}
async deleteMessage(receiptHandle: string): Promise<void> {
const command = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
});
await this.client.send(command);
}
// Polling loop
async startPolling(
handler: (message: Message) => Promise<void>,
options: { concurrency?: number } = {}
): Promise<void> {
const { concurrency = 5 } = options;
while (true) {
const messages = await this.receiveMessages(concurrency);
if (messages.length === 0) continue;
await Promise.all(
messages.map(async (message) => {
try {
await handler(message);
await this.deleteMessage(message.ReceiptHandle!);
} catch (error) {
console.error('Message processing failed:', error);
// Message automatically returns to queue (after Visibility Timeout)
}
})
);
}
}
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
}
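// Lambda integration (SQS trigger)
// Partial batch responses require ReportBatchItemFailures on the event source mapping.
// processMessage is the application's own handler, assumed to be defined elsewhere.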
export const handler = async (event: {
Records: Array<{
body: string;
receiptHandle: string;
messageId: string;
}>;
}): Promise<{ batchItemFailures: Array<{ itemIdentifier: string }> }> => {
const failures: string[] = [];
await Promise.all(
event.Records.map(async (record) => {
try {
const body = JSON.parse(record.body);
await processMessage(body);
} catch (error) {
console.error(`Failed to process ${record.messageId}:`, error);
failures.push(record.messageId);
}
})
);
// Report partial batch failures
return {
batchItemFailures: failures.map((id) => ({ itemIdentifier: id })),
};
};
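The polling loop above relies on the visibility timeout: a received message is hidden from other consumers for a while and reappears if it is not deleted in time. The behavior can be modeled in memory as below. This is a simplified sketch under stated assumptions: `VisibilityQueue` is a hypothetical class, the clock is injected so the example is deterministic, and real SQS differs in many details (receipt handles, at-least-once delivery, and so on).

```typescript
type StoredMessage = { id: string; body: string; visibleAt: number; receiveCount: number };

class VisibilityQueue {
  private messages = new Map<string, StoredMessage>();
  private nextId = 1;
  private visibilityTimeoutMs: number;
  private now: () => number;

  constructor(visibilityTimeoutMs: number, now: () => number = () => Date.now()) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.now = now;
  }

  send(body: string): string {
    const id = String(this.nextId++);
    this.messages.set(id, { id, body, visibleAt: 0, receiveCount: 0 });
    return id;
  }

  // Returns visible messages and hides them until the timeout elapses.
  receive(max: number): StoredMessage[] {
    const t = this.now();
    const out: StoredMessage[] = [];
    for (const m of this.messages.values()) {
      if (out.length >= max) break;
      if (m.visibleAt <= t) {
        m.visibleAt = t + this.visibilityTimeoutMs;
        m.receiveCount++;
        out.push({ ...m });
      }
    }
    return out;
  }

  // Deleting is what marks a message as successfully processed.
  delete(id: string): boolean {
    return this.messages.delete(id);
  }
}
```

A handler that fails simply never calls `delete`, so the message becomes receivable again once the timeout passes. The `receiveCount` field mirrors the idea behind a redrive policy, where a message is moved to a DLQ after too many receives.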
Redis Streams
// Redis Streams with ioredis
import Redis from 'ioredis';
class RedisStreamQueue {
private redis: Redis;
private consumerGroup: string;
private consumerName: string;
constructor(
redisUrl: string,
consumerGroup: string,
consumerName: string
) {
this.redis = new Redis(redisUrl);
this.consumerGroup = consumerGroup;
this.consumerName = consumerName;
}
// Create Consumer Group
async createConsumerGroup(stream: string): Promise<void> {
try {
await this.redis.xgroup(
'CREATE',
stream,
this.consumerGroup,
'0',
'MKSTREAM'
);
} catch (error: any) {
if (!error.message.includes('BUSYGROUP')) {
throw error;
}
// Ignore if group already exists
}
}
// Add message
async addMessage(
stream: string,
data: Record<string, string>,
maxLen?: number
): Promise<string> {
const args: (string | number)[] = [stream];
if (maxLen) {
args.push('MAXLEN', '~', maxLen); // Approximate length limit
}
args.push('*'); // Auto-generate ID
// Flatten data
for (const [key, value] of Object.entries(data)) {
args.push(key, value);
}
return (await this.redis.xadd(...args)) as string;
}
// Read messages (Consumer Group)
async readMessages(
stream: string,
count: number = 10,
blockMs: number = 5000
): Promise<Array<{ id: string; data: Record<string, string> }>> {
const result = await this.redis.xreadgroup(
'GROUP',
this.consumerGroup,
this.consumerName,
'COUNT',
count,
'BLOCK',
blockMs,
'STREAMS',
stream,
'>' // Undelivered messages only
);
if (!result) return [];
const messages: Array<{ id: string; data: Record<string, string> }> = [];
for (const [, entries] of result) {
for (const [id, fields] of entries) {
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
messages.push({ id, data });
}
}
return messages;
}
// Acknowledge processing completion
async acknowledgeMessage(stream: string, messageId: string): Promise<void> {
await this.redis.xack(stream, this.consumerGroup, messageId);
}
// Reprocess unacknowledged messages
async claimPendingMessages(
stream: string,
minIdleMs: number = 60000,
count: number = 10
): Promise<Array<{ id: string; data: Record<string, string> }>> {
// Get pending messages
const pending = await this.redis.xpending(
stream,
this.consumerGroup,
'-',
'+',
count
);
const messages: Array<{ id: string; data: Record<string, string> }> = [];
for (const [id, , idleTime] of pending) {
if (idleTime >= minIdleMs) {
// Claim the message
const claimed = await this.redis.xclaim(
stream,
this.consumerGroup,
this.consumerName,
minIdleMs,
id
);
if (claimed.length > 0) {
const [, fields] = claimed[0];
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
messages.push({ id, data });
}
}
}
return messages;
}
// Message processing loop
async startProcessing(
stream: string,
handler: (message: { id: string; data: Record<string, string> }) => Promise<void>
): Promise<void> {
await this.createConsumerGroup(stream);
while (true) {
// Process new messages
const messages = await this.readMessages(stream);
for (const message of messages) {
try {
await handler(message);
await this.acknowledgeMessage(stream, message.id);
} catch (error) {
console.error(`Failed to process message ${message.id}:`, error);
// Message remains pending
}
}
// Reprocess old pending messages
const pendingMessages = await this.claimPendingMessages(stream);
for (const message of pendingMessages) {
try {
await handler(message);
await this.acknowledgeMessage(stream, message.id);
} catch (error) {
console.error(`Failed to reprocess message ${message.id}:`, error);
}
}
}
}
async close(): Promise<void> {
await this.redis.quit();
}
}
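The `*` passed to XADD asks Redis to generate the entry ID, which has the form `<millisecondsTime>-<sequenceNumber>`. When IDs need to be compared in application code (for example to track the last processed entry), comparing them as plain strings gives wrong results because the parts are numeric. The helpers below are a small sketch; `parseStreamId` and `compareStreamIds` are hypothetical names, and plain `number` is assumed to be sufficient, which holds for millisecond timestamps but not for sequence parts beyond 2^53.

```typescript
type StreamId = { ms: number; seq: number };

// Splits "<ms>-<seq>" into numeric parts; rejects special IDs such as "*" or ">".
function parseStreamId(id: string): StreamId {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Invalid stream ID: ${id}`);
  return { ms: Number(match[1]), seq: Number(match[2]) };
}

// Orders IDs by timestamp first, then by sequence number.
function compareStreamIds(a: string, b: string): number {
  const left = parseStreamId(a);
  const right = parseStreamId(b);
  if (left.ms !== right.ms) return left.ms < right.ms ? -1 : 1;
  if (left.seq !== right.seq) return left.seq < right.seq ? -1 : 1;
  return 0;
}
```

Usage is the same as any comparator, for example `ids.sort(compareStreamIds)`.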
Design Patterns and Best Practices
Message Idempotency
// Duplicate elimination using idempotency key
import Redis from 'ioredis'; // Needed by processWithRedis below
interface IdempotentMessage {
idempotencyKey: string;
payload: unknown;
timestamp: number;
}
class IdempotentProcessor {
private processedKeys: Map<string, { result: unknown; expiry: number }> = new Map();
private ttlMs: number = 24 * 60 * 60 * 1000; // 24 hours
async process<T>(
message: IdempotentMessage,
handler: (payload: unknown) => Promise<T>
): Promise<T> {
const { idempotencyKey, payload } = message;
// Check if already processed
const cached = this.processedKeys.get(idempotencyKey);
if (cached && cached.expiry > Date.now()) {
console.log(`Returning cached result for ${idempotencyKey}`);
return cached.result as T;
}
// Execute processing
const result = await handler(payload);
// Cache result
this.processedKeys.set(idempotencyKey, {
result,
expiry: Date.now() + this.ttlMs,
});
return result;
}
// Redis version (for distributed environments)
async processWithRedis<T>(
redis: Redis,
message: IdempotentMessage,
handler: (payload: unknown) => Promise<T>
): Promise<T> {
const { idempotencyKey, payload } = message;
const cacheKey = `idempotency:${idempotencyKey}`;
// Check if already processed
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached) as T;
}
// Exclusive control (processing flag)
const lockKey = `lock:${idempotencyKey}`;
const acquired = await redis.set(lockKey, '1', 'EX', 30, 'NX');
if (!acquired) {
// Another worker is processing, wait for result
await this.waitForResult(redis, cacheKey);
const result = await redis.get(cacheKey);
return JSON.parse(result!) as T;
}
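try {
// Re-check: another worker may have finished between the first lookup and the lock
const existing = await redis.get(cacheKey);
if (existing) {
return JSON.parse(existing) as T;
}
const result = await handler(payload);
// Cache result (store null for undefined, since JSON.stringify(undefined) is not a string)
await redis.setex(cacheKey, Math.ceil(this.ttlMs / 1000), JSON.stringify(result ?? null));
return result;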
} finally {
await redis.del(lockKey);
}
}
private async waitForResult(
redis: Redis,
key: string,
maxWaitMs: number = 30000
): Promise<void> {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const result = await redis.get(key);
if (result) return;
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new Error('Timeout waiting for result');
}
}
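One thing to watch in the in-memory variant is that expired keys are never removed from the `Map`, so it grows for as long as the process runs. A possible way to bound it is to evict expired entries on each call, sketched below. `TtlKeyStore` and `markIfNew` are hypothetical names, the clock is injected for testability, and the early exit in `evictExpired` assumes a monotonic clock and a single TTL for all keys.

```typescript
class TtlKeyStore {
  private expiryByKey = new Map<string, number>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Returns true the first time a key is seen within the TTL, false for duplicates.
  markIfNew(key: string): boolean {
    const t = this.now();
    this.evictExpired(t);
    if (this.expiryByKey.has(key)) return false;
    this.expiryByKey.set(key, t + this.ttlMs);
    return true;
  }

  // Map iterates in insertion order and every key gets the same TTL, so entries
  // expire in insertion order and the scan can stop at the first live entry.
  private evictExpired(t: number): void {
    for (const [key, expiry] of this.expiryByKey) {
      if (expiry > t) break;
      this.expiryByKey.delete(key);
    }
  }

  get size(): number {
    return this.expiryByKey.size;
  }
}
```

This only records whether a key was seen. Returning the cached result, as `IdempotentProcessor` does, would need the result stored next to the expiry.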
Dead Letter Queue (DLQ)
flowchart TB
MQ["Main Queue"] --> C["Consumer"]
C --> Success
C --> Failure
Success --> Complete
Failure --> Check{"Retry count < 3?"}
Check -->|YES| Requeue["Re-queue<br/>(with delay)"]
Requeue --> MQ
Check -->|NO| DLQ["Dead Letter Queue"]
DLQ --> Monitor["Monitor/Alert<br/>Manual handling"]
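// DLQ implementation example
// Message and MessageQueue are minimal, hypothetical shapes assumed for this example;
// adapt them to the broker client in use.
interface Message {
headers?: Record<string, number>;
[key: string]: unknown;
}
interface MessageQueue {
name: string;
publish(message: unknown, options?: { delayMs?: number }): Promise<void>;
}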
interface DeadLetterMessage {
originalMessage: unknown;
error: string;
failedAt: string;
retryCount: number;
originalQueue: string;
}
class DeadLetterQueueHandler {
constructor(
private mainQueue: MessageQueue,
private dlq: MessageQueue
) {}
async processWithDLQ<T>(
message: Message,
handler: (msg: Message) => Promise<T>,
maxRetries: number = 3
): Promise<T | null> {
const retryCount = this.getRetryCount(message);
try {
return await handler(message);
} catch (error) {
if (retryCount < maxRetries) {
// Retry
await this.requeueWithDelay(message, retryCount + 1);
} else {
// Send to DLQ
await this.sendToDLQ(message, error as Error, retryCount);
}
return null;
}
}
private getRetryCount(message: Message): number {
return message.headers?.['x-retry-count'] || 0;
}
private async requeueWithDelay(
message: Message,
retryCount: number
): Promise<void> {
const delay = Math.min(
1000 * Math.pow(2, retryCount), // Exponential backoff
60000 // Max 1 minute
);
await this.mainQueue.publish({
...message,
headers: {
...message.headers,
'x-retry-count': retryCount,
'x-retry-delay': delay,
},
}, { delayMs: delay });
}
private async sendToDLQ(
message: Message,
error: Error,
retryCount: number
): Promise<void> {
const dlqMessage: DeadLetterMessage = {
originalMessage: message,
error: error.message,
failedAt: new Date().toISOString(),
retryCount,
originalQueue: this.mainQueue.name,
};
await this.dlq.publish(dlqMessage);
// Send alert
await this.sendAlert(dlqMessage);
}
private async sendAlert(message: DeadLetterMessage): Promise<void> {
console.error('Message sent to DLQ:', {
error: message.error,
retryCount: message.retryCount,
originalQueue: message.originalQueue,
});
// Notification to Slack/PagerDuty etc.
}
}
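The retry-or-dead-letter decision and the capped exponential backoff can be isolated into two pure functions, which makes them easy to test. The sketch below also adds random jitter so that many failing messages do not all retry at the same instant. Jitter is an addition that the handler above does not use, and `backoffDelayMs` and `decideRetry` are hypothetical names; the random source is injectable so the result is reproducible.

```typescript
// Capped exponential backoff with "full jitter": a uniform delay in [0, cap).
function backoffDelayMs(
  attempt: number,
  baseMs: number = 1000,
  capMs: number = 60000,
  random: () => number = Math.random
): number {
  const exponential = Math.min(capMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * exponential);
}

type RetryDecision =
  | { action: 'retry'; nextRetryCount: number; delayMs: number }
  | { action: 'dead-letter' };

// Mirrors the flow above: retry while retryCount < maxRetries, else dead-letter.
function decideRetry(
  retryCount: number,
  maxRetries: number = 3,
  random: () => number = Math.random
): RetryDecision {
  if (retryCount >= maxRetries) return { action: 'dead-letter' };
  const nextRetryCount = retryCount + 1;
  return {
    action: 'retry',
    nextRetryCount,
    delayMs: backoffDelayMs(nextRetryCount, 1000, 60000, random),
  };
}
```

Passing `() => 1` would not be a valid random source, since `Math.random` returns values in `[0, 1)`; a fixed value such as `() => 0.5` is enough for deterministic tests.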
Order Guarantee
// Order guarantee using partition key
class OrderedMessageProcessor {
private processingMap: Map<string, Promise<void>> = new Map();
async processInOrder<T>(
partitionKey: string,
handler: () => Promise<T>
): Promise<T> {
// Wait for previous processing with the same partition key
const previousPromise = this.processingMap.get(partitionKey) ?? Promise.resolve();
const currentPromise = previousPromise.then(handler);
// Store a tail that never rejects, so one failure does not block later messages
const tail = currentPromise.then(() => {}, () => {});
this.processingMap.set(partitionKey, tail);
try {
return await currentPromise;
} finally {
// Clean up map if no newer message was chained behind this one
if (this.processingMap.get(partitionKey) === tail) {
this.processingMap.delete(partitionKey);
}
}
}
}
// Usage example
const processor = new OrderedMessageProcessor();
// Using user ID as partition key
// Messages for the same user are processed in order
await processor.processInOrder(message.userId, async () => {
await processUserMessage(message);
});
Monitoring and Observability
// Metrics collection
interface QueueMetrics {
messagesPublished: number;
messagesConsumed: number;
messagesFailed: number;
processingLatencyMs: number[];
queueDepth: number;
}
class MetricsCollector {
private metrics: QueueMetrics = {
messagesPublished: 0,
messagesConsumed: 0,
messagesFailed: 0,
processingLatencyMs: [],
queueDepth: 0,
};
recordPublish(): void {
this.metrics.messagesPublished++;
}
recordConsume(latencyMs: number): void {
this.metrics.messagesConsumed++;
this.metrics.processingLatencyMs.push(latencyMs);
}
recordFailure(): void {
this.metrics.messagesFailed++;
}
updateQueueDepth(depth: number): void {
this.metrics.queueDepth = depth;
}
getStats(): {
throughput: number;
errorRate: number;
avgLatency: number;
p99Latency: number;
} {
const total = this.metrics.messagesConsumed + this.metrics.messagesFailed;
return {
throughput: this.metrics.messagesConsumed, // Total consumed so far; divide by elapsed time for a rate
errorRate: total > 0 ? this.metrics.messagesFailed / total : 0,
avgLatency: this.calculateAverage(this.metrics.processingLatencyMs),
p99Latency: this.calculatePercentile(this.metrics.processingLatencyMs, 99),
};
}
private calculateAverage(values: number[]): number {
if (values.length === 0) return 0;
return values.reduce((a, b) => a + b, 0) / values.length;
}
private calculatePercentile(values: number[], percentile: number): number {
if (values.length === 0) return 0;
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil((percentile / 100) * sorted.length) - 1;
return sorted[index];
}
}
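The collector above reports throughput as a running total and keeps every latency sample, so memory grows without bound. For a rate that reflects recent traffic, a sliding time window is one option, sketched below. `RateWindow` is a hypothetical class, and timestamps are passed in explicitly so the example does not depend on the wall clock.

```typescript
// Counts events inside a sliding time window and reports them as a per-second rate.
class RateWindow {
  private timestamps: number[] = [];
  private windowMs: number;

  constructor(windowMs: number) {
    if (windowMs <= 0) throw new RangeError('windowMs must be positive');
    this.windowMs = windowMs;
  }

  // Records one event; timestamps are expected in non-decreasing order.
  record(atMs: number): void {
    this.timestamps.push(atMs);
  }

  // Events per second over the window (nowMs - windowMs, nowMs].
  ratePerSecond(nowMs: number): number {
    const cutoff = nowMs - this.windowMs;
    while (this.timestamps.length > 0 && this.timestamps[0] <= cutoff) {
      this.timestamps.shift(); // Drop samples that fell out of the window
    }
    return this.timestamps.length / (this.windowMs / 1000);
  }
}
```

Calling `record(Date.now())` from `recordConsume` and reading `ratePerSecond(Date.now())` from `getStats` would be one way to wire it in. The same pruning idea could bound `processingLatencyMs`.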
Summary
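Message queues decouple producers from consumers and let slow work run outside the request path, which makes them a core building block of distributed systems.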
Selection Criteria
| Requirement | Recommended System |
|---|---|
| Simple task queue | RabbitMQ, SQS |
| Event streaming | Kafka |
| Real-time processing | Redis Streams |
| Serverless | SQS + Lambda |
| Log aggregation | Kafka |
Design Considerations
- Ensure idempotency: Prevent duplicate processing
- Order guarantee: Only when necessary, per partition
- Error handling: DLQ and retry strategy
- Monitoring: Throughput, latency, error rate
With proper message queue selection and design, you can build scalable and reliable systems.