Глава 2. Создание приложения Такси

В повседневной беседе люди приветствуют друг друга, обмениваются подшучиванием, а затем, в конце концов, завершают свой разговор и продолжают свой путь. Соединения TCP на нижнем уровне работаю таким же образом с облегчёнными каналами RabbitMQ. Собирающимся обмениваться через RabbitMQ сообщениями приложениям необходимо установить постоянное соединение с брокером сообщений. Когда это соединение установлено, требуется создать канал чтобы можно выполнять ориентированные на сообщения взаимодействия, такие как публикация и потребление сообщений.

После демонстрации этих основ данная глава рассмотрит то как брокер применяет обмены (exchanges) для определения того куда должно быть доставлено каждое сообщение. Некий обмен подобен почтальону: он доставляет сообщения своим надлежащим очередям (почтовым ящикам) для потребителей чтобы найти позднее.

Базовые понятия abbitMQ отображены на следующей схеме:

 

Рисунок 2-1


Взаимодействие запрос- отклик между клиентом и его сервером

К концу этой главы вы будете обладать основательным пониманием стоящей за нашей платформой Complete Car (CC) архитектурой приложения и как оно отправляет самое первое своё сообщение через RabbitMQ. Это требует введения двух различных типов обменов: непосредственный обмен, который доставляет сообщения в отдельную очередь и тематический обмен, который доставляет сообщения во множество очередей на основании ключей маршрутизации соответствия шаблону.

Для наилучшего возможного старта рассматриваются такие темы:

  • Архитектура стоящего за CC приложения

  • Установление соединения с RabitMQ

  • Отправка самых первых сообщений

  • Добавление тематических сообщений

Давайте приступим!

Технические требования

Файлы кода для этой главы можно отыскать на GitHub.

Стоящая за CC архитектура приложения

СС требуется одно приложение, которое применяется водителями их такси, и одно, которое используется потребителем. Такой потребитель должен быть способным запрашивать такси через своё приложение, а водитель такси должен иметь возможность принимать запрос (поездку):

 

Рисунок 2-2


Пользовательские запросы такси через приложение CC

Потребитель должен иметь возможность вводить сведения относительно начальной и конечной точек своей поездки. Активные водители получают соответствующие запросы и имеют возможность принимать их. Этот потребитель, в конце концов, должен обладать возможностью отслеживать местонахождение такси во время своей поездки.

Следующая схема отображает архитектуру обмена сообщениями, которую желает достичь CC.

 

Рисунок 2-3


Архитектура основного приложения CC

Этот поток можно пояснить 10 этапами, которые выделены на предыдущей схеме:

  1. Пользователь применяет мобильное приложение CC для бронирования такси. Запрос теперь отправляется из этого мобильного приложения в надлежащую Службу приложения (Application Service). Это запрос содержит сведения относительно той поездки, которую жедает забронировать этот потребитель.

  2. Служба приложения сохраняет этот запрос в базе данных.

  3. Служба приложения добавляет сообщения со сведениями относительно такой поездки в очерель в RabbitMQ.

  4. Подключённые машины такси подписаны на такие сообщения (запросы бронирования).

  5. Такси отвечают своему потребителю отправляя сообщение обратно в RabbitMQ.

  6. Служба приложения подписывается на такие сообщения.

  7. И снова, Служба приложения сохраняет эти сведения в базе данных.

  8. Служба приложения пересылает эти сведения своему потребителю.

  9. Прикладное приложение такси начинает автоматически отправлять географическое положение своего такси через заданные интервалы времени в RabitMQ.

  10. Местоположение такси затем передаётся непосредственно мобильному приложению соответствующего потребителя, причём через WebSocket, с тем, чтобы они знали когда появится данное такси.

Давайте начнём обсуждать более пристально рассматривать шаги 1, 2, 3 и 4, отображённые на нашей предыдущей схеме, когда пользователь запрашивает такси (сообщение публикуется в RabbitMQ), а водитель такси получает это сообщение (сообщение потребляется из RabbitMQ).

Установка устойчивого подключения к RabbitMQ

Как уже упоминалось в Главе 1, Как оживает Rabbit, должно быть установлено физическое сетевое соединение между нашим сервером приложения и RabbitMQ. Соединение Advanced Message Queuing Protocol (AMQP) является связью между соответствующими клиентом и брокером, которое выполняет лежащие в основе сетевые задачи,включая начальную аутентификацию, разрешение IP и построение сетевой среды:

 

Рисунок 2-4


Соединение AMQP между самим приложением и RabitMQ

Каждое соединение AMQP поддерживает некий набор подлежащих каналов. Канал повторно применяет соединение, воздерживаясь от потребности повторной авторизации и открытия новых потоков TCP, делая их более эффективными в отношении ресурсов.

Следующая схема иллюстрирует некий канал внутри соединения между приложением и RabbitMQ.

 

Рисунок 2-5


Каналы позволяют вам более действенно использовать ресурсы

В отличии от создания каналов, создание соединений является затратной операцией, во много схожей с тем как это происходит с соединением базы даннх. Обычно соединения баз данных объединяются в пулы, в которых каждый экземпляр пула используется в отдельном исполняемом потоке. AMQP отличается в отношении того что отдельное соединение может применяться многими потоками через множество мультиплексных каналов.

Процесс рукопожатия для некого соединения AMQP требует по крайней мере семи пакетов TCP, и даже больше когда применяется TLS. Каналы могут открываться и закрываться более часто, если это необходимо:

  • AMQP соединения: 7 TCP пакетов

  • AMQP канал: 2 TCP пакета

  • AMQP излание: 1 TCP пакет (больше для более длинных сообщений)

  • Закрытие канала AMQP: 2 TCP пакета

  • Закрытие соединения AMQP: 2 TCP пакета

  • Итого 14- 19 пакетов (плюс Acks)

Следующая схем иллюстрирует обзор тех сведений, которые отправляются для соедтнений и каналов:

 

Рисунок 2-6


Процесс рукопожатия для некого соединения AMQP

Установление отдельного соединения длительным временем жизни между Службой приложения и RabbitMQ является хорошим нчалом.

Необходимо принять некое решение в отношении того какой язык программирования и библиотеку клиент применять. Самые первые примеры в этой книге написаны на Ruby, а также для публикации и потребления сообщений применяется клиентская библиотек Bunny (https://github.com/ruby-amqp/bunny). Ruby является простым языком для чтения и понимания, даже когда вы и не знакомы с ним.

Наше приложение должно быть настроено на использование определённых конечных точек соединения, часто ссылаясь на него как на строку соединения; например, некий хост или порт. Такая строка соединения содержит сведения, требующиеся для способности к установлению соединения. Выделяемым AMQP порт обладает номером 5672. Через AMQPS можно применять шифрованные TLS/SSL AMQP; это безопасная версия протокола самого протокола AMQP, которой выделяется порт 5671.

Данная библиотека является тем элементом, который открывает необходимое соединение TCP с целевыми адресом IP и портом. Необходимые параметры соединения добавляются в виде строки URI в некую переменную среды для кодирования с названием RABBITMQ_URI. Не существует никакого стандарта для URI AMQP, но данный формат широко применяется:


RABBITMQ_URI="amqp://user:password@host/vhost"
 	   

Согласно документации Ruby (Bunny), соединение с RabbitMQ является простым. Сам код для этого подразделяется на блоки кода и его можно найти далее в этой главе:

  1. Добавьте имя пользователя, значение пароля и тот vhost, который был настроен в Главе 1, Как оживает Rabbit, а затем добавьте эту строку в переменную среды в данной машине:

    
    RABBITMQ_URI="amqp://cc-dev:taxi123@localhost/cc-dev-vhost"
     	   
  2. Затребуйте библиотеку клиента bunny:

    
    # Require client library
    require "bunny"
     	   
  3. Считайте соединение URI из своей переменной среды и стартуйте соединение:

    
    connection = Bunny.new ENV['RABBITMQ_URI']
    # Start a session with RabbitMQ
    connection.start
     	   

Всё это выглядит достаточно прямолинейным до сих пор, однако CC требует кода промышленного уровня, который способен аккуратно обрабатывать отказы. Что если RabbitMQ не работает? Ясно, что это плохо когда всё это приложение падает. Что если RabbitMQ нуждается в посторном запуске? CC желает чтобы его приложения аккуратно восстанавливались в случае возникновения любых проблем. На самом деле CC хочет чтобы его приложение продолжало работать вне зависимости от того будет ли вся система обмена сообщениями работать целиком или нет. Практика пользователей обязана быть гладкой и простой для понимания, а также и надёжной.

Сумируя, поведение CC желает достичь следующего:

  • Если соединение с RabbitMQ утрачено, оно должно восстанавливаться самостоятельно.

  • Когда соединение упало, отказ в отправке и выборке ообщений должен происходить аккуратно.

Когла наше приложение соединяется со своим брокером, ему требуется обрабатывать отказы. Никакая сетевая среда не является надёжной всё время и происходят неверные настройки и ошибки; наш брокер может отказывать и тому подобное. Не обладая автоматизацией, в таком случае, на ранней стации в этом процесс должно происходить выявление ошибок.

Для обработки отказов TCP соединений в Bunny, необходимо перехватывать такую исключительную ситуацию:


begin
  connection = Bunny.new ENV['RABBITMQ_URI']
  connection.start
rescue Bunny::TCPConnectionFailed => e
  puts "Connection to server failed"
end
 	   

Выявление отказов сетевого соединения является почти бесполезным когда некое приложение не имеет возможности его восстановления. Восстановление является важной частью обработки ошибок.

Некоторые клиентские библиотеки предлагают функциональные возможности автоматического восстановления, которые включают и восстановление потребителя. Все операции, попытки которых происходят в некотором закрытом канале завершатся отказом за одним исключением. Когда Bunny выявляет отказ соединения TCP, он предпринимает попытку каждые 5 секунд без ограничения в отношении общего числа попыток подключений. Имеется возможность запрета автоматического восстановления путём добавления automatic_recovery => false в Bunny.new. Эта настройка должна применяться только когда вы восстанавливаете соединение неким иным образом или при проверке самой строки соединения.

[Замечание]Замечание

Сообщения могут отправляться между языками программирования, платформами и операционными системами. Вы можете вибирать из некого числа различных библиотек клиентов для различных языков программирования. Существует большое число выпущенных библиотек клиентов, однако вот те некоторые которые рекомендуется применять:

  • Python: Pika

  • Node.js: amqplib

  • PHP: php-amqplib

  • Java: amqp-client

  • Clojure: Langohr

Этот раздел показал как CC управляет установлением некого соединения с RabbitMQ. Мы продемонстрировали почему рекомендуются соединения с длительным временем жизни и как обрабатывать некоторые распространённые ошибки. Теперь настало время создания некого канала внутри самого соединения.

Работа с каналами

Все относящиеся к протоколу AMQP операции происходят поверх некого канала. Конкретный экземпляры канала создаются соответственным экземпляром соединения. Как уже описывалось, некий канал является виртуальным (AMQP) соединением внутри своего подключение TCP. Все выполняемые клиентом операции происходят в неком канале, очереди обпределяются в каналах, а сообщения отправляются поверх каналов.

Некий канал никогда не существует сам в себе; он всегда пребывает в контексте некого соединения.


# Declare a channel
channel = connection.create_channel
 	   

Каналы в неком соединении закрываются после закрытия соответствующего соединения или когда происходит ошибка канала. Клиентские библиотеки позволяют нам обозревать и реагировать на исключительные ситуации каналов.

На уровне канала обычно возбуждается больше исключительных ситуаций (exceptions), нежели на уровне соединения. Исключительные ситуации уровня канала часто указывают на ошибки, которые само приложение способно исправить, например, когда у него нет полномочий или попытка выполнения операции в закрытом канале также завершится ошибкой с некой исключительной ситуацией.

[Совет]Совет

Даже хотя экземпляры канала технически являются сохраняющими поток, настоятельно рекомендуется избегать обладания различными потоками, которые применяют один и то же канал одновременно.

CC теперь способен подключаться к брокеру RabbitMQ, открывать канал и выдавать последовательность команд, причём все сохраняющим поток и сохраняющим исключительную ситуацию образом. Теперь настало время сборки на этой основе!

Построение инструментария запроса такси

Теперь пришёл момент для сборки потока сообщений.

Вначале наш потребитель отправит простой запрос HTTP из своего мобильного приложения с Службу приложения. Это сообщение будет содержать мета- сведения, такие как некую временную метку, идентификаторы отправителя и получателя, а также идентификаторы места назначения и запрошенного такси.

Собственно поток будет выглядит как-то так:

 

Рисунок 2-7


Интерфейс/ основа взаимодействия основного приложения CC

Служба приложения сохраняет эти сведения в базе данных с тем, чтобы все данные становились видимыми для сценариев анализа этих данных в последующем состоянии.

[Замечание]Замечание

В этих примерах не пассматривается как эта информация хранится в базе данных, поскольку это не основной рассматриваемый в этой главе прецедент. Самый простой метод будет заключаться в разрешении Службе приложения добавлять эти сведения в имеющуюся базу данных. Другой вариант состоит в разгрузке Службы приложения и помещении новых сообщений в очередь сообщений между пазой данных и Службой приложений, а также позволить другой службе подписаться на такие сообщения и обрабатывать их, то есть сохранять в соответствующей базе данных.

Такой поток между соответствующими мобильным устройством, Службой приложения и RabbitMQ иллюстрируется на схеме ниже:

 

Рисунок 2-8


Поток между соответствующими мобильным устройством, Службой приложения и RabbitMQ

Что касается нашего основного потока, обсуждение AMQP в Главе 1, Как оживает Rabbit подробно описывает как сообщения публикуются в обмене после перенаправления в очереди для их потребления.

Стратегия маршрутизации определяет очередь (или очереди), в которые выполняется направление этого сообщения. Установленная стратегия маршрутизации основывается на её решении по некому ключу маршрутизации (строке в свободной форме) и, потенциально, на мета- сведениях сообщения. Представляйте себе такой ключ маршрутизации в качестве некого адреса, который обмен применяет для определения того как следует выполнять маршрутизацию этого сообщения. Также требуется привязка между неким обменом и соответствующей очередью для разрешения некому сообщению протекать от первого к последнему.

Теперь давайте изучим непосредственный обмен.

Обмен напрямую

Прямой обмен доставляет сообщения в очереди на основе некого ключа маршрутизации сообщения. Сообщение следует в ту очередь (очереди), чей ключ маршрутизации привязки соответствует ключу маршрутизации самого сообщения.

CC обладает лишь двумя машинами, поэтому он начинает с некой простой системой взаимодействия, в которой один потребитель способен запросить такси одного из водителей. В этом случае требуется направить одно сообщение в ту очерель, которая действует в качестве Входящей одного из водителей. Таким образом, стратегия маршрутизации обмена, которая будет применяться является непосредственным обменом, соответствием названия очереди назначения ключу маршрутизации, применяемому при производстве этого сообщения, что иллюстрируется следующей схемой:

 

Рисунок 2-9


Прямой обмен направляет сообщения в конкретные очереди

Некий пример варианта прямого обмена может быть таким:

  1. Наш потребитель заказывает такси с названием taxi.1. HTTP запрос отправляется из мобильного приложения этого потребителя в нашу Службу приложения.

  2. Служба приложения отправляет некое сообщение в RabbitMQ с неким ключом маршрутизации, taxi.1. Этот ключ маршрутизации сообщения соответствует названию существующей очереди, а потому данное сообщение, в конце концов, доставляется в такую очередь taxi.1.

Схема ниже демонстрирует как бы происходила маршрутизация сообщения в таком непосредственном обмене:

 

Рисунок 2-10


Прямой обмен направляет сообщения в конкретные очереди на основе значения ключа маршрутизации

Это может оказаться не самым действенным подходом для масштабирования. В действительности он будет пересмотрен как только CC получит больше машин, однако это самый простой способ для того чтобы начать и быстро запустить это приложение.

Давате последуем созданию первого кода CC в качестве своего первоначального приложения и в то же самое время изучим определённые различные понятия. Наш код в самом начале блока кода был взят из соответствующего раздела соединения и канала:

  1. Затребовать библиотеку bunny.

  2. Считать значение URI соединения из соответствующей переменной среды и запустить соединение.

  3. Запустить сеанс взаимодействия с RabbitMQ.

  4. Определить очередь taxi.1.

  5. Задать прямой обмен taxi.1

  6. Привязать нау очередь taxi.1 к имеющумуся обмену taxi-direct со значением ключа варшрутизации taxi.1:

    
    # 1. Require client library
    require "bunny"
    
    # 2. Read RABBITMQ_URI from ENV
    connection = Bunny.new ENV["'RABBITMQ_URI"]
    
    # 3. Start a communication session with RabbitMQ
    connection.start
    channel = connection.create_channel
    
    def on_start(channel)
     # 4. Declare a queue for a given taxi
     queue = channel.queue("taxi.1", durable: true)
     # 5. Declare a direct exchange, taxi-direct
     exchange = channel.direct("taxi-direct", durable: true, auto_delete: true)
    
     # 6. Bind the queue to the exchange
     queue.bind(exchange, routing_key: "taxi.1")
    
     # 7. Return the exchange
     exchange
    end
    
    exchange = on_start(channel)
    		

Определять очереди и обмены для всякого отправляемого сообщения слегка расточительно и не является необходимым, поэтому настоятельно рекомендуется создать некий метод, который обрабатывает такую настройку нашего приложения. Это должен быть метод, который создаёт соединение и объединяет очереди, обмены и тому подобное. В данном примере метод просто именуется on_start, и он обявляет необходимую очередь и привязывает к этой очереди некий обмен.

Если такой обмен не существует при публикации в него чего бы то ни было, это возбудит исключительные ситуации. Когда такой обмен уже имеется, она не будет делать ничего; в противном слычае она в действительности создаст его. Именно поэтому безопасно объявлять очереди всякий раз когда запускается наше приложение или перед публикацией некого сообщения.

[Совет]Совет

Каналы уничтожаются исключительными ситуациями. В случае CC, отправка в некий не существующий обмен не только возбудит некую исключительную ситуацию, но также прекратило бы тот канал, в котором произошла ошибка. Всякий последующий код, который попытается применять прекращённый канал таже будет завершаться отказом.

Дополнительно к использованию этого непосредственного типа, CC также настроил свойства обмена аргументы типа durable, autoDelete и arguments. Данный обмен не должен пропадать ни после перезапуска RabbitMQ, ни когда он не применяется, что и поясняют применяемые в данной настройке значения.

[Совет]Совет

Некое объявление обмена является идемпотентным только тогда, когда совпадают свойства такого обмена. Попытки задать уже существующий обмен с иными свойствами завершится неудачей. Всегда применяйте согласованные свойства в неком объявлении обмена. Когда вы вносите изменения в соответствующие свойства, удалите этот обмен, прежде чем объявляет его с новыми свойствами. То же самое правило применяется и к объявлению очереди.

После создания необходимого обмена, для привязки к нему создаётся соответствующая очередь такси.

Такая очередь создаётся аналогичным обмену подходом, но со слегка иными свойствами, а именно:

  • durable: True - эта очередь должна оставаться объявленной даже после перезапуска брокера.

  • autoDelete: False - оставлять очерель, даже когда она более ничего не потребляет.

  • exclusive: False - эта очередь должна быть способной потребляться прочими соединениями (несколько серверов приложений могут быть подключены к RabbitMQ и иметь доступ из различных соединений).

  • arguments: Null - нет необходимости индивидуальной настройки этой очереди.

Такая очередь привязывается к соответствующему обмену при помощи собственного имени в качестве значения ключа маршрутизации с тем, чтобы наша стратегия непосредственной маршрутизации была способна направлять в неё сообщения. Когда это сделано, публикуемые в обмене taxi-direct сообщения будут действительно доставлять сообщения в надлежащую очередь такси, чьё название соответствует значению публикуемого ключа маршрутизации.

[Замечание]Замечание

Когда к некому обменом не привязана никакая очередь, или если его стратегия маршрутизации не может отыскать соответствующую очередь назначения, то сообщение , которое было опубликовано в данном обмене будет втихую отвергнуто. В качетсве варианта имеется возможность выполнения уведомлений при отклонении не перенаправленных сообщений, как это показывается в последующих главах.

И снова, когда применяются те же самые свойства, такие операции будут идемпотентными, а потому наши очереди могут безопасно объявляться и привязываться к конкретному обмену снова и снова.

Хотя в этой главе был описан непосредственный обмен, брокеры AMQP 0-9-1 предоставляют четыре различных типа обменов. В зависимости от привязок, настраиваемых вами между очередями, а также параметров, такие обмены направляют сообщения по разному. Наши последующие главы пристальнее рассмотрят такие прочие типы обменов. А пока вот краткое пояснение каждого из них:

  • Fanout (веерный): Сообщения направляются всем привязанным к такому веерному обмену очередям.

  • Topic (тематический): Групповые символы обязаны устанавливать соответствие между значением ключа маршрутизации и конкретным шаблоном маршрутизации привязки.

  • Headers (заголовки): Для маршрутизации используются атрибуты заголовка самого сообщения.

Теперь пришло время отправить наше самое первое сообщение в RabbitMQ!

Отправка самого первого сообщения

Уже были рассмотрены базовые понятия и начальная установка, а потому давайте приступим и отправим своё первое сообщение!

Прежде всего давайте рассмотрим метод order_taxi, который отвечает за отправку сообщений для начального запроса такси:


def order_taxi(taxi, exchange)
  payload = "example-message"
  message_id = rand
 exchange.publish(payload,
    routing_key: taxi,
    content_type: "application/json",
    content_encoding: "UTF-8",
    persistent: true,
    message_id: message_id)
end

exchange = on_start(channel)
order_taxi("taxi.1", exchange)
 	   

order_taxi намерен подвергаться вызову всякий раз когда некмй пользователь желает заказать такси. Нет никакой гарантии что этот адресат уже входил в нашу систему когда- либо, поэтому, что касается отправителя, невозможно быть уверенным что очередь назначения уже имеется. Самый безопасный способ - объявлять очерель для каждого отправляемого сообщения, подразумевая, что такая операция дакларирования идемпотентна, поэтому она ничего не делает когда такая очередь уже присутствует. На первых порах это может казаться странным, однако отвественность за присутствие очереди адресата лежит на самом отправителе, раз он хочет быть уверенным, что его сообщение не будет утрачено.

[Замечание]Замечание

Именно это является распространённой схемой для AMQP, когда между событиями нет строгой взаимосвязи произошло- до. Для движения в правильном направлении имеется повторное объявление. И наоборот, схема проверь- затем- действуй не одобряется; попытка проверки предварительного наличия некого обмена не гарантирует успех в обычных распределённых средах, в которых используется AMQP.

Сам метод для публикации сообщения очень прост; вызываем publish в сторону соответствующего publish. Затем применяем нужное имя очереди в качестве значения ключа маршрутизации (в случае использования непосредственной маршрутизации) и некий массив байтов, который и представляет реальную полезную нагрузку сообщения. Имеется возможность добавления некоторых необязательных свойств сообщения, которые могут содержать следующее:

  • content_type (string): Сообщение публикуется и потребляется в виде некого массива байт, однако ничто в реальности не сообщает о том, что представляют эти байты. При данных обстоятельствах и издатель, и потребитель находятся в одной и той же системе, а потому они могут предполагать, что ожидается значение типа содержимого. При этом всегда указывайте тип содержимого, чтобы такие сообщения были самодостаточными; вне зависимости от того какая именно система в конечном итоге получит или проанализирует сообщение, она точно знает что представляет содержащийся в нём массив данных.

  • content_encoding (string): при упоядочивании строковых сообений в массив байт используется особая кодировка (UTF-8) с тем, чтобы их можно было бы публиковать. И снова, чтобы такое сообщение самостоятельно себя описывало, предоставьте все необходимые метасведения, чтобы его можно было бы прочитать.

  • message_id (string): Как будет показано позднее в этой книге, идентификаторы сообщения являются важным моментом возможности отслеживания при обмене сообщениями и в распределённых приложениях. В данном примере вырабатывается произвольный идентификатор сообщения.

  • persistent (boolean): Определяет следует ли данной сообщение сохранять на постоянной основе на диск или нет.

[Совет]Совет

Не путайте надёжность (durable) обмена и очереди с постоянством (persistence); хранящиеся в надёжной очереди не постоянные сообщения будут удалены после перезапуска брокера, оставляя вам пустую очередь.

Кроме того, постоянные сообщения в не постоянной очереди исчезнут после перезапуска брокера, что также оставит вас с пустой очередью..

Предоставьте р=гарантию от утраты очереди объявляя такую очередь как durable (надёжную) и устанавливая режим доставки соответствующего сообщения в persistent (постоянный).

Однако что происходит когда отказывает отправка определённого сообщения, скажем, если разрывается соединение с RabbitMQ?

[Замечание]Замечание

Зачем вообще применять режим не постоянной доставки? Разве не весь смысл некого брокера сообщений, такого как RabbitMQ состоит в том, чтобы обеспечивать отстутствие потерь сообщений? Это так, но имеются обстоятельства, при которых такая гарантия может быть ослаблена. Рассмотрим ситуацию, при которой издатель засыпает своего брокера большим числов не критически важных сообщений. Применение в таком случае не постоянной доставки означало бы, что RabbitMQ не потребуется постоянно обращаться к диску, что обеспечит при данных обстоятельствах лучшую приизводительность.

Прежде чем двинуться дальше, давайте взглянем на структуру некого сообщения AMQP.

Структура сообщения AMQP

Следующий снимок экрана иллюстрирует собственно структуру некого сообщения AMQP и содержит четыре только что применённых свойства сообщения AMQP, плюс несколько новых. Обратите внимание, что эта схема использует названия полей самой спецификации, в то время как реализация каждого языка программирования их слегка переименовывает с тем чтоб они оказывались допустимыми именами. К примеру content-type превращается в contentType в Java и в content_type в Ruby:

 

Рисунок 2-11


Свойства сообщения AMQP

За исключением reserved, все эти свойства свободны для применения, пока не предписано обратное, игнорируются брокером AMQP. В случае RabbitMQ единственным поддерживаемым этим брокером полем выступет поле user-id, которое проверяется для того, чтобы убедиться что оно соответствует имени пользователя самого установившего это соединение брокера. Обратите внимание как свойство headers позволяет нам добавлять дополнительные пары ключ- значение в случае, когда ни одно из стандартных свойств не соответствует нашим требованиям.

Наш следующий раздел поясняет как потребляются сообщения.

Потребление сообщений

Теперь давайте переключим своё внимание на тот метод, которые отвечает за выборку сообщений, которой выступает шаг 4 в основной архитектуре СС и который можно найти в разделе Стоящая за CC архитектура приложения.

Здесь наше приложение такси может проверять свою очередь на предмет новых сообщений через регулярный промежуток времени. Это носит название синхронного подхода. Он подразумевало бы закрепление за самим потоком приложения ответственность за обработку опрашивающих звпросов до тех пор, пока все ожидающие сообщения не будут удалены из данной очереди, как это проиллюстрировано на следующей схеме:

 

Рисунок 2-12


Запрашивающий у брокера новые сообщения клиент

Некий интерфейс регулярно опрашивает свой сервер на предмет сообщений и это вскорости начнёт сказываться на загрузке, что, в свою очередь, означает, что начнёт страдать от снижения производительности всё решений целиком.

Вместо этого CC принимает решение собирать своё решение в угоду подходу активной доставки сервером. Основная идея состоит в активной доставке сервером сообщений своим клиентом от их брокера. К счастью, RabbitMQ предлагает два способа получения сообщений: имеется метод на основе опросов basic.get и метод на основе активной доставки basic.consume. Как это показывается на схеме внизу, сообщения активно доставляются своему потребителю:

 

Рисунок 2-13


Подписка потребителя у брокера на сообщения

Соответствующий метод subscribe добавляет некого потребителя в очередь, который затем подписан на получение доставки сообщений.

[Совет]Совет

Проверяйте что ваш клиент потребляет ообщения из своей очереди вместо того чтобы применять базовое действие GET. Команда basic.get сравнительно затратная когда речь заходит о ресурсах.

При помощи subscribe необходимые сообщения доставляются к смому клиенту от его брокера когда новые сообщения готовы и сам клиент имеет возможность. Это делает возможным, обычно, гладкую обработку сообщений. Кроме того, применение subscribe означает что потребитель подключён до тех пор, пока доступен сам канал, в котором он был определён или пока сам клиент не прекратит его.

Сам процесс обработки обмена сообщениями проходит гладко и без усилий, как будто ничто и не происходит! Естественно, это длится до тех пор, пока не будут приведены в действие предупреждения для подтверждений и/ или отказов того, была ли некая часть этого процесса выполенена как ожидалось, или же не так как была спланирована.

Подтверждение и отрицательное подтверждение

Для RabbitMQ требуется знать когда некое сообщение может рассматриваться как успешное в плане того, что оно было отправлено его потребителю как ожидалось. Сам брокер затем должен удалять сообщения из своей очереди раз этот брокер получил надлежащий отклик; в противном случае такая очередь переполнялась бы. Соответствующий клиент может откликаться своему брокеру либо ack (acknowledge, подтверждением приёма) этого сообщения, когда он получил его или когда сам потребитель полностью обработал данное сообщение. В каждой из этих ситуаций, после того как на это сообщение получен ack, оно удаляется из своей очереди.

Следовательно, сам потребитель обязан подтверждать приём сообщения тогда и только тогда, когда выполнена его обработка, или же он уверен что отсутствует риск его утраты, когда оно обрабатывается асинхронно.

Во избежание обстоятельств, прикоторых некое сообщение может быть навечно урачено (к примеру, при крушенрр обработчика, исключительной ситуации и т.п.), это потребляющее приложение должно отвечать отсутствием приёма сообщения (nack) до тех пор, пока оно окончательно не завершит с ним.

Некое сообщение отвергается приложением когда это приложение указывает своему брокеру что обработка отказала или не может быть закончена вовремя. Nack (negative-acknowledge) или отсутствие приёма сообщения, по умолчанию, отправляется обратно в свою очередь для другой попытки.

Более подробно мы рассмотрим подтверждения в Главе 3, Отправка сообщений множеству водителей такси.

Готовы? Настроили всё? Время рвануть, Rubbit!

Запуск кода

Теперь настал момент для настройки некого кода нашего потребителя. Вы бкдете способны распознать основную часть этого кода по нашему предыдкщему разделу, Отправка самого первого сообщения:

  1. Затребовать библиотеку клиента.

  2. Считать значение RABBITMQ_URI из ENV.

  3. Запустить сеанс взаимодействия с RabbitMQ.

  4. Определить очередь для заданного такси.

  5. Задать прямой обмен taxi-direct.

  6. Привязать нау очередь с надлежащим обменом.

  7. Подписаться на необходимую очередь.

Далее приводится код, который требуется для установки нашего первоначального потребителя:


# 1. Require client library
require "bunny"

# 2. Read RABBITMQ_URI from ENV
connection = Bunny.new ENV["RABBITMQ_URI"]

# 3. Start a communication session with RabbitMQ
connection.start
channel = connection.create_channel

# Method for the processing
def process_order(info)

  puts "Handling taxi order"
  puts info
  sleep 5.0
  puts "Processing done"
end

def taxi_subscribe(channel, taxi)
  # 4. Declare a queue for a given taxi
  queue = channel.queue(taxi, durable: true)

  # 5. Declare a direct exchange, taxi-direct
  exchange = channel.direct("taxi-direct", durable: true, auto_delete: true)

  # 6. Bind the queue to the exchange
  queue.bind(exchange, routing_key: taxi)

  # 7. Subscribe from the queue
  queue.subscribe(block: true, manual_ack: false) do |delivery_info, properties, payload|
    process_order(payload)
  end
end

taxi = "taxi.1"
taxi_subscribe(channel, taxi)
		

Здесь были добавлены два флага в наш метод subscribe, которые следует пояснить. Давайте рассмотрим их подробнее:

  • block (Boolean, по умолчанию false): Должен ли этот вызов блокировать вызываемый им поток? Этот вариант может быть полезен для поддержки активности основного потока сценария. Он не совместим с автоматическим восстановлением соединения и обчно не рекомендуется.

  • manual_ack (Boolean, по умолчанию false): В случае CC, поскольку риск потери сообщения на протяжении этой фазы приемлем, наша система не выполняет вручную подтверждения сообщений. Вместо этого она информирует своего брокера рассматривать их как подтверждённые до тех пор, пока она выполняет их выборку (дополнительно о подтверждении вручную позднее в этой книге).

И это всё! CC теперь обладает работающим Входящим для запросов заказов чтобы проверить его. Далее мы рассмотрим свою консоль управления при активации работы такси.

Исполнение приложения

При запущенном приложении и подключённом к RabbitMQ сервером в нашей консоли управления можно увидеть такие установленные соединения:

 

Рисунок 2-14


Консоль управления предоставляет сведения соединения

Обратите внимани как ясно представлены восходящая и нисходящая пропускные полосы сетевой среды, а то, что наши каналы каналы, которые открываются и закрываются очень быстро, тяжело обнаруживать в этой консоли упралвения. Поэтому давайте рассмотрим следующие обмены:

 

Рисунок 2-15


Отображемый в консоли управления прямой обмен taxi-direct

Обмен самого пользователя и скорость приходящих и исходящих сообщений отражаются в нашей консоли управления. Тот факт состоит в том, что они потреблялись настолько же быстро, насколько они поступали, это хороший знак что нашей текущей архитектуры достаточно для потребностей CC и что сообщения не нагромождаются. Но что это за все эти прочие обмены, которые не были созданы при помощи кода и откуда они появляются? Этот безымянный обмен представленный (по установленным в AMQP умолчаниям) как и все прочие обмены, с начинающимися на amq. определяются самой спецификацией AMQP и, раз так, должны предоставляться по умолчанию RabitMQ. Теперь, что относительно очередей? давайте взглянем:

 

Рисунок 2-16


В консоли управления видны все Входящие client-to-taxi

Как и ожидалось, имеется по одной очереди на такси и некоторые стильные статистики загруженности. Обратите внимание, что колонка ack, что не является сюрпризом, учитывая как работают подтверждения получения сообщений. Наша очередь получает сообщения, в то же время позволяя RabitMQ знать что она не получает подтверждений от них, а потому и нет никакой активности, относящейся к подтверждениям получения сообщений.

[Совет]Совет

При достаточном объёме ОЗУ RabbitMQ может иметь дело с сотнями очередей и привязок без проблем, а потому множество очередей это не проблема.

Уверенная в своей архитектуре и реализации CC развёртывает подсистему заказа такси со стороны клиента. Клиент имеет возможность отправлять запрос, а такси способно обрабатывать этот запрос.

CC быстро расширяет свою компанию двумя новыми экологически чистыми автомобилями. Как и в предыдущем решении, клиенту необходимо отправить сообщение с запросом заказа конкретному водителю. Теперь была запрошена новая функциональная возможность -способность отправлять некое сообщение группе машин такси. Клиенты должны иметь возможность выбирать обычное такси или экологически чистое такси. Давайте рассмотрим как CC реализует эту новую функциональную возможность, воспользовавшись силой RabbitMQ.

Добавление тематических сообщений

Приложение CC позволяет их такси организовываться самостоятельно в группы через их регистрацию по тематическим интересам. Эта новая подлежащая развёртыванию функциональная возможность позволит клиентам отправлять запросы на заказ всем такси в определённой теме. Оказывается эта функциональная возможность соответствует конкретному правилу маршрутизации обмена, которое, как это не удивительно, носит название темы! Этот тип обмена позволяет нам направлять сообщения во все очереди, которые были привязаны к ключу маршрутизации, соответствующему собственно ключ маршрутизации самого сообщения. Таким образом, в отличии от прямого обмена, который максимально направляет некое сообщение в одну очередь, тематический обмен способен отправлять их во множество очередей. Двумя другими примерами того, где может применяться тематическая маршрутизация, выступают сведения о местоположении, такие как трансляции предупреждений о дорожной обстановке или обновлении стоимости поездки.

[Замечание]Замечание

Некий шаблон маршрутизации составляется из нескольких слов, разделяемых точками. Рекомендуется следовать структуре ключей маршрутизации из наиболее общих элементов для наиболее специфичных из них, например, news.economy.usa или europe.sweden.stockholm. Такие тематические обмены поддерживают строгое соответствие ключей маршрутизации и также выполняет соответствие групповым символам при помощи * и # в качестве местоблюстителей для в точности одного слова и нуля или большего числа слов, соответственно.

Приводимая далее схема иллюстрирует как такой тематический обмен будет применяться в приложении CC. Заметьте как конкретная очередь Входящие остаётся неизменной и просто получает соедигение с тематическим обменом через дополнительные привязки, причём каждая из них отражает соответствую заинтересованность некого пользователя:

 

Рисунок 2-17


Тематический обмен отправляет тематические сообщения в очереди eco

Поскольку собственно код Входящих применим для всего, тот код, который мы уже разместили для выборки сообщения не требует изменений. На самом деле эти функциональные возможности целиком могут реализовываться всего лишь несколькими добавлениями. Самые первые из этих добавления принимают участие в определении самих тематических обмене в уже существующем методе on_start следующим образом:


def on_start(channel)
  # Declare and return the topic exchange, taxi-topic
  channel.topic("taxi-topic", durable: true, auto_delete: true)
end
 	   

Здесь нет ничего нового или необычного; самое основное отличие в том, что такой обмен имеет название taxi-topic и является обменом с типом topic Отправка сообщения даже проще, чем при функциональной возможности клиент- к- такси, ибо нет попытки созавать очередь адресата. Отправителю нет смысла перебирать всех пользователей для создания их очередей и привязки последних, поскольку лишь уже подписанные на целевые темы пользователи в момент отправки получат это сообщение, что в точночти соответствует ожидаемой функциональности. Здесь приводится метод order_taxi:


# Publishing an order to the exchange
def order_taxi(type, exchange)
  payload = "example-message"
  message_id = rand
  exchange.publish(payload,
                   routing_key: type,
                   content_type: "application/json",
                   content_encoding: "UTF-8",
                   persistent: true,
                   message_id: message_id)
end

exchange = on_start(channel)
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange) 
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange) 
# Order will go to any taxi
order_taxi('taxi', exchange) 
# Order will go to any taxi
order_taxi('taxi', exchange)
 	   

Основное отличие здесь теперь состоит в том, что сообжения публикуются в обмене taxi-topic. Остальная часть кода, который создаёт и публикует наши сообщения в точности та же самая, что и для обмена сообщениями клиент- к- такси. И наконец, сведения, которые надлежит добавлять когда какое- то новое такси подписывается или отписывается от определённых тем:


# example_consumer.rb

def taxi_topic_subscribe(channel, taxi, type)
  # Declare a queue for a given taxi
  queue = channel.queue(taxi, durable: true)

  # Declare a topic exchange
  exchange = channel.topic('taxi-topic', durable: true, auto_delete: true)

  # Bind the queue to the exchange
  queue.bind(exchange, routing_key: type)

  # Bind the queue to the exchange to make sure the taxi will get any order
  queue.bind(exchange, routing_key: 'taxi')

  # Subscribe from the queue
  queue.subscribe(block:true,manual_ack: false) do |delivery_info, properties, payload|
    process_order(payload)
  end
end

taxi = "taxi.3"
taxi_topic_subscribe(channel, taxi, "taxi.eco.3")
 	   

taxi.3 является нашим новым, дружественным к окружающей среде такси и теперь оно готово получать заказы от клиентов, которые хотят некую дружественную к окружающей среде машину.

Сама по себе спецификация AMQP не представляет никаких средств для анализа текущих привязок очереди, поэтому нет возможности перебора очередей и удаления тех, которые больше не требуются, дабы отразить изменение в представляющих интерес темах такси. Это не страшная проблема, так как само приложение всё равно обязано сопровождать это состояние.

[Замечание]Замечание

Консоль управления RabbitMQ раскрывает некий API REST, который можно применять для выполнения самостоятельного наблюдения за привязкой очередей, помимо многих прочих функциональных возможностей, не рассматривающихся в самой спецификации AMQP. Мы поговорим об этом дополнительно в своих последующих главах.

Поместив этот новый код куда следует, всё заработало как мы и ожидали. Никаких изменений кода не потребовалось для выборки в новых заказах клиента- к- такси, потому что они появляются в той же самой очереди Входящие, что и наши предыдущие сообщения. Тематические сообщения корректно отправляются в машины такси и получаются ими, причём всё это происходит при минимальных изменениях и без увеличения общего числа очередей. При подключении к нашей консоли управления, кликните по имеющейся закладке Exchanges; единственное заметное отличие состоит в новом тематическом обмене; а именно, в taxi-topic.

Выводы

Данная глава рассмотрела как подключаться к RabbitMQ и как отправлять и получать сообщения заказов. Система заказа определённой машины была успешно создана и были приведены в движение непосредственный и тематический обмены в контексте функциональных возможностей CC клиент- к- такси и клиент- к- группе такси.

По мере роста Complete Car растёт и спрос на новые функциональные возможности в их приложении такси. Что будет дальше с CC, когда она удовлетворит потребности пользователя? В нашей следйющей главе объясняется как работать с каналами и очередями для расширения возможностей нашего прикладного приложения.