Часть 3. Интеграция и персонализация

RabbitMQ не останавливается на AMQP и обменах. Существует много вариантов позволяющих некоторые интересные возможности интеграции. В данной части книги мы рассмотрим протоколы MQTT и STOMP, публикацию без сохранения состояния с применением HTTP и интеграцию RabbitMQ с PostgreSQL и InfluxDB.

Глава 9. Использование альтернативных протоколов

Эта глава обсуждает

  • Преимущества протокола MQTT и как его применять

  • Как использовать приложения на основе STOMP с RabbitMQ

  • Как напрямую взаимодействовать с веб браузером при помощи Web STOMP

  • Как публиковать сообщения в rabbitMQ поверх HTTP применяя statelessd

Хотя AMQP 0-9-1 разработан с целью выступать в качестве надёжного протокола, который поддерживает все потребности большинства приложений, которые взаимодействуют с RabbitMQ, имеются особые варианты применения в которых имеются и лучшие варианты. Например, имеющиеся ненадёжные сетевые среды мобильных устройств с высокой латентностью могут испытывать проблемы с AMQP. В отличие от AMQP протоколы без сохранения состояния могут быть слишком сложными для некоторых прикладных сред в которых приложения клиентов не имеют возможности сопровождать долговременные соединения, однако требуют публикации с высокой скоростью. Кроме того, некоторые приложения могут уже содержать поддержку обмена сообщениями, но при этом не пользоваться протоколом AMQP. В каждом из таких сценариев экосистема приложения и подключаемых модулей RabbitMQ позволяет ему продолжать оставаться центральным звеном в вашем решении обмена сообщениями.

В данной главе мы рассмотрим ряд альтернатив имеющемуся стандартному протоколу AMQP 0-9-1: протокол MQTT, который является идеальным для мобильных приложений; STOMP, упрощённая альтернатива AMQP; Web STOMP, разработанный для применения в веб браузерах; а также statelessd для высокоскоростной публикации сообщений.

MQTT и RabbitMQ

Протокол MQ Telemetry Transport (MQTT, транспортировки телеметрии MQ) является легковесным протоколом обмена сообщениями, у которого растёт популярность для мобильных приложений, а его поддержка распространяется на RabbitMQ с помощью подключаемого модуля. Созданный в качестве протокола публикации на основе подписки, MQTT был первоначально изобретён в 1999 году Энди Стэнфордом-Кларком из IBM и Ариеном Ниппером из Eurotech. MQTT был разработан для обмена сообщениями на устройствах с ограниченными ресурсами и в средах с низкой пропускной способностью, без уступок ограничений надёжности для обмена сообщениями. Хотя он и не обладает такими функциональными характеристиками как AMQP, взрывной рост мобильных приложений привёл к росту популярности MQTT в последние годы.

От мобильных приложений до интеллектуальных автомобилей и автоматизации дома - основное направление использования MQTT в последние годы захватывает заголовки новостей о технологиях. Facebook применяет MQTT для обмена сообщениями и уведомлениями в реальном времени в своих мобильных приложениях. В 2013 году Ford Motor Company объединилась с IBM для внедрения технологий интеллектуальных автомобилей с использованием линейки продуктов IBM MessageSight на базе MQTT для концептуальных автомобилей Ford Evo. Коммерческие продукты для автоматизации дома могут быть несколько дальше в дорожной карте, однако существует множество использующих MQTT систем с открытым исходным кодом и на основе открытых стандартов, таких как проект FunTechHouse. Кроме того, в 2013 году MQTT, как и AMQP 1.0 годом ранее, был принят в качестве открытого стандарта через некоммерческую организацию OASIS, которая работает над поощрением разработки и принятия открытых стандартов. Это предоставило MQTT в качестве открытой, не зависящей от производителя площадки для дальнейшего развития и контроля.

Следует ли вам рассмотреть MQTT как протокол для своего решения обмена сообщениями? Скорее всего вам стоит вначале рассмотреть все преимущества и недостатки: Воспользуется ли ваше решение преимуществом функции последнего завещания (LWT, Last Will and Testament) MQTT? (LWT позволяет клиентам определять некое сообщение, которое следует публиковать в случае, если ваш клиент внезапно отключается.) Или же вы можете столкнуться с ограничениями максимального размера сообщения MQTT в 256 МБ. Даже если подключаемый модуль MQTT RabbitMQ прозрачно выполняет трансляцию между MQTT и AMQP для ваших приложений, весьма полезно хорошее понимание процесса взаимодействия протоколов для надлежащей оценки как MQTT, так и AMQP.

Протокол MQTT

Между протоколами AMQ и MQTT имеется некое единообразие. В конце концов, большинство протоколов обмена сообщениями совместно решают одни и те же вызовы, такие как поддержка согласования соединений, включая аутентификацию и публикацию сообщения. Под своим капотом, тем не менее, все протоколы уложены по- разному. Вместо наличия конструкций уровня протокола, таких как обмены и очереди AQMP, MQTT ограничен издателями и подписчиками. Конечно, это ограничение оказывает меньшее воздействие когда вы применяете RabbitMQ, так как публикуемые в RabbitMQ сообщения MQTT трактуются как сообщения, публикуемые через AMQP, а подписчики рассматриваются как потребители AMQP.

Хотя RabbitMQ поддерживает MQTT сразу после установки, существуют различия в публикуемых сообщениях при их издании через MQTT и AMQP, которые подчёркивают имеющиеся значения предложений каждого из протоколов. Будучи протоколом с малым весом, MQTT лучше приспособлен для ограниченного оборудования без надёжных сетевых соединений, в то время как AMQP разработан для того чтобы быть более гибким, но при этом требует более устойчивых и надёжных сетевых сред. Если вы не будете учитывать эти различия, ваши приложения могут испытывать проблемы в приложения взаимодействия при использовании обоих протоколов в одном и том же решении обмена сообщениями. В данном разделе мы рассмотрим имеющуюся анатомию сообщения MQTT и то, какое воздействие она может оказывать на архитектуру вашего обмена сообщениями.

Структура сообщения

В своей основе MQTT является структурой сообщения рассматриваемым как сообщение команды во многом аналогично кадрам нижнего уровня AMQP. Сообщения команд являются той структурой данных нижнего уровня, которая формирует все данные в сообщениях MQTT. (Рисунок 9-1).

 

Рисунок 9-1


Анатомия сообщения команды MQTT

Сообщение MQTT имеет фиксированный двухбайтовый заголовок, который описывает данное сообщение. В первом байте заголовка выстраиваются четыре значения:

  1. Тип сообщения: четырёхбитное значение, которое указывает текущее действие для сообщения, аналогично кадру метода AMQP. Примерами типа являются CONNECT, PUBLISH и SUBSCRIBE.

  2. Флаг DUP: один бит, указывающий является ли данное сообщение доставляемым повторно вне зависимости от того обновляет или нет клиент или сервер доставку данного сообщения.

  3. Флаг QoS: значение из двух бит, применяемое для указания качества обслуживания сообщения. В MQTT QoS определяет должно ли сообщение быть доставлено не более одного раза, по крайней мере один раз или в точности один раз.

  4. Флаг Retain: флаг из одного бита, указывающий своему серверу должно ли сообщение оставляться (retained) при его публикации для всех текущих подписчиков. Брокер MQTT будет оставлять только самое последнее сообщение с установленным значением флага Retain, предоставляя механизм, позволяющий новым подписчикам всегда получать самое последнее хорошее сообщение. Допустим, вы применяете MQTT для какого- то мобильного приложения. Если данное приложение утрачивает своё соединение с имеющимся сервером RabbitMQ, получение самого последнего хорошего сообщения с применением свойства Retain делает возможным вашей прикладной программе знать о самом последнем хорошем сообщении, что позволит ему восстановить состояние синхронизации при повторном подключении.

Второй байт заголовка MQTT переносит установленный размер имеющейся полезной нагрузки данного сообщения. Сообщения MQTT имеют максимальный размер полезной нагрузки в 256 МБ. В противовес этому максимальный размер AMQP составляет 16 экзабайт, а RabbitMQ ограничивает размер сообщения до 2 ГБ. Максимальный размер сообщения MQTT иногда стоит принимать во внимание при создании вами решения обмена сообщениями, поскольку вам требуется создавать собственный протокол поверх MQTT для расщепления полезных нагрузок, превышающих по размеру 256 МБ на отдельные сообщения с последующей их реконструкцией на стороне подписчика.

[Замечание]Замечание

Согласно закона Маслоу, если ваш единственный инструмент - молоток, всякая проблема становится похожей на гвоздь. Очень просто применять некий протокол, подобный MQTT или AMQP в качестве молотка для взаимодействия между приложениями. Однако для разных типов задач могут иметься лучшие инструменты. Например, отправка больших сообщений, таких как содержимое видео или изображений поверх MQTT может быть проблематичной для мобильных приложений. Хотя MQTT превосходен при отправке сообщений меньшего размера, таких как данные состояния приложения, вы можете пожелать рассмотреть в качестве предпочтительного для выгрузки видео или фотографий HTTP 1.1 в случае приложений мобильных или встроенных устройств. При использовании MQTT для сообщений меньшего размера, он может показывать лучшую производительность нежели HTTP, однако когда дело доходит до передачи таких вещей как файлы, HTTP будет быстрее. Запросто можно упустить из виду HTTP, однако он поддерживает фрагментарную выгрузку файлов, что идеально подходит для больших данных медиа, передаваемых по менее надёжным сетевым средам. Большинство зрелых клиентских библиотек будет поддерживать эту функциональность без необходимости для вас создавать некий дополнительный уровень управления такой функциональностью, который потребовался бы вам при использовании MQTT.

Переменные заголовки

В полезной нагрузке сообщения некоторых сообщений команд MQTT имеются двоичные упакованные данные , содержащие подробности в структуре данных, именуемой переменными заголовками (ariable headers). Установленный формат переменных может отличаться от одного сообщения команд к другому. Например, переменные заголовки сообщения CONNECT содержат данные, позволяющие согласовать соединение, в то время как соответствующие переменные PUBLISH содержат значение предмета для публикуемого сообщения и какой- то уникальный идентификатор. В случае сообщения команды PUBLISH, его полезная нагрузка содержит свои переменные заголовки и непрозрачное сообщение уровня приложения (Рисунок 9-2).

 

Рисунок 9-2


Полезная нагрузка сообщения команды PUBLISH

Для значений в переменных заголовках, которые не имеют фиксированного размера, таких как название его темы, соответствующее значение имеет префиксом с двумя байтами, которые указывают на размер этого значения (Рисунок 9-3). Эта структура позволяет серверам, а также клиентам считывать и декодировать сообщения после того как они проходят потоком через сокет вместо того чтобы ожидать пока будет считано всё сообщение прежде чем выполнить его декодирование.

 

Рисунок 9-3


Структура поля topic-name переменной заголовков сообщения команды PUBLISH

Все значения в полях этой переменной определяются как применяющие строку с кодировкой UTF-8 и они допускают дину 32 кБ. Важно помнить, что все значения в этих переменных заголовков сообщения PUBLISH вычитаются из установленного максимума размера сообщения для самого размера. Например, если вы применяете для публикации название предмета my/very/long/topic, вы задействовали 23 доступных байта из общей полезной нагрузки сообщения, поэтому ваше сообщение может теперь в длину только 268 435 433 байта.

Публикация через MQTT

Названия тем MQTT могут предоставлять мощный инструмент маршрутизации для ваших сообщений. На самом деле они очень похожи на концепцию ключей маршрутизации, которые аналогичны по идее с ключами маршрутизации, которые используются в предметном обмене RabbitMQ, и при маршрутизации сообщений MQTT в RabbitMQ, применяется исключительно именно предметный обмен. Строка темы MQTT является поддерживающей пространство имён при помощи символа прямого слеша (левой косой черты, /) в качестве ограничителя при публикации сообщения. Чтобы проиллюстрировать как может применяться MQTT в RabbitMQ, давайте начнём с примера издателя MQTT, публикующего некоторое сообщение, которое потребляется AMQP.

Создание получателя сообщения

Чтобы создать очередь для публикации сообщения MQTT, которое подлежит маршрутизации, следующий пример из записной книжки IPython "7.1.2 Setup" создаёт некую очередь с названием mqtt-messages и связывает её с обменом amq.topic при помощи ключа маршрутизации #.


import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'mqtt-messages')    # Создаёт объект очереди rabbitpy		
        queue.declare()                                     # Объявляет очередь mqtt-messages
        queue.bind('amq.topic', '#')                        # Связывает объявленную очередь с обменом amq.topic
 	   

Данный обмен amq.topic является определённым по умолчанию обменом, который публикуют клиенты MQTT и, когда он опубликован, подключаемый модуль MQTT автоматически заменяет символы прямого слэша в полученном значении названия темы MQTT на точки для ключа маршрутизации AMQP.

Запустите данную записную книжку в своём сервере Записной книжки IPython и, после того как она запустится, мы создадим издателя MQTT на основе Python.

Создание издателя MQTT

Для взаимодействия с MQTT при помощи Python популярным выбором является mosquitto. Это некая асинхронная библиотека, которая служит для исполнения при помощи цикла ввода/ вывода с блокировкой, однако мы подделаем её некоторыми встроенными операциями, которые позволят ей взаимодействовать с RabbitMQ. Следующий пример кода взят из записной книжки "7.1.2 MQTT Publisher" и начинается с импорта самой библиотеки mosquito:


import mosquitto
 	   

Импортировав эту библиотеку, необходимо создать некий класс клиента mosquitto с каким- то уникальным названием для подключения данного клиента. В данном случае мы всего лишь применяем значение rmqid-test, однако для промышленного применения данная строка представление идентификатора процесса операционной системой будет хорошей идеей:


client = mosquitto.Mosquitto('rmqid-test')
 	   

Данный класс клиента имеет метод connect, в который вы можете передать информацию своего соединения для имеющегося сервера MQTT. Этот метод connect получает множество аргументов, в том числе значения hostname, port и keepalive. В данном примере определяется только имя хоста, а для значений port и keepalive применяются установленные по умолчанию значения.


client.connect('localhost')
 	   

Наша библиотека вернёт 0 в случае успешного соединения. некоторое возвращаемое значение, которое превышает 0 указывает что при соединении с сервером возникли проблемы.

Получив подключённого клиента вы теперь можете опубликовать какое- то сообщение, передав в его теме название, содержимое сообщения и значение QoS 1, указывающее что данное сообщение должно быть опубликовано по крайней мере один раз, ожидая некое подтверждение получения от RabbitMQ.


client.publish('mqtt/example', 'hello world from MQTT via Python', 1)
 	   

Поскольку вы теперь исполняете цикл ввода/ вывода с блокировкой, вам необходимо проинструктировать своего клиента обрабатывать события ввода/ вывода. Вызовите соответствующий метод client.loop() для обработки событий ввода/ вывода, что должно вернуть 0, указывающий на успех.


client.loop()
 	   

Теперь вы можете отсоединиться от RabbitMQ и запустить метод client.loop для обработки всех прочих событий ввода/ вывода.


client.disconnect()
client.loop()
 	   

Когда вы запустите эту записную книжку, вы должны были успешно опубликовать некое сообщение, которое было помещено в созданную очередь mqtt-messages, которую вы определили ранее. Давайте убедимся что всё это работает с rabbitpy.


# 7.1.2 MQTT Publisher
import mosquitto"

client = mosquitto.Mosquitto('rmqid-test')

client.connect('localhost')

client.publish('mqtt/example', 'hello world from MQTT via Python', 1)

client.loop()

client.disconnect()
client.loop()
 	   

Получение публикуемого через MQTT сообщения через AMQP

Записная книжка "7.1.2 Confirm MQTT Publish" содержит следующий код для извлечения сообщения из очереди mqtt-messages с помощью Basic.Get и он применяет метод Message.pprint() для вывода содержимого данного сообщения.


import rabbitpy

message = rabbitpy.get(queue_name='mqtt-messages')    # Извлекаем сообщение из RabbitMQ с помощью Basic.Get
if message:                                           # Вычисляем было ли выбрано сообщение
    message.pprint(True)                              # Выводим текст сообщения включая свойства
    message.ack()                                     # Подтверждение получения сообщения
else:
    print('No message in queue')                      # Если не было сообщения, даём знать об этом пользователю
 	   

Когда вы запустите это код, вы должны обнаружить полученное от RabbitMQ сообщение AMQP, которое было прозрачно переправлено из семантики MQTT в семантику AMQP.


Exchange: amq.topic

Routing Key: mqtt.example

Properties:

{'app_id': '',
 'cluster_id': '',
 'content_encoding': '',
 'content_type': '',
 'correlation_id': '',
 'delivery_mode': None,
 'expiration': '',
 'headers': {'x-mqtt-dup': False, 'x-mqtt-publish-qos': 1},
 'message_id': '',
 'message_type': '',
 'priority': None,
 'reply_to': '',
 'timestamp': None,
 'user_id': ''}

Body:

'hello world from MQTT via Python'
 	   

Значением ключа маршрутизации больше не является mqtt/example как значение названия темы, которое было опубликовано, однако именно это сообщение было опубликовано. RabbitMQ заменил значение прямого слэша на точку чтобы соответствовать установленной семантике предметного обмена. Также отметим, что таблица заголовков свойств данного сообщения AMQP содержит два значения - x-mqtt-dup и x-mqtt-publish-qos - с установленными значениями из значений заголовка сообщения MQTT PUBLISH.

Теперь, когда мы проверили своего издателя, давайте исследуем как подписчик MQTT ведёт себя с RabbitMQ.

Подписчики MQTT

Чтобы подключиться к RabbitMQ через MQTT для подписки на сообщения, RabbitMQ создаст некую новую очередь. Данная очередь получит наименование в формате mqtt-subscriber-[NAME]qos[N], где [NAME] это уникальное имя клиента, а [N] является уровнем QoS, устанавливаемым для данного соединения клиента. К примеру, некая очередь с названием mqtt-subscriber-facebookqos0 будет создана для подписчика с именем facebook и уровнем QoS установленным в 0. После того как очередь создана для запросов подписчика, она будет связана с самой темой запроса при помощи семантики AMQP ключа марщрутизации с разграничением точкой.

Подписчики могут привязываться к темам при помощи строки соответствия или шаблона соответствия с применением семантики, аналогичной связываниям ключа маршрутизации предметного обмена AMQP. Символ решётки (#) применяется для многоуровневого соответствия и в AMQP, и в MQTT, символ плюс (+) используется для соответствия одному уровню в ключе маршрутизации вместо применения звёздочки (*). Например, если вы опубликовали новые сообщения изображений поверх MQTT при помощи названий image/new/profile и image/new/gallery, подписчики MQTT смогут получать все сообщения изображений подписавшись на image/#, все новые сообщения изображений с подпиской image/new/+, а только новые изображения профиля по подписке image/new/profile.

Следующий пример, взятый из записной книжки "7.1.3 MQTT Subscriber", подключится к RabbitMQ через протокол MQTT, настроит себя в качестве подписчика и будет находиться в цикле пока не поступит сообщение. После того как данное сообщение будет получено, он отменит подписку и отключится от RabbitMQ. Для начала необходимо вложить библиотеки mosquitto и os:


import mosquitto
import os
 	   

Для получения текущего значения идентификатора процесса своего подписчика вы можете воспользоваться модулем os стандартной библиотеки Python, который позволит вам создать уникальное имя клиента MQTT при создании нового клиента mosquitto. Вы можете пожелать применять более случайное или более устойчивое именование своего подписчика для предотвращения дубликатов имён клиентов в промышленном коде, однако для данного примера должно хватить идентификатора процесса.


client = mosquitto.Mosquitto('Subscriber-%s' % os.getpid())
 	   

Теперь вы можете определить несколько методов обратных вызовов которые будут запускаться библиотекой mosquitto на каждом этапе исполнения. Чтобы начать вы можете создать некий обратный вызов, который вызывается при подключении данного клиента:


def on_connect(mosq, obj, rc):
    if rc == 0:
        print('Connected')
    else:
        print('Connection Error')
client.on_connect = on_connect
 	   

После того как сообщение MQTT доставлено, наш клиент mosquitto вызывает определённый обратный вызов on_message. Этот обратный вызов выводит информацию о данном сообщении, а затем данный клиент отменяет подписку.


def on_message(mosq, obj, msg):
    print('Topic: %s' % msg.topic)
    print('QoS: %s' % msg.qos)
    print('Retain: %s' % msg.retain)
    print('Payload: %s' % msg.payload)
    client.unsubscribe('mqtt/example')
client.on_message = on_message
 	   

Окончательный обратный вызов вызывается когда данный клиент отменяет подписку и он отключит данного клиента от RabbitMQ.


def on_unsubscribe(mosq, obj, mid):
    print("Unsubscribe with mid %s received." % mid)
    client.disconnect()
client.on_unsubscribe = on_unsubscribe
 	   

При наличии всех обратных вызовов определёнными, вы можете подключиться к RabbitMQ и подписаться на данную тему:


client.connect("127.0.0.1")
client.subscribe("mqtt/example", 0)
 	   

Наконец, вы мажете запустить цикл событий ввода/ вывода вызвав client.loop() и определив значение таймаута 1 секундой. Следующий код осуществит это, находясь в цикле пока client.loop() не вернёт 1 благодаря отключению от RabbitMQ.


while client.loop(timeout=1) == 0:
    pass
 	   

После того как вы открыли свою записную книжку, вы можете исполнить все элементы за раз, кликнув по ниспадающему меню Cell и выбрав Run All. Кликните по закладке "7.1.2 MQTT Publisher" и выберите Cell > Run All, опубликовав новое сообщение. В соответствующей закладке подписчика вы должны теперь наблюдать вывод аналогичный представленному на (Рисунке 9-4).

 

Рисунок 9-4


Вывод записной книжки IPython “MQTT Subscriber”

Как вы можете видеть, ключ маршрутизации с ограничителем точкой был преобразован обратно в название темы mqtt/example с прямой косой чертой в качестве ограничителя. При двунаправленном преобразовании из названия темы MQTT в ключ маршрутизации AMQP, RabbitMQ успешно соединяет мостом оба протокола прозрачным и натуральным образом для любого типа соединения клиента. При этом RabbitMQ не только создаёт мощную платформу для приложений MQTT, но также и создаёт более надёжную платформу обмена сообщениями, чем специфичные для самого протокола брокеры.


# 7.1.3 MQTT Subscriber
import mosquitto
import os

client = mosquitto.Mosquitto('Subscriber-%s' % os.getpid())

def on_connect(mosq, obj, rc):
    if rc == 0:
        print('Connected')
    else:
        print('Connection Error')
client.on_connect = on_connect

def on_message(mosq, obj, msg):
    print('Topic: %s' % msg.topic)
    print('QoS: %s' % msg.qos)
    print('Retain: %s' % msg.retain)
    print('Payload: %s' % msg.payload)
    client.unsubscribe('mqtt/example')
client.on_message = on_message

def on_unsubscribe(mosq, obj, mid):
    print(\"Unsubscribe with mid %s received.\" % mid)
    client.disconnect()
client.on_unsubscribe = on_unsubscribe

client.connect(\"127.0.0.1\")
client.subscribe(\"mqtt/example\", 0)

while client.loop(timeout=1) == 0:
    pass
 	   

Настройка подключаемого модуля MQTT

Отклоняясь от установленных основ MQTT вам может потребоваться персонализировать поведение MQTT чтобы соответствовать различным сторонам вашего кластера RabbitMQ, например, предоставить учётные данные особые для MQTT, или же некий специфичные настройки очереди для подписчиков. Для изменения этих или иных значений настроек вам потребуется изменять основной файл настроек RabbitMQ, rabbitmq.config.

Файл настроек RabbitMQ обычно расположен в /etc/rabbitmq/rabbitmq.config в системах на основе UNIX. В то время как большинство файлов настройки применяют упорядоченный формат данных, файл rabbitmq.config использует формат кода естественных для Erlang структур данных. Почти как массив объектов JSON, имеющаяся настройка RabbitMQ содержит строфу верхнего уровня для самого RabbitMQ, а затем строфы для каждого подключаемого модуля, который вы желаете настроить. В приводимом далее фрагменте кода, AMQP RabbitMQ выполняет ожидание по порту 5672, а подключаемый модуль MQTT установлен на ожидание по порту 1883.


[{rabbit,         [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt,  [{tcp_listeners,    [1883]}]}].
 	   

Многие из установленных по умолчанию установок, такие как виртуальный хост и используемое по умолчанию имя пользователя и пароль для применяемого подключаемого модуля MQTT, дублируют установки по умолчанию для RabbitMQ. В отличие от AMQP, клиенты MQTT не способны выбирать какой из виртуальных хостов использовать. Хотя данное поведение и может быть изменено в последующих версиях, в настоящее время единственный способ изменить используемый клиентом MQTT виртуальный хост состоит в изменении применяемого по умолчанию значения прямой косой черты при помощи директивы vhost в соответствующей строфе настройки MQTT с / на нужное вам значение:


[{rabbitmq_mqtt, [{vhost, <<"/">}]}]
 	   

Хотя MQTT и предоставляет возможностей для аутентификации, могут иметься варианты использования когда это не желательно. На эти случаи ваш подключаемый модуль MQTT имеет установленную по умолчанию комбинацию имени пользователя и пароля из guest и guest. Эти установленный по умолчанию значения изменяются при помощи директив настройки default_user и default_pass. Если вы не хотите требовать аутентификацию для клиентов MQTT, вы можете запретить поведение с устанавливаемым по умолчанию пользователем посредством соответствующей директивы настройки allow_anonymous в значение false.

[Совет]Совет

Ваше прикладное решение MQTT может требовать различных установок для различных типов клиентов MQTT. Применение кластера RabbitMQ является одним из способов обхода этого ограничения располагая отдельный виртуальный хост и соответствующие настройки значения по умолчанию пользователя и пароля. Имея различные настройки для узла, вы можете разделять сообщения MQTT в некотором кластере RabbitMQ с тем, чтобы каждый узел принимал соединения MQTT настроенные с различными установленными по умолчанию установками. Не существует требования для универсальных установок по узлам кластера RabbitMQ.

Таблица 9-1 описывает все имеющиеся директивы подключаемого модуля MQTT и установленные для них значения по умолчанию. Эти значения напрямую воздействуют на соответствующее поведение данного подключаемого модуля MQTT безотносительно к клиентам MQTT и маршрутизации сообщения.

Таблица 9-1. Опции настройки подключаемого модуля MQTT
Директива Тип Описание Значение по умолчанию

allow_anonymous

Boolean

Разрешает клиентам MQTT подключения без аутентификации.

true

default_user

String

Имя пользователя, применяемое клиентом MQTT не представляющем полномочий аутентификации.

guest

default_password

String

Применяемый пароль для не представляющего полномочий аутентификации клиента MQTT.

guest

exchange

String

Предметный обмен для его применения при публикации сообщений MQTT.

amq.topic

prefetch

Integer

Установка числа предварительных выборок AMQP QoS для очередей ожидания MQTT.

10

ssl_listeners

Array

Порты TCP для ожидания MQTT поверх подключений SSL. Если определена строфа верхнего уровня rabbit, файл настроек должен содержать строфу настроек ssl_options.

[]

subscription_ttl

Integer

Определяемая в миллисекундах длительность удержания очереди подписчика после внезапного отключения подписчика.

1800000

tcp_listeners

Array

Порты TCP для ожидания MQTT подключений.

1833

tcp_listen_options

Array

Массив директив настроек для изменения поведения TCP данного подключаемого модуля MQTT.

См. Таблицу 9.2

Некоторые директивы, такие как exchange, prefetch и vhost скорее всего будут кандидатами для изменения вашего окружения, в то время как прочие, такие как tcp_listen_options следует регулировать с предосторожностями.

Таблица 9-2 описывает директивы tcp_listen_options из документации RabbitMQ и их воздействие на поведение соединения TCP для клиентов MQTT и самого подключаемого модуля MQTT. Эти значения являются подмножеством тех, которые предоставляет API TCP Erlang для регулировок сокета TCP. Для получения дополнительной документации о том какие ещё директивы доступны, обратитесь к документации gen_tcp. По причине того как работают настройки RabbitMQ, все определяемые в таком файле настройки прозрачно передаются в gen_tcp:start_link через параметр listen_option. В большинстве вариантов применения установленные по умолчанию значения для подключаемого модуля MQTT не следует изменять; это проверенные и оптимизированные значения, рекомендуемые командой RabbitMQ.

Таблица 9-2. tcp_listen_options для подключаемого модуля MQTT
Директива Тип Описание Значение по умолчанию

binary

Atom

Указывает что сокет является двоичным сокетом TCP. Не удаляйте.

N/A

packet

Atom

Регулирует как ядро Erlang обрабатывает данные TCP прежде чем отдать их в RabbitMQ. Для получения дополнительной информации обращайтесь к документации gen_tcp.

raw

reuseaddr

Boolean

Указывает операционной системе разрешать повторно использовать RabbitMQ данный сокет ожидания даже если этот сокет занят.

true

backlog

Integer

Определяет сколько ожидающих клиентов может иметься прежде чем отклонять (refusing) новые соединения. Ожидающие новые соединения являются новыми подключениями TCP, которые RabbitMQ ещё не успел обработать.

10

nodelay

Boolean

Указывает должен ли сокет TCP применять алгоритм Nagle ожидая агрегации данных TCP на нижнем уровне для более эффективного обмена данными. По умолчанию значение установлено в false, что делает возможным более быстрый обмен сообщениями MQTT в большинстве случаев за счёт отправки данных TCP когда пожелает того RabbitMQ, вместо того чтобы буферизировать пакеты сообщений меньшего размера и отправлять их совместными группами.

true

Чтобы подытожить, MQTT является мощным инструментом для облегчённого обмена сообщениями в постоянно развивающемся мире мобильных компьютеров и встраиваемых устройств. Если вы рассматриваете RabbitMQ как центральный узел решения обмена сообщениями, которая к тому же включает и мобильные устройства, вам настоятельно рекомендуется рассмотреть возможность использования MQTT и подключаемого модуля MQTT. Они не только обеспечивают прозрачное перемещение семантики MQTT в AMQP RabbitMQ по всему миру, но и прозрачно транслирует семантику AMQP клиентам MQTT, что упрощает разработку требующую унификации шин обмена сообщениями. Хотя недостатки настроек и отсекают сложные экосистемы в отдельном узле, предпринимаются усилия по расширению имеющегося подключаемого модуля MQTT для обеспечения им динамического виртуального хоста и использования обмена. Тем не менее, для создания более сложных топологий в кластере может применяться множество узлов RabbitMQ.

Если ваше решение обмена сообщениями не получает преимуществ от применения MQTT, но вам всё ещё требуется более легковесное решение, нежели AMQP для взаимодействия с RabbitMQ, возможно, вам подойдёт STOMP.

STOMP и RabbitMQ

Первоначально именовавшийся как TMPP, Streaming Text Oriented Message Protocol (STOMP, ориентированный на потоки протокол обмена сообщениями) был впервые описан в 2005 Брайаном МакКалистером. Своблодно смоделированный после HTTP, STOMP применяет простой для чтения протокол на основе текста. Изначально реализованный в Apache ActiveMQ и проектировавшийся с учётом простоты, STOMP в настоящее время поддерживается во множестве реализаций брокеров и имеет клиентские библиотеки для большинства языков программирования.

Спецификация STOMP 1.2 была выпущена в 2012 и поддерживается RabbitMQ совместно со своими предыдущими двумя версиями. Поддержка STOMP осуществляется неким подключаемым модулем, который распространяется как часть центрального пакета RabbitMQ. Как и в случае с AMQP и MQTT, понимание протокола STOMP может помочь определению вашего мнения о его использовании в вашем приложении или среде.

Протокол STOMP

Разработанные с целью обработки на основе потоков, кадры STOMPO являются тестом UTF-8, который состоит из команды и общей полезной нагрузки данной команды, которая завершается нулевым байтом (0x00). В отличие от двоичных протоколов AMQP и MQTT, STOMP читается человеком и не требует двоичной упаковки информации для определения кадрами сообщений STOMP и их содержимым.

К примеру, приводимый ниже фрагмент кода является кадром STOMP для подключения некоего сообщения к брокеру. Он применяет ^@, control-@ в ASCII, для представления значения нулевого байта в самом конце кадра.


CONNECT
accept-version:1.2
host:rabbitmq-node

^@
 	   

В данном примере команда CONNECT сообщает принимающему брокеру что данный клиент хотел бы выполнит подключение. За ней следуют два поля заголовка, accept-version и host, которые инструктируют самого брокера о том соединении, о котором желал бы договориться клиент. Наконец, за пустой строкой следует нулевой байт, что указывает на окончание данного кадра CONNECT.

Если запрос бул успешен, получивший его брокер возвращает некий кадр CONNECTED своему клиенту. Это кадр во многом напоминает сам кадр CONNECT:


CONNECTED
version:1.2

^@
 	   

Во многом как и команды AMQP, команды STOMP являются запросами в стиле RPC для своего брокера сообщений, причём некоторые из них имеют отклики для своего клиента. Стандартный набор команд STOMP охватывает понятия, аналогичные AMQP и MQTT, в том числе переговоры о соединении, публикацию сообщений, а также подписку на получение сообщений от брокера сообщений. Если вам понадобится дополнительная информация о самом протоколе, его спецификация доступна на странице протокола STOMP по адресу https://stomp.github.io/.

Чтобы проиллюстрировать как вы можете применять STOMP в RabbitMQ, давайте начнём с простой публикации сообщения.

Публикация сообщений

При публикации сообщений поверх STOMP, для описания того куда следует отправлять сообщение применяется понятие получателя (destination). При использовании в RabbitMQ, получателем STOMP может быть один из:

  • Автоматически создаваемая подключаемым модулем STOMP очередь когда некое публикуется сообщение или когда клиент отправляет запрос на подписку.

  • Создаваемая обычными средствами, такими как клиент AMQP или посредством управления API очередь.

  • Комбинация некоторого обмена и ключа маршрутизации.

  • Автоматическое соответствие обмену amq.topic с использованием соответствующего получателя topic.

  • Временная очередь при использовании заголовков reply-to в команде STOMP SEND.

Каждый из этих получателей ограничивается прямой косой чертой, отделяющей значение типа получателя, а также в большинстве случаев дополнительную информацию, определяющую обмен, ключ маршрутизации или очередь.

Для иллюстрации такого применения получателя и публикации, давайте начнём с отправки какого- то сообщения в очередь STOMP с применением библиотеки Python stomp.py.

[Замечание]Замечание

Подключаемый модуль STOMP действует как уровень трансляции или представления (proxy) в самом RabbitMQ. Раз так, он выступает в роли клиента AMQP, создаёт некий канал AMQP и вызывает запросы RPC AMQP в сам RabbitMQ. Издатели STOMP являются объектом того для того же самого ограничения скорости и блокировки соединения, который ограничивает издателей AMQP, за исключением того, что в самом протоколе STOMP не имеется никакой семантики, которая позволила бы вашему издателю знать что он блокирован или подвержен дросселированию.

Отправка в очередь, определяемую STOMP

Отправка сообщения через STOMP очень похожа на отправку сообщения через MQTT или AMQP. Чтобы отправить сообщение непосредственно в какую- то очередь, воспользуйтесь строкой получателя в формате /queue/<queue-name>.

В нашем следующем примере мы применим получателя /queue/stomp-messages. Отправка нашего сообщения этому получателю будет публиковать сообщения в очереди stomp-messages с применением установленного по умолчанию поведения обмена RabbitMQ. Если такая очередь ещё не существует, то она будет создана автоматически. Пример кода содержится в записной книжке "7.2.2 STOMP Publisher".


import stomp

conn = stomp.Connection()
conn.start()
conn.connect()
conn.send(body='Example Message', destination='/queue/stomp-messages')
conn.disconnect()
 	   

При создании данной очереди нашим подключаемым модулем STOMP, она будет создаваться с применением установленных по умолчанию значений аргументов посредством инкрементальных вызовов запросов RPC Queue.Declare внутренним образом. Это означает, что если у вас имеется некая очередь в вашем сервере RabbitMQ, которая уже была создана при помощи установленных по умолчанию значений, вы можете всё ещё выполнять в неё публикацию с применением установленной получателем очереди STOMP. Если у вас имеется очередь, которая была создана с неким TTL сообщения, или иными персональными аргументами, вам понадобится применять вместо неё очередь получателя, определяемую как некую AMQP.

Отправка в очередь, определяемую AMQP

Подключаемый модуль STOMP имеет расширенный синтаксис получателя, который имеет особенности для реализации RabbitMQ в STOMP, делая возможной публикацию определяемых AMQP очередей с персональными настройками. Для этого вам вначале требуется создать некую очередь с максимальной длиной при помощи библиотепки rabbitpy в записной книжке "7.2.2 Queue Declare".


import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'custom-queue',
                               arguments={'x-max-length': 10})
        queue.declare()
 	   

Теперь, когда необходимая очередь объявлена, вам нужно применить синтаксис получателя, определяющий AMQP очередь. Создав строку получателя в формате /amq/queue/<queue-name>, наш подключаемый модуль STOMP будет способен выполнять маршрутизацию сообщений в очередь custom-queue. Это пример находится в записной книжке "7.2.2 Custom Queue".


import stomp

conn = stomp.Connection()
conn.start()
conn.connect()
conn.send(body='Example Message', destination='/amq/queue/custom-queue')
conn.disconnect()
 	   

Основная проблема с отправкой в очередь напрямую заключается в том, что вы не можете получать преимуществ, которые получают сообщения AMQP от различных типов обменов и ключей маршрутизации. К счастью, наш подключаемый модуль STOMP делает возможной специально форматируемую строку получателя, которая и достигает этой цели.

Отправка в обмен

Для отправки некоторого сообщения в обмен при помощи ключа маршрутизации средствами подключаемого модуля STOMP RabbitMQ вы применяете формат /exchange/<exchange-name>/<routing.key>. Он позволяет вам выполнять публикацию в STOMP настолько же гибко, как если бы у вас была возможность применять AMQP.

Следующий пример из записной книжки "7.2.2 Exchange and Queue Declare" установит все вещи так, чтобы вы могли выполнять публикацию сообщения STOMP через некий персональный обмен.


import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        exchange = rabbitpy.Exchange(channel, 'stomp-routing')
        exchange.declare()
        queue = rabbitpy.Queue(channel, 'bound-queue',
                               arguments={'x-max-length': 10})
        queue.declare()
        queue.bind(exchange, 'example')
 	   

Имея объявленным некий обмен и связанную с этим обменом очередь, пора опубликовать сообщение в такой новой очереди. Следующий пример взят из записной книжки "7.2.2 Exchange Publishing".


import stomp

conn = stomp.Connection()
conn.start()
conn.connect()
conn.send(body='Example Message',
          destination='/exchange/stomp-routing/example')
conn.disconnect()
 	   

Имея за поясом данный пример вы несомненно должны получить гибкость маршрутизации обмена. Но вы можете пользоваться гибкостью маршрутизацией предметного обмена и без того чтобы объявлять некий обмен или применять более длинную строку получателя. Вместо этого вы можете использовать строку темы получателя STOMP.

Отправка в STOMP с темой

Строка темы получателя, как и строки получателя очереди, используют общий формат, распознаваемый всеми брокерами сообщений, которые поддерживают обсуждаемый протокол STOMP. Представляя строку получателя в формате /topic/<routing.key>, отправляемые через STOMP в RabbitMQ сообщения будут направляться через обмен amq.topic во все очереди, связанные с данным ключом маршрутизации.

Вместо создания некоей новой очереди вы можете выполнить связывание с созданной ранее очередью bound-queue своего обмена amq.topic, воспользовавшись ключом маршрутизации # для получения всех сообщений, отправляемых в этот обмен. Следующий пример кода для связывания с очередью взят из записной книжки "7.2.2 Bind Topic".


import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'bound-queue')
        queue.bind('amq.topic', '#')
 	   

Получив связанную очередь, вы теперь можете выполнять публикацию поверх STOMP, отправляя сообщение с ключом маршрутизации /topic/routing.key применяя пример из записной книжки "7.2.2 Topic Publishing".


import stomp

conn = stomp.Connection()
conn.start()
conn.connect()
conn.send(body='Example Message',
          destination='/exchange/stomp-routing/example')
conn.disconnect()
 	   

Между очередью, очередью amq, обменом и строками тем получателя охватывается большая часть вариантов применения публикации сообщений. Однако STOMP добавляет отличное свойство, которое реплицирует часть работы, которую мы уже выполняли в Главе 6. Если вы публикуете сообщение через STOMP и устанавливаете некое значение заголовка reply-to, ваш подключаемый модуль STOMP автоматически создаст некую очередь откликов RPC для вашего издателя чтобы потреблять из неё сообщения.

Применение очередей временного отклика

Когда ваше решение обмена сообщениями делает RPC вызов развития отношений между вашими издателями и потребителями, если вы применяете STOMP, существует удобное, встроенное в подключаемый модуль STOMP RabbitMQ поведение. При установке заголовка reply-to для отправки вами сообщения через STOMP, автоматически создаётся необходимая очередь отклика, причём устанавливаются флаги и исключительного использования и автоматического удаления для этой очереди с тем, чтобы только данное соединение издателя STOMP могло потреблять сообщения из данной очереди отклика. Кроме того, данная очередь отклика будет автоматически удалена из RabbitMQ если вы отключите своё приложение публикации.

Следующий пример из записной книжки "7.2.2 Reply-To" демонстрирует как настроить заголовок reply-to. В следующем разделе данной главы мы обсудим как потреблять сообщения через STOMP, поэтому в данный момент мы позволим данной очереди отклика автоматически удаляться после публикации нашего сообщения.


import stomp

conn = stomp.Connection()
conn.start()
conn.connect()
conn.send(body='Example Message',
          destination='/exchange/stomp-routing/example',
          headers={'reply-to': 'my-reply-queue'})
conn.disconnect()
 	   

Основной приготовляемый на вынос бонус установки заголовка reply-to состоит в том, что сообщения STOMP могут иметь производбные заголовки сообщения. Такие значения заголовка во многом аналогичны свойствам сообщения AMQP.

Свойства сообщения AMQP поверх STOMP

Заголовки сообщения STOMP позволяют вам передавать произвольные значения заголовка сообщения брокеру сообщений. Данная функциональность является тем, что позволяет функциональности reply-to автоматически создавать очереди отклика для издателей. Хотя произвольные значения заголовка сообщения могут быть полезными для ваших приложений, если вы рассматриваете среду со смешанными протоколами, которые применяют и STOMP, и AMQP, вы пожелаете рассмотреть более ограниченный набор заголовком сообщений, который будет доступен в обоих протоколах. Если вы применяете заголовки сообщения, которые соответствуют названиям свойств соответствующего сообщения AMQP, причём сам подключаемый модуль STOMP автоматически устанавливает соответствие имеющихся значений заголовка свойствам сообщения AMQP.

Наш следующий пример из записной книжки "7.2.2 Send with Message Headers" установит заголовки сообщения, которые будут преобразованы в свойства сообщения AMQP.


import stomp
import time

conn = stomp.Connection()
conn.start()
conn.connect()
conn.send(body='Example message with Headers',
          destination='/queue/stomp-messages',
          headers={'app-id': '7.2.2 Example',
                   'priority': 5,
                   'reply-to': 'reply-to-example',
                   'timestamp': int(time.time())})
conn.disconnect()
 	   

Если вы установите заголовки сообщения STOMP, которые не могут получить соответствия свойствам AMQP, этими значениями будут заполняться свойства сообщения headers для данного AMQP. Как вы и могли бы ожидать, после того как вы потребляете сообщения с заполненными свойствами сообщения AMQP, эти значения пройдут через значения заголовка сообщения STOMP.

для данного поведения имеется одно исключение - свойство сообщения AMQP message-id. Это значение автоматически устанавливается как часть самого протокола STOMP и не может быть установлено вручную при отправке в RabbitMQ при помощи самого протокола STOMP.

Публикация сообщений в RabbitMQ с применением STOMP имеет немного больше накладных расходов, чем при выполнении этого через AMQP, однако существует отличное средство для выполнения этого, такое как автоматическое создание очереди и авто- создание очередей откликов. Применяя различные форматы строк назначения, ваши очереди могут отправляться напрямую в очередь или публиковаться в некоем обмене точно так же как и в случае публикации сообщений AMQP. Ваш подключаемый модуль STOMP автоматически установит соответствие семантики AMQP в сообщения STOMP, а также семантики STOMP в сообщения AMQP.

Чтобы увидеть такое автоматическое установление соответствия в действии вы можете потреблять все сообщения, публикуемые в наших предыдущих примерах. В нашем следующем разделе мы будем потреблять сообщения через подписчиков STOMP, в том числе сообщения со значениями в заголовках.

Потребление сообщений

Аналогично MQTT, клиенты STOMP рассматриваются как подписчики взамен потребителям. Однако поскольку RabbitMQ является первым и находящимся впереди всех брокером AMQP, ваш подключаемый модуль STOMP трактует подписчиков STOMP как потребителей AMQP, которые получают свои сообщения из очередей RabbitMQ. Что происходит в большинстве случаев, когда вы подписываетесь через STOMP, так это то, что будет создана некая очередь для подлежащих из неё потреблению сообщений.

Одним из замечательных свойств подключаемого модуля STOMP заключается в том, что все имеющиеся типы строк получателей для отправки сообщений через STOMP присутствуют для подписки на сообщения в STOMP. В данном разделе вы примените строки назначения в подписчике STOMP для следующего:

  • Потребления сообщений из автоматически создаваемой очереди

  • Потребления сообщений из предварительно созданной очереди AMQP

  • Потребления сообщений подпиской на обмен

  • Потребления сообщений подпиской на тему STOMP

При отправке сообщения через STOMP вы можете установить заголовок reply-to, который будет автоматически создавать некую исключительно используемую очередь с автоматическим удалением для данного соединения STOMP. В противоположность этому, потребление сообщений reply-to обрабатывается таким же способом как потребление сообщений при подписке на некую очередь, определяемую STOMP.

Давайте начнём подписку на ту же самую очередь, в которую мы отправляли сообщения в своём прошлом разделе, в нашу очередь stomp-messages.

Подписка на определяемую STOMP очередь

Определяемые STOMP очереди являются очередями, которые создаются RabbitMQ когда сообщения публикуются через строку получателя в формате /queue/<queue-name>. В нашем предыдущем раделе мы публиковали сообщения в RabbitMQ при помощи команды STOMP send в нашей записной книжке "7.2.2 Stomp Publisher". В нашем следующем примере из записной книжки "7.2.3 Queue Subscriber" вы потребите эти сообщения, выводя тело сообщения для каждого отправленного сообщения. Поскольку код потребителя несколько сложнее, давайте в этом разделе пройдём его по шагам.

В начале импортируем все необходимые для запуска подписчика библиотеки:


import stomp
import pprint
import time
 	   

Библиотека stomp.py Python требует объекта ConnectionListener для обработки получаемых от брокера сообщений. Этот объект должен содержать некий метод on_message, который будет вызываться всякий раз, когда данный подписчик получает некое сообщение. В данном примере вы выведите заголовки, если они присутствуют, а также само сообщение. Кроме того, вы желаете получить только одно сообщение и выйти. В данной демонстрации ожидания вы добавите некий флаг с названием can_stop, который позволит вашему примеру знать когда остановиться.


class Listener(stomp.ConnectionListener):

    can_stop = False
    def on_message(self, headers, message):
        if headers:
            print('\nHeaders:\n')
            pprint.pprint(headers)
        print('\nMessage Body:\n')
        print(message)
        self.can_stop = True
 	   

Имея определённым класс Listener, вы можете создать некий экземпляр необходимого объекта, подключение STOMP, установить подклюичение объекта ожидания, запустить данное соединение и затем подключиться к RabbitMQ:


listener = Listener()

conn = stomp.Connection()
conn.set_listener('', listener)
conn.start()
conn.connect()
 	   

Когда соединение установлено, метод Connection.subscribe отправляет запрос подписчика STOMP в RabbitMQ, который будет автоматически выдавать подтверждение получения на все принятые сообщения.


conn.subscribe('/queue/stomp-messages', id=1, ack='auto')
 	   

Чтобы гарантировать что данный код выполняет ожидание приёма сообщения, вы осуществляете цикл, засыпая на одну секунду, пока Listener.can_stop установлен в значение True:


while not listener.can_stop:
    time.sleep(1)
 	   

Наконец, мкогда сообщение принято, вы отключаетесь от данного соединения:


conn.disconnect()
 	   

Исполняйте даннй код пока не получите необходимое сообщение с определёнными заголовками сообщения в записной книжке "7.2.2 Send with Message Headers". Когда сообщение выбрано, вы должны обнаружить вывод, аналогичный приводимому на (Рисунке 9-5).

 

Рисунок 9-5


Вывод записной книжки IPython “7.2.3 Queue Subscriber”

Что вы должны были заметить, так это то, что свойства сообщения AMQP сливаются вместе с самими заголовками сообщения STOMP, включая content-length и destination. Если в свойствах самого принятого сообщения AMQP также имеются какие- либо значения, они также будут выведены в этих принятых как часть сообщения STOMP заголовках.

Как и при отправке сообщения STOMP, если вы делаете отправку некому получателю очереди в очередь, которая определяется через AMQP с применением индивидуальных аргументов, такая подписка завершится неудачей и вы не получите никакого сообщения. К счастью, команда RabbitMQ добавила формат строки получателя очереди AMQP и для таких сценариев.

Подписка на определяемую AMQP очередь

Если вам требуется перемежать подписчиков STOMP и потребителей AMQP, которые выполняют потребление из одной и той же очереди со своими индивидуальными аргументами, или если вам требуется чтобы отдельный подписчик STOMP потребляющий из очереди с персональными аргументами, вы можете воспользоваться форматом получателя /amq/queue/<queue-name> при применении своего подписчика STOMP.

Подписка на обмен или тему

Другим отличным свойством в подключаемом модуле STOMP состоит в том, что он делает возможным вам подписываться на некий обмен применяя формат /exchange/<exchange-name>/<binding-key>. Когда вы делаете это, создаётся некая временная очередь с исключительным использованием и она ограничена вашим подписчиком, который будет автоматически удалён после отключения вашего подписчика. Ваш подписчик затем будет прозрачно создан в качестве потребителя данной очереди и получать все направляемые в неё сообщения.

Аналогично, если вы подписаны при помощи формата /topic/<binding-key>, будет создана некая временная очередь с исключительным потреблением для вашего подписчика и она будет автоматически привязана при помощи предписанного ключа связывания. Выступая в роли ключа привязки для некоего обмена с темой, он может применять пространство имён с ограничителем точкой и с семантикой символов подстановки # и *, в точности как при подключении очереди с использованием AMQP. После того как данная временная очередь создана и подключена, ваш подписчик будет установлен на приём направляемых ему сообщений.

Подключаемые к RabbitMQ подписчики STOMP представляются потребителями AMQP в подключаемом модуле STOMP. Применяя различные форматы строк получателя вы можете передавать все шаги необходимые для потребления сообщений через AMQP, однако за некую дополнительную цену. Небольшие накладные расходы соединения мостом взаимодействия STOMP с AMQP позволяют вашим подписчикам STOMP автоматически создавать и связывать очереди без каких- либо дополнительных действий, требующихся как часть вашего кода. Кроме того, применяя строки получателя очереди AMQP, ваши подписчики STOMP могут потреблять из очередей, разделяемых с потребителями AMQP. Конечно, благодаря имеющейся простоты самого протокола STOMP, существуют такие вещи, которые подключаемый модуль STOMP должен делать через настройку для успешной поддержки как STOMP, так и для AMQP. В нашем следующем разделе вы изучите как настраивать подключаемый модуль для изменения поведения клиента и параметров подключения.

Настройка подключаемого модуля STOMP

Подключаемый модуль STOMP настраивается в центральном файле rabbitmq.config. Как и подключаемый модуль MQTT, он имеет свои собственные строфы в этом файле настроек и применяет формат структур данных Erlang. Изменения в этом файле не всупают в действие немедленно и требуют перезагрузки брокера RabbitMQ.

Как показывает следующий фрагмент кода, настройка верхнего уровня подключаемого модуля STOMP помещается в разделе настроек rabbitmq_stomp

Таблица 9-3 детализирует варианты настройки для подключаемого модуля STOMP.

Таблица 9-3. Опции настройки для подключаемого модуля STOMP
Директива Тип Описание Значение по умолчанию

default_user

String

Имя пользователя для его применения когда клиент STOMP не предоставил полномочия аутентификации.

[{login, "guest",  passcode, "guest"}]

implicit_connect

Integer

Допускает для подключений STOMP отказ от отправки кадров CONNECT. Если он включён, кадр CONNECTED не будет отправляться при подключении.

False

ssl_listeners

Array

Порты TCP для ожидания STOMP поверх SSL. Если определены, строфа верхнего уровня файла настроек rabbit должна содержать строфу настроек ssl_options.

[]

ssl_cert_login

Boolean

Разрешает аутентификацию на основе SSL.

False

tcp_listeners

Array

Порты TCP для ожидания соединений STOMP.

[61613]

Использование STOMP в веб браузере

Поставляемым с RabbitMQ является и Web STOMP. Применяющий возможности библиотеки SockJS, Web STOMP является приспособленным под RabbitMQ расширением, которое добавляет совместимый с вебсокетами сервер HTTP, позволяющим веб браузерам напрямую взаимодействовать с RabbitMQ. По умолчанию подключаемый модульWeb STOMP выполняет ожидание по порту 15670 и поддерживает весь протокол STOMP, за единственным исключением - собственно функциональность сердцебиений STOMP. По причине внутреннего устройства SockJS, основной применяемой Web STOMP для RabbitMQ библиотеки, сердцебиения не могут применяться.

Web STOMP включён в предоставляемой виртуальной машине Vagrant и содержит примеры, которые показывают вам как он может применяться. Для ознакомления с дополнительными примерами, демонстрирующими библиотеку и службу STOMP, посетите http://localhost:15670/web-stomp-examples/.

Прежде чем вы запустите и реализуете Web STOMP в качестве решения для своего приложения, рассмотрите последствия безопасности открытия своего сервера RabbitMQ в Интернет так, как если бы вы это делали с любыми иными приложениями или службами. Может иметь смысл изолировать сервер Web STOMP RabbitMQ в качестве отдельных кластеров или серверов, которые выступают мостом к вашим основным серверам при помощи инструментов, подобных подключаемым модулям Shovel и Federation для смягчения воздействия вредоносных или вводящих в заблуждение клиентов. Для получения дополнительной информации относительно Web STOMP посетите страницу этого подключаемого модуля www.rabbitmq.com/web-stomp.html.

Протокол STOMP является доступным для чтения человеком, потоковым протоклом на основе текста, разработанным для того чтобы он был простым и лёгким в реализации. Хотя двоичные протоколы, такие как AMQP и MQTT может быть более действенным в конкретном кабеле, используя меньше данных для обмена при одном и том же сообщении, обсуждаемый протокол STOMP имеет некоторые преимущества, в особенности когда применяется подключаемый модуль STOMP для RabbitMQ. Создание очереди и поведения привязки требуют меньше кода на вашей стороне, но это также приводит и к увеличению стоимости. Представительские соединения AMQP, создаваемые подключаемым модулем STOMP, которые применяются для взаимодействия транслируемых в RabbitMQ данных STOMP, имеют дополнительные накладные расходы в сравнении с непосредственным взаимодействием соединений AMQP, которым нет нужды в этом.

Что касается различных опций, которые доступны вам при публикации и потреблении сообщений AMQP, я настоятельно рекомендую вам чтобы вы провели эталонное тестирование применения STOMP совместно с RabbitMQ прежде чем применять его в промышленном решении. Всякий протокл имеет собственные преимущества и недостатки, а в некоторых случаях ни один из них не является идеальным.

В нашем следующем разделе обсудим statelessd, веб приложение, применяемое для высокопроизводительной, с отсутствием состояний публикаций в RabbitMQ. Оно было создано для сценариев, в которых оба протокола, и AMQP, и STOMP несут слишком большие накладные расходы для публикации с единственной транзакцией в стиле издал- и- забыл.

Публикация без сохранения состояния через HTTP

В некоторых ситуациях AMQP, MQTT, STOMP и прочие протоколы с сохранением состояния являются слишком затратными для сред, в которых присутствуют высокие скорости, которые не служат для сопровождения долгоживущих подключений к RabbitMQ. Поскольку эти протоколы имеют небольшие накладные расходы, связанные с подключением прежде чем предпринять относящиеся к обмену сообщениями действия, они могут быть далеко не идеальными с точки зрения производительности, что касается соединений с малым временем жизни. Именно эта реализация повлекла за собой statelessd, некого посредника (proxy) HTTP- к- AMQP для публикации, который позволяет публикацию сообщений с высокой производительностью в стиле поджёг- и- забыл для приложений клиентов без требования дополнительных накладных расходов для управления состоянием соединения.

Каак появился statelessd

Когда- то в середине 2008 мы начинали строить своё решение асинхронного обмена сообщениями на MeetMe.com (а затем myYearbook.com), в качестве средства отсоединения записей в базу данных от своего веб приложения на базе PHP. Первоначально мы строили своё решение при помощи Apache ActiveMQ, службы брокера на основе Java с поддержкой протокола STOMP. Основополагающим для memcached было успешное масштабирование базы данных на чтение, обмен сообщениями, STOMP и ActiveMQ позволил нам создавать потребляющие приложения, которые коренным образом изменили то, как мы представляли себе запись в базу данных, ограничивая рабочие нагрузки и масштабируя затратные с точки зрения вычислений рабочие нагрузки.

По мере роста обмена мы начали испытывать проблемы с ActiveMQ и приступили к оценкам прочих брокеров. Со временем RabbitMQ продемонстрировал множество доказательств в свою пользу и он при этом поддерживал то же самый протокол STOMP, который мы применяли в ActiveMQ. После того как мы осуществили миграцию на RAbbitMQ, мы нашли его хорошим выбором для своей среды, однако появились новые проблемы.

Один из моментов, который мы обнаружили практически сразу после того как начали применять RabbitMQ, состояла в том, что протокол с сохранением состояний AMQ был черезчур затратным для нашего стека приложений PHP. Мы обнаружили, что PHP не способен сопровождать текущее состояние для открытых соединений и каналов по запросам клиентов. Всякий необходимый для некоторого приложения PHP запрос требовал какого- то нового соединения с RabbitMQ чтобы публиковать все требующиеся ему для отправки сообщения.

Не поймите меня неправильно, но требующееся для создания AMQP соединения с RabbitMQ время не является очень существенным и может исчисляться миллисекундами. Однако когда вы публикуете десятки тысяч сообщений в секунду, обычно одно или два на каждый веб запрос, вы регулируете более десятков тысяч соединений с RabbitMQ в секунду. Для решения этой проблемы мы в конечном итоге создали statelessd, некий шлюз публикации HTTP- к- AMQP. Это приложение должно было принимать на себя высокую скорость HTTP-запросов при управлении стеком соединений, необходимым для публикации сообщений. Кроме того, это не могло быть узким местом для производительности, а также требовалось надёжное получение сообщения в RabbitMQ.

После выпуска statelessd в качестве открытого исходного кода мы обнаружили что мы были далеко не одиноки с данной проблемой. В 2013 парни из Weebly создали клон statelessd с названием Hare, который был исполнен на Go.

Использование statelessd

Разработанный с целью снижения накладных расходов настолько, насколько это возможно, statelessd ожидает, что публикующие сообщения клиенты делают это через HTTP и применяют естественные соглашения HTTP для перенаправления всей той информации, которая требуется для публикации обычных сообщений AMQP. Самая первая часть пути в URI HTTP содержит название виртуального хоста в RabbitMQ, для которого должно публиковаться это сообщение. Кроме того, должны применяться соответствующие обмен и ключ маршрутизации в качестве компонентов пути в данном запросе:


http://host[:port]/<virtual-host>/<exchange>/<routing-key>
 	   

Что касается имени пользователя и пароля, применяются заголовки базовой аутентификации HTTP. Когда поступает некий запрос, наш демон statelessd будет отслеживать то, имеется ли его стеке открытых соединений имя пользователя, пароль и виртуальных хост нашего RabbitMQ. Если это так, наш демон будет применять это открытое соединение для издания данного публикуемого сообщения, возвращая состояние 204 "request processed, no content returned" (запрос выполнен, нет возвращаемого содержимого) своему клиенту.

Поскольку statelessd обычно запускается в контролируемой среде, в которой проблемы с аутентификацией очень редки, был выбран компромисс между проектами для оптимальной эффективности запроса. Если соединение не установлено, statelessd будет выполнять внутреннюю буферизацию сообщения, запустит асинхронный процесс для подключения к RabbitMQ и вернёт состояние 204 клиенту. Как только соединение будет установлено, все буферизованные сообщения будут отправлены для конкретных полномочий. В случае возникновения проблем с соединением, данная комбинация полномочий помечается как плохая и все последующие запросы будут получать ошибку 424, или "request failed due to the nature of a previous request" (отказ в запросе по причине природы предыдущего запроса).

Запросы statelessd используют HTTP POST для отправки стандартной пары формы кодировки ключ/ значение, который переносит само тело и свойства данного сообщения для публикации. Допустимые ключи для запросов без сохранения состояния содержат тело, само значение реального тела самого по себе, а также всех стандартных свойств имён сообщения AMQP, причём символ тире заменяется на подчёркивание. Например, для установки значения свойства message-id, его полезная нагрузка в запросе должна содержать значение, назначаемое ключу с названием message_id. Для получения полного перечня допустимых ключей в полезной нагрузке запроса statelessd, ознакомьтесь с документацией.

Операционная архитектура

Statelessd разработан для запуска на том же самом сервере, что и серверы RabbitMQ, для которых должны публиковаться сообщения. Это демон на базе Python, который обычно настраивается так, чтобы он в своём сервере имел процесс, который запускается для каждого ядра ЦПУ в данном сервере. Каждый процесс сервера имеет свой собственный порт HTTP, по которому он выполняет ожидание. Эти процессы собираются воедино и могут работать через посредника (proxy) с единственным портом, применяемым в реверсивном прокси сервере, таком как Nginx (Рисунок 9-6), предоставляя горизонтально масштабируемое решение, которое прошло эталонное тестирование вплоть до сотен тысяч сообщений в секунду на каждом из серверов.

 

Рисунок 9-6


Архитектура действий statelessd

Если вам нужно запустить statelessd на множестве серверов, все экземпляры серверов Nginx могут добавляться к некому балансировщику нагрузки, распределяя все публикуемые запросы по множеству серверов в кластере. Statelessd включает в себя некий терминал URL для выборки статистических данных, которые могут применяться для сопоставления скоростей пропускной способности сообщений между кластером узлов statelessd и серверами RabbitMQ. Для ознакомления с документацией по установке и настройке statelessd вы можете ознакомиться на https://github.com/gmr/statelessd.

Публикация сообщений посредством statelessd

Для публикации сообщений в RabbitMQ можно воспользоваться любой стандартной библиотекой HTTP. Для данного примера мы применяем библиотеку Python с названием requests. Прежде чем опубликовать сообщение вам следует создать и выполнить привязку очереди для публикации сообщений. Приводимый далее код из записной книжки "7.4.4 Queue Setup" делает именно это.


import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'statelessd-messages')
        queue.declare()
        queue.bind('amq.topic', '#')
 	   

Получив объявленную очередь всё что осталось сделать, это опубликовать некое сообщение. Statelessd должен быть уже запущенным в вашей виртуальной машине Vagrant, поэтому запуская следующий код из записной книжки "7.4.4 Publish Message" публикует некое сообщение в нашей очереди "statelessd-messages".


import requests

payload = {'body': 'from statelessd', 'app_id': 'example'}
response = requests.post('http://localhost:8900/%2f/amq.topic/example',
                          auth=('guest', 'guest'),
                          data=payload)
 	   

Для проверки того, что данное сообщение опубликовано, переместитесь в интерфейс управления RabbitMQ по адресу http://localhost:15672/#/queues/%2F/statelessd-messages.

Теперь, после того как вы опубликовали сообщение через statelessd, не лишним будет повторно напомнить, что statelessd решает определённый вариант применения публикации сообщений в RabbitMQ. Когда вы размышляете о том где он может вписаться в вашу архитектуру обмена сообщениями, обратите внимание на цели и производительность statelessd. Он был разработан для включения высокоскоростной публикации из многих различных приложений публикации. Он не поддерживает протокол AMQP в полном объёме, а также не поддерживает многие из более развитых свойств публикации в RabbitMQ, таких как Подтверждение издателю или транзакционные публикации. Он подходит не для любых проектах, но о нём следует помнить на случаё тех решений, где он способен внести гигантский вклад.

Выводы

RabbitMQ выходит за рамки AMQP нацеленности на производителя и нейтральности к платформе поддерживая дополнительные протоколы, такие как STOMP и MQTT. Кроме того существует яркая экосистема подключаемых модулей и приложений, которые делают возможным разнообразное общение с RabbitMQ. Например, вместо применения подобного AMQP протокола для мобильных приложений, которые могут быть подвержены сетевым прерываниям и медленным скоростям обмена, применяется MQTT, именно тот протокол, который и создан для такого рода задач. Имеются такие приложения как Hare и statelessd, которые делают возможной более эффективную публикацию сообщений.

Кроме того, вот некоторый перечень подключаемых модулей, которые добавляют дополнительную поддержку протоколов в RabbitMQ:

Эти примеры демонстрируют то разнообразие протоколов обмена сообщениями, которые можно применять в RabbitMQ. Хотя подключаемый модуль rabbitmq-smtp может иметь ограниченные варианты использования в сравнении с подключаемым модулем Web STOMP, ваше приложение может иметь уникальные требования и я рекомендую вам убедиться, что вы применяете подходящий инструмент для своего задания, когда дело доходит до взаимодействия с RabbitMQ.