Глава 10. Интеграция с базами данных

Данная глава обсуждает:

  • Публикацию из AMQP сообщений PostgreSQL

  • Принуждение RabbitMQ к ожиданию уведомлений от PostgreSQL

  • Применение хранилища InfluxDB для сохранения сообщений

Применение RabbitMQ для отвязки операций записи в базах данных OLTP является распространённым способом достижения великолепного хранилища данных и технологии обработки потоков событий. Когда вы публикуете сообщения с упорядоченными данными, которые будут записываться в вашу базу данных, некое простое потребляющее приложение может выступать в роли определённого моста между событиями и вашей базой данных. Однако можно и вовсе опустить шаг с потребителем и воспользоваться подключаемым модулем RabbitMQ, таким как обмен хранилища InfluxDB и автоматически запоминать сообщения в вашей базе данных напрямую из RabbitMQ.

Такая интеграция RabbitMQ с некой внешней базой данных не останавливается на этом. Другим мощным шаблоном является непосредственная публикация вашей базой данных сообщений в RabbitMQ. Этого можно достичь за счёт использования расширений или подключаемых модулей в вашей базе данных или при наличии подключаемого модуля RabbitMQ, который выступает в роли клиента базы данных, публикующего сообщения всякий раз когда происходит некое событие в базе данных.

[Замечание]Замечание

Оба таких шаблона интеграции баз данных могут снизить сложность работы, уменьшая потребность во внешних потребляющих приложениях для осуществления данного вида работ. Но такое упрощение имеет свою цену. При более тесной связи вашей базы данных и RabbitMQ ситуации с отказами могут становиться более сложными. Например, что произойдёт если ваш сервер базы данных становится медленным или вовсе перестаёт отвечать когда RabbitMQ пытается выполнять вставку записей в неё? Важно иметь ответ на вопросы подобные этому при тестировании ваших сборок для ситуаций отказов и определения правильных шагов по устранению неполадок и восстановлению прежде чем запускать их в промышленное применение.

Данная глава изучает оба таких шаблона интеграции баз данных с RabbitMQ. Вначале мы изучим как может применяться расширение pg_amqp PostgreSQL для публикации сообщений при помощи хранимых процедур. Затем вы изучите как вы можете достигать того же самого вида поведения при помощи функциональности PostgreSQL LISTEN/NOTIFY с использованием обмена LISTEN PostgreSQL. Затем мы переместимся из реляционного мира в мир NoSQL и вы увидите как обмен хранилища InfluxDB может применяться для запоминания сообщений в виде временных последовательностей данных по мере их публикации в RabbitMQ.

Расширение PostgreSQL pg_amqp

Основная идея с непосредственной публикацией сообщений из PostgreSQL при исполнении триггеров не содержит ничего нового. Ещё в начале 2003 система репликаций Slony применялась функциями триггера PostgreSQL для отправки сообщений о событии, реализуя репликацию хозяин- подчинённый (master-slave). В 2008 чтобы создать систему репликаций со слабой связью, которая предлагала бы большую гибкость чем имеющиеся системы репликации, я создал Golconde. Golconde применял триггеры POST COMMIT и PL/Python для отправки данных транзакции в прочие серверы PostgreSQL через протокол обмена сообщениями STOMP. Самая последняя версия PostgreSQL применяет обмен сообщениями событий для потоков данных транзакций для резервирования в горячем режиме экземпляров PotgreSQL, которые действуют как доступные только для чтения подчинённые (slaves) с возможностью отработки отказа в случае если хозяин становится недоступным.

Принимая во внимание такую основательную историю репликаций на основе событий в PostgreSQL, кажется естественным что кто- то добавит гибкие возможности обмена сообщениями в свою экосистему. В 2009 году Тео Шлосснагл выпустил pg_amqp, некое расширение PostgreSQL, которое выставляет публикации AMQP через функции PostgreSQL. Хотя pg_amqp выставляет только подмножество спецификации AMQP 0-8, оно жёстко выполняется при публикации сообщений из функций триггеров PostgreSQL. Вся выставляемая pg_amqp функциональность доступна в точности как и все прочие функции PostgreSQL и может вызываться в операторах SQL, а также в хранимых процедурах. pg_amqp выставляет простой AMQP с двумя методами для взаимодействия с сообщениями AMQP: amqp.publish и amqp.disconnect. Метод amqp.publish создаёт некое сообщение AMQP и доставляет его с помощью метода RPC Basic.Publish, в точности как и все прочие издатели AMQP (Рисунок 10-1). Соединения автоматически устанавливаются и ликвидируются, однако если вы желаете прекратить соединение после публикации некоторого сообщения, вы можете вызвать метод amqp.disconnect.

 

Рисунок 10-1


pg_amqp.publish uses Basic.Publish применяет LISTEN/NOTIFY для отправки сообщения в RabbitMQ

Когда вы применяете pg_amqp, вы вызываете синхронное взаимодействие с RabbitMQ и необходимо проявить аккуратность с тем, чтобы гарантировать что ваше применение не окажет воздействия на вашу общую скорость запроса. Как и в случае любой тесно связанной интеграции вам надлежит выполнять эталонное тестирование и необходимо проверить варианты отказов прежде чем вы поместите систему в промышленное применение. Например, если вы обернёте использование amqp.publish в некую транзакцию, что произойдёт если pg_amqp не сможет соединиться с брокером RabbitMQ? Выполнит ли ваша база данных транзакции в случае отказа публикации?

Чтобы разузнать это, вначале нам следует установить само расширение pg_amqp.

Установка расширения pg_amqp

Существует два способа установки расширения pg_amqp: вы можете установить его выгрузив и скомпилировав исходники вручную, либо при помощи клиента PGXN (PostgreSQL Extension Network). PGXN является репозиторием пакетов для расширений PostgreSQL. Установка на базе PGXN впечатляюще проще, однако она не работает с установками PostgreSQL 9.3. Если вы не применяете PostgreSQL 9.3 или старше, я рекомендую вам начать с установки и вернуться обратно к ручной установке, если первая завершится неудачей.

[Замечание]Замечание

Прежде чем вы попробуете установить pg_amqp, вам следует вначале убедиться что у вас имеется полностью установленная версия PostgreSQL 9.1 или старше, включая файлы разработки, так как данное расширение компилируется как часть общего процесса установки. Кроме того, вам понадобится цепочка инструментария для компиляции PostgreSQL из исходного кода. Если вам требуется помощь в установке PostreSQL или необходимой для компиляции PostgreSQL цепочки инструментов разработчика, вы можете найти руководства по установке на официальном Wiki.

Установка через PGXN

Для установки расширения pg_amqp через PGXN вам вначале потребуется убедиться что необходимый клиент PGXN установлен в вашей системе. Он написан на Python, и может быть просто установлен при помощи easy_install:


easy_install pgxnclient
		

Имея установленным приложение pgxnclient, вы теперь можете попробовать автоматически установить само расширение. В качестве пользователя с правами доступа к каталогу PostgreSQL lib, запустите следующую команду:


pgxnclient install pg_amqp
		

Если всё отработает как ожидалось, данная команда не вернёт никакой ошибки. Однако не волнуйтесь если столкнётесь с какой- то ошибкой. Хотя установка вручную имеет больше шагов, она достаточно проста.

Установка вручную

Исходный код для pg_amqp доступен на GitHub. Если вы не знакомы с Git, вы можете выгрузить необходимый исходный код с https://github.com/omniti-labs/pg_amqp/archive/v0.3.0.zip и раскрыть его в некий каталог для компиляции. Приводимый ниже код, написанный на языке сценариев BASH, фиксирует установку для PostgreSQL 9.3 или последующих систем и его следует исполнять в самом верхнем каталоге развёрнутого исходного кода.


# Listing 10.1 Компиляция и установка pg_amqp
#!/bin/bash
LIBDIR=`pg_config --libdir`
INSTALLSH="$LIBDIR/pgxs/config/install-sh"
make && make INSTALL=$INSTALLSH install
		

После того как вы успешно установите расширение pg_amqp, вам понадобится загрузить его в некую базу данных PostgreSQL. Для более простой демонстрации следующий пример применяет установленные по умолчанию суперпользователя postgres и базу данных postgres; это не требуется для применения, хотя пользователь и должен являться суперпользователем.

Для загрузки данного расширения подключитесь к базе данных PostgreSQL при помощи psql:


$ psql -U postgres postgres
		

После того как вы выполните подключение, вы должны обнаружить вывод, аналогичный такому:


psql (9.3.5)
Type "help" for help.

postgres=>
		

Теперь вы можете загрузить своё расширение при помощи синтаксиса CREATE EXTENSION:


postgres=> CREATE EXTENSION amqp;
		

Если необходимое расширение было загружено успешно, вы получите подтверждение, похожее на следующее:


CREATE EXTENSION
		

После загрузки расширения вы теперь можете перейти к настройке этого расширения и последующей публикации сообщений.

Настройка расширения pg_amqp

Определённое расширение настраивается посредством заполнения основного отношения (таблицы) amqp.broker, которое автоматически создаётся при выполнении вами запроса CREATE EXTENSION из нашего предыдущего раздела. Как показано в Таблице 10-1, amqp.broker содержит все обычные настройки соединения AMQP вместе с полем broker_id, которое применяется при вызове как функции amqp.publish, так и функции amqp.disconnect.

Таблица 10-1. Определение отношения amqp.broker
Атрибут Тип Ключевое слово

broker_id

Integer

not null default nextval('broker_broker_id_seq')

host

Text

not null

port

Integer

not null default 5672

vhost

Text

 

username

Text

 

password

Text

 

Если вы исполняете PostgreSQL локально, а RabbitMQ в своей ВМ Vagrant, применявшейся в более ранних главах, вы должны иметь возможность подключиться к localhost. Следующий оператор SQL настроит pg_amqp вставкой кортежа в отношении, который подключается к RabbitMQ на localhost, с портом 5672 с применением виртуального хоста / и комбинацией имени пользователя и пароля guest/guest. Если такие настройки подключения не работают у вас из- за отличий в вашей тестовой среде, настройте данный SQL надлежащим образом:


INSERT INTO amqp.broker (host, port, vhost, username, password)
  VALUES ('localhost', 5672, '/', 'guest', 'guest')
  RETURNING broker_id;
		

После исполнения данной команды вы получите обратно значение broker_id, подтверждающее успешность операции вставки в обсуждаемое отношение.


broker_id
-----------
        1
(1 row)

INSERT 0 1
		

Запомните это значение broker_id, так как вы будете применять его при публикации сообщений в RabbitMQ.

Публикация сообщений через расширения pg_amqp

Получив вставленное и настроенное pg_amqp, настало время опубликовать ваше первое сообщение. Прежде чем вы сделаете это, вам следует настроить очередь в UI управления RabbitMQ для получения данного сообщения. Откройте в своём браузере http://localhost:15672/#/queues и создайте очередь с названием pg_amqp-test, как это показано на Рисунке 10-2.

 

Рисунок 10-2


Создание очереди pg_amqp-test

После того как вы создали необходимую очередь, вы можете проверить публикацию в эту очередь из PostgreSQL через установленный по умолчанию прямой обмен с помощью очереди с названием (pg_amqp-test) в качестве значения ключа маршрутизации. При помощи psql подключаемся к своей базе данных postgres, выпуская следующий запрос, который передаёт значения идентификатора брокера, обмен, ключ маршрутизации и сообщение:


SELECT amqp.publish(1, '', 'pg_amqp-test',
                    'Test message from PostgreSQL');
		

После подстановки вы должны получить подтверждение что ваш запрос выполнился успешно:


publish
---------
 t
(1 row)
		

Хотя PostgreSQL и сообщил, что данное сообщение было опубликовано, лучшей проверкой будет воспользоваться UI управления для выборки данного сообщения. В странице подробностей очереди http://localhost:15672/#/queues/%2F/pg_amqp-test вы можете воспользоваться разделом Get Messages для извлечения и просмотра конкретного сообщения. Как показано на Рисунке 10-3, после того как вы кликните по кнопке Get Message(s), вы должны обнаружить то сообщение, которое было опубликовано через функцию amqp.publish PostgreSQL.

 

Рисунок 10-3


Применение UI управления для подтверждения опубликованных сообщений

Как вы можете видеть, публикация сообщений через pg_amqp достаточно тривиальное упражнение после того как вы его установили и настроили. Стоит также отметить, что начиная с версии 0.3 вы не можете устанавливать свойства AMQP. Помимо прочего, публикуемые сообщения обёртываются в некую транзакцию AMQP. Если вы вызовете функцию amqp.publish в некоторой транзакции PostgreSQL, а затем откатите эту транзакцию обратно, ваша транзакция RabbitMQ также после этого будет откачена назад. В большинстве случаев публикация будет обёртываться в рамках отдельной хранимой процедуры, вместо того чтобы присутствовать совместно с прочими действиями определённой хранимой процедуры или в качестве функции триггера, которая исполняется в INSERT, UPDATE или DELETE кортежей в каком- то отношении.

[Замечание]Замечание

UI управления выдаст вам предупреждение, что данная операция Get Messages(s) является разрушающей операцией, что означает, что данное сообщение на самом деле будет удалено из отображающей его очереди, причём если опция Requeue установлена в значение Yes, это приведёт к повторной публикации данного сообщения и поэтому оно будет находиться в хвосте очереди, а не в её начале.

Обработка отказов

Вы могли заметить, что когда вы вызвали функцию amqp.publish, она вернула Булево значение. В случае успеха она возвращает t, или true, но что произойдёт если она не сможет подключиться к RabbitMQ? Попытка выпуска того же самого оператора в какой- то новой транзакции с новым подключением возвратит f или false, а также запись с предупреждением:


postgres=# SELECT amqp.publish(1, '', 'pg_amqp-test',
                    'Test message from PostgreSQL');
					
WARNING: amqp[localhost:5672] login socket/connect failed: Connection refused

 publish
---------
 f
(1 row)
		

В данном случае достаточно это просто проверить в качестве результата вызова amqp.publish и в том случае, если она возвращает false, вы не смогли выполнить публикацию. Однако что произойдёт внутри транзакции с длительным исполнением при отключении RabbitMQ? В таком случае данный вызов применит true, однако запись в журнал выдаст предупреждение что ваша транзакция AMQP не может быть зафиксирована:


postgres=# SELECT amqp.publish(1, '', 'pg_amqp-test',
                    'Test message from PostgreSQL');
WARNING:  amqp could not commit tx mode on broker 1
 publish
---------
 t
(1 row)
		

К несчастью, начиная с версии pg_amqp 0.3.0, у вас нет возможности перехвата такой ошибки и если вы не отслеживаете журналы регистрации своего PostgreSQL, вы можете утратить данные и не знать об этом. Хотя это и поведение с возможными проблемами, это лучше утраты ваших транзакций базы данных. Как и в случае всех операционных систем, ключом к успеху является мониторинг. Если вы применяете систему, аналогичную Splunk, вы можете создать некое задание, которое периодически будет просматривать ошибки в журналах PostgreSQL. В качестве альтернативы вы можете написать своё собственное приложение или подключаемый модуль для систем подобных Nagios {Прим. пер.: или Zabbix}, которое сканирует имеющиеся журналы на предмет таких предостережений.

Ожидание уведомлений PostgreSQL

Хотя pg_amqp предоставляет удобный и быстрый способ публикации сообщений напрямую из PostgreSQL, он создаёт достаточно тесно связанную интеграцию между экземплярами сервера. Если вдруг по какой- то причине ваш кластер RabbitMQ становится недоступным, это может неблагоприятно воздействовать на ваш сервер PostgreSQL. Чтобы избежать такой тесной стыковки и при этом оставить прямую интеграцию, я создал обмен LISTEN PostgreSQL.

Обмен LISTEN PostgreSQL выступает в роли клиента PostgreSQL выполняя ожидание уведомлений, выпускаемых в PostgreSQL оператором SQL NOTIFY. Уведомления PostgreSQL отправляются в канал, на который подписаны клиенты в текстовом значении. Это значение используется в обмене LISTEN PostgreSQL в качестве соответствующего ключа маршрутизации для данного публикуемого сообщения. Когда уведомления отправляются в некий канал, для которого зарегистрирован обмен LISTEN, эти уведомления будут возвращаться в некое сообщение, которое затем публикуется при помощи поведения, подобного прямому обмену (Рисунок 10-4).

 

Рисунок 10-4


Обмен LISTEN действует как некий клиент PostgreSQL, публикую уведомления в виде сообщений.

Конечно, любая технология имеет компромиссы. При использовании pg_amqp, если PostgreSQL не будет способен соединиться с RabbitMQ, вызов amqp.publish завершится неудачей. В случае с обменом LISTEN если отказывает соединение PostgreSQL, оно не может быть зарегистрировано для уведомлений и в свою очередь, не будет публиковать никаких сообщений. Подобный сценарий вам может понадобиться при мониторинге скорости общей пропускной способности конкретного обмена при помощи API управления RabbitMQ.

Установка обмена LISTEN PostgreSQL

Обмен LISTEN PostgreSQL можно выгрузить с его страницы проекта GitHub. В тексте README, отображаемом на данной странице проекта находятся ссылки на предварительно скомпилированные подключаемые модули для определённых версий RabbitMQ. Когда вы выгрузите и установите необходимый подключаемый модуль, убедитесь что вы получили самую последнюю версию RabbitMQ.

Предоставляемая выгрузка состоит из файла zip с двумя подключаемыми модулями RabbitMQ: собственно обмена и драйвера PostgreSQL. Приводимый ниже листинг кода выгрузит и установит данный подключаемый модуль в системе OS X с запущенным RabbitMQ 3.3.5, установленным через Homebrew. Для прочих систем вам понадобится изменить установленное назначение RABBITMQ_DIR, определив верный путь к базовому каталогу RabbitMQ.


# Листинг 10.2 установки сценария OS X для обмена LISTEN
#!/bin/bash
RABBITMQ_DIR=/usr/local/Cellar/rabbitmq/3.3.5/                      # Устанавливаем базовый каталог для вашей установки RabbitMQ
PLUGIN_DIR=$RABBITMQ_DIR/plugins/                                   # Устанавливаем значение пути каталога подключаемого модуля
cd /tmp                                                             # Переходим во временный каталог
curl -L -o pgsql-listen-exchange.zip http://bit.ly/1ndl8eK          # Выгружаем необходимый обмен с GitHub
unzip pgsql-listen-exchange.zip                                     # Развёртываем файл zip
rm pgsql-listen-exchange. zip                                       # Удаляем выгруженный файл zip
mv epgsql-1.4.1-rmq3.3.x-0.2.0-git3318bd5.ez $PLUGIN_DIR            # Перемещаем драйвер PostgreSQL в каталог подключаемого модуля
mv pgsql_listen_exchange-3.3.x-0.2.0.ez $PLUGIN_DIR                 # Перемещаем подключаемый модуль обмена LISTEN в каталог подключаемого модуля
$RABBITMQ_DIR/sbin/rabbitmq-plugins enable pgsql_listen_exchange    # Разрешаем подключаемый модуль обмена LISTEN
 	   
[Замечание]Замечание

В операционных системах Ubuntu и RedHat/CentOS, RabbitMQ обычно устанавливается в специфичном для данной версии каталоге под /usr/lib/rabbitmq. В Windows RabbitMQ обычно устанавливается в особом для версии каталоге в C:\Program Files\RabbitMQ. Предварительно скомпилированные подключаемые модули не зависят от платформы и могут исполняться на любой платформе, на которой запущен RabbitMQ.

Чтобы убедиться что данный подключаемый модуль был установлен правильно, перейдите в закладку Обменов в UI управления по ссылке http://localhost:15672/#/exchanges. В разделе Add a New Exchange вы должны обнаружить значение x-pgsql-listen в ниспадающем списке Type (Рисунок 10-5).

 

Рисунок 10-5


Проверка того, что опцияx-pgsql-listen имеется в ниспадающем перечне Type.

После того как вы проверили, что данный подключаемый модуль установлен как положено, вы теперь можете перейти к настройке самого обмена. Если вы не увидели необходимого варианта в своём ниспадающем перечне, может быть что необходимые подключаемые модули не были скопированы в соответствующие каталоги или что вы работаете с более старой версией Erlang чем та, с которой скомпилированы данные подключаемые модули. Рекомендуется применять Erlang R16 или последующие версии.

Имеется множество способов настройки данного подключаемого модуляЖ непосредственная настройка данного обмена путём подключения аргументов в процессе объявления обмена, через настройку аргументов данного обмена в основном файле rabbitmq.config или через политики, которые применяются для данного обмена. Политики предоставляют наиболее гибкий способ настройки необходимого обмена и их следует применять пока вы не получите достаточный опыт работы с данным подключаемым модулем и станете компетентным относительно того как он ведёт себя.

Настройка на основе политик

Чтобы начать, переместитесь в закладку Admin в своём UI управления. Здесь кликните Policies с правой стороны данной страницы (Рисунок 10-6).

 

Рисунок 10-6


Страница политик UI управления.

Для создания необходимых для подключения к PostgreSQL политик, задайте название для такой политики, некий шаблон регулярного выражения для соответствия необходимому имени обмена, а также хост, порт, название базы данных, имя пользователя и необязательный пароль PostgreSQL. Кроме того, вы можете обуздать данную политику, указав что она применима только к обменам. Рисунок 10-7 показывает некую политику, которая осуществит подключение к PostgreSQL на localhost с потом 5432 при помощи имени пользователя и пароля postgres.

 

Рисунок 10-7


Объявление политик для обмена уведомлений.

После того как вы кликните Add Policy, вы увидите все политики перечисленными на той же самой странице, как и на Рисунке 10-8

 

Рисунок 10-8


Добавленная в раздел всех политик политика.

Когда вы добавили необходимую политику, предоставленная вами информация о соединении будет проверена на корректность типа, но не на само соединение. Вы не узнаете что данное соединение допустимо пока не создадите сам обмен.

Создание обмена

Имея созданную политику переместитесь в закладку Exchanges своего UI управления по ссылке http://localhost:15672/#/exchanges. Воспользовавшись формой Add a New Exchange в нижней части страницы добавьте новый обмен. Присвойте данному обмену название notification с тем, чтобы он соответствовал установленной политике и установите тип обмена x-pgsql-listen (Рисунок 10-9).

 

Рисунок 10-9


Проверка того, что опцияx-pgsql-listen имеется в ниспадающем перечне Type.

После того как вы добавили необходимый обмен, он подключится к PostgreSQL, однако он не начнёт осуществлять ожидание уведомлений. Чтобы заставить его начать ожидание обменов, вы должны привязать данный обмен при помощи того ключа маршрутизации, который соответствует строке канала уведомлений PostgreSQL.

Создание и привязка проверочной очереди

Самый последний этап проверки обмена LISTEN состоит в создании некторой очереди проверки, в которую будут направляться ваши уведомления. Если вы проследуете в закладку Queues своего UI управления (http://localhost:15672/#/queues), вы сможете создать некую очередь в соответствующем разделе Add a New Queue. Для целей данной проверки мы назовём эту очередь notification-test. Вам нет нужды определять какие- то атрибуты или изменять некие установленные по умолчанию свойства в данной форме при добавлении очереди для проверки.

Когда вы добавите эту очередь, перейдите на её страницу в UI управления http://localhost:15672/#/queues/%2F/notification-test. В разделе Bindings на этой странице вы можете создать некую новую привязку с имеющимся обменом notification при помощи ключа маршрутизации example (Рисунок 10-10).

 

Рисунок 10-10


Связывание очереди проверки с соответствующим обменом уведомлений.

После его добавления данный обмен соединится с PostgreSQL и выполнит оператор LISTEN, регистрируясь для всех отправок уведомлений в имеющемся канале example. Теперь вы готовы к отправке проверочных уведомлений.

Публикация через NOTIFY

Чтобы убедиться что все настройки даного обмена корректны, вы теперь можете отправить некое уведомление в PostgreSQL, воспользовавшись оператором SQL NOTIFY. Для этого примените psql, подключаясь от имени postgres к своей базе данных postgres:


$ psql -U postgres postgres
		

После выполнения подключения вы можете отправить своё уведомление:


psql (9.3.5)
Type "help" for help.
postgres=# NOTIFY example,  'This is a test from PostgreSQL';
NOTIFY
		

Когда это уведомление отправится, переключитесь обратно в UI управления RabbitMQ для получения сообщения из очереди notification-test в разделе Get Messages (Рисунок 10-11).

 

Рисунок 10-11


Получение сообщения из очереди notification-test.

Как вы можете видеть, наш обмен LISTEN добавляет метаданные, относящиеся к самому сообщению, которые не заполнялись при применении pg_amqp. Данные свойства сообщения определяют app_id, отмечающий что данное сообщение было выпущено из подключаемого модуля pgsql-listen-exchange. Также отображено значение timestamp, взятое из значения текущего времени в сервере RabbitMQ. Кроме того, headers устанавливаются для определения значений канала уведомлений PostgreSQL, базы данных, сервера и названия самого источника обмена.

Хотя данный пример представлял простую текстовую строку, функции отправки уведомлений способны упорядочивать данные в различных форматах, делая уведомления многосторонней частью вашего приложения. Возможно вы пожелаете применять их для отладки сложных хранимых процедур, делая более простым отслеживание состояния данных по мере их прохождения через вашу базу данных. Или же вы можете пожелать применять их для обновления неравноправных систем в определённом облачном решении. В любом случае, обмен LISTEN добавляет интеграцию с PostgreSQL со слабой связностью при очень малых накладных расходах на всю систему в целом.

Сохранения сообщений в InfluxDB

InfluxDB (http://influxdb.com/docs/) является распределённой базой данных временных последовательностей с открытым исходным кодом, написанная на Go. Она очень легко устанавливается как в системах Linux , так и в OS X. Это интригующая система для хранения данных временных последовательностей с аналитическими целями, поскольку она предоставляет множество простых в применении протоколов наполнения данными и встроенный интерфейс запросов на веб основе для опроса хранимых данных. Она быстро становится некоторой альтернативой системам подобным Graphite, так как она предоставляет намного более масштабируемое хранилище, которое доступно через связный, горизонтально масштабируемый кластера.

Сообщения, которые направляются через обмен хранилища InfluxDB изучаются на предмет того, должны ли они сохраняться в InfluxDB. Если тип данного сообщения установлен как application/json, данное сообщение будет преобразовано в надлежащий формат и сохранено в InfluxDB при помощи установленного ключа маршрутизации как название события InfluxDB. Кроме того, если определён timestamp, он будет автоматически поставлен в соответствие атрибуту времени события InfluxDB.

Установка и настройка InfluxDB

Чтобы начать работу с RabbitMQ и InfluxDB вы сначала должны убедиться что у вас имеется установленным InfluxDB. В документации проекта имеются подробные инструкции установки. Возьмите самую последнюю версию из индекса документации, и затем следуйте инструкциям по установке указаниям по началу работы чтобы убедиться что ваша система установлена и настроена как нужно.

В качестве альтернативы, для изучения InfluxDB этот проект предоставляет сервер общедоступной игровой площадки по адресу http://play.influxdb.org. Если вы применяете Windows или не желаете устанавливать сервер локально на своём компьютере, вы можете воспользоваться общедоступным местом для упражнений с необходимым обменом хранилища InfluxDB для проверки интеграции. Примеры в данном разделе главы используют локальную установку, но вам нужно будет только поменять информацию о подключении и аутентификации для использования сервера общедоступных площадок тестирования.

Если вы установили локальный экземпляр InfluxDB, вам требуется создать и базу данных, и пользователя для RabbitMQ. Чтобы сделать это, откройте http://localhost:8083 в своём браузере и зарегистрируйтесь в интерфейсе администрирования при помощи пользователя root с паролем root (Рисунок 10-12).

 

Рисунок 10-12


Регистрация в интерфейсе администрирования InfluxDB.

Если вы зарегистрировались в первый раз, вам будет предложено создать базу данных. Для проверки обмена хранилища InfluxDB создайте некий обмен с названием rabbitmq-test (Рисунок 10-13).

 

Рисунок 10-13


Создание базы данных rabbitmq-test.

После того как вы создали базу данных, она появляется в перечне в верхней части вашей веб страницы. Кликните в этом списке по rabbitmq-test и вы получите страницу, в которой вы можете добавить некоторого пользователя для RabbitMQ, которого данный подключаемый модуль будет применять для аутентификации в RabbitMQ (Рисунок 10-14). В полученной форме в качестве имени пользователя введите rabbitmq и его пароль test и кликните по кнопке Create.

 

Рисунок 10-14


Создание пользователя rabbitmq для базы данных rabbitmq-test.

После того как вы создали этого пользователя, он будет отображён в таблице Database Users в верхней части данной страницы. После этого вы готовы к установке и настройке своего обмена хранилища InfluxDB.

Установка обмена хранилища InfluxDB

Установка и настройка обмена хранилища InfluxDB очень похожа на этот процесс для обмена LISTEN PostgreSQL. Этот подключаемый модуль может быть выгружен со страницы проекта GitHub. Его README, отображаемый на данной странице проекта перечисляет ссылки выгрузки на предварительно скомпилированные двоичные файлы подключаемого модуля для конкретных версий RabbitMQ. Когда вы выгрузите и установите соответствующий подключаемый модуль, убедитесь что у вас имеется самая последняя версия для вашей версии RabbitMQ.

Выгруженный файл имеет тип zip с двумя подключаемыми модулями RabbitMQ: самим обменом и клиентской библиотекой HTTP. Приводимый далее листинг кода выгрузит и установит подключаемый модуль в системе OS X с запущенным RabbitMQ 3.3.5, установленным с помощью Homebrew. Для прочих систем вам понадобится изменить назначение RABBITMQ_DIR, предписав правильный путь для базового каталога своего RabbitMQ.


# Листинг 10.3 Установка сценария OS X для обмена LISTEN
#!/bin/bash
RABBITMQ_DIR=/usr/local/Cellar/rabbitmq/3.3.5/                          # Устанавливаем базовый каталог для вашей установки RabbitMQ
PLUGIN_DIR=$RABBITMQ_DIR/plugins/                                       # Устанавливаем значение пути каталога подключаемого модуля
cd /tmp                                                                 # Переходим во временный каталог
curl -L -o influxdb-storage-exchange.zip http://bit.ly/1j7UvXf          # Выгружаем необходимый обмен с GitHub
unzip influxdb-storage-exchange.zip                                     # Развёртываем файл zip
rm influxdb-storage-exchange.zip                                        # Удаляем выгруженный файл zip
mv ibrowse-4.0.2-rmqv3.3.x-git7871e2e.ez $PLUGIN_DIR                    # Перемещаем драйвер HTTP в каталог подключаемого модуля
mv influxdb_storage_exchange-v3.3.x-0.1.1.ez $PLUGIN_DIR                # Перемещаем подключаемый модуль обмена InfluxDB в каталог подключаемого модуля
$RABBITMQ_DIR/sbin/rabbitmq-plugins enable influxdb_storage_exchange    # Разрешаем подключаемый модуль обмена InfluxDB
 	   
[Замечание]Замечание

В операционных системах Ubuntu и RedHat/CentOS, RabbitMQ обычно устанавливается в специфичном для данной версии каталоге под /usr/lib/rabbitmq. В Windows RabbitMQ обычно устанавливается в особом для версии каталоге в C:\Program Files\RabbitMQ. Предварительно скомпилированные подключаемые модули не зависят от платформы и могут исполняться на любой платформе, на которой запущен RabbitMQ.

Чтобы убедиться что ваш подключаемый модуль был установлен правильно, переместитесь в закладку Exchanges UI управления по ссылке http://localhost:15672/#/exchanges. В разделе Add a New Exchange вы должны обнаружить значение x-influxdb-storage в ниспадающем списке Type (Рисунок 10-15).

 

Рисунок 10-15


Проверка того, что обмен InfluxDB установлен как требовалось.

Получив подтверждение правильной установки вы теперь можете создать некий экземпляр необходимого обмена хранилища InfluxDB.

Создание проверочного обмена

Как и уже обсуждавшийся обмен LISTEN PostgreSQL, обмен хранилища InfluxDB может настраиваться через политику, посредством rabbitmq.config или передавая персональные аргументы при объявлении данного обмена. Для знакомства с различными опциями и переменными настройки, применяемыми в каждом из доступных подходов к настройке, ознакомьтесь в файлом README на его GitHub. Для иллюстрации различий настройки на основе политик, применявшейся нами для обмена LISTEN PostgreSQL и настройкой на базе аргументов, следующий наш пример использует при создании необходимого обмена настройку на основе аргументов.

Вначале перейдите в закладку Exchanges UI управления RabbitMQ по адресу http://localhost:15672/#/exchanges е перейдите в раздел Add a New Exchange. Настройте необходимый обмен с помощью индивидуальных аргументов, сделанных из переменных, которые имеют префикс x-, который указывает, что что каждый из этих аргументов не является стандартной переменной AMQP или RabbitMQ. Вам понадобится настроить хост, порт, название базы данных, пользователя и его пароль для подключения к соответствующей InfluxDB. Каждое из этих значений имеет префикс x-, как это показано на (Рисунок 10-16). Если вы ошибочно не укажете для этих переменных префикс x-, это приведёт к тому, что данный обмен будет применять установленные по умолчанию значения для каждой из предоставленной вами установки.

 

Рисунок 10-16


Добавление нового обмена InfluxDB с настройкой на основе аргументов.

После добавления данного обмена его параметры будут проверены на соответствие типа, однако сама информация соединения не будет протестирована. Из- за присущей обменам AMQP природы неизменности, в случае если в ошиблись в данном обмене, его необходимо будет удалить и добавить повторно.

Публикуемые в данном обмене сообщения будут сначала сохраняться в InfluxDB, а затем направляться во все подключённые очереди или обмены с применением поведения предметного обмена с ключом маршрутизации. Неверно настроенные обмены не будут препятствовать прохождению через них сообщений, но они не смогут сохранять эти сообщения.

Получив созданным необходимый обмен, вы теперь способны проверить этот обмен посредством публикации сообщений в нём. Если вы применяете настройку через rabbitmq.config или на основании политик, вы можете оставлять все аргументы пустыми, а значения каждого из применявшихся методов будут применены при настройке данного обмена.

Проверка обмена

Для проверки надлежещей интеграции вашего обмена перейдите на страницу нового обмена в UI управления RabbitMQ по адресу http://localhost:15672/#/exchanges/%2F/influx-test. В его разделе Publish a Message определите некое сообщение, у которого имеется content_type с application/json, допустимое значение timestamp, а также правильным образом сформированное тело JSON (Рисунок 10-17).

 

Рисунок 10-17


Публикация сообщения JSON в обмене хранилища InfluxDB.

Так как вы не связали никакую очередь со своим обменом, вы получите предупреждение что ваше сообщение было опубликовано, но не маршрутизация не выполнена при публикации вами этого сообщения. Это приемлемо для данной проверки, так как вы всего лишь желаете проверить что данный пункт данных встраивает их в InfluxDB.

Чтобы удостовериться что данное событие было сохранено как нужно, откройте интерфейс администрирования в своём веб браузере переместившись в http://localhost:8083 и зарегистрируйтесь под root с применением пароля root. Как показано на Рисунке 10-18, вам будет представлен перечень баз данных. Кликните по ссылке Explore Data для своей базы данных rabbitmq-test.

 

Рисунок 10-18


Интерфейс администрирования InfluxDB отображающий перечень баз данных.

После того как вы кликните по Explore Data, вы попадёте в интерфейс в котором вы можете запрашивать сами данные. Ввод простого запроса SELECT * FROM pageview должен вернуть вам единственную строку, как это показано на Рисунке 10-19.

 

Рисунок 10-19


Проверка того, что кортеж был вставлен в базу данных.

Если вы не видите в результате своего запроса результата, скорее всего вы допустили опечатку в заголовке своего сообщения или в самом сообщении. Убедитесь что тип содержимого определён как установленный в значение application/json. Кроме того проверьте что все публикуемые вами сообщения сформированы правильным образом в JSON. Вы можете проверить тело своего сообщения с помощью http://jsonlint.com. Наконец, убедитесь что InfluxDB запущена и что все необходимые данные настройки аккуратно предоставлены при создании вашего обмена.

Если всё работает как ожидалось, ваш обмен хранилища InfluxDB продемонстрирует общую гибкость и мощность, которую может создавать непосредственная интеграция базы данных с RabbitMQ. Если вы применяете систему аналогичную Sensu для мониторинга вашей инфраструктуры, у вас теперь имеется мощный способ прозрачным образом отслеживать ваш поток событий и сохранять его в базе данных для последующей аналитики поступающих данных, или представления полученной информацию в инструментальной панели с помощью Grafana.

Выводы

Интеграция баз данных с RabbitMQ снижает операционные накладные расходы исполнения приложение потребителей или издателей за пределами вашей базы данных или стека RabbitMQ. За такое упрощение, однако, приходится платить. Так как RabbitMQ и ваша база данных более тесно связаны, сценарии обработки отказов становятся более сложными.

В данной главе вы изучили как PostgreSQL может применяться в качестве источника сообщений, которые направляются через RabbitMQ либо при помощи расширения pg_amqp PostgreSQL, или с применением обмена LISTEN PostgreSQL. Было приведено подробное описание установка и использование обмена хранилища InfluxDB, которое продемонстрировало как публикуемые в RabbitMQ сообщения могут сохраняться в базе данных самим RabbitMQ.

Интеграции базы данных в этой главе всего лишь показывают кусочек айсберга. Существуют и прочие проекты, которые напрямую интегрируют RabbitMQ с базами данных , такие как обмен Riak и его конкуренты, проект, который реализует точки входа фиксации Riak RabbitMQ, публикующие сообщения в RabbitMQ при возникновении записей транзакций в Riak. Чтобы отыскать некий подключаемый модуль для выбранной вами базы данных проверьте страницу подключаемых модулей сообщества RabbitMQ, а также страницу клиентов и инструментов разработки RabbitMQ.

не можете обнаружить искомое? Возможно вы способны сделать свой вклад в следующий подключаемый модуль, предоставляющий интеграцию базы данных с RabbitMQ.