Очереди сообщений в современной архитектуре
Микросервисы общаются асинхронно через очереди сообщений и платформы event streaming. Вместо прямого вызова Сервиса B Сервисом A (синхронный HTTP), Сервис A публикует сообщение в очередь, а Сервис B потребляет его, когда готов. Это разделяет сервисы и повышает устойчивость.
Две доминирующие технологии служат этой цели:
RabbitMQ — Традиционный message broker. Сообщения маршрутизируются через exchange-и в очереди. После подтверждения (ACK) потребителем сообщение удаляется из очереди. Лучше для распределения задач и паттернов request/reply.
Apache Kafka — Распределённая платформа event streaming. Сообщения (события) добавляются в партиции topic-ов и хранятся настраиваемое время. Потребители отслеживают свою позицию (offset) и могут воспроизводить события. Лучше для event sourcing, data pipeline-ов и потокового streaming с высоким throughput.
Концепции RabbitMQ для тестировщиков
Exchange-и, очереди и привязки
Producer → Exchange → Binding → Queue → Consumer
| Тип Exchange | Поведение маршрутизации |
|---|---|
| Direct | Маршрутизирует в очереди с точным совпадением routing key |
| Fanout | Рассылает во все привязанные очереди |
| Topic | Маршрутизирует по совпадению wildcard-паттернов |
| Headers | Маршрутизирует по атрибутам заголовков сообщения |
Ключевые тестовые сценарии для RabbitMQ
1. Доставка сообщений:
- Producer отправляет сообщение — приходит ли оно в правильную очередь?
- Сообщение с routing key “order.created” — попадает ли в очередь “orders”, но не в “payments”?
2. Подтверждение потребителя:
- Потребитель обрабатывает сообщение и отправляет ACK — удаляется ли сообщение?
- Потребитель падает без ACK — доставляется ли сообщение повторно?
- Потребитель отправляет NACK — сообщение возвращается в очередь или уходит в DLQ?
3. Dead letter queue:
- Сообщение не удалось обработать 3 раза — перемещается ли в DLQ?
- DLQ содержит исходное сообщение с метаданными ошибки?
4. TTL сообщения:
- Сообщение с TTL 60 секунд истекает — удаляется или попадает в dead letter?
import pika
import json
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Объявляем очередь с DLQ
channel.queue_declare(
queue='orders',
arguments={
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'orders.dlq',
'x-message-ttl': 60000,
}
)
channel.queue_declare(queue='orders.dlq')
# Публикуем сообщение
channel.basic_publish(
exchange='',
routing_key='orders',
body=json.dumps({'order_id': 123, 'amount': 99.99}),
properties=pika.BasicProperties(
delivery_mode=2, # persistent
content_type='application/json',
)
)
# Потребляем и проверяем
method, properties, body = channel.basic_get('orders', auto_ack=False)
assert method is not None, "Нет сообщения в очереди"
order = json.loads(body)
assert order['order_id'] == 123
channel.basic_ack(method.delivery_tag)
Концепции Kafka для тестировщиков
Topic-и, партиции и consumer group-ы
Producer → Topic (Партиция 0, 1, 2, ...) → Consumer Group → Consumers
| Концепция | Описание |
|---|---|
| Topic | Именованный поток событий (например, “orders”, “payments”) |
| Партиция | Упорядоченная, неизменяемая последовательность событий внутри topic |
| Offset | Позиция сообщения внутри партиции |
| Consumer Group | Набор потребителей, разделяющих нагрузку topic |
| Ретенция | Как долго Kafka хранит сообщения |
Ключевые тестовые сценарии для Kafka
1. Продюсирование сообщений:
- Producer отправляет событие — появляется ли оно в правильном topic и партиции?
- Партиционирование по ключу — события с одинаковым ключом всегда попадают в одну партицию?
2. Порядок сообщений:
- События внутри одной партиции сохраняют порядок — проверьте, продюсируя A, B, C и потребляя в том же порядке.
- События между партициями не имеют гарантии порядка.
3. Consumer group-ы:
- Два потребителя в одной группе — каждая партиция потребляется ровно одним потребителем.
- Два потребителя в разных группах — оба получают все сообщения (паттерн pub/sub).
4. Consumer lag:
- Producer отправляет 1,000 сообщений, потребитель обработал 500 — consumer lag должен быть 500.
5. Семантика exactly-once:
- Producer с идемпотентными записями — дублирование приводит к одному сообщению в topic.
from kafka import KafkaProducer, KafkaConsumer
import json
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
)
producer.send(
'orders',
key='user-123',
value={'order_id': 456, 'status': 'created'}
)
producer.flush()
consumer = KafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
group_id='order-processor',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=5000,
)
messages = []
for message in consumer:
messages.append(message.value)
if len(messages) >= 1:
break
assert messages[0]['order_id'] == 456
Тестирование сериализации сообщений
Сообщения должны корректно сериализоваться (в байты) и десериализоваться. Распространённые форматы:
| Формат | Плюсы | Минусы |
|---|---|---|
| JSON | Читаемый, гибкий | Нет контроля схемы, больший размер |
| Avro | Контроль схемы, компактный, поддерживает эволюцию | Требует schema registry |
| Protobuf | Быстрая сериализация, типизированный, компактный | Требует генерации кода |
Тестирование с Docker Compose
Настройте локальную инфраструктуру сообщений:
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
Упражнение: Лаборатория тестирования очередей сообщений
Часть 1: Тестирование RabbitMQ
Запустите RabbitMQ через Docker и протестируйте следующие сценарии:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Задание 1.1: Маршрутизация exchange
Создайте topic exchange “events” с двумя очередями:
- “orders_queue” с привязкой routing key “order.*”
- “payments_queue” с привязкой routing key “payment.*”
Опубликуйте сообщения и проверьте маршрутизацию:
- Сообщение с ключом “order.created” → должно попасть только в orders_queue.
- Сообщение с ключом “payment.completed” → должно попасть только в payments_queue.
- Сообщение с ключом “user.registered” → не должно попасть ни в одну очередь.
Задание 1.2: Dead letter queue
Настройте очередь orders с DLQ:
- Установите максимум попыток доставки — 3.
- Опубликуйте сообщение, которое потребитель намеренно отклоняет (NACK).
- Проверьте, что сообщение повторяется 3 раза, затем перемещается в DLQ.
- Проверьте сообщение в DLQ — содержит ли оно исходный payload и метаданные отклонения?
Задание 1.3: Персистентность сообщений
- Опубликуйте 10 сообщений в durable-очередь с persistent delivery mode.
- Перезапустите контейнер RabbitMQ.
- После перезапуска проверьте, что все 10 сообщений на месте.
Часть 2: Тестирование Kafka
Запустите Kafka через Docker Compose и протестируйте:
Задание 2.1: Порядок по партициям
- Создайте topic “test-orders” с 3 партициями.
- Отправьте 10 событий с ключом “user-A” и 10 с ключом “user-B”.
- Потребите все события и проверьте:
- Все события “user-A” в порядке относительно друг друга.
- Все события “user-B” в порядке относительно друг друга.
- События могут чередоваться между пользователями (разные партиции).
Задание 2.2: Consumer group-ы
- Запустите двух потребителей в одной группе (“order-processors”).
- Отправьте 100 сообщений в topic с 3 партициями.
- Проверьте, что каждый потребитель обрабатывает подмножество сообщений.
- Остановите одного потребителя и проверьте, что другой забирает все партиции (ребалансировка).
Задание 2.3: Мониторинг consumer lag
- Отправьте 1,000 сообщений в topic.
- Запустите медленного потребителя (с задержкой 100мс на сообщение).
- Используйте
kafka-consumer-groups.sh --describeдля мониторинга consumer lag. - Зафиксируйте, как lag меняется со временем по мере того, как потребитель догоняет.
Часть 3: Сравнительный отчёт
После выполнения обеих частей напишите сравнение:
| Критерий | RabbitMQ | Kafka |
|---|---|---|
| Хранение сообщений | Удаляется после ACK | Хранится по политике ретенции |
| Гарантия порядка | FIFO по очереди | Порядок по партиции |
| Возможность replay | Нет (сообщение удалено) | Да (потребитель сбрасывает offset) |
| Throughput | Тысячи/сек | Миллионы/сек |
| Лучший случай использования | Распределение задач, RPC | Event streaming, data pipeline-ы |
Включите наблюдения о сложности тестирования, усилиях на настройку и инструментах отладки для каждой платформы.