Глава 3. Прогулка по Asyncio

Asyncio предоставляет другой инструмент для параллельного программирования в Python, который обладает гораздо меньшим весом нежели потоки или многопроцессорность. В самом простом виде он выполняет это имея некий цикл событий, исполняющих некий набор задач с неким ключевым отличием в том, что каждая задача выбирает самостоятельно когда забирать управление обратно из этого цикла событий. [1].

-- Филип Джоунс, Носитель

Asyncio API в Python является сложным, так как имеет целью решать различные задачи для разных групп людей. К несчастью, чтобы помочь вам осознать какая именно часть asyncio важна для той группы, в которой находитесь вы, в доступности имеется очень мало руководств.

Моя цель состоит в том, чтобы помочь вам в этом разобраться. Существует две основные целевые аудитории для таких асинхронных свойств в Python:

  • Разработчики оконечных рабочих мест

    Эта группа может желать разрабатывать приложения с помощью asyncio. Я собираюсь предполагать что вы относитесь к этой категории.

  • Разработчики инструментальных средств

    Хотят делать некие инфраструктуры и библиотеки, которые разработчики для оконечных пользователей могут применять в своих приложениях.

Основная часть неразберихи относительно asyncio в имеющемся сообществе в наши дли проистекает и- за путаницы между этими двумя целями. Например, та документация для asyncio, которая представлена в официальной документации Python более подходит для разработчиков инструментальных средств, а не для конечных потребителей. Это означает, что читающие эту документацию разработчики для конечных пользователей быстро оказываются контуженными кажущейся сложностью. Вам придётся что- то предпринимать прежде чем вы сможете что- либо сделать с этим.

Я рассчитываю в этой книге помочь вам отделить в вашем сознании те свойства asyncio

, которые важны для разработки конечных приложений от тех, которые требуются разработчикам инструментальных средств.

[Совет]Совет

Если вам интересны подробности нижнего уровня, относящиеся к тому как создавать параллельно работающие инструментальные средства со встройкой вовнутрь чего- то побобного asyncio, я настоятельно рекомендую обсуждение Дэйва Бизли Python Concurrency From the Ground Up: LIVE!, в котором он демонстрирует помещение воедино простейшей версии асинхронной инфраструктуры, подобной asyncio.

Моя цель состоит в в предоставлении вам только самого основополагающего понимания строительных блоков Asyncio; достаточного чтобы вы были способны писать простые программы с его помощью и, несомненно, достаточными чтобы вы могли погружаться в полные руководства. [Когда это становится необходимым! На момент написания книги, единственным справочным руководством по Asyncio была спецификация API в официальной документации Python, а также набор блог постов, большая часть которых имеет ссылки в данной книге.]

Прежде всего, у нас имеется раздел "Быстрое начало", которое служит цели представления наиболее важных строительных блоков для приложений asyncio.

Быстрое начало

Чтобы применять Asyncio [на повседневной основе], вам необходимо знать о семи функциях.

-- Юрий Селиванов, автор PEP 492

Открывать официальную документацию Asyncio достаточно страшно. Имеется множество разделов с новыми, загадочными словами, а также понятия, с которыми не знакомы даже искушённые программисты Python, поскольку asyncio является очень новой вещью. Мы собираемся поломать всё это и объяснить какой подход следует применять к документации asyncio в дальнейшем, но на текущий момент вам следует знать, что реальная площадь поверхности, о которой вам придётся беспокоиться в имеющейся библиотеке asyncio намного меньше чем может показаться.

Юрий Селиванов, автор PEP 492 и основной разработчик асинхронного Python, объяснил в своём выступлении async/await в Python 3.5 и почему это восхитительно, представленном на PyCon 2016, относительно того, что большая часть API в модуле asyncio на самом деле предназначается для разработчиков инструментальных средств, а не для разработчиков конечных приложений. В э том выступлении он выделил те основные функции, о которых следует беспокоиться конечным пользователям, а эти функции всего лишь составляют небольшое подмножество всего API asyncio.

В данном разделе мы собираемся взглянуть на эти составляющие ядро функции и рассмотреть как вдарить по почве выполнения в цикле с помощью основанного на событиях программирования в Python.

Вам потребуются базовые знания сопрограмм (представленные в разделе, идущим вслед за этим), но помимо этого, если вы хотите иметь возможность использовать обсуждаемую библиотеку asyncio в качестве разработчика конечного приложения (а не как проектировщика инструментальных средств), вот моменты, которые вам необходимо знать в крошечном примере:

 

Пример 3-1. "Hello World" для Asyncio


# quickstart.py
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()(5)

loop = asyncio.get_event_loop()(1)
loop.create_task(main())(2)
loop.run_forever()(3)
pending = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*pending, return_exceptions=True)(4)
loop.run_until_complete(group)(3)
loop.close()(5)

Вывод:
$ python quickstart.py
Sun Sep 17 14:17:37 2017 Hello!
Sun Sep 17 14:17:38 2017 Goodbye!
 	   
  • (1) loop = asyncio.get_event_loop()

    Вам необходим некий экземпляр цикла прежде чем вы можете запустить какие бы то ни было сопрограммы, а это именно он и есть. На самом деле, где бы вы его не вызывали, get_event_loop() предоставит вам всякий раз тот же самый экземпляр loop, раз уж вы используете только один единственный поток. [API asyncio позволяет вам делать много безумных вещей с несколькими экземплярами и потоками циклов, но данная книга не является самой подходящей книгой для данной цели. 99% общего времени вы будете использовать некий единичный, основной поток для своей прикладной программы, как это показано тут.]

  • (2) task = loop.create_task(coro)

    В приведённом выше коде loop.create_task(main()) является неким особенным вызовом. Ваша функция сопрограммы не будет исполняться пока вы не сделает его. Мы говорим, что create_task() планирует исполнение вашей сопрограммы в данном цикле. Возвращаемый объект task может применяться для отслеживания текущего состояния данной задачи, например, исполняется ди она ещё или уже выполнена, а помимо этого может также применяться для получения некоторого результирующего значения от вашей завершённой сопрограммы. Вы также можете завершать данную задачу при помощи task.cancel(). [с другой стороны: вы можете заметить в этой строке кода что основным параметром в таком вызове функции для create_task() является coro. Именно это соглашение используется в большей части документации API, которую вы обнаружите, и он ссылается на некую coroutine; т.е., строго говоря, собственно результат вызова некоторой функции async def, а не саму эту функцию.]

  • (3) loop.run_until_complete(coro) и loop.run_forever()

    Это два способа запуска данного цикла. Они оба заблокируют данный текущий поток, которым обычно является самый основной поток {main}. Отметим, что run_until_complete() сохранит данный цикл исполняемым до тех пор, пока не завершится заданный coro - но все прочие запланированные задачи в этом цикле также будут исполняться при выполнении данного цикла.

  • (4) group = asyncio.gather(task1, task2, task3)

    Типичная манера для большинства программ будет состоять в том, чтобы начаться с loop.run_forever() для самой "главной" части ваше программы {main} , а затем, когда получен некий сигнал процесса, остановить данный цикл, собрать всё ещё приостановленные задачи, а затем воспользоваться loop.run_until_complete() до тех пор, пока эти задачи не выполнятся. именно этот метод служит для выполнения сбора. В более общем плане он также может применяться для сбора множества сопрограмм воедино и ожидания (при помощи await!) пока все собранные задачи не завершатся.

  • (5) loop.stop() и loop.close()

    Как уже описано выше, они применяются для постепенного останова некоторой программы. stop() обычно вызывается как следствие получения некоторого сигнала выключения, а close() обычно является самым последним действием: и оно очистит все очереди и остановит Исполнителя {Executor}. "Остановленный" цикл может быть запущен вновь, а "закрытый" цикл исчезает навсегда.

[Предостережение]Предостережение

Предыдущий пример является слишком упрощённым чтобы применяться в практических настройках. Требуется дополнительная информация относительно обработки правильного останова. Основной целью данного примера было просто представить большую часть важнейших функций и методов из asyncio. Дополнительная практическая информация по обработке останова представлена позднее в данной книге.

asyncio в Python представляет множество базовых механизмов вокруг основного цикла событий - и требует чтобы вы были осведомлены о таких вещах, как сам цикл событий и управление им на протяжении его времени жизни. Именно это является отличием, например, от Node.js, который также содержит некий цикл событий, но сохраняет его где- то вовне в скрытом виде. Однако, как только вы начнёте понемногу работать с asyncio, вы начнёте замечать, что ваш шаблон запуска и останова основного цикла событий не отходит страшно далеко от приведённого выше кода. А в оставшейся части данной книги мы рассмотрим более подробно некоторые нюансы относительно времени жизни цикла.

В приведённом выше примере я кое- что упустил. Самый последний элемент базовой функциональности, о котором вам следует знать, состоит в том как запускать блокирующие функции. Основной момент относительно кооперативной многозадачности состоит в том, что вам требуется все связанные с вводом/ выводом функции..., ну да, кооперировать, а это означает допуск переключения некоторого контекста обратно в данный цикл при помощи особого ключевого слова await. Большая часть имеющегося кода Python, доступного на текущий момент в диком виде, не делает этого, а вместо этого полагается на вас для запуска таких функций в потоках. До тех пор, пока не будет более широко распространённой поддержки функций async def, вы обнаружите, что применение таких библиотек с блокировкой неизбежно.

Для этого asyncio предоставляет некий API, который очень похож на API из пакета concurrent.futures. Данный пакет предоставляет некий ThreadPoolExecutor и какой- тоProcessPoolExecutor. По умолчанию он основан на потоке, но запросто заменяется на базирование на процессе. Я опустил это в своём предыдущем примере, так как это скрыло бы то описание, как подгоняются друг к другу все фундаментальные части. Теперь, когда они были обсуждены, мы можем взглянуть непосредственно на исполнителя.

Есть несколько причуд, о которых следует знать. Давайте рассмотрим пример кода:

 

Пример 3-2. Основной интерфейс исполнителя


# quickstart_exe.py
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():(1)
    time.sleep(0.5)(2)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()

loop.create_task(main())
loop.run_in_executor(None, blocking)(3)

loop.run_forever()

pending = asyncio.Task.all_tasks(loop=loop)(4)
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()

Вывод:
$ python quickstart_exe.py
Sun Sep 17 14:17:38 2017 Hello!
Sun Sep 17 14:17:38 2017 Hello from a thread!
Sun Sep 17 14:17:39 2017 Goodbye!
 	   
  • (1) blocking() обычно вызывает внутри себяtime.sleep(), что привело бы к блокировке вашего основного потока и предотвращало бы исполнение вашего цикла событий. Это означает, что вам не следует делать такую функцию сопрограммой, но что ещё более серьёзно, вы не можете вызывать эту функцию из любого места в своём основном потоке, который является именно тем местом, в котором исполняется основной цикл asyncio. Мы решаем данную проблему запуская эту функцию в некотором исполнителе (executor).

  • (2) Безотносительно к данному разделу, но нечто, что следует держать на уме далее в этой книге: отметим, что значение времени сна с блокировкой (0.5 секунды) короче чем значение засыпания без блокировки (1 секунда) в вашей сопрограмме main(). Это делает данный код примера изящным и опрятным. В своём следующем разделе мы объясним что бы произошло если бы функции исполнителя переживали бы свои асинхронные эквиваленты во время последовательности выключения.

  • (3) await loop.run_in_executor(None, func)

    Это самый последний момент в нашем списке существенных функций asyncio, о которых следует знать. Иногда вам требуется запускать вещи в некотором отдельном потоке, или даже в отдельном процессе: данный метод применяется именно для этого. Здесь мы передаём свою блокирующую функцию для запуска в определённом по умолчанию исполнителе. [к сожалению, самым первым параметром run_in_executor() является сам экземпляр Executor, который следует применять и вы обязаны передать None чтобы использовать установленный по умолчанию. Всякий раз, как я применяю это, я ощущаю как мой параметр "исполнитель" плачет, будучи вынужденным именоваться ключевым словом аргумента с определённым по умолчанию значением None.]

    Отметим, что loop.run_in_executor() возвращает некое Future, что означает, что вы можете await {подождать} его при вызове в рамках функции другой сопрограммы.

  • (4) Продолжаем отмечать вслед за элементом 2: наш набор задач в pending не содержит некоего элемента для того вызова blocking(), который делается в run_in_executor(). Это будет справедливо для любого вызова, который возвращает некое Future вместо какой- то Task. Имеющаяся документация достаточно хорошо определяет возвращаемые типы, поэтому об этом несложно знать; всего лишь просто запомните, что all_tasks() на самом деле возвращает только Task, и никаких Future.

Теперь, когда вы ознакомились с наиболее существенными частями asyncio для потребностей разработки конечных приложений, наступило время объяснять нашу сферу и выстроить перечисленные API asyncio в некий вид иерархии. Это облегчит усвоение и понимание того как получать то что вам требуется из имеющейся документации, но не более того.

Основная башня Asyncio

Как мы уже отметили в Быстром начале, существует лишь небольшое число команд, которые вам следует знать чтобы быть способным применять asyncio в качестве разработчика конечных приложений. К несчастью, имеющаяся документация для asyncio предоставляет гигантское число различных API, причём они представлены в очень "однообразном" формате, из которого очень трудно определить какие вещи предназначаются для общего применения, а какие стороны предоставляются для разработчиков инструментальных средств.

Когда разработчики инструментальных средств просматривают ту же самую документацию, они ищут точки входа, к которым они могут подключать свои новые инструментальные средства (или новые сторонние библиотеки). В данном разделе мы взглянем на asyncio глазами какого- то разработчика инструментальных средств чтобы дать представление как они могут подходить к построению новой обладающей асинхронностью библиотекой. Надеемся, это поможет нам ещё чётче очертить те функции, о которых вам стоит заботиться в вашей собственной работе.

С этой точки зрения более полезно представлять себе модуль asyncio как отранжированный в виде некоей иерархии, вместо некоего плоского листа, причём всякий последующий уровень строится поверх спецификации предыдущего. К сожалению, это не совсем так, и я взял на себя некие вольности при выравнивании их в Таблице 3-1, но, надеюсь, это снабдит вас неким альтернативным представлением имеющегося API asyncio.

[Предостережение]Предостережение

Таблица 3-1, а также представляемая здесь названия и нумерация "Уровней" являются исключительно моими собственным изобретением, которое имеет целью добавить некую небольшую структуру в помощь пояснению API asyncio. Это моя выдумка! Искушённый читатель может выстраивать вещи иным образом, и это тоже хорошо!

Таблица 3-1. Свойства asyncio, выстраиваемые в виде некоторой иерархии. Для разработчиков конечных приложений наиболее важные уровни отображены жирно
Уровень Понятие Htfkbpfwbz

Уровень 9

Сетевой: потоки

StreamReader & StreamWriter

Уровень 8

Сетевой: TCP & UDP

Protocol

Уровень 7

Сетевой: транспорт

BaseTransport

Уровень 6

Инструменты

asyncio.Queue

Уровень 5

подпроцессы и потоки

run_in_executor(), asyncio.subprocess

Уровень 4

задачи

asyncio.Task

Уровень 3

свойства

asyncio.Future

Уровень 2

цикл событий

BaseEventLoop

Уровень 1 (основной)

сопрограммы

async def & await

На самом основополагающем уровне, Уровне 1, у нас имеются сопрограммы (coroutines), которые вы уже видели ранее в этой книге. Это самый нижний уровень, с которого следует начинать размышлять о разработке стороннего инструментального средства, и, что удивительно, он стал чем- то популярным не у какого- то одного, а у двух таких из доступных на сегодняшний день в мире асинхронных инфраструктур: Curio и Trio. Они обе полагаются только на естественные сопрограммы в Python и больше ни на что иное из библиотечного модуля asyncio.

Нашим следующим уровнем является собственно цикл событий. Сопрограммы бесполезны сами по себе: они ничего не могут делать без некоего цикла, в котором они запускаются (более того, с необходимостью, Curio и Trio реализуют свои собственные циклы событий). asyncio предоставляет как некое определение цикла, Abstract​ EventLoop, так и какую- то реализацию, BaseEventLoop.

Чёткое отделение описания от реализации делает возможным для сторонних разработчиков выполнять альтернативные реализации своих циклов событий и это уже произошло в проекте uvloop, который предоставляет намного более быструю реализацию цикла чем та, что имеется в стандартном библиотечном модуле asyncio. Что важно: uvloop просто "встраивается" в имеющуюся иерархию и подменяет только саму часть цикла в общем стеке. Такая возможность выполнять подобные вещи по выбору именно то, для чего разрабатывался API asyncio, причём с ясным разделением между всеми различными подвижными частями.

Уровни 3 и 4 привносят нам свойства и задачи, которые очень тесно взаимосвязаны; они разделяются только по той причине, что Task является неким подклассом Future, но их запросто можно рассматривать как находящиеся на одном и том же уровне. Некий экземпляр Future представляет какой- то вид происходящего действия, которое возвратит некий результат посредством уведомления в свой цикл событий, в то время как Task представляет некую сопрограмму, исполняемую в этом цикле событий. Краткая история такова: некое свойство является "осведомлённым о цикле", в то время как конкретная задача одновременно "осведомлена о цикле" и "знает о сопрограмме". Выступая в роли разработчика конечного продукта, вы будете преимущественно работать с задачами, нежели со свойствами, однако для некоторого разработчика инструментальных средств данная пропорция может быть иной в зависимости от имеющихся подробностей.

Уровень 5 предоставляет возможности для запуска, а также снабжения await работ, которые должны исполняться либо в отдельном потоке, либо даже в отдельном процессе.

Уровень 6 представляет дополнительные осведомлённые об асинхронности инструмента, такие как asyncio.Queue. Нам можно было разместить этот уровень после всех сетевых уровне, но я полагаю, что более чётко будет поместить все относящиеся к осведомлённым о сопрограммах API вначале, прежде чем мы взглянем на имеющиеся уровни ввода/ вывода. Те Queue, которые производятся asyncio очень похожи на API имеющихся Queue безопасных потоков в модуле queue, за исключением то8о, что наша версия asyncio требует наличия ключевого слова await для get() и put(). Вы не можете использовать queue.Queue непосредственно внутри сопрограмм, поскольку его get() заблокируют ваш основной поток {main}.

Наконец, у нас имеются уровни сетевого ввода/ вывода с 7 по 9. Для вас, как разработчика конечного продукта, большая часть подходящего вам для работы API это API "потоков" на уровне 9. Я представил этот API потоков на самом верхнем уровне абстракции в нашей башне. Непосредственно под ним расположен уровень "протоколов" (Уровень 8), является более тонко настраиваемым API по сравнению с "потоками"; вы можете применять "протоколы" во всех экземплярах, в которых вы можете применять уровень "потоков", но "потоки" проще. Наконец, вам скорее всего никогда не придётся работать непосредственно с транспортным уровнем (Уровень 7), если только вы не создаёте некое инструментальное средство для других, чтобы применять его и вам необходимо персонализировать настройку имеющихся обменов {Прим. пер.: например, вы пожелаете предоставить потоки на основе протокола, применяющего обмен RDMA для увеличения пропускной способности и снижения латентности, см. например, Рекомендации по разработке высокопроизводительных систем RDMA и Ceph поверх RDMA.}

Выводы

D Быстром начале мы рассмотрели абсолютно необходимый минимум, который следует знать чтобы начать работу с библиотекой asyncio. Теперь, после того как мы взглянули на собранную воедино библиотеку API asyncio в целом, я бы хотел повторно вернуться к короткому перечню свойств и повторно выделить те части, которые вам скорее всего следует изучить.

Для написания сетевых приложений вам следует сосредоточиться на изучении следующих наиболее важных уровней для применения библиотечного модуля asyncio;

  • Уровень 1: существенным является понимание того как писать функции async def и применять await для вызова и исполнения прочих сопрограмм.

  • Уровень 2: важным является знакомство с тем как выполнять запуск, останов и взаимодействовать с имеющимся циклом событий.

  • Уровень 5: для применения кода с блокировкой в вашем асинхронном приложении требуются исполнители, а к тому же текущая реальность состоит в том, что большинство сторонних библиотек пока ещё не совместимы с asyncio. Хорошим примером этого является ORM {Object Relational Mapper} библиотека базы данных SQLAlchemy, у которой в настоящее время нет никакой альтернативы для asyncio. {Прим. пер.: строго говоря, это не совсем так, см gino.}

  • Уровень 6: если вам требуется кормить данными одну или более длительно исполняемыми сопрограммами, самым лучшим способом для этого будет применение asyncio.Queue. Это в точности та же самая стратегия, которая применяется в queue.Queue для распределении данных между потоками. Версия asyncio Queue применяет те же самые API что и стандартная библиотека модуля queue, но при этом применяет сопрограммы вместо блокирующих методов, подобных get().

  • Уровень 9: API Потоков (Streams) предоставляет вам самый простой способ обработки взаимодействия с сокетом поверх сетевой среды, а также именно тем местом, с которого вам следует начинать создание прототипов идей для сетевых приложений. Вы можете обнаружить, что вам требуется более тонкое управление, и тогда вы имеете возможность переключиться на API Протоколов (Protocols), однако - для большинства проектов - обычно лучше делать вещи более простыми пока вы в точности не поймёте какую именно проблему вы пытаетесь решить.

Конечно, если вы применяете некую совместимую с asyncio библиотеку стороннего разработчика, которая обрабатывает всё взаимодействие с нужным вам сокетом, скажем, aiohttp, вам вовсе не требуется напрямую работать с asyncio. В этом случае вам следует в основном полагаться на предоставляемую этими библиотеками документацию.

Это подводит нас к завершению данного раздела. Библиотека asyncio пытается предоставлять достаточную функциональность как для разработчиков конечных приложений, так и для разработчиков инструментальных средств. К сожалению, это означает, что весь API asyncio может показаться слишком размашистым. Я надеюсь, что данный раздел смог снабдить вас сырой "дорожной картой" чтобы помочь вам уцепить только те части, которые вам требуются.

В наших последующих частях мы собираемся рассмотреть необходимые вам составляющие части из моего приведённого выше короткого списка более подробно.

[Совет]Совет

Сайт pythonsheets предоставляет некое более глубокое резюме (или "шпаргалку", если хотите) крупных фрагментов имеющегося API asyncio, где все концепции снабжены короткими отрывками исходного кода. Данная презентация является краткой, поэтому я не рекомендую её начинающим, но если вы опытны в Python и вы из тех людей, которым требуется только "ухватить" новую информацию о программировании в представленном коде, несомненно, это будет полезный вам ресурс.

Сопрограммы

Давайте начнём с самого начала: что такое сопрограмма?

Я собираюсь позволить вам быстро заглянуть под капот и увидеть некоторые части того механизма, который вы обычно не видите, даже когда применяете на протяжении повседневного программирования. Все приводимые далее примеры могут быть целиком воспроизведены в обычном интерпретаторе Python в интерактивном режиме и я призываю вас поработать с ними самостоятельно набирая их вручную, отслеживая их вывод и, возможно, экспериментируя различными путями с async и await.

[Предостережение]Предостережение

asyncio впервые был добавлен в Python 3.4, однако применяемый нами новый синтаксис для сопрограмм с использованием async def и await был добавлен только в Python 3.5. Как люди делают что- то с asyncio в Python 3.4? Они применяют генераторы (generator) очень специальными способами для действия таким образом, как если бы они были сопрограммами. В некоторых более старых базовых кодах вы обнаружите функции генераьора, декорированные @asyncio.coroutine и содержащими операторы yield from. Создаваемые с помощью async def сопрограммы теперь именуются как "натуральные", так как они построены на том же самом языке, что и сопрограммы и ни на чём ином. Данная книга целиком игнорирует более старые сопрограммы, основанные на генераторе.

Ключевое слово нового async def

Давайте начнём с самой простой из возможных вещей:

 

Пример 3-3. Самый первый сюрприз


>>> async def f():(1)
... return 123
...
>>> type(f)(2)
<class 'function'>
>>> import inspect(3)
>>> inspect.iscoroutinefunction(f)(4)
True
 	   
  • (1) loop = asyncio.get_event_loop()

    Это самый простой способ объявления некоторой сопрограммы: она выглядит как какая- то обычная функция, за исключением того что начинается с определённых ключевых слов async def.

  • (2) Сюрприз! В точности тип f не является coroutine, а всего лишь некая ординарная функция. Это является обычной практикой ссылаться на такие функции "async def" функции как на "сопрограммы", даже хотя - строго говоря - они рассматриваются Python как являющиеся функциями сопрограмм. Такое поведение в точности идентично тому, как функции генератора работают в Python:

    
    >>> def g():
    ... yield 123
    ...
    >>> type(g)
    <class 'function'>
    >>> gen = g()
    >>> type(gen)
    <class 'generator'>
    		

    Даже хотя g порой неверно именуется как "генератор", она остаётся некоей функцией и возвращается собственно генератор только когда эта функция вычисляется (evaluated). Функции сопрограмм работают в точности так же: вам требуется вызвать эту функцию async def для получения необходимой сопрограммы.

  • (3) Наш модуль inspect из имеющейся стандартной библиотеки может предоставить намного лучшую ретроспективу возможностей нежели использованная встроенная функция type().

  • (4) Имеется некая функция iscoroutinefunction(), которая позволяет вам делать отличие некоей обычной функции от функции сопрограммы.

Возвращаясь к нашей функции async def f(), что же происходит когда мы её вызываем?

 

Пример 3-4. Некая функция async def возвращает... сопрограмму!


>>> coro = f()
>>> type(coro)
<class 'coroutine'>
>>> inspect.iscoroutine(coro)
True
 	   

Итак, теперь вопрос звучит следующим образом: что же такое в точности "сопрограмма"? Сопрограммы очень походи на генераторы. В самом деле, прежде чем с помощью ключевых слов async def и await в Python 3.5 были введены натуральные сопрограммы, они уже были доступны к применению в библиотеке Asyncio Python 3.4 с использованием обычных генераторов при особых декораторах. [Более того, именно таким образом библиотеки открытого исходного кода, такие как Twisted и Tornado выставляли поддержку асинхронности в недалёком прошлом.] Не является неожиданным, что наши новые функции async def (и те сопрограммы, которые они возвращают) ведут себя аналогично генераторам.

Мы можем ещё немного поиграть с сопрограммами, чтобы посмотреть как Python их применяет. Что ещё более важно, мы желаем увидеть как Python способен "переключать " исполнение между сопрограммами. Давайте вначале взглянем на то как может быть получено само значение return.

Когда некая сопрограммы выполняет возврат, то что происходит на самом деле, так это то, что возбуждается исключительная ситуация StopIteration. Вот следующий пример, который продолжает тот же самый сеанс, что и наши предыдущие примеры и поясняет это:


>>> async def f():
... return 123
>>> coro = f()
>>> try:
... coro.send(None)(1)
... except StopIteration as e:
... print('The answer was:', e.value)(2)
...
The answer was: 123
		
  • (1) Некая сопрограмма инициализацируется "отправкой" ей None. Рассматривая изнутри, именно это собирается делать имеющийся цикл событий с вашими бесценными сопрограммами. Вам никогда не придётся делать это вручную. Все создаваемые вами сопрограммы будут исполняться либо с loop.create_task(coro), или при помощи await coro. И именно этот loop, который делает данный .send(None) находится за сценой.

  • (2) Когда ваша сопрограмма выполняет возврат, возбуждается некий особый вид прерывания, именуемый StopIteration. Отметим, что мы можем получить доступ к самому возвращаемому значению данной сопрограммы через значение атрибута value самой этой исключительной ситуации. Опять же, вам нет нужды знать что это работает именно так: с вашей точки зрения функции async def просто возвращают некое значение при помощи соответствующего оператора return, в точности как самая обычная функция.

Эти два момента, т.е. send() и StopIteration определяют соответственные начало и окончание самого исполнения сопрограммы. До сих пор это выглядело просто как в действительности запутанный способ запуска некоей функции, но всё хорошо: наш цикл событий будет отвечать за продвижение сопрограмм по этим внутренним особенностям нижнего уровня. Со своей точки зрения вы просто планируете сопрограммы к исполнению в имеющемся цикле, а они исполняются сверху вниз почти как обычные функции. [7]

Нашим следующим шагом будет рассмотреть как само исполнение такой сопрограммы может быть подвешено.

Ключевое слово нового await

Данное новое ключевое слово, await, всегда берёт некий параметр и будет принимать единственную вещь, называемую ожидаемой (awaitable), которая определяется как одна из следующих (исключительно!):

  • Некую сопрограмму (т.е. конкретный результат какой- то вызываемой функции async def [Также допустимо некое наследование, сопрограмма на основе генератора, которая является некоторой функцией генератора, которая декорирована @types.coroutine и применяет внутри себя ключевое слово yield from для подвешивания. Мы собираемся в данной книге полностью игнорировать наследуемые сопрограммы. Забудьте про них!]

  • Любой объект, реализующий особый метод __await__(). Этот специализированный метод обязан возвращать некий итератор.

Второй из доступных случаев ожиданий выходит за рамки данной книги (он вам никогда не понадобится в повседневном программировании asyncio), тем не менее, самый первый пример достаточно очевиден:

 

Пример 3-5. Применение await в какой- то сопрограмме


async def f():
    await asyncio.sleep(1.0)
    return 123

async def main():
    result = await f()(1)
    return result
 	   
  • (1) Вызов f производит некую сопрограмму; это означает, что мы позволяем ей выполнять await. Получаемое значение такой переменной result будет равняться 123 после завершения f().

Прежде чем мы завершим данный раздел и приступим к нашему циклу событий, полезно взглянуть на то, как могут подаваться исключительные ситуации, которые наиболее часто применяются для завершения: когда вы вызываете task.​cancel(), ваш цикл событий внутренне применит coro.throw() для возбуждения asyncio.CancelledError внутри вашей сопрограммы:

 

Пример 3-6. Применение coro.throw() для инъекции исключений в некую сопрограмму


>>> coro = f()(1)
>>> coro.send(None)
>>> coro.throw(Exception, 'blah')(2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in f
Exception: blah
blah
 	   
  • (1) Как и ранее, функцией сопрограммы f() создаётся некая новая сопрограмма.

  • (2) Вместо выполнения иного send(), мы делаем вызов throw() и предоставляем некий класс исключений, а также какое- то значение. Это возбуждает некую исключительную ситуацию внутри нашей сопрограммы, в точке await.

Метод throw() применяется (внутренним образом в asyncio) для прекращения задачи, что мы также достаточно просто можем продемонстрировать. Мы даже собираемся забежать вперёд и обработать данное завершение внутри некоторой новой сопрограммы, которая и обработает это:

 

Пример 3-7. Прекращение сопрограммы при помощи CancelledError


>>> import asyncio
>>> async def f():
...     try:
...         while True: await asyncio.sleep(0)
...     except asyncio.CancelledError:(1)
...         print('I was cancelled!')(2)
...     else:
...         return 111
>>> coro = f()
>>> coro.send(None)
>>> coro.send(None)
>>> coro.throw(asyncio.CancelledError)(3)
I was cancelled!(4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration5)
 	   
  • (1) Наша функция сопрограммы теперь обрабатывает некую исключительную ситуацию: фактически, это особый тип исключений, применяемый на протяжении всей библиотеки asyncio для прекращения задачи: asyncio.CancelledError. Заметим, что данная исключительная ситуация внедряется в сопрограмму извне, т.е. самим циклом событий, который мы всё ещё пока имитируем вручную при помощи команд send() и throw(). В реальном коде, который мы увидим позднее, при прекращении задач, CancelledError возбуждается изнутри самой обёртывающей задачу сопрограммы, как это показано выше.

  • (2) Некое простое сообщение чтобы сказать что мы выполнили прекращение. Отметим, что в результате обработки данной исключительной ситуации она более не распространяется и наша сопрограмма выполнит return.

  • (3) Здесь мы выполняем throw() своей исключительной ситуации CancelledError.

  • (4) Как и ожидалось, мы видим выведенным наше завершающее сообщение.

  • (5) А наша сопрограмма выходит обычным способом. (Повторный вызов такой исключительной ситуации StopIteration является обычным способом, при помощи которого сопрограмма выполняет выход.

Просто чтобы указать, что прекращение задачи не является ничем иным как обычным возбуждением исключительной ситуации (и её обработкой), вот некий пример, в котором поглощаем прекращение и переходим к некоторой иной сопрограмме:


>>> async def f():
...     try:
...         while True: await asyncio.sleep(0)
...     except asyncio.CancelledError:
...         print('Nope!')
...         while True: await asyncio.sleep(0)(6)
...     else:
...         return 111
>>> coro = f()
>>> coro.send(None)
>>> coro.throw(asyncio.CancelledError)(7)
Nope!
>>> coro.send(None)(8)
 	   
  • (6) Вместо того чтобы печатать некое сообщение, что произойдёт если после прекращения мы просто вернёмся обратно подождать иное ожидание?

  • (7) Неудивительно, наша внешняя сопрограмма продолжает существовать и она немедленно подвешивается вновь внутри данной новой сопрограммы.

  • (8) Всё выполняется должным образом и наша сопрограмма продолжает быть вывешенной и ожидаемо возобновляется.

Конечно, это должно сопровождаться словами, что вам никогда не следует делать этого в действительности! Если ваша сопрограмма получает некий завершающий сигнал, это очевидное указание выполнения только необходимой очистки и выхода. Не игнорируйте это просто так.

На данный момент мы, естественно, очень устали выступать в роли некоего цикла событий вручную, выполняя все вызовы .send(None), поэтому вместо этого мы привнесём соответствующий цикл, предоставляемый asyncio и очистим предыдущий пример надлежащим образом:


>>> async def f():
...     await asyncio.sleep(0)
...     return 111
>>> loop = asyncio.get_event_loop()(9)
>>> coro = f()
>>> loop.run_until_complete(coro)(10)
111
 	   
  • (9) Получаем некий цикл.

  • (10) Запускаем завершение данной сопрограммы. Внутренним образом это выполняет все такие вызовы метода .send(None) для нас и определяет завершение наших сопрограмм при помощи исключительной ситуации StopIteration, которая также содержит наше возвращаемое значение.

Цикл событий

В своём предыдущем разделе мы показали как методы send() и throw() могут взаимодействовать с некоторой сопрограммой, но это было сделано только для понимания того как структурированы сами по себе сопрограммы. Именно цикл событий в asyncio обрабатывает все переключения между сопрограммами, а также перехватывает необходимые исключительные ситуации StopIteration, и многое другое, например, выполняет ожидание в сокетах и файловых дескрипторах на предмет событий.

Вы можете получить некий цикл событий посредством вызова get_event_loop(), которая является весьма интересной функцией:

 

Пример 3-8. Получение при каждом обращении одного и того же цикла событий


>>> loop = asyncio.get_event_loop()
>>> loop2 = asyncio.get_event_loop()
>>> loop is loop2(1)
True
 	   
  • (1) Оба идентификатора, и loop, и loop2, ссылаются на один и тот же экземпляр.

Это означает, что если вы внутри некоторой функции сопрограммы, а у вас имеется потребность в доступе к своему экземпляру цикла, будет достаточно вызвать get_event_loop() для его получения. Вам не требуется передавать какой- то параметр loop в явном виде во все свои функции. Эта ситуация меняется коренным образом если вы разработчик инструментального средства: раз так, было бы лучше спроектировать ваши функции на приём некоторого параметра loop, просто на тот случай, когда ваши пользователи делают нечто необычное с политиками цикла событий. Политики выходят за рамки данной книги, и мы больше не будем обсуждать их.

[Совет]Совет

Метод get_event_loop() работает только в пределах того же самого потока: на самом деле, get_event_loop() выдаст отказ в случае вызова внутри какого- то нового потока, если вы только не создадите намеренно некий новый цикл при помощи new_event_loop() и установите такой новый экземпляр в качестве "основного" цикла для этого потока через вызов set_event_loop(). Большинству из нас никогда не понадобится (и не захочется!) иметь некий отдельный экземпляр цикла запущенным в каом- то отдельном потоке. В первую очередь это касается практически всего асинхронного программирования.

Давайте исследуем некий пример: рассмотрим некую функцию сопрограммы, внутри которой создаются некие дополнительные задачи, причём без ожидания:


async def f():
    # Создадим какие- то задачи!
    loop = asyncio.get_event_loop()
    for i in range():
    loop.create_task(<some other coro>)
 	   

В этом примере замысел состоит в запуске полностью новых задач внутри данной сопрограммы. Без выполнения их ожидания, они запускаются независимо от своего контекста внутри функции сопрограммы f(). На самом деле, f() осуществит выход до того как завершатся запускаемые ею задачи.

Для ситуации, подобной приводимой, вам порой следует посмотреть код, в котором переменная значения цикла передаётся в качестве какого- то параметра, просто чтобы сделать её доступной для возможности вызова соответствующего метода create_task(). Определённой альтернативой является применение неудачно именованного asyncio.ensure_future(), который делает всё то же самое что и create_task(), но не требует некоторой переменной локального цикла:


async def f():
    # Создадим какие- то задачи!
    for i in range():
        asyncio.ensure_future(<some other coro>)
 	   

Если вы пожелаете, вы можете создать свою собственную вспомогательную функцию, которая также не требует некоего параметра loop, но имеет лучшее название:


def create_task(coro):
    return asyncio.get_event_loop().create_task(coro)

async def f():
    # Создадим какие- то задачи!
    for i in range():
        create_task(<some other coro>)
 	   

Единственная разница между loop.create_task() и asyncio​.ensure_future() состоит в заголовке и введении в заблуждение вновь прибывших. Мы поясним эти различия в нашем следующем разделе.

Задачи и фьючерсы

В своём предыдущем разделе мы обсудили сопрограммы и как они должны запускаться в некотором цикле чтобы они стали полезными. Здесь мы кратко поговорим об API Task и Future. То, с чем вы будете работать наиболее часто, это Task, поскольку большинство вашей работы будет содержать выполнение сопрограмм при помощи метода loop.create_task(), точно так же, как это было установлено ранее в разделе Быстрое начало. Класс Future на самом деле является суперклассом Task и предоставляет всю функциональность для работы с основным циклом.

Проще всего это воображать себе так: Future (Фьючерс) представляет состояние завершения некоторого действия и управляется самим циклом, в то время как Task в точности то же самое и то, где специфическим "действием" является некая сопрограмма; возможно, одна из созданных вами при помощи async def функция плюс loop.create_task().

Класс Future представляет некое состояние чего- то, что взаимодействует с циклом. Это описание слишком расплывчатое чтобы быть полезным, поэтому представляйте себе вместо этого некий экземпляр Future как такое: это именно то, что выступает переключателем для состояния завершения. Когда некий экземпляр Future создан, такой переключатель "ещё пока не завершён"; однако через некоторое время спустя он завершится. На самом деле некий экземпляр Future имеет метод с названием done():


>>> from asyncio import Future
>>> f = Future()
>>> f.done()
False
		

Некий экземпляр Future также может:

  • иметь некий набор значений "результата" (.set_result(value) и .result() для его получения)

  • завершаться с помощью .cancel() (и выполнять проверку завершения посредством .cancelled())

  • иметь дополнительные функции обратного вызова, добавляющих то, что будет запускаться по завершению данного фьючерса.

Даже несмотря на то, что Tasks более распространены, вы не можете полностью избежать Future: например, запуск некоей функции в каком- то исполнителе возвратит экземпляр Future, а не Tasks. {Прим. пер.: как определяет словарь, фьючерс- программная конструкция, указывающая на то, что результат некоего действия будет использоваться в программе позже, но само вычисление может планироваться системой в любой произвольный момент времени.} Давайте быстро взглянем на код примера чтобы получить представление о том, как работать с неким экземпляром Future напрямую:


>>> import asyncio
>>> async def main(f: asyncio.Future):(1)
...     await asyncio.sleep(1)
...     f.set_result('I have finished.')(2)
>>> loop = asyncio.get_event_loop()
>>> fut = asyncio.Future()(3)
>>> print(fut.done())(4)
False
>>> loop.create_task(main(fut))(5)
<Task pending coro=<main() running at <ast>:4>>
>>> loop.run_until_complete(fut)(6)
'I have finished.'
>>> print(fut.done())
True
>>> print(fut.result())(7)
I have finished.
		
  • (1) Создаём некий образей основной функции. Это даёт нам нечто для запуска, немного выжидаем, а затем устанавливаем какой- то результат в этом Future, f.

  • (2) Устанавливаем необходимый результат.

  • (3) Вручную создаём некий экземпляр фьючерса. Отметим, что этот экземпляр (по умолчанию) связан с нашим loop, однако он не подключён, и не будет в дальнейшем подключаться к какой бы то ни было сопрограмме (то, для чего и создаются задачи).

  • (4) Прежде чем что- то делать, проверяем что данный фьючерс ещё не сделан.

  • (5) Планируем необходимую сопрограмму main(), передаём данный фьючерс. Помните, всё что делает сопрограмма main() это то, что она выполняет ожидание, а затем переключает необходимый экземпляр фьючерса. (Отметим, что наша сопрограмма main() ещё пока не запущена: сопрограммы запускаются только после запуска их цикла.)

  • (6) run_until_complete() Future Task [В этом месте в документации имеется ошибка: Подпись выдаётся как AbstractEventLoop​ .run_until_complete(future), но на самом деле должно быть AbstractEventLoop​ .run_until_complete(coro_or_future), так как применяется то же самое правило.] Теперь, когда основной цикл запущен, наша сопрограмма main() начинает исполняться.

  • (7)

    В конце концов, когда результат фьючерса установлен, он завершается. После завершения можно осуществить доступ к полученному результату.

Конечно, маловероятно что вы будете работать с Future напрямую, как мы показали это выше. Данный пример кода приводится исключительно для обучения. Большая часть ваших контактов с asyncio будет происходить через экземпляры Task.

Один из самых последних примеров, чтобы доказать что некая Task на самом деле какое- то небольшое декорирование в Future: мы можем повторить точно тот же самый пример что и выше, но вместо этого воспользовавшись Task:


>>> import asyncio
>>> async def main(f: asyncio.Future):
...     await asyncio.sleep(1)
...     f.set_result('I have finished.')
>>> loop = asyncio.get_event_loop()
>>> fut = asyncio.Task(asyncio.sleep(1_000_000))(1)
>>> print(fut.done())
False
>>> loop.create_task(main(fut))
<Task pending coro=<main() running at <ast>:4>>
>>> loop.run_until_complete(fut)
'I have finished.'
>>> print(fut.done())
True
>>> print(fut.result())
I have finished.
		
  • (1) Одно единственное отличие: некий экземпляр Task вместо Future. Конечно, соответствующий API Task требует от нас предоставления какой- то сопрограммы, поэтому мы просто применяем sleep(), так как это подходит нам, и мы применяем некую комично большую продолжительность чтобы подчеркнуть некую потенциальную догадку: данная Task завершится после установки некоторого результата, вне зависимости от того будет ли завершена или нет лежащая в основе сопрограмма.

Данный пример работает в точности как это было до сих пор и показывает что даже некая задача может быть завершена ранее вручную путём установки в ней результата, в точности так же, как некий экземпляр Future может осуществить завершение. И опять- таки, это не очень востребовано на практике - вместо этого вы обычно выполняете ожидание завершения задачи или её прекращения в явном виде - но, надеюсь, это было полезно для понимания того что задачи и фьючерсы очень близки к тому чтобы быть синонимами: единственное отличие состоит в том, что задача ассоциируется с какой- то сопрограммой.

Создать задачу? Обеспечить фьючерс? Прими решение!

В Быстром начале мы сказали, что верный путь для запуска сопрограмм состоит в использовании loop.create_task(coro). Это возвращает нас к тому, что этого же можно достичь другой функцией уровня модуля: asyncio.ensure_future().

В процессе исследований для данной книги я убедился, что данный метод asyncio.ensure_future() несёт ответственность за большую часть распространённого недоразумения относительно самой библиотеки asyncio. Основная часть имеющегося API является достаточно ясной, однако имеется пара в которых можно споткнуться при обучении, и именно это один из них. Когда вы сталкиваетесь с ensure_future(), ваше сознание очень усердно работает над тем как интегрировать его в умственное построение модели того как следует применять asyncio - и терпит неудачу!

Основная проблема с ensure_future() лучше всего высвечивается этим имеющим теперь дурную репутацию пояснением из документации asyncio самого Python:

asyncio.ensure_future(coro_or_future, *, loop=None)

Планирование собственно исполнения некоторого объекта сопрограммы: его обёртывание в фьючерсе. Возвращает некий объект Task. Если в качестве аргумента выступает Future, он возвращается непосредственно.

-- Документация Python 3.6 {Прим. пер.: см. изменения в Python 3.7}

Что-а!? К счастью, кот более ясное описание ensure_future():

  • Если вы передаёте некую сопрограмму, будет произведён некий экземпляр Task (и ваша сопрограмма будет запланирована на исполнение в соответствующем цикле событий). Это идентично вызову loop.create_task(coro) и возврату получаемого нового экземпляра Task. {Прим. пер., Python 3.7: для проверки применяется вначале isfuture(), а затем iscoroutine() и возвращается обёрнутый объект Task в случае сопрограммы на входе или подобный Future объект без изменений - см. следующий пункт! Если же объект Task пребывает в состоянии ожидания - что проверяется при помощи inspect.isawaitable() - возвращается некий объект Task, который выполняет ожидание полученного в качестве аргумента объекта.}

  • Если вы пробрасываете некий экземпляр Future (который содержит экземпляры Task, так как Task является субклассом Future), вы получаете возвращаемой ту же самую вещь, неизменной! Да, на самом деле!

Давайте подробнее рассмотрим как это работает:


import asyncio

async def f():(1)
    pass

coro = f()(2)
loop = asyncio.get_event_loop()(3)

task = loop.create_task(coro)(4)
assert isinstance(task, asyncio.Task)(5)

new_task = asyncio.ensure_future(coro)(6)
assert isinstance(new_task, asyncio.Task)

mystery_meat = asyncio.ensure_future(task(7)
assert mystery_meat is task(8)
 	   
  • (1) Простая функция сопрограммы без действий. Нам просто требуется нечто чтобы создать какую- то сопрограмму.

  • (2) Здесь мы заставляем данный объект сопрограммы вызывать свою функцию напрямую. Ваш код будет редко делать такое, но я хочу сделать это явном виде (через несколько строк), чтобы мы передавали некий объект сопрограммы в каждый из create_task и ensure_future().

  • (3) Получаем необходимый цикл.

  • (4) С самого начала мы применяем loop.create_task() для планирования своей сопрограммы в нашем цикле и мы получаем некий новый экземпляр Task обратно.

  • (5) Здесь мы проверяем полученный тип. До сих пор ничего примечательного.

  • (6) Здесь мы показываем как asyncio.ensure_future() может применяться для выполнения токо же самого действия, которое совершает create_task: мы передаём некую сопрограмму и получаем обратно какой- то экземпляр Task (а наша сопрограмма оказывается запланированной для исполнения в основном цикле)! Если вы передаёте некую сопрограмму, не имеется разницы между loop.create_task() и asyncio.ensure_future().

  • (7) Но что произойдёт если мы передадим некий экземпляр Task чтобы ensure_future()…? Отметим, что мы передаём уже созданный экземпляр задачи, которая была создана loop.create_task() на шаге 4.

  • (8) Мы получаем обратно в точности тот же самый экземпляр Task что мы и передали: он передаётся в неизменном виде. {Прим. пер.: только если эта задача не находится в ожидании - см нашу сноску выше с цитированием документации Python 3.7 - в этом случае возвращается новая Task, ожидающая полученный в качестве аргумента объект в состоянии ожидания.}

Итак: что именно является непосредственным пунктом передачи экземпляров Future? И зачем делать два разных дела в одной функции? Ответ состоит в том, что данная функция, ensure_future(), предназначается для применения авторами инструментальных средств для предоставления API разработчиком конечных приложений и которая может обрабатывать оба вида параметров {даже три!}. Не верите мне? Вот что следует от самого Благословенного Диктатора Всея Жизни (BDFL):

Основной момент ensure_future() состоит в том, что если у вас есть нечто, что может быть либо сопрограммой, либо неким Future (и этот последний содержит некую Task, поскольку она является субклассом Future), и вы желаете иметь возможность вызывать некий метод в нём, который определён только в Future (скорее всего, единственным полезным примером является cancel()). Когда это уже Future (или Task) не делается ничего; когда это некая сопрограмма, она обёртывается некоторой Task.

Если вы знаете что у вас имеется некая сопрограмма и вы желаете её запланировать, более верным API для применения будет create_task(). Единственный случай, в котором вам следует вызывать ensure_future() это когда вы предоставляете некий API (как это делает большая часть владельцев API), который принимает либо некую сопрограмму, либо какой- то Future и вам требуется делать нечто с этим, и что требует чтобы у вас имелся Future[10].

-- Гвидо ван Россум, github.com/python/asyncio/issues/477

Итак, суммируем: наш API asyncio.ensure_future() является некоторой вспомогательной функцией предназначенной для разработчиков инструментальных средств. Она проще всего поясняется по аналогии с более распространённым видом функции: если у вас есть за плечами пять лет опыта программирования, вы вы уже могли видеть функции подобные приводимой ниже listify():


def listify(x: Any) ­> List:
    """ Try hard to convert x into a list """
    if isinstance(x, (str, bytes)):
        return [x]

    try:
        return [_ for _ in x]
    except TypeError:
        return [x]
 	   

Эта функция пытается преобразовать полученный аргумент в некий перечень, вне зависимости от того что она получает. Такой вид функция часто применяется в API и инструментальных средствах для принуждения вводов в некий известный тип, что упрощает последующий код, так как вы знаете что этот параметр (выводимый из listify()) всегда будет неким списком.

Если я переименую свою функцию listify() в ensure_list(), то вы должны начать замечать определённую параллель с asyncio.ensure_future(), не так ли? Он всегда пытается принудительно преобразовывать получаемый аргумент в некий тип Future. Эта утилитарная функция чтобы делать жизнь более простой для разработчиков инструментальных средств, а не для разработчиков окончательных продуктов, коими являемся вы и я.

И на самом деле, даже стандартная библиотека asyncio сама по себе применяет ensure_future() в точности для этой цели. Когда вы в следующий раз будете просматривать API, вы повсеместно будете видеть некий параметр функции, описываемый как coro_or_future, и именно его, скорее всего, ensure_future() будет применять внутри себя для принудительного преобразования данного параметра. Например, имеющаяся функция asyncio.gather() имеет такуб отличительную особенность:


asyncio.gather(*coros_or_futures, loop=None, ...)
 	   

Внутри себя gather() применяет using ensure_future() для принудительного приведения типа и это делает для вас возможным передавать ему сопрограммы, задачи или фьючерсы.

Основным ключевым моментом здесь является: поскольку вы некий разработчик конечного приложения, вам никогда не приходится применять asyncio.ensure_future. Этот инструмент больше предназначен для разработчиков инструментальных средств. Если вам требуется спланировать некую сопрограмму в своём цикле событий, просто делайте это напрямую с помощью loop.create_task().

К сожалению, это не самый конец данной истории. Прямо сейчас некоторые разработчики конечных продуктов предпочитают применять asyncio.ensure_future() вместо loop.create_task() по одной простой и очень прагматичной причине: так меньше работы! Чтобы вызвать create_task() вам либо требуется некий экземпляр loop, доступный в вашем локальном пространстве имён, в котором вы пишите код, или же вам необходим некий дополнительный вызов asyncio.get_event_loop() для получения соответствующей ссылки на loop; в противоположность этому, ensure_future() может вызываться как есть.

В приведённом выше примере у нас имелось три функции сопрограмм внутри которых будут запущены некие фоновые сопрограммы. Основная цель данного упражнения состояла в сопоставлении эстетических преимуществ применения create_task() по отношению к ensure_future() при планировании сопрограмм в основном цикле.

 

Пример 3-9. Сопоставление применения create_task() и ensure_future()


import asyncio

async def background_job():
    pass

async def option_A(loop):(1)
    loop.create_task(background_job())

async def option_B():(2)
    asyncio.ensure_future(background_job())

async def option_C():(3)
    loop = asyncio.get_event_loop()
    loop.create_task(background_job())

loop = asyncio.get_event_loop()

loop.create_task(option_A(loop))(1)
loop.create_task(option_B)(2)
loop.create_task(option_C)(3)
 	   
  • (1) В этом первом варианте, после создания необходимого цикла обычным образом мы планируем ту сопрограмму, которая возвращается из option_A(). Внутри создаётся некая новая задача для данного фонового задания. Поскольку мы не ожидаем данную фоновую задачу, наша сопрограмма option_A() выполнит выход; однако это не самое интересное здесь. Чтобы запланировать данную фоновую задачу при помощи вызова create_task(), было необходимо иметь доступным соответствующий объект loop. В данном случае мы передаём его когда создавалась сопрограмма option_A().

  • (2) В нашем втором случае мы уже располагаем некоторой спланированной задачей, однако не требовалось передавать соответствующий экземпляр loop, так как наша функция ensure_future() доступна непосредственно в самом модуле asyncio. И это существенный момент: некоторые люди применяют ensure_future() не из- за её способности принудительного приведения типов к экземплярам Future, а вместо того чтобы не пришлось передавать идентификаторы loop.

  • (3) И наш окончательный вариант, в нём также требуется передавать некий экземпляр цикла. В этом случае мы получаем текущий цикл событий [в этом случае "текущий" означает тот экземпляр цикла событий, который связан с данным действующим потоком] путём вызова get_event_loop(), а затем мы вновь способны делать "все правильные вещи" и вызвать loop.create_task(). Это аналогично тому что делает внутри себя ensure_future().

Что в действительности следует иметь доступным, так это вспомогательную функцию asyncio.create_task​ (coro), которая делает в точности то же самое что и наш "Вариант C"? приведённый выше, но предопределённой в имеющейся стандартной библиотеке. Это низложит удобство ensure_future() и сохранит ясность loop.create_task(). Эта потребность не замечалась командой разработки Python, но я счастлив сообщить, что asyncio.create_task() в конечном счёте будет доступна в Python 3.7!

В своих последующих разделах мы будем возвращаться к свойствам уровня языка программирования, начиная с диспетчеров асинхронного контекста.

Диспетчеры контекста Async: async with

Сопровождение контекста диспетчеров оказывается исключительно удобным. Она действительно имеет смысл, так как многие варианты требуют чтобы сетевые ресурсы - например, соединения - должны открываться и закрываться в определённой надлежащим образом области.

Основным ключом к пониманию async with является осознание того, что определённая операция некоторого диспетчера контекста управляется вызовами метода; с последующим рассмотрением соображения: а что если бы такие методы были бы функциями сопрограмм? И на самом деле, это в точности так и работает. Вот псевдокод, демонстрирующий всё это:

 

Пример 3-10. Диспетчер контекста Async


class Connection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
    async def __aenter__(self):(1)
        self.conn = await get_conn(self.host, self.port)
        return conn
    async def __aexit__(self, exc_type, exc, tb):(2)
        await self.conn.close()

async with Connection('localhost', 9001) as conn:
    <do stuff with conn>
 	   
  • (1) Вместо специального метода __enter__() для синхронизации диспетчеров контекста применяется особый новый метод __aenter__().

  • (2) Более того, вместо __exit__() используется __aexit__(). Все прочие параметры идентичны параметрам __exit__() и заполняются если возбуждается некая исключительная ситуация в самом теле диспетчера контекста.

[Предостережение]Предостережение

просто потому что вы должны применять asyncio в своей программе, это не означает, что все ваши диспетчеры контекста обязаны синхронизироваться именно так! Такой порядок полезен только если вам требуется выполнять await чего- то внутри своих методов enter и exit. Если нет никаого кода ввода/ вывода с блокировкой, просто применяйте диспетчеры контекста.

А теперь - только между нами - мне не сильно нравится такой стиль применения диспетчера контекста в явном виде, в то время как имеется великолепный декоратор @contextmanager в соответствующем модуле contextlib стандартной библиотеки! Как вы можете догадаться, имеется и некая асинхронная версия, @asynccontextmanager, но, к сожалению, она доступна только начиная с Python 3.7, который ещё не был доступен на момент написания данной книги {Прим. пер.: На момент перевода, всё уже хорошо, Python 3.7 доступен.} Тем не менее, достаточно просто скомпилировать Python из исходного кода и в нашем следующем разделе покажем как @asynccontextmanager будет работать в Python 3.7.

Вариант contextlib

Он аналогичен декоратору @contextmanager из соответствующего модуля contextlib стандартной библиотеки. Для краткого повтора, давайте вначале рассмотрим соответствующий вариант с блокировкой:

 

Пример 3-11. Вариант с блокировкой


from contextlib import contextmanager

@contextmanager(1)
def web_page(url):
    data = download_webpage(url)(2)
    yield data
    update_stats(url)(3)

with web_page('google.com') as data:(4)
    process(data)(5)
 	   
  • (1) Соответствующий декоратор contextmanager преобразовывает некую функцию генератора в какой- то диспетчер контекста.

  • (2) Этот вызов функции, (который я изготовил специально для данного примера) выглядит подозрительно похоже на конкретный вид вещей, которые желают применять некий сетевой интерфейс, который на много порядков величины медленнее "обычного" кода, ограниченного ЦПУ. Такой диспетчер контекста должен применяться в некотором выделенном потоке, в противном случае вся программа будет приостановлена в ожидании данных.

  • (3) Представим что мы обновляем какие- то статистические данные всякий раз, когда мы обрабатываем данные из некоторого URL, например, общее число раз, которое URL был выгружен. Опять- же с точки зрения одновременного исполнения, нам придётся знать будет ли эта функция вовлечена в ввод/ вывод внутренним образом, например подобно записи в некую базу данных поверх сетевой среды. Если это так, update_stats() также является неким блокирующим вызовом.

  • (4) Здесь применяется наш диспетчер контекста. Особенно отметим как этот сетевой вызов (для download_webpage()) скрывается внутри соответствующей конструкции нашего диспетчера контекста.

  • (5) Данный вызов функции, process(), также может быть блокирующим. Нам придётся рассмотреть что делает данная функция. Чтобы предоставить вам общий взгляд на различные подлежащие рассмотрению вопросы, которые служат для принятия решения будет ли данный вызов функции выполнять "блокировку", либо же он будет "неблокирующим" перечислим те, которые могут ими служить:

    • безобидные без блокировки (быстрые и ограниченный ЦПУ)

    • с мягкой блокировкой (быстрой и ограниченной вводом/ выводом, возможно, нечто подобное быстрому доступу к диску вместо сетевого ввода/ вывода)

    • блокирующие (медленный, ограниченные вводом/ выводом)

    • демонические (медленные, ограниченные ЦПУ)

    С целью упрощения данного примера, давайте предположим, что данный вызов process() является быстрой, ограниченной ЦПУ операцией и, следовательно, не имеющей блокировки.

Давайте сопоставим в точности тот же самый пример, но на этот раз и применением новой вспомогательной функции, осведомлённой об асинхронности, привносимой Python 3.7:

 

Пример 3-12. Вариант без блокировки


from contextlib import asynccontextmanager

@asynccontextmanage(1)
async def web_page(url):(2)
    data = await download_webpage(url)(3)
    yield data(4)
    await update_stats(url)(5)

async with web_page('google.com') as data:(6)
    process(data)
 	   
  • (1) Наш новый asynccontextmanager применяется точно так же.

  • (2) Тем не менее, он требует, чтобы функция декорирующего генератора объявлялась с применением async def.

  • (3) Как и ранее, прежде чем сделать данные доступными в самом теле нашего диспетчера контекста, мы выбираем эти данные из необходимого URL. В данном случае я добавил необходимое ключевое слово await, которое сообщает нам что эта сопрограмма позволит соответствующему циклу событий запускать прочие задачи пока мы ожидаем завершения своего сетевого вызова.

    Заметим, что мы не можем просто прибить своё ключевое слово await куда пожелаем. Данное изменение также предполагает, что мы также способны изменить саму свою функцию download_webpage() и преобразовать её в некую сопрограмму, которая совместима с этим ключевым словом await. Для тех случаев, когда невозможно изменять данную функцию, необходим иной подход и мы обсудим его в своём следующем примере.

  • (4) Как и ранее, все данные делаются доступными самому телу нашего диспетчера контекста. Я пытаюсь сохранять данный код простым, и поэтому я опускаю обработчики try/finally, которые обычно вам следует писать для рассмотрения исключительных ситуаций, возбуждаемых в теле вызывающей стороны.

    Отметим: Собственно присутствие yield является именно тем, что преобразует некую функцию в функцию генератора; дополнительно присутствие соответствующих ключевых слов async def в пункте 1 превращает её в асинхронную функцию генератора. При своём вызове она будет возвращать некий асинхронный генератор. Наш модуль inspect имеет две функции, которые могут проверить это: соответственно, isasyncgenfunction() и isasyncgen().

  • (5) Тут мы предполагаем, что мы также преобразовали свой код внутри функции update_stats() чтобы позволить ему производить сопрограммы. Затем мы можем применить ключевое слово await, которое допустит некое переключение в нашем цикле событий пока мы ожидаем завершение работы, ограниченной соответствующим вводом/ выводом.

  • (6) Для применения самого диспетчера контекста требовалось другое изменение: нам требовалось применять async with вместо with.

К счастью, данный пример показывает, что наш новый @asynccontextmanager в точности аналогичен имевшемуся ранее декоратору @contextmanager.

В сноске 3 предыдущего примера я сказал, что было необходимо изменить некоторые функции для возврата сопрограмм; ими были download_webpage() иupdate_stats(). Обычно это не так просто сделать, так как необходимо добавить поддержку асинхронности на нижнем уровне сокета.

Основной фокус предыдущего примера был нацелен на то что бы просто показать наш новый @asynccontextmanager, а не показывать как преобразовывать функции с блокировкой в неблокирующие. Более общий случай состоит в том, что вам может понадобиться применять в своей программе некую функцию с блокировкой, но не будет возможности изменять имеющийся в таких функциях код.

Такая ситуация часто происходит с библиотеками сторонних разработчиков и прекрасным примером может послужить соответствующая библиотека requests, которая внутри себя применяет вызовы с блокировкой [Может оказаться весьма сложным добавление поддержки Async в некое имеющееся инструментальное средство по данной причине, так как могут потребоваться большие структурные изменения. Это обсуждается в эссе для Requests github]. Хорошо, раз вы не можете изменить подлежащий вызову код, имеется иной путь, и это удобное место показать как именно для этого можно применять Исполнитель (executor):

 

Пример 3-13. Вариант без блокировки с небольшой вспомогательной функцией от моих друзей


from contextlib import asynccontextmanager

@asynccontextmanager
async def web_page(url):(1)
    loop = asyncio.get_event_loop()
    data = await loop.run_in_executor(
        None, download_webpage, url)(2)
    yield data
    await loop.run_in_executor(None, update_stats, url)(3)

async with web_page('google.com') as data:
    process(data)
 	   
  • (1) В данном примере мы предполагаем, что у нас нет возможности изменить свой код для двух вызовов с блокировкой download_webpage() и update_stats(), то есть мы не можем модифицировать их в функции сопрограмм. Это плохо, ибо самый серъёзный грех основанного на событиях программирования нарушает то правило, которое не следует нарушать и ни при каких обстоятельствах не мешать основному циклу событий обрабатывать события.

    Чтобы обойти данную проблему, мы будем применять некий Исполнитель для запуска вызовов с блокировкой в каком- то отдельном потоке. Такой Исполнитель предоставляется еам в виде некоего атрибута самого основного цикла событий .

  • (2) Здесь мы вызываем своего Исполнителя. Его отличительной особенностью является AbstractEvent Loop.run_in_executor(executor, func, *args) и, если вы желаете применять определяемый по умолчанию Исполнитель (которым является ThreadPoolExecutor) тогда вам следует передать None в качестве аргумента для соответствующего "Исполнителя". [Это очень раздражает. Всякий раз, когда я применяю данный вызов, я не могу задаваться вопросом, почему бы не предпочесть в качестве аргумента некоего ключевого слова идиому использования executor=None.]

  • (3) Как и в случае вызова download_webpage(), мы также можем запустить другой блокирующий вызов к update_stats() в некотором Исполнителе. Отметим, что вы обязаны применять ключевое слово await спереди этого вызова. Если вы его забудете, получаемое исполнение вашего асинхронного генератора (то есть вашего асинхронного диспетчера контекста) не будет выполнять ожидания необходимого вызова на завершение перед обработкой.

Скорее всего асинхронные диспетчеры контекста станут широко применяться во многих кодах, основанных на asyncio, следовательно достаточно важно иметь о них хорошее представление. Вы можете прочесть дополнительные подробности о новом асинхронном декораторе диспетчера контекста в документации Python 3.7.

[Замечание]Замечание

asyncio всё ещё находится в стадии активной разработки командой разработчиков Python и в Python 3.7 доступны некие прочие небольшие, но существенные улучшения помимо asynccontextmanager, включая следующие небольшие примеры:

  • asyncio.run(): некая новая вспомогательная функция, предназначенная для применения в качестве основной точки входа для программ asyncio.

  • asyncio.create_task(): создаёт некую задачу без необходимости указания в явном виде идентификатора цикла.

  • AbstractEventLoop.sock_sendfile(): отправляет некий файл поверх сокета TCP с применением высокопроизводительного API os.sendfile().

  • AbstractEventLoop.start_tls(): обновляет некое имеющееся соединение до безопасного транспортного уровня (TLS).

  • asyncio.Server.serve_forever(): API очистки для запуска сетевых серверов asyncio.

Итераторы Async: async for

Помимо async def и await имеется ещё несколько расширений в синтаксисе языка Python. Первым из них является асинхронная версия для цикла "for". Проще всего понять как это работает если вы вначале поймёте что простая итерация - так же как и многие другие функции языка программирования - реализуется при помощи специальных методов, отличающих их двойными подчёркиваниями в соответствующих названиях.

Например, вот как некий стандартный (не асинхронный) итератор определяется посредством применения методов __iter__() и __next__():


>>> class A:
...     def __iter__(self):(1)
...         self.x = 0(2)
...         return self(3)
...     def __next__(self):(4)
...         if self.x > 2:
...             raise StopIteration(5)
...         else:
...             self.x += 1
...             return self.x(6)
>>> for i in A():
...     print(i)
123
 	   
  • (1) Некий Итератор обязан реализовывать соответствующие специальный метод __iter__().

  • (2) Инициализируем некое состояние в его "начальном" значении состояния.

  • (3) Наш особый метод __iter__() должен возвращать нечто итерируемое, т.е. какой- то объект, который реализует специальный метод __next__(). В данном случае, тот же самый экземпляр, так как A сам по себе также реализует соответствующий особый метод __next__().

  • (4) Метод __next__() определён. Он буде вызываться для каждого последующего шага в итерациях, до тех пор пока ...

  • (5) ... не будет возбуждена StopIteration.

  • (6) Здесь вырабатываются возвращаемые значения для каждой итерации.

Теперь вы зададите справедливый вопрос: что произойдёт если вы объявите свой особый метод __next__() в качестве некоторой функции сопрограммы async def? Это позволит ему выполнять await некоторого вида операций, ограниченных вводом/ выводом; а также это во многом в точности повторяет то, как работает async for, за исключением маленьких деталей вокруг именований. Имеющаяся спецификация (в PEP 492) показывает, что чтобы применять async for в качестве какого- то асинхронного итератора в самом этом асинхронном итераторе требуются определённые моменты:

  1. Вам требуется реализовать def __aiter__() (Отмечаем: без) async def!)

  2. __aiter__() должен возвращать некий объект, который реализует async def __anext__()

  3. __aiter__() должен возвращать некое значение для каждой итерации и при завершении должен возбуждать StopAsyncIteration

Давайте быстро взглянем на то как это может работать. Представим что у нас имеется пачка ключей в некоторой базе данных Redis и мы желаем выполнить итерации по их данным, но при этом делать выборку данных только по запросу. Некий асинхронный итератор может выглядеть примерно как- то так:


import asyncio
from aioredis import create_redis

async def main():(1)
    redis = await create_redis(('localhost', 6379))(2)
    keys = ['Americas', 'Africa', 'Europe', 'Asia'](3)

    async for value in OneAtATime(redis, keys):(4)
        await do_something_with(value)(5)

class OneAtATime:
    def __init__(self, redis, keys):(6)
        self.redis = redis
        self.keys = keys
    def __aiter__(self):(7)
        self.ikeys = iter(self.keys)
        return self
    async def __anext__(self):(8)
        try:
            k = next(self.ikeys)(9)
        except StopIteration:(10)
            raise StopAsyncIteration

        value = await redis.get(k)(11)
        return value

asyncio.get_event_loop().run_until_complete(main())
 	   
  • (1) Это основная функция: вы запускаем её при помощи run_until_complete() в нижней части своего примера кода.

  • (2) Для получения соединения мы применяем интерфейс верхнего уровня из aioredis.

  • (3) Представим себе, что каждое из связанных с этими ключами значение достаточно большое и хранится в соответствующем экземпляре Redis.

  • (4) Здесь мы применяем async for: основной момент состоит в том, что итерация сама по себе способна подвешивать себя пока не дождётся появления своего следующего значения.

  • (5) Для полноты допустим что мы также выполняем некие действия, связанные с вводом/ выводом для своей выборки данных; может происходить обмен данными и затем они отправляются другому получателю.

  • (6) Собственно инициализатор данного класса достаточно простой: мы сохраняем необходимое подключение к экземпляру Redis и соответствующий перечень ключей, по которым требуется выполнять итерацию.

  • (7) В точности как и для предыдущего примера кода с __iter__(), мы используем __aiter__() для настройки необходимых для итерации моментов. Мы создаём некий обычный итератор по имеющимся ключам, self.ikeys, а также return self, так как OneAtATime также реализует соответствующие метод __anext__() сопрограммы.

  • (8) Отметим, что метод __anext__()объявляется с применением async def, в то время как метод __aiter__() объявляется только при помощи def.

  • (9) Для каждого ключа выполняем выборку его значения из Redis: self.ikeys является обычным итератором по имеющимся ключам, поэтому мы применяем next() для перемещения по ним.

  • (10) Когда израсходованы self.ikeys, обрабатываем соответствующий StopIteration и просто возвращаем его в StopAsyncIteration! Именно таким образом вы сигнализируете об останове изнутри некоего асинхронного итератора.

  • (11) Наконец - вся цель данного примера - мы можем получить необходимые данные из Redis при помощи данного ключа. Мы можем выполнить await для этих данных, что означает, что другой код вашего цикла событий может выполняться пока мы дожидаемся сетевого ввода/ вывода.

Надеемся, что этот пример является понятным: async for предоставляет возможность оставаться в удобном простом цикле for, даже при итерации данных при которых сама по себе итерация выполняет ввод/ вывод. Основное преимущество от этого состоит в том что вы можете обрабатывать гигантские объёмы данных, причём всё это делать в отдельном цикле, так как вам придётся всякий раз иметь дело только с каждым фрагментом в небольших партиях.

Генераторы Async: yield внутри async def функций

Асинхронные генераторы отвечают на законный вопрос: "Что произойдёт если применить yield внутри естественной функции сопрограммы async def?" Эта концепция может вводить в заблуждение если у вас имеется некий опыт применения генераторов как если бы они были сопрограммами, так как это происходит в инструментальном средстве Twisted или в инфраструктуре Tornado, или даже как в случае yield from из asyncio Python 3.4.

Следовательно, прежде чем мы продолжим данный раздел, было бы лучше, чтобы вы убедили себя что:

  • Сопрограммы и генераторы совершенно разные понятия.

  • Асинхронные генераторы ведут себя во многом подобно обычным генераторам.

  • Для итераций вы применяете async for для асинхронных генераторов вместо обычного for в обычных генераторах.

Наш пример, использованный в предыдущем разделе для демонстрации какого- то асинхронного итератора при итерациях по Redis оказывается намного проще если мы установим некий асинхронный генератор:

 

Пример 3-14. С асинхронным генератором проще


import asyncio
from aioredis import create_redis

async def main():(1)
    redis = await create_redis(('localhost', 6379))
    keys = ['Americas', 'Africa', 'Europe', 'Asia']

    async for value in one_at_a_time(redis, keys):(2)
        await do_something_with(value)

async def one_at_a_time(redis, keys):(3)
    for k in keys:
        value = await redis.get(k)(4)
        yield value(5)

asyncio.get_event_loop().run_until_complete(main())
 	   
  • (1) Самая основная функция main() идентична той, которая была в нашем коде Итераторы Async: async for.

  • (2) Ладно, почти идентична: мне пришлось изменить её название с варианта написания верблюдом, на змеиный регистр (с подчёркиванием).

  • (3) Наша функция теперь объявляется с помощью async def, превращая её в некую функцию сопрограммы, а так как эта функция также содержит ключевое слово yield, мы именуем её асинхронной функцией генератора.

  • (4) Нам не требуется выполнять путанные действия, требовавшиеся в нашем предыдущем примере с помощью self.ikeys: здесь мы просто обходим циклом по имеющимся ключам напрямую, получая необходимые значения...

  • (5) ... и собирая их для вызывавшей стороны, в точности как в обычном генераторе.

Если вы новичок в данном вопросе, всё это может показаться слишком сложным, но я призываю вас поиграть со всеми этими примерами самостоятельно. Достаточно быстро вы начнёте ощущать это естественным. Асинхронные генераторы скорее всего станут очень популярными при кодировании на основе asyncio, поскольку они привносят в точности те же самые преимущества, что и обычные генераторы: делают код короче и проще.

Сообразительность Async

Теперь, когда мы рассмотрели как Python поддерживает асинхронные итерации, следующим естественным вопросом будет спросить понимает ли он также списки в for - и наш ответ ДА! Данная поддержка была введена в PEP 530 и я рекомендую вам просмотреть этот PEP самостоятельно. Он достаточно короткий и хорошо читаемый.

 

Пример 3-15. Async достаточно искушён чтобы работать со списками, словарями и наборами


>>> import asyncio
>>> async def doubler(n):
...     for i in range(n):
...         yield i, i * 2(1)
...         await asyncio.sleep(0.1)(2)
>>> async def main():
...     result = [x async for x in doubler(3)](3)
...     print(result)
...
...     result = {x: y async for x, y in doubler(3)}(4)
...     print(result)
...
...     result = {x async for x in doubler(3)}(5)
...     print(result)
>>> asyncio.get_event_loop().run_until_complete(main())
[(0, 0), (1, 2), (2, 4)]
{0: 0, 1: 2, 2: 4}
{(1, 2), (0, 0), (2, 4)}
 	   
  • (1) doubler() является очень простым асинхронным генератором: задавая некое верхнее значение он итеративно проходит по простому диапазону, принося кортежи из смого значения и его дублёра.

  • (2) Небольшой сон, просто чтобы подчеркнуть что это некая асинхронная функция.

  • (3) Широта охвата некоторого асинхронного списка: обратите внимание как async for применяется вместо обыкновенного for. Это отличие в точности то же, что а в Итераторы Async: async for.

  • (4) Выразительность Async; все обычные трюки срабатывают, например, распаковка полученного кортежа в x и y с тем, чтобы они смогли запитать охват синтаксиса словаря.

  • (5) Широта охвата асинхронного множества работает в точности так, как вы могли бы ожидать.

Обратной стороной нашей монеты, как выводит PEP 530, является применение await внутри выразительных средств. Но это не совсем так: await <coro> является обычным выражением и оно может применяться в большинстве мест, в которых вы это можете ожидать.

Именно async for является тем, что превращает выразительность в асинхронную выразительность, а не наличие await. Всё что вам требуется для допустимости await (внутри некоторого поглощения), это то, что вы находитесь внутри самого тела какой- то функции сопрограммы, то есть некоторой функции, объявленной с помощью async def. Поэтому хотя применение await и async for внутри того же самого поглощаемого списка в действительности сочетает два различных подхода, давайте сделаем это в любом случае чтобы продолжить снижать порого чувствительности вхождения в комфортное состояние от применения синтаксиса асинхронного языка программирования:

 

Пример 3-16. Собираем всё в кучу


>>> import asyncio
>>> async def f(x):(1)
...   await asyncio.sleep(0.1)
...   return x + 100
>>> async def factory(n):(2)
...   for x in range(n):
...     await asyncio.sleep(0.1)
...     yield f, x(3)
>>> async def main():
...   results = [await f(x) async for f, x in factory(3)](4)
...   print('results = ', results)
>>> asyncio.get_event_loop().run_until_complete(main())
results = [100, 101, 102]
 	   
  • (1) Очень простая функция сопрограммы: небольшое засыпание с последующим возвратом увеличенного на 100 значения параметра.

  • (2) Именно этот асинхронный генератор мы будем вызывать внутри некоторого охватывающего асинхронного списка слегка ниже, применяя async for для продвижения по итерациям.

  • (3) Этот асинхронный генератор будет производить некий кортеж из f и значения переменной итерации ч. Собственно возвращаемым значением f является некая функция сопрограммы, кщё пока не сопрограмма.

  • (4) Наконец, собственно асинхронный охват (выразительность). Данный пример был слегка натянут чтобы продемонстрировать некую выразительность, которая включает в себя одновременно и async for, и await. Давайте разберём что происходит внутри данного охвата: вызов нашего factory(3) возвращает некий асинхронный генератор, который должен управляться итерациями. Так как он является неким асинхронным генератором, вы не можете просто применять for; вы обязаны воспользоваться async for.

    Далее, те значения, которые производятся данным асинхронным генератором являются кортежем из функции сопрограммы f и некоторого int. Вызов данной функции сопрограммы f() производит некую сопрограмму, которая должна вычисляться при помощи await.

    Заметим, что внутри данного охвата использование await не имеет ничего общего с применением async for: они выполняют совершенно различные вещи и действуют на целиком различные объекты.

Запуск и останов (Аккуратные!)

Основная часть основанных на асинхронности программ предназначены для длительного исполнения сетевых приложений. Эта область таит в себе удивительно много сложностей, связанных с тем как корректно осуществлять запуск и останов.

Из этих двоих, запуск намного проще. Самый стандартный способ запуска некоторого приложения asyncio состоит в создании какой- то задачи, с последующим вызовом loop.run_forever(), как это было показано в нашем примере Hello World из раздела Быстрое начало.

Единственным исключением может быть случай когда вам требуется запустить некий сервер ожидания. В таком случае запуск обычно является процесом из двух этапов:

  1. Вначале создаёте некую сопрограмму только для самой "запускающей" фазы такого сервера, а затем воспользуйтесь run_until_complete() в данной инициализированной сопрограмме для запуска самого сервера.

  2. Во- вторых, продолжите в свой обычной "главной" {main} части приложения посредством вызова loop.run_forever().

Обычно запуск будет достаточно прямолинейным; а относительно описанного выше варианта сервера вы можете ознакомиться с дополнительными подробностями в документации. Кроме того мы также кратко рассмотрим некую демонстрацию запуска такого сервера в последующем примере кода.

Останов является намного более интригующим.

Для останова вначале мы рассмотрим соответствующий танец, который следует за тем, когда нечто останавливает имеющийся цикл событий. Когда некий исполняемый цикл останавливается, соответствующий вызов run_forever() выполняет снятие блокировки и выполняется появляющийся после этого код. В этот момент вы обязаны:

  1. собрать все всё ещё приостановленные объекты задач (если таковые имеются)

  2. прекратить эти задачи (что возбудит CancelledError внутри каждой исполняемой сопрограммы, которые вы можете выбрать на обработку в try/except внутри самого тела данной функции сопрограммы

  3. собрать все эти задачи в некую "группу" задач

  4. к этой "группе" задач применить run_until_complete() чтобы дождаться их завершения, т.е. позволить такой исключительной ситуации CancelledError возбудиться и подвергнуться обработке

Только после этого останов завершится.

Обряд посвящения в построение ваших первых нескольких прикладных программ asyncio будет состоять из попыток избавления от сообщений подобных Task was destroyed but it is pending! {Задача была уничтожена, но она приостановлена!} на протяжении останова. Это происходит по причине пропуска одного или более перечисленных выше этапов. Вот пример данной ошибки:

 

Пример 3-17. Уничтожение приостановленных задач


# taskwarning.py
import asyncio

async def f(delay):
    await asyncio.sleep(delay)

loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))(1)
t2 = loop.create_task(f(2))(2)
loop.run_until_complete(t1)(3)
loop.close()
 	   
  • (1) Задача 1 будет исполняться 1 секунду.

  • (2) Задача 2 будет исполняться 2 секунды.

  • (3) Исполняется пока не завершится задача 1.

Вывод:


$ python taskwarning.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() done, defined at [...snip...]>>
		

Данная ошибка говорит вам о том, что некие задачи пока ещё не завершены при закрытии данного цикла. Мы желаем избежать этого, и именно по этой причине определённая формирующая отдельную сущность останова процедура служит сбору всех незавершённых задач, их прекращению с последующим допуском их завершения прежде чем будет закрыт данный цикл.

Давайте рассмотрим более подробно код из своего примера Быстрого начала и рассмотрим эти фазы вновь. Мы проделаем это через мини- обучение с помощью эхо сервера на основе telnet.

На Рисунке 3-1 наш сервер запущен с левой стороны панели. Затем, справа, некий сеанс telnet выполняет подключение к нашему серверу. Мы набираем несколько команд, которые возвращаются к нам обратно (все приведённые в верхний регистр) и после этого мы отключаемся. Теперь мы рассмотрим тот код, который управляет данной программой.

 

Рисунок 3-1


Взаимодействие Telnet при помощи эхо сервера

 

Пример 3-18. Жизненный цикл приложения Asyncio (на основе представленного в документации Python эхо сервера)


from asyncio import ((1)
    get_event_loop, start_server, CancelledError,
    StreamReader, StreamWriter, Task, gather)

async def echo(reader: StreamReader, writer: StreamWriter):(2)
    print('New connection.')
    try:
        while True:(3)
            data: bytes = await reader.readline()(4)
            if data in [b'', b'quit']:
                break
            writer.write(data.upper())(5)
            await writer.drain()
        print('Leaving Connection.')
    # except CancelledError:(6)
    # writer.write_eof()
    # print('Cancelled')
    finally:
        writer.close()

loop = get_event_loop()
coro = start_server(echo, '127.0.0.1', 8888, loop=loop)(7)
server = loop.run_until_complete(coro)(8)

try:
    loop.run_forever()(9)
except KeyboardInterrupt:
    print('Shutting down!')

server.close()(10)
loop.run_until_complete(server.wait_closed())(11)

tasks = Task.all_tasks()(12)
for t in tasks:
    t.cancel()
group = gather(*tasks, return_exceptions=True)(13)
loop.run_until_complete(group)(14)
loop.close()
 	   
  • (1) Импортируем кучу вещей из пространства имён asyncio - для данной книги мне приходится контролировать длин строки!

  • (2) Данная функция сопрограммы echo() будет применена (нашим сервером) для создания какой- то сопрограммы для каждого выполненного соединения. Эта сопрограмма применяет API streams для сетевого обмена с asyncio.

  • (3) Для поддержки подключения у нас будет некий бесконечный цикл ожидающий сообщения.

  • (4) Ожидаем некую строку данных с другой стороны.

  • (5) Возвращаем полученный данные обратно, но целиком в верхнем регистре.

  • (6) Я интенсивно комментирую данный блок чтобы привлечь внимание к тому, что случится если ваша сопрограмма не отработает должным образом прекращение. Если вы действительно обработаете CancelledError, вы получите некий шанс выполнить всю очистку, особенную для сценария останова.

  • (7) Именно это является запускающей фазой нашей программы: наш сервер требует некоторого отдельного шага из имеющегося "основного" этапа run_forever(). Наша функция start_server() возвратит какую- то сопрограмму и она должна быть run_until_complete(). Отметим как передаётся наша функция сопрограммы echo: она будет использоваться в качестве некоторой фабрики, которая производит новую сопрограмму для каждого нового подключения.

  • (8) Исполним эту сопрограмму для запуска своего сервера TCP.

  • (9) Только теперь мы начинаем свою основную, "ожидающую" часть программы. Начиная с этого момента всякое выполненное для нашего сервера подключение TCP породит некую сопрограмму из нашей функции сопрограммы echo. Единственная вещь, которая может остановить наш цикл событий это KeyboardInterrupt, которое является тем же самым, чем является SIGINT для систем Unix. (В некоторой промышленной системе вы будете применять особые обработчики сигналов, отличающиеся от KeyboardInterrupt; это представлено далее в разделе Сигналы.) .

  • (10) По достижению этого места мы знаем что был инициирован останов, например, при помощи Ctrl-C. Самая первая вещь, которую необходимо сделать это предотвратить наш сервер от приёма дополнительных новых подключений. Это требует двух шагов: во- первых вызываем server.close()...

  • (11) ... а затем запускаем wait_closed() в определённом цикле для закрытия всех сокетов, которые всё ещё ожидают подключений, но которые ещё не соединились. Отметим, что все действующие подключения (например, порождённые из asyncio) с активными подключениями не подвергнутся воздействию: эти соединения останутся работающими.

  • (12) Теперь мы можем начать останавливать задачи. Данная формирующая единое целое последовательность должна собрать все приостановленные в данный момент задачи в рассматриваемом цикле событий, прекратить их и затем собрать их в некую отдельную задачу группы.

  • (13) Отметьте данный параметр return_exceptions. Он достаточно важен и отдельно обсуждается в нашем следующем разделе.

  • (14) Как и ранее, для завершения запускаем полученную задачу группы.

Надеюсь, вы начинаете различать знакомый шаблон.

[Совет]Совет

Один самый последний момент прежде чем мы переместимся: Если вы перехватываете CancelledError внутри некоторой сопрограммы, вам следует быть аккуратеым и не создавать никаких новых задач внутри данного обработчика прерываний.

И вот почему: тот вызов all_tasks(), который выдаётся в 12 не собирается быть информированным о каких- либо новых задачах, создаваемых на протяжении данной фазы run_until_complete() при вызове 14 - что происходит когда данный код внутри ваших обработчиков прекращения когда они будут исполняться. Поэтому основное правило гласит: никаких новых задач внутри обработчиков исключительной ситуации CancelledError, если только вы также не снабжаете их await внутри какого- то обработчика исключительной ситуации CancelledError.

И запомните: если вы применяете некую библиотеку или какое- то инструментальное средство, убедитесь что вы следуете их документации относительно того как выполнять запуск и останов. Инструментальные средства сторонних разработчиков обычно предоставляют свои собственные функции запуска и останова, к тому же они предоставляют особые точки входа событий для индивидуализации. Вы можете увидеть это в примере из данной книги с инструментальным средством Sanic позднее в этой книге в Примере: недостоверность кэширования.

Для чего предназначено return_exceptions=True

Вы могли заметить, что я установил значение аргумента с ключевым словом return_exceptions=True в своём вызове gather() при исполнении 13 в нашем предыдущем примере кода. Я также это делал ранее в Быстром начале, причём я очень подло не сказал ничего об этом в тот раз. Настал момент для пояснений.

Значением по умолчанию является gather(..., return_exceptions=False). Такое значение по умолчанию является проблематичным для нашего процесса останова. Это слегка сложно объяснить напрямую; вместо этого давайте пошагово пройдём некую последовательность фактов,которая сделает более простым понимание этого:

  • run_until_complete() воздействует на некий фьючерс; на протяжении останова именно этот фьючерс возвращается gather.

  • Если этот фьючерс возбуждает некую исключительную ситуацию, эта исключительная ситуация также будет возбуждена из run_until_complete(), что означает, что данный цикл будет остановлен..

  • Если в некотором фьючерсе "группы" применяется run_until_complete(), любая возбуждаемая внутри всех его подзадач исключительная ситуация также будет возбуждена и в самом фьючерсе "группы", если она не обрабатывается в данной подзадаче. Отметим, что они также включают в себя и CancelledError.

  • Если только некоторые задачи обрабатывают CancelledError, а прочие нет, те из них, что не делают этого вызовут останов своего цикла. Это означает, что данный цикл будет остановлен прежде чем все задачи сделают это.

  • В режиме останова нам в действительности не желательно такое поведение. Мы хотим завершить run_until_complete() только когда все задачи в данной группе на самом деле завершены, вне зависимости от того возбуждали некие иные задачи исключительные ситуации или нет.

  • Следовательно, нам приходится выполнять gather(*, return_exceptions=True): такая установка заставляет имеющийся фьючерс "групп" трактовать исключительные ситуации из своих подзадач как возвращаемые значения, поэтому они не всплывают и не выступают помехой для run_until_complete().

И здесь у вас именно это: необходимая взаимосвязь между return_exceptions=True и run_until_complete()!

Нежелательная последовательность перехватываемых исключительных ситуаций, исполняемых таким образом, может ускользнуть от вашего внимания поскольку они теперь (в действительности) подлежат обработке внутри имеющейся задачи "группы". Если это вызывает беспокойство, вы можете получить вывод из run_until_complete() и просканировать его относительно любых подклассов Exception; а затем записать протокол сообщений для вашей ситуации. Вот быстрый взгляд на то что вы получите:

 

Пример 3-19. Все имеющиеся задачи завершатся


import asyncio

async def f(delay):
    await asyncio.sleep(1 / delay)(1)
    return delay

loop = asyncio.get_event_loop()
for i in range(10):
    loop.create_task(f(i))
pending = asyncio.Task.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()
 	   
  • (1) Было бы ужасно если бы некто передал ноль...

Вывод:


$ python alltaskscomplete.py
Results: [6, 9, 3, 7, ...
          ZeroDivisionError('division by zero',), 4, ...
          8, 1, 5, 2]
		

Без return_exceptions=True, соответствующая ZeroDivisionError была бы возбуждена из run_until_complete(), останавливая весь цикл и таким образом не позволяя завершиться прочим задачам.

В нашем следующем разделе мы рассмотрим обработку сигналов (помимо Keyboard​Interrupt), но прежде чем мы перейдём к этому, было бы неплохо принять во внимание, что аккуратный останов является одной из самых сложных сторон сетевого программирования и это остаётся справедливым и для asyncio. Информация в этом разделе - это только начало. Я призываю вас провести особые проверки для чистого останова в ваших собственных автоматизированных комплектах. Различные приложения часто требуют различных стратегий.

В индексе пакетов Python я опубликовал некий крошечный пакет aiorun, преимущественно на основе своих собственных экспериментов и исследований, относящихся в останову и он содержит множество идей из данного раздела. Также может оказаться полезным для вас повозиться с этим кодом и поэкспериментировать со своими собственными мыслями относительно вариантов останова asyncio.

Сигналы

В своём предыдущем примере мы показали как наш цикл событий будет остановлен с помощью KeyboardInterrupt, например, нажатием Ctrl-C. Такое возбуждение KeyboardInterrupt действенно снимет блокировку имеющегося вызова run_forever() и позволит произойти дальнейшей последовательности останова.

KeyboardInterrupt соответствует сигналу SIGINT. Для сетевых служб более распространёнными в действительности являются сигнал для обработки завершения SIGTERM, и именно он также является установленным по умолчанию сигналом,который вы применяете в соответствующей команде kill() из оболочки UNIX.

[Совет]Совет

Данная команда kill() в системах Unix имеет обманчивое название: всё что она делает, это отправка сигналов в некий процесс! Без аргументов, $ kill <PID> отправит некий сигнал TERM: ваш процесс может получить этот сигнал и аккуратно выполнить останов или просто проигнорировать его! Однако, это плохая идея, так как если ваш процесс не может быть остановлен в конце концов, следующий момент который выполняет данный убивец, это $ kill ­s KILL <PID>, который отправляет соответствующий сигнал KILL. Вот он уже остановит вас и ваша программа ничего с этим не сможет поделать.

asyncio имеет встроенную поддержку для обработки сигналов процессов, однако имеется удивительная степень сложности относительно обработки сигналов в целом (безотносительно к asyncio). Мы не можем охватить всё что имеется, но мы можем взглянуть на некоторые из самых основных вопросов, которым следует уделить внимание. Наш следующий образец кода произведёт такой вывод:


$ python shell_signal01.py
<Your app is running>
<Your app is running>
<Your app is running>
<Your app is running>
^CGot signal: SIGINT, shutting down.
		

Для останова данной программы я нажал Ctrl-C, что видно из последней строки. Вот сам код:

 

Пример 3-20. Памятка по применению KeyboardInterrupt в качестве обработчика SIGINT


# shell_signal01.py
import asyncio

async def main():(1)
    while True:
        print('')
        await asyncio.sleep(1)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(main())(2)
    try:
        loop.run_forever()
    except KeyboardInterrupt:(3)
        print('Got signal: SIGINT, shutting down.')
    tasks = asyncio.Task.all_tasks()
    for t in tasks:
        t.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()
 	   
  • (1) Это основная часть нашего приложения. Чтобы сделать вещи более простыми мы просто собираемся заснуть в некотором бесконечном цикле.

  • (2) Данная последовательность запуска и останова покажется знакомой вам по нашим предыдущим разделам. Мы спланируем main(), вызовем run_forever(), и подождём чего- то чтобы остановить этот цикл.

  • (3) В данном случае только Ctrl-C остановит наш цикл, а затем мы обработаем KeyboardInterrupt и выполним все необходимые кусочки очистки, котоые мы рассмотрели в своих предыдущих разделах.

До сих пор всё было достаточно прямолинейным. Теперь мы собираемся усложнить действия:

  • Один из ваших коллег обратился к вам с просьбой обрабатывать SIGTERM в дополнение к SIGINT в качестве сигнала останова.

  • В вашем реальном приложении вам требуется обрабатывать CancelledError внутри своей сопрограммы main(), а необходимый код очистки внутри соответствующего обработчика исключительной ситуации собирается занять несколько секунд до завершения (представим, что вам придётся взаимодействовать с одноранговой сетевой средой и закрывать пачку соединений сокетов).

  • Ваша прикладная программа не должна делать странных вещей если вы отправляете сигналы несколько раз, вы просто игнорируете все прочие сигналы вплоть до выхода.

asyncio предоставляет достаточную степень детализации в своём API для обработки подобных ситуаций. Я изменил свой код приведённого ранее примера для ввода этих моментов:

 

Пример 3-21. Обработка как SIGINT, так и SIGTERM, но выполнение останова только один раз.


# shell_signal02.py
import asyncio
from signal import SIGINT, SIGTERM(1)

async def main():
    try:
        while True:
            print('<Your app is running>')
            await asyncio.sleep(1)
    except asyncio.CancelledError:(2)
        for i in range(3):
            print('<Ваша прикладная программа завершается...>')
            await asyncio.sleep(1)

def handler(sig):(3)
    loop.stop()(4)
    print(f'Got signal: {sig!s}, shutting down.')
    loop.remove_signal_handler(SIGTERM)(5)
    loop.add_signal_handler(SIGINT, lambda: None)(6)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    for sig in (SIGTERM, SIGINT):(7)
        loop.add_signal_handler(sig, handler, sig)
    loop.create_task(main())
    loop.run_forever()(8)
    tasks = asyncio.Task.all_tasks()
    for t in tasks:
        t.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()
 	   
  • (1) Импортируем все значения сигналов из стандартной библиотеки signal.

  • (2) На этот раз наша сопрограмма main() собирается выполнять некую очистку внутри себя. После получения сигнала прекращения (инициируемого прекращением каждой из соответствующих задач), будет иметься промежуток времени в три секунды пока main() продолжит исполнение на протяжении фазы "run_until_complete()" данного процесса останова. Я напечатаю "Ваша прикладная программа завершается..." .

  • (3) Это обработчик обратного вызова для того случая, когда мы получили некий сигнал. Он настроен в данном цикле через вызов к add_signal_handler() немного дальше.

  • (4) Первейшая цель данного обработчика состоит в остановке основного цикла: это снимет блокировку вызова loop.run_forever() и сделает возможным сбор приостановленных задач, прекращение и собственно run_complete() для останова.

  • (5) Так как мы теперь пребываем в режиме останова, нам не нужны иные SIGINT или SIGTERM для переключения данного обработчика снова: это вызвало бы loop.stop в процессе соответствующей фазы run_until_complete(), что оказало бы воздействие на наш процесс останова. По этой причине мы удаляем данный обработчик сигнала для SIGTERM из своего цикла.

  • (6) Попались! Мы не можем просто удалить данный обработчик для SIGINT, так как это повлечёт что KeyboardInterrupt опять станет соответствующим обработчиком для SIGINT, точно так же, как если бы он добавлялся ранее нашим собственным обработчиком! Вместо этого мы устанавливаем некую пустую функцию lambda в качестве текущего обработчика. Это означает, что мы избегаем KeyboardInterrupt и SIGINT (а также Ctrl-C) не имеют эффекта. [add_signal_handler() скорее всего следует именовать как set_signal_handler(), так как у вас может иметься только один обработчик для каждого типа сигнала, а повторный вызов add_signal_handler() для того же самого сигнала подменит предыдущий обработчик для данного сигнала, если таковой имеется.]

  • (7) В этом месте наши обработчики сигналов подключаются к основному циклу. Отметим, что уже обсуждалось ранее, после что установки некоего обработчика SIGINT, KeyboardInterrupt более не будет возбуждаться по SIGINT. Такое возбуждение KeyboardInterrupt является установленным "по умолчанию" обработчиком для SIGINT и является предварительной настройкой в Python пока вы не предпримете чего бы то ни было для изменения такого обработчика, что мы и делаем здесь.

  • (8) Как обычно, исполнение блокирует run_forever() пока что- то не остановит данный цикл. В таком случае основной цикл будет остановлен внутри handler() если в наш процесс буде отправлен либо SIGINT, либо SIGTERM. Остаток кода тот же самый что и прежде..

Вывод:


$ python shell_signal02.py
<Your app is running>
<Your app is running>
<Your app is running>
<Your app is running>
<Your app is running>
^CGot signal: Signals.SIGINT, shutting down.
<Your app is shutting down...>
^C<Your app is shutting down...>(1)
^C<Your app is shutting down...>
		
  • (1) Я много раз нажал Ctrl­C в процессе данной фазы останова, но, как и ожидалось, ничего не произошло пока в конце концов не завершилась сопрограмма main().

Ожидание исполнителя в процессе останова

Возвращаясь к Быстрому началу, в котором мы представили основы интерфейса Исполнителя в Примере 2. В Примере 2 мы указали, что соответствующий вызов time.sleep() был короче чем соответствующий вызов asyncio.sleep() - к счастью для нас - так как это означает, что собственная задача Исполнителя завершается быстрее чем его сопрограмма main() и в следствии этого данная программа останавливается корректно.

В этом разделе мы изучим что происходит в процессе останова когда задания Исполнителя требуют больше времени чем все приостановленные экземпляры Task. Самый короткий ответ таков: без вмешательства будьте готовы получить такой вид ошибок:

 

Пример 3-22. Исполнитель требует слишком долгого исполнения до завершения


# quickstart.py
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(1.5)(1)
    print(f"{time.ctime()} Hello from a thread!")


loop = asyncio.get_event_loop()

loop.create_task(main())

loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
 	   
  • (1) Этот пример кода в точности то же что и пример из Быстрого начала, за исключением того, что значение времени сна в нашей функции с блокировкой теперь длиннее чем соответствующее значение в асинхронной части.

 

Пример 3-23. Вывод


Fri Sep 15 16:25:08 2017 Hello!
Fri Sep 15 16:25:09 2017 Goodbye!
exception calling callback for <Future at [...snip...]>
Traceback (most recent call last):

<big nasty traceback>

RuntimeError: Event loop is closed
Fri Sep 15 16:25:09 2017 Hello from a thread!
 	   

Что здесь происходит, так это то, что за сценой, run_in_executor() не создаёт некий экземпляр Task, он возвращает некий Future. Это означает, что он не содержится в наборе "активных задач", возвращаемых из asyncio.Task.all_tasks(), и таким образом, run_until_complete() не ждёт завершения задачи своего Исполнителя!

Существует три мысли, которые приходят в голову, для исправления этого, причём все они с различными компромиссами, и мы намерены рассмотреть их все. Действительная цель данного упражнения состоит в том чтобы помочь вам узнать о сроке службы основного цикла событий с разных точек зрения и задуматься об управлении временем жизни всех ваших сопрограмм, потоков и подпроцессов, которые могут взаимодействовать в некоторой нетривиальной программе.

Самая первая идея состоит в самая простая реализация состоит в постоянном выполнении await некоторой задачи Исполнителя внутри какой- то сопрограммы:

 

Пример 3-24. Вариант A: обёртка вызова соответствующего исполнителя внутри какой- то сопрограммы


# quickstart.py
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

async def run_blocking():(1)
    await loop.run_in_executor(None, blocking)

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.create_task(run_blocking())(2)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()
 	   
  • (1) Основная идея имеет целью исправление того недостатка, что run_in_executor() возвращает только некий экземпляр Future, а не какую- то задачу. Мы не можем перехватывать все задания в all_tasks(), но мы можем применять для своего фьючерса await. Чтобы сделать это мы создаём такую новую функцию сопрограммы run_blocking(), а внутри неё мы не делаем ничего кроме того, что мы осуществляем await получаемого результата из вызова run_in_executor(). Такая новая функция сопрограммы run_blocking() будет спланирована в некую Task и таким образом наш процесс останова включит её в необходимую группу.

  • (2) Для запуска такой новой функции сопрограммы run_blocking() мы применяем create_task в точности так же, как планировался запуск нашей сопрограммы main().

Приведённый выше код выглядит великолепно, за исключением одного момента: он не способен обрабатывать прекращение! Если вы посмотрите повнимательнее, вы увидите, что я опустил тот цикл прекращения задач, который появлялся в наших предыдущих примерах. Если вы вернёте его обратно, мы получим jib,rb "Event loop is closed" {Цикл событий закрыт}, как и ранее. Мы даже не можем обработать CancelledError внутри run_blocking() чтобы попытаться повторно ожидать данный фьючерс. Вне зависимости от того что мы пытаемся сделать, та задача, которая обёртывает run_blocking() не может быть прекращена, однако соответствующее задание Исполнителя будет исполняться пока не завершится его внутренний time.sleep(). Давайте перейдём в нашей следующей идее.

Следующая, приводимая ниже идея, является слегка более хитрой. Основная стратегия состоит в сборе всех приостановленных задач, с последующим прекращением только их, однако перед этим мы вызываем run_until_complete(), который мы добавили в соответствующий фьючерс из run_in_executor().

 

Пример 3-25. Вариант B: добавление соответствующего Future Исполнителя для всех собираемых задач


# quickstart.py
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking)(1)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)(2)
for t in tasks:
    t.cancel()(3)
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
group = asyncio.gather(group_tasks, future)(4)
loop.run_until_complete(group)
loop.close()
 	   
  • (1) В точности как и в первоначальном примере с блокировкой, мы вызываем напрямую run_in_executor(), но мы гарантированно назначаем полученный результат идентификатору future. В скорости этот идентификатор будет применён.

  • (2) Основной цикл остановлен, и теперь мы пребываем в соответствующей фазе останова. Для начала мы получаем все задачи. Отметим, что они не содержат само задание Исполнителя, так как run_in_executor() не создал какой- то задачи.

  • (3) Прекращаем все задачи.

  • (4) Это основная уловка: мы создаём некую новую группу, которая объединяет все задачи и соответствующий фьючерс Исполнителя. Таким образом, наш Исполнитель завершится нормально, пока все задачи всё ещё претерпевают обычный процесс прекращения.

Это решение ведёт себя лучшим образом в процессе останова, но всё ещё имеет недостатки. В целом, будет достаточно неудобно собирать все имеющиеся фьючерсы Исполнителя на протяжении всей вашей программы в некую коллекцию, которую вы можете применять в своей последовательности останова; затем создавать некую группу задачи- плюс- фьючерсы; а потом запускать это вплоть до завершения. Это работает, но имеется лучший способ.

 

Пример 3-26. Вариант C: используйте ваш собственный Исполнитель и ждите


# quickstart.py
import time
import asyncio

from concurrent.futures import ThreadPoolExecutor as Executor

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
executor = Executor()(1)
loop.set_default_executor(executor)(2)
loop.create_task(main())
future = loop.run_in_executor(None, blocking)(3)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
for t in tasks:
    t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
executor.shutdown(wait=True)(4)
loop.close()
 	   
  • (1) На этот раз мы создаём свой собственный экземпляр Исполнителя.

  • (2) Вам придётся настроить свой собственный индивидуальный Исполнитель в качестве установленного по умолчанию для основного цикла. Это означает, что делая вызов run_in_executor() из любого места своего кода, он применяться ваш индивидуальный экземпляр.

  • (3) Как и ранее, запускаем соответствующую функцию с блокировкой.

  • (4) Наконец, мы можем в явном виде выполнять ожидание для всех своих заданий Исполнителя для завершения до закрытия основного цикла. Это исключит все видимые нами ранее сообщения "Event loop is closed". Мы можем сделать это потому что у нас имеется доступ к соответствующему объекту Исполнителя; установленный по умолчанию Исполнитель теперь не выставлен в соответствующем API asyncio и по этой причине мы не можем вызывать останов в нём.

Наконец, у нас есть некая стратегия с общей приемлемостью: вы можете вызвать run_in_executor() где угодно, а ваша программа всё равно будет останавливаться чисто.

Я настоятельно рекомендую вам поэкспериментировать со всеми показанными здесь примерами кода и испытайте различные стратегии для создания задач и заданий Исполнителя, заменяйте их поочерёдно во времени попытайтесь выполнять завершения чисто.

Тестирование с помощью asyncio

В одном из предыдущих разделов я сказал, что вам не следует передавать некие переменные цикла внутри своей программы (если вам требуется доступ к самому циклу), а вы просто можете применять asyncio.get_event_loop() для получения данного цикла всякий раз как он вам потребуется.

Однако, при написании проверок элементов, вы приходится работать со множеством циклов событий, так как каждая проверка элемента обычно требует некоторого нового экземпляра цикла. Вновь возникает основной вопрос: следует ли вам писать код, который требует передачи циклов событий в параметрах функций для поддержки проверок элементов?

Давайте рассмотрим некий пример, применяющий великолепные инструментальные средства проверки pytest. Для начала взглянем на ту функцию, которую мы хотим проверить:

 

Пример 3-27. Иногда вам приходится применять call_soon()


import asyncio
from typing import Callable

async def f(notify: Callable[[str], None]):(1)
    # <...some code...>
    loop = asyncio.get_event_loop()(2)
    loop.call_soon(notify, 'Alert!')(3)
    # <...some code...>
 	   
  • (1) Представим себе некую функцию сопрограммы f, внутри которой необходимо применить loop.call_soon() для того чтобы активировать другую функцию оповещения. (Она может выполнять протоколирование, записывать сообщения в Slack, короткие запоминания, либо что- то ещё, что вы можете придумать!)

  • (2) В этой функции мы не получаем значение цикла через соответствующие параметры функции, поэтому они получаются через get_event_loop(); помните, данный вызов всегда возвращает тот цикл, который ассоциирован с данным текущим потоком.

  • (3) Некий основной вызов предупреждения.

Лучший способ для переброски pytest в ваш код asyncio состоит в предоставлении некоторого цикла событий через какое- то приспособление. Pytest встраивает такое в каждый ваш тест в виде некоего параметра функции. Это звучит более ложно, чем есть на самом деле, поэтому вот пример приспособления для предоставления некоторого цикла событий:

 

Пример 3-28. Приспособление Pytest для предоставления какого- то цикла событий - с некоторой ошибкой!


# conftest.py(1)
import pytest

@pytest.fixture(scope='function')(2)

def loop():
    loop = asyncio.new_event_loop()(3)
    try:
        yield loop
    finally:
        loop.close()(4)
 	   
  • (1) Pytest, по определению, автоматически будет импортирован и сделан доступным для всех ваших модулей проверок, причём все они определены в некотором файле с названием conftest.py.

  • (2) Это создаёт некое приспособление. Установленный аргумент "scope" сообщает Pytest когда данное приспособление должно быть завершено и сделано некое новое. Для сферы "function", как и выше, каждый отдельный тест будет иметь изготовленным некий новый цикл.

  • (3) Создайте некий совершенно новый экземпляр цикла. Отметим, что мы не запускаем данный цикл. Мы оставляем это для каждого теста в котором будет использовано данное приспособление.

  • (4) Когда данное приспособление будет завершено (в нашем случае после каждого теста), закрываем основной цикл.

Приведённый выше код вызывает некую проблему, так что не применяйте его. Эта проблема едва уловима, но она также является основной сутью данного раздела, поэтому в ближайшее время мы обсудим её подробно. Вначале давайте рассмотрим все проверки для функции сопрограммы f:

 

Пример 3-29. Соответствующие проверки


from somewhere import f(1)

def test_f(loop):(2)

    collected_msgs = [](3)

    def dummy_notify(msg):(4)
        collected_msgs.append(msg)

    loop.create_task(f(dummy_notify))(5)
    loop.call_later(1, loop.stop)(6)
    loop.run_forever()

    assert collected_msgs[0] == 'Alert!'(7)
 	   
  • (1) Рассматривайте этот образец кода как псевдокод - нет никакого модуля somewhere! Здесь f ссылается на функцию сопрограммы, определяемой далее.

  • (2) Pytest распознаёт здесь соответствующий аргумент "loop", отыскивает это имя в соответствующем списке приспособлений и затем вызывает наш приведённый выше test_f() совместно с этим новым экземпляром цикла, выпускаемым из данного приспособления.

  • (3) Мы собираемся передать некий обратный вызов "пустышки" notify() в f, данный перечень соберёт все получаемые предупреждения (а также наш тест проверяет что мы получаем правильные сообщения обратно).

  • (4) Это поддельная функция notify(), требующаяся f.

  • (5) Планируем некую сопрограмму из f, передавая свой поддельный обратный вызов notify().

  • (6) Для запуска своего цикла мы применяем run_forever(), следовательно это гарантирует нам что данный цикл будет на самом деле остановлен. В качестве альтернативы мы можем применять run_until_complete(f(...)) без некоторого call_later(); однако что вы выберете зависит от того что вы проверяете. Для эффектов стороны проверки, как и в приводимом тесте , я обнаружил, что проще применять run_forever(). С другой стороны, когда вы проверяете возвращаемые значения из сопрограмм, лучше воспользоваться run_until_complete(). [А ещё один вариант может быть очень полезным для ускорения проверок: я также могу вызывать loop.stop() изнутри fake_notify(), непосредственно сразу после сбора всех msg в свой перечень. Это сберегает время, поскольку данный вызов run_forever() немедленно снимает блокировку и мы не выполняем ожидание для f() чтобы производить любую дальнейшую обработку. Конечно, это может означать что вы получаете предупредительный вывод об "отложенных задачах" когда данное приспособление вызывает loop.close() в процессе завершения. Тестирование является хитроумным искусством и всякая ситуация имеет свои отличительные особенности.]

  • (7) Наконец, проверяем!

Помните я сказал, что у ас была некая проблема? Вот она: внутри функции сопрограммы f наш возвращаемый из get_event_loop() экземпляр цикла является иным по отношению к тому циклу событий, который был предоставлен нам нашим приспособлением. Данная проверка завершается по этой причине неудачей; на самом деле данный цикл из get_event_loop() даже никогда не будет запущен.

Наше решение этой дилеммы состоит в добавлении некоторого параметра loop в явном виде в нашу функцию сопрограмму f. Затем вы всегда можете удостовериться что был использован правильный экземпляр цикла. На практике, однако, это может вызывать страшную головную боль, так как большие базовые коды потребуют гигантского числа функций для передачи по имеющемуся циклу.

Вот некий лучший способ: когда в игре некий новый цикл, установите этот цикл чтобы он был "тем самым" циклом связанным с нашим текущим потоком, что означает что всякий вызов к get_event_loop() возвратит совершенно новый образец. Это работает очень хорошо для тестирования. Всё что вам требуется сделать, так это слегка изменить наше приспособление pytest:

 

Пример 3-30. Более удобное приспособление для pytest


# conftest.py
import pytest

@pytest.fixture(scope='function')
def loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)(1)
    try:
        yield loop
    finally:
        loop.close()
 	   
  • (1) Это та самая магическая строка: после вызова set_event_loop(), всякий последующий вызов asyncio.get_event_loop() будет возвращать тот экземпляр цикла, который создаётся в данном приспособлении, а вам не придётся в явном виде передавать такой экземпляр цикла через все вызовы вашей функции.

Мы затратили достаточно много страниц чтобы получить важное исправление одной строки, однако это было необходимо чтобы объяснить почему эта строка была так важна. Как и в предыдущих разделах, моя цель состояла не в том чтобы рассказать вам об "единственном верном пути", а вместо этого дать вам руководство в вашем понимании как вы можете включать asyncio в свой набор инструментов.