Глава 10. Реализация асинхронного программирования в Python

Данная глава представляет введение в модуль Python asyncio. Она охватывает основную идею, стоящую за этим новым модулем совместной обработки, который применяет циклы событий и сопрограммы, а также предоставляет некий API, который настолько же читабелен, насколько и синхронный код. В этой главе мы также обсудим саму реализацию асинхронного программирования в дополнение к потокам и многопроцессности посредством модуля concurrent.futures. На протяжении этого мы также обсудим конкретное приложение асинхронного программирования при помощи самых распространённых применений asyncio, включая асинхронные ввод/ вывод и исключение задач с блокированием.

В этой главе будут рассмотрены следующие темы:

  • Все фундаментальные элементы реализации асинхронного программирования с применением asyncio

  • предоставляемую asyncio инфраструктуру для асинхронного программирования

  • Сам модуль concurrent.futures и его применение в отношении asyncio

Технические требования

Вот перечень предварительных требований для данной главы:

  • Убедитесь что на вашем компьютере уже установлен Python 3

  • Вам следует иметь установленными OpenCV и NumPy для вашего дистрибутива Python 3

  • Выгрузите необходимый репозиторий из GitHub

  • На протяжении данной главы мы будем работать с вложенной папкой, имеющей название Chapter10

  • Ознакомьтесь со следующими видеоматериалами Code in Action

Модуль asyncio

Как вы уже видели в предыдущей главе, модуль asyncio предоставляет простой способ преобразования некоторой последовательной программы в асинхронную. В этом разделе мы будем обсуждать саму общую структуру некоторой асинхронной программы, а впоследствии как реализовывать в Python собственно преобразование из последовательной программы в асинхронную.

Сопрограммы, цикл событий и фьючерсы

Сущствует несколько общих элементов, которые имеют большинство асинхронных программ и тремя из этих элементов являются сопрограммы, циклы событий и фьючерсы. Они определены следующим образом:

  • Циклы событий (Event loops) являются основными координаторами задач в какой- то асинхронной программе. Некий цикл событий отслеживает все свои задачи которые должны исполняться асинхронно и принимает решение какая из задач должна исполняться в некий определённый момент. Другими словами, циклы событий обрабатывают соответствующую сторону переключения задач (или имеющийся поток исполнения) асинхронной программы.

  • Сопрограммы (coroutines) являются неким особым типом функций, которые обёртывают определённые задачи с тем, чтобы они могли исполняться асинхронно. Некая сопрограмма требует упорядочения в определении где в данной функции должно иметь место соответствующее переключение задачи; иными словами, они определяют где данная функция должна вернуть обратно свой поток исполнения в соответствующий цикл событий. Все задачи для сопрограмм обычно сохраняются в некоторой очереди задач, либо создаются внутри самого цикла событий.

  • Фьючерсы (Futures) являются местоблюстителями для тех результатов, которые возвращаются из сопрограмм. Эти объекты фьючерсов создаются как только сопрограммы инициируются в своём цикле событий, тем самым, фьючерсы могут представлять актуальные результаты, находящиеся в рассмотрении результаты (если данная сопрограмма ещё пока не завершила своё исполнение) или даже какую- то исключительную ситуацию (если это именно то, что будет возвращать данная сопрограмма). {Прим. пер.: рекомендуем также ознакомиться с нашим переводом 2 издания Книга рецептов параллельного программирования Python Джанкарло Закконе}

{Прим. пер.: тем, кому интересно внутреннее устройство лежащих в основе Asyncio генераторов, сопрограмм и асинхронных генераторов, рекомендуем свой перевод Внутреннее устройство CPython Энтони Шоу, изданной в январе 2021 RealPython. Тут же вы можете ознакомиться с новой реализацией параллельности в Python, появившейся начиная с версии 3.9, подчинённым интерпретатором.}

Некий цикл событий, сопрограммы и соответствующие фьючерсы являются составляющими ядро элементами некоторого асинхронного процесса. Во- первых запускается сам цикл событий и осуществлет взаимодействие со своей очередью задач чтобы получить самую первую задачу. Затем создаются необходимая сопрограмма для такой задачи и соответствующие фьючерсы. Когда внутри такой сопрограммы должно иметь место некое переключение задачи, данная сопрограмма откладывается, и вызывается следующая сопрограмма; также сохраняются все данные и соответствующий контекст из приостанавливаемой первой сопрограммы.

Теперь, если данная сопрограмма являются блокирующей (к примеру, обработка ввода/ вывода или засыпание), основной поток исполнения высвобождается обратно в свой цикл событий. что выполнит перемещение к следующему элементу в имеющейся очереди задач. Перед тем как он переключится обратно на самую первую задачу, имеющийся цикл событий будет инициировать самую последнюю задачу в своей очереди задач и продолжит исполнение с того места, где была осуществлена приостановка.

Как только каждая задача завершает своё исполнение, она удаляется из текущей очереди задач, её сопрограмма прекращается, а соответствующий фьючерс регистрирует возвращаемые из эой сопрограммы результаты. Данный процесс будет продолжаться пока полностью не исполнятся все задачи из соответствующей очереди задач. Приводимая ниже схема дополнительно иллюстрирует общую структуру описанного ранее асинхронного процесса:

 

Рисунок 10-1


Процесс асинхронного программирования

API Asyncio

Имея ввиду основную общую структуру некоторой асинхронной программы, давайте рассмотрим тот особый API, который предоставляют модуль asyncio и Python для реализации асинхронных программ. Самым первым вкладом в данный API являются ключевые слова async и await, которые были добавлены в Python 3.5. Эти ключевые слова применяются для определения самых основных элементов некоторой асинхронной программы в Python.

В частности, async как правило помещается перед соответствующим ключевым словом def при объявлении некоторой функции. Некая функция с соответствующим ключевым словом async перед нею будет интерпретироваться Python как какая- то сопрограмма. Как мы уже обсуждали, внутри всякой сопрограммы должно присутствовать некое определение относительно того когда имеют место события переключения задачи. Соответствующее ключевое слово await затем применяется для предписания где и когда в точности вернуть обратно данный поток исполнения в свой цикл событий; обычно это выполняется через ожидание того чтобы иная сопрограмма произвела некий результат (await.coroutine), либо через вспомогательные функции из самого модуля asyncio, например, функции asyncio.sleep() и asyncio.wait().

Важно отметить, что данные ключевые слова async и await на самом деле предоставляются самим Python, а не модулем asyncio. Это означает, что асинхронное программирование в действительности может быть реализовано и без asyncio, но как вы видите, asyncio предоставляет некую рабочую среду и инфраструктуру для рационализации данного процесса и таким образом является первейшим инструментом в Python для реализации асинхронного программирования.

В особенности, наиболее часто применяемым API из этого модуля asyncio является функциональность управления циклом событий. При помощи asyncio вы можете начать манипулировать своими задачами и циклом событий интуитивно понятными и простыми вызовами функций без интенсивного приготовления кода. Они содержат следующее:

  • asyncio.get_event_loop(): Данный метод возвращает соответствующийцикл событий для нашего текущего контекста,которым является некий объект AbstractEventLoop. В большинстве случаев нам не следует беспокоиться об этом классе, так как наш модуль asyncio предоставляет некий API верхнего уровня для управления нашими циклами событий.

  • AbstractEventLoop.create_task(): Этот метод подлежит вызову некоторым циклом соытий. Он добавляет свои входные данные в текущуюочередь задач данного вызываемого цикла событий; собственно данными ввода обычно является некая сопрограмма (то есть некая функция с ключевым словом async).

  • AbstractEventLoop.run_until_complete(): Метод также подлежит вызову из некого цикла событий. Он получает свою основную сопрограмму некоторой асинхронной программы и исполняет его пока не будет возвращён соответствующий фьючерс данной сопрограммы. В то время как этот метод инициирует исполнение своего цикла событий, он также блокирует весь последующий код вслед за ним пока не завершатся все фьючерсы.

  • AbstractEventLoop.run_forever(): Данный метод является чем- то аналогичным AbstractEventLoop.run_until_complete(), за исключением того факта, что как предполагает его название, соответствующий цикл событий будет исполнять его всегда, пока не будет вызван соответствующий метод AbstractEventLoop.stop(). таким образом, вместо выхода наш цикл будет продолжать запуск, даже после получения его возвращаемых фьючерсов.

  • AbstractEventLoop.stop(): Этот метод вызывает со стороны вызываемого цикла событий останов исполнения и выход при первой же подходящей возможности, причём не вызывая при этом крушения всей программы целиком.

Помимо этих методов мы пользуемся некоторым числом неблокирующих функций для снаряжения своих задач переключением событий. Они включают в свой состав такие:

  • asyncio.sleep(): В то время как она сама является сопрограммой, данная функция создаёт некую дополнительную сопрограмму, которая выполняется через заданный промежуток времени (предписываемый во входных данных в секундах). Обычно применяется в виде asyncio.sleep(0), чтобы вызвать немедленное событие переключения задачи.

  • asyncio.wait(): Данная функция также некая сопрограмма и, следовательно, она может применяться для переключения задач. Она получает некую последовательность (как правило, список) фьючерсов и дожидается завершения их исполнения.

Инфраструктура asyncio в действии

Как вы уже заметили, asyncio предоставляет некий простой и интуитивно понятный способ реализации соответствующей инфраструктуры асинхронной программы при помощи ключевых слов асинхронного программирования Python. Вооружившись этим, давайте рассмотрим определённый процесс применения нашей среды разработки применительно к какому- то последовательному приложению из Python с его преобразованием в асинхронное.

Асинхронный обратный отсчёт

Давайте рассмотрим свой следующий файл Chapter10/example1.py:


# Chapter10/example1.py

import time

def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        time.sleep(delay)

        duration = time.perf_counter() - start
        print('-' * 40)
        print('%.4f \t%s%s = %i' % (duration, indents, name, n))

        n -= 1

start = time.perf_counter()

count_down('A', 1)
count_down('B', 0.8)
count_down('C', 0.5)

print('-' * 40)
print('Done.')
 	   

Основной целю данного примера является иллюстрация самой асинхронной природы перекрывающихся процессов и времени ожидания независимых задач. Для этого мы проанализируем некую функцию обратного отсчёта (count_down()), которая получает некую строку и какое- то время задержки. Затем она будет вести обратный отсчёт от 3 до 1 в секундах и при этом выводить на печать значение времени, истекшего с самого начала исполнения данной функции, а также полученную на входе строку (с текущим числом обратного отсчёта).

В своей основной программе мы вызовем эту функцию count_down() для букв A, B и C с различными временами задержки. После исполнения данного сценария ваш вывод должен походить на следующий:


> python example1.py
----------------------------------------
1.0006 A = 3
----------------------------------------
2.0041 A = 2
----------------------------------------
3.0055 A = 1
----------------------------------------
3.8065         B = 3
----------------------------------------
4.6070         B = 2
----------------------------------------
5.4075         B = 1
----------------------------------------
5.9081                 C = 3
----------------------------------------
6.4105                 C = 2
----------------------------------------
6.9107                 C = 1
----------------------------------------
Done.
		

Значения чисел в самом начале каждой строки указывают общее число секунд, истекших с момента запуска данной программы. Вы можете видеть, что эта программа выполнила обратный отсчёт для буквы A с интервалами в одну секунду и перешла к букве B с интервалами в 0.8 секунд, а затем, наконец, к букве C с интервалами в 0.5 секунд. Это полностью последовательная, синхронная программа, так как нет никаких перекрытий между обработкой и временем ожидания. Кроме того, она занимает приблизительно 6.9 секунд на всё время исполнения, что также является суммой всех времён обратного отсчёта для всех трёх букв:


1 second x 3 (for A) + 0.8 seconds x 3 (for B) + 0.5 seconds x 3 (for C) = 6.9 seconds
 	   

Имея на уме основную мысль, стоящую за асинхронным программированием, мы можем обнаружить, что на самом деле мы можем преобразовать эту программу в асинхронную. В частности, давайте предположим, что на протяжении той первой секунды в своей программе, когда мы дожидаемся обратного отсчёта для буквы A, мы можем переключать задачи для перехода к прочим буквам. На самом деле мы реализуем эту установку для всех наших букв внутри соответствующей функции count_down() (иными словами, мы превратим count_down() в некую сопрограмму).

Теоретически, теперь, когда все задачи обратного отсчёта являются сопрограммами в какой- то асинхронной программе, мы должны достичь лучшего времени исполнения и лучшей отзывчивости от своей программы. Так как все три задачи обрабатываются независимо, получаемые сообщения обратного отсчёта будут выводиться на печать не упорядоченно (перепрыгивая с буквы на букву), а вся асинхронная программа должна занять только то время, которое требуется для самой большой задачи (то есть три секунды для буквы A).

Но для начала давайте превратим свою программу в асинхронную. Для этого нам прежде всего понадобится превратить count_down() в некую сопрограмму и определить некую точку внутри данной функции, которая будет неким событием переключения задач. Другими словами, мы добавляем необходимое ключевое слово async перед соответствующей функцией, а вместо применявшейся ранее функции time.sleep() мы будем применять соответствующую функцию asyncio.sleep() совместно с ключевым словом await; оставшаяся часть этой функии будет той же самой. Наша сопрограмма count_down() теперь должна выглядеть так:


# Chapter10/example2.py

async def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        await asyncio.sleep(delay)

        duration = time.perf_counter() - start
        print('-' * 40)
        print('%.4f \t%s%s = %i' % (duration, indents, name, n))

        n -= 1
 	   

В своей основной программе мы проинициализируем некий цикл событий и будем управлять им. В частности, мы создадим некий пустой цикл при помощи метода asyncio.get_event_loop(), добавим все три задачи обратного отсчёта в его очередь задач с помощью AbstractEventLoop.create_task() и, наконец, запустим исполнение этого цикла событий посредством AbstractEventLoop.run_until_complete(). Наша основная программа должна выглядеть следующим образом:


# Chapter10/example2.py

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(count_down('A', 1)),
    loop.create_task(count_down('B', 0.8)),
    loop.create_task(count_down('C', 0.5))
]

start = time.perf_counter()
loop.run_until_complete(asyncio.wait(tasks))

print('-' * 40)
print('Done.')
 	   

Весь сценарий также можно отыскать в соответствующем репозитории кода этой книги внутри вложенной папки Chapter10, с названием example2.py. После запуска этого сценария ваш вывод должен быть аналогичен такому:


> python example2.py
----------------------------------------
0.5029                 C = 3
----------------------------------------
0.8008         B = 3
----------------------------------------
1.0049 A = 3
----------------------------------------
1.0050                 C = 2
----------------------------------------
1.5070                 C = 1
----------------------------------------
1.6011         B = 2
----------------------------------------
2.0090 A = 2
----------------------------------------
2.4068         B = 1
----------------------------------------
3.0147 A = 1
----------------------------------------
Done.
		

Теперь вы можете видеть как наличие некоей асинхронной программы может улучшить общее время исполнения и отзывчивость наших программ. Вместо исполнения индивидуальных задач последовательно, наша программа теперь переключается между различными обратными отсчётами и выполняет перекрытие их времён обработки/ ожидания. Это, как мы уже обсуждали, имеет результатом вывод на печать различных букв промеж собой или даже одновременно.

В самом начале нашей программы, вместо того чтобы выполнять ожидание в течении всей секунды для вывода самого первого сообщения A = 3, наша программа переключается на следующую задачу из своей очереди задач (в данном случае это ожидание в течении 0.8 секунды для буквы B). Данный процесс продолжится пока не пройдут 0.5 секунд и не будет выведено на печать C = 3, а по истечении ещё 0.3 секунд (в момент времени, соответствующий 0.8 секунды), на печать выводится B = 3. Всё это происходит до того как будет выведено на печать A = 3.

Такое свойство переключения задач нашей асинхронной программы делает её значительно более отзывчивой. Вместо того чтобы зависать на одну секунду прежде чем вывести на печать своё первое сообщение, новой программе потребуется всего 0.5 секунды (самый короткий период ожидания) для вывода на печать своего первого сообщения. Что касается значения времени исполнения, вы можете обнаружить, что на этот раз оно отнимает всего три секунды для исполнения программы целиком (вместо 6.9 секунд). Это в точности соответствует нашим рассуждениям: что значение времени исполнения будет близко по значению к тому, которое требуется для исполнения самой большой задачи.

Замечание относительно функций с блокировкой

Вы наверное уже обратили внимание, что мы заменили нашу первоначальную функцию time.sleep() на её эквивалент из asyncio. Это сделано по той причине, что time.sleep() по своей сути является блокирующей функцией, что означает что она не может применяться для реализации некоторой задачи переключения событий. Для проверки этого в нашем файле Chapter10/example2.py (нашей асинхронной программе) мы заменим следующую строку кода:


await asyncio.sleep(delay)
 	   

Приведённый выше код будет заменён на такой:


time.sleep(delay)
 	   

После запуска этого нашего нового сценария ваш вывод будет в точности тем же самым, что был в нашей первоначальной, синхронной программе. Поэтому замена await asyncio.sleep() на time.sleep() в действительности преобразует нашу программу обратно в синхронную, игнорируя тот цикл событий, что мы организовали. Что произошло, когда наша дошла в своём исполнении до этой строки внутри рассматриваемой функции count_down(), так это то что в действительности time.sleep() выполняет блокирование и не допускает освобождение своего потока исполнения, по существу перестраивая всю программу в целом вновь в синхронную. Для исправления этой проблемы верните time.sleep() обратно в await asyncio.sleep().

Приводимая ниже схема иллюстрирует некий пример существенного различия во времени исполнения между блокирующей и не блокирующей обработкой файла:

 

Рисунок 10-2


Сопоставление работы с блокированием и без него

Это явление возбуждает интересную проблему: если некая тяжёлая, долгоиграющая задача является блокирующей, тогда в буквальном смысле слова невозможно реализовать асинхронное программирование при помощи такой задачи в качестве сопрограммы. Поэтому если бы вы на самом деле желаете достичь того чтобы блокирующие функции вернулись в некое асинхронное приложение, нам бы понадобилось реализовать некую иную версию такой функции с блокировкой, которую можно было бы сделать изнутри некоторой сопрограммы и позволить задачам переключения событий иметь место по крайней мере в одной точке внутри такой функции.

К счастью, после реализации Code в качестве одного из официальных средств Python, разработчики ядра Python находятся в постоянной работе по воспроизводству необходимых версий сопрограмм для наиболее часто применяемых функций с блокированием. Это означает, что даже ели вы и обнаружите функции с блокировкой, которые препятствуют тому чтобы ваша программа была истинно асинхронной, вы скорее всего будете иметь возможность отыскать версии с сопрограммами для таких функций, чтобы реализовать их в своей программе.

Тем не менее, тот факт, что имеются асинхронные версии традиционных функций Python с блокировкой и потенциально иным API означает, что вам потребуется ознакомиться с такими API различных функций. Иным способом обработки функций с блокировкой без необходимости реализации их версий сопрограмм состоит в применении исполнителя для запуска таких функций в отдельных потоках или отдельных процессах во избежание блокирования данного потока основного цикла событий.

Асинхронная проверка на простое число

Отталкиваясь от нашего начального примера обратного отсчёта, давайте перестроим ещё один свой пример из предыдущей главы. В качестве напоминания, ниже приводится код для нашей синхронной версии данной программы:


# Chapter09/example1.py

from math import sqrt

def is_prime(x):
    print('Processing %i...' % x)

    if x < 2:
        print('%i is not a prime number.' % x)

    elif x == 2:
        print('%i is a prime number.' % x)

    elif x % 2 == 0:
        print('%i is not a prime number.' % x)

    else:
        limit = int(sqrt(x)) + 1
        for i in range(3, limit, 2):
            if x % i == 0:
                print('%i is not a prime number.' % x)
                return

        print('%i is a prime number.' % x)

if __name__ == '__main__':

    is_prime(9637529763296797)
    is_prime(427920331)
    is_prime(157)
 	   

Как мы это уже обсуждали в своей последней главе, у нас имеется некая простая функция проверки на целое, is_prime(), которая выводит на печать сообщения, указывающие будет ли полученное на входе целое число, x, целым числом. В своей основной программе мы вызываем is_prime() для трёх целых чисел в порядке убывающей по величине последовательности. Такая настройка, опять- таки, создаёт значительный промежуток времени на протяжении которого которого наша программа кажется подвисшей, пока обрабатывает самое большое значение на входе, что имеет результатом слабую реакцию для этой программы.

Производимый этой программой вывод будет выглядеть как- то так:


Processing 9637529763296797...
9637529763296797 is a prime number.
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
		

Чтобы реализовать асинхронное программирование данного сценария, во- первых, нам придётся создать свой первый основной компонент: цикл событий. Для этого вместо применения сферы действия '__main__' , мы преобразуем её в некую отдельную функцию. Эта функция и наша функция проверки на целое is_prime() будут соответствующими сопрограммами в нашей окончательной асинхронной сопрограмме.

Теперь нам требуется преобразовать и функцию is_prime(), и функцию main() в сопрограммы; повторим, это означает помещение соответствующего ключевого слова aysnc перед их ключевым словом def, а также необходимого ключевого слова await внутри каждой функции для определения соответствующего события переключения задач. Для main() мы просто реализуем данное событие при ожидании в соответствующей очереди задач воспользовавшись aysncio.wait() следующим образом:


# Chapter09/example2.py

async def main():

    task1 = loop.create_task(is_prime(9637529763296797))
    task2 = loop.create_task(is_prime(427920331))
    task3 = loop.create_task(is_prime(157))

    await asyncio.wait([task1, task2, task3])
 	   

В нашей функции is_prime() дела несколько сложнее, так как там нет очевидной точки в которой можно было бы вернуть обратно в цикл событий поток исполнения, как это было в нашем предыдущем примере обратного отсчёта. Вспоминая, что основная цель асинхронного программирования состоит в достижении наилучшего времени исполнения и восприимчивости, и чтобы реализовать это, события переключения задач должны иметь место на протяжении тяжёлых долгоиграющих задач. Это требование, тем не менее, зависит от особенностей вашей программы - в частности, самой сопрограммы, имеющейся очереди задач данной программы, а также индивидуальных задач в этой очереди.

К примеру, имеющаяся в нашей программе очередь задач состоит из трёх чисел: 9637529763296797, 427920331 и 157; по порядку, мы можем рассматривать их как некую большую задачу, среднюю задачу и маленькую задачу. Для улучшения отзывчивости мы бы хотели переключать задачи на протяжнии своей большой задачи и не делать этого в маленькой задаче. Такая настройка позволит нашим средней и малой задачам стартовать, выполняться, а может быть даже и завершиться во время исполнения самой большой задачи, даже несмотря на то, что эта самая большая задача будет спереди нашей очереди задач в данной программе.

Далее мы рассмотрим свою сопрограмму is_prime(). После проверки на некоторые краевые возможности, она выполняет итерации в некотором цикле for для всех нечётных чисел ниже квадратного корня от входного числа и проверяет делением своё входное число на соответствующее текущее нечётное число в запросе. Внутри этого долгоиграющего цикла for, следовательно, самое исключительное место для помещения переключения задач - то есть для высвобождения данного потока исполнения в его цикл событий.

Однако нам всё ещё требуется принять решение в какой конкретной точке данного цикла for реализовать такое событие переключения задач. И вновь, принимая во внимание особенности индивидуальных задач в нашей очереди задач, мы ищем точку, которая является достаточно общей в наших больших задачах, не так распространена в средних и отсутствует в малых. Я принял решение, что такой точкой является всякий период из 100 000 чисел, что соответствует нашим требованиям и я воспользовался командой await asyncio.sleep(0) для организации необходимого события переключения задач таким манером:


# Chapter09/example2.py

from math import sqrt
import asyncio

async def is_prime(x):
    print('Processing %i...' % x)

    if x < 2:
        print('%i is not a prime number.' % x)

    elif x == 2:
        print('%i is a prime number.' % x)

    elif x % 2 == 0:
        print('%i is not a prime number.' % x)

    else:
        limit = int(sqrt(x)) + 1
        for i in range(3, limit, 2):
            if x % i == 0:
                print('%i is not a prime number.' % x)
                return
            elif i % 100000 == 1:
                await asyncio.sleep(0)

        print('%i is a prime number.' % x)
 	   

Наконец, в своей основной программе (не путать с сопрограммой main()), мы создаём свой цикл событий и применяем её для запуска своей сопрограммы main(), пока она не завершит своего исполнения:


try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
except Exception as e:
    print('There was a problem:')
    print(str(e))
finally:
    loop.close()
 	   

Как вы это уже видели в нашей предыдущей главе, такая асинхронная версия нашего сценария достигает лучшей отзывчивости. В частности, вместо того чтобы выглядеть повисшей, при обработке своей самой большой задачи, наша программа теперь выводит на печать сообщения для прочих задач меньшего размера прежде чем завершить эту свою самую большую задачу. Наш окончательный результат выглядит примерно так:


Processing 9637529763296797...
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
9637529763296797 is a prime number.
		

Улучшения в Python 3.7

Относительно 2018, только что вышел Python 3.7, причём с некоторыми важными новыми функциями, такими как классы данных, словари с обеспечением упорядоченности, лучшая точность времени и тому подобное. Асинхронное программирование и модуль asyncio получили ряд важных улучшений.

Прежде всего, async и await теперь являются официальными зарезервированными ключевыми словами. Хотя мы и называли их ключевыми словами, Python фактически нет, трактуя данные слова как зарезервированные ключевые только начиная с этого момента. Это означает, что ни слово async, ни await не могут теперь применяться в качестве названия переменной или функции в программе на Python. Если вы применяете Python 3.7 или выше, включите интерпретатор Python и попробуйте воспользоваться этими ключевыми словами в качестве имён переменных или функций и вы должны получить такое сообщение об ошибке:


>>> def async():
  File "<stdin>", line 1
    def async():
            ^
SyntaxError: invalid syntax
>>> await = 0
  File "<stdin>", line 1
    await = 0
          ^
SyntaxError: invalid syntax
		

Некое существенное улучшение в Python 3.7 пришло и в модуле asyncio. В частности, из наших предыдущих примеров вы могли заметить, что наша основная программа обычно содержала достаточный объём подготовительного кода для инициализации и запуска необходимого цикла событий, который скорее всего остаётся одним и тем же во всех асинхронных программах:


loop = asyncio.get_event_loop()
asyncio.run_until_complete(main())
 	   

Когда main() выступает в роли сопрограммы в нашей программе, asyncio делает возможным для нас просто запускать её в некотором цикле событий при помощи метода asyncio.run(). Это позволяет избегать значительного подготовительного кода при асинхронном программировании в Python.

Поэтому мы можем преобразовать свой предыдущий код в его более простую версию для Python 3.7 таким манером:


asyncio.run(main())
 	   

Существуют и прочие улучшения реализованные в Python 3.7, как относительно производительности, так и более простого применения; тем не менее, мы не будем их обсуждать в данной книге. {Прим. пер.: более подробно они обсуждаются, например, в нашем переводе &quo;Asyncio в Python 3&quo;: Создать задачу? Обеспечить фьючерс? Прими решение! и далее: Диспетчеры контекста Async: async with}

Задачи с внутренне присущим блокированием

В нашем самом первом примере из этой главы вы увидели, что асинхронное программирование может предоставлять нашим программам Python значительное улучшение времени исполнения, но это не всегда так. Асинхронное программирование само по себе может предоставлять улучшения в скорости только если все обрабатываемые задачи не являются блокирующими. Тем не менее, аналогично нашему сравнению между совместной и последовательной обработками в программировании, некоторым вычислительным задачам в Python внутренне присуще блокирование и, тем самым, они не могут получать преимуществ асинхронного программирования.

Это означает, что если ваше асинхронное программирование имеет задачи с внутренне присущей им блокировкой в каких-то сопрограммах, такая программа не получит никаких дополнительных выигрышей в скорости от предоставляемой асинхронной архитектуры. Хотя события переключения задач и будут иметь место в таких программах, что улучшит отзывчивость этих программ, никакие инструкции не будут перекрывать друг друга и никакое дополнительное ускорение за счёт этого не будет достигнуто. На самом деле, поскольку имеются значительные накладные расходы, относящиеся к реализации асинхронного программирования в Python, наши программы могут даже потребовать большего времени на завершение своего исполнения, нежели первоначальные синхронные.

К примеру, давайте рассмотрим сопоставление в скорости между нашими двумя версиями программы проверки простых чисел. Так как наша порция первичной обработки данных состоит из сопрограммы is_prime(), которая содержит исключительно численное перемалывание, мы знаем, что эта сопрограмма содержит блокирующие задачи. Следовательно, наша асинхронная версия ожидаемо исполняется более медленно нежели её синхронная версия.

Перейдите во вложенную папку Chapter10 своего репозитория кода и взгляните на файлы example3.py и example4.py. Эти файлы содержат те же самые коды для синхронной и асинхронной программ, что мы уже видели, однако с тем добавлением, что мы теперь также и отслеживаем сколько времени занимает соответствующие программы. Ниже приводится мой вывод для исполнения example3.py, синхронной версии данной программы:


> python example3.py
Processing 9637529763296797...
9637529763296797 is a prime number.
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
Took 5.60 seconds.
		

А вот код отображения моего вывода при исполнении example4.py, нашей асинхронной программы:


> python example4.py
Processing 9637529763296797...
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
9637529763296797 is a prime number.
Took 7.89 seconds.
		

Хотя получаемый вами вывод может отличаться при определённых исполнениях каждой из программ, должен быть очевиден тот факт, что, как мы и предполагали, наша асинхронная программа требует больше времени чем её синхронный (последовательный) аналог. И опять же, это обусловлено тем, что имеющиеся задачи перемалывания чисел внутри нашей сопрограммы is_prime() являются блокирующими и, вместо перекрытия этих задач для получения дополнительной скорости, наша асинхронная программа просто переключает между задачами их исполнение. В данном случае за счёт асинхронного программирования достигается только отзывчивость.

Однако это вовсе не означает, что если ваша программа содержит блокирующие функции, асинхронное программирование будет вне круга ваших вопросов. Как мы уже упоминали ранее, всё исполнение в некоторой асинхронной программе, если не предписано обратное, происходит целиком в одном и том же потоке и процессе и блокирующие задачи, связанные с ЦПУ, могут тем самым препятствовать инструкциям программы перекрывать друг друга. Тем не менее, это не тот случай если ваши задачи распределены по отдельным потокам/ процессам. Иными словами, потоки и множество процессов могут способствовать асинхронным программам с блокирующими инструкциями достигать лучшего времени исполнения.

concurrent.futures как некое решение для задач с блокировкой

В этом разделе мы рассмотрим иной способ реализации потоков/ множества процессов: а именно модуль concurrent.futures, который разработан в качестве интерфейса верхнего уровня для реализации асинхронных задач. В частности, данный модуль concurrent.futures бесшовно работает с обсуждаемым модулем asyncio и, помимо этого, он предоставляет абстрактный класс Executor, который содержит основные скелеты двух основных классов, реализующих асинхронные потоки и многопроцессность, соответственно (как и предполагают их названия): ThreadPoolExecutor и ProcessPoolExecutor.

Изменения в инфраструктуре

Прежде чем мы ринемся в пучину concurrent.futures, давайте обсудим сами теоретические основы асинхронных потоков/ множества процессов, а также в чём их роль в имеющейся инфраструктуре асинхронного программирования, предоставляемой asyncio.

В качестве напоминания, у нас имеются три основных элемента в нашей экосистеме асинхронного программирования: цикл событий, сопрограммы и их соответствующие фьючерсы. Нам всё ещё требуется соответствующий цикл событий при реализации потоков/ множества процессов для координации имеющихся задач и для обработки их возвращаемых результатов (фьючерсов), следовательно, эти элементы остаются в согласии с асинхронным программированием единственного потока.

Что же касается сопрограмм, так как основная идея сочетания асинхронного программирования с потоками и множественностью процессов вовлекает исключение блокирующих задач в таких сопрограммах за счёт исполнения их в отдельных потоках и процессах, такие сопрограммы более со стороны Python не обязательно должны интерпретироваться как реальные сопрограммы. Вместо этого они могут быть просто обычными функциями Python.

Один новый элемент, который нам необходимо реализовать, это соответствующий исполнитель, который обеспечивает потоки и множество процессов; это может быть некий экземпляр класса ThreadPoolExecutor или ProcessPoolExecutor. Отныне, всякий раз когда мы добавляем некую задачу в свою очередь задач из имеющегося цикла событий, нам также требуется ссылаться и на этот исполнитель с тем, чтобы отдельные задачи выполнялись в раздельных потоках/ процессах. Это осуществляется при помощи метода AbstractEventLoop.run_in_executor(), который получает некий исполнитель, сопрограмму (хотя, снова- таки, она теперь не обязана быть реалной сопрограммой), а также агрументы для тех сопрограмм, которые подлежат исполнению в отдельных потоках/ процессах. В своём следующем разделе мы рассмотрим некий пример этого API.

Примеры в Python

Давайте рассмотрим конкретный пример для модуля concurrent.futures. Напомним, что в самом первом примере данной главы (наш пример обратного отсчёта), соответствующая блокирующая функция time.sleep() препятствовала нашей асинхронной программе быть полностью асинхронной и нам приходилось заменять её на некую версию без блокировки, asyncio.sleep(). Теперь, когда мы исполняем свои индивидуальные обратные отсчёты в отдельных потоках или процессах, и это означает, что блокирующая функция time.sleep() не будет вызывать никаких проблем в смысле асинхронного исполнения нашей программы.

Перейдите к нашему следующему файлу Chapter10/example5.py:


# Chapter10/example5.py

from concurrent.futures import ThreadPoolExecutor
import asyncio
import time

def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        time.sleep(delay)

        duration = time.perf_counter() - start
        print('-' * 40)
        print('%.4f \t%s%s = %i' % (duration, indents, name, n))

        n -= 1

async def main():
    futures = [loop.run_in_executor(
        executor,
        count_down,
        *args
    ) for args in [('A', 1), ('B', 0.8), ('C', 0.5)]]

    await asyncio.gather(*futures)

    print('-' * 40)
    print('Done.')

start = time.perf_counter()
executor = ThreadPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
 	   

Отметим, что count_down() объявляется как обычная функция, не являющаяся сопрограммой Python. В main(), которая остаётся сопрограммой, мы объявляем свою очередь задач для имеющегося цикла событий. И снова, мы применяем в ходе данного процесса метод run_in_executor() вместо метода create_task(), который мы применяли при однопоточном асинхронном программировании. В нашей основной программе нам также необходимо инициировать некий исполнитель, который в данном случае является неким экземпляром класса ThreadPoolExecutor из модуля concurrent.futures.

Как мы уже обсуждали в предыдущих главах, само решение между использованием потоков и множества процессов зависит от природы нашей программы. В данном случае нам требуется разделять свою переменную start (содержащую значение времени, в которое данная программа начала исполнение) между разными сопрограммами с тем, чтобы они могли выполнять своё действие обратного отсчёта; следовательно потоки выбираются вместо множества процессов.

После исполнения данного сценария ваш вывод должен походить на такой:


> python example5.py
----------------------------------------
0.5033                 C = 3
----------------------------------------
0.8052         B = 3
----------------------------------------
1.0052 A = 3
----------------------------------------
1.0079                 C = 2
----------------------------------------
1.5103                 C = 1
----------------------------------------
1.6064         B = 2
----------------------------------------
2.0093 A = 2
----------------------------------------
2.4072         B = 1
----------------------------------------
3.0143 A = 1
----------------------------------------
Done.
		

Этот вывод идентичен тому, что мы получали в своей асинхронной программе с чистой поддержкой asyncio. Таким образом, даже с функцией блокирующей обработки мы оказались способны исполнять свою программу асинхронно при помощи потоков, реализуемых благодаря concurrent.futures.

Давайте теперь применим ту же самую концепцию к нашей задаче простых чисел. Для начала мы преобразуем свою сопрограмму is_prime() в её первоначальный вид, когда она не являлась сопрограммой, и снова исполним её в отдельном процессе (который более желателен нежели поток, так как наша функция is_prime() является некоей задачей интенсивного перемалывания чисел). Дополнительное преимущество применения такой первоначальной версии is_prime() состоит в том, что нам нет нужды выполнять проверку условия переключения задач, которую мы выполняли в своей асинхронной программе с единственным потоком:


elif i % 100000 == 1:
    await asyncio.sleep(0)
 	   

Она также предоставит нам значительное ускорение. Давайте рассмотрим следующий пример Chapter10/example6.py:


# Chapter10/example6.py

from math import sqrt
import asyncio
from concurrent.futures import ProcessPoolExecutor
from timeit import default_timer as timer

#async def is_prime(x):
def is_prime(x):
    print('Processing %i...' % x)

    if x < 2:
        print('%i is not a prime number.' % x)

    elif x == 2:
        print('%i is a prime number.' % x)

    elif x % 2 == 0:
        print('%i is not a prime number.' % x)

    else:
        limit = int(sqrt(x)) + 1
        for i in range(3, limit, 2):
            if x % i == 0:
                print('%i is not a prime number.' % x)
                return

        print('%i is a prime number.' % x)

async def main():

    task1 = loop.run_in_executor(executor, is_prime, 9637529763296797)
    task2 = loop.run_in_executor(executor, is_prime, 427920331)
    task3 = loop.run_in_executor(executor, is_prime, 157)

    await asyncio.gather(*[task1, task2, task3])

if __name__ == '__main__':
    try:
        start = timer()

        executor = ProcessPoolExecutor(max_workers=3)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())

        print('Took %.2f seconds.' % (timer() - start))

    except Exception as e:
        print('There was a problem:')
        print(str(e))

    finally:
        loop.close()
 	   

После исполнения данного сценария я получил следующий вывод:


> python example6.py
Processing 9637529763296797...
Processing 427920331...
Processing 157...
157 is a prime number.
427920331 is a prime number.
9637529763296797 is a prime number.
Took 5.26 seconds.
		

И опять же, ваше время исполнения, скорее всего, может отличаться от моего, хотя всегда должно согласовываться сопоставление между этой и двумя другими версиями нашей программы проверки простого числа: наша первоначальная, синхронная версия требует меньше времени чем асинхронная версия с единственным потоком, но больше чем асинхронная версия с множеством процессов. Иными словами, за счёт комбинирования множества процессов и асинхронного программирования мы получаем всё лучшее из двух вселенных: согласованную отзывчивость от асинхронного программирования и улучшение в скорости за счёт множества процессов.

Выводы

В этой главе мы изучали асинхронное программирование, которое является некоторой моделью программирования которая получает преимущества координации вычислительных задач по взаимному перекрытию времён ожидания и обработки. В некоторой асинхронной программе имеется три основных компонента: цикл событий, сопрограммы и фьючерсы. Цикл событий занимается контролем планирования и управления сопрограмм при помощи своей очереди задач. Сопрограммы являются вычислительными задачами, которые подлежат асинхронному исполнению; каждая сопрограмма должна определить внутри своей функции в точности где она отдаст поток исполнения обратно своему циклу событий (то есть событие переключения задач). Фьючерсы являются объектами выделения места, которые содержат все результаты, получаемые из своих сопрограмм.

сам модуль asyncio совместно с ключевыми словами Python async и await предоставляет некий простой в применении API и интуитивно понятную среду разрабтки для реализации асинхронных программ; к тому же данная среда разработки делает получаемый асинхронный код настолько же простым для чтения, как и синхронный код, что достаточно редко встречается при асинхронном программировании. Тем не менее мы не можем применять асинхронное программирование в отдельном потоке для задач с блокирующими вычислениями только при помощи модуля asyncio. Основным решением для этого является модуль concurrent.futures, который предоставляет API верхнего уровня для для реализации асинхронных потоков и множества процессов и может применяться дополнительно к модулю asyncio.

В своей следующей главе мы обсудим одни из самых распространённых приложений асинхронного программирования, Transmission Control Protocol (TCP), как средство взаимодействия сервер- клиент. Вы изучите основы этой концепции, как она получает преимущества от асинхронного программирования и как её реализовывать в Python.

Вопросы

  • Что такое асинхронное программирование? Какие преимущества оно предоставляет?

  • Что выступает в роли основных элементов асинхронной программы? Как они взаимодействуют друг с другом?

  • Чем являются ключевые слова async и await? Какие цели они обслуживают?

  • Какие варианты предоставляет модуль asyncio в смысле реализации асинхронное программирование?

  • Какие улучшения относящиеся к асинхронному программированию предоставляет Python 3.7?

  • Что представляет из себя блокирующая функция? Почему она вызывает проблемы при обычном асинхронном программировании?

  • Каким образом concurrent.futures предоставляет некое решение блокирующим функциям при асинхронном программировании? Какие варианты он предлагает?

Дальнейшее чтение

Для получения дополнительных сведений вы можете воспользоваться следующими ссылками: