В этой статье мы будем мигрировать с классических очередей на кворум очереди в RabbitMQ

Зачем переезжать на quorum очереди

Начиная с версии 4.0 классические очереди будут удалены, а depricated они были еще в версии 3.9, весь текст ниже относится к версии RabbitMQ 3.12

image

  • Очереди кворума обладают более высоким уровнем пропускной способности почти во всех случаях использования. Очередь кворума может поддерживать пропускную способность 30000 сообщений (при использовании сообщений размером 1 кб), обеспечивая при этом высокий уровень безопасности данных и реплицируя их на все 3 узла в кластере. Классические зеркальные очереди обеспечивают лишь треть от этой пропускной способности и гораздо более низкий уровень безопасности данных
  • Очереди кворума более надежны, быстрее для большинства рабочих нагрузок и не требуют особого обслуживания.

Параметры которые необходимо учитывать при миграции на quorum

Не все параметры классических очередей на 100% совместимы с кворум очередями, ниже табличка с параметрами, мы пройдемся по самым критичным и разберемся с тем, что нам грозит если на них просто забить

image

Матрица немного отличается в зависимости от того с какой версии на какую вы переезжаете, учитывайте это

Non-durable queues

durable означает, что очередь будет сохранена после рестарта брокера, non-durable более не поддерживается, если это кому-то было нужно…

Exclusivity

Эксклюзивная очередь может использоваться только подключением объявившим эту очередь, такая очередь по своей природе является временной, потому что не может пережить “свое подключение”, более не поддерживается

Message priority

В сценарии когда консьюмер подключен к очереди в которую попадают приоритетные сообщения, такие сообщения не ожидают когда консьюмер подтвердит их получение, все сообщения доставляются немедленно

Так же есть интересная особенность таких сообщений:

Скажем у нас есть консьюмер qos которого равен 10, при попадании в очередь 10 сообщений без приоритета все они будут отправлены консьюмеру на обработку и qos будет заполнен Если тут же в очередь пападут еще 5 сообщений с приоритетом, им придется ждать обработки первых 10 сообщений, поскольку qos заполнен Так же на все это тратится большее колличесвто ресурсов вашего кластера

Такое поведение как минимум странно, и лучше всего использовать prefetch для ваших консьюмеров, явно устанавливая колличество сообщений которое они могут принять, в совокупности с явным подтверждением получения сообщений

Consumer exclusivity

Объявляется при подключении к очереди и означает, что у очереди может быть только один эксклюзивный консьюмер, сработает только если в очереди еще нет консьюмеров, более не поддерживается

Global QoS Prefetch

Про QoS мы говорили в message priority, но QoS бывает разный, глобальный и не глобальный

  • Не глобальный это QoS per consumer когда колличество сообщений которое консьюмер может принять устанавливается самим консьюмером

  • Глобальный устанавливается на весь канал, для всех консьюмеров использующих этот канал

Server-named queues

Имена очередей сгенерированные сервером автоматически, если имя не передано явно

Мигрируем

Возможные варианты миграции

  • Blue-Green Deployment Перезд между blue и green кластером, в этом случае необходимо подготовить полностью новый кластер “рядом” и переехать на него, мы выберем не этот путь, но подходы будут похожи

  • Миграция между виртуальными хостами в рамках одного кластера (то, что нам нужно)

  • Миграция в рамках одного кластера и одного виртуального хоста Отключаем всех продюсеров и консьюмеров, пересоздаем вхост, подключаем всех обратно (такой метод едва ли подойдет кому то в продакшн среде)

Все манипуляции будут выполнены на тестовом кластере kubernetes и rabbitmq, но максимально приближены к реальным условиям и основаны на реальных событиях

Миграция между виртуальными хостами в рамках одного кластера

У нас имеется:

  • Классический вхост

  • Немного тестовых очередей image

  • Немного тестовых эксченджей image

  • Два пакетика мескалина

Создаем quorum vhost

Все новые очереди наследуют свой тип от вхоста, если этого не указать явно

image

Имеем classic vhost (с него мы будем мигрировать), и quorum vhost (на него мы поедем)

image

Создаем федерацию между нашими вхостами

Проще всего проделать это в консоли любого пода RabbitMQ

rabbitmqctl set_parameter federation-upstream quorum-migration-upstream --vhost quorum '{"uri":"amqp:///classic", "trust-user-id":true}'

Таким образом федерация создается на quorum вхосте, а апстримом федерации является classic вхост

image

Создаем политику миграции

Что бы все заработало так как задумано и сообщения отправлялись из classic в quorum vhost нужна соответствующая политика

Virtual host: quorum
Name: federate-to-quorum
Pattern: ^example-queue-.*
Apply to: exchanges and queues
Priority: 1
Definition: federation-upstream-set = all

image

Переносим очереди, эксченджи и пользователей

Сообщения из старого (classic) вхоста будут попадать в новый (quorum) вхост только при условии что и там и там созданы одинаковые очереди и эксченджи Что бы не переносить все руками (очередй могут быть тысячи), воспользуемся встроенным механизмом, доступным из коробки, Export definitions / Import definitions

Выполним export из classic vhost

image

Теперь import полученного json в quorum vhost

image

Очереди перенеслись как quorum, унаследовав тип от вхоста
federate-to-quorum говорит о том, что политика была настроенна правильно и наша федерация работает корректно

image

Так же убедиться, что федерация работает и очереди федерируются можно на страничке Federation status

image

Переключаем консьюмеров

На этом этапе представим, что все происходит на “боевом” кластере работающим в проде
Мы подготовили все, что было нужно, теперь переключим консьюмеров на использование нового quorum вхоста (продюсеры пока остаются на старом classic вхосте)

Запустим консьюмера на go, подключим его к новому quorum вхосту и будем слушать очередь example-queue-1

package main

import (
	"fmt"
	"github.com/streadway/amqp"
)

func main() {

	fmt.Println("RabbitMQ in Golang: Getting started tutorial")

	connection, err := amqp.Dial("amqp://default_user_Y7DEkXxrZCvuGmG_qac:uLoYngeeyIwboKlUCYpeoV-p0SIKxSaY@192.168.2.109:31131/quorum")

	if err != nil {
		panic(err)
	}

	defer connection.Close()
	
	fmt.Println("Successfully connected to RabbitMQ instance")

	// opening a channel over the connection established to interact with RabbitMQ

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	defer channel.Close()

	// declaring consumer with its properties over channel opened

	msgs, err := channel.Consume(
		"example-queue-1", // queue
		"", // consumer
		true, // auto ack
		false, // exclusive
		false, // no local
		false, // no wait
		nil, //args
	)

	if err != nil {
		panic(err)
	}

	// print consumed messages from queue

	forever := make(chan bool)

	go func() {

		for msg := range msgs {
			fmt.Printf("Received Message: %s\n", msg.Body)
		}

	}()

	fmt.Println("Waiting for messages...")

	<-forever

}

image

Попробуем что-то отправить в эту очередь, запустим продюссера и подлючим его к classic вхосту

package main

import (
	"fmt"
	"github.com/streadway/amqp"
)

func main() {

	fmt.Println("RabbitMQ in Golang: Getting started tutorial")
	connection, err := amqp.Dial("amqp://default_user_Y7DEkXxrZCvuGmG_qac:uLoYngeeyIwboKlUCYpeoV-p0SIKxSaY@192.168.2.109:31131/classic")

	if err != nil {
		panic(err)
	}

	defer connection.Close()

	fmt.Println("Successfully connected to RabbitMQ instance")

	// opening a channel over the connection established to interact with RabbitMQ

	channel, err := connection.Channel()

	if err != nil {
		panic(err)
	}

	defer channel.Close()

	// declaring queue with its properties over the the channel opened

	queue, err := channel.QueueDeclare(
		"example-queue-1", // name
		true,  // durable
		false, // auto delete
		false, // exclusive
		false, // no wait
		nil,   // args
	)

	if err != nil {
		panic(err)
	}

	// publishing a message

	err = channel.Publish(

		"",                // exchange
		"example-queue-1", // key
		false,             // mandatory
		false,             // immediate

		amqp.Publishing{
			ContentType: "text/plain",
			Body: []byte("Test Message from classic vhost"),
		},
	)

	if err != nil {
		panic(err)
	}

	fmt.Println("Queue status:", queue)
	fmt.Println("Successfully published message")

}

image

Успех, мы получили сообщение отправленное в классическую очередь
На этом этапе все наши консьюмеры уже переключены на новые очереди и получают сообщения от продюсеров работающих на старых очередях

Переключаем продюсеров

Остается переключить продюсеров, миграция завершена, самое время открывать пиво :)

В случае с Blue-Green нужно будет проделать все то же самое, только федерация будет настраиваться не между вхостами, а между кластерами