Приложение A: Ceph поверх RDMA

Данное руководство является переводом материалов HowTo Configure Ceph RDMA с обновлениями от 27.03.2017 5:19.

Основной целью данного документа является описание того как поднять кластер Ceph с RDMA. Данные инструкции предназначаются искушённым пользователям Ceph.

Настройка RDMA Ceph

  Ссылки

  Предварительные требования

На всех своих серверах примените следующее:

  1. Убедитесь что у вас в наличии работающие ping и rping между всеми узлами Ceph.

    {Прим. пер.: Убедитесь что у вас установлена последняя версия OFED и после установки перезагружен драйвер openibd.
    Проверка rping выполняется так:
    Сервер: rping –s –v server_ip
    Клиент: rping –c –v –a server_ip }
    }

  2. Откройте /etc/security/limits.conf и добавьте следующие строки для выполнения ping к оперативной памяти. RDMA тесно связан с выделенными адресами физической памяти.

    
    * soft memlock unlimited
    * hard memlock unlimited
    root soft memlock unlimited
    root hard memlock unlimited
     	   
  3. Для выполнения процесса развёртывания вы должны разрешить регистрацию ssh без применения пароля, как это требуется Ceph. Подробнее Preflight Checklist — Ceph Documentation.

  4. Установите Ceph-deploy

    
    sudo rpm -Uvh https://download.ceph.com/rpm-kraken/el7/noarch/ceph-deploy-1.5.36-0.noarch.rpm;
     	   

  Создание кластера Ceph

Данная настройка основана на repo — ceph-deploy 1.5.37 documentation и Storage Cluster Quick Start — Ceph Documentation.

  1. Создайте некий рабочий каталог.

    
    mkdir my_cluster
    cd my_cluster
     	   
  2. Установите Ceph:

    1. Создайте ceph.conf и установите ceph на все узлы:

      
      $ceph-deploy new --cluster-network=11.130.1.0/24 --public-network=11.130.1.0/24 "list of monitors"
      $ceph-deploy --overwrite-conf install --repo-url=ftp://user:password@ftpsupport.mellanox.com/rpm-v11.1.0-6639-gb304df1 --gpg-url=ftp://user:password@ftpsupport.mellanox.com/rpm-v11.1.0-6639-gb304df1/release.asc "list all nodes"
       	   
      [Замечание]Замечание

      Если пользователь или пароль имеют особые символы, такие как "@", зарезервированные символы, тогда такой символ должен быть заменён "зарезервированным символом, закодированным после символа процента", например %40.

      [Совет]Совет

      Помните, что общее число мониторов Ceph должно быть нечётным числом.

    2. В my_cluster/ceph.conf добавьте следующие строки:

      
      [global]
      ...
      ms_type=async+rdma
      ms_async_rdma_device_name=mlx5_0
       	   
      [Замечание]Замечание

      "ms_async_rdma_device_name" должен быть активным устройством

      Для определения активных устройств вы можете исполнить следующее:

      
      $ ifconfig
            ...
          ens4: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500             >------------ Это то устройство, которое вы проверили rping на этапе предварительной подготовки
          inet 11.1.1.14  netmask 255.255.255.0  broadcast 11.1.1.255
          inet6 ffff::ffff:ffff:ffff:ffff  prefixlen 64  scopeid 0x20
          ether ff:ff:ff:ff:ff:ff  txqueuelen 1000  (Ethernet)
          RX packets 10104263288  bytes 646758740138 (602.3 GiB)
          RX errors 0  dropped 0  overruns 0  frame 0
          TX packets 13  bytes 2410 (2.3 KiB)
          TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
      
      $ cat /sys/class/net/ens4/device/infiniband/mlx5_0/ports/*/state
           4: ACTIVE
      ----------> mlx5_0 - это ваше активное устройство
       	   
    3. Выполните:

      
      ceph-deploy --overwrite-conf mon create-initial
      ceph-deploy --overwrite-conf admin  "list_all_nodes"
       	   

      Для каждого узла из перечня list_all_nodes выполните:

      
      sudo chmod +rx /etc/ceph/ceph.client.admin.keyring
       	   
  3. Настройте все демоны OSD Ceph.

    Выполните для каждого диска на всех узлах:

    
    ceph-deploy --overwrite-conf disk zap node1:sdb
    ceph-deploy ---overwrite-conf osd prepare node1:sdb
    ceph-deploy osd activate node1:"sdb"1
     	   
    [Замечание]Замечание

    Имеющийся минимум числа узлов OSD должен быть то же значение, которое имеет число, определённое для реплик. Значение реплик по умолчанию равно 3.

  4. Для проверки состояния исполните ceph -s. Вы должну увидеть в качестве отклика HEALTH_OK или нечто подобное.

    
    # ceph -s
    
        health *HEALTH_OK*
    
          monmap e1: 1 mons at {r-aa-zorro002=2.2.68.102:6789/0}
                 election epoch 7, quorum 0 r-aa-zorro002
          *osdmap e26: 3 osds: 3 up, 3 in*
                 flags sortbitwise,require_jewel_osds
          pgmap v57: 64 pgs, 1 pools, 0 bytes data, 0 objects
                 101 MB used, 1381 GB / 1381 GB avail
                       64 active+clean
     	   

Поздравляем! Ваш RDMA Ceph кластер поднят и работает!

[Замечание]Замечание

Если вы обнаружили некоторое предостережение Number of placement-groups (pg), изучите Placement Groups — Ceph Documentation для получения дополнительной информации.

Решения RDMA/ RoCE

Асинхронная система сообщений Ceph

Данный материал является переводом материалов Ceph Async Messenger с обновлениями от 28.12.2016.

  Обзор

Исходный код Ceph имеет три вида реализации сетевого взаимодействия. В самой ранней реализации SimpleMessenger для каждой пары взаимодействия между имеющимися равноправными участниками создавалось четыре потока (thread) чтобы поддерживать соответствующее состояние соединения (два потока на каждой стороне несут ответственность за чтение и запись). Таким образом, по мере роста кластера это приводит к созданию большого числа потоков. Благодаря реализации Linux epoll на помощь приходит высокое распараллеливание сетевого ввода/ вывода за счёт таких системных вызовов epoll, как библиотека libevent. Исходный код также реализует на основе epoll систему AsyncMessenger, которая помогает снизить общее число потоков, необходимое для сетевого обмена в имеющемся кластере. Имеющаяся в настоящее время {Прим. пер.: 28 декабря 2016} ещё пока не стабильная и не является компонентой взаимодействия по умолчанию, однако в будущем заменит SimpleMessenger.

  Сервер

Для прослушивания порта, ожидания поступления запросов на соединение и последующего приёма запросов на установление некоторого соединения для взаимодействия необходим соответствующий сервер.

 

Инициализация

Рассмотрим в качестве примера OSD. При запуске его процесса будет создан некий объект Messenger , применяемый для управления текущим сетевым соединением, прослушивания порта и получения всех запросов, его исходный код находится в соответствующем файле /ceph_osd.cc:


int main(int argc, const char **argv) 
{ 
  ...... 
  // public --используется для взаимодействия с клиентом
  Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type, 
                                           entity_name_t::OSD(whoami), "client", getpid()); 
  // cluster --применяется для внутреннего взаимодействия кластера
  Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type, 
                                            entity_name_t::OSD(whoami), "cluster", 
                                            getpid()); 
  ...... 
  } 
  // src/msg/Messenger.cc 
  Messenger *Messenger::create(CephContext *cct, const string &type, 
                               entity_name_t name, string lname, 
                               uint64_t nonce) 
  { 
  ...... 
  // в исходном файле /common/config_opts.h вам необходимо настроить связанные асинхронным доступом опции, чтобы они вступили в действие
  // OPTION(enable_experimental_unrecoverable_data_corrupting_features, OPT_STR, "ms-type-async") 
  // OPTION(ms_type, OPT_STR, "async") 
  else if ((r == 1 || type == "async") && 
            cct->check_experimental_feature_enabled("ms-type-async")) 
    return new AsyncMessenger(cct, name, lname, nonce); 

  ...... 
  return NULL; 
} 
 	   

Конструкторы класса AsyncMessenger должны быть осведомлены о том, что хотя все шесть систем обмена сообщения создаются в процессе начального запуска процесса OSD, однако все они совместно используют некий WorkerPool, причём функция lookup_or_create_singleton_object гарантирует что создаётся только один пул, так как её имя во входных параметрах, WorkerPool::name является одним и тем же:


AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, 
                               string mname, uint64_t _nonce) 
  : SimplePolicyMessenger(cct, name,mname, _nonce), 
    processor(this, cct, _nonce), 
    lock("AsyncMessenger::lock"), 
    nonce(_nonce), need_addr(true), did_bind(false), 
    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), 
    cluster_protocol(0), stopped(true) 
{ 
  ceph_spin_init(&global_seq_lock); 
  cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name); // создаём объект pool, 
  //                                 отметим, что второй аргумент является статической константой в WorkerPool
  // Создаём некий объект локального соединения для отправки сообщения самому данному процессу
  local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center);
  init_local_connection(); //  инициализируем необходимый локальный объект
  } 
  
  template<typename T> 
  void lookup_or_create_singleton_object(T*& p, const std::string &name) { 
    ceph_spin_lock(&_associated_objs_lock); 
    if (!_associated_objs.count(name)) { // name определяет некий процесс, который будет иметь только какой-то pool 
      p = new T(this); // new некий объект, здесь это WorkerPool 
    _associated_objs[name] = reinterpret_cast<AssociatedSingletonObject*>(p); // присоединить map 
  } else { 
    p = reinterpret_cast<T*>(_associated_objs[name]);
  } 
  ceph_spin_unlock(&_associated_objs_lock); 
} 
 	   

Также отметим что в таком конструкторе данной системы сообщений размещается только единственный пул данного процесса, причём деструктор не отвечает за высвобождение его памяти, так как множество систем сообщений являются использующими его совместно и то что некая система сообщений удалена не означает что прочие системы сообщений будут также несомненно удалены. Основной указатель на пул хранится как участник CephContext в переменной _associated_objs, поскольку данный процесс демона имеет некий глобально уникальный CephContext, который высвобождает имеющуюся память основного указателя на пул при деструкции его CephContext.

Все процессы OSD будут иметь только какой- то WorkerPool во время инициализации чтобы сделать некую работу? Как предполагает его имя, WorkerPool несомненно используется для управления конкретным Worker, именно конструктором несомненно нового класса объектов Worker и причём наследуемый из имеющегося класса thread класс Worker несомненно является неким отодельным рабочим потоком. В исходном коде src/msg/async/AsyncMessenger.[h|c]:


WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false), 
                       barrier_lock("WorkerPool::WorkerPool::barrier_lock"), 
                       barrier_count(0) 
{ 
  assert(cct->_conf->ms_async_op_threads > 0); 

  for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { 
    Worker *w = new Worker(cct, this, i); // Новый класс объектов Worker
    workers.push_back(w); // Хранится в данном контейнере вектора, применяется для отслеживания всех workers
  } 

  ...... 
} 

class Worker : public Thread { // Наследует класс Thread, отображает тот факт, что самм класс  Worker содержит потоки
  static const uint64_t InitEventNumber = 5000; // Общее число событий 
  static const uint64_t EventMaxWaitUs = 30000000; // Значение максимального времени ожидания события, 30 секунд 
  CephContext *cct; 
  WorkerPool *pool; 
  bool done; 
  int id; 

 public: 
  EventCenter center; // Центр обработки событий 
  Worker(CephContext *c, WorkerPool *p, int i) 
    : cct(c), pool(p), done(false), id(i), center(c) { 
    center.init(InitEventNumber); // Инициализация выполняется событиями, 
	                  // на самом деле служит для инициализации относящейся к epoll структуре
  } 
  void *entry(); 
  void stop(); 
};
 	   

Для общности всего кода здесь присутствует некий отдельный уровень абстракции, а именно EventCenter, который применяется для управления различными событиями в имеющемся драйвере, такими как epoll, kqueue, select и тому подобными. Исходный код в src/msg/async/Event.[h]c]:


class EventCenter { 
  ...... 

  FileEvent *file_events; // Все события ввода/ вывода 
  EventDriver *driver; // Определённый драйвер 
  map<utime_t, list<TimeEvent> > time_events; // Все события времени 
  ...... 
}; 

// Интерфейс EventDriver
// epoll Данный драйвер наследует этот интерфейс, 
// причём реализация данного интерфейса является тремя системными вызовами epoll
// epoll_create, epoll_ctl,epoll_wait данного пакета 
class EventDriver { 
 public: 
   virtual ~EventDriver() {} // мы хотим иметь виртуальный деструктор!!! 
   virtual int init(int nevent) = 0; 
   virtual int add_event(int fd, int cur_mask, int mask) = 0; 
   virtual void del_event(int fd, int cur_mask, int del_mask) = 0; 
   virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0; 
   virtual int resize_events(int newsize) = 0; 
 }; 
 
 class EpollDriver : public EventDriver { 
   int epfd; // epoll fd 
   struct epoll_event *events; // Ожидаем появления определения структуры данного указателя на событие, 
                               // вы можете просмотреть соответствующую информацию Epoll
   ephContext *cct; 
   int size; 
   ...... 
 }; 
 	   

Функция конструктора Worker, вызывает center функции init чтобы просмотреть что делает center.init?


int EventCenter::init(int n) 
{ 
  ...... 
  driver = new EpollDriver(cct); // Создаём некий новый объект драйвера 

  int r = driver->init(n); // Инициализируем определённый драйвер 

  int fds[2]; // pipe Применяется для пробуждения данного потока worker, далее мы проанализируем это 
  if (pipe(fds) < 0) { 
    lderr(cct) << __func__ << " can't create notify pipe" << dendl; 
    return -1; 
  } 

  notify_receive_fd = fds[0]; 
  notify_send_fd = fds[1]; 

  ...... 

  create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify())); // Наблюдает за событием чтения конвейера
  return 0; 
} 

// Инициализация epoll
int EpollDriver::init(int nevent) 
{ 
  events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent); // nevent является значением InitEventNumber в определённом классе Worker
  memset(events, 0, sizeof(struct epoll_event)*nevent); 
  epfd = epoll_create(1024); // Получить fd epoll 
  fd size = nevent; return 0; 
} 
 	   

От самого первого процесса OSD к классу AsyncMessenger, затем ко всем совместно использующим WorkerPool системам сообщений и затем инициализируя определённый процесс единственного пула для всех Worker, а потом worker при помощи универсального обработчика всех событий EventCenter и инициализирует данный определённый механизм обработки события, например, epoll. Вся работа выполнена? В действительности, во- первых данный поток worker не запущен, а во- вторых, процесс системы сообщений не связан с неким определённым портом для наблюдения, поэтому OSD запускает тот процесс, который должен предпринять прочие шаги.

 

Привязка и ожидание

После создания системы сообщений установите стратегию и текущие пределы всех параметров, следующими будут привязка адреса, обработка имеющегося сетевого уровня сокета, например socket/bind/listen/accept, причём в основном с управлением через класс Processor.


// Продолжение кода ceph_osd.cc
int main(int argc, const char **argv) 
{ 
  ...... 
  // Устанавливаем протокол 
  ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL); 
  ...... 

  // Устанавливаеем все политики и вс текущие пределы 
  ms_public->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); 
  ms_public->set_policy_throttlers(entity_name_t::TYPE_CLIENT, 
                                      client_byte_throttler.get(), 
                                      client_msg_throttler.get()); 
  ...... 

  // Связываем имеющийся адрес 
  r = ms_public->bind(g_conf->public_addr); 
  if (r < 0) 
    exit(1); 
  r = ms_cluster->bind(g_conf->cluster_addr); 
  if (r < 0) 
    exit(1); 

  ...... 
  ms_public->start(); // Запускаем поток (thread)

  ...... 
  err = osd->init(); // Это ключ, анализ далее 

  ...... 
  ms_public->wait(); // Ожидание окончания потока 
  ...... 
}
 
int AsyncMessenger::bind(const entity_addr_t &bind_addr) 
{ 
  ...... 

  // привязываемся к некоторому сокету 
  set<int> avoid_ports; 
  int r = processor.bind(bind_addr, avoid_ports); // Вызов для обработки объекта processor 

  ...... 
} 

// processor --Действие происходит прямо в API socket пакетов: socket, bind, listen 
// Создаём некий сокет, привязываем его к определённому порту и переходим в ожидание (listen) 
int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports) 
{ 
  ...... 
  listen_sd = ::socket(family, SOCK_STREAM, 0); 

  ...... 
  rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size()); 

  ...... 
  rc = ::listen(listen_sd, 128); 

  ...... 
  msgr->init_local_connection(); // Обновляем имеющийся адрес, однако поскольку отсутствует объект координации,
                                    // данное соединение не обрабатывается
  return 0; 
} 

void init_local_connection() { 
  Mutex::Locker l(lock); 
  _init_local_connection(); 
} 

void _init_local_connection() { 
  assert(lock.is_locked()); 
  local_connection->peer_addr = my_inst.addr; 
  local_connection->peer_type = my_inst.name.type(); 
  ms_deliver_handle_fast_connect(local_connection.get()); 
} 

void ms_deliver_handle_fast_connect(Connection *con) { 
  for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); // fast_dispatchers в настоящее время пуст 
       p != fast_dispatchers.end(); 
	   ++p) 
    (*p)->ms_handle_fast_connect(con); 
  }
 	   
 

Обработка события

При привязке адреса для мониторинг порта будет выполняться ожидание момента когда соединение начнёт дело с запросом на такое соединение. Обязательно ли необходимо создать поток worker для решения данной задачи?


// ceph_osd.cc --Продолжим вызывать messenger->start(), см. предыдущий код 
int AsyncMessenger::start() 
{ 
  lock.Lock(); 
  ...... 
  pool->start(); // Запускаем все потоки (thread) 

  lock.Unlock(); 
  return 0; 
} 

void WorkerPool::start() 
{ 
  if (!started) { 
    for (uint64_t i = 0; i < workers.size(); ++i) { 
      workers[i]->create(); // Создаём некий поток 
    } 
    started = true; 
  } 
} 

// Функция элемента потока 
void *Worker::entry() {
  ...... 

  center.set_owner(pthread_self()); 
  while (!done) { // Потоки зацикливаются по событиям 
    int r = center.process_events(EventMaxWaitUs); // При обслуживании событий центр обработки событий, 
	                                               // отметим, что максимальное время ожидания 30 секунд
  } 

  return 0; 
} 

// Возвращаем все готовые fd через epoll_wait, а затем сразу вызываем его callback 
int EventCenter::process_events(int timeout_microseconds) 
{ 
  ...... 
  vector<FiredFileEvent> 
  fired_events; 
  next_time = shortest; 
  numevents = driver->event_wait(fired_events, &tv); // Получить текущее событие ввода/ вывода 
  for (int j = 0; j < numevents; j++) { 
    int rfired = 0; 
    FileEvent *event; 
    { 
      Mutex::Locker l(file_lock); 
      event = _get_file_event(fired_events[j].fd); 
    } 
    if (event->mask & fired_events[j].mask & EVENT_READABLE) { 
      rfired = 1; 
      event->read_cb->do_request(fired_events[j].fd); // Обработка доступных на чтение событий 
    } 

    if (event->mask & fired_events[j].mask & EVENT_WRITABLE) { 
      if (!rfired || event->read_cb != event->write_cb) 
        event->write_cb->do_request(fired_events[j].fd); // Дело с доступными для записи событиями 
      } 
    } 
    ...... 
} 

int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp) 
{ 
  int retval, numevents = 0; retval = epoll_wait(epfd, events, size, 
                                                 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // epoll_wait
												                               // --некий системный вызов, событие готовности ожидания или таймаут
    for (j = 0; j < numevents; j++) { 
      int mask = 0; 
      struct epoll_event *e = events + j; 

      if (e->events & EPOLLIN) mask |= EVENT_READABLE; 
      if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE; 
      if (e->events & EPOLLERR) mask |= EVENT_WRITABLE; 
      if (e->events & EPOLLHUP) mask |= EVENT_WRITABLE; 
	  // Записываем что произошло
      fired_events[j].fd = e->data.fd; 
      fired_events[j].mask = mask; 
    } 

  return numevents; 
} 
 	   

Функция Process_events, стоит отметить, что здесь мы имеем дело с тремя событиями, а также относящимися к данным событиям чтения и записи fd, относящимися ко времени событиями по установленному времени, а также всеми дополнительными внешними событиями в связи с fd, если нет fd готовых к ожиданию таймаута (максимальный тайаут не превышает имеющегося значения для следующего события по времени). Однако, при данном процессе имеются две ситуации, которые должны пробуждаться, а именно, первая состоит в добавлении некоторого меньшего интервала времени (только что произошедшего), а другая состоит в добавлении некоторого внешнего события.

 

Добавление ожидания fd

Поток (thread) worker в цикле обрабатывает события, фактически, вызывает epoll_wait, возвращает определённому готовому событию fd, а затем вызывает соответствующий fd обратный вызов read_cb или write_cb. Совершенно ясно, что чтобы epoll_wait мог вернуть готовому событию fd, такой fd должен быть добавлен заранее. Когда же его добавить? Также запомните, что на втором шаге, связывания, для управления epoll, чтобы ожидать необходимый запрос от такого fd должен быть добавлен listen_fd, созданный классом Processor.

Однако, похоже, из подлежащего здесь для исполнения кода OSD не добавляется действие? Вызываемый в OSD messenger->start() приводит к:


err = osd->init();
 	   

Необходимый трюк тут:


int OSD::init() 
{ 
  ...... 
  // я готов! 
  client_messenger->add_dispatcher_head(this); 
  cluster_messenger->add_dispatcher_head(this); 
  ...... 
} 

void add_dispatcher_head(Dispatcher *d) { 
  bool first = dispatchers.empty(); // Самое начало курса, пусто, для истинности first
  dispatchers.push_front(d); 
  if (d->ms_can_fast_dispatch_any()) 
    fast_dispatchers.push_front(d); 
  if (first) ready(); // Готов добавить fd к epoll 
} 

void AsyncMessenger::ready() 
{ 
  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; 
  Mutex::Locker l(lock); 
  Worker *w = pool->get_worker(); // Получить один worker для работы 
  processor.start(w); // listen_sd если внутри Processor
} 

int Processor::start(Worker *w) 
{ 
  ldout(msgr->cct, 1) << __func__ << " " << dendl; // Запуск потока (thread)
  if (listen_sd > 0) { 
    worker = w; // Создаём доступные для чтения события и, со временем, вызываем epoll_ctl чтобы listen_sd добавил epoll для управления 
    w->center.create_file_event(listen_sd, EVENT_READABLE, 
                                   EventCallbackRef(new C_processor_accept(this))); // Обратите внимание на происходящий со временем обратный вызов (callback) 
  } 
  return 0; 
} 
 	   
 

Приём соединения

После всего процесса инициализации добавляется ожидание (Listen) fd, даже если всё завершено. Когда появится новый запрос на соединение, как уже упоминалось ранее, поток worker вызовет необходимую функцию process_event и выполнится обратный вызов:


// listen fd --Обратный вызов (callback) 
class C_processor_accept : public EventCallback 
{ 
  Processor *pro; 

 public: 
  C_processor_accept(Processor *p): pro(p) {} 
  void do_request(int id) { 
    pro->accept(); // Обратный вызов 
  } 
}; 

void Processor::accept() 
{ 
  while (errors &дt; 4) { 
    entity_addr_t addr; 
    socklen_t slen = sizeof(addr.ss_addr()); 
    int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); // Принять запрос на соединение 
    if (sd >= 0) { 
      msgr->add_accept(sd); // Принять системой сообщений тот сокет sd, который обрабатывается этой системой (messenger) 
      continue; 
    } else { 
    ...... 
    } 
  } 
} 

AsyncConnectionRef AsyncMessenger::add_accept(int sd) 
{ 
  lock.Lock(); 
  Worker *w = pool->get_worker(); 
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); // Создать некое соединение 
  w->center.dispatch_event_external(EventCallbackRef(new C_conn_accept(conn, sd))); // Распространение события, новое внешнее соединение, поэтому вызов внешний
  accepting_conns.insert(conn); // Записываем то соединение, которое вступит в силу и будет удалено из собрания после окончательного завершения
  lock.Unlock(); 
  return conn; 
} 

void EventCenter::dispatch_event_external(EventCallbackRef e) 
{ 
  external_lock.Lock(); 
  external_events.push_back(e); // Помещаем данную функцию обратного вызова события в имеющуюся очередь центра обработки событий и ожидаем исполнения 
  external_lock.Unlock(); 
  wakeup(); // Пробуждаем поток (thread) worker 
} 
 	   

Не очень понятно зачем нужно помещать эту очередь, ожидать следующего вызова worker process_event, когда это может быть реализовано напрямую?

В любом случае, что касается данной очереди и необходимости реализовать обратный вызов очереди, когда он будет реализованы? Очевидно, что в потоке worker для функции process_event, но поток worker может заснуть в epoll_wait (управление epoll всеми fd не готово, может лишь ожидать тайм-аута), если имеется некое новое соединение, вам нужно немедленно получать запросы на соединение, поэтому возбуждаем пробуждение. Данный поток записывается в имеющейся функции EventCenter :: init (), так что другая сторона доступна для чтения, т.е. Notify_receive_fd готов, epoll_wait будет записан в конец канала. Верните его доступное доя читаемое событие, а затем выполните его обратный вызов (обратный вызов - это простое чтение канала) с тем, чтобы поток worker продолжал обработку, а после этого просто помещался в обратный вызов очереди.


void EventCenter::wakeup() 
{ 
  ldout(cct, 1) << __func__ << dendl; 
  char buf[1]; 
  buf[0] = 'c'; // wake up "event_wait" 
  int n = write(notify_send_fd, buf, 1); // Прбудить поток worker
  // FIXME ? -- Исправить?
  assert(n == 1); 
} 

int EventCenter::process_events(int timeout_microseconds) 
{ 
  ...... 

  numevents = driver->event_wait(fired_events, &tv); // Первоначально поток worker может находиться здесь в спящем состоянии и будет тут пробуждён со стороны wakeup

  // На данный момент имеется готовым по крайней мере один fd, а именно, notify_receive_fd
  // Данная реализация всего обратного вызова fd, причём для notify_receive_fd, как вы можете обнаружить, обратный вызов, является неким простым чтением, что не витеевато
  for (int j = 0; j < numevents; j++) { 
        ...... 
    event->read_cb->do_request(fired_events[j].fd); 
        ..... 
  } 

  ...... 
  // Немедленно следуем за данной очередью, именно она является поставленной целью для пробуждения worker
  external_lock.Lock(); 
  while (!external_events.empty()) { 
    EventCallbackRef e = external_events.front(); 
    external_events.pop_front(); 
    external_lock.Unlock(); 
    if (e) e->do_request(0); // Вызов запоса на обратный вызов 
	external_lock.Lock(); 
  } 
  external_lock.Unlock(); 
  } 
  ...... 
}
 	   
 

Добавление приёма fd

Из данного анализа заключаем, что такой запрос вызова обратного вызова исполнится быстро. В передней части такого запроса получается приём fd, теперь необходимо добавить такую структуру fd epoll, управление, и после этого вы можете осуществлять взаимодействие, обратные вызовы со временем выполнят все эти вещи:


// Определение типа обратного вызова в данной очереди 
class C_conn_accept : public EventCallback { 
  AsyncConnectionRef conn; 
  int fd; 

   public: C_conn_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {} 
    void do_request(int id) { 
     conn->accept(fd); 
    } 
  }; 

void AsyncConnection::accept(int incoming) 
{ 
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl; 
  assert(sd < 0); 
  sd = incoming; 
  state = STATE_ACCEPTING; 
  center->create_file_event(sd, EVENT_READABLE, read_handler); // Sd является успешным подключением fd, добавлено управление epoll
  process(); // Запускает своё исполнение машина состояния серверной стороны и вначале отправляет сообщение BANNER обратившемуся клиенту
}
 	   
 

Взаимодействие

Отметим, что самым начальным состоянием машины состояния AsyncConnection данного сервера является STATE_ACCEPTING и это состояние данного сервера является самым первым для отправки сообщения BANNER своему клиенту. Позже, приняв данное сообщение, поток (thread) worker вызовет обработку read_handler, а после этого вызовет процесс, причём машина состояния не остановит преобразование состояний:


// Зарегистрированный класс обратного вызова 
class C_handle_read : public EventCallback { 
  AsyncConnectionRef conn; 

 public: 
  C_handle_read(AsyncConnectionRef c): conn(c) {} 
  void do_request(int fd_or_id) { 
    conn->process(); // Вызов обработки соединения 
  } 
}; 

void AsyncConnection::process() 
{ 
  int r = 0; 
  int prev_state = state; 
  Mutex::Locker l(lock); 
  do { 
    prev_state = state; 

        // Машина состояний соединения 
    switch (state) { 
      case STATE_OPEN: 
           ...... 

           default: 
        { 
           if (_process_connection() < 0) 
             goto fail; break; 
        } 
    } 
  } 

  return 0; 

  fail: 
  ...... 
} 

// Отдельная обработка информации о соединении 
int AsyncConnection::_process_connection() 
{ 
  int r = 0; 

  switch(state) { 
    case STATE_WAIT_SEND: 
    ...... 
  } 
  ...... 
} 
 	   

AsyncConnection отвечает за имеющийся класс communication, чтобы понять все принципа этой машины состояний, вы должны понимать имеющийся прикладной уровень коммуникационного протокола Ceph, с которым вы можете ознакомиться в официальном поясняющем документе.

Инфраструктура AsyncMessenger, даже если вся вводная часть завершена, в случае поступления некоторого нового запроса будет повторять следующие этапы:

  • принять соединение (accept connection)

  • добавить принятие fd (add accept fd)

  • взаимодействие (communication)

Как можно увидеть, общее число потоков не растёт линейным образом одновременно с числом соединений, только в самом начале первичного запуска когда определённое число worker принимает решение стартовать.

  Клиент

Операции клиента в основном инициируют выполнение соединения, само установление соединения для взаимодействия. Все клиенты основаны на библиотеке librados и тогда могут соединяться со всем кластером через RadosClient:


int librados::Rados::connect()
{
    return client->connect();
}

int librados::RadosClient::connect()
{
    ......

    // Создать систему сообщений --messenger
    messenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
            "radosclient", nonce);

    ......

    // Создать систему объектов --objecter
    // Отправлять сообщения, например, код librbd, который обрабатывается objecter
    // Для системы объектов требуются отправки системой сообщений, поэтому необходимо создать 
    //  определённую систему сообщений, переданную в имеющийся класс objecter 
    objecter = new (std::nothrow) Objecter(cct, messenger, &lmonclient,
                &finisher,
                cct->_conf->rados_mon_op_timeout,
                cct->_conf->rados_osd_op_timeout);


    // Аналогично, наблюдение за соединением также необходимо чтобы иметь дело с приёмом сообщений
    monclient.set_messenger(messenger);

    objecter->init();
    messenger->add_dispatcher_tail(objecter);
    messenger->add_dispatcher_tail(this);

    messenger->start();

    ......

    messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); // ID является глобально уникальным, поэтому вам необходимо получить его для наблюдения

    ......
}

 	   

Операция соединения только инициализирует имеющийся объект системы сообщений, для необходимого в действительности взаимодействия в качестве примера будет установлено некое соединение к op_submit objecter.cc:


ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
{
    ......
    int r = _get_session(op->target.osd, &s, lc);
    ......
}

int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
{
    ......

    // session --Не существует, создаст некий один новый сеанс.
    s->con = messenger->get_connection(osdmap->get_inst(osd));
    ......
}

ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
{
    ......
    conn = create_connect(dest.addr, dest.name.type());
    ......
}

AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
  // create connection
  Worker *w = pool->get_worker();
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); // Создать connection
  conn->connect(addr, type); // Cоединение
  assert(!conns.count(addr));
  conns[addr] = conn;

  return conn;
}

void connect(const entity_addr_t& addr, int type)
{
    set_peer_type(type);
    set_peer_addr(addr);
    policy = msgr->get_policy(type);
    _connect();
}

void AsyncConnection::_connect()
{
  state = STATE_CONNECTING; // Это инициализирующее состояние является кртитчески важным и является отправной точкой машины состояний данного клиента
  stopping.set(0);
  center->dispatch_event_external(read_handler); // Помещает в очередь ожидания данный обработчик worker
}
 	   

Здесь в точности так же как и раньше, worker будет иметь дело с такими внешними событиями, а read_handler вызовет функцию процесса, за чам последует исполнение _process_connection:


int AsyncConnection::_process_connection()
{
  int r = 0;

  switch(state) {

    case STATE_CONNECTING: // Начальное состояние
      {
        ......

        sd = net.connect(get_peer_addr()); // Через данный класс сетевой функции, в действительности, 
		                                   // вызывается системный вызов соединения, который устанавливает взаимодействие сокета
	

        // После успешного соединения fd данного сокета будет добавлен в имеющийся epoll для управления
        center->create_file_event(sd, EVENT_READABLE, read_handler);
        state = STATE_CONNECTING_WAIT_BANNER;
        break;
      }
  }
}
 	   

Следующий этап состоит во взаиодействии саого клиента с имеющейся серверной стороной, осуществляемое через выполнение имеющейся машины состояний AsyncConnection. Аналогично, даже если данный клиент создаёт множество систем сообщений (messenger), они всё ещё совместно используют некий рабочий пул, и общее число потоков (thread) определяется инициализацией такого пула, а не растёт линейно по мере роста числа соединений.

  Выводы

Все AsyncMessenger в данном процессе совместно применяют некий workerpool для управления имеющимися worker.

Потоки worker отвечают за определённую обработку событий в имеющемся EventCenter (центре обработки событий).

Весь обмен сетевого уровня приложений обрабатывается машиной состояний AsyncConnection.

Стек асинхронной системы сообщений Ceph

Данный материал является переводом материалов Ceph Asyncmessenger Stack с обновлениями от 12.01.2017.

  Введение

С быстрым развитием аппаратных средств и оборудования узкие места системы хранения постепенно перемещаются в само программное обеспечение такой системы хранения, что приводит к проявлению роли DPDK / SPDK, таким компонентам разработки, которые помогают системным разработчикам развивать программное обеспечение систем хранения с применением самых последних технологий для улучшения производительности.

Для Ceph это многократно усиливается благодаря разработчиками открытого исходного кода распределённых систем хранения направленных на то, чтобы логично воспользоваться такой новой технологией, лежащей в основе автономного механизма хранения, последней версии BlueStore, которая поддерживает SPDK. Для сетевого уровня Ceph AsyncMessenger это уровень, который добавляет абстракцию NetworkStack для поддержки различных протоколов стеков (Posix/ DPDK/ RDMA).

В своей более ранней статье мы подробно анализировали AsyncMessenger для такой рабочей нагрузки, когда имелся эталонный код Hummer, в целом, с той поры структура AsyncMessenger не изменилась и эта статья анализирует как ввести NetworkStack, некоторого уровня абстракции для поддержки различных протоколов стека, в основном, в качестве примера PosixStack. Для главного кода зафиксировано значение is5b97cce360fe1f6b15dfad0866d90c85262f8253.

  Инициализация

Для начала запустите конструктор:


AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, 
                string mname, uint64_t _nonce) 
        : SimplePolicyMessenger(cct, name,mname, _nonce), 
        ...... 
{ 
        ceph_spin_init(&global_seq_lock); 
        StackSingleton *single; 
        cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack&qout;); // Относящийся к некоторому уникальному стеку процесс, который присутствует до WorkerPool 
        stack = single->stack.get(); 
        stack->start(); // запускаем поток (thread) 
        ...... 
} 

struct StackSingleton { 
        std::shared_ptr<NetworkStack> stack; 
        StackSingleton(CephContext *c) { 
                stack = NetworkStack::create(c, c->_conf->ms_async_transport_type); // создать другой stack, зависящий от типа 
        } 
        ~StackSingleton() { 
                stack->stop(); 
        } 
}; 

NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) 
{ 
        ...... 
        for (unsigned i = 0; i < num_workers; ++i) { 
                Worker *w = create_worker(cct, type, i); // создать worker 
                w->center.init(InitEventNumber, i); // инициализировать обработчик событий 
                workers.push_back(w); 
        } 
        ...... 
}
 	   

Ранее в конструкторе создавался только один WorkerPool, который применялся для управления всеми worker, а класс worker наследовался из класса потока (thread), который теперь замещён абстракцией NetworkStack, а сам thread перемещён во владельца stack для его достижения, и только соответствующий класс worker развит во всего один oneEventCenter, соответствующий каждому stack для реализации одного собственного worker. Ниже приводится взаимосвязь наследований:

 

Рисунок 1



Следующий шаг состоит в инициализации наших worker и потоков:


void NetworkStack::start() 
{ 
        ...... 
        for (unsigned i = 0; i < num_workers; ++i) { 
                if (workers[i]->is_init()) 
                        continue; 
                std::function<void ()> thread = add_thread(i); // Получает анонимную функцию данного элемента потока 
                spawn_worker(i, std::move(thread)); // Запускаем поток 
        } started = true; 
        ...... 
}

// Создаём реализацию своего потока PosixNetworkStack
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override { 
        threads.resize(i+1); 
        threads[i] = std::thread(func); // Создаём некий поток с применением cli библиотеки thread 
}

// Точка входа в поток
std::function<void ()> NetworkStack::add_thread(unsigned i) 
{ 
        Worker *w = workers[i]; 
        return [this, w]() { 
                const uint64_t EventMaxWaitUs = 30000000; 
                w->center.set_owner(); 
                w->initialize(); 
                w->init_done(); 
                while (!w->done) { 
                        int r = w->center.process_events(EventMaxWaitUs); // Обработка событий 
                } 
        } 
        w->reset(); 
        w->destroy(); 
};
 	   

Здесь представлен конструктор класса AsyncMessenger, который будет перенаправлять первоначально выполненный готовый к обработке событий поток stack, в то время как имеющаяся точка входа данного обработчика событий worker влечёт за собой реализацию различных worker. Упомянутый ранее worker, относящийся к EventCenter, при том что сам EventCenter внутри соответствует некоторому EventDriver для DPDK, так как данный пользователь устанавливает определённый интерфейс опроса в котором он обращается к DPDKDriver (который всё ещё наследует EventDriver) и в свой EventCenter, добавляющий относящуюся к опросу структуру.

  Worker

Реализация класса абстракции worker достаточно проста и наиболее важными являются два интерфейса listen и connect, причём первый применяется для создания некоторого сокета с заданным адресом и данный сокет может обрабатывать следующий запрос контроллера, который применяется для создания некоторого успешно подключённого сокета и может через него читать и записывать данные.


class Worker { 
        unsigned id; // идентификатор worker 
        EventCenter center; // обработка события 

        ...... 
        virtual int listen(entity_addr_t &addr, 
                         const SocketOptions &opts, ServerSocket *) = 0; 
        virtual int connect(const entity_addr_t &addr, 
                         const SocketOptions &opts, ConnectedSocket *socket) = 0; 
        ...... 
};
 	   

Для выполнения различных протоколов стека, несомненно, реализация socket не одна и та же, вся разница оформляется здесь, таким образом, абстрагируемся двумя обычными обёртками классов ServerSocket и ConnectedSocket:


class ConnectedSocket { 
        std::unique_ptr<ConnectedSocketImpl> _csi; // зависит от реализации определённого стека протокола 
        ssize_t read(char* buf, size_t len) { // считать данные
                return _csi->read(buf, len); 
        } 
        ssize_t send(bufferlist &bl, bool more) { // записать данные 
                return _csi->send(bl, more); 
        } 
};

class ServerSocket { 
        std::unique_ptr<ServerSocketImpl> _ssi; // зависит от реализации определённого стека протокола 

        int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { // Принимает запрос на соединение после того как первый параметр успешно подключился к необходимому сокету 
                return _ssi->accept(sock, opt, out, w); 
        } 
};
 	   

Взгляните, как и ранее, на реализацию posix:


int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, 
                ServerSocket *sock) 
{ 
        int listen_sd = net.create_socket(sa.get_family(), true); // создаём сокет 
        r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); // привязываем его адрес 
        r = ::listen(listen_sd, 128); // начинаем ожидание 
        *sock = ServerSocket( 
                        std::unique_ptr<PosixServerSocketImpl>( 
                                new PosixServerSocketImpl(net, listen_sd))); // создать вернуть исполнение пользователю, вы можете принимать запрос на данный сокет
        return 0; 
}

int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) 
{ 
        int sd; sd = net.connect(addr); // инициализируем соединение 
        // создаём connectedsocket,возвращаем его пользователю, теперь вы можете применять этот сокет для send/read 
        *socket = ConnectedSocket(std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); 
        return 0; 
}
 	   

  Socket

Различные режимы взаимодействия реализуют различные абстракции интерфейса сокета, наследуемые как это отображено ниже:

 

Рисунок 2



В качестве примера возьмём реализацию posix:


int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) 
{ 
        int sd = ::accept(_fd, (sockaddr*)&ss, &slen); // Принять запрос на соединение

        std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); // создать некий сокет с успешным соединением 
        *sock = ConnectedSocket(std::move(csi)); // вернуть исполнение пользователю 
        return 0; 
} 
 	   

Реализация ConnectedSocket в основном является интерфейсом read/send, сооттветствующие read и sendmsg, а также прочие системные вызовы реализуются не здесь.

  Ожидание

Теперь мы понимаем, что для некоторых различных worker создаются различные стеки, причём worker через listen и connect создаёт поток ожидания (listen), который может отслеживать данный serversocket и может отправлять и принимать данные для connectedsocket, причём различные реализации соответствуют различным сокетам. Прочие модули async messenger достигаю надлежащую работу сетевого взаимодействия прочими модулями через реализацию двух абстракций сокета для ожидания данных, а также чтения и записи.

Для иллюстрации того привязать определённый процесс к некоторому конкретному адресу, ссылаемся на предыдущую статью: AsyncMessenger.bind -> Processor.bind()


int Processor::bind(const entity_addr_t &bind_addr, 
                const set<int>& avoid_ports, 
                entity_addr_t* bound_addr) 
{ 
        ...... 
        // Представить некое внешнее событие соответствующему центру событий worker в случае, когда 
        // реализация этого события является реализацией второго параметра анонимного объекта функции, 
        // т.е. вызов ожидания worker  
        worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { 
                        r = worker->listen(listen_addr, opts, &listen_socket); // после обработки данного события listen_socket обновляется и может использоваться для приёма 
        }, false); 
        ...... 
}
 	   

И аналогично предыдущему имеется определённый сокет для наблюдения, тем не менее не присоединённый к центру обработки событий управления fd, либо вынужденный ожидать время инициализации osd: OSD.init() -> AsyncMessenger.ready() -> Processor.start()


void Processor::start() 
{ 
        // Представляет некое внешнее событие центру обработки событий worker. Когда такое событие обработано, 
        // имеется некий асинхронный объект функции, который исполняет имеющийся второй аргумент, 
        // т.е. добавляет fd в центр обработки событий для управления 
        if (listen_socket) { 
                worker->center.submit_to(worker->center.get_id(), [this]() { 
                        worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false); 
        } 
} 

// В случае наступления описанного выше события обратным вызовом является listen_handler, 
// таким образом, ниже приводится его реализация
void Processor::accept() 
{ 
        ....... 
        while (true) { 
                int r = listen_socket.accept(&cli_socket, opts, &addr, w); // Данная реализация ожидания сокета приёма 
                // после успешного выполнения первого значения параметра для надлежащего типа connectedsocket 
                // может применяться для отправки и приёма данных
        } 
        ...... 
} 
 	   

  Соединение

Давайте взглянем на некий пример клиентского соединения, рассмотренного в предыдущей статье: AsyncMessenger.create_connect() -> AsyncConnection.connect() -> AsyncConnection._process_connection()


ssize_t AsyncConnection::_process_connection() 
{ 
        ...... 
        case STATE_CONNECTING: { 
                ...... 
                r = worker->connect(get_peer_addr(), opts, &cs); // вызов соединения с worker
                center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); // соединяется для управления 
                // с успешно созданным в центре обработки событий connectedsocket
                state = STATE_CONNECTING_RE; 
                break; 
        } 
}
 	   

Последующим этапом является чтение и отправки данных через ConnectedSocket, сам основной кадр данных не изменяется.

  Выводы

Итак, суммируем, введение некоторого уровня NetworkStack и соответствующего стека worker, основное применение которых состоит в интерфейсе ожидания и приёма, используются для создания неких абстрактов ServerSocket и ConnectedSocket, причём первый служит ожиданию входящих запросов, а второй применяется для чтения и записи данных. Основное изменение с AsyncMessenger состоит в вызове со стороны worker двух абстрактных интерфейсов для режима опроса DPDK, для соответствующих EventCenter DPDKDriver определённого worker добавляется специальный обработчик.

Сетевой интерконнект Ангара

Есть вопросы готовы попытаться ответить!