Глава 18. Построение сервера с нуля

В этой главе мы проанализируем более развитое приложение программирования совместной обработки; построение с нуля некоего работающего неблокируемого сервера. Мы охватим сложные применения имеющегося модуля socket, такие как изоляция бизнес логики конкретного пользователя от обратных вызовов и написание конкретной логики обратного вызова внутри встраиваемых генераторов, причём оба экземпляра разработаны для одновременного исполнения. Мы также обсудим конкретное использование ключевых слов await и yield, воспользовавшись неким примером.

В этой главе будут рассмотрены такие темы:

  • Применение выразительного API из соответствующего модуля socket для построения с нуля некоторого сервера

  • Основные вопросы о генераторах Python и асинхронных генераторах

  • Как применять встроенные генераторы с ключевыми словами await и yield для преобразования некоего сервера с блокировкой в серевр без блокирований

Технические требования

Вот перечень предварительных требований для данной главы:

  • Убедитесь что на вашем компьютере уже установлен Python 3

  • Вам следует иметь установленными OpenCV и NumPy для вашего дистрибутива Python 3

  • Выгрузите необходимый репозиторий из GitHub

  • На протяжении данной главы мы будем работать с вложенной папкой, имеющей название Chapter18

  • Ознакомьтесь со следующими видеоматериалами Code in Action

Сетевое программирование на нижнем уровне при помощи модуля socket

В этой главе для построения своего рабочего сервера мы будем применять модуль socket, который является встроенной библиотекой в Python. Этот самый модуль socket является наиболее часто употребимым для реализации протоколов взаимодействия на нижнем уровне, предоставляя интуитивно ясные варианты для управления таким протоколами. В данном разделе мы сделаем введение в свой процесс реализации лежащей в основе сервера архитектуры нижнего уровня, а также в ключевые методы и свойства данного модуля, которые будут использованы позднее.

Отметим, что для того чтобы успешно следовать всем примерам из этой главы, вам необходимо установить в своей системе программу telnet. Telnet является некоторой программой, которая предоставляет команды терминала, содействующие протоколам в двунаправленном, интерактивном, основанном на тексте взаимодействии. Мы обсудили установку telnet в Главе 11, Построение каналов взаимодействия при помощи asyncio; если у вас ещё не установлен в вашей системе Telnet, просто обратитесь к этой главе (и выполните указанные в ней инструкции).

Заметим, что системы macOS имеют предварительно установленной альтернативу Telnet с названием Netcat. Если вы не желаете устанавливать Telnet в своём компьютере macOS, просто пользуйтесь командой nc вместо telnet во всех последующих примерах и вы получите тот же самый эффект.

Теория взаимодействия стороны сервера

В Главе 11, Построение каналов взаимодействия при помощи asyncio вы уже вкратце сталкивались с реализацией асинхронных каналов взаимодействия на верхнем уровне с применением модуля aiohttp. В этом разделе мы погрузимся глубже в саму структуру программирования канала взаимодействия со стороны сервера и то как он способен взаимодействовать со своими клиентами неким действенным образом.

В области сетевого программирования некий сокет определяется как какая- то теоретическая оконечная точка определённой вычислительной сетевой среды. Такой сокет отвечает за получение и отправку данных из конкретного узла, в котором он находится. Тот факт что этот сокет является уникальным для определённого узла, который им владеет, означает, что прочие узлы в той же самой вычислительной сети теоретически не имеют возможности взаимодействовать с данным сокетом. Иными словами, данный сокет доступен только своему соответствующему узлу.

Чтобы открыть некий канал взаимодействия со своей серверной стороны некий программист должен вначале создать некий сокет и связать его с определённым адресом. Такой адрес обычно является некой парой значений, содержащей информацию о самом хосте и порте для данного сервера. Затем, посредством такого сокета, наш сервер начинает выполнять ожидание любых потенциальных запросов на взаимодействие, создаваемых его клиентами в рассматриваемой сетевой среде. Всякий запрос от какого- то клиента на подключение к данному серверу должен, следовательно, поступать через такой созданный сокет.

По получению некого запроса на подключение от какого- то потенциального клиента, данный сервер может принять решение принимать ли этот запрос. После этого будет установлено некое соединение между двумя системами в рассматриваемой сетевой среде, что означает, что они могут начать взаимодействовать и разделять данные друг с другом. Когда некий клиент отправляет какое- то сообщение в свой сервер через созданный канал взаимодействия, после этого такой сервер обрабатывает полученное сообщение и в конце концов отправляет некий отклик обратно своему клиенту через тот же самый канал; данный процесс продолжается пока не завершится соединение между ними, либо когда один из них покинет данный канал соединения, либо по причине каких- то внешних факторов.

Всё предыдущее составляет основу процесса создания некоего сервера и установления соединений с потенциальными клиентами. Существует множество мер безопасности, реализуемых на каждой стадии данного процесса, хотя они и не являются предметом обсуждения здесь. Ниже приводится схема, которая также устанавливает соответствие только что описанного нами процесса:

 

Рисунок 18-1


Сетевое программирование посредством сокетов

Отметим, что для создания некоего запроса на подключение к конкретному серверу, некий потенциальный клиент также должен инициировать свой собственный сокет для такого канала взаимодействия (что показано на приводимой схеме). Опять же, в данном процессе нас интересует собственно теория стороны сервера, а следовательно, мы здесь не обсуждаем элементы стороны клиента.

API модуля socket

В этом разделе мы исследуем ключевой API модуля socket для получения той же самой функциональности, которую мы обсуждали ранее. Как мы уже упоминали, сам модуль socket поступает уже встроенным в любом дистрибутиве Python 3, поэтому мы можем просто импортировать этот модуль в свою программу без необходимости запуска команд инсталляции.

Для создания некоего сокета мы воспользуемся методом socket.socket(), который возвращает какой- то объект сокета. Этот объект именно то с чем мы будем работать на протяжении всего процесса реализации различных протоколов взаимодействия. Кроме того, методы сокета имеют следующие методы, помогающие нам управлять такими протоколами взаимодействия:

  • socket.bind(): Этот метод связывает вызываемый сокет с тем адресом, который передаётся в данный метод. В наших примерах мы будем передавать некий кортеж, содержащий сам адрес хоста и его значение порта для создаваемого канала взаимодействия.

  • socket.listen(): Этот метод позволит создаваемому нами серверу принимать соединения от потенциальных клиентов. В данный метод может быть передан другой необязательный параметр положительного целого для задания общего числа допустимых неудачных соединений прежде чем наш сервер отклоняет новые подключения. Мы будем применять 5 в качестве некоторого произвольного числа для данного метода в своих последующих примерах.

  • socket.accept(): Данный метод, как и предполагает его название, принимает некое конкретное подключение, которое имеет данный вызываемый сокет. Такой вызываемый объект должен вначале быть привязан к некому адресу и находиться в ожидании соединений для вызова данного метода. Данный метод также возвращает некую пару значений, (conn, address), причём conn будет тем новым объектом, который принял данное подключение и способен отправлять и получать данные, а address будет тем самым адресом, который расположен на другом конце соединения.

  • socket.makefile(): Метод возвращает некий объект file, который связан с самим вызываемым объектом socket. Мы будем применять этот метод для создания файла, который содержит данные, отправляемые от полученных клиентов в наш сервер. Данный объект file также требует надлежащего закрытия с помощью метода close().

  • socket.sendall(): Этот метод отправит все данные, передаваемые в качестве параметра в сам вызываемый объект socket. Мы будем пользоваться этим методом для отправки данных обратно тем клиентам, которые подключены к нашему серверу. Отметим, что этот метод получает данные в байтах, поэтому мы будем передавать в своих примерах передавать в данный метод строки байт.

  • socket.close(): Метод помечает вызываемый объект socket как закрытый. После данной операции все применяемые к данному объекту socket операции будут завершаться неудачей. Это будет применяться при прекращении работы нашего сервера.

Построение образца сервера echo

Самый лучший способ понять применение тех методов и функций, что были описаны ранее, состоит в том чтобы посмотреть на них в действии в какой- то простой программе. Данный сервер , как и предполагает его название отправляет нечто получаемое им от каждого клиента обратно этому клиенту. На данном примере мы изучим как установить некий функционирующий сервер, а также как обрабатывать подключения клиентов и данные от них, а в последующих разделах мы построим более сложные серверы.

Прежде чем мы ринемся кодировать, однако, давайте обсудим саму структуру своей программы в которой мы реализуем саму логику взаимодействия для данного сервера. Прежде всего на получим то, что именуется реактором и что настраивает сам по себе сервер и предоставляет необходимую логику всякий раз когда некое новое подключение запрашивается со стороны потенциальных клиентов. В частности, после того как наш сервер настроен, данные реактор будет действовать в бесконечном цикле и обрабатывать все запрашиваемые подключения, получаемые этим сервером.

Если вы прочли предыдущие главы, относящиеся к асинхронному программированию, вы можете представлять себе этот реактор в виде некоего цикла событий. Такой цикл событий обходит все имеющиеся события, которые подлежат обработке (в том случае когда они запрошены), и обрабатывает их по одному при помощи обработчика событий. Приводимая ниже схема выступает в роли дополнительной иллюстрации описанного процесса:

 

Рисунок 18-2


Некий цикл событий в сетевом программировании

Далее, вторая часть нашей программы, по аналогии с циклом событий, является обработчиком событий, который содержит саму бизнес логику пользователя: как обрабатывать полученные от клиентов данные и что отправлять обратно каждому из них. Для нашего текущего примера, поскольку в его роли выступает сервер эхо, мы отправляем обратно каждому клиенту именно то, что было в свою очередь отправлено серверу (если эти данные допустимы).

Имея на уме данную структуру, давайте перейдём к самой реальной реализации такого сервера. Выгрузите необходимый для данной главы код со страницы Github для данной книги и переместитесь в папку Chapter18. Тот сценарий, который интересует нас на это раз, это следующий файл Chapter18/example1.py:


# Chapter18/example1.py

import socket

# Main event loop
def reactor(host, port):
    sock = socket.socket()
    sock.bind((host, port))
    sock.listen(5)
    print(f'Server up, running, and waiting for call on {host} {port}')

    try:
        while True:
            conn, cli_address = sock.accept()
            process_request(conn, cli_address)

    finally:
        sock.close()

def process_request(conn, cli_address):
    file = conn.makefile()

    print(f'Received connection from {cli_address}')

    try:
        while True:
            line = file.readline()
            if line:
                line = line.rstrip()
                if line == 'quit':
                    conn.sendall(b'connection closed\r\n')
                    return

                print(f'{cli_address} --> {line}')
                conn.sendall(b'Echoed: %a\r\n' % line)
    finally:
        print(f'{cli_address} quit')
        file.close()
        conn.close()

if __name__ == '__main__':
    reactor('localhost', 8080)
 	   

Данная программа структурирована именно так, как мы обсудили это ранее: некий реактор и какая- то бизнес- логика обработки (наша функция process_request()). Во- первых, реактор настраивает сам сервер (создавая некий сокет, привязывая его к соответствующим параметрам хоста и адресу порта и вызывая метод listen()). Затем он переходит в бесконечный цикл и оказывает содействие всем потенциальным подключениям с клиентом, вначале принимая само соединение вызывая метод accept() в своём объекте socket, а затем вызывая нашу функцию process_request(). Этот реактор также отвечает за закрытие самого объекта socket в случае возникновения какй бы то ни было ошибки на протяжении предыдущего процесса.

Наша функция process_request(), с другой стороны, для начала создаёт некий объект file, связанный с тем сокетом, который ей передан. Опять же, такой объект file применяется нашим сервером для считывания данных от своего клиента, который соединён через данный конкретный сокет. В частности, после создания такого объекта file данная функция перейдет в другой бесконечный цикл, который остаётся считывающим из полученного объекта file при помощи функции readline(). Если считанные данные из этого файла допустимы, мы отправляем обратно эти же самые данные при помощи соответствующего метода sendall().

Мы также выводим на печать то, что наш сервер получает от своих клиентов в виде вывода данного сервера, в том числе строку print(f'{cli_address} --> {line}'). Ещё одним дополнительным предписанием является то, что если считанные из соответствующего файла данные эквивалентны строке quit, тогда нам нужно аккуратно обработать сам по себе свой объект socket и связанный с ним объект file при помощи метода close() для них обоих.

Наконец, по завершению нашей программы мы просто вызываем саму функцию reactor() и передаём ей информацию о нашем сервере. В данном случае мы просто применяем интерфейс обратной петли своего сервера с портом 8080. Теперь мы исполним этот сценарий для инициализации своего локального сервера. Ваш вывод должен быть аналогичен следующему:


> python3 example1.py
Server up, running, and waiting for call on localhost 8080
		

На данный момент наш сервер поднят и исполняется (на что указывает полученный вывод). Теперь мы бы хотели создать неких клиентов для данного сервера. Чтобы выполнить это, откройте другое окно Терминала и воспользуйтесь программой telnet для соединения со своим запущенным сервером, выполнив telnet localhost 8080. Ваш вывод будет походить на следующее:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
		

Данный вывод означает, что наш клиент Telnet успешно подключился к тому серверу, что мы создали. Теперь мы можем проверить будет ли наш сервер обрабатывать свои запросы тем образом, который мы ожидаем от него. В частности, введите некие данные и нажмите return или Enter для отправки их в ваш сервер и вы обнаружите, что ваш клиент получит некое сообщение в виде эха от этого сервера, именно тем образом, как мы это реализовали в предыдущей функции process_request(). И снова, некий клиент может прекратить подключение к данному серверу отправив соответствующую строку quit в сам сервер.

Приводимый далее код показывает мой вывод после ввода нескольких фраз:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
Echoed: 'hello'
nice
Echoed: 'nice'
fdkgsnas
Echoed: 'fdkgsnas'
quit
connection closed
Connection closed by foreign host.
		

Просмотрев вывод нашего сервера вы также можете увидеть что произошло при данном подключении:


> python3 example1.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 59778)
('127.0.0.1', 59778) --> hello
('127.0.0.1', 59778) --> nice
('127.0.0.1', 59778) --> fdkgsnas
('127.0.0.1', 59778) quit
		

Наш сервер, как уже упоминалось, разработан для постоянной работы в некотором реакторе как в цикле событий, который может быть остановлен некой исключительной ситуацией KeyboardInterrupt.

Мы успешно реализовали свой первый сервер эхо, воспользовавшись методами нижнего уровня, предоставленными в модуле socket. В своём следующем разделе мы реализуем более развитую функциональность для своего сервера и проанализируем сам процесс его преобразования в некий сервер без блокировки, который способен обрабатывать одновременно множество клиентов.

Построение сервера калькулятора при помощи модуля socket

Та функциональность, которую мы пытаемся реализовать состоит в наличии простого обработчика запросов, который вычисляет либо сумму, либо произведение перечня целых и который включён в отправках со стороны клиентов. В частности, если клиент отправляет в наш сервер 1, 2, 4, затем сервер должен вернуть обратно 7 если он обязан вычислять суммы или 8 если он должен вычислять произведения.

Каждый сервер реализует некую форму обработки данных дополнительно к обработке запросов, поступающих от клиентов отправляет получаемые результаты таких задач обработки данных этим клиентам. Данный прототип будет тем самым служить неким первым строительным блоком для более всесторонних серверов с последующими сложными функциями.

Лежащая в основе логика вычислений

Для выделения элементов, которые мы отделяем в строке особым символом мы будем пользоваться методом Pyhton для строк split(). Таким образом мы будем требовать чтобы все поступающие от клиентов данные были представлены в формате таким образом (целые, разделяемые запятыми) и, если клиент отправляет нечто не соответствующее устанавливаемому формату, мы будем просто отправлять обратно сообщение об ошибке и требовать от него сделать новое сообщение.

Базовая логика вычислений содержится в следующем файле Chapter18/example2.py:


# Chapter18/example2.py

from operator import mul
from functools import reduce

try:
    while True:
        line = input('Please enter a list of integer, separated by commas: ')
        try:
            nums = list(map(int, line.split(',')))
        except ValueError:
            print('ERROR. Enter only integers separated by commas')
            continue

        print('Sum of input integers', sum(nums))
        print('Product of input integers', reduce(mul, nums, 1))

except KeyboardInterrupt:
    print('\nFinished.')
 	   

Повторим, что мы применяем соответствующий метод split() с аргументом ',' для выделения конкретного индивидуального числа в некоторой заданной строке. Наша функция sum(), очевидно, применяется для вычисления суммы параметрического списка чисел. Для вычисления агрегированного произведения нам необходимо импотрировать метод mul() (для умножения) из модуля operator чтобы применять произведение к каждому из элементов рассматриваемого перечня, а также мы применяем метод reduce() из модуля functools для применения к каждому из элементов в данном находящемся в рассмотрении списке чисел.

В качестве замечания за скобками, третий передаваемый в метод reduce() аргумент (число 1) является стартовым значением для нашего понижающего процесса. Для получения дополнительных сведений о понижающих операциях прочтите Главе 7, Понижающие операторы в процессах, если вы этого ещё не сделали.

Что касается нашего реального сервера, мы также будем отслеживать режим вычислений. Это значение режима вычислений, которое по умолчанию установлено на исполнение суммирования, диктует должен ли наш сервер исполнять суммирование или умножение для входного спсика чисел. Этот режим также уникален для каждого подключения клиента и может переключаться по таким клиентам. В частности, если если отправляемыми конкретным клиентом клиентом данными является строка со значением sum, тогда мы переключимся на режим вычислений суммированием и то же самое происходит со строкой product.

Реализация сервера калькулятора

Теперь давайте рассмотрим полную реализацию своего сервера в своём файле Chapter18/example3.py:


# Chapter18/example3.py

import socket
from operator import mul
from functools import reduce

# Main event loop
def reactor(host, port):
    sock = socket.socket()
    sock.bind((host, port))
    sock.listen(5)
    print(f'Server up, running, and waiting for call on {host} {port}')

    try:
        while True:
            conn, cli_address = sock.accept()
            process_request(conn, cli_address)

    finally:
        sock.close()

def process_request(conn, cli_address):
    file = conn.makefile()

    print(f'Received connection from {cli_address}')
    mode = 'sum'

    try:
        conn.sendall(b'<welcome: starting in sum mode>\n')
        while True:
            line = file.readline()
            if line:
                line = line.rstrip()
                if line == 'quit':
                    conn.sendall(b'connection closed\r\n')
                    return

                if line == 'sum':
                    conn.sendall(b'<switching to sum mode>\r\n')
                    mode = 'sum'
                    continue
                if line == 'product':
                    conn.sendall(b'<switching to product mode>\r\n')
                    mode = 'product'
                    continue

                print(f'{cli_address} --> {line}')
                try:
                    nums = list(map(int, line.split(',')))
                except ValueError:
                    conn.sendall(
                        b'ERROR. 
                        Enter only integers separated by commas\n')
                    continue

                if mode == 'sum':
                    conn.sendall(b'Sum of input numbers: %a\r\n'
                        % str(sum(nums)))
                else:
                    conn.sendall(b'Product of input numbers: %a\r\n'
                        % str(reduce(mul, nums, 1)))
    finally:
        print(f'{cli_address} quit')
        file.close()
        conn.close()

if __name__ == '__main__':
    reactor('localhost', 8080)
 	   

Соответствующая компонента реактора в нашем сервере остаётся той же самой, что была в нашем предыдущем примере, так как наш цикл событий обслуживает ту же самую логику. В нашей части бизнес логики пользователя (соответствующая функция process_request()), причём мы всё ещё применяем объекты file возвращаемые из метода makefile() для получения данных, отправляемых клиентами в свой сервер. Если некий клиент отправляет строку содержащую quit, тогда соединение между таким клиентом и его сервером будет всё же остановлено.

Самым первым новым элементом в данной прорамме является значение локальной переменной mode в нашей функции process_request(). Эта переменная определяет устанавливаемый режим вычислений, который мы обсуждали ранее и имеет устанавливаемым по умолчанию значение sum. Как вы видите в самом конце соответствующего блока try в нашей функции process_request() данная переменная принимает решение какой вид данных следует отправлять обратно её текущему клиенту:


if mode == 'sum':
    conn.sendall(b'Sum of input numbers: %a\r\n'
        % str(sum(nums)))
else:
    conn.sendall(b'Product of input numbers: %a\r\n'
        % str(reduce(mul, nums, 1)))
 	   

Кроме того, если отправленные от некоторого клиента данные эквивалентны значению строки sum, тогда значение переменной mode будет установлено в sum и то же самое касается строки product. Наш клиент также получит некое уведомляющее сообщение что его режим вычислений был изменён. Данная логика содержится в следующей части кода:


if line == 'sum':
    conn.sendall(b'<switching to sum mode>\r\n')
    mode = 'sum'
    continue
if line == 'product':
    conn.sendall(b'<switching to product mode>\r\n')
    mode = 'product'
    continue
 	   

Теперь давайте рассмотрим как данный сервер работает в реальном эксперименте. Исполним данную программу чтобы запустить свой сервер и вы должны обнаружить вывод, аналогичный тому, что мы уже наблюдали в своём предыдущем примере:


> python3 example3.py
Server up, running, and waiting for call on localhost 8080
		

И снова, мы воспользуемся Telnet для создания клиентов этого сервера мы применим Telnet. После того как вы подключитесь к своему серверу через клиента Telnet, попробуйте ввести некие данные для проверки той логики сервера, которую мы реализовали. Ниже приводится тот код что я получил с различными типами входных данных:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
1,2
Sum of input numbers: '3'
4,9
Sum of input numbers: '13'
product
<switching to product mode>
0,-3
Product of input numbers: '0'
5,-9,10
Product of input numbers: '-450'
hello
ERROR. Enter only integers separated by commas
a,1
ERROR. Enter only integers separated by commas
quit
connection closed
Connection closed by foreign host.
		

Как вы можете видеть, наш сервер может обрабатывать те запросы, которые мы ожидали. В частности, он может вычислять и сумму и произведение для заданной в верном формате строки на входе; он способен надлежащим образом переключать свой режим вычислений; а также он может отправлять сообщения об ошибках своим клиентам если полученная на входе строка неверно форматирована. И вновь, такой всегда работающий сервер может быть остановлен некой исключительной ситуацией KeyboardInterrupt.

Построение неблокируемого сервера

Ещё одна вещь, которую нам предстоит изучить состоит в том, чтобы тот сервер что мы имеем в настоящий момент не свободен от блокировок. Иными словами, он не способен обрабатывать множество клиентов одновременно. В данном разделе мы исследуем как построить наш текущий сервер чтобы сделать его неблокируемым при помощи ключевых слов Python, которые способствуют программированию совместной обработки дополнительно в функциям нижнего уровня из встроенного модуля socket.

Анализ масштаба одновременных операций сервера

Сейчас мы покажем, что тот сервер что мы построили на текущий момент не способен обрабатывать множество клиентов одновременно. Для начала исполним свой файл Chapter18/example3.py чтобы запустить свой сервер снова следующим образом:


> python3 example3.py
Server up, running, and waiting for call on localhost 8080
		

Аналогично тому что мы делали в предыдущих примерах давайте теперь откроем другой Терминал и воспользуемся Telnet в своём запущенном сервере:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
		

Чтобы создать своего второго клиента для данного сервера откройте другой Терминал и наберите ту же самую команду telnet следующим образом:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
		

Уже здесь мы видим, что наш сервер не обрабатывает этого второго клиента должным образом: он не отправляет обратно своё приветственное сообщение (<welcome: starting in sum mode>) для этого клиента. Если мы заглянем в соответствующий вывод своего сервера, мы также можем обнаружить что он зарегистрировал только одного клиента - а именно первого из двух имеющихся:


> python3 example3.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 61099)
		

Далее мы попробуем сделать ввод из обоих своих клиентов. Мы обнаружим, что наш сервер успешно обрабатывает только запросы от своего самого первого клиента. В частности, ниже приведён мой вывод от самого первого клиента с различными типами ввода:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
hello
ERROR. Enter only integers separated by commas
1,5
Sum of input numbers: '6'
product
<switching to product mode>
6,7
Product of input numbers: '42'
		

Теперь имея своего первого клиента всё ещё поддерживающим подключение к серверу переключимся в Терминал второго нашего клиента и попробуем ввести его собственный ввод. Вы обнаружите, что в отличии от первого клиента этот клиент не получает обратно сообщения от сервера:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
1,5
product
6,7
		

Мы смотрим вывод самого сервера и видим, что этот сервер обрабатывает только запросы от самого первого клиента:


> python3 example3.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 61099)
('127.0.0.1', 61099) --> hello
('127.0.0.1', 61099) --> 1,5
('127.0.0.1', 61099) --> 6,7
		

Единственный способ которым наш второй клиент способен взаимодействовать с сервером состоит в том, чтобы наш первый клиент отключился от своего сервера - иными словами, когда мы прекратим само соединение между этим первым клиентом и его сервером следующим образом:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
hello
ERROR. Enter only integers separated by commas
1,5
Sum of input numbers: '6'
product
<switching to product mode>
6,7
Product of input numbers: '42'
quit
connection closed
Connection closed by foreign host.
		

Теперь, если вы переключитесь в Терминал второго клиента, вы обнаружите что этот клиент будет наводнён сообщениями от своего сервера, которые он должен был получить ранее:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
1,5
product
6,7
<welcome: starting in sum mode>
ERROR. Enter only integers separated by commas
Sum of input numbers: '6'
<switching to product mode>
Product of input numbers: '42'
		

Все соответствующие отклики от этого сервера теперь присутствуют, но они были отправлены все скопом за раз, а не после ввода каждого сообщения. Тот же самый всплеск информации также иллюстрируется в соответствующем выводе нашего Терминала сервера следующим образом:


> python3 example3.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 61099)
('127.0.0.1', 61099) --> hello
('127.0.0.1', 61099) --> 1,5
('127.0.0.1', 61099) --> 6,7
('127.0.0.1', 61099) quit
Received connection from ('127.0.0.1', 61100)
('127.0.0.1', 61100) --> hello
('127.0.0.1', 61100) --> 1,5
('127.0.0.1', 61100) --> 6,7
		

Этот вывод выглядит так, как если бы наш сервер получил соответствующее подключение от своего второго клиента после того как вышел первый клиент, хотя на самом деле мы создали двух клиентов и заставили их взаимодействовать с сервером одновременно. Это происходит по той причине, что наш сервер в его текущем состоянии способен обрабатывать только одного клиента за раз и может переключаться на следующего запросившего канал взаимодействия клиента только после того как этот клиент выйдет. Мы называем это блокирующим сервером.

Генераторы в Python

В своём следующем разделе мы обсудим как преобразовать имеющий блокирующий сервер который мы имеем в настоящий момент в некий без блокирования, причём с сохранением его вычислительных возможностей. Для осуществления этого нам вначале требуется рассмотреть другое понятие программирования в Python, именуемое генераторами. Есть шансы что вы уже работалис генераторами Python, но для краткого повтора мы в этом разделе пройдёмся по ключевым свойствам генераторов.

Генераторами являются функции, которые возвращают итераторы и которые могут динамически приостанавливаться и повторно запускаться. Возвращаемые генераторами значения часто сравниваются с объектами списков, так как итераторы генератора являются ленивыми (https:/​/​en.​wikipedia.​org/​wiki/​Lazy_​evaluation) и производят результаты только при получении запроса в явном виде. По этой причине итераторы генератора более действенны в смысле управления памятью и следовательно часто являются более предпочтительными нежели списки пр вовлечении больших объёмов данных.

Каждый генератор определяется как некая функция, но вместо применения ключевого слова return внутри её блока функции мы применяем yield, что служит указанием на то, что данное возвращаемое значение всего лишь временное и данный генератор сам по себе может всё ещё быть продолжен после получения данного возвращаемого значения. Давайте рассмотрим как генераторы Python работают в неком примере, содержащемся в следующем файле Chapter18/example4.py:


# Chapter18/example4.py

def read_data():
    for i in range(5):
        print('Inside the inner for loop...')
        yield i * 2

result = read_data()
for i in range(6):
    print('Inside the outer for loop...')
    print(next(result))

print('Finished.')
 	   

Здесь в нашем распоряжении некий генератор с названием read_data(), который возвращает умножения на 2 от 0 до 8, причём неким ленивым образом. Это осуществляется при помощи ключевого слова yield, которое помещается перед тем что подлежит возврату в некоторой в противоположном случае обычной функции: i * 2. Отметим, что данное ключевое слово yield помещается перед самими индивидуальными элементами в самом итераторе, который должен быть возращён обратно, что способствует такой неспешной генерации.

Теперь в своей основной программе мы получили весь итератор целиком и сохраняем его в соответствующей переменной result. Затем мы делаем цикл по такому генератору шесть раз при помощи своей функции next() (которая, как очевидно, возвращает следующий элемент, передаваемый в нашем генераторе). После исполнения данного кода ваш вывод должен походить на следующее:


> python3 example4.py
Inside the outer for loop...
Inside the inner for loop...
0
Inside the outer for loop...
Inside the inner for loop...
2
Inside the outer for loop...
Inside the inner for loop...
4
Inside the outer for loop...
Inside the inner for loop...
6
Inside the outer for loop...
Inside the inner for loop...
8
Inside the outer for loop...
Traceback (most recent call last):
File "example4.py", line 11, in <module>
print(next(result))
StopIteration
		

Вы можете наблюдать, что даже хотя наш итератор был выработан и возвращён из соответствующего генератора read_data() прежде чем мы обошли его циклом, все реальные инструкции внутри этого генератора исполнились только когда мы попробовали получать дополнительные элементы из этого итератора.

Это иллюстрируется тем фактом, что сами операторы вывода на печать в получаемом выводе в противоположном случае были бы помещены друг за другом (один оператор вывода на печать из внешнего цикла for и один из внутреннего цикла for в качестве альтернативы): такой поток исполнения проходит вначале во внешний цикл for, пытается выполнить доступ к своему следующему элементу в данном итераторе, проходит в сам генератор и идёт в свой собственный цикл for. Как только данный поток исполнения достигает самого ключевого слова yield, он возвращается обратно в свою основную программу. Этот процесс продолжается пока не завершится один из имеющихся циклов for; в нашем случае, цикл for из самого генератора остановится первым и мы, следовательно, столкнуться в самом конце с ошибкой StopIteration.

Такая неторопливость в нашей генерации данного итератора проистекает из того факта, что данный генератор прекращает исполнение когда он достигает соответствующее ключевое слово yield и продолжит своё исполнение только когда будет запрошен инструкциями извне (в данном случае из нашей функции next()). И снова, такой вид генерации данных значительно более действенен при управлении памятью чем простая выработка всего что нам понадобится для прохождения в итерациях (например в качестве списка).

Асинхронные генераторы и метод send

Насколько генераторы важны для наших целей построения некоего асинхронного сервера? Основная причина, по которой наш текущий сервер не способен обрабатывать одновременно множество клиентов состоит в том, что наша функция readline(), которую мы применяем в соответствующей части бизнес логики пользователя для получения данных клиента является блокирующей функцией, которая препятствует нашему потоку исполнения для его перемещения по прочим потенциальным клиентам до тех пор пока текущий объект file всё ещё открыт. Именно по этой причине когда наш текущий клиент прекращает своё подключение к данному серверу наш следующий клиент немедленно получает большой всплеск информации, который мы видели ранее.

Если бы мы были способны переписать данную функцию в некую асинхронную, которая допускала бы данному потоку исполнения переключаться между различными клиентами в то время как эти клиенты были бы подключены к этому серверу, такой сервер стал бы не блокирующим. Мы сделаем это при помощи асинхронных генераторов для одновременной выработки данных от потенциального множества клиентов в одно и то же время в нашем сервере.

Для того чтобы рассмотреть лежащую в основе структуру того асинхронного генератора, который мы будем применять в своём сервере давайте вначале рассмотрим следующий файл Chapter18/example5.py:


# Chapter18/example5.py

import types

@types.coroutine
def read_data():
    def inner(n):
        try:
            print(f'Printing from read_data(): {n}')
            callback = gen.send(n * 2)
        except StopIteration:
            pass

    data = yield inner
    return data

async def process():
    try:
        while True:
            data = await read_data()
            print(f'Printing from process(): {data}')
    finally:
        print('Processing done.')

gen = process()
callback = gen.send(None)

def main():
    for i in range(5):
        print(f'Printing from main(): {i}')
        callback(i)

if __name__ == '__main__':
    main()
 	   

Мы всё ещё рассматриваем свою задачу вывода на печать умножений на 2 между 0 и 8. Соответствующая функция process() является в данном примере нашим асинхронным генератором. Вы можете видеть внутри этого генератора на самом деле нет ключевого слова yield; это происходит по той причине, что мы вместо этого применяем соответствующее ключевое слово await. Наш асинхронный генератор отвечает за вывод на печать умножений на 2, вычисляемых другим генератором, read_data().

Соответствующий декоратор, @types.coroutine применяется для преобразования имеющегося генератора read_data() в некую функцию сопрограммы, которая возвращает некую сопрограмму на основе генератора и которая моет всё ещё применяться как обычный генератор, но также может выполнять ожидание. Такая сопрограмма на основе генератора выступает именно тем ключом, который преобразовывает наш блокирующий сервер в свободный от блокировок. Данная сопрограмма выполняет необходимые вычисления при помощи метода send(), который является неким способом для снабжения генератора выводом (в данном случае мы предоставляем свой генератор process() с умножениями на 2).

Данная сопрограмма возвращает некий обратный вызов, который может быть вызван позднее. Именно по этой причине прежде чем обходить в цикле через range(5) в своей основной программе, нам необходимо отсдедить сам генератор process() (сохраняемый в соответствующей переменной gen) и тот обратный вызов, который возвращается (с сохранением в переменной callback). Этот обратный вызов, в частности, является возвращаемым значением gen.send(None), который применяется для запуска самого исполнения соответствующего генератора process(). Наконец, мы просто обходим в цикле уже упомянутый объект range и вызываем соответствующий объект callback с соответствующими данными на входе.

Было много чего сказано о той теории, которая стоит за данным методом использования асинхронных генераторов. Теперь давайте посмотрим на них в действии. Исполните эту программу и вы должны получить следующий вывод:


> python3 example5.py
Printing from main(): 0
Printing from read_data(): 0
Printing from process(): 0
Printing from main(): 1
Printing from read_data(): 1
Printing from process(): 2
Printing from main(): 2
Printing from read_data(): 2
Printing from process(): 4
Printing from main(): 3
Printing from read_data(): 3
Printing from process(): 6
Printing from main(): 4
Printing from read_data(): 4
Printing from process(): 8
Processing done.
		

В данном выводе (в частности, в соответствующих операторах печати) мы всё ещё можем наблюдать те события переключения задач, которые являются квинтэссенцией как для самого асинхронного программирования, которое обсуждалось в более ранних главах, так и для тех генераторов, которые производят ленивый вывод. Что существенно, так это то, что мы получили ту же самую цель что и в своём предыдущем примере (вывод умножений на 2), на здесь мы воспользовались асинхронными генераторами (воспользовавшись ключевыми словами async и await) для содействия событиям переключения задач и мы также были способны передавать особенные параметры в генераторы при помощи некоторого обратного вызова. Эти техники при их соединении формируют ту основную структуру, которая будет применена в нашем в настоящий момент сервер с блокировкой.

Изготовление сервера без блокирования

наконец мы снова рассмотрим оставленную задачу реализации сервера без блокировок. Здесь мы применим те асинхронные генераторы, которые обсуждались ранее для содействия собственно асинхронным чтению и обработке данных, получаемых сервером от клиентов. Весь реальный код для этого сервера содержится в в нашем файле Chapter18/example6.py; мы собираемся обойти различные его части, поскольку это достаточно длинная программа. Давайте переключим своё внимание на те глобальные переменные, которые мы будем иметь в своей программе, вот они:


# Chapter18/example6.py

from collections import namedtuple

###########################################################################
# Reactor

Session = namedtuple('Session', ['address', 'file'])

sessions = {}         # { csocket : Session(address, file)}
callback = {}         # { csocket : callback(client, line) }
generators = {}       # { csocket : inline callback generator }
 	   

Чтобы иметь возможность сопутствовать обслуживанию множества клиентов одновременно, мы позволим своему серверу иметь множество сеансов (по одному для каждого клиента) в одно и то же время и, тем самым, нам понадобится отслеживать множество словарей, причём каждый из них будет отслеживать один определённый кусочек информации о текущем сеансе.

В частности, словарь sessions устанавливает соответствие подключения сокета клиента некому объекту Session, который является объектом Python namedtuple, который содержит значение адреса данного клиента и тот объект file, который ассоциирован с данным подключением клиента. Соответствующий словарь callback устанавливает соответствие соединений сокета клиента обратным вызовам, которые являются возвращаемым значением надлежащего асинхронного генератора, который мы реализуем позднее; каждый из этих обратных вызовов получает свои соответствующие подключения сокета клиента и считываемые у этого клиента данные в качестве аргументов. Наконец, словарь generators выполняет соответствие подключений сокета клиента его соответствующим асинхронным генераторам.

Теперь давайте рассмотрим свою функцию reactor:


# Chapter18/example6.py

import socket, select

# Main event loop
def reactor(host, port):
    sock = socket.socket()
    sock.bind((host, port))
    sock.listen(5)
    sock.setblocking(0) # Make asynchronous

    sessions[sock] = None
    print(f'Server up, running, and waiting for call on {host} {port}')

    try:
        while True:
            # Serve existing clients only if they already have data ready
            ready_to_read, _, _ = select.select(sessions, [], [], 0.1)
            for conn in ready_to_read:
                if conn is sock:
                    conn, cli_address = sock.accept()
                    connect(conn, cli_address)
                    continue

                line = sessions[conn].file.readline()
                if line:
                    callback[conn](conn, line.rstrip())
                else:
                    disconnect(conn)
    finally:
        sock.close()
 	   

Помимо того что мы уже имеем от своего сервера с блокированием, мы добавили ряд инструкций: мы применяем метод setblocking() из встроенного модуля socket чтобы потенциально превратить свой сервер в асинхронный, точнее без блокировок; когда мы запускаем некий сервер, мы также регистрируем этот конкретный сокет в соответствующем словаре sessions со значением None на текущий момент.

Внутри своего бесконечного цикла while (цикла событий) пребывает часть нашего нового свойства отсутствия блокирования, которое мы намерены реализовать. Для начала мы применяем метод select() из модуля select для выделения соответствующих сокетов из того словаря sessions, которые готовы к чтению (короче говоря, тех сокетов, которые имеют доступные данные). Так как самый первый аргумент данного метода служит подлежащим считыванию данным, мы передаём только сам словарь sessions в этом самом первом аргументе. Наш четвёртый аргумент определяет значение промежутка таймаута для данного метода (в секундах); если он не определён, это метод будет блокироваться бесконечно долго пока не станет доступным хотя бы один элемент в sessions, что не приемлемо для нашего сервера свободного от блокировок.

Затем, для каждого готового к чтению подключения сокета клиента, если такое соединение соответствует нашему первоначальному сокету сервера, мы получаем таое подключение и вызываем функцию connect() (которую мы вскоре увидим). В этом цикле for мы также обрабатываем соответствующие методологии обратного вызова. В частности, мы осуществим доступ к атрибуту file для данного сеанса своего текущего подключения сокета (напомним, что каждый сеанс имеет некий атрибут address и какой- то атрибут file) и считаем данные из него воспользовавшись методом readline(). Теперь, если то что мы считали является допустимыми данными, тогда мы передадим их (совместно с текущим подключением клиента) в соответствующий обратный вызов; в противном сучае мы завершаем это соединение.

Отметим, что хотя наш сервер и сделан асинхронным посредством установки данного сокета не блокирующим, сам предыдущий метод readline() всё ещё блокирующая функция. Эта функция readline() выполняет возврат когда она получает символ возврата каретки в своих данных на входе (соответствующий символ '\r' в ASCII). Это означает, что если те данные, которые отправлены неким образом клиентом не содержат какого- то символа возврата каретки, ткгда данная функция readline() откажет в возврате. Тем не менее, поскольку наш сервер свободен от блокирования, будет возбуждено некое исключительное состояние ошибки с тем, чтобы прочие клиенты не были блокированы.

Теперь давайте рассмотрим свои вспомогательные функции:


# Chapter18/example6.py

def connect(conn, cli_address):
    sessions[conn] = Session(cli_address, conn.makefile())

    gen = process_request(conn)
    generators[conn] = gen
    callback[conn] = gen.send(None) # Start the generator

def disconnect(conn):
    gen = generators.pop(conn)
    gen.close()
    sessions[conn].file.close()
    conn.close()

    del sessions[conn]
    del callback[conn]
 	   

Наша функция connect() которая должна быть вызвана когда некое подключение клиента имеет подлежащие считыванию данные, проинициализирует инструкции запуска в самом начале некоего допустимого подключения с каким- то клиентом. Для начала она инициализирует тот объект namedtuple, который ассоциирован с этим конкретным подключением клиента (здесь мы всё ещё применяем метод ) makefile() для создания требуемого объекта file). Оставшаяся часть данной функции являет собой то, что мы наблюдали при использовании нами шаблона асинхронных генераторов, что мы обсуждали выше: мы передаём само соединение клиента в process_request(), который теперь является неким асинхронным генератором; регистрируем его с своём словаре generators; заставляем его сделать вызов send(None) для инициации данного генератора; и сохраняем возвращаемое значение в своём словаре callback с тем, чтобы его можно было вызвать позднее (в частности, в самой последней части нашего цикла событий в соответствующем реакторе, который мы только что рассмотрели).

Наша функция disconnect(), с другой стороны, сопутствует различным инструкциям очистки при остановке некоторого подключения с клиентом. Она удаляет те генераторы, которые ассоциированы с этим клиентским подключением из соответствующего словаря generators и закрывает сам генератор, тот объект file, который хранится в соответствующем словаре sessions, а также само это подключение клиента. Наконец, она удаляет значение ключа, которое соответствует данному подключению клиента из остающихся словарей.

Давайте переключим своё внимание на свою новую функцию process_request(), которая теперь является неким асинхронным генератором:


# Chapter18/example6.py

from operator import mul
from functools import reduce

###########################################################################
# User's Business Logic

async def process_request(conn):
    print(f'Received connection from {sessions[conn].address}')
    mode = 'sum'

    try:
        conn.sendall(b'<welcome: starting in sum mode>\n')
        while True:
            line = await readline(conn)
            if line == 'quit':
                conn.sendall(b'connection closed\r\n')
                return
            if line == 'sum':
                conn.sendall(b'<switching to sum mode>\r\n')
                mode = 'sum'
                continue
            if line == 'product':
                conn.sendall(b'<switching to product mode>\r\n')
                mode = 'product'
                continue

            print(f'{sessions[conn].address} --> {line}')
            try:
                nums = list(map(int, line.split(',')))
            except ValueError:
                conn.sendall(
                    b'ERROR. Enter only integers separated by commas\n')
                continue

            if mode == 'sum':
                conn.sendall(b'Sum of input integers: %a\r\n'
                    % str(sum(nums)))
            else:
                conn.sendall(b'Product of input integers: %a\r\n'
                    % str(reduce(mul, nums, 1)))
    finally:
        print(f'{sessions[conn].address} quit')
 	   

Та логика, что обрабатывает данные клиента и выполняет необходимые вычисления остаётся прежней, и единственное отличие состоит в том, что эта новая функция содержит ключевое слово async (помещаемое перед соответствующим ключевым словом def) и ключевое слово await, применяемое с нашей новой функцией readline(). Эти отличия, по существу, преобразуют нашу функцию process_request() в некую свободную от блокирования, причём с тем условием, что наша новая функция readline() также не блокирующая:


# Chapter18/example6.py

import types

@types.coroutine
def readline(conn):
    def inner(conn, line):
        gen = generators[conn]
        try:
            callback[conn] = gen.send(line) # Continue the generator
        except StopIteration:
            disconnect(conn)

    line = yield inner
    return line
 	   

Аналогично тому что мы наблюдали в своём предыдущем примере, мы импортируем соответствующий модуль types из Python с помощью декоратора @types.coroutine чтобы превратить свою функцию readline() в некую сопрограмму на основе генератора, который свободен от блокирования. При каждом обращении к обратному вызову (который получает некое подключение клиента и какую- то строку данных), наш поток исполнения проходит в функцию inner() внутри этой сопрограммы и исполняет её инструкции.

В частности, он отправляет эту строку данных в надлежащий генератор, который позоволит необходимым инструкциям в process_request() обработать их асинхронным образом и сохранить само возвращаемое значение в соответствующем обратном вызове - пока не будет достигнут сам конец этого генератора и в этом случае будет вызвана соответствующая функция disconnect().

Наша последняя задача состоит в проверке того, что этот сервер на самом деле способен обрабатывать множество клиентов одновременно.Для этого запустим вначале следующий сценарий:


> python3 example6.py
Server up, running, and waiting for call on localhost 8080
		

Аналогично тому, что мы наблюдали ранее, откроем два дополнительных Терминала и воспользуемся Telnet в обоих для подключения к запущенному серверу:


> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
		

Как вы можете наблюдать, оба клиента обрабатываются должным образом: оба имеют возможность подключения и оба получили приветственные сообщения. Это также иллюстрируется следующим выводом самого сервера:


> python3 example6.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 63855)
Received connection from ('127.0.0.1', 63856)
		

Дальнейшие проверки могут содержать отправку сообщений в наш сервер в одно и то же время, что всё ещё может быть обработано. Данный сервер также способен отслеживать индивидуальные режимы вычислений, которые являются уникальными для персональных клиентов (иначе говоря, допуская чтобы каждый из клиентов имел некий обособленный режим вычислений). Мы успешно построили с нуля сервер совместной обработки свободный от блокировок.

Выводы

Зачастую сетевое программирование нижнего уровня вовлекает в себя манипуляцию и обработку сокетов (определяемых как теоретические оконечные точки внутри своих узлов в некоторой конкретной вычислительной среде, ответственных за получение или отправку данных из тех узлов, в которых они расположены). Сама архитектура взаимодействия стороны сервера состоит из множества этапов, вовлекаемых в обработку сокетом, таких как связывание, ожидание, приём, чтение и запись. Встроенный модуль socket предоставляет интуитивно понятный API, который способствует этим этапам.

Для создания некоего сервера без блокирований при помощи такого модуля socket необходимо реализовать асинхронные генераторы чтобы переключать имеющийся поток исполнения между задачами и данными. Это процесс также вовлекает в себя применение обратных вызовов, которые могут запускаться исполнением потока в более позднее время. Два данных элемента позволяют нашему серверу считывать и обрабатывать те данные, что поступают от множества клиентов одновременно, позволяя становиться такому серверу свободным от блокирования.

Мы завершим свою книгу в следующей главе практической техникой проектирования и реализации программ совместной обработки. В частности, мы обсудим как проверять, отлаживать и планировать приложения совместной обработкиЮ причём методично и действенно.

Вопросы

  • Что такое сокет? Насколько он значим в сетевом программировании?

  • Что представляет из себя процедура взаимодействия стороны сервера когда некий потенциальный клиент выполняет запрос на подключение?

  • Расскажите о некоторых предоставляемых встроенным модулем socket методах для содействия сетевому программированию нижнего уровня стороны сервера.

  • Что представляют собой генераторы? В чём их преимущество в сопоставлении со списками Python?

  • Что являют собой асинхронные генераторы? Как они могут применяться для построения сервера, свободного от блокировок?

Дальнейшее чтение

Для получения дополнительных сведений вы можете воспользоваться следующими ссылками: