Глава 2. Ввод/ вывод мультиплексированных для улучшения производительности сокетов

В данной главе мы рассмотрим следующие рецепты:

  • Применение ForkingMixIn в ваших приложениях сервера сокета

  • Применение ThreadingMixIn в ваших приложениях сервера сокета

  • Написание некоего сервера чата с использованием select.select

  • Мультиплексирование какого- то веб сервера с применением elect.epoll

  • Мультиплексирование некоторого сервера echo с помощью библиотеки распараллеливания Diesel

Введение

Данная глава сосредотачивается на улучшении производительности имеющегося сервера сокета с помощью некоторых полезных техник. В отличии от предыдущей главы здесь мы рассматриваем множество клиентов, которые будут подключаться к данному серверу и всё взаимодействие может быть асинхронным. Данному серверу нет необходимости обрабатывать определённый запрос от клиентов каким- то блокируемым образом; они могут выполняться независимо друг от друга. Если один клиент потребляет больше времени для приёма или обработки данных, данному серверу не требуется ожидать этого. Он может сообщить прочим клиентам о применении отдельных потоков или процессов.

В данной главе мы исследуем имеющийся модуль select, который предоставляет зависимые от платформы функции отслеживания ввода/ вывода. Данный модуль строится поверх имеющегося системного вызова select лежащего в основе применяемой операционной системы ядра. Для Linux его страница руководства размещена по адресу http://man7.org/linux/man-pages/man2/select.2.html и вы можете обратиться к ней чтобы ознакомиться с доступными свойствами данного системного вызова {Прим. пер.: также рекомендуем наш перевод раздела 63.2, Мультиплексирование ввода/ вывода вышедшей в 2010г книги Майкла Керриска "Интерфейс программирования Linux".} Поскольку наш сервер пожелает взаимодействовать со множеством клиентов, select может быть очень полезным для отслеживания неблокируемых сокетов. Существуют сторонние библиотеки Python, которые также могут помочь нам иметь дело со множеством клиентов в одно и то же время. Мы включили один образец рецепта, применяющего библиотеку распараллеливания Diesel.

Хотя для достижения краткости мы будем применять два или несколько клиентов, читатели вольны расширить имеющиеся в данной главе рецепты и применять их для десятков и сотен клиентов.

Применение ForkingMixIn в ваших приложениях сервера сокета

Вы приняли решение написать некое приложение асинхронного сервера сокета Python. Данный сервер не будет блокироваться в процессе какого- то клиентского запроса. Поэтому данному серверу требуется некий механизм для ведения дел с каждым индивидуальным клиентом.

Класс SocketServer Python привносится с двумя классами утилит: ForkingMixIn и ThreadingMixIn. Имеющийся класс ForkingMixIn породит некий новый процесс для каждого клиентского запроса. Этот класс обсуждается в данном разделе. Следующий класс ThreadingMixIn будет обсуждаться в следующем разделе. Для получения дополнительной информации вы можете воспользоваться соответствующей документацией Python 2 по ссылке http://docs.python.org/2/library/socketserver.html и документацией Python 3 по ссылке http://docs.python.org/3/library/socketserver.html.

Как это сделать

Давайте перепишем наш сервер echo, первоначально разъяснённый в Главе 1, Сокеты, IPv4 и примеры программирования клиент/ сервер. Мы можем воспользоваться подклассами семейства класса SocketServer. Они имеют готовые серверы TCP, UDP и прочих протоколов. Мы можем создать некий класс ForkingMixIn, наследующий TCPServer и ForkingMixIn. Предшествующий родитель позволит нашему классу ForkingMixIn выполнять все необходимые серверные операции, которые мы ранее выполняли вручную, например, создание некоторого сокета, связывание с каким- то адресом, а также прослушивание входящих соединений. Наш сервер также нуждается в наследовании из ForkingMixIn для асинхронной обработки клиентов.

Класс ForkingMixIn также нуждается в настройке некоторого обработчика, который предписывает как обрабатывать некий запрос клиента. В данном случае наш сервер будет возвращать обратно эхом определённую текстовую строку, получаемую от конкретного клиента. Наш класс обработчика запроса, ForkingServerRequestHandler, наследуется из имеющегося BaseRequestHandler, предоставляемого библиотекой SocketServer.

Мы можем писать код своего клиента нашего сервера echo, ForkingClient неким объектно- ориентированным образом. В Python метод конструктора некоторого класса имеет название __init__(). В соответствии с соглашением он получает некий аргумент self для подключения атрибутов или свойств данного конкретного класса. Наш сервер echo ForkingClient будет проинициализирован в __init__() и будет отправлено надлежащее сообщение самому серверу соответствующим методом run().

Если вы совсем не знакомы с объектно- ориентированным программированием (ООП, OOP - object-oriented programming), может быть полезным просмотреть основные понятия ООП прежде чем вы попытаетесь ухватиться за этот рецепт.

Чтобы проверить наш класс ForkingServer, мы можем запустить множество клиентов echo и увидеть как имеющийся сервер отвечает обратно всем клиентам.

Листинг 2.1 показывает пример кода использования ForkingMixIn в некотором приложении сервера сокета следующим образом:

 

#!/usr/bin/env python
# Книга рецептов сетевого программирования Python, второе издание -- Глава - 2
# Данная программа оптимизирована под 3.5.2.
# Она может исполняться во всех прочих версиях с изменениями и/или без них.
# Чтобы она заработала с Python 2.7.x, необходимы некоторые обусловленные различиями API изменения.
# начните с замены "socketserver" на "SocketServer" по всей данной программе.
# Дополнительно: http://docs.python.org/2/library/socketserver.html
# Дополнительно: http://docs.python.org/3/library/socketserver.html

import os
import socket
import threading
import socketserver

SERVER_HOST = 'localhost'
SERVER_PORT = 0 # сообщает ядру о необходимости динамического захвата порта
BUF_SIZE = 1024
ECHO_MSG = 'Hello echo server!'

class ForkedClient():
    """ A client to test forking server"""
    def __init__(self, ip, port):
        # Создаём некий сокет
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Подключаемся к необходимому серверу
        self.sock.connect((ip, port))
    def run(self):
        """ Client playing with the server"""
        # Отправка определённых данных серверу
        current_process_id = os.getpid()
        print ('PID %s Sending echo message to the server : "%s"' %(current_process_id, ECHO_MSG))
        sent_data_length = self.sock.send(bytes(ECHO_MSG, 'utf-8'))
        print ("Sent: %d characters, so far..." %sent_data_length)
        # Отображение отклика сервера
        response = self.sock.recv(BUF_SIZE)
        print ("PID %s received: %s" % (current_process_id, response[5:]))
    def shutdown(self):
        """ Cleanup the client socket """
        self.sock.close()

class ForkingServerRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Обратная отправка echo соответствующему клиенту

        #received = str(sock.recv(1024), "utf-8")
        data = str(self.request.recv(BUF_SIZE), 'utf-8')

        current_process_id = os.getpid()
        response = '%s: %s' % (current_process_id, data)
        print ("Server sending response [current_process_id: data] = [%s]"%response)
        self.request.send(bytes(response, 'utf-8'))
        return

class ForkingServer(socketserver.ForkingMixIn,
                    socketserver.TCPServer,
                    ):
    """Nothing to add here, inherited everything necessary from parents"""
    pass

def main():
    # Запуск нашего сервера
    server = ForkingServer((SERVER_HOST, SERVER_PORT),
                            ForkingServerRequestHandler)
    ip, port = server.server_address # Retrieve the port number
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.setDaemon(True) # не отсоединяться после выхода
    server_thread.start()
    print ("Server loop running PID: %s" %os.getpid())
    # Запуск соответствующего клиента (клиентов)

    client1 = ForkedClient(ip, port)
    client1.run()

    print("First client running")
    client2 = ForkedClient(ip, port)
    client2.run()

    print("Second client running")

    # Очищаем их
    server.shutdown()
    client1.shutdown()
    client2.shutdown()
    server.socket.close()

if __name__ == '__main__':
    main()
 	   

Как это работает

Некий экземпляр ForkingServer запускается в самом главном потоке, который превращается в демон для исполнения в фоновом режиме. Теперь два имеющихся клиента начали взаимодействовать с данным сервером.

В нашем исполняемом сценарии это будет выглядеть следующим образом:


$ python 2_1_forking_mixin_socket_server.py
Server loop running PID: 26479
PID 26479 Sending echo message to the server :
"Hello echo server!"
Sent: 18 characters, so far...
Server sending response [current_process_id: data] = [26481: Hello echo server!]
PID 26479 received: b': Hello echo server!'
First client running
PID 26479 Sending echo message to the server : "Hello echo server!"
Sent: 18 characters, so far...
Server sending response [current_process_id: data] = [26482: Hello echo server!]
PID 26479 received: b': Hello echo server!'
Second client running
		

Имеющийся номер порта может быть другим в вашей машине, так как он выбирается динамически самим ядром операционной системы.

Применение ThreadingMixIn в ваших приложениях сервера сокета

Возможно, вы предпочтёте писать многопоточное приложение поверх некоторого процесса, которое по какой- либо причине, например, из- за совместного использования состояний данного приложения по потокам, избегая имеющуюся сложность межпроцессного взаимодействия, или ещё- либо почему. В данном случае если вам нужно писать некий асинхронный сетевой сервер с помощью библиотеки SocketServer, вам потребуется ThreadingMixIn.

Приготовление

Выполнив несколько незначительных изменений предыдущего рецепта вы можете получить рабочеспособную версию сервера сокета с применением ThreadingMixIn.

Как это сделать

Как отражено в нашем предыдущем сервере сокета на основе ForkingMixIn, сервер сокета ThreadingMixIn будет следовать тем же самым шаблонам кодирования за исключением нескольких моментов. Во- первых, наш ThreadedTCPServer будет унаследован из TCPServer и ThreadingMixIn. Данная многопоточная версия будет запускать некий новый поток при подключении к нему некоторого клиента. Некторые дополнительные подробности вы можете почерпнуть на http://docs.python.org/2/library/socketserver.html.

Имеющийся класс обработчика запроса нашего сервера сокета, ForkingServerRequestHandler, отправляет данное echo обратно своему клиенту из некоторого нового потока. Вы можете проверить здесь информацию данного потока. Для целей упрощения мы помещаем код данного клиента в некую функцию вместо какого- то класса. Код данного клиента создаёт определённый сокет клиента и отправляет заданное сообщение своему серверу.

Приведённый ниже Листинг 2.2 отображает пример кода определённого echo сервера сокета с применением ThreadingMixIn.

 

#!/usr/bin/env python
# Книга рецептов сетевого программирования Python, второе издание -- Глава - 2
# Данная программа оптимизирована под 3.5.2.
# Она может исполняться во всех прочих версиях с изменениями и/или без них.
# Чтобы она заработала с Python 2.7.x, необходимы некоторые обусловленные различиями API изменения.
# начните с замены "socketserver" на "SocketServer" по всей данной программе.
# Дополнительно: http://docs.python.org/2/library/socketserver.html
# Дополнительно: http://docs.python.org/3/library/socketserver.html

import os
import socket
import threading
import socketserver

SERVER_HOST = 'localhost'
SERVER_PORT = 0 # сообщает ядру о необходимости динамического захвата порта
BUF_SIZE = 1024

def client(ip, port, message):
    """ A client to test threading mixin server"""
    # Подключение к нашему серверу
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    try:
        sock.sendall(bytes(message, 'utf-8'))
        response = sock.recv(BUF_SIZE)
        print ("Client received: %s" %response)
    finally:
        sock.close()

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """ An example of threaded TCP request handler """
    def handle(self):
        data = self.request.recv(1024)
        cur_thread = threading.current_thread()
        response = "%s: %s" %(cur_thread.name, data)
        self.request.sendall(bytes(response, 'utf-8'))

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Nothing to add here, inherited everything necessary from parents"""
    pass
    if __name__ == "__main__":
    # Исполняем сервер
    server = ThreadedTCPServer((SERVER_HOST, SERVER_PORT),
                                ThreadedTCPRequestHandler)
    ip, port = server.server_address # получаем ip адрес
    # Запуск некоторого потока для данного сервера -- по одному потоку на запрос
    server_thread = threading.Thread(target=server.serve_forever)
    # Выход из потока данного сервера после выхода из из потока main
    server_thread.daemon = True
    server_thread.start()
    print ("Server loop running on thread: %s"  %server_thread.name)
    # Исполнение клиентов
    client(ip, port, "Hello from client 1")
    client(ip, port, "Hello from client 2")
    client(ip, port, "Hello from client 3")
    # Очистка сервера
    server.shutdown()
 	   

Как это работает

Данный рецепт вначале создаёт некий поток сервера и запускает его в фоновом режиме. Затем он запускает трёх клиентов тестирования для отправки сообщений на данный сервер. В овет имеющийся сервер возвращает обратно echo полученных сообщений своим клиентам. В определённом методе handle() обработчика запросов данного сервера вы можете увидеть, что мы извлекаем информацию о текущем потоке и выводим её. Она должна отличаться для каждого соединения клиента.

В данном общении клиент/ сервер был применён метод sendall() для обеспечения отправки всех данных без каких- либо потерь.


$ python 2_2_threading_mixin_socket_server.py
Server loop running on thread: Thread-1
Client received: b"Thread-2: b'Hello from client 1'"
Client received: b"Thread-3: b'Hello from client 2'"
Client received: b"Thread-4: b'Hello from client 3'"
		

Написание сервера чата с помощью select.select

Запуск некоторого отдельного потока или процесса для каждого клиента может быть не желательным для какого- то крупного серверного приложения в котором несколько сотен или тысяч клиентов одновременно подключаются к данному серверу. Из- за имеющихся ограничений доступной памяти и мощности ЦПУ хоста нам требуется лучшая техника для обработки некоторого большого числа клиентов. К счастью, Python предоставляет для преодоления данной проблемы имеющийся в наличии модуль select.

Как это сделать

Нам необходимо написать некий рациональный сервер чата, который может обрабатывать нексолько сотен или даже большее число соединений клиентов. Мы будем применять метод select() из имеющегося модуля select, который позволяет нашим сетверу чата и клиенту выполнять любые задачи блокирования отправки или приёма вызова на протяжении всего времени исполнения.

Давайте спроектируем данный рецепт таким образом, чтобы некий отдельный сценарий мог запускать и клиент, и сервер с каким- то дополнительным параметром --name. Только в случае передачи в его командной строке --name=server данный сценарий запустит сам сервер чата. Все прочие значения передаются в аргументе --name, например, client1, client2, запустит некие клиенты чата. Давайте определим номер порта своего сервера чата в командной строке при помощи параметра --port. Для некоторого приложения большего размера может быть более предпочтительным написать отдельные модули для имеющихся сервера и клиентов.

Приводимый далее Листинг 2.3 отображает пример приложения чата с использованием select.select:

 

#!/usr/bin/env python
# Книга рецептов сетевого программирования Python, второе издание -- Глава - 2
# Данная программа оптимизирована под Python 2.7.12 и 3.5.2.
# Она может исполняться во всех прочих версиях с изменениями и/или без них.

import select
import socket
import sys
import signal
import pickle
import struct
import argparse

SERVER_HOST = 'localhost'
CHAT_SERVER_NAME = 'server'

# Некоторые утилиты
def send(channel, *args):
    buffer = pickle.dumps(args)
    value = socket.htonl(len(buffer))
    size = struct.pack("L",value)
    channel.send(size)
    channel.send(buffer)

def receive(channel):
    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error as e:
        return ''
    buf = ""
    while len(buf) < size:
        buf = channel.recv(size - len(buf))
    return pickle.loads(buf)[0]
 	   

Приведённый метод send() получает один именованный канал аргумента и позиционный аргумент *args. Он упорядочивает все данные с помощью метода dumps() из импортированного модуля pickle. Имеющийся размер всех данных он определяет с помощью модуля struct. Аналогично, receive() получает один именованный параметр channel.

Теперь мы можем написать код класса ChatServer следующим образом:


class ChatServer(object):
    """ An example chat server using select """

    def __init__(self, port, backlog=5):
        self.clients = 0
        self.clientmap = {}
        self.outputs = [] # прослушиваем сокеты вывода
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((SERVER_HOST, port))
        print ('Server listening to port: %s ...' %port)
        self.server.listen(backlog)
        # Перехватываем прерывания клавиатуры
        signal.signal(signal.SIGINT, self.sighandler)

    def sighandler(self, signum, frame):
        """ Clean up client outputs"""
        # Закрываем данный сервер
        print ('Shutting down server...')
        # Закрываем имеющиеся сокеты клиента
        for output in self.outputs:
            output.close()
        self.server.close()

    def get_client_name(self, client):
        """ Return the name of the client """
        info = self.clientmap[client]
        host, name = info[0][0], info[1]
        return '@'.join((name, host))
 	   

Теперь основной исполняемый модуль нашего класса ChatServer должен выглядеть как следующий код:


def run(self):
    inputs = [self.server, sys.stdin]
    self.outputs = []

    running = True
    while running:
        try:
        readable, writeable, exceptional = select.select(inputs, self.outputs, [])
    except select.error as e:
        break

    for sock in readable:
        if sock == self.server:
            # обрабатываем сокет данного сервера
            client, address = self.server.accept()
            print ("Chat server: got connection %d from %s" %(client.fileno(), address))
            # Считываем имя регистрации
            cname = receive(client).split('NAME: ')[1]
            # Вычислем имя клиента и отправляем обратно
            self.clients += 1
            send(client, 'CLIENT: ' + str(address[0]))
            inputs.append(client)
            self.clientmap[client] = (address, cname)
            # Отправляем присоединённую информацию прочим клиентам
            msg = "\n(Connected: New client (%d) from %s)" %(self.clients, self.get_client_name(client))
            for output in self.outputs:
                send(output, msg)
            self.outputs.append(client)

        elif sock == sys.stdin:
            # обрабатываем стандартный ввод
            junk = sys.stdin.readline()
            running = False
        else:
            # обрабатываем все прочие сокеты
            try:
                data = receive(sock)
                if data:
                    # Отправляем как новое сообшение клиента...
                    msg = '\n#[' + self.get_client_name(sock)
                                 + ']>>' + data
                    # Отправляем данные всем за исключением себя
                    for output in self.outputs:
                        if output != sock:
                            send(output, msg)
                        else:
                            print ("Chat server: %d hung up" % sock.fileno())
                            self.clients -= 1
                            sock.close()
                            inputs.remove(sock)
                            self.outputs.remove(sock)

                            # Отправляем оставленную клиентом иформацию остальным
                            msg = "\n(Now hung up: Client from %s)" %self.get_client_name(sock)
                            for output in self.outputs:
                                send(output, msg)
            except socket.error as e:
                # Удаляем
                inputs.remove(sock)
                self.outputs.remove(sock)
self.server.close()
 	   

Данный сервер чата инициализируется несколькими атрибутами данных. Он сохраняет общее число клиентов, соответствие каждого клиента, а также сокеты вывода. Обычно применяемое создание сервера сокета также устанавливает определённый вариант повторного использования некоторого адреса с тем, чтобы не было проблем повторного запуска данного сервера снова с применением того же самого порта. Некий необязательный параметр невыполненных заданий для данного сервера чата устанавливает значение максимального числа находящихся в очереди соединения ожидаемых данным сервером.

Интересной стороной данного сервера чата является перехват всех пользовательских прерываний, обычно через клавиатуру с использованием модуля signal. Таким образом некий обработчик сигнала sighandler регистрируется для данного сигнала прерывания (SIGINT). Этот обработчик сигнала перехватывает все сигналы прерываний клавиатуры и закрывает все сокеты вывода где могут ожидать отправки данные.

Основной исполняемый метод нашего сервера чата run() выполняет свои операции внутри некоторого цикла while. Данный метод регистрируется с неким выбранным интерфейсом в котором имеющимся аргументом на входе является сокет сервера чата, stdin. Имеющийся аргумент вывода определяется имеющимся перечнем сокетов вывода данного сервера. В return select предоставляет три списка: сокетов чтения, записи и исключений. Данный сервер чата заинтересован только в сокетах доступных для чтения, в которых некоторые данные доступны к прочтению. Если такой сокет указывает на себя, тогда это будет означать установление некоторого нового клиентского соединения. Итак, сервер извлекает имя самого клиента и широковещательно сообщает эту информацию прочим клиентам. В другом случае, если что- то поступает из имеющихся на входе аргументов, данный сервер чата выполняет выход. Аналогично, данный сервер чата поступает и со всеми остальными сокетами вввода клиентов. Он передаёт все полученные от любого клиента данные остальным клиентам и также делится информацией о присоединении/ отключении.

Необходимый класс кода клиента чата должен содержать следующий код:


class ChatClient(object):
    """ A command line chat client using select """

    def __init__(self, name, port, host=SERVER_HOST):
        self.name = name
        self.connected = False
        self.host = host
        self.port = port
        # Начальное приглашение
        self.prompt='[' + '@'.join((name, socket.gethostname().split('.')[0])) + ']> '
        # Подключиться к серверу по порту
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((host, self.port))
            print ("Now connected to chat server@ port %d" % self.port)
            self.connected = True
            # Отправка моего имени...
            send(self.sock,'NAME: ' + self.name)
            data = receive(self.sock)
            # Содержит адрес клиента, устаналиваем его
            addr = data.split('CLIENT: ')[1]
            self.prompt = '[' + '@'.join((self.name, addr)) + ']> '
        except socket.error as e:
            print ("Failed to connect to chat server @ port %d" % self.port)
            sys.exit(1)

def run(self):
    """ Chat client main loop """
    while self.connected:
        try:
            sys.stdout.write(self.prompt)
            sys.stdout.flush()
            # Ожидаем ввод из stdin и сокета
            readable, writeable,exceptional = select.select([0, self.sock], [],[])
            for sock in readable:
                if sock == 0:
                    data = sys.stdin.readline().strip()
                    if data: send(self.sock, data)
                elif sock == self.sock:
                    data = receive(self.sock)
                    if not data:
                        print ('Client shutting down.')
                        self.connected = False
                        break
                    else:
                        sys.stdout.write(data + '\n')
                        sys.stdout.flush()
        except KeyboardInterrupt:
            print (" Client interrupted. """)
            self.sock.close()
break
 	   

Данный клиент чата инициализируется с неким аргументом имени и отправлет это имя своему серверу чата через соединение. Он также устанавливает некое индивидуальное приглашение [ name@host ]>). Исполняемый метод данного клиента run() продолжает его работу пока является активным имеющееся соединение с его сервером. Аналогичным образом со своим сервером чата, данный клиент также регистрируется с помощью select(). Если что- то в доступных для чтения сокетах готово, он позволяет данному клиенту получать данные. Если значение sock равно 0, а также имеются какие- то данные, тогда эти данные могут быть отправлены. Та же самая информация также отображается в stdout или, в нашем случае, в имеющейся консоли командной строки. Наш главный метод теперь должен получить аргументы командной строки и вызвать клиента либо сервер следующим образом:


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Socket Server Example with Select')
    parser.add_argument('--name', action="store", dest="name", required=True)
    parser.add_argument('--port', action="store", dest="port", type=int, required=True)
    given_args = parser.parse_args()
    port = given_args.port
    name = given_args.name
    if name == CHAT_SERVER_NAME:
        server = ChatServer(port)
        server.run()
    else:
        client = ChatClient(name=name, port=port)
        client.run()
 	   

Мы бы хотели исполнить данный сценарий трижды: один раз для своего сервера чата и дважды для двух клиентов чата. Для своего сервера мы передаём -name=server и port=8800. Для client1 мы изменяем аргумент имени на -name=client1, а для client2 мы помещаем -name=client2. Затем из приглашения client1 мы отправляем своё сообщение "Hello from client 1", которое выводится в имеющемся приглашении для нашего client2. Аналогично мы отправляем "hello from client 2" из имеющегося приглашения своего client2, которое отображается в имеющемся у client1 приглашении.

Имеющийся для данного сервера вывод таков:


$ python 2_3_chat_server_with_select.py --name=server --port=8800
Server listening to port: 8800 ...
Chat server: got connection 4 from ('127.0.0.1', 59254)
Chat server: got connection 5 from ('127.0.0.1', 59256)
		

Вывод для client1 такой:


$ python 2_3_chat_server_with_select.py --name=client1 --port=8800
Now connected to chat server@ port 8800
[client1@127.0.0.1]>
(Connected: New client (2) from client2@127.0.0.1)
[client1@127.0.0.1]> Hello from client1
[client1@127.0.0.1]>
#[client2@127.0.0.1]>>hello from client2
[client1@127.0.0.1]>
		

А вывод для client2 следующий:


$ python 2_3_chat_server_with_select.py --name=client2 --port=8800
Now connected to chat server@ port 8800
[client2@127.0.0.1]>
#[client1@127.0.0.1]>>Hello from client1
[client2@127.0.0.1]> hello from client2
[client2@127.0.0.1]>
		   

Всё взаимодействие отображено на следующем снимке экрана:

 

Рисунок 1


Сервер и клиенты чата

Как это работает

В самом верху своего модуля мы определили две функции утилит: send() и receive().

Данные сервер и клиенты чата пользуются этими функциями утилит, что было продемонстрировано выше. Все подробности данных методов сервера и клиента также были пояснены выше.

Мультиплексирование веб сервера с помощью select.epoll

Модуль Python select имеет некоторые зависящие от платформы функции управления сетевым событием. В машине Linux также доступен epoll. Он использует имеющееся ядро операционной системы, которое опросит события сетевой среды и позволит нашему сценарию узнать о том что что- то произошло. Это воспринимается как нечто намного более эффективное чем упомянутый выше подход select.select. {Прим. пер.: более подробные сведения вы можете найти в посвящённых API epoll разделах нашего перевода главы из книги 2010г Майкла Керриска "Интерфейс программирования Linux".}

Как это сделать

Давайте напишем некий простой веб сервер, который может возвращать какую- то отдельную строку текста любому подключённому веб браузеру.

Центральная идея состоит в самой инициализации данного веб сервера, а именно: нам следует вызов select.epoll() и зарегистрировать файловый дескриптор своего сервера для уведомлений о событиях. В самом исполняемом коде веб сервера все события сокета отслеживаются так:

 

Листинг 2.4 Простой веб сервер с применением select.epoll()


#!/usr/bin/env python
# Книга рецептов сетевого программирования Python, второе издание -- Глава - 2
# Данная программа оптимизирована под Python 2.7.12 и 3.5.2.
# Она может исполняться во всех прочих версиях с изменениями и/или без них.

import socket
import select
import argparse

SERVER_HOST = 'localhost'
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
SERVER_RESPONSE = b"""HTTP/1.1 200 OK\r\nDate: Mon, 1 Apr 2013 01:01:01
GMT\r\nContent-Type: text/plain\r\nContent-Length: 25\r\n\r\n
Hello from Epoll Server!"""

class EpollServer(object):
    """ A socket server using Epoll"""
    def __init__(self, host=SERVER_HOST, port=0):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((host, port))
        self.sock.listen(1)
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        print ("Started Epoll Server")
        self.epoll = select.epoll()
        self.epoll.register(self.sock.fileno(), select.EPOLLIN)
    def run(self):
        """Executes epoll server operation"""
        try:
            connections = {}; requests = {}; responses = {}
            while True:
                events = self.epoll.poll(1)
                for fileno, event in events:
                    if fileno == self.sock.fileno():
                        connection, address = self.sock.accept()
                        connection.setblocking(0)
                        self.epoll.register(connection.fileno(),
                        select.EPOLLIN)
                        connections[connection.fileno()] = connection
                        requests[connection.fileno()] = b''
                        responses[connection.fileno()] = SERVER_RESPONSE
                     elif event & select.EPOLLIN:
                        requests[fileno] += connections[fileno].recv(1024)
                        if EOL1 in requests[fileno] or EOL2
                        in requests[fileno]:
                            self.epoll.modify(fileno, select.EPOLLOUT)
                            print('-'*40 + '\n' + requests[fileno].decode() [:-2])
                    elif event & select.EPOLLOUT:
                        byteswritten = connections[fileno].
                        send(responses[fileno])
                        responses[fileno] = responses[fileno][byteswritten:]
                        if len(responses[fileno]) == 0:
                            self.epoll.modify(fileno, 0)
                            connections[fileno].shutdown(socket.SHUT_RDWR)
                    elif event & select.EPOLLHUP:
                        self.epoll.unregister(fileno)
                        connections[fileno].close()
                        del connections[fileno]
        finally:
            self.epoll.unregister(self.sock.fileno())
            self.epoll.close()
            self.sock.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Socket Server Example with Epoll')
    parser.add_argument('--port', action="store", dest="port", type=int, required=True)
    given_args = parser.parse_args()
    port = given_args.port
    server = EpollServer(host=SERVER_HOST, port=port)
    server.run()
		

Если вы исполните данный сценарий и получите доступ к данному веб серверу из своего браузера, например, в случае с Google Chrome или Mozilla Firefox введя http://localhost:8800/, в вашей консоли будет отображён следующий вывод:


$ python 2_4_simple_web_server_with_epoll.py --port=8800
Started Epoll Server
----------------------------------------
GET / HTTP/1.1
Host: localhost:8800
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36
Accept:
text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
DNT: 1
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: en-US,en;q=0.8
----------------------------------------
GET /favicon.ico HTTP/1.1
Host: localhost:8800
Connection: keep-alive
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36
Accept: image/webp,image/*,*/*;q=0.8
DNT: 1
Referer: http://localhost:8800/
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: en-US,en;q=0.8
		

Также мы будем в состоянии увидеть в своём браузере следующую строку:


Hello from Epoll Server!
		

Приводимый ниже снимок экрана отображает этот сценарий:

 

Рисунок 2


Простой веб сервер: терминал и браузер

Как это работает

В нашем конструкторе EpollServer создаётся некий сервер сокета и привязывается к локальному хосту для заданного порта. Данный сокет сервера установлен в определённый режим без блокирования (setblocking(0)). Имеющаяся опция TCP_NODELAY также установлена таким образом, чтобы наш сервер мог обмениваться данными без буферизации (как это имеет место в случае некоторого соединения SSH). Далее, создаётся определённый экземпляр select.epoll() и файловый дескриптор его сокета передаётся данному экземпляру для помощи в мониторинге.

В имеющемся методе run() нашего веб сервера запускается приём событий данного сокета. Эти события обозначены следующим образом:

  • EPOLLIN: Данный сокет считывает события

  • EPOLLOUT: Данный сокет записывает события

В данном случае некоторого сервера сокета он устанавливает свой отклик SERVER_RESPONSE. Когда данный сокет имеет какое- то соединение, которое желает записать данные, он может сделать это изнутри варианта события EPOLLOUT. Возникшее событие EPOLLHUP сигнализирует некое не ожидаемое закрытие какого- то сокета, которое случилось по причинам внутренних ошибок.

Мультиплексирование сервера echo с помощью библиотеки Diesel concurrent

Иногда вам бывает нужно написать некое большое индивидуальное сетевое приложение, которому требуется избегать повтора коди инициализации сервера, который создаёт некий сокет, привязывает к нему какой- то адрес, прослушивание и обработку основных ошибок. Имеется множество сетевых библиотек Python, которые помогут вам избежать стандартных вставок кода. В нашем случае мы исследуем такую библиотеку с названием Diesel.

Приготовление

Diesel применяет не блокируемую технологию с сопрограммами для эффективного создания сетевых серверов. Как постулируется на её вебсайте, ядро Diesel представляет собой некий цикл плотных событий, которые используют epoll для предоставления практически линейной зависимости производительности для соединений общим числом до 10 000 и более. Здесь мы представляем Diesel с неким простым сервером echo. Вам может понадобиться библиотека Diesel 3.0 или любая более поздняя версия. Вы можете сделать это с помощью команды pip:


$ pip install diesel
		

Если вы испытываете некие проблемы с установкой, убедитесь что вы имеете установленными все необходимые зависимости. Следующая команда может исправить большую часть таких ошибок:


$ sudo apt-get install build-essential libssl-dev libffi-dev python-dev
		

Вам может понадобиться испольнить это от имени суперпользователя в ваших настройках операционной системы, так как diesel устанавливает некие критически важные зависимости, такие как модуль cryptography, требующий для установки привилегий администратора.

Diesel имеет некие проблемы зависимости в Python 3. Его установка и подготовка к работе более простая в Python 2.

Вы можете установить diesel следующим образом:


$ sudo su
# pip install diesel
		

Это отобразит некую приводимую ниже регистрацию при установке diesel:


Collecting diesel
Requirement already satisfied: http-parser>=0.7.12 in
/usr/local/lib/python3.5/dist-packages (from diesel)
Requirement already satisfied: flask in /usr/local/lib/python3.5/distpackages (from diesel)
Requirement already satisfied: greenlet in /usr/local/lib/python3.5/distpackages (from diesel)
Requirement already satisfied: twiggy in /usr/local/lib/python3.5/distpackages (from diesel)
Requirement already satisfied: dnspython in /usr/local/lib/python3.5/distpackages (from diesel)
Collecting pyopenssl (from diesel)\
Using cached pyOpenSSL-17.0.0-py2.py3-none-any.whl
Requirement already satisfied: Werkzeug>=0.7 in
/usr/local/lib/python3.5/dist-packages (from flask->diesel)
Requirement already satisfied: Jinja2>=2.4 in /usr/lib/python3/distpackages (from flask->diesel)
Requirement already satisfied: itsdangerous>=0.21 in
/usr/local/lib/python3.5/dist-packages (from flask->diesel)
Requirement already satisfied: click>=2.0 in /usr/local/lib/python3.5/distpackages (from flask->diesel)
Requirement already satisfied: six>=1.5.2 in /usr/lib/python3/dist-packages
(from pyopenssl->diesel)
Collecting cryptography>=1.7 (from pyopenssl->diesel)
Using cached cryptography-1.9.tar.gz
Requirement already satisfied: MarkupSafe in /usr/lib/python3/dist-packages
(from Jinja2>=2.4->flask->diesel)
Requirement already satisfied: idna>=2.1 in /usr/local/lib/python3.5/distpackages (from cryptography>=1.7->pyopenssl->diesel)
Requirement already satisfied: asn1crypto>=0.21.0 in
/usr/local/lib/python3.5/dist-packages (from
cryptography>=1.7->pyopenssl->diesel)
Requirement already satisfied: cffi>=1.7 in /usr/local/lib/python3.5/distpackages (from cryptography>=1.7->pyopenssl->diesel)
Requirement already satisfied: pycparser in /usr/local/lib/python3.5/distpackages (from cffi>=1.7->cryptography>=1.7->pyopenssl->diesel)
Building wheels for collected packages: cryptography
Running setup.py bdist_wheel for cryptography ... done
Stored in directory:
/root/.cache/pip/wheels/ff/a5/ef/186bb4f6a89ef0bb8373bf53e5c9884b96722f0857bd3111b8
Successfully built cryptography
Installing collected packages: cryptography, pyopenssl, diesel
Found existing installation: cryptography 1.2.3
Uninstalling cryptography-1.2.3:
Successfully uninstalled cryptography-1.2.3
Successfully installed cryptography-1.9 diesel-3.0.24 pyopenssl-17.0.0
 	   

Как это сделать

В инфраструктуре Diesel Python приложения инициализируются неким экземпляром имеющегося класса Application() и каким- то обработчиком событий, регистрируемого этим экземпляром. Давайте посмотрим насколько просто написать такой сервер echo.

 

Листинг 2.5 отображает код определённого сервера echo следующим образом:


#!/usr/bin/env python
# Книга рецептов сетевого программирования Python, второе издание -- Глава - 2
# Данная программа оптимизирована под Python 2.7.12. 
# Она может работать с Python 3.5.2 после устранения проблем необходимых для Diesel зависимостей.
# Она может исполняться во всех прочих версиях с изменениями и/или без них.
# Также вам может потребоваться библиотека Diesel 3.0 или более поздней версии.
# Убедитесь что заблаговременно установили все необходимые зависимости.

import diesel
import argparse

class EchoServer(object):
    """ An echo server using diesel"""

    def handler(self, remote_addr):
        """Runs the echo server"""
        host, port = remote_addr[0], remote_addr[1]
        print ("Echo client connected from: %s:%d" %(host, port))
        while True:
            try:
                message = diesel.until_eol()
                your_message = ': '.join(['You said', message])
                diesel.send(your_message)
            except Exception as e:
                print ("Exception:",e)

def main(server_port):
    app = diesel.Application()
    server = EchoServer()
    app.add_service(diesel.Service(server.handler, server_port))
    app.run()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Echo server example with Diesel')
    parser.add_argument('--port', action="store", dest="port", type=int, required=True)
    given_args = parser.parse_args()
    port = given_args.port
    main(port)
 	   

Если вы исполните этот код, ваш сервер отобразит следующий вывод:


$ python 2_5_echo_server_with_diesel.py --port=8800
[2017/06/04 13:37:36] {diesel} WARNING|Starting diesel <hand-rolled select.epoll>
Echo client connected from: 127.0.0.1:57506
		

В другом окне консоли может быть запущен другой клиент telnet и наше сопровождаемое echo на наш сервер сообщение может быть проверено следующим образом:


$ telnet localhost 8800
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Привет сервер Diesel ?
You said: Привет сервер Diesel ?
		

Следующий снимок экрана иллюстрирует взаимодействие с сервером чата Diesel:

 

Рисунок 3


Сервер чата и Telnet

Как это работает

Наш сценарий получил некий аргумент командной строки для --port и передал его в функцию main(), в которой было запущено и выполняется наше приложение Diesel.

Diesel имеет некое понятие службы, когда какое- то приложение может сториться со многими службами. EchoServer имеет метод handler(). Это позволяет данному серверу иметь дело с отдельными клиентскими соединениями. Метод Service() получает такой метод handler и некий номер порта для исполнения указанной службы.

Внутри такого метода handler() мы определяем само поведение данного сервера. В таком случае данный сервер просто возвращает данное текстовое сообщение.

Если мы сравним данный код с кодом Главы 1, Сокеты, IPv4 и примеры программирования клиент/ сервер в нашем рецепте Написание простого приложения клиент/ сервер echo TCP (Листинг 1.13a), станет совершенно ясно что нам нет нужды писать какой бы то ни было шаблонный код и следовательно будет очень просто сосредоточиться на логике приложения более высокого уровня.