Глава 2. Как разговаривает Rabbit: протокол AMQ
Содержание
Эта глава описывает
-
Взаимодействие с RabbitMQ через протокол AMQ
-
Формирование кадров протокола AMQ на нижнем уровне
-
Публикация сообщений в RabbitMQ
-
Получение сообщений из RabbitMQ
Тот процесс, который преодолевают RabbitMQ и библиотеки клиентов чтобы получать сообщения из вашего приложения в RabbitMQ и из RabbitMQ в потребляющие приложения может быть сложным. Если вы обрабатываете критически важную информацию, такую как данные продаж, надёжность доставки всех канонических источников информации о продажах должно быть верхом приоритета. На уровне протокола спецификация AMQP определяет такую семантику для клиента и брокера по ведению переговоров и общению друг с другом о самом процессе ретрансляции вашей информации. Зачастую тот лексикон, который определяется в самой спецификации AMQP бьёт фонтаном в библиотеки клиентов RabbitMQ посредством методов и классов, применяемых приложениями при взаимодействии с RaggitMQ и отражающими классы и методы уровня этого протокола. Понимание того как происходит это взаимодействие поможет вам усвоить не только то "как" взаимодействовать с RabbitMQ, но и зачем.
Даже несмотря на то, что все команды в клиентских библиотеках имеет тенденцию не только походить, но и напрямую копировать все определяемые спецификацией AMQP действия, большинство клиентских библиотек пытаются скрывать имеющуюся сложность взаимодействия через протокол AMQP. Эта тенденция является хорошим подспорьем когда вы собираетесь писать некое приложение и не желаете беспокоиться о том как имеющиеся интерфейсы заставляют вещи работать. Однако перепрыгивание через технические основы того что делают клиенты RabbitMQ не очень полезно тогда, когда вы желаете на самом деле понимать что происходит с вашим приложением. Будет ли у вас желание узнать почему ваше приложение выполняет публикацию медленнее чем вы ожидали, либо вы хотите просто знать какие шаги некий клиент должен предпринять и в каком порядке чтобы установить такое первое соединение с RabbitMQ, знание того как ваш клиент общается с RabbitMQ сделает этот процесс намного яснее.
Для лучшей иллюстрации того что и как происходит, данной главе вы изучите как AMQP расщепляет взаимодействие между своими клиентом и брокером на порции данных, именуемые кадрами (frames) и как эти кадры детализируют те действия, которые ваше клиентское приложение желает предпринять с RabbitMQ, а действия RabbitMQ желают получить ваше приложение клиента. Кроме того, вы изучите как такие кадры строятся на уровне самого протокола и как они предоставляют то механизм, которым доставляются и потребляются сообщения.
Отталкиваясь от этой информации вы напишите своё первое приложение на Python применяя некую библиотеку клиента RabbitMQ, созданную с целью обучения для данной книги. Это приложение воспользуется AMQ для определения неких обмена и очереди и затем свяжет их вместе. Наконец, вы напишите приложение, которое будет считывать все сообщения из только что определённой очереди и выводить на печать всё содержимое этого сообщения. Если вы уже знакомы с выполнением этих вещей, вам всё равно стоит углубиться в данную главу. Я обнаружил, что лишь после того как я полностью понял всю семантику AMQP и "зачем" пришло на смену "как", только тогда я понял RabbitMQ.
В качестве некоторого брокера AMQP, RabbitMQ общается на строгом диалекте взаимодействия, используя шаблоны RPC (remote procedure call, удалённого вызова процедур) практически во всех аспектах взаимодействия с основным ядром продукта. Некий удалённый вызов процедуры является каким- то видом взаимодействия между компьютерами, который позволяет одному компьютеру исполнять некую программу или её методы на другом. Если вы выполнили некое веб программирование, в котором вы общались с каким- то удалённым API, вы воспользовались неким общим шаблоном RPC.
Однако те RPC общения, которые имеют место при взаимодействии с RabbirMQ отличаются от большинства вызовов API на основе веб интерфейса. В большинстве определений API веб интерфейса имеются RPC общения, в которых ваш клиент вызывает команды, а его сервер отвечает ему - сам сервер не вызывает команды в обратном порядке на вызвавшем его клиенте. В спецификации AMQP и клиент и сервер могут вызывать команды. Для некоего клиентского приложения это означает, что он обязан осуществлять ожидание (listen) взаимодействия со своим сервером, который может иметь общение по поводу того чем занимается это клиентское приложение.
Для иллюстрации того как работает RPC когда какой- то клиент общается с RabbitMQ, давайте рассмотрим процесс переговоров в конкретном соединении.
Когда вы общаетесь с каким- то незнакомцем в зарубежной стране, неизбежно, что один из вас начнёт общение с приветствия, чего- то, что даст
вам и второй стороне понимание того, способны ли вы общаться на одном языке. При общении с AMQP таким приветствием является заголовок протокола
protocol header и он отправляется вашим клиентом своему серверу. Это приветствие
не следует рассматривать как некий запрос, однако, в отличии от всего переговора, который будет иметь место в дальнейшем, это и не команда.
RabbitMQ начинает последовательность команде/ отклик с отклика на приветствие командой Connection.Start
,
а его клиент отвечает на такой запрос RPC кадром отклика Connection.StartOk
Рисунок 2-1).
Рисунок 2-1
Начальное обсуждение условий с RabbitMQ, демонстрирующее определённый процесс RPC в AMQP.
Весь процесс общения при инициации какого- то соединения не является чем- то страшно важным, только если вы не пишите некую библиотеку клиента, однако будет не лишним знать, что до окончательного соединения с RabbitMQ имеется некая последовательность из трёх синхронных запросов RPC на начало, регулировку и открытие такого соединения. Как только эта последовательность завершится, RabbitMQ будет готов к тому, что ваше приложение может делать запросы.
Существует широкий диапазон различных команд, которые ваше приложение может отправлять в RabbirMQ, а этот RabbitMQ может отправлять вашему клиенту. Позднее в этой главе вы изучите некое небольшое подмножество этих команд, но прежде чем это случится, вам следует открыть какой- то канал.
{Прим. пер.: если у вас возникло нестерпимое желание быстро попробовать RPC своими руками через пару минут, рекомендуем свой перевод Главы 5, Построение веб рассылки с микрослужбами из вышедшей в свет в феврале 2018 в Packt Publishing книги Даниеля Фуртейдо, Маркуса Пеннингтона "Копирка для программирования на Python".}
Аналогично каналам в двустороннем радио, имеющаяся спецификация AMQP определяет каналы для взаимодействия с RabbitMQ. Двусторонний обмен информацией по радио друг с другом применяет электромагнитные волны в качестве соединения между собой. В AMQP каналы применяют соединение обсуждения условий AMQP, как свой проводник для передачи информации друг другу, и так же как каналы в двустороннем радио, они изолируют свой обмен от прочих происходящих переговоров. Некое отдельное соединение AMQP может иметь множество каналов, делая возможным чтобы имело место множество переговоров между клиентом и сервером. В технической терминологии это именуется как мультиплексирование (multiplexing) и оно может быть полезным для многопоточных и асинхронных приложений, выполняющих множество задач.
Совет | |
---|---|
При создании своего клиентского приложения важно не усложнять вещи со слишком большим количеством каналов. В проводе с маршалированными кадрами каналы являются ничем иным как неким целым значением, которое назначается определённому сообщению, передаваемому между сервером и клиентом; в наших серверах и клиентах RabbitMQ они представляют нечто большее. Существуют некие структуры и объекты в памяти, устанавливаемые для всякого канала. Чем больше каналов имеет некое соединение, тем больше памяти должен использовать RabbitMQ для управления таким потоком сообщений данного соединения. Если вы используете его благоразумно, у вас имеется довольный сервер RabbitMQ и меньше проблем у приложений клиента. |
Во многом аналогично концепции объектно- ориентированного программирования в таких языках как C++, Java и Python, AMQP использует классы и методы,
именуемые AMQP commands для создания общего языка общения между клиентами и серверами.
Конкретные классы в AMQP определяют некую сферу функциональности, причём каждый класс содержит методы, выполняющие различные задачи. В процессе
обсуждения начальных условий соединения сервер RabbitMQ отправляет своему клиенту команду Connection.Start
,
маршалируемую {выстраиваемую} в какой- то кадр. Как иллюстрируется на
Рисунке 2-2),
команда Connection.Start
составляется из двух компонентов: class
и метода method AMQP.
В спецификации AMQP имеется множество команд, но если вы похожи на меня, вам следует пропустить всё это и получить более важные биты отправки и получения сообщений. Однако важно понимать будут оправляться и приниматься в RabbitMQ в представлении провода, чтобы по- настоящему оценить что происходит в ваших приложениях.
При отправлении команд в сторону RabbitMQ и от него, все те аргументы, которые требуются для их исполнения включаются в структуры данных, именуемые кадрами, которые кодируют все данные для их передачи. Кадры предоставляют некий эффективный способ для кодирования и разграничения в кабеле необходимых команд и аргументов. Вы можете представлять себе кадры как перевозимые в железнодорожном составе грузовые вагоны. В качестве некоторого обобщения, грузовые вагоны имеют одну и ту же базовую структуру и отличаются лишь содержимым. То же самое справедливо и для кадров AMQP нижнего уровня. Как иллюстрирует Рисунок 2-3, некий кадр AMPQ нижнего уровня составляется из пяти различных компонентов:
-
Типа кадра
-
Номера канала
-
Размера кадра в байтах
-
Полезной нагрузки кадра
-
Маркера завершающего байта (ASCII значение 206)
Кадр AMQP нижнего уровня стартует с трёх полей, именуемых заголовком кадра (frame header) в случае их объединения. Самое первое поле является единственным байтом, указывающим тип данного кадра, а второе поле определяет номер канала, для которого предназначен это кадр. Третье поле несёт размер в байтах полезной нагрузки данного кадра. Такой заголовок кадра совместно и маркером завершающего байта создают обрамление для кадра.
Несомое внутри данного кадра, после его заголовка и до маркера завершающего байта составляет полезную нагрузка данного кадра. Во многом аналогично тому как определённый грузовой вагон защищает своё содержимое в железнодорожном составе, кадр разработан с целью защищать целостность переносимого им содержимого.
Спецификация AMQP определяет пять типов кадров: кадр заголовка протокола, кадр метода, кадр заголовка содержимого, кадр тела и кадр сердцебиения. Каждый тип кадра имеет некую отличительную цель, причём какие- то из них применяются гораздо чаще остальных:
-
Кадр заголовка протокола применяется всего лишь один раз, а именно при соединении с RabbitMQ.
-
Кадр метода несёт в себе определённый запрос RPC или отклик, который отправляется или получается от RabbitMQ.
-
Кадр заголовка содержимого содержит размер и свойства некоторого сообщения.
-
Кадры тела собственно и составляют сообщения.
-
Кадр сердцебиения отправляется в сторону RabbitMQ и от него в качестве проверки обеспечения того, что обе стороны соединения доступны и работают надлежащим образом.
В то время как кадры заголовка протокола и сердцебиения обычно абстрагируются от разработчиков при применении библиотек клиента, кадры метода, заголовка содержимого и тела и их конструкции обычно выходят наружу при написании приложений, которые взаимодействуют с RabbitMQ. В следующем разделе вы изучите как отправляемые в RabbitMQ и принимаемые из него сообщения маршализуются (упорядочиваются) в кадре метода, кадр заголовка содержимого и один или несколько кадров тела.
Замечание | |
---|---|
Поведение сердцебиения в AMQP применяется для гарантии того, что и клиент и сервер отвечают друг другу и является исключительным примером AMQP
как двунаправленного протокола RPC. Если RabbitMQ отправляет некое сердцебиение в ваше клиентское приложение, а оно не отвечает, RabbitMQ отсоединит его.
Очень часто разработчики в однопотоковых или асинхронных средах разработки будут заинтересованы в увеличении устанавливаемого таймаута в какое- то
максимальное значение. Если вы установите, что ваше приложение блокирует взаимодействие таким образом, что это делает трудноосуществимой работу
сердцебиения, вы можете отключить его установив значение интервала сердцебиения равным 0 при создании своего клиентского соединения. Если, наоборот,
вы выбираете применение намного более высокого значения чем устанавливаемое по умолчанию в 600 секунд, вы можете изменить значение интервала максимума
сердцебиений RabbitMQ отредактировав значение |
При публикации некоторого сообщения в RabbitMQ применяются кадры метода, заголовка и тела. Самым первым отсылаемым кадром является кадр метода, несущий саму команду и параметры для её исполнения, такие как значение обмена и ключ маршрута. Вслед за кадром метода идут кадры содержимого: заголовка содержимого и тела. Кадр заголовка содержимого заключает необходимые свойства сообщения вместе с размером его тела. AMQP имеет некий максимальный размер кадра и если размер тела вашего сообщения превосходит это размер, всё содержимое будет расщепляться на множество кадров тела. Эти кадры всегда отправляются в одном и том же порядке по проводу: кадр метода, кадр заголовка содержимого и один или более кадров тела (Рисунок 2-4).
Рисунок 2-4
Отдельное публикуемое в RabbitMQ сообщение составляется из трёх типов кадров: кадра метода для вызова RPC
Basic.Publish
,
кадра заголовка и одного или более кадров тела.
Как иллюстрирует Рисунок 2-4, при отправке некоторого сообщения в RabbitMQ
в его кадре метода отправляется команда Basic.Publish
, а за ней следует кадр заголовка содержимого с
требуемыми свойствами сообщения, такими как тип содержимого сообщения и значение времени, когда это сообщение было отправлено. Эти свойства заключаются
в некую структуру данных, определяемую в спецификации AMQP как Basic.Properties
. Наконец, само содержимое данного
сообщения упорядочивается (маршализуется, выстраиваются) в соответствующее число кадров тела.
Замечание | |
---|---|
Хотя установленным значением размера кадра по умолчанию является 131 кБ, библиотеки клиента могут согласовывать больший или меньший размер кадра в процессе установления соединения, вплоть до 32- битного значения на общее число байт в некотором кадре. |
Чтобы быть более эффективными и минимизировать размер передаваемых данных, содержимое в кадре метода и кадре заголовка содержимого представляет собой упакованную бинарную информацию и не читается человеком. В отличие от кадров метода и заголовка, содержимое сообщения, переданное внутри кадра тела, не упаковано и не закодировано никоим образом и может быть чем угодно, от простого текста до двоичного изображения.
Для дальнейшей иллюстрации имеющейся анатомии некоторого сообщения AMQP давайте исследуем эти три типа кадров более подробно.
Кадры метода переносят в себе значения класса и метода вашего запроса RPC, который следует сделать, а также аргументы, которые одновременно передаются
для обработки. На Рисунке 2-5 значение кадра метода, переносящего команду
Basic.Publish
несёт двоичные упакованные данные, описывающие эту команду, а также аргументы данного запроса, которые
передаются вместе с ней. Первые два поля являются численным представлением имени класса Basic
и названия
метода Publish
. За этими полями следуют строковые значения для названия необходимого обмена и значения ключа
маршрутизации. Как уже упоминалось ранее, эти атрибуты указывают RabbitMQ как осуществлять маршрутизацию сообщения. Значение флага
mandatory
сообщает RabbitMQ что данное сообщение должно быть доставлено или публикацию этого сообщения следует
сбросить.
Рисунок 2-5
Кадр метода
Basic.Publish
составляется из пяти компонентов: значения типа класса, и типа метода, которые идентифицируются как
запрос RPC Basic.Publish
, название необходимого обмена, какое- то значение ключа маршрутизации, а также флаг mandatory
.
Все значения данных полезной нагрузки кадра метода кодируется в формате, специфичном для типа данных. Этот формат разработан для минимизации размера байтов в проводе и при этом обеспечивает целостность и гарантирует, что упорядочение и разборка этих данных были быстрыми настолько, насколько это возможно. Реальный размер изменяется в зависимости от определённого типа данных, но обычно за отдельным байтом следуют численные данные, либо отдельный байт сопровождается полем размера в байтах с последующими текстовыми данными.
Замечание | |
---|---|
Обычно оправка сообщения при помощи запроса RPC |
Те заголовки, которые отправляются в хвосте кадра метода несут больше чем сами данные, которые сообщают RabbitMQ насколько велико ваше сообщение.
Как иллюстрирует Рисунок 2-6, этот кадр заголовка также переносит атрибуты о вашем методе,
которые описывают это сообщение как для сервера RabbitMQ, так и для любого способного получить его приложения. Эти атрибуты в качестве значений
в таблице Basic.Properties
могут содержать данные, которые описывают передаваемое содержимое вашего сообщения или
они могут быть совершенно пустыми. Большинство библиотек клиента предварительно наполняют некий минимальный набор полей, таких как значение типа содержимого
и значение режима доставки.
При составлении ваших сообщений свойства являются мощным инструментарием. Они могут применяться для создания некоего контракта между издателями и
потребителями о передаваемом содержимом данного сообщения, что делает возможным большой объём специфических особенностей такого сообщения. Вы изучите
Basic.Properties
и различные варианты возможного применения для каждого из полей той структуры данных, которая
может передаваться в Главе 3.
Кадр тела для сообщения является равнодушным к значению типа подлежащих передаче данных и может содержать либо двоичные, либо текстовые данные. Будете ли вы отправлять бинарные данные, такие как образ JPEG, или последовательные данные в формате SON или XML, содержимое кадра тела сообщения является той структурой в вашем сообщении, которая переносит все реальные данные сообщения (Рисунок 2-7).
Объединяемые воедино, свойства сообщения и тело формируют мощный формат инкапсуляции ваших данных. Соединение в союзе имеющиеся описательные атрибуты вашего сообщения и равнодушного к содержимому тела обеспечивает вам возможность применения RabbitMQ для любого типа данных, которые вы сочтёте уместными.
Прежде чем вы сможете публиковать сообщения в очереди, имеется несколько относящихся к настройке шагов, которые вы обязаны принять во внимание. Как минимум, вы должны настроить и обмен и очередь, а затем связать их вместе.
Однако, прежде чем вы на самом деле выполните эти шаги, давайте взглянем на то что должно произойти на уровне протокола чтобы сделать доступной публикацию, маршрутизацию, постановку в очередь и доставку сообщения, начиная с настройки обмена для маршрутизации сообщений.
Обмены, как и очереди, являются первостепенными обитателями модели AMQ. А раз так, каждый из них имеет свой собственный класс в спецификации AMQ.
Обмены создаются при помощи команды Exchange.Declare
, которая имеет аргументы, определяющие необходимое имя
этого обмена, его тип и прочие метаданные, которые могут применяться для обработки сообщения.
После отправки команды и создания RabbitMQ необходимого обмена в ответ отправляется некий кадр метода
Exchange.DeclareOk
(Рисунок 2-8). Если по какой-
либо причине эта команда откажет, RabbitMQ закроет тот канал, который отправила команда Exchange.Declare
путём передачи команды Channel.Close
. Такой отклик будет содержать некий численный код и текстовое значение,
указывающие почему отказал Exchange.Declare
и данный канал был закрыт.
После создания обмена самое время создать некую очередь отправив в RabbitMQ команду Queue.Declare
. Как и
для команды Exchange.Declare
, имеется некая простая имеющая место последовательность взаимодействия
(Рисунок 2-9) и в случае возникновения отказа команды
Queue.Declare
данный канал будет закрыт.
Рисунок 2-9
Последовательность взаимодействия объявления очереди, состоящая из команды
Queue.Declare
и отклика Queue.DeclareOk
.
При объявлении некоей очереди нет никакого вреда в выдаче одной и той же команды Queue.Declare
более
одного раза. RabbitMQ будет считать, что объявляемая очередь является пассивной и будет возвращать полезную информацию об этой очереди, например
общее число сообщений в ожидании в данной очереди и общее число подписанных на неё потребителей.
Аккуратная обработка ошибок | |
---|---|
Когда вы осуществляете попытку объявления какой- то очереди с отличающимися свойствами от тех,которые имеются у очереди с точно таким же названием,
RabbitMQ закроет тот канал, в котором был запущен этот запрос RPC. Такое поведение согласуется с любым иным типом ошибок, которые может делать ваше
клиентское приложение при выдаче команд своему брокеру. Например, если вы выдаёте команду Для правильной обработки ошибок ваше клиентское приложение должно выполнять ожидание команды Если ваше клиентское приложение не выполняет ожидание (listening) или обработку событий, приходящих со стороны своего сервера, вы можете терять сообщения. Если вы выполняете публикацию в несуществующий или закрытый канал, RabbitMQ может закрыть данное соединение. Если ваше приложение потребляет сообщения и не знает что RabbitMQ закрыл этот канал, оно может не знать что RabbitMQ прекратил отправку сообщений вашему клиенту и не может всё ещё считать что оно работает надлежащим образом и выступать подписчиком на некую пустую очередь. |
Когда созданы необходимые обмен и очередь наступает время связать их воедино. Как и в случае с командой Queue.Declare
,
соответствующая команда для связывания очереди с неким обменом, Queue.Bind
, может за раз определять только
одну очередь. Во многом также как и в случае с командами Exchange.Declare
и
span class="term">Queue.Declare
, после выпуска команды Queue.Bind
ваше приложение
получит кадр метода Queue.BindOk
если она будет обработана успешно
(Рисунок 2-10).
Рисунок 2-10
После того, как ваш клиент успешно выдал некую команду
Queue.Bind
для связывания очереди с обменом при помощи ключа
маршрутизации, этот клиент получит откликом некий кадр метода Queue.BindOk
.
В качестве примеров взаимодействия между сервером и клиентом RabbitMQ команды Exchange.Declare
,
Queue.Declare
и Queue.Bind
иллюстрируют некий общий шаблон, которому
подражают все синхронные команды в спецификации AMQP. Однако существует ряд асинхронных команд, которые выделяются из такого простого шаблона
"Action" и "ActionOk". Эти команды имеют дело с отправкой и приёмом сообщений в RabbitMQ.
Как вы только что усвоили, при публикации сообщений в RabbitMQ множество кадров инкапсулируют отправляемые в сервер данные. Прежде чем реальное
содержимое сообщения сможет достичь RabbitMQ, имеющееся клиентское приложение отправляет кадр метода
Basic.Publish
, кадр заголовка содержимого и по крайней мере один кадр тела
(Рисунок 2-11).
Рисунок 2-11
При публикации какого- то сообщения в RabbitMQ отправляются по крайней мере три кадра: кадр метода
Basic.Publish
.
кадр заголовка содержимого и кадр тела
Когда RabbitMQ получит все необходимые сообщению кадры, он проверит полученную необходимую информацию из своего кадра метода перед определением
последующих шагов. Кадр метода Basic.Publish
несёт в себе название требуемого обмена и ключ маршрутизации
для данного сообщения. При вычислении этих данных RabbitMQ попытается установить соответствие полученного в данном кадре
Basic.Publish
имени обмена в собственной базе данных настроенных обменов.
Совет | |
---|---|
По умолчанию, если вы публикуете сообщения в каом- то обмене, который не существует в настройках rabbitMQ, тот втихую грохнет данное сообщение. Чтобы гарантировать доставку вашего сообщения, либо установите флаг mandatory в значение истины при публикации, либо примените подтверждение доставки. Эти опции подробно поясняются в Главе 4. Имейте в виду, что применение любого из этих методов может отрицательно воздействовать на скорость публикации данного сообщения вашим приложением.. |
После того как RabbitMQ определяет соответствующее название обмена указанному в кадре метода Basic.Properties
,
он вычисляет связывание в этом обмене и отыскивает соответствующие установленному ключу маршрутизации очереди. В случае когда критерий для сообщения
соответствует каким- либо привязанным очередям, сервер RabbitMQ выдаст это сообщение в очередь по правилу FIFO. Вместо помещения реального сообщения
в некую структуру данных очереди в эту очередь добавляется ссылка на такое сообщение. Когда RabbitMQ готов к доставке данного сообщения, он
воспользуется данной ссылкой чтобы составить упорядоченное (marshaled) сообщение и отправить его в провод. Это предоставляет прочную основу
оптимизации публикуемым во множестве очередей сообщениям. Удерживание только одного экземпляра отнимает меньше памяти при его публикации для
множества получателей. Размещение некоторого сообщения в очереди, будь то очередь потребления, просроченных сообщений или простоя, не повлияет на
расположение в любой другой очереди. После того как RabbitMQ перестанет нуждаться в данном сообщении по причине доставки всех имеющихся у него копий или
его удаления, данная единственная копия этого сообщения будет удалена из памяти в RabbitMQ.
По умолчанию, до тех пор пока никакие потребители не будут прослушивать данную очередь, сообщения будут храниться в этой очереди. По мере того как
вы добавляете больше сообщений эта очередь растёт в размере. RabbitMQ может хранить эти сообщения в оперативной памяти или записывать их на диск в зависимости
от свойства delivery-mode
, определяемого в Basic.Properties
данного сообщения.
Свойство delivery-mode
настолько важно, что оно будет рассмотрено в нашей следующей главе, а ещё дополнительные подробности в Главе 4.
После того как опубликованное сообщение снабжено маршрутом и поставлено в очередь в одной или более очередях, не остаётся болше тем для обсуждения кроме
его потребления. Для потребления сообщений из очереди в RabbitMQ, некое потребляющее приложение подписывается на соответствующую очередь в RabbitMQ
выпуская команду Basic.Consume
. Как и в случае с прочими синхронными командами, сервер ответит
Basic.ConsumeOk
чтобы позволить данному клиенту знать о том, что он собирается открыть ворота шлюза и выпустит
массивный поток сообщений или, по крайней мере, тоненькую струйку. По усмотрению RabbitMQ установленный потребитель начнёт получать сообщения в не вызывающей
удивления форме эквивалентов методов Basic.Deliver
и заголовков содержимого, а также кадров тела
(Рисунок 2-12).
Рисунок 2-12
Логика порядка доставки кадра между клиентом и сервером при подписке на какую- то очередь и получении сообщений
После вызова Basic.Consume
, он будет оставаться активным, пок4а не произойдёт одна из ряда вещей.
Если некий потребитель пожелает остановить приём сообщений, он может вызвать команду Basic.Cancel
.
Не лишним будет отметить, что эта команда выпускается асинхронно, в то время как RabbitMQ всё ещё может отправлять сообщения, поэтому
потребитель всё ещё будет получать некоторое число сообщений, которые RabbitMQ разместил ему прежде чем получил кадр отклика
Basic.CancelOk
.
При потреблении сообщений имеются некоторые установки, которые позволяют дать знать RabbitMQ о том как вы желаете получать их. Одной из таких настроек
является аргумент no_ack
для команды Basic.Consume
. Когда он установлен в
значение истины, RabbitMQ будет отправлять сообщения непрерывно до тех пор, пока имеющийся потребитель не отправит команду
Basic.Cancel
или этот потребитель не отсоединится. Если же значение флага
no_ack
установлено в значение ложь, потребитель обязан подтверждать приём каждого принимаемого им сообщения, отправляя
некий запрос RPC Basic.Ack
(Рисунок 2-13).
Рисунок 2-13
Всякое успешно доставленное клиенту RabbitMQ будет снабжаться откликом
Basic.Ack
до тех пор, пока не будет отправлена команда
Basic.Ack
. Если определён no_ack
, этот этап Basic.Deliver
пропускается.
При отправке кадра отклика Basic.Ack
данный потребитель должен передать с ним некий аргумент из кадра
метода Basic.Deliver
, именуемый как delivery tag
(маркер доставки). RabbitMQ использует данный маркер доставки помимо номера канала в качестве уникального идентификатора для квитирования
взаимодействия сообщений, их отклонения и отрицания подтверждения приёма. Об этих вариантах вы узнаете дополнительные подробности в
Главе 5.
Теперь, когда у вас за поясом есть жизнеспособное знание об основах AMQP, самое время окунуться из теории в практику и написать издателя и потребителя. Для того чтобы сделать это мы воспользуемся библиотекой rabbitpy. Для взаимодействия с rabbitMQ имеется множество библиотек, но я создал rabbitpy для цели обучения в этой книге чтобы оставлять все примеры программирования простыми и лаконичными при попытке оставаться справедливым относительно синтаксиса команд AMQP. Если вы ещё не сделали этого, установите, пожалуйста, rabbitpy следуя приводимым ниже в Дополнении A инструкциям установки требующейся ВМ.
Чтобы начать это упражнение вы воспользуетесь записной книжкой сервера IPython, установленного как часть нашей виртуальной машины RabbitMQ in Depth. Если вы ещё не сделали этого, выполните, пожалуйста, все отображённые в Дополнении A настройки для своего локального комп.тера. Откройте в своём браузере http://localhost:8888 и вы должны увидеть страницу, аналогичную отображаемой на Рисунок 2-14.
Закладка "2.4 Publisher Example" в индексе записной книжки содержит демонстрируемые на этих страницах все коды для взаимодействия с RabbitMQ. Вам следует импортировать библиотеку rabbitpy с тем, чтобы интерпретатор Python позволил вам применять её.
In [1]: # Импорт библиотеки клиента RabbitMQ
import rabbitpy
Если вы нажмёте кнопку Play или Run Cell в панели инструментов или если вы нажмёте Shift-Enter, исполнится тот элемент, который содержит данный код. В самом первом элементе данной записной книжки будет импортирована библиотека rabbitpy.
Вы также должны увидеть изменение звёздочки (*) на число 1. Активный элемент автоматически продвигается с самого первого на следующий. По мере того как вы читаете этот пример кода, вы должны исполнить каждый элемент перечисляемый здесь, продвигаясь по всему коду в этой записной книжке IPython.
Теперь, имея импортированной rabbitpy, вам следует создать некий URL соединения AMQP. Формат для такого URL очень схож с форматом, применяемым для запросов HTTP:
In [2]: # Определяем URL для соединения с ним
url = 'amqp://guest:guest@localhost:5672/%2F'
Такой URL AMQP определяет, что вы будете выполнять соединение поверх обычной связи AMQP с применением имени пользователя "guest" и его пароля "guest". Он будет подключаться к локальному хосту по порту с номером 5672 установленным по умолчанию "/" vhost. Этот URL подразумевает что вы соединитесь к RabbitMQ в своей локальной машине с установленной по умолчанию конфигурацией. Если вы установили RabbitMQ на каком- то удалённом сервере или изменили настройку имеющегося брокера RabbitMQ, вам придётся заменить соответствующим образом эти значения.
Теперь, когда URL определён, настало время открыть соединение с RabbitMQ:
In [3]: # Подключаемся к RabbitMQ с применением приведённого выше URL
connection = rabbitpy.Connection(url)
Если вы не получили некоей исключительной ситуации, вы теперь подключены к RabbitMQ. Если всё же какое- то исключение появилось, скорее всего, сценарий будет состоять в том, что RabbitMQ не запущен в вашей локальной машине. Убедитесь, пожалуйста, что он исполняется и попробуйте снова.
Если вы успешно выполнили подключение, настало время открыть некий канал для взаимодействия с RabbitMQ:
In [4]: # Открываем новый канал в имеющемся соединении
channel = connection.channel()
Когда необходимый канал открыт, вы можете теперь определить некий обмен создав какой- то новый экземпляр определённого класса
rabbitpy.Exchange
. Перейдите в этот канал и озаглавьте получаемый обмен, который вы хотите создать.
Я предполагаю воспользоваться в данный момент именем chapter2-example
.
In [5]: # Создаём новый объект обмена,переходя в соответствующий канал для его применения
exchange = rabbitpy.Exchange(channel, 'chapter2-example')
Когда он построен, примените метод declare
данного объекта обмена для отправки необходимой команды, объявляя
этот обмен в RabbitMQ:
In [6]: # Объявляем свой обмен в сервере RabbitMQ
exchange.declare()
Теперь, когда вы объявили свой обмен, вы можете установить конкретную очередь и связать её с этим обменом. Чтобы выполнить это, вы вначале создаёте
объект Queue
, переходите в соответствующий канал и именуете эту очередь. В нашем следующем примере названием
очереди является example
.
In [7]: # Создаём некий новый объект очереди и переходим в соответствующий канал для его использования
queue = rabbitpy.Queue(channel, 'example')
После создания необходимого объекта и возвращения его экземпляра в качестве значения переменной queue
,
вы можете отправить соответствующую команду Queue.Declare
в RabbitMQ восполбзовавшись методом
declare
. То что вы должны увидеть в строке вывода, это то, что Python получил кортеж структуры данных с
общим числом сообщений в данной очереди и общим числом потребителей для данной очереди. Кортеж является неизменяемым набором объектов Python. В данном
случае ими являются целочисленный значения.
In [8]: # Объявляем значение очереди в сервере RabbitMQ
queue.declare()
Out[8]: (10,0)
Теперь, после создания очереди, вы должны связать её чтобы она получала сообщения. Для связывания данной очереди с имеющимся обменом отправьте команду
Queue.Bind
, вызвав метод bind
объекта своей очереди, передав
соответствующий обмен и ключ маршрутизации. В нашем следующем примере значением ключа маршрутизации является
example-routing-key
. После возврата из исполнения данного элемента вы должны увидеть в своём выводе
True
, указывающим, что данное связывание было успешным.
In [9]: # Связываем свою очередь с имеющимся обменом в нашем сервере RabbitMQ
queue.bind(exchange, 'example-routing-key')
Out[9]: True
чв вашем приложении я рекомендую вам применять семантически подходящие ключевые слова с разделителем точкой для пространства имён ваших ключей маршрутизации. Zen of Python постулирует что "Пространство имён является одной кричащей хорошей идеей - давайте сделаем их больше!" и это верно также и в отношении RabbitMQ. Применяя ключевые слова, разделяемые точкой, вы будете способны осуществлять маршрутизацию на основе шаблонов и подразделов в своих ключах маршрутизации. Вы изучите и это дополнительно в Главе 6.
Совет | |
---|---|
Названия очереди и обмена, как и ключи маршрутизации, могут содержать символы Unicode. |
После того как ваши обмен и очередь связаны, вы можете теперь публиковать проверочные сообщения в RabbitMQ которые будут сохраняться в очереди
example
. Чтобы убедиться что мы можем играть с большим числом сообщений, следующий пример публикует 10 тестовых
сообщений в имеющуюся очередь.
In [10]: # Отправляем 10 сообщений
for message_number in range(0, 10):
message = rabbitpy.Message(channel,
'Test message #%i' % message_number,
{'content_type': 'text/plain'},
opinionated=True)
message.publish(exchange, 'example-routing-key')
Для публикации проверочных сообщений при каждой итерации цикла создаётся новый объект rabbitpy.Message
,
передаются значение канала, тело сообщения и некий словарь свойств сообщения. После того как такое сообщение создано, вызывается метод
publish
, создаётся необходимый кадр метода Basic.Publish
,
кадр заголовка содержимого и один кадр тела, и всё это доставляется в RabbitMQ.
Совет | |
---|---|
Когда вы пишите издателей для своей промышленной среды, применяйте формат последовательных данных, такой как JSON или XML с тем, чтобы ваш потребитель мог легко преобразовывать это сообщение и таким образом они проще читаются в стом случае, когда вы выполняете поиск любых возникающих проблем. |
Теперь вам следует перейти в консоль веб управления своего RabbitMQ и увидеть свои сообщения поставленными в имеющуюся очередь. Откройте свой веб
браузер и посетите UI управления по адресу http://localhost:15672/#/queues/%2F/example (если ваш брокер находится на другой машине, замените
localhost
в указанном URL на название соответствующего сервера). После аутентификации вы должны обнаружить
страницу, подобную экранному снимку на Рисунок 2-15.
Рисунок 2-15
UI веб управления RabbitMQ, отображающий 10 сообщений в очереди упорядоченной обработки
Если вы взгляните в нижнюю часть полученной страницы, вы увидите раздел Get Messages. Если вы изменяете значение поля Messages с 1 до 10 и кликаете по Get Messages, вы должны видеть каждое из имеющихся 10 сообщений, которые вы опубликовали ранее. Убедитесь, что вы оставляете значение поля Requeue установленным в Yes. Это сообщает RabbitMQ добавлять данное сообщение обратно в имеющуюся очередь после того как RabbitMQ выбирает его для отображения в UI управления. Если вы не сделаете этого, не беспокойтесь; просто вернитесь обратно и повторно выполните свой код публикации.
Теперь, когда мы знаем как публиковать сообщения, наступило время для их выборки. Следующий листинг собирает воедино повторяющиеся, однако импортируемые элементы соединения из данного публикуемого кода, обсуждаемого в данной главе, который позволяет вам получать сообщения от RabbitMQ. Этот код находится в закладке записной книжки "2.5 Basic.Get Example". Этот блокнот имеет шесть элементов в нём при использовании интерфейса записной книжки IPython. Вы можете кликнут по ниспадающему меню Cell и затем Run All вместо исполнения каждого элемента по отдельности, как это было в нашем предыдущем примере.
#!/usr/bin/env python
# Импорт клиентской библиотеки RabbitMQ
import rabbitpy
# URL для соединения
url = 'amqp://guest:guest@localhost:5672/%2F'
# Открытие соединения RabbitMQ
connection = rabbitpy.Connection(url) # Создание нового объекта соединения, подключение к RabbitMQ
# открытие канала для взаимодействия с RabbitMQ
channel = connection.channel() # Открытие канала для взаимодействия
# Создание объекта для взаимодействия с очередью
queue = rabbitpy.Queue(channel, 'example') # Создание нового объекта очереди для получения сообщений
# Пока в данной очереди имеются сообщения, идёт его опрос с применением Basic.Get
while len(queue) > 0: # цикл до тех пор, пока в очереди имеются сообщения
message = queue.get() # выборка очередного сообщения
print('Message:')
print(' ID: %s' % message.properties['message_id']) # Получение сообщения из обрабатываемой очереди
print(' Time: %s' % message.properties['timestamp'].isoformat()) # Печать штампа времени свойства отформатированного в виде штампа времени ISO 8601
print(' Body: %s' % message.body) # Печать тела сообщения
message.ack() # Квитанция подтверждения приёма данного сообщения в RabbitMQ
После набора текста и его исполнения предыдущего кода потребителя вы должны увидеть все 10 сообщений, которые вы опубликовали перед этим. Если вы
рассмотрите его внимательнее,вы можете заметить, что хотя вы не определили свойства message_id
или
timestamp
при публикации этих сообщений, каждое сообщение печатающееся из потребителя сообщение его имеет.
Наша клиентская библиотека rabbitpy будет автоматически заполнять эти свойства для вас если вы их не заполнили. Кроме того, если бы вы отправили Python
dict
в виде определённого сообщения, rabbitpy будет автоматически упорядочивать данные в виде JSON и устанавливать
набор соответствующих свойств content-type в виде application/json
.
Спецификация AMQP 0.9.1 определяет некий протокол взаимодействия, который использует команды в стиле RPC для поддержки общения межу клиентом и сервером RabbitMQ. Теперь, когда вы знаете как эти команды составляют кадры и как работает протокол, вы должны быть лучше снабжены для создания приложений взаимодействия с RabbitMQ по публикации и потреблению сообщений, а также поиска в них неисправностей. Многие приложения содержат немногим больше кода, чем мы уже реализовали для работы с вашим экземпляром RabbitMQ.
В нашей следующей главе мы изучим дополнительные подробности о применении свойств сообщения, которые позволяют вашим издателю и потребителю применять некий общий контракт для тех сообщений, которыми обмениваются ваши приложения.