В этой статье мы будем мигрировать с классических очередей на кворум очереди в RabbitMQ
Зачем переезжать на quorum очереди
Начиная с версии 4.0 классические очереди будут удалены, а depricated они были еще в версии 3.9, весь текст ниже относится к версии RabbitMQ 3.12
- Очереди кворума обладают более высоким уровнем пропускной способности почти во всех случаях использования. Очередь кворума может поддерживать пропускную способность 30000 сообщений (при использовании сообщений размером 1 кб), обеспечивая при этом высокий уровень безопасности данных и реплицируя их на все 3 узла в кластере. Классические зеркальные очереди обеспечивают лишь треть от этой пропускной способности и гораздо более низкий уровень безопасности данных
- Очереди кворума более надежны, быстрее для большинства рабочих нагрузок и не требуют особого обслуживания.
Параметры которые необходимо учитывать при миграции на quorum
Не все параметры классических очередей на 100% совместимы с кворум очередями, ниже табличка с параметрами, мы пройдемся по самым критичным и разберемся с тем, что нам грозит если на них просто забить
Матрица немного отличается в зависимости от того с какой версии на какую вы переезжаете, учитывайте это
Non-durable queues
durable означает, что очередь будет сохранена после рестарта брокера, non-durable более не поддерживается, если это кому-то было нужно…
Exclusivity
Эксклюзивная очередь может использоваться только подключением объявившим эту очередь, такая очередь по своей природе является временной, потому что не может пережить “свое подключение”, более не поддерживается
Message priority
В сценарии когда консьюмер подключен к очереди в которую попадают приоритетные сообщения, такие сообщения не ожидают когда консьюмер подтвердит их получение, все сообщения доставляются немедленно
Так же есть интересная особенность таких сообщений:
Скажем у нас есть консьюмер qos которого равен 10, при попадании в очередь 10 сообщений без приоритета все они будут отправлены консьюмеру на обработку и qos будет заполнен Если тут же в очередь пападут еще 5 сообщений с приоритетом, им придется ждать обработки первых 10 сообщений, поскольку qos заполнен Так же на все это тратится большее колличесвто ресурсов вашего кластера
Такое поведение как минимум странно, и лучше всего использовать prefetch для ваших консьюмеров, явно устанавливая колличество сообщений которое они могут принять, в совокупности с явным подтверждением получения сообщений
Consumer exclusivity
Объявляется при подключении к очереди и означает, что у очереди может быть только один эксклюзивный консьюмер, сработает только если в очереди еще нет консьюмеров, более не поддерживается
Global QoS Prefetch
Про QoS мы говорили в message priority, но QoS бывает разный, глобальный и не глобальный
Не глобальный это QoS per consumer когда колличество сообщений которое консьюмер может принять устанавливается самим консьюмером
Глобальный устанавливается на весь канал, для всех консьюмеров использующих этот канал
Server-named queues
Имена очередей сгенерированные сервером автоматически, если имя не передано явно
Мигрируем
Возможные варианты миграции
Blue-Green Deployment Перезд между blue и green кластером, в этом случае необходимо подготовить полностью новый кластер “рядом” и переехать на него, мы выберем не этот путь, но подходы будут похожи
Миграция между виртуальными хостами в рамках одного кластера (то, что нам нужно)
Миграция в рамках одного кластера и одного виртуального хоста Отключаем всех продюсеров и консьюмеров, пересоздаем вхост, подключаем всех обратно (такой метод едва ли подойдет кому то в продакшн среде)
Все манипуляции будут выполнены на тестовом кластере kubernetes и rabbitmq, но максимально приближены к реальным условиям и основаны на реальных событиях
Миграция между виртуальными хостами в рамках одного кластера
У нас имеется:
Классический вхост
Немного тестовых очередей
Немного тестовых эксченджей
Два пакетика мескалина
Создаем quorum vhost
Все новые очереди наследуют свой тип от вхоста, если этого не указать явно
Имеем classic vhost (с него мы будем мигрировать), и quorum vhost (на него мы поедем)
Создаем федерацию между нашими вхостами
Проще всего проделать это в консоли любого пода RabbitMQ
rabbitmqctl set_parameter federation-upstream quorum-migration-upstream --vhost quorum '{"uri":"amqp:///classic", "trust-user-id":true}'
Таким образом федерация создается на quorum вхосте, а апстримом федерации является classic вхост
Создаем политику миграции
Что бы все заработало так как задумано и сообщения отправлялись из classic в quorum vhost нужна соответствующая политика
Virtual host: quorum
Name: federate-to-quorum
Pattern: ^example-queue-.*
Apply to: exchanges and queues
Priority: 1
Definition: federation-upstream-set = all
Переносим очереди, эксченджи и пользователей
Сообщения из старого (classic) вхоста будут попадать в новый (quorum) вхост только при условии что и там и там созданы одинаковые очереди и эксченджи Что бы не переносить все руками (очередй могут быть тысячи), воспользуемся встроенным механизмом, доступным из коробки, Export definitions / Import definitions
Выполним export из classic vhost
Теперь import полученного json в quorum vhost
Очереди перенеслись как quorum, унаследовав тип от вхоста
federate-to-quorum говорит о том, что политика была настроенна правильно и наша федерация работает корректно
Так же убедиться, что федерация работает и очереди федерируются можно на страничке Federation status
Переключаем консьюмеров
На этом этапе представим, что все происходит на “боевом” кластере работающим в проде
Мы подготовили все, что было нужно, теперь переключим консьюмеров на использование нового quorum вхоста (продюсеры пока остаются на старом classic вхосте)
Запустим консьюмера на go, подключим его к новому quorum вхосту и будем слушать очередь example-queue-1
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
fmt.Println("RabbitMQ in Golang: Getting started tutorial")
connection, err := amqp.Dial("amqp://default_user_Y7DEkXxrZCvuGmG_qac:uLoYngeeyIwboKlUCYpeoV-p0SIKxSaY@192.168.2.109:31131/quorum")
if err != nil {
panic(err)
}
defer connection.Close()
fmt.Println("Successfully connected to RabbitMQ instance")
// opening a channel over the connection established to interact with RabbitMQ
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
// declaring consumer with its properties over channel opened
msgs, err := channel.Consume(
"example-queue-1", // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, //args
)
if err != nil {
panic(err)
}
// print consumed messages from queue
forever := make(chan bool)
go func() {
for msg := range msgs {
fmt.Printf("Received Message: %s\n", msg.Body)
}
}()
fmt.Println("Waiting for messages...")
<-forever
}
Попробуем что-то отправить в эту очередь, запустим продюссера и подлючим его к classic вхосту
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
fmt.Println("RabbitMQ in Golang: Getting started tutorial")
connection, err := amqp.Dial("amqp://default_user_Y7DEkXxrZCvuGmG_qac:uLoYngeeyIwboKlUCYpeoV-p0SIKxSaY@192.168.2.109:31131/classic")
if err != nil {
panic(err)
}
defer connection.Close()
fmt.Println("Successfully connected to RabbitMQ instance")
// opening a channel over the connection established to interact with RabbitMQ
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
// declaring queue with its properties over the the channel opened
queue, err := channel.QueueDeclare(
"example-queue-1", // name
true, // durable
false, // auto delete
false, // exclusive
false, // no wait
nil, // args
)
if err != nil {
panic(err)
}
// publishing a message
err = channel.Publish(
"", // exchange
"example-queue-1", // key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Test Message from classic vhost"),
},
)
if err != nil {
panic(err)
}
fmt.Println("Queue status:", queue)
fmt.Println("Successfully published message")
}
Успех, мы получили сообщение отправленное в классическую очередь
На этом этапе все наши консьюмеры уже переключены на новые очереди и получают сообщения от продюсеров работающих на старых очередях
Переключаем продюсеров
Остается переключить продюсеров, миграция завершена, самое время открывать пиво :)
В случае с Blue-Green нужно будет проделать все то же самое, только федерация будет настраиваться не между вхостами, а между кластерами