Глава 3. Отправка сообщений множеству водителей такси

Глава 2, Создание приложения Такси, содержала сведения о том, как соединяться с RabbitMQ и потреблять из него сообщения. Эта глава демонстрирует настройку значений предварительной выборки, которые определяют общее число сообщений, подлежащих отправке потребителю за один раз. Она также охватывает то как потребители могут вручную выдавать подтверждение на получение сообщений, либо получать эти сообщения без подтверждений приёма, причём первое делает возможной архитектуру с нулевыми потерями сообщений.

Командой CC (omplete Car) была запрошена новая функциональность, поскольку основной офис желает обладать возможностью отправлять информационные сообщения во все такси за один раз. Именно это является первой возможностью ввести веерный обмен, который выполняет маршрутизацию сообщений во все очереди, привязывая к ним безотносительные ключи маршрутизации.

Данная глава рассматривает следующие темы:

  • Работу с каналами и очередями

  • Определение значения предварительной выборки клиента

  • Подтверждение приёма сообщений

  • Публикация во все очереди

Технические требования

Файлы кода для этой главы можно отыскать на GitHub.

Работа с каналами и очередями

Водители и клиенты CC довольны функциональной возможностью запроса такси, которую мы раскрутили в Главе 2, Создание приложения Такси. Во- первых, была разъяснена публикация сообщений в непосредственном обмене для заказа потребителями отдельного такси, а затем были даны инструкции по реализации тематического обмена, который применяют клиенты при заказе такси с особыми требованиями. В обоих случаях такой клиент ограничен теми каналами, которые использовались для потребления конкретной очереди. Если канал был закрыт, такой клиент прекращал бы получение сообщений. По той причине, что некий канал не может быть повторно открыт и должен повторно создаваться с нуля, и каналы, и его потребление должны устанавливаться повторно, когда возникли какие- то проблемы.

Давайте пробежимся по некоторым важным моментам относительно потребителей и очередей в RabitMQ;

  • Очередь может иметь множество потребителей (пока не применяется тег исключительного использования).

  • Каждый канал способен обладать множеством потребителей.

  • Всякий потребитель использует ресурсы сервера, поэтому лучше быть уверенным в том, что не применяется слишком большое число потребителей.

  • Каналы являются полнодуплексными, что означает, что один канал может использоваться как для публикации, так и для потребления сообщений.

[Замечание]Замечание

Не существует логического ограничемя на ощее число каналов или потребителей, который способен обрабатывать некий брокер RabbitMQ. Тем не менее, имеются ограничивающие факторы, такие как доступная память, мощность ЦПУ брокера и полоса пропускания сетевой среды.

По мере мобилизации каждым из каналов памяти и потреблении мощности ЦПУ, в некоторых средах может рассматриваться ограничение на общее число каналов или потребителей. Назначенный администратор может настроить некое максимальное число каналов на соединение, воспользовавшись параметром channel_max.

Теперь настало время узнать как получать от потребителей как можно больше, устанавливая счётчик предварительной выборки.

Определение счётчика предварительной выборки потребителя

Общее число сообщений, отсылаемых потребителю одновременно может задаваться через значение счётчика prefetch. Такое значение счётчика предварительной выборки используется для того чтобы получать от своего потребителя как можно больше из возможного.

Когда счётчик предварительной выборки слишком мал, это может оказывать отрицательное воздействие на производительность RabbitMQ, ибо наша платформа обычно ожидает разрешения на отправку большего числа сообщений. Наша следующая схема отображает некий пример длительного времени простоя. Этот пример имеет prefetch равный единице, что означает, что RabbitMQ не будет отсыласть своё следующее сообщение пока после его доставки, не завершатся обработка и подтвержление приёма этого сообщения.

В этом примере время обработки составляет всего лишь 5 мс, при том что время доставки в оба конца отнимает 125 мс (60 мс + 60 мс + 5 мс):

 

Рисунок 3-1


Время прохода в обе стороны составляет 125 мс при времени обработки всего лишь 5 мс

Большой счётчик предварительной выборки делает отправку RabbitMQ множества сообщений из одной очереди одному потребителю. Когда все сообщения отосланы одному клиенту, он может переполниться и оставить всех прочих потребителей пустыми. Приводимая ниже схема отображает такого клиента, который получает большое число сообщений в то время как прочие простаивают:

 

Рисунок 3-2


Потребитель в состоянии простоя

В Главе 2, Создание приложения Такси, некие соединение, канал и потребитель были созданы в Ruby. Наш следующий блок кода показывает как настраивать некое значение предварительной выборки в Ruby:


require "bunny"
connection = Bunny.new ENV["RABBITMQ_URI"]

connection.start
channel = connection.create_channel 
channel.prefetch(1)
 	   

Обратите внимание, что этот пример оказывает установленным значение предварительной выборки один (1). Это означает, что его потребителю будет доставляться одно сообщение, пока этот потребитель не ответит на него ack:ed/ nack:ed. Настройки по умолчанию предварительной выборки RabbitMQ предоставляют неограниченный буфер для отправки такого числа сообщений потребителям, какое только возможно из того что они способны принимать. У самого потребителя его клиентская библиотека кэширует эти сообщения пока они не обработаны. Установка prefetch ограничивает общее число, которое ваш клиент способен получить прежде чем подтвердить их приём, интерпретируя их невидимыми для прочих пользователей и удаляя их из своей очереди.

[Совет]Совет

RabbitMQ поддерживает счётчики предварительной выборки уровня канала, на основе сообщений, а не как основанные на соединении и размере байт.

Теперь давайте рассмотрим как установить верное значение предварительной выборки.

Настройка верного значения предварительной выборки

В ситуации с одним или более потребителей, которые быстро обрабатывают сообщения, рекомендуется выполнять предварительную выборку большого числа собщений за раз, оставляя этого клиента настолько загруженным, насколько это возможно. Можно разделить общее воемя прохода в обе стороны на время обработки для каждого сообщения для получения некого оценочного значения предварительной выборки - если значение времени обработки остаётся тем же самым, а поведение вашей сетевой среды стабильным.

Низкое значение предварительной выборки рекомендуется в тех случаях, когда имеется большое число потребителей и короткое время обработки. Если установить значение предварительной выборки слишком низким, эти клиенты будут простаивать большую часть времени, ожидая возникновения сообщений. С другой стороны, когда значение предварительной выборки слишком высокое, один клиент может быть черезчур загруженным, в то время как прочие простаивают.

[Совет]Совет

Одной из типичных ошибок является допуск неограниченной предварительной выборки, когда один клиент получает все имеющиеся сообщения, что приводит к высокому потреблению памяти и крушениям, что вызывает повторную доставку всех сообщений.

В случае, когда имеется много потребителей и/или некое длительное время для обработки сообщений, рекомендуется чтобы ваше значение предварительной выборки устанавливалось равным единице (1) для равномерного распределения сообщений по всем клиентам.

[Совет]Совет

Когда ваши клиенты настроены на автоматическое подтверждение приёма сообщений, настройка предварительной выборки не оказывает никакого влияния.

Как уже упоминалось в Главе 2, Создание приложения Такси, некий клиент способен подтверждать приём доставки сообщения обратно своему брокеру. Пришло время посмотреть как это может быть сделано.

Подтверждение сообщений

Сообщение, которое перемещается между своим брокером и самим потребителем может быть утрачено в случае отказа соединения и важные сообщения, вероятно, требуется передавать повторно. Для того чтобы позволять серверу и клиентам узнавать когда повторно передавать сообщения служат подтверждения сообщений.

Существует два возможных способа подтверждений доставки сообщений - когда некий потребитель получает такое сообщение (автоматическое подтверждение, auto-ack), и когда потребитель отправляет обратно подтверждение получения ( подтверждение в явном виде/ вручную)ю При auto-ack подтверждение сообщения выставляется как только оно покидает свою очередь (и тем самым удаляется из этой очереди). Лучше всего выполнять auto-ack при необходимости высокой скорости сообщений, причём когда установленные соединения надёжны и когда не принимаются во внимание утраченные сообщения.

[Совет]Совет

Использование подтвержений приёма сообщений может иметь воздействие на производительность всей системы по сравнению с автоматическим подтверждением. Когда имеется целью быстрая пропускная способность, следует отключать подтверждение приёма вручную и вместо этого применять автоматическое подтверждение приёма.

В случае CC риск утраты сообщений не приемлем,а потому наш предыдущий код был изменён для установления подтверждения приёма на manual, что делает возможны определять когда подтверждать приём соответствующего сообщения:


queue.subscribe(block: true, manual_ack: true)
 	   

Это сообщение также должно выдать подтверждение приёма после его полной обработки:


channel.acknowledge(delivery_info.delivery_tag, false)
 	   

Как было показано, наш метод для выставления подтверждения вручную получает два аргумента - смый первый это delivery tag, а второй требуется когда требуется подтверждение приёмв за раз более одного сообщения. Тег доставки является специфичным для канала номером, который сам сервер использует для идентификации доставок. Критически важным является то, что подтверждения приёма потребителями своих сообщений выполняется в том же самом канале, вкотором они и получались, ибо не выполнение его возбуждает некую ошибку.

После изменения нашего кода и запуска самого приложения колонка скорости ack показывает не нулевые значения. Это происходит по той причине, что теперь применяются подтверждения приёма вручную и наш клиент RabbitMQ теперь отправляет сообщения ck по имеющемуся проводу своему брокеру. Это имеет свою стоимость в отношении использования полосы пропускания и общей производительности; однако когда приоритетом выступает гарантия отработки успешной доставки над скоростью, это вполне приемлемо.

[Совет]Совет

Когда имеется риск того, что отработка некого сообщения может отказать и брокеру требуется в конечном счёте повторно доставлять его, применяте потверждение приёма сообщения вручную. Повторная доставка сообщений не получивших подтверждение получения не происходит немедленно, если сообщение не было отклонено или канал не был закрыт.

Путешествие CC с RabbitMQ становится всё более увлекательным, в результате чего наша команда запрашивает новую функциональную возможность для отправки важных сведений всем водителям такси напрямую. Рассмотрим как реализовать такую новую функцию!

Публикация во всех очередях

Получив на руки такую новую функциональную возможность запроса, команда программистов CC подходит к новой всеобщей архитектуры обмена сообщений, отображаемой на следующей схеме. Приложение штаб- квартиры будет соединяться с RabbitMQ для публикации информационных сообщений всем водителям такси:

 

Рисунок 3-3


Приложение основного офиса в его архитектурном представлении

Для развёртывания этого, одним из способов может быть применение уже имеющейся темы сообщений и создания особой темы, на которую бубду подписаны все водители. Однако протокол AMQP редлагает ещё более ясный и простой подход - веерный обмен.

Веерный обмен

Веерный обмен берёт все приходящие сообщения и доставляет их во все очереди, которые привязаны к нему. Неким простым для уяснения примером того как применять веерный обмен это когда сообщения надлежит распространять большому числу участников, как при чате (тем не менее, вероятно имеются и более лучшие способы для истинных приложений чата).

Прочие примеры включают следующее:

  • Табло результатов или обновления лидеров для спортивных новостей или иных глобальных событий к мобильным клиентам

  • Широковещание различных сосотояний и обновлений конфигураций в распределённых системах

Как это отображено на нашей следующей схеме, наш веерный обмен направляет копию каждого получаемого им сообщения во все подключенные к нему очереди. Эта модель исключительно подходит для поведения с общедоступными адресами, которое и выступает целью нашей новой функциональнй возможности, варианта отправки отдельного сообщения всем водителям:

 

Рисунок 3-4


Маршрутизация веерного обмена ко всем привязанным очередям

Пришло время добавить такой веерный обмен в приложение CC.

Привязка к вееру

Штаб- квартира должна иметь возможность публикации отдельного сообщения для всех водителей такси. Это сообщение может содержать сведения о текущем трафике или сведения о вечеринке, которая произойдёт сегодня вечером. Поскольку эта новая широковещательная система будет применяться не часто, наша команда CC не будет заботиться о действенном управлении соединением, как это они предпринимали для своего основного приложения. На самом деле даже лучше при веерном обмене подключаться и отключаться для каждого взаимодействия, потому как в случае возникновения временных проблем в нашим брокером RubbitMQ, в конечном счётебудут успешными повторные попытки приложения штаб- квартиры.

Для того чтобы начать применять такой новый веерный обмен в своей штаб квартире, следует осуществить два шага: во- первых, объявить такой веерный обмен при запуске самого приложения, а затем привязать необходимую очередь к нему при регистрации пользователя.

Номер пять в нашем следующем примере показывает добавленный код в службу штаб- квартиры для публикации сообщений в этом новом обмене:


# 1. Require client library
require "bunny"

# 2. Read RABBITMQ_URI from ENV
connection = Bunny.new ENV["RABBITMQ_URI"]

# 3. Communication session with RabbitMQ
connection.start
channel = connection.create_channel

# 4. Declare queues for taxis
queue1 = channel.queue("taxi-inbox.1", durable: true)

queue2 = channel.queue("taxi-inbox.2", durable: true)

# 5. Declare a fanout exchange
exchange = channel.fanout("taxi-fanout")

# 6. Bind the queue 
queue1.bind(exchange, routing_key: "")
queue2.bind(exchange, routing_key: "")

# 7. Publish a message
exchange.publish("Hello everybody! This is an information message from the crew!", key: "")

# 8. Close the connection
connection.close
 	   

Основная логика в этом коде должна показаться знакомой. При привяке данной очереди в качестве ключа маршрутизации используется некая пустая строка. Это значение в действительности не имеет значения, ибо веерный обмен не заботится о ключах маршрутизации.

Обратите внимание, что exchange объявляется сразу перед её использованием. Это позволяет обойтись без того чтобы полагаться на предварительно имеющийся обмен в явном виде.В противном случае наше основное приложение должно было бы запускаться один раз для создания обмена перед тем как им сможет воспользоваться служба штаб- квартиры. Поскольку объявление самого обмена идемпотентно, его можно и нужно объявлять всякий раз.

[Замечание]Замечание

Будьте особенно внимательны к клиентским библиотекам AMQP, которые могут применять различные устанавливаемые по умолчанию значения для параметров обмена и очереди; лучше в явном виде определять все эти значения.

Не применяются те же самые очереди что и для примеров с непосредственным и тематическим обменами, так как эти Входящие очереди такси содержат только информационные сообщения. Вместо этого объявляются и привязываются к соответствующему обмену две новые очереди (taxi-inbox.1 и taxi-inbox.2).

[Совет]Совет

Пока нет строгой уверенности в том, что некий обмен или очередь будут изначально присутствовать, считайте что их нет и объявляйте их. Лучше перестраховаться, чем пожалеть потом, в особенности когда AMQP поощряет это и предоставляет для этого все необходимые средства.

Поместив этот кож куда следует, наше приложение штаб- квартиры теперь способно отправлять сообщения с общедоступными сведениями всем водителям. Это большой успех, который в очередной раз укрепил CC в своём решении развернуть RabbitMQ и применять его. Теперь запустим своё приложение.

Запуск приложения

Нет ничего впечатляющего на что стоит обратить внимание при запуске нашего приложения; сообщения из ншей штаб- квартиры успешно протекают в очереди Входящих водителей и единственным видимым изменением является то, что создан именно такой вновь созданный веерный обмен водителей.

Именно это можно наблюдать в консоли управления, что отображено на нашем следующем снимке экрана:

 

Рисунок 3-5


Веерный обмен для пользовательских очередей видимый в нашей консоли управления

В данный момент занятно взглянуть на саму привязку к любой конкретной очереди. Для этого кликните по закладке Queues, а затем прокрутить вниз и кликнуть по Bindings для отображения скрытого окна.

Это покажет что воспроизводится, как и на нашем приводимом ниже снимке экрана, на котором каждая очередь обладает несколькими привязками - одну для функции обмена сообщениями между пользователем и такси, одну для тематических сообщений и последнюю для функциональности веерной общедоступной адресации.

 

Рисунок 3-6


Каждая очередь такси обладает множеством привязок

Прежде чем закончить, давайте притормозим на секунду и насладимся тем фактом, что теперь имеется успешная интеграция запрсов такси, которая работает на всех платформах. Это может показаться не важным для тех, у кого имеется некий опыт работы с системами обмена сообщениями, однако это не что иное, как небольшое чудо. Благодаря AMQP и RabbitMQ наш брокер сообщенийможно заменять на любой иной брокер сообщений на основе AMQP, а также добавлять дополнительные услуги на любом выбранном языке программирования.

Выводы

Эта глава обсуждала предварительную выборку и подтверждение приёма потребителем вручную. Был введён веерный обмен для наличия возможности транслировать единственное сообщение во все активные очереди. Далее у CC имеются новые планы в отношении своей системы RabbitMQ - они хотят обладать возможностью беспрепятсвенно очищать старые сообщения и настраивать доставку сообщений. Они также желают обладать способностью отправки сообщений отдельным водителям из своей службы штаб - квартиры.

Перейдите к нашей следующей главе чтобы узнеать чем занята CC!