Очереди сообщений в современной архитектуре

Микросервисы общаются асинхронно через очереди сообщений и платформы event streaming. Вместо прямого вызова Сервиса B Сервисом A (синхронный HTTP), Сервис A публикует сообщение в очередь, а Сервис B потребляет его, когда готов. Это разделяет сервисы и повышает устойчивость.

Две доминирующие технологии служат этой цели:

RabbitMQ — Традиционный message broker. Сообщения маршрутизируются через exchange-и в очереди. После подтверждения (ACK) потребителем сообщение удаляется из очереди. Лучше для распределения задач и паттернов request/reply.

Apache Kafka — Распределённая платформа event streaming. Сообщения (события) добавляются в партиции topic-ов и хранятся настраиваемое время. Потребители отслеживают свою позицию (offset) и могут воспроизводить события. Лучше для event sourcing, data pipeline-ов и потокового streaming с высоким throughput.

Концепции RabbitMQ для тестировщиков

Exchange-и, очереди и привязки

Producer → Exchange → Binding → Queue → Consumer
Тип ExchangeПоведение маршрутизации
DirectМаршрутизирует в очереди с точным совпадением routing key
FanoutРассылает во все привязанные очереди
TopicМаршрутизирует по совпадению wildcard-паттернов
HeadersМаршрутизирует по атрибутам заголовков сообщения

Ключевые тестовые сценарии для RabbitMQ

1. Доставка сообщений:

  • Producer отправляет сообщение — приходит ли оно в правильную очередь?
  • Сообщение с routing key “order.created” — попадает ли в очередь “orders”, но не в “payments”?

2. Подтверждение потребителя:

  • Потребитель обрабатывает сообщение и отправляет ACK — удаляется ли сообщение?
  • Потребитель падает без ACK — доставляется ли сообщение повторно?
  • Потребитель отправляет NACK — сообщение возвращается в очередь или уходит в DLQ?

3. Dead letter queue:

  • Сообщение не удалось обработать 3 раза — перемещается ли в DLQ?
  • DLQ содержит исходное сообщение с метаданными ошибки?

4. TTL сообщения:

  • Сообщение с TTL 60 секунд истекает — удаляется или попадает в dead letter?
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Объявляем очередь с DLQ
channel.queue_declare(
    queue='orders',
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'orders.dlq',
        'x-message-ttl': 60000,
    }
)
channel.queue_declare(queue='orders.dlq')

# Публикуем сообщение
channel.basic_publish(
    exchange='',
    routing_key='orders',
    body=json.dumps({'order_id': 123, 'amount': 99.99}),
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)

# Потребляем и проверяем
method, properties, body = channel.basic_get('orders', auto_ack=False)
assert method is not None, "Нет сообщения в очереди"
order = json.loads(body)
assert order['order_id'] == 123
channel.basic_ack(method.delivery_tag)

Концепции Kafka для тестировщиков

Topic-и, партиции и consumer group-ы

Producer → Topic (Партиция 0, 1, 2, ...) → Consumer Group → Consumers
КонцепцияОписание
TopicИменованный поток событий (например, “orders”, “payments”)
ПартицияУпорядоченная, неизменяемая последовательность событий внутри topic
OffsetПозиция сообщения внутри партиции
Consumer GroupНабор потребителей, разделяющих нагрузку topic
РетенцияКак долго Kafka хранит сообщения

Ключевые тестовые сценарии для Kafka

1. Продюсирование сообщений:

  • Producer отправляет событие — появляется ли оно в правильном topic и партиции?
  • Партиционирование по ключу — события с одинаковым ключом всегда попадают в одну партицию?

2. Порядок сообщений:

  • События внутри одной партиции сохраняют порядок — проверьте, продюсируя A, B, C и потребляя в том же порядке.
  • События между партициями не имеют гарантии порядка.

3. Consumer group-ы:

  • Два потребителя в одной группе — каждая партиция потребляется ровно одним потребителем.
  • Два потребителя в разных группах — оба получают все сообщения (паттерн pub/sub).

4. Consumer lag:

  • Producer отправляет 1,000 сообщений, потребитель обработал 500 — consumer lag должен быть 500.

5. Семантика exactly-once:

  • Producer с идемпотентными записями — дублирование приводит к одному сообщению в topic.
from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
)

producer.send(
    'orders',
    key='user-123',
    value={'order_id': 456, 'status': 'created'}
)
producer.flush()

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    group_id='order-processor',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    consumer_timeout_ms=5000,
)

messages = []
for message in consumer:
    messages.append(message.value)
    if len(messages) >= 1:
        break

assert messages[0]['order_id'] == 456

Тестирование сериализации сообщений

Сообщения должны корректно сериализоваться (в байты) и десериализоваться. Распространённые форматы:

ФорматПлюсыМинусы
JSONЧитаемый, гибкийНет контроля схемы, больший размер
AvroКонтроль схемы, компактный, поддерживает эволюциюТребует schema registry
ProtobufБыстрая сериализация, типизированный, компактныйТребует генерации кода

Тестирование с Docker Compose

Настройте локальную инфраструктуру сообщений:

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

Упражнение: Лаборатория тестирования очередей сообщений

Часть 1: Тестирование RabbitMQ

Запустите RabbitMQ через Docker и протестируйте следующие сценарии:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Задание 1.1: Маршрутизация exchange

Создайте topic exchange “events” с двумя очередями:

  • “orders_queue” с привязкой routing key “order.*”
  • “payments_queue” с привязкой routing key “payment.*”

Опубликуйте сообщения и проверьте маршрутизацию:

  1. Сообщение с ключом “order.created” → должно попасть только в orders_queue.
  2. Сообщение с ключом “payment.completed” → должно попасть только в payments_queue.
  3. Сообщение с ключом “user.registered” → не должно попасть ни в одну очередь.

Задание 1.2: Dead letter queue

Настройте очередь orders с DLQ:

  1. Установите максимум попыток доставки — 3.
  2. Опубликуйте сообщение, которое потребитель намеренно отклоняет (NACK).
  3. Проверьте, что сообщение повторяется 3 раза, затем перемещается в DLQ.
  4. Проверьте сообщение в DLQ — содержит ли оно исходный payload и метаданные отклонения?

Задание 1.3: Персистентность сообщений

  1. Опубликуйте 10 сообщений в durable-очередь с persistent delivery mode.
  2. Перезапустите контейнер RabbitMQ.
  3. После перезапуска проверьте, что все 10 сообщений на месте.

Часть 2: Тестирование Kafka

Запустите Kafka через Docker Compose и протестируйте:

Задание 2.1: Порядок по партициям

  1. Создайте topic “test-orders” с 3 партициями.
  2. Отправьте 10 событий с ключом “user-A” и 10 с ключом “user-B”.
  3. Потребите все события и проверьте:
    • Все события “user-A” в порядке относительно друг друга.
    • Все события “user-B” в порядке относительно друг друга.
    • События могут чередоваться между пользователями (разные партиции).

Задание 2.2: Consumer group-ы

  1. Запустите двух потребителей в одной группе (“order-processors”).
  2. Отправьте 100 сообщений в topic с 3 партициями.
  3. Проверьте, что каждый потребитель обрабатывает подмножество сообщений.
  4. Остановите одного потребителя и проверьте, что другой забирает все партиции (ребалансировка).

Задание 2.3: Мониторинг consumer lag

  1. Отправьте 1,000 сообщений в topic.
  2. Запустите медленного потребителя (с задержкой 100мс на сообщение).
  3. Используйте kafka-consumer-groups.sh --describe для мониторинга consumer lag.
  4. Зафиксируйте, как lag меняется со временем по мере того, как потребитель догоняет.

Часть 3: Сравнительный отчёт

После выполнения обеих частей напишите сравнение:

КритерийRabbitMQKafka
Хранение сообщенийУдаляется после ACKХранится по политике ретенции
Гарантия порядкаFIFO по очередиПорядок по партиции
Возможность replayНет (сообщение удалено)Да (потребитель сбрасывает offset)
ThroughputТысячи/секМиллионы/сек
Лучший случай использованияРаспределение задач, RPCEvent streaming, data pipeline-ы

Включите наблюдения о сложности тестирования, усилиях на настройку и инструментах отладки для каждой платформы.