Глава 3. Прогулка по Asyncio
Содержание
- Глава 3. Прогулка по Asyncio
Asyncio предоставляет другой инструмент для параллельного программирования в Python, который обладает гораздо меньшим весом нежели потоки или многопроцессорность. В самом простом виде он выполняет это имея некий цикл событий, исполняющих некий набор задач с неким ключевым отличием в том, что каждая задача выбирает самостоятельно когда забирать управление обратно из этого цикла событий. [1].
-- Филип Джоунс, Носитель
Asyncio API в Python является сложным, так как имеет целью решать различные задачи для разных групп людей. К несчастью, чтобы помочь вам осознать
какая именно часть asyncio
важна для той группы, в которой находитесь вы,
в доступности имеется очень мало руководств.
Моя цель состоит в том, чтобы помочь вам в этом разобраться. Существует две основные целевые аудитории для таких асинхронных свойств в Python:
-
Разработчики оконечных рабочих мест
Эта группа может желать разрабатывать приложения с помощью
asyncio
. Я собираюсь предполагать что вы относитесь к этой категории. -
Разработчики инструментальных средств
Хотят делать некие инфраструктуры и библиотеки, которые разработчики для оконечных пользователей могут применять в своих приложениях.
Основная часть неразберихи относительно asyncio
в имеющемся сообществе в наши дли проистекает и- за путаницы
между этими двумя целями. Например, та документация для asyncio
, которая представлена в официальной документации
Python более подходит для разработчиков инструментальных средств, а не для конечных потребителей. Это означает, что читающие эту документацию разработчики
для конечных пользователей быстро оказываются контуженными кажущейся сложностью. Вам придётся что- то предпринимать прежде чем вы сможете что- либо сделать
с этим.
Я рассчитываю в этой книге помочь вам отделить в вашем сознании те свойства asyncio
Совет | |
---|---|
Если вам интересны подробности нижнего уровня, относящиеся к тому как создавать параллельно работающие инструментальные средства со
встройкой вовнутрь чего- то побобного |
Моя цель состоит в в предоставлении вам только самого основополагающего понимания строительных блоков Asyncio; достаточного чтобы вы были способны писать простые программы с его помощью и, несомненно, достаточными чтобы вы могли погружаться в полные руководства. [Когда это становится необходимым! На момент написания книги, единственным справочным руководством по Asyncio была спецификация API в официальной документации Python, а также набор блог постов, большая часть которых имеет ссылки в данной книге.]
Прежде всего, у нас имеется раздел "Быстрое начало", которое служит цели представления наиболее важных строительных блоков для
приложений asyncio
.
Чтобы применять Asyncio [на повседневной основе], вам необходимо знать о семи функциях.
-- Юрий Селиванов, автор PEP 492
Открывать официальную документацию Asyncio достаточно страшно. Имеется множество разделов с новыми, загадочными словами, а также понятия,
с которыми не знакомы даже искушённые программисты Python, поскольку asyncio
является очень новой
вещью. Мы собираемся поломать всё это и объяснить какой подход следует применять к документации asyncio
в
дальнейшем, но на текущий момент вам следует знать, что реальная площадь поверхности, о которой вам придётся беспокоиться в имеющейся библиотеке
asyncio
намного меньше чем может показаться.
Юрий Селиванов, автор PEP 492 и основной разработчик асинхронного
Python, объяснил в своём выступлении async/await в Python 3.5 и
почему это восхитительно, представленном на PyCon 2016, относительно того, что большая часть API в модуле
asyncio
на самом деле предназначается для разработчиков инструментальных
средств, а не для разработчиков конечных приложений. В э том выступлении он выделил те основные функции, о которых следует беспокоиться
конечным пользователям, а эти функции всего лишь составляют небольшое подмножество всего API asyncio
.
В данном разделе мы собираемся взглянуть на эти составляющие ядро функции и рассмотреть как вдарить по почве выполнения в цикле с помощью основанного на событиях программирования в Python.
Вам потребуются базовые знания сопрограмм (представленные в разделе, идущим вслед за этим), но помимо этого, если вы хотите иметь возможность
использовать обсуждаемую библиотеку asyncio
в качестве разработчика конечного приложения (а не как проектировщика
инструментальных средств), вот моменты, которые вам необходимо знать в крошечном примере:
Пример 3-1. "Hello World" для Asyncio
# quickstart.py
import time
import asyncio
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()(5)
loop = asyncio.get_event_loop()(1)
loop.create_task(main())(2)
loop.run_forever()(3)
pending = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*pending, return_exceptions=True)(4)
loop.run_until_complete(group)(3)
loop.close()(5)
Вывод:
$ python quickstart.py
Sun Sep 17 14:17:37 2017 Hello!
Sun Sep 17 14:17:38 2017 Goodbye!
-
(1)
loop = asyncio.get_event_loop()
Вам необходим некий экземпляр цикла прежде чем вы можете запустить какие бы то ни было сопрограммы, а это именно он и есть. На самом деле, где бы вы его не вызывали,
get_event_loop()
предоставит вам всякий раз тот же самый экземплярloop
, раз уж вы используете только один единственный поток. [APIasyncio
позволяет вам делать много безумных вещей с несколькими экземплярами и потоками циклов, но данная книга не является самой подходящей книгой для данной цели. 99% общего времени вы будете использовать некий единичный, основной поток для своей прикладной программы, как это показано тут.] -
(2)
task = loop.create_task(coro)
В приведённом выше коде
loop.create_task(main())
является неким особенным вызовом. Ваша функция сопрограммы не будет исполняться пока вы не сделает его. Мы говорим, чтоcreate_task()
планирует исполнение вашей сопрограммы в данном цикле. Возвращаемый объектtask
может применяться для отслеживания текущего состояния данной задачи, например, исполняется ди она ещё или уже выполнена, а помимо этого может также применяться для получения некоторого результирующего значения от вашей завершённой сопрограммы. Вы также можете завершать данную задачу при помощиtask.cancel()
. [с другой стороны: вы можете заметить в этой строке кода что основным параметром в таком вызове функции дляcreate_task()
являетсяcoro
. Именно это соглашение используется в большей части документации API, которую вы обнаружите, и он ссылается на некую coroutine; т.е., строго говоря, собственно результат вызова некоторой функцииasync def
, а не саму эту функцию.] -
(3)
loop.run_until_complete(coro)
иloop.run_forever()
Это два способа запуска данного цикла. Они оба заблокируют данный текущий поток, которым обычно является самый основной поток {main}. Отметим, что
run_until_complete()
сохранит данный цикл исполняемым до тех пор, пока не завершится заданныйcoro
- но все прочие запланированные задачи в этом цикле также будут исполняться при выполнении данного цикла. -
(4)
group = asyncio.gather(task1, task2, task3)
Типичная манера для большинства программ будет состоять в том, чтобы начаться с
loop.run_forever()
для самой "главной" части ваше программы {main} , а затем, когда получен некий сигнал процесса, остановить данный цикл, собрать всё ещё приостановленные задачи, а затем воспользоватьсяloop.run_until_complete()
до тех пор, пока эти задачи не выполнятся. именно этот метод служит для выполнения сбора. В более общем плане он также может применяться для сбора множества сопрограмм воедино и ожидания (при помощиawait
!) пока все собранные задачи не завершатся. -
(5)
loop.stop()
иloop.close()
Как уже описано выше, они применяются для постепенного останова некоторой программы.
stop()
обычно вызывается как следствие получения некоторого сигнала выключения, аclose()
обычно является самым последним действием: и оно очистит все очереди и остановит Исполнителя {Executor}. "Остановленный" цикл может быть запущен вновь, а "закрытый" цикл исчезает навсегда.
Предостережение | |
---|---|
Предыдущий пример является слишком упрощённым чтобы применяться в практических настройках. Требуется дополнительная информация относительно
обработки правильного останова. Основной целью данного примера было просто представить большую часть важнейших функций и методов из
|
asyncio
в Python представляет множество базовых механизмов вокруг основного цикла событий - и требует чтобы вы
были осведомлены о таких вещах, как сам цикл событий и управление им на протяжении его времени жизни. Именно это является отличием, например, от
Node.js, который также содержит некий цикл событий, но сохраняет его где- то вовне в скрытом виде. Однако, как только вы начнёте понемногу работать с
asyncio
, вы начнёте замечать, что ваш шаблон запуска и останова основного цикла событий не отходит
страшно далеко от приведённого выше кода. А в оставшейся части данной книги мы рассмотрим более подробно некоторые нюансы относительно времени жизни
цикла.
В приведённом выше примере я кое- что упустил. Самый последний элемент базовой функциональности, о котором вам следует знать, состоит в том как
запускать блокирующие функции. Основной момент относительно кооперативной
многозадачности состоит в том, что вам требуется все связанные с вводом/ выводом функции..., ну да, кооперировать, а это означает допуск
переключения некоторого контекста обратно в данный цикл при помощи особого ключевого слова await
. Большая часть
имеющегося кода Python, доступного на текущий момент в диком виде, не делает этого, а вместо этого полагается на вас для запуска таких функций в
потоках. До тех пор, пока не будет более широко распространённой поддержки функций async def
, вы обнаружите,
что применение таких библиотек с блокировкой неизбежно.
Для этого asyncio
предоставляет некий API, который очень похож на API из пакета
concurrent.futures
. Данный пакет предоставляет некий ThreadPoolExecutor
и какой- тоProcessPoolExecutor
. По умолчанию он основан на потоке, но запросто заменяется на базирование на
процессе. Я опустил это в своём предыдущем примере, так как это скрыло бы то описание, как подгоняются друг к другу все фундаментальные части. Теперь,
когда они были обсуждены, мы можем взглянуть непосредственно на исполнителя.
Есть несколько причуд, о которых следует знать. Давайте рассмотрим пример кода:
Пример 3-2. Основной интерфейс исполнителя
# quickstart_exe.py
import time
import asyncio
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()
def blocking():(1)
time.sleep(0.5)(2)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)(3)
loop.run_forever()
pending = asyncio.Task.all_tasks(loop=loop)(4)
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()
Вывод:
$ python quickstart_exe.py
Sun Sep 17 14:17:38 2017 Hello!
Sun Sep 17 14:17:38 2017 Hello from a thread!
Sun Sep 17 14:17:39 2017 Goodbye!
-
(1)
blocking()
обычно вызывает внутри себяtime.sleep()
, что привело бы к блокировке вашего основного потока и предотвращало бы исполнение вашего цикла событий. Это означает, что вам не следует делать такую функцию сопрограммой, но что ещё более серьёзно, вы не можете вызывать эту функцию из любого места в своём основном потоке, который является именно тем местом, в котором исполняется основной циклasyncio
. Мы решаем данную проблему запуская эту функцию в некотором исполнителе (executor). -
(2) Безотносительно к данному разделу, но нечто, что следует держать на уме далее в этой книге: отметим, что значение времени сна с блокировкой (0.5 секунды) короче чем значение засыпания без блокировки (1 секунда) в вашей сопрограмме
main()
. Это делает данный код примера изящным и опрятным. В своём следующем разделе мы объясним что бы произошло если бы функции исполнителя переживали бы свои асинхронные эквиваленты во время последовательности выключения. -
(3)
await loop.run_in_executor(None, func)
Это самый последний момент в нашем списке существенных функций
asyncio
, о которых следует знать. Иногда вам требуется запускать вещи в некотором отдельном потоке, или даже в отдельном процессе: данный метод применяется именно для этого. Здесь мы передаём свою блокирующую функцию для запуска в определённом по умолчанию исполнителе. [к сожалению, самым первым параметромrun_in_executor()
является сам экземплярExecutor
, который следует применять и вы обязаны передатьNone
чтобы использовать установленный по умолчанию. Всякий раз, как я применяю это, я ощущаю как мой параметр "исполнитель" плачет, будучи вынужденным именоваться ключевым словом аргумента с определённым по умолчанию значениемNone
.]Отметим, что
loop.run_in_executor()
возвращает некоеFuture
, что означает, что вы можетеawait
{подождать} его при вызове в рамках функции другой сопрограммы. -
(4) Продолжаем отмечать вслед за элементом 2: наш набор задач в
pending
не содержит некоего элемента для того вызоваblocking()
, который делается вrun_in_executor()
. Это будет справедливо для любого вызова, который возвращает некоеFuture
вместо какой- тоTask
. Имеющаяся документация достаточно хорошо определяет возвращаемые типы, поэтому об этом несложно знать; всего лишь просто запомните, чтоall_tasks()
на самом деле возвращает толькоTask
, и никакихFuture
.
Теперь, когда вы ознакомились с наиболее существенными частями asyncio
для потребностей
разработки конечных приложений, наступило время объяснять нашу сферу и выстроить перечисленные API
asyncio
в некий вид иерархии. Это облегчит усвоение и понимание того как получать то что вам требуется из
имеющейся документации, но не более того.
Как мы уже отметили в Быстром начале, существует лишь небольшое число команд, которые вам следует
знать чтобы быть способным применять asyncio
в качестве разработчика
конечных приложений. К несчастью, имеющаяся документация для asyncio
предоставляет гигантское число
различных API, причём они представлены в очень "однообразном" формате, из которого очень трудно определить какие вещи предназначаются для
общего применения, а какие стороны предоставляются для разработчиков инструментальных средств.
Когда разработчики инструментальных средств просматривают ту же самую документацию, они ищут точки входа,
к которым они могут подключать свои новые инструментальные средства (или новые сторонние библиотеки). В данном разделе мы взглянем на
asyncio
глазами какого- то разработчика инструментальных средств чтобы дать представление как они могут
подходить к построению новой обладающей асинхронностью библиотекой. Надеемся, это поможет нам ещё чётче очертить те функции, о которых вам стоит
заботиться в вашей собственной работе.
С этой точки зрения более полезно представлять себе модуль asyncio
как отранжированный в виде некоей
иерархии, вместо некоего плоского листа, причём всякий последующий уровень строится поверх спецификации
предыдущего. К сожалению, это не совсем так, и я взял на себя некие вольности при выравнивании их в
Таблице 3-1, но, надеюсь, это снабдит вас неким альтернативным представлением
имеющегося API asyncio
.
Предостережение | |
---|---|
Таблица 3-1, а также представляемая здесь названия и нумерация "Уровней"
являются исключительно моими собственным изобретением, которое имеет целью добавить некую небольшую структуру в помощь пояснению API
|
Уровень | Понятие | Htfkbpfwbz |
---|---|---|
Уровень 9 |
Сетевой: потоки |
|
Уровень 8 |
Сетевой: TCP & UDP |
|
Уровень 7 |
Сетевой: транспорт |
|
Уровень 6 |
Инструменты |
|
Уровень 5 |
подпроцессы и потоки |
|
Уровень 4 |
задачи |
|
Уровень 3 |
свойства |
|
Уровень 2 |
цикл событий |
|
Уровень 1 (основной) |
сопрограммы |
|
На самом основополагающем уровне, Уровне 1, у нас имеются сопрограммы (coroutines), которые вы уже видели ранее в этой книге. Это самый
нижний уровень, с которого следует начинать размышлять о разработке стороннего инструментального средства, и, что удивительно, он стал чем- то
популярным не у какого- то одного, а у двух таких из доступных на сегодняшний день в мире асинхронных
инфраструктур: Curio и
Trio. Они обе полагаются только
на естественные сопрограммы в Python и больше ни на что иное из библиотечного модуля asyncio
.
Нашим следующим уровнем является собственно цикл событий. Сопрограммы бесполезны сами по себе: они ничего не могут делать без некоего цикла, в котором
они запускаются (более того, с необходимостью, Curio и Trio реализуют свои собственные циклы событий). asyncio
предоставляет как некое определение цикла, Abstract EventLoop
,
так и какую- то реализацию, BaseEventLoop
.
Чёткое отделение описания от реализации делает возможным для сторонних разработчиков выполнять альтернативные реализации своих циклов
событий и это уже произошло в проекте uvloop, который предоставляет
намного более быструю реализацию цикла чем та, что имеется в стандартном библиотечном модуле asyncio
.
Что важно: uvloop просто "встраивается" в имеющуюся иерархию и подменяет только саму часть цикла
в общем стеке. Такая возможность выполнять подобные вещи по выбору именно то, для чего разрабатывался API asyncio
,
причём с ясным разделением между всеми различными подвижными частями.
Уровни 3 и 4 привносят нам свойства и задачи, которые очень тесно взаимосвязаны; они разделяются только по той причине, что
Task
является неким подклассом Future
, но их запросто можно рассматривать
как находящиеся на одном и том же уровне. Некий экземпляр Future
представляет какой- то вид происходящего
действия, которое возвратит некий результат посредством уведомления в свой цикл событий, в то время как
Task
представляет некую сопрограмму, исполняемую в этом цикле событий.
Краткая история такова: некое свойство является "осведомлённым о цикле", в то время как конкретная задача
одновременно "осведомлена о цикле" и
"знает о сопрограмме". Выступая в роли разработчика конечного продукта, вы будете преимущественно работать с задачами, нежели со свойствами,
однако для некоторого разработчика инструментальных средств данная пропорция может быть иной в зависимости от имеющихся подробностей.
Уровень 5 предоставляет возможности для запуска, а также снабжения await
работ, которые должны исполняться
либо в отдельном потоке, либо даже в отдельном процессе.
Уровень 6 представляет дополнительные осведомлённые об асинхронности инструмента, такие как asyncio.Queue
.
Нам можно было разместить этот уровень после всех сетевых уровне, но я полагаю, что более чётко будет поместить все относящиеся к осведомлённым о
сопрограммах API вначале, прежде чем мы взглянем на имеющиеся уровни ввода/ вывода. Те Queue
, которые
производятся asyncio
очень похожи на API имеющихся Queue
безопасных
потоков в модуле queue
, за исключением то8о, что наша версия asyncio
требует наличия ключевого слова await
для get()
и
put()
. Вы не можете использовать queue.Queue
непосредственно внутри
сопрограмм, поскольку его get()
заблокируют ваш основной поток {main}.
Наконец, у нас имеются уровни сетевого ввода/ вывода с 7 по 9. Для вас, как разработчика конечного продукта, большая часть подходящего вам для работы API это API "потоков" на уровне 9. Я представил этот API потоков на самом верхнем уровне абстракции в нашей башне. Непосредственно под ним расположен уровень "протоколов" (Уровень 8), является более тонко настраиваемым API по сравнению с "потоками"; вы можете применять "протоколы" во всех экземплярах, в которых вы можете применять уровень "потоков", но "потоки" проще. Наконец, вам скорее всего никогда не придётся работать непосредственно с транспортным уровнем (Уровень 7), если только вы не создаёте некое инструментальное средство для других, чтобы применять его и вам необходимо персонализировать настройку имеющихся обменов {Прим. пер.: например, вы пожелаете предоставить потоки на основе протокола, применяющего обмен RDMA для увеличения пропускной способности и снижения латентности, см. например, Рекомендации по разработке высокопроизводительных систем RDMA и Ceph поверх RDMA.}
D Быстром начале мы рассмотрели абсолютно необходимый минимум, который следует знать чтобы начать
работу с библиотекой asyncio
. Теперь, после того как мы взглянули на собранную воедино библиотеку API
asyncio
в целом, я бы хотел повторно вернуться к короткому перечню свойств и повторно выделить те части,
которые вам скорее всего следует изучить.
Для написания сетевых приложений вам следует сосредоточиться на изучении следующих наиболее важных уровней для применения библиотечного модуля
asyncio
;
-
Уровень 1: существенным является понимание того как писать функции
async def
и применятьawait
для вызова и исполнения прочих сопрограмм. -
Уровень 2: важным является знакомство с тем как выполнять запуск, останов и взаимодействовать с имеющимся циклом событий.
-
Уровень 5: для применения кода с блокировкой в вашем асинхронном приложении требуются исполнители, а к тому же текущая реальность состоит в том, что большинство сторонних библиотек пока ещё не совместимы с
asyncio
. Хорошим примером этого является ORM {Object Relational Mapper} библиотека базы данных SQLAlchemy, у которой в настоящее время нет никакой альтернативы дляasyncio
. {Прим. пер.: строго говоря, это не совсем так, см gino.} -
Уровень 6: если вам требуется кормить данными одну или более длительно исполняемыми сопрограммами, самым лучшим способом для этого будет применение
asyncio.Queue
. Это в точности та же самая стратегия, которая применяется вqueue.Queue
для распределении данных между потоками. Версияasyncio
Queue
применяет те же самые API что и стандартная библиотека модуляqueue
, но при этом применяет сопрограммы вместо блокирующих методов, подобныхget()
. -
Уровень 9: API Потоков (Streams) предоставляет вам самый простой способ обработки взаимодействия с сокетом поверх сетевой среды, а также именно тем местом, с которого вам следует начинать создание прототипов идей для сетевых приложений. Вы можете обнаружить, что вам требуется более тонкое управление, и тогда вы имеете возможность переключиться на API Протоколов (Protocols), однако - для большинства проектов - обычно лучше делать вещи более простыми пока вы в точности не поймёте какую именно проблему вы пытаетесь решить.
Конечно, если вы применяете некую совместимую с asyncio
библиотеку стороннего разработчика, которая
обрабатывает всё взаимодействие с нужным вам сокетом, скажем, aiohttp
, вам вовсе не требуется напрямую работать с
asyncio
. В этом случае вам следует в основном полагаться на предоставляемую этими библиотеками документацию.
Это подводит нас к завершению данного раздела. Библиотека asyncio
пытается предоставлять достаточную
функциональность как для разработчиков конечных приложений, так и для разработчиков инструментальных средств. К сожалению, это означает, что
весь API asyncio
может показаться слишком размашистым. Я надеюсь, что данный раздел смог снабдить вас сырой
"дорожной картой" чтобы помочь вам уцепить только те части, которые вам требуются.
В наших последующих частях мы собираемся рассмотреть необходимые вам составляющие части из моего приведённого выше короткого списка более подробно.
Совет | |
---|---|
Сайт pythonsheets предоставляет некое более глубокое резюме
(или "шпаргалку", если хотите) крупных фрагментов имеющегося API |
Давайте начнём с самого начала: что такое сопрограмма?
Я собираюсь позволить вам быстро заглянуть под капот и увидеть некоторые части того механизма, который вы обычно не видите, даже когда применяете
на протяжении повседневного программирования. Все приводимые далее примеры могут быть целиком воспроизведены в обычном интерпретаторе Python в
интерактивном режиме и я призываю вас поработать с ними самостоятельно набирая их вручную, отслеживая их вывод и, возможно, экспериментируя
различными путями с async
и await
.
Предостережение | |
---|---|
|
Давайте начнём с самой простой из возможных вещей:
Пример 3-3. Самый первый сюрприз
>>> async def f():(1)
... return 123
...
>>> type(f)(2)
<class 'function'>
>>> import inspect(3)
>>> inspect.iscoroutinefunction(f)(4)
True
-
(1)
loop = asyncio.get_event_loop()
Это самый простой способ объявления некоторой сопрограммы: она выглядит как какая- то обычная функция, за исключением того что начинается с определённых ключевых слов
async def
. -
(2) Сюрприз! В точности тип
f
не являетсяcoroutine
, а всего лишь некая ординарная функция. Это является обычной практикой ссылаться на такие функции "async def" функции как на "сопрограммы", даже хотя - строго говоря - они рассматриваются Python как являющиеся функциями сопрограмм. Такое поведение в точности идентично тому, как функции генератора работают в Python:>>> def g(): ... yield 123 ... >>> type(g) <class 'function'> >>> gen = g() >>> type(gen) <class 'generator'>
Даже хотя
g
порой неверно именуется как "генератор", она остаётся некоей функцией и возвращается собственно генератор только когда эта функция вычисляется (evaluated). Функции сопрограмм работают в точности так же: вам требуется вызвать эту функциюasync def
для получения необходимой сопрограммы. -
(3) Наш модуль
inspect
из имеющейся стандартной библиотеки может предоставить намного лучшую ретроспективу возможностей нежели использованная встроенная функцияtype()
. -
(4) Имеется некая функция
iscoroutinefunction()
, которая позволяет вам делать отличие некоей обычной функции от функции сопрограммы.
Возвращаясь к нашей функции async def f()
, что же происходит когда мы её вызываем?
Пример 3-4. Некая функция async def
возвращает... сопрограмму!
>>> coro = f()
>>> type(coro)
<class 'coroutine'>
>>> inspect.iscoroutine(coro)
True
Итак, теперь вопрос звучит следующим образом: что же такое в точности "сопрограмма"? Сопрограммы очень походи на генераторы.
В самом деле, прежде чем с помощью ключевых слов async def
и await
в Python 3.5 были введены натуральные сопрограммы, они уже были доступны к применению в библиотеке Asyncio
Python 3.4 с использованием обычных генераторов при особых декораторах.
[Более того, именно таким образом библиотеки открытого исходного кода, такие как Twisted и Tornado выставляли поддержку асинхронности в недалёком
прошлом.] Не является неожиданным, что наши новые функции async def
(и те сопрограммы, которые они возвращают)
ведут себя аналогично генераторам.
Мы можем ещё немного поиграть с сопрограммами, чтобы посмотреть как Python их применяет. Что ещё более важно, мы желаем увидеть как Python способен
"переключать " исполнение между сопрограммами. Давайте вначале взглянем на то как может быть получено само значение
return
.
Когда некая сопрограммы выполняет возврат, то что происходит на самом деле, так это то, что возбуждается
исключительная ситуация StopIteration
. Вот следующий пример, который продолжает тот же самый сеанс, что и наши
предыдущие примеры и поясняет это:
>>> async def f():
... return 123
>>> coro = f()
>>> try:
... coro.send(None)(1)
... except StopIteration as e:
... print('The answer was:', e.value)(2)
...
The answer was: 123
-
(1) Некая сопрограмма инициализацируется "отправкой" ей
None
. Рассматривая изнутри, именно это собирается делать имеющийся цикл событий с вашими бесценными сопрограммами. Вам никогда не придётся делать это вручную. Все создаваемые вами сопрограммы будут исполняться либо сloop.create_task(coro)
, или при помощиawait coro
. И именно этотloop
, который делает данный.send(None)
находится за сценой. -
(2) Когда ваша сопрограмма выполняет возврат, возбуждается некий особый вид прерывания, именуемый
StopIteration
. Отметим, что мы можем получить доступ к самому возвращаемому значению данной сопрограммы через значение атрибута value самой этой исключительной ситуации. Опять же, вам нет нужды знать что это работает именно так: с вашей точки зрения функцииasync def
просто возвращают некое значение при помощи соответствующего оператораreturn
, в точности как самая обычная функция.
Эти два момента, т.е. send()
и StopIteration
определяют соответственные
начало и окончание самого исполнения сопрограммы. До сих пор это выглядело просто как в действительности запутанный способ запуска некоей
функции, но всё хорошо: наш цикл событий будет отвечать за продвижение сопрограмм по этим внутренним
особенностям нижнего уровня. Со своей точки зрения вы просто планируете сопрограммы к исполнению в имеющемся цикле, а они исполняются сверху вниз
почти как обычные функции.
[7]
Нашим следующим шагом будет рассмотреть как само исполнение такой сопрограммы может быть подвешено.
Данное новое ключевое слово, await
, всегда берёт некий параметр и будет принимать
единственную вещь, называемую ожидаемой
(awaitable), которая определяется как одна из следующих (исключительно!):
-
Некую сопрограмму (т.е. конкретный результат какой- то вызываемой функции async def [Также допустимо некое наследование, сопрограмма на основе генератора, которая является некоторой функцией генератора, которая декорирована
@types.coroutine
и применяет внутри себя ключевое словоyield from
для подвешивания. Мы собираемся в данной книге полностью игнорировать наследуемые сопрограммы. Забудьте про них!] -
Любой объект, реализующий особый метод __await__(). Этот специализированный метод обязан возвращать некий итератор.
Второй из доступных случаев ожиданий выходит за рамки данной книги (он вам никогда не понадобится в повседневном программировании
asyncio
), тем не менее, самый первый пример достаточно очевиден:
Пример 3-5. Применение await
в какой- то сопрограмме
async def f():
await asyncio.sleep(1.0)
return 123
async def main():
result = await f()(1)
return result
-
(1) Вызов
f
производит некую сопрограмму; это означает, что мы позволяем ей выполнятьawait
. Получаемое значение такой переменнойresult
будет равняться 123 после завершенияf()
.
Прежде чем мы завершим данный раздел и приступим к нашему циклу событий, полезно взглянуть на то, как могут подаваться исключительные ситуации,
которые наиболее часто применяются для завершения: когда вы вызываете task.cancel()
, ваш цикл событий
внутренне применит coro.throw()
для возбуждения asyncio.CancelledError
внутри вашей сопрограммы:
Пример 3-6. Применение coro.throw()
для инъекции исключений в некую сопрограмму
>>> coro = f()(1)
>>> coro.send(None)
>>> coro.throw(Exception, 'blah')(2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in f
Exception: blah
blah
Метод throw()
применяется (внутренним образом в asyncio
) для
прекращения задачи, что мы также достаточно просто можем продемонстрировать. Мы даже собираемся забежать
вперёд и обработать данное завершение внутри некоторой новой сопрограммы, которая и обработает это:
Пример 3-7. Прекращение сопрограммы при помощи CancelledError
>>> import asyncio
>>> async def f():
... try:
... while True: await asyncio.sleep(0)
... except asyncio.CancelledError:(1)
... print('I was cancelled!')(2)
... else:
... return 111
>>> coro = f()
>>> coro.send(None)
>>> coro.send(None)
>>> coro.throw(asyncio.CancelledError)(3)
I was cancelled!(4)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration5)
-
(1) Наша функция сопрограммы теперь обрабатывает некую исключительную ситуацию: фактически, это особый тип исключений, применяемый на протяжении всей библиотеки
asyncio
для прекращения задачи:asyncio.CancelledError
. Заметим, что данная исключительная ситуация внедряется в сопрограмму извне, т.е. самим циклом событий, который мы всё ещё пока имитируем вручную при помощи командsend()
иthrow()
. В реальном коде, который мы увидим позднее, при прекращении задач,CancelledError
возбуждается изнутри самой обёртывающей задачу сопрограммы, как это показано выше. -
(2) Некое простое сообщение чтобы сказать что мы выполнили прекращение. Отметим, что в результате обработки данной исключительной ситуации она более не распространяется и наша сопрограмма выполнит
return
. -
(3) Здесь мы выполняем
throw()
своей исключительной ситуацииCancelledError
. -
(4) Как и ожидалось, мы видим выведенным наше завершающее сообщение.
-
(5) А наша сопрограмма выходит обычным способом. (Повторный вызов такой исключительной ситуации
StopIteration
является обычным способом, при помощи которого сопрограмма выполняет выход.
Просто чтобы указать, что прекращение задачи не является ничем иным как обычным возбуждением исключительной ситуации (и её обработкой), вот некий пример, в котором поглощаем прекращение и переходим к некоторой иной сопрограмме:
>>> async def f():
... try:
... while True: await asyncio.sleep(0)
... except asyncio.CancelledError:
... print('Nope!')
... while True: await asyncio.sleep(0)(6)
... else:
... return 111
>>> coro = f()
>>> coro.send(None)
>>> coro.throw(asyncio.CancelledError)(7)
Nope!
>>> coro.send(None)(8)
-
(6) Вместо того чтобы печатать некое сообщение, что произойдёт если после прекращения мы просто вернёмся обратно подождать иное ожидание?
-
(7) Неудивительно, наша внешняя сопрограмма продолжает существовать и она немедленно подвешивается вновь внутри данной новой сопрограммы.
-
(8) Всё выполняется должным образом и наша сопрограмма продолжает быть вывешенной и ожидаемо возобновляется.
Конечно, это должно сопровождаться словами, что вам никогда не следует делать этого в действительности! Если ваша сопрограмма получает некий завершающий сигнал, это очевидное указание выполнения только необходимой очистки и выхода. Не игнорируйте это просто так.
На данный момент мы, естественно, очень устали выступать в роли некоего цикла событий вручную, выполняя все
вызовы .send(None)
, поэтому вместо этого мы привнесём соответствующий цикл, предоставляемый
asyncio
и очистим предыдущий пример надлежащим образом:
В своём предыдущем разделе мы показали как методы send()
и throw()
могут взаимодействовать с некоторой сопрограммой, но это было сделано только для понимания того как структурированы сами по себе сопрограммы.
Именно цикл событий в asyncio
обрабатывает все переключения между сопрограммами, а также перехватывает
необходимые исключительные ситуации StopIteration
, и многое другое, например, выполняет ожидание в сокетах и
файловых дескрипторах на предмет событий.
Вы можете получить некий цикл событий посредством вызова get_event_loop()
, которая является весьма
интересной функцией:
Пример 3-8. Получение при каждом обращении одного и того же цикла событий
>>> loop = asyncio.get_event_loop()
>>> loop2 = asyncio.get_event_loop()
>>> loop is loop2(1)
True
-
(1) Оба идентификатора, и
loop
, иloop2
, ссылаются на один и тот же экземпляр.
Это означает, что если вы внутри некоторой функции сопрограммы, а у вас имеется потребность в доступе к своему экземпляру цикла, будет достаточно вызвать
get_event_loop()
для его получения. Вам не требуется передавать какой- то
параметр loop
в явном виде во все свои функции. Эта ситуация меняется коренным образом если вы разработчик
инструментального средства: раз так, было бы лучше спроектировать ваши функции на приём некоторого параметра
loop
, просто на тот случай, когда ваши пользователи делают нечто необычное с
политиками цикла событий. Политики выходят за рамки
данной книги, и мы больше не будем обсуждать их.
Совет | |
---|---|
Метод |
Давайте исследуем некий пример: рассмотрим некую функцию сопрограммы, внутри которой создаются некие дополнительные задачи, причём без ожидания:
async def f():
# Создадим какие- то задачи!
loop = asyncio.get_event_loop()
for i in range():
loop.create_task(<some other coro>)
В этом примере замысел состоит в запуске полностью новых задач внутри данной сопрограммы. Без выполнения их ожидания, они запускаются независимо от
своего контекста внутри функции сопрограммы f()
. На самом деле, f()
осуществит выход до того как завершатся запускаемые ею задачи.
Для ситуации, подобной приводимой, вам порой следует посмотреть код, в котором переменная значения цикла передаётся в качестве какого- то параметра,
просто чтобы сделать её доступной для возможности вызова соответствующего метода create_task()
.
Определённой альтернативой является применение неудачно именованного asyncio.ensure_future()
, который делает всё то
же самое что и create_task()
, но не требует некоторой переменной локального цикла:
async def f():
# Создадим какие- то задачи!
for i in range():
asyncio.ensure_future(<some other coro>)
Если вы пожелаете, вы можете создать свою собственную вспомогательную функцию, которая также не требует некоего параметра
loop
, но имеет лучшее название:
def create_task(coro):
return asyncio.get_event_loop().create_task(coro)
async def f():
# Создадим какие- то задачи!
for i in range():
create_task(<some other coro>)
Единственная разница между loop.create_task()
и asyncio.ensure_future()
состоит в заголовке и введении в заблуждение вновь прибывших. Мы поясним эти различия в нашем следующем разделе.
В своём предыдущем разделе мы обсудили сопрограммы и как они должны запускаться в некотором цикле чтобы они стали полезными. Здесь мы кратко
поговорим об API Task
и Future
. То, с чем вы будете работать наиболее
часто, это Task
, поскольку большинство вашей работы будет содержать выполнение сопрограмм при помощи метода
loop.create_task()
, точно так же, как это было установлено ранее в разделе
Быстрое начало. Класс Future
на самом деле является
суперклассом Task
и предоставляет всю функциональность для работы с основным циклом.
Проще всего это воображать себе так: Future
(Фьючерс) представляет состояние завершения некоторого действия
и управляется самим циклом, в то время как Task
в точности то же самое и
то, где специфическим "действием" является некая сопрограмма; возможно, одна из созданных вами при помощи
async def
функция плюс loop.create_task()
.
Класс Future
представляет некое состояние чего- то, что взаимодействует
с циклом. Это описание слишком расплывчатое чтобы быть полезным, поэтому представляйте себе вместо этого некий экземпляр
Future
как такое: это именно то, что выступает переключателем для состояния завершения. Когда некий экземпляр
Future
создан, такой переключатель "ещё пока не завершён"; однако через некоторое время спустя он
завершится. На самом деле некий экземпляр Future
имеет метод с названием
done()
:
>>> from asyncio import Future
>>> f = Future()
>>> f.done()
False
Некий экземпляр Future
также может:
-
иметь некий набор значений "результата" (
.set_result(value)
и.result()
для его получения) -
завершаться с помощью
.cancel()
(и выполнять проверку завершения посредством.cancelled()
) -
иметь дополнительные функции обратного вызова, добавляющих то, что будет запускаться по завершению данного фьючерса.
Даже несмотря на то, что Tasks
более распространены, вы не можете полностью
избежать Future
: например, запуск некоей функции в каком- то исполнителе
возвратит экземпляр Future
, а не Tasks
.
{Прим. пер.: как определяет словарь, фьючерс- программная конструкция, указывающая на то, что результат некоего действия
будет использоваться в программе позже, но само вычисление может планироваться системой в любой произвольный момент времени.}
Давайте быстро взглянем на код примера чтобы получить представление о том, как работать с неким экземпляром
Future
напрямую:
>>> import asyncio
>>> async def main(f: asyncio.Future):(1)
... await asyncio.sleep(1)
... f.set_result('I have finished.')(2)
>>> loop = asyncio.get_event_loop()
>>> fut = asyncio.Future()(3)
>>> print(fut.done())(4)
False
>>> loop.create_task(main(fut))(5)
<Task pending coro=<main() running at <ast>:4>>
>>> loop.run_until_complete(fut)(6)
'I have finished.'
>>> print(fut.done())
True
>>> print(fut.result())(7)
I have finished.
-
(1) Создаём некий образей основной функции. Это даёт нам нечто для запуска, немного выжидаем, а затем устанавливаем какой- то результат в этом
Future, f
. -
(2) Устанавливаем необходимый результат.
-
(3) Вручную создаём некий экземпляр фьючерса. Отметим, что этот экземпляр (по умолчанию) связан с нашим
loop
, однако он не подключён, и не будет в дальнейшем подключаться к какой бы то ни было сопрограмме (то, для чего и создаются задачи). -
(4) Прежде чем что- то делать, проверяем что данный фьючерс ещё не сделан.
-
(5) Планируем необходимую сопрограмму
main()
, передаём данный фьючерс. Помните, всё что делает сопрограммаmain()
это то, что она выполняет ожидание, а затем переключает необходимый экземпляр фьючерса. (Отметим, что наша сопрограммаmain()
ещё пока не запущена: сопрограммы запускаются только после запуска их цикла.) -
(6)
run_until_complete()
Future
Task
[В этом месте в документации имеется ошибка: Подпись выдаётся какAbstractEventLoop .run_until_complete(future)
, но на самом деле должно бытьAbstractEventLoop .run_until_complete(coro_or_future)
, так как применяется то же самое правило.] Теперь, когда основной цикл запущен, наша сопрограммаmain()
начинает исполняться. -
(7)
В конце концов, когда результат фьючерса установлен, он завершается. После завершения можно осуществить доступ к полученному результату.
Конечно, маловероятно что вы будете работать с Future
напрямую, как мы показали это выше. Данный пример кода
приводится исключительно для обучения. Большая часть ваших контактов с asyncio
будет происходить
через экземпляры Task
.
Один из самых последних примеров, чтобы доказать что некая Task
на самом деле какое- то небольшое декорирование
в Future
: мы можем повторить точно тот же самый пример что и выше, но
вместо этого воспользовавшись Task
:
>>> import asyncio
>>> async def main(f: asyncio.Future):
... await asyncio.sleep(1)
... f.set_result('I have finished.')
>>> loop = asyncio.get_event_loop()
>>> fut = asyncio.Task(asyncio.sleep(1_000_000))(1)
>>> print(fut.done())
False
>>> loop.create_task(main(fut))
<Task pending coro=<main() running at <ast>:4>>
>>> loop.run_until_complete(fut)
'I have finished.'
>>> print(fut.done())
True
>>> print(fut.result())
I have finished.
-
(1) Одно единственное отличие: некий экземпляр
Task
вместоFuture
. Конечно, соответствующий APITask
требует от нас предоставления какой- то сопрограммы, поэтому мы просто применяемsleep()
, так как это подходит нам, и мы применяем некую комично большую продолжительность чтобы подчеркнуть некую потенциальную догадку: даннаяTask
завершится после установки некоторого результата, вне зависимости от того будет ли завершена или нет лежащая в основе сопрограмма.
Данный пример работает в точности как это было до сих пор и показывает что даже некая задача может быть
завершена ранее вручную путём установки в ней результата, в точности так же, как некий экземпляр Future
может осуществить завершение. И опять- таки, это не очень востребовано на практике - вместо этого вы обычно выполняете ожидание завершения задачи или
её прекращения в явном виде - но, надеюсь, это было полезно для понимания того что задачи и фьючерсы очень близки к тому чтобы быть синонимами:
единственное отличие состоит в том, что задача ассоциируется с какой- то сопрограммой.
В Быстром начале мы сказали, что верный путь для запуска сопрограмм состоит в использовании
loop.create_task(coro)
. Это возвращает нас к тому, что этого же можно достичь другой функцией уровня модуля:
asyncio.ensure_future()
.
В процессе исследований для данной книги я убедился, что данный метод asyncio.ensure_future()
несёт
ответственность за большую часть распространённого недоразумения относительно самой библиотеки asyncio
.
Основная часть имеющегося API является достаточно ясной, однако имеется пара в которых можно споткнуться при обучении, и именно это один из них.
Когда вы сталкиваетесь с ensure_future()
, ваше сознание очень усердно работает над тем как интегрировать его
в умственное построение модели того как следует применять asyncio
- и терпит неудачу!
Основная проблема с ensure_future()
лучше всего высвечивается этим имеющим теперь дурную репутацию
пояснением из документации asyncio
самого Python:
asyncio.ensure_future(coro_or_future, *, loop=None)
Планирование собственно исполнения некоторого объекта сопрограммы: его обёртывание в фьючерсе. Возвращает некий объект
Task
. Если в качестве аргумента выступает Future
, он возвращается непосредственно.
-- Документация Python 3.6 {Прим. пер.: см. изменения в Python 3.7}
Что-а!? К счастью, кот более ясное описание ensure_future()
:
-
Если вы передаёте некую сопрограмму, будет произведён некий экземпляр
Task
(и ваша сопрограмма будет запланирована на исполнение в соответствующем цикле событий). Это идентично вызовуloop.create_task(coro)
и возврату получаемого нового экземпляраTask
. {Прим. пер., Python 3.7: для проверки применяется вначалеisfuture()
, а затемiscoroutine()
и возвращается обёрнутый объектTask
в случае сопрограммы на входе или подобныйFuture
объект без изменений - см. следующий пункт! Если же объектTask
пребывает в состоянии ожидания - что проверяется при помощиinspect.isawaitable()
- возвращается некий объектTask
, который выполняет ожидание полученного в качестве аргумента объекта.} -
Если вы пробрасываете некий экземпляр
Future
(который содержит экземплярыTask
, так какTask
является субклассомFuture
), вы получаете возвращаемой ту же самую вещь, неизменной! Да, на самом деле!
Давайте подробнее рассмотрим как это работает:
import asyncio
async def f():(1)
pass
coro = f()(2)
loop = asyncio.get_event_loop()(3)
task = loop.create_task(coro)(4)
assert isinstance(task, asyncio.Task)(5)
new_task = asyncio.ensure_future(coro)(6)
assert isinstance(new_task, asyncio.Task)
mystery_meat = asyncio.ensure_future(task(7)
assert mystery_meat is task(8)
-
(1) Простая функция сопрограммы без действий. Нам просто требуется нечто чтобы создать какую- то сопрограмму.
-
(2) Здесь мы заставляем данный объект сопрограммы вызывать свою функцию напрямую. Ваш код будет редко делать такое, но я хочу сделать это явном виде (через несколько строк), чтобы мы передавали некий объект сопрограммы в каждый из
create_task
иensure_future()
. -
(3) Получаем необходимый цикл.
-
(4) С самого начала мы применяем
loop.create_task()
для планирования своей сопрограммы в нашем цикле и мы получаем некий новый экземплярTask
обратно. -
(5) Здесь мы проверяем полученный тип. До сих пор ничего примечательного.
-
(6) Здесь мы показываем как
asyncio.ensure_future()
может применяться для выполнения токо же самого действия, которое совершаетcreate_task
: мы передаём некую сопрограмму и получаем обратно какой- то экземплярTask
(а наша сопрограмма оказывается запланированной для исполнения в основном цикле)! Если вы передаёте некую сопрограмму, не имеется разницы междуloop.create_task()
иasyncio.ensure_future()
. -
(7) Но что произойдёт если мы передадим некий экземпляр
Task
чтобыensure_future()…
? Отметим, что мы передаём уже созданный экземпляр задачи, которая была созданаloop.create_task()
на шаге 4. -
(8) Мы получаем обратно в точности тот же самый экземпляр
Task
что мы и передали: он передаётся в неизменном виде. {Прим. пер.: только если эта задача не находится в ожидании - см нашу сноску выше с цитированием документации Python 3.7 - в этом случае возвращается новаяTask
, ожидающая полученный в качестве аргумента объект в состоянии ожидания.}
Итак: что именно является непосредственным пунктом передачи экземпляров Future
? И зачем делать два
разных дела в одной функции? Ответ состоит в том, что данная функция, ensure_future()
, предназначается
для применения авторами инструментальных средств для предоставления API
разработчиком конечных приложений и которая может обрабатывать оба вида параметров {даже три!}. Не верите мне?
Вот что следует от самого Благословенного Диктатора Всея Жизни (BDFL):
Основной момент ensure_future()
состоит в том, что если у вас есть нечто, что может быть либо сопрограммой,
либо неким Future
(и этот последний содержит некую Task
, поскольку она является субклассом Future
), и вы
желаете иметь возможность вызывать некий метод в нём, который определён только в Future
(скорее всего, единственным полезным примером
является cancel()
). Когда это уже Future
(или Task
) не делается ничего; когда это некая сопрограмма,
она обёртывается некоторой Task
.
Если вы знаете что у вас имеется некая сопрограмма и вы желаете её запланировать, более верным API для применения будет
create_task()
. Единственный случай, в котором вам следует вызывать ensure_future()
это когда вы предоставляете некий API
(как это делает большая часть владельцев API), который принимает либо некую сопрограмму, либо какой- то Future
и вам требуется делать
нечто с этим, и что требует чтобы у вас имелся Future
[10].
-- Гвидо ван Россум, github.com/python/asyncio/issues/477
Итак, суммируем: наш API asyncio.ensure_future()
является некоторой вспомогательной функцией предназначенной
для разработчиков инструментальных средств. Она проще всего поясняется по аналогии с более распространённым видом функции: если у вас есть за плечами
пять лет опыта программирования, вы вы уже могли видеть функции подобные приводимой ниже listify()
:
def listify(x: Any) > List:
""" Try hard to convert x into a list """
if isinstance(x, (str, bytes)):
return [x]
try:
return [_ for _ in x]
except TypeError:
return [x]
Эта функция пытается преобразовать полученный аргумент в некий перечень, вне зависимости от того что она получает. Такой вид функция часто
применяется в API и инструментальных средствах для принуждения вводов в некий известный тип, что упрощает
последующий код, так как вы знаете что этот параметр (выводимый из listify()
) всегда будет неким
списком.
Если я переименую свою функцию listify()
в
ensure_list()
, то вы должны начать замечать определённую параллель с
asyncio.ensure_future()
, не так ли? Он всегда пытается принудительно преобразовывать получаемый аргумент
в некий тип Future
. Эта утилитарная функция чтобы делать жизнь более простой для
разработчиков инструментальных средств, а не для разработчиков окончательных продуктов, коими являемся вы и я.
И на самом деле, даже стандартная библиотека asyncio
сама по себе применяет
ensure_future()
в точности для этой цели. Когда вы в следующий раз будете просматривать API, вы повсеместно
будете видеть некий параметр функции, описываемый как coro_or_future
, и именно его, скорее всего,
ensure_future()
будет применять внутри себя для принудительного преобразования данного параметра. Например,
имеющаяся функция asyncio.gather()
имеет такуб отличительную особенность:
asyncio.gather(*coros_or_futures, loop=None, ...)
Внутри себя gather()
применяет using ensure_future()
для принудительного
приведения типа и это делает для вас возможным передавать ему сопрограммы, задачи или фьючерсы.
Основным ключевым моментом здесь является: поскольку вы некий разработчик конечного приложения, вам никогда не приходится применять
asyncio.ensure_future
. Этот инструмент больше предназначен для разработчиков инструментальных средств.
Если вам требуется спланировать некую сопрограмму в своём цикле событий, просто делайте это напрямую с помощью
loop.create_task()
.
К сожалению, это не самый конец данной истории. Прямо сейчас некоторые разработчики конечных продуктов предпочитают
применять asyncio.ensure_future()
вместо loop.create_task()
по одной
простой и очень прагматичной причине: так меньше работы! Чтобы вызвать create_task()
вам либо требуется
некий экземпляр loop
, доступный в вашем локальном пространстве имён, в котором вы пишите код, или же вам необходим
некий дополнительный вызов asyncio.get_event_loop()
для получения соответствующей ссылки на
loop
; в противоположность этому, ensure_future()
может вызываться как есть.
В приведённом выше примере у нас имелось три функции сопрограмм внутри которых будут запущены некие фоновые
сопрограммы. Основная цель данного упражнения состояла в сопоставлении эстетических преимуществ применения
create_task()
по отношению к ensure_future()
при планировании
сопрограмм в основном цикле.
Пример 3-9. Сопоставление применения create_task()
и ensure_future()
import asyncio
async def background_job():
pass
async def option_A(loop):(1)
loop.create_task(background_job())
async def option_B():(2)
asyncio.ensure_future(background_job())
async def option_C():(3)
loop = asyncio.get_event_loop()
loop.create_task(background_job())
loop = asyncio.get_event_loop()
loop.create_task(option_A(loop))(1)
loop.create_task(option_B)(2)
loop.create_task(option_C)(3)
-
(1) В этом первом варианте, после создания необходимого цикла обычным образом мы планируем ту сопрограмму, которая возвращается из
option_A()
. Внутри создаётся некая новая задача для данного фонового задания. Поскольку мы не ожидаем данную фоновую задачу, наша сопрограммаoption_A()
выполнит выход; однако это не самое интересное здесь. Чтобы запланировать данную фоновую задачу при помощи вызоваcreate_task()
, было необходимо иметь доступным соответствующий объектloop
. В данном случае мы передаём его когда создавалась сопрограммаoption_A()
. -
(2) В нашем втором случае мы уже располагаем некоторой спланированной задачей, однако не требовалось передавать соответствующий экземпляр
loop
, так как наша функцияensure_future()
доступна непосредственно в самом модулеasyncio
. И это существенный момент: некоторые люди применяютensure_future()
не из- за её способности принудительного приведения типов к экземплярамFuture
, а вместо того чтобы не пришлось передавать идентификаторыloop
. -
(3) И наш окончательный вариант, в нём также требуется передавать некий экземпляр цикла. В этом случае мы получаем текущий цикл событий [в этом случае "текущий" означает тот экземпляр цикла событий, который связан с данным действующим потоком] путём вызова
get_event_loop()
, а затем мы вновь способны делать "все правильные вещи" и вызватьloop.create_task()
. Это аналогично тому что делает внутри себяensure_future()
.
Что в действительности следует иметь доступным, так это вспомогательную функцию
asyncio.create_task (coro)
, которая делает в точности то же самое что и наш "Вариант C"?
приведённый выше, но предопределённой в имеющейся стандартной библиотеке. Это низложит удобство ensure_future()
и сохранит ясность loop.create_task()
. Эта потребность не замечалась командой разработки Python, но я счастлив
сообщить, что asyncio.create_task()
в конечном счёте будет доступна в Python 3.7!
В своих последующих разделах мы будем возвращаться к свойствам уровня языка программирования, начиная с диспетчеров асинхронного контекста.
Сопровождение контекста диспетчеров оказывается исключительно удобным. Она действительно имеет смысл, так как многие варианты требуют чтобы сетевые ресурсы - например, соединения - должны открываться и закрываться в определённой надлежащим образом области.
Основным ключом к пониманию async with
является осознание того, что определённая операция некоторого диспетчера
контекста управляется вызовами метода; с последующим рассмотрением соображения: а что если бы такие методы были бы
функциями сопрограмм? И на самом деле, это в точности так и работает. Вот псевдокод, демонстрирующий всё это:
Пример 3-10. Диспетчер контекста Async
class Connection:
def __init__(self, host, port):
self.host = host
self.port = port
async def __aenter__(self):(1)
self.conn = await get_conn(self.host, self.port)
return conn
async def __aexit__(self, exc_type, exc, tb):(2)
await self.conn.close()
async with Connection('localhost', 9001) as conn:
<do stuff with conn>
-
(1) Вместо специального метода
__enter__()
для синхронизации диспетчеров контекста применяется особый новый метод__aenter__()
. -
(2) Более того, вместо
__exit__()
используется__aexit__()
. Все прочие параметры идентичны параметрам__exit__()
и заполняются если возбуждается некая исключительная ситуация в самом теле диспетчера контекста.
Предостережение | |
---|---|
просто потому что вы должны применять |
А теперь - только между нами - мне не сильно нравится такой стиль применения диспетчера контекста в явном виде, в то время как имеется великолепный декоратор
@contextmanager
в соответствующем модуле contextlib
стандартной библиотеки!
Как вы можете догадаться, имеется и некая асинхронная версия, @asynccontextmanager
, но, к сожалению, она
доступна только начиная с Python 3.7, который ещё не был доступен на момент написания данной книги {Прим. пер.: На момент
перевода, всё уже хорошо, Python 3.7 доступен.} Тем не менее, достаточно просто скомпилировать Python из исходного кода и в нашем следующем
разделе покажем как @asynccontextmanager
будет работать в Python 3.7.
Он аналогичен декоратору @contextmanager
из соответствующего модуля
contextlib
стандартной библиотеки. Для краткого повтора, давайте вначале рассмотрим соответствующий
вариант с блокировкой:
Пример 3-11. Вариант с блокировкой
from contextlib import contextmanager
@contextmanager(1)
def web_page(url):
data = download_webpage(url)(2)
yield data
update_stats(url)(3)
with web_page('google.com') as data:(4)
process(data)(5)
-
(1) Соответствующий декоратор
contextmanager
преобразовывает некую функцию генератора в какой- то диспетчер контекста. -
(2) Этот вызов функции, (который я изготовил специально для данного примера) выглядит подозрительно похоже на конкретный вид вещей, которые желают применять некий сетевой интерфейс, который на много порядков величины медленнее "обычного" кода, ограниченного ЦПУ. Такой диспетчер контекста должен применяться в некотором выделенном потоке, в противном случае вся программа будет приостановлена в ожидании данных.
-
(3) Представим что мы обновляем какие- то статистические данные всякий раз, когда мы обрабатываем данные из некоторого URL, например, общее число раз, которое URL был выгружен. Опять- же с точки зрения одновременного исполнения, нам придётся знать будет ли эта функция вовлечена в ввод/ вывод внутренним образом, например подобно записи в некую базу данных поверх сетевой среды. Если это так,
update_stats()
также является неким блокирующим вызовом. -
(4) Здесь применяется наш диспетчер контекста. Особенно отметим как этот сетевой вызов (для
download_webpage()
) скрывается внутри соответствующей конструкции нашего диспетчера контекста. -
(5) Данный вызов функции,
process()
, также может быть блокирующим. Нам придётся рассмотреть что делает данная функция. Чтобы предоставить вам общий взгляд на различные подлежащие рассмотрению вопросы, которые служат для принятия решения будет ли данный вызов функции выполнять "блокировку", либо же он будет "неблокирующим" перечислим те, которые могут ими служить:-
безобидные без блокировки (быстрые и ограниченный ЦПУ)
-
с мягкой блокировкой (быстрой и ограниченной вводом/ выводом, возможно, нечто подобное быстрому доступу к диску вместо сетевого ввода/ вывода)
-
блокирующие (медленный, ограниченные вводом/ выводом)
-
демонические (медленные, ограниченные ЦПУ)
С целью упрощения данного примера, давайте предположим, что данный вызов
process()
является быстрой, ограниченной ЦПУ операцией и, следовательно, не имеющей блокировки. -
Давайте сопоставим в точности тот же самый пример, но на этот раз и применением новой вспомогательной функции, осведомлённой об асинхронности, привносимой Python 3.7:
Пример 3-12. Вариант без блокировки
from contextlib import asynccontextmanager
@asynccontextmanage(1)
async def web_page(url):(2)
data = await download_webpage(url)(3)
yield data(4)
await update_stats(url)(5)
async with web_page('google.com') as data:(6)
process(data)
-
(1) Наш новый
asynccontextmanager
применяется точно так же. -
(2) Тем не менее, он требует, чтобы функция декорирующего генератора объявлялась с применением
async def
. -
(3) Как и ранее, прежде чем сделать данные доступными в самом теле нашего диспетчера контекста, мы выбираем эти данные из необходимого URL. В данном случае я добавил необходимое ключевое слово
await
, которое сообщает нам что эта сопрограмма позволит соответствующему циклу событий запускать прочие задачи пока мы ожидаем завершения своего сетевого вызова.Заметим, что мы не можем просто прибить своё ключевое слово
await
куда пожелаем. Данное изменение также предполагает, что мы также способны изменить саму свою функциюdownload_webpage()
и преобразовать её в некую сопрограмму, которая совместима с этим ключевым словомawait
. Для тех случаев, когда невозможно изменять данную функцию, необходим иной подход и мы обсудим его в своём следующем примере. -
(4) Как и ранее, все данные делаются доступными самому телу нашего диспетчера контекста. Я пытаюсь сохранять данный код простым, и поэтому я опускаю обработчики
try/finally
, которые обычно вам следует писать для рассмотрения исключительных ситуаций, возбуждаемых в теле вызывающей стороны.Отметим: Собственно присутствие
yield
является именно тем, что преобразует некую функцию в функцию генератора; дополнительно присутствие соответствующих ключевых словasync def
в пункте 1 превращает её в асинхронную функцию генератора. При своём вызове она будет возвращать некий асинхронный генератор. Наш модульinspect
имеет две функции, которые могут проверить это: соответственно,isasyncgenfunction()
иisasyncgen()
. -
(5) Тут мы предполагаем, что мы также преобразовали свой код внутри функции
update_stats()
чтобы позволить ему производить сопрограммы. Затем мы можем применить ключевое словоawait
, которое допустит некое переключение в нашем цикле событий пока мы ожидаем завершение работы, ограниченной соответствующим вводом/ выводом. -
(6) Для применения самого диспетчера контекста требовалось другое изменение: нам требовалось применять
async with
вместоwith
.
К счастью, данный пример показывает, что наш новый @asynccontextmanager
в точности аналогичен
имевшемуся ранее декоратору @contextmanager
.
В сноске 3 предыдущего примера я сказал, что было необходимо изменить некоторые функции для возврата сопрограмм; ими были
download_webpage()
иupdate_stats()
. Обычно это не так просто сделать, так
как необходимо добавить поддержку асинхронности на нижнем уровне сокета.
Основной фокус предыдущего примера был нацелен на то что бы просто показать наш новый @asynccontextmanager
,
а не показывать как преобразовывать функции с блокировкой в неблокирующие. Более общий случай состоит в том, что вам может понадобиться применять в своей
программе некую функцию с блокировкой, но не будет возможности изменять имеющийся в таких функциях код.
Такая ситуация часто происходит с библиотеками сторонних разработчиков и прекрасным примером может послужить соответствующая библиотека
requests
, которая внутри себя применяет вызовы с блокировкой [Может оказаться весьма сложным добавление
поддержки Async в некое имеющееся инструментальное средство по данной причине, так как могут потребоваться большие структурные изменения. Это обсуждается
в эссе для Requests github].
Хорошо, раз вы не можете изменить подлежащий вызову код, имеется иной путь, и это удобное место показать как именно для этого можно применять
Исполнитель (executor):
Пример 3-13. Вариант без блокировки с небольшой вспомогательной функцией от моих друзей
from contextlib import asynccontextmanager
@asynccontextmanager
async def web_page(url):(1)
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(
None, download_webpage, url)(2)
yield data
await loop.run_in_executor(None, update_stats, url)(3)
async with web_page('google.com') as data:
process(data)
-
(1) В данном примере мы предполагаем, что у нас нет возможности изменить свой код для двух вызовов с блокировкой
download_webpage()
иupdate_stats()
, то есть мы не можем модифицировать их в функции сопрограмм. Это плохо, ибо самый серъёзный грех основанного на событиях программирования нарушает то правило, которое не следует нарушать и ни при каких обстоятельствах не мешать основному циклу событий обрабатывать события.Чтобы обойти данную проблему, мы будем применять некий Исполнитель для запуска вызовов с блокировкой в каком- то отдельном потоке. Такой Исполнитель предоставляется еам в виде некоего атрибута самого основного цикла событий .
-
(2) Здесь мы вызываем своего Исполнителя. Его отличительной особенностью является
AbstractEvent Loop.run_in_executor(executor, func, *args)
и, если вы желаете применять определяемый по умолчанию Исполнитель (которым являетсяThreadPoolExecutor
) тогда вам следует передатьNone
в качестве аргумента для соответствующего "Исполнителя". [Это очень раздражает. Всякий раз, когда я применяю данный вызов, я не могу задаваться вопросом, почему бы не предпочесть в качестве аргумента некоего ключевого слова идиому использованияexecutor=None
.] -
(3) Как и в случае вызова
download_webpage()
, мы также можем запустить другой блокирующий вызов кupdate_stats()
в некотором Исполнителе. Отметим, что вы обязаны применять ключевое словоawait
спереди этого вызова. Если вы его забудете, получаемое исполнение вашего асинхронного генератора (то есть вашего асинхронного диспетчера контекста) не будет выполнять ожидания необходимого вызова на завершение перед обработкой.
Скорее всего асинхронные диспетчеры контекста станут широко применяться во многих кодах, основанных на asyncio
,
следовательно достаточно важно иметь о них хорошее представление. Вы можете прочесть дополнительные подробности о новом асинхронном декораторе
диспетчера контекста в документации Python 3.7.
Замечание | |
---|---|
|
Помимо async def
и await
имеется ещё несколько расширений в синтаксисе
языка Python. Первым из них является асинхронная версия для цикла "for". Проще всего понять как это работает если вы вначале поймёте что
простая итерация - так же как и многие другие функции языка программирования - реализуется при помощи специальных
методов, отличающих их двойными подчёркиваниями в соответствующих названиях.
Например, вот как некий стандартный (не асинхронный) итератор определяется посредством применения методов
__iter__()
и __next__()
:
>>> class A:
... def __iter__(self):(1)
... self.x = 0(2)
... return self(3)
... def __next__(self):(4)
... if self.x > 2:
... raise StopIteration(5)
... else:
... self.x += 1
... return self.x(6)
>>> for i in A():
... print(i)
123
-
(1) Некий Итератор обязан реализовывать соответствующие специальный метод
__iter__()
. -
(2) Инициализируем некое состояние в его "начальном" значении состояния.
-
(3) Наш особый метод
__iter__()
должен возвращать нечто итерируемое, т.е. какой- то объект, который реализует специальный метод__next__()
. В данном случае, тот же самый экземпляр, так какA
сам по себе также реализует соответствующий особый метод__next__()
. -
(4) Метод
__next__()
определён. Он буде вызываться для каждого последующего шага в итерациях, до тех пор пока ... -
(5) ... не будет возбуждена
StopIteration
. -
(6) Здесь вырабатываются возвращаемые значения для каждой итерации.
Теперь вы зададите справедливый вопрос: что произойдёт если вы объявите свой особый метод __next__()
в
качестве некоторой функции сопрограммы async def
? Это позволит ему выполнять
await
некоторого вида операций, ограниченных вводом/ выводом; а также это во многом в точности повторяет то,
как работает async for
, за исключением маленьких деталей вокруг именований. Имеющаяся спецификация (в
PEP 492) показывает, что чтобы применять
async for
в качестве какого- то асинхронного итератора в самом этом асинхронном итераторе требуются
определённые моменты:
-
Вам требуется реализовать
def __aiter__()
(Отмечаем: без)async def
!) -
__aiter__()
должен возвращать некий объект, который реализуетasync def __anext__()
-
__aiter__()
должен возвращать некое значение для каждой итерации и при завершении должен возбуждатьStopAsyncIteration
Давайте быстро взглянем на то как это может работать. Представим что у нас имеется пачка ключей в некоторой базе данных Redis и мы желаем выполнить итерации по их данным, но при этом делать выборку данных только по запросу. Некий асинхронный итератор может выглядеть примерно как- то так:
import asyncio
from aioredis import create_redis
async def main():(1)
redis = await create_redis(('localhost', 6379))(2)
keys = ['Americas', 'Africa', 'Europe', 'Asia'](3)
async for value in OneAtATime(redis, keys):(4)
await do_something_with(value)(5)
class OneAtATime:
def __init__(self, redis, keys):(6)
self.redis = redis
self.keys = keys
def __aiter__(self):(7)
self.ikeys = iter(self.keys)
return self
async def __anext__(self):(8)
try:
k = next(self.ikeys)(9)
except StopIteration:(10)
raise StopAsyncIteration
value = await redis.get(k)(11)
return value
asyncio.get_event_loop().run_until_complete(main())
-
(1) Это основная функция: вы запускаем её при помощи
run_until_complete()
в нижней части своего примера кода. -
(2) Для получения соединения мы применяем интерфейс верхнего уровня из
aioredis
. -
(3) Представим себе, что каждое из связанных с этими ключами значение достаточно большое и хранится в соответствующем экземпляре Redis.
-
(4) Здесь мы применяем
async for
: основной момент состоит в том, что итерация сама по себе способна подвешивать себя пока не дождётся появления своего следующего значения. -
(5) Для полноты допустим что мы также выполняем некие действия, связанные с вводом/ выводом для своей выборки данных; может происходить обмен данными и затем они отправляются другому получателю.
-
(6) Собственно инициализатор данного класса достаточно простой: мы сохраняем необходимое подключение к экземпляру Redis и соответствующий перечень ключей, по которым требуется выполнять итерацию.
-
(7) В точности как и для предыдущего примера кода с
__iter__()
, мы используем__aiter__()
для настройки необходимых для итерации моментов. Мы создаём некий обычный итератор по имеющимся ключам,self.ikeys
, а такжеreturn self
, так какOneAtATime
также реализует соответствующие метод__anext__()
сопрограммы. -
(8) Отметим, что метод
__anext__()
объявляется с применениемasync def
, в то время как метод__aiter__()
объявляется только при помощиdef
. -
(9) Для каждого ключа выполняем выборку его значения из Redis:
self.ikeys
является обычным итератором по имеющимся ключам, поэтому мы применяемnext()
для перемещения по ним. -
(10) Когда израсходованы
self.ikeys
, обрабатываем соответствующийStopIteration
и просто возвращаем его вStopAsyncIteration
! Именно таким образом вы сигнализируете об останове изнутри некоего асинхронного итератора. -
(11) Наконец - вся цель данного примера - мы можем получить необходимые данные из Redis при помощи данного ключа. Мы можем выполнить
await
для этих данных, что означает, что другой код вашего цикла событий может выполняться пока мы дожидаемся сетевого ввода/ вывода.
Надеемся, что этот пример является понятным: async for
предоставляет возможность оставаться в удобном простом
цикле for, даже при итерации данных при которых сама по себе итерация выполняет ввод/ вывод. Основное преимущество от этого состоит в том что вы
можете обрабатывать гигантские объёмы данных, причём всё это делать в отдельном цикле, так как вам придётся всякий раз иметь дело только с каждым
фрагментом в небольших партиях.
Асинхронные генераторы отвечают на законный вопрос: "Что произойдёт если применить yield
внутри
естественной функции сопрограммы async def
?" Эта концепция может вводить в заблуждение если у вас
имеется некий опыт применения генераторов как если бы они были сопрограммами, так как это происходит в
инструментальном средстве Twisted
или в инфраструктуре
Tornado
, или даже как в случае yield from
из
asyncio
Python 3.4.
Следовательно, прежде чем мы продолжим данный раздел, было бы лучше, чтобы вы убедили себя что:
-
Сопрограммы и генераторы совершенно разные понятия.
-
Асинхронные генераторы ведут себя во многом подобно обычным генераторам.
-
Для итераций вы применяете
async for
для асинхронных генераторов вместо обычногоfor
в обычных генераторах.
Наш пример, использованный в предыдущем разделе для демонстрации какого- то асинхронного итератора при итерациях по Redis оказывается намного проще если мы установим некий асинхронный генератор:
Пример 3-14. С асинхронным генератором проще
import asyncio
from aioredis import create_redis
async def main():(1)
redis = await create_redis(('localhost', 6379))
keys = ['Americas', 'Africa', 'Europe', 'Asia']
async for value in one_at_a_time(redis, keys):(2)
await do_something_with(value)
async def one_at_a_time(redis, keys):(3)
for k in keys:
value = await redis.get(k)(4)
yield value(5)
asyncio.get_event_loop().run_until_complete(main())
-
(1) Самая основная функция
main()
идентична той, которая была в нашем коде Итераторы Async: async for. -
(2) Ладно, почти идентична: мне пришлось изменить её название с варианта написания верблюдом, на змеиный регистр (с подчёркиванием).
-
(3) Наша функция теперь объявляется с помощью
async def
, превращая её в некую функцию сопрограммы, а так как эта функция также содержит ключевое словоyield
, мы именуем её асинхронной функцией генератора. -
(4) Нам не требуется выполнять путанные действия, требовавшиеся в нашем предыдущем примере с помощью
self.ikeys
: здесь мы просто обходим циклом по имеющимся ключам напрямую, получая необходимые значения... -
(5) ... и собирая их для вызывавшей стороны, в точности как в обычном генераторе.
Если вы новичок в данном вопросе, всё это может показаться слишком сложным, но я призываю вас поиграть со всеми этими примерами самостоятельно.
Достаточно быстро вы начнёте ощущать это естественным. Асинхронные генераторы скорее всего станут очень популярными при кодировании на основе
asyncio
, поскольку они привносят в точности те же самые преимущества, что и обычные генераторы:
делают код короче и проще.
Теперь, когда мы рассмотрели как Python поддерживает асинхронные итерации, следующим естественным вопросом будет спросить понимает ли он также списки в for - и наш ответ ДА! Данная поддержка была введена в PEP 530 и я рекомендую вам просмотреть этот PEP самостоятельно. Он достаточно короткий и хорошо читаемый.
Пример 3-15. Async достаточно искушён чтобы работать со списками, словарями и наборами
>>> import asyncio
>>> async def doubler(n):
... for i in range(n):
... yield i, i * 2(1)
... await asyncio.sleep(0.1)(2)
>>> async def main():
... result = [x async for x in doubler(3)](3)
... print(result)
...
... result = {x: y async for x, y in doubler(3)}(4)
... print(result)
...
... result = {x async for x in doubler(3)}(5)
... print(result)
>>> asyncio.get_event_loop().run_until_complete(main())
[(0, 0), (1, 2), (2, 4)]
{0: 0, 1: 2, 2: 4}
{(1, 2), (0, 0), (2, 4)}
-
(1)
doubler()
является очень простым асинхронным генератором: задавая некое верхнее значение он итеративно проходит по простому диапазону, принося кортежи из смого значения и его дублёра. -
(2) Небольшой сон, просто чтобы подчеркнуть что это некая асинхронная функция.
-
(3) Широта охвата некоторого асинхронного списка: обратите внимание как
async for
применяется вместо обыкновенногоfor
. Это отличие в точности то же, что а в Итераторы Async: async for. -
(4) Выразительность Async; все обычные трюки срабатывают, например, распаковка полученного кортежа в
x
иy
с тем, чтобы они смогли запитать охват синтаксиса словаря. -
(5) Широта охвата асинхронного множества работает в точности так, как вы могли бы ожидать.
Обратной стороной нашей монеты, как выводит PEP 530,
является применение await
внутри выразительных средств. Но это не совсем так:
await <coro>
является обычным выражением и оно может применяться в большинстве мест, в которых вы это можете
ожидать.
Именно async for
является тем, что превращает выразительность в асинхронную
выразительность, а не наличие await
. Всё что вам требуется для допустимости
await
(внутри некоторого поглощения), это то, что вы находитесь внутри самого тела какой- то функции сопрограммы,
то есть некоторой функции, объявленной с помощью async def
. Поэтому хотя применение
await
и async for
внутри того же самого поглощаемого списка в
действительности сочетает два различных подхода, давайте сделаем это в любом случае чтобы продолжить снижать порого чувствительности вхождения в
комфортное состояние от применения синтаксиса асинхронного языка программирования:
Пример 3-16. Собираем всё в кучу
>>> import asyncio
>>> async def f(x):(1)
... await asyncio.sleep(0.1)
... return x + 100
>>> async def factory(n):(2)
... for x in range(n):
... await asyncio.sleep(0.1)
... yield f, x(3)
>>> async def main():
... results = [await f(x) async for f, x in factory(3)](4)
... print('results = ', results)
>>> asyncio.get_event_loop().run_until_complete(main())
results = [100, 101, 102]
-
(1) Очень простая функция сопрограммы: небольшое засыпание с последующим возвратом увеличенного на 100 значения параметра.
-
(2) Именно этот асинхронный генератор мы будем вызывать внутри некоторого охватывающего асинхронного списка слегка ниже, применяя
async for
для продвижения по итерациям. -
(3) Этот асинхронный генератор будет производить некий кортеж из
f
и значения переменной итерациич
. Собственно возвращаемым значениемf
является некая функция сопрограммы, кщё пока не сопрограмма. -
(4) Наконец, собственно асинхронный охват (выразительность). Данный пример был слегка натянут чтобы продемонстрировать некую выразительность, которая включает в себя одновременно и
async for
, иawait
. Давайте разберём что происходит внутри данного охвата: вызов нашегоfactory(3)
возвращает некий асинхронный генератор, который должен управляться итерациями. Так как он является неким асинхронным генератором, вы не можете просто применятьfor
; вы обязаны воспользоватьсяasync for
.Далее, те значения, которые производятся данным асинхронным генератором являются кортежем из функции сопрограммы
f
и некоторогоint
. Вызов данной функции сопрограммыf()
производит некую сопрограмму, которая должна вычисляться при помощиawait
.Заметим, что внутри данного охвата использование
await
не имеет ничего общего с применениемasync for
: они выполняют совершенно различные вещи и действуют на целиком различные объекты.
Основная часть основанных на асинхронности программ предназначены для длительного исполнения сетевых приложений. Эта область таит в себе удивительно много сложностей, связанных с тем как корректно осуществлять запуск и останов.
Из этих двоих, запуск намного проще. Самый стандартный способ запуска некоторого приложения asyncio
состоит в создании какой- то задачи, с последующим вызовом loop.run_forever()
, как это было показано в
нашем примере Hello World из раздела
Быстрое начало.
Единственным исключением может быть случай когда вам требуется запустить некий сервер ожидания. В таком случае запуск обычно является процесом из двух этапов:
-
Вначале создаёте некую сопрограмму только для самой "запускающей" фазы такого сервера, а затем воспользуйтесь
run_until_complete()
в данной инициализированной сопрограмме для запуска самого сервера. -
Во- вторых, продолжите в свой обычной "главной" {main} части приложения посредством вызова
loop.run_forever()
.
Обычно запуск будет достаточно прямолинейным; а относительно описанного выше варианта сервера вы можете ознакомиться с дополнительными подробностями в документации. Кроме того мы также кратко рассмотрим некую демонстрацию запуска такого сервера в последующем примере кода.
Останов является намного более интригующим.
Для останова вначале мы рассмотрим соответствующий танец, который следует за тем, когда нечто останавливает имеющийся цикл событий. Когда некий
исполняемый цикл останавливается, соответствующий вызов run_forever()
выполняет снятие блокировки и выполняется
появляющийся после этого код. В этот момент вы обязаны:
-
собрать все всё ещё приостановленные объекты задач (если таковые имеются)
-
прекратить эти задачи (что возбудит
CancelledError
внутри каждой исполняемой сопрограммы, которые вы можете выбрать на обработку вtry/except
внутри самого тела данной функции сопрограммы -
собрать все эти задачи в некую "группу" задач
-
к этой "группе" задач применить
run_until_complete()
чтобы дождаться их завершения, т.е. позволить такой исключительной ситуацииCancelledError
возбудиться и подвергнуться обработке
Только после этого останов завершится.
Обряд посвящения в построение ваших первых нескольких прикладных программ asyncio
будет состоять из попыток
избавления от сообщений подобных Task was destroyed but it is pending! {Задача была уничтожена, но она
приостановлена!} на протяжении останова. Это происходит по причине пропуска одного или более перечисленных выше этапов. Вот пример данной ошибки:
Пример 3-17. Уничтожение приостановленных задач
# taskwarning.py
import asyncio
async def f(delay):
await asyncio.sleep(delay)
loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))(1)
t2 = loop.create_task(f(2))(2)
loop.run_until_complete(t1)(3)
loop.close()
Вывод:
$ python taskwarning.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() done, defined at [...snip...]>>
Данная ошибка говорит вам о том, что некие задачи пока ещё не завершены при закрытии данного цикла. Мы желаем избежать этого, и именно по этой причине определённая формирующая отдельную сущность останова процедура служит сбору всех незавершённых задач, их прекращению с последующим допуском их завершения прежде чем будет закрыт данный цикл.
Давайте рассмотрим более подробно код из своего примера Быстрого начала и рассмотрим эти фазы вновь. Мы проделаем это через мини- обучение с помощью эхо сервера на основе telnet.
На Рисунке 3-1 наш сервер запущен с левой стороны панели. Затем, справа, некий сеанс telnet выполняет подключение к нашему серверу. Мы набираем несколько команд, которые возвращаются к нам обратно (все приведённые в верхний регистр) и после этого мы отключаемся. Теперь мы рассмотрим тот код, который управляет данной программой.
Пример 3-18. Жизненный цикл приложения Asyncio (на основе представленного в документации Python эхо сервера)
from asyncio import ((1)
get_event_loop, start_server, CancelledError,
StreamReader, StreamWriter, Task, gather)
async def echo(reader: StreamReader, writer: StreamWriter):(2)
print('New connection.')
try:
while True:(3)
data: bytes = await reader.readline()(4)
if data in [b'', b'quit']:
break
writer.write(data.upper())(5)
await writer.drain()
print('Leaving Connection.')
# except CancelledError:(6)
# writer.write_eof()
# print('Cancelled')
finally:
writer.close()
loop = get_event_loop()
coro = start_server(echo, '127.0.0.1', 8888, loop=loop)(7)
server = loop.run_until_complete(coro)(8)
try:
loop.run_forever()(9)
except KeyboardInterrupt:
print('Shutting down!')
server.close()(10)
loop.run_until_complete(server.wait_closed())(11)
tasks = Task.all_tasks()(12)
for t in tasks:
t.cancel()
group = gather(*tasks, return_exceptions=True)(13)
loop.run_until_complete(group)(14)
loop.close()
-
(1) Импортируем кучу вещей из пространства имён
asyncio
- для данной книги мне приходится контролировать длин строки! -
(2) Данная функция сопрограммы
echo()
будет применена (нашим сервером) для создания какой- то сопрограммы для каждого выполненного соединения. Эта сопрограмма применяет APIstreams
для сетевого обмена сasyncio
. -
(3) Для поддержки подключения у нас будет некий бесконечный цикл ожидающий сообщения.
-
(4) Ожидаем некую строку данных с другой стороны.
-
(5) Возвращаем полученный данные обратно, но целиком в верхнем регистре.
-
(6) Я интенсивно комментирую данный блок чтобы привлечь внимание к тому, что случится если ваша сопрограмма не отработает должным образом прекращение. Если вы действительно обработаете
CancelledError
, вы получите некий шанс выполнить всю очистку, особенную для сценария останова. -
(7) Именно это является запускающей фазой нашей программы: наш сервер требует некоторого отдельного шага из имеющегося "основного" этапа
run_forever()
. Наша функцияstart_server()
возвратит какую- то сопрограмму и она должна бытьrun_until_complete()
. Отметим как передаётся наша функция сопрограммыecho
: она будет использоваться в качестве некоторой фабрики, которая производит новую сопрограмму для каждого нового подключения. -
(8) Исполним эту сопрограмму для запуска своего сервера TCP.
-
(9) Только теперь мы начинаем свою основную, "ожидающую" часть программы. Начиная с этого момента всякое выполненное для нашего сервера подключение TCP породит некую сопрограмму из нашей функции сопрограммы
echo
. Единственная вещь, которая может остановить наш цикл событий этоKeyboardInterrupt
, которое является тем же самым, чем являетсяSIGINT
для систем Unix. (В некоторой промышленной системе вы будете применять особые обработчики сигналов, отличающиеся отKeyboardInterrupt
; это представлено далее в разделе Сигналы.) . -
(10) По достижению этого места мы знаем что был инициирован останов, например, при помощи Ctrl-C. Самая первая вещь, которую необходимо сделать это предотвратить наш сервер от приёма дополнительных новых подключений. Это требует двух шагов: во- первых вызываем
server.close()
... -
(11) ... а затем запускаем
wait_closed()
в определённом цикле для закрытия всех сокетов, которые всё ещё ожидают подключений, но которые ещё не соединились. Отметим, что все действующие подключения (например, порождённые изasyncio
) с активными подключениями не подвергнутся воздействию: эти соединения останутся работающими. -
(12) Теперь мы можем начать останавливать задачи. Данная формирующая единое целое последовательность должна собрать все приостановленные в данный момент задачи в рассматриваемом цикле событий, прекратить их и затем собрать их в некую отдельную задачу группы.
-
(13) Отметьте данный параметр
return_exceptions
. Он достаточно важен и отдельно обсуждается в нашем следующем разделе. -
(14) Как и ранее, для завершения запускаем полученную задачу группы.
Надеюсь, вы начинаете различать знакомый шаблон.
Совет | |
---|---|
Один самый последний момент прежде чем мы переместимся: Если вы перехватываете И вот почему: тот вызов |
И запомните: если вы применяете некую библиотеку или какое- то инструментальное
средство, убедитесь что вы следуете их документации относительно того как выполнять запуск и останов. Инструментальные средства сторонних
разработчиков обычно предоставляют свои собственные функции запуска и останова, к тому же они предоставляют особые точки входа событий для
индивидуализации. Вы можете увидеть это в примере из данной книги с инструментальным средством Sanic
позднее в этой книге в Примере: недостоверность кэширования.
Вы могли заметить, что я установил значение аргумента с ключевым словом return_exceptions=True
в своём
вызове gather()
при исполнении 13 в нашем предыдущем примере кода. Я также это делал ранее в
Быстром начале, причём я очень подло не сказал ничего об этом в тот раз. Настал момент для
пояснений.
Значением по умолчанию является gather(..., return_exceptions=False)
. Такое значение по умолчанию является
проблематичным для нашего процесса останова. Это слегка сложно объяснить напрямую; вместо этого давайте
пошагово пройдём некую последовательность фактов,которая сделает более простым понимание этого:
-
run_until_complete()
воздействует на некий фьючерс; на протяжении останова именно этот фьючерс возвращаетсяgather
. -
Если этот фьючерс возбуждает некую исключительную ситуацию, эта исключительная ситуация также будет возбуждена из
run_until_complete()
, что означает, что данный цикл будет остановлен.. -
Если в некотором фьючерсе "группы" применяется
run_until_complete()
, любая возбуждаемая внутри всех его подзадач исключительная ситуация также будет возбуждена и в самом фьючерсе "группы", если она не обрабатывается в данной подзадаче. Отметим, что они также включают в себя иCancelledError
. -
Если только некоторые задачи обрабатывают
CancelledError
, а прочие нет, те из них, что не делают этого вызовут останов своего цикла. Это означает, что данный цикл будет остановлен прежде чем все задачи сделают это. -
В режиме останова нам в действительности не желательно такое поведение. Мы хотим завершить
run_until_complete()
только когда все задачи в данной группе на самом деле завершены, вне зависимости от того возбуждали некие иные задачи исключительные ситуации или нет. -
Следовательно, нам приходится выполнять
gather(*, return_exceptions=True)
: такая установка заставляет имеющийся фьючерс "групп" трактовать исключительные ситуации из своих подзадач как возвращаемые значения, поэтому они не всплывают и не выступают помехой дляrun_until_complete()
.
И здесь у вас именно это: необходимая взаимосвязь между return_exceptions=True
и
run_until_complete()
!
Нежелательная последовательность перехватываемых исключительных ситуаций, исполняемых таким образом, может ускользнуть от вашего внимания поскольку
они теперь (в действительности) подлежат обработке внутри имеющейся задачи "группы". Если это вызывает беспокойство, вы можете получить вывод
из run_until_complete()
и просканировать его относительно любых подклассов
Exception
; а затем записать протокол сообщений для вашей ситуации. Вот быстрый взгляд на то что вы получите:
Пример 3-19. Все имеющиеся задачи завершатся
import asyncio
async def f(delay):
await asyncio.sleep(1 / delay)(1)
return delay
loop = asyncio.get_event_loop()
for i in range(10):
loop.create_task(f(i))
pending = asyncio.Task.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()
-
(1) Было бы ужасно если бы некто передал ноль...
Вывод:
$ python alltaskscomplete.py
Results: [6, 9, 3, 7, ...
ZeroDivisionError('division by zero',), 4, ...
8, 1, 5, 2]
Без return_exceptions=True
, соответствующая ZeroDivisionError
была бы возбуждена из run_until_complete()
, останавливая весь цикл и таким образом не позволяя
завершиться прочим задачам.
В нашем следующем разделе мы рассмотрим обработку сигналов (помимо KeyboardInterrupt
), но прежде чем мы
перейдём к этому, было бы неплохо принять во внимание, что аккуратный останов является одной из самых сложных сторон сетевого программирования и
это остаётся справедливым и для asyncio
. Информация в этом разделе - это только начало. Я призываю вас провести
особые проверки для чистого останова в ваших собственных автоматизированных комплектах. Различные приложения часто требуют различных стратегий.
В индексе пакетов Python я опубликовал некий крошечный пакет aiorun
, преимущественно на основе своих собственных экспериментов и исследований, относящихся в останову и он
содержит множество идей из данного раздела. Также может оказаться полезным для вас повозиться с этим кодом и поэкспериментировать со своими
собственными мыслями относительно вариантов останова asyncio
.
В своём предыдущем примере мы показали как наш цикл событий будет остановлен с помощью KeyboardInterrupt
,
например, нажатием Ctrl-C. Такое возбуждение KeyboardInterrupt
действенно снимет блокировку имеющегося
вызова run_forever()
и позволит произойти дальнейшей последовательности останова.
KeyboardInterrupt
соответствует сигналу SIGINT
. Для сетевых служб
более распространёнными в действительности являются сигнал для обработки завершения SIGTERM
, и именно он
также является установленным по умолчанию сигналом,который вы применяете в соответствующей команде
kill()
из оболочки UNIX.
Совет | |
---|---|
Данная команда |
asyncio
имеет встроенную поддержку для обработки сигналов процессов, однако имеется удивительная степень
сложности относительно обработки сигналов в целом (безотносительно к asyncio
). Мы не можем охватить всё что
имеется, но мы можем взглянуть на некоторые из самых основных вопросов, которым следует уделить внимание. Наш следующий образец кода произведёт
такой вывод:
$ python shell_signal01.py
<Your app is running>
<Your app is running>
<Your app is running>
<Your app is running>
^CGot signal: SIGINT, shutting down.
Для останова данной программы я нажал Ctrl-C, что видно из последней строки. Вот сам код:
Пример 3-20. Памятка по применению KeyboardInterrupt
в качестве обработчика SIGINT
# shell_signal01.py
import asyncio
async def main():(1)
while True:
print('')
await asyncio.sleep(1)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(main())(2)
try:
loop.run_forever()
except KeyboardInterrupt:(3)
print('Got signal: SIGINT, shutting down.')
tasks = asyncio.Task.all_tasks()
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
-
(1) Это основная часть нашего приложения. Чтобы сделать вещи более простыми мы просто собираемся заснуть в некотором бесконечном цикле.
-
(2) Данная последовательность запуска и останова покажется знакомой вам по нашим предыдущим разделам. Мы спланируем
main()
, вызовемrun_forever()
, и подождём чего- то чтобы остановить этот цикл. -
(3) В данном случае только Ctrl-C остановит наш цикл, а затем мы обработаем
KeyboardInterrupt
и выполним все необходимые кусочки очистки, котоые мы рассмотрели в своих предыдущих разделах.
До сих пор всё было достаточно прямолинейным. Теперь мы собираемся усложнить действия:
-
Один из ваших коллег обратился к вам с просьбой обрабатывать
SIGTERM
в дополнение кSIGINT
в качестве сигнала останова. -
В вашем реальном приложении вам требуется обрабатывать
CancelledError
внутри своей сопрограммыmain()
, а необходимый код очистки внутри соответствующего обработчика исключительной ситуации собирается занять несколько секунд до завершения (представим, что вам придётся взаимодействовать с одноранговой сетевой средой и закрывать пачку соединений сокетов). -
Ваша прикладная программа не должна делать странных вещей если вы отправляете сигналы несколько раз, вы просто игнорируете все прочие сигналы вплоть до выхода.
asyncio
предоставляет достаточную степень детализации в своём API для обработки подобных ситуаций. Я
изменил свой код приведённого ранее примера для ввода этих моментов:
Пример 3-21. Обработка как SIGINT
, так и SIGTERM
, но выполнение останова только один раз.
# shell_signal02.py
import asyncio
from signal import SIGINT, SIGTERM(1)
async def main():
try:
while True:
print('<Your app is running>')
await asyncio.sleep(1)
except asyncio.CancelledError:(2)
for i in range(3):
print('<Ваша прикладная программа завершается...>')
await asyncio.sleep(1)
def handler(sig):(3)
loop.stop()(4)
print(f'Got signal: {sig!s}, shutting down.')
loop.remove_signal_handler(SIGTERM)(5)
loop.add_signal_handler(SIGINT, lambda: None)(6)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
for sig in (SIGTERM, SIGINT):(7)
loop.add_signal_handler(sig, handler, sig)
loop.create_task(main())
loop.run_forever()(8)
tasks = asyncio.Task.all_tasks()
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
-
(1) Импортируем все значения сигналов из стандартной библиотеки
signal
. -
(2) На этот раз наша сопрограмма
main()
собирается выполнять некую очистку внутри себя. После получения сигнала прекращения (инициируемого прекращением каждой из соответствующих задач), будет иметься промежуток времени в три секунды покаmain()
продолжит исполнение на протяжении фазы "run_until_complete()
" данного процесса останова. Я напечатаю "Ваша прикладная программа завершается..." . -
(3) Это обработчик обратного вызова для того случая, когда мы получили некий сигнал. Он настроен в данном цикле через вызов к
add_signal_handler()
немного дальше. -
(4) Первейшая цель данного обработчика состоит в остановке основного цикла: это снимет блокировку вызова
loop.run_forever()
и сделает возможным сбор приостановленных задач, прекращение и собственноrun_complete()
для останова. -
(5) Так как мы теперь пребываем в режиме останова, нам не нужны иные
SIGINT
илиSIGTERM
для переключения данного обработчика снова: это вызвало быloop.stop
в процессе соответствующей фазыrun_until_complete()
, что оказало бы воздействие на наш процесс останова. По этой причине мы удаляем данный обработчик сигнала дляSIGTERM
из своего цикла. -
(6) Попались! Мы не можем просто удалить данный обработчик для
SIGINT
, так как это повлечёт чтоKeyboardInterrupt
опять станет соответствующим обработчиком дляSIGINT
, точно так же, как если бы он добавлялся ранее нашим собственным обработчиком! Вместо этого мы устанавливаем некую пустую функциюlambda
в качестве текущего обработчика. Это означает, что мы избегаемKeyboardInterrupt
иSIGINT
(а также Ctrl-C) не имеют эффекта. [add_signal_handler()
скорее всего следует именовать какset_signal_handler()
, так как у вас может иметься только один обработчик для каждого типа сигнала, а повторный вызовadd_signal_handler()
для того же самого сигнала подменит предыдущий обработчик для данного сигнала, если таковой имеется.] -
(7) В этом месте наши обработчики сигналов подключаются к основному циклу. Отметим, что уже обсуждалось ранее, после что установки некоего обработчика
SIGINT
,KeyboardInterrupt
более не будет возбуждаться поSIGINT
. Такое возбуждениеKeyboardInterrupt
является установленным "по умолчанию" обработчиком дляSIGINT
и является предварительной настройкой в Python пока вы не предпримете чего бы то ни было для изменения такого обработчика, что мы и делаем здесь. -
(8) Как обычно, исполнение блокирует
run_forever()
пока что- то не остановит данный цикл. В таком случае основной цикл будет остановлен внутриhandler()
если в наш процесс буде отправлен либоSIGINT
, либоSIGTERM
. Остаток кода тот же самый что и прежде..
Вывод:
$ python shell_signal02.py
<Your app is running>
<Your app is running>
<Your app is running>
<Your app is running>
<Your app is running>
^CGot signal: Signals.SIGINT, shutting down.
<Your app is shutting down...>
^C<Your app is shutting down...>(1)
^C<Your app is shutting down...>
-
(1) Я много раз нажал CtrlC в процессе данной фазы останова, но, как и ожидалось, ничего не произошло пока в конце концов не завершилась сопрограмма
main()
.
Возвращаясь к Быстрому началу, в котором мы представили основы интерфейса Исполнителя в
Примере 2. В Примере 2
мы указали, что соответствующий вызов time.sleep()
был короче чем соответствующий вызов
asyncio.sleep()
- к счастью для нас - так как это означает, что собственная задача Исполнителя завершается
быстрее чем его сопрограмма main()
и в следствии этого данная программа останавливается корректно.
В этом разделе мы изучим что происходит в процессе останова когда задания Исполнителя требуют больше времени чем все приостановленные экземпляры
Task
. Самый короткий ответ таков: без вмешательства будьте готовы получить такой вид ошибок:
Пример 3-22. Исполнитель требует слишком долгого исполнения до завершения
# quickstart.py
import time
import asyncio
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()
def blocking():
time.sleep(1.5)(1)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
-
(1) Этот пример кода в точности то же что и пример из Быстрого начала, за исключением того, что значение времени сна в нашей функции с блокировкой теперь длиннее чем соответствующее значение в асинхронной части.
Пример 3-23. Вывод
Fri Sep 15 16:25:08 2017 Hello!
Fri Sep 15 16:25:09 2017 Goodbye!
exception calling callback for <Future at [...snip...]>
Traceback (most recent call last):
<big nasty traceback>
RuntimeError: Event loop is closed
Fri Sep 15 16:25:09 2017 Hello from a thread!
Что здесь происходит, так это то, что за сценой, run_in_executor()
не создаёт некий экземпляр Task
, он возвращает некий
Future
. Это означает, что он не содержится в наборе "активных задач", возвращаемых из
asyncio.Task.all_tasks()
, и таким образом, run_until_complete()
не ждёт завершения задачи своего Исполнителя!
Существует три мысли, которые приходят в голову, для исправления этого, причём все они с различными компромиссами, и мы намерены рассмотреть их все. Действительная цель данного упражнения состоит в том чтобы помочь вам узнать о сроке службы основного цикла событий с разных точек зрения и задуматься об управлении временем жизни всех ваших сопрограмм, потоков и подпроцессов, которые могут взаимодействовать в некоторой нетривиальной программе.
Самая первая идея состоит в самая простая реализация состоит в постоянном выполнении await
некоторой
задачи Исполнителя внутри какой- то сопрограммы:
Пример 3-24. Вариант A: обёртка вызова соответствующего исполнителя внутри какой- то сопрограммы
# quickstart.py
import time
import asyncio
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
async def run_blocking():(1)
await loop.run_in_executor(None, blocking)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.create_task(run_blocking())(2)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()
-
(1) Основная идея имеет целью исправление того недостатка, что
run_in_executor()
возвращает только некий экземплярFuture
, а не какую- то задачу. Мы не можем перехватывать все задания вall_tasks()
, но мы можем применять для своего фьючерсаawait
. Чтобы сделать это мы создаём такую новую функцию сопрограммыrun_blocking()
, а внутри неё мы не делаем ничего кроме того, что мы осуществляемawait
получаемого результата из вызоваrun_in_executor()
. Такая новая функция сопрограммыrun_blocking()
будет спланирована в некуюTask
и таким образом наш процесс останова включит её в необходимую группу. -
(2) Для запуска такой новой функции сопрограммы
run_blocking()
мы применяемcreate_task
в точности так же, как планировался запуск нашей сопрограммыmain()
.
Приведённый выше код выглядит великолепно, за исключением одного момента: он не способен обрабатывать прекращение! Если вы посмотрите
повнимательнее, вы увидите, что я опустил тот цикл прекращения задач, который появлялся в наших предыдущих примерах. Если вы вернёте его обратно,
мы получим jib,rb "Event loop is closed" {Цикл событий закрыт}, как и ранее. Мы даже не можем обработать
CancelledError
внутри run_blocking()
чтобы попытаться повторно
ожидать данный фьючерс. Вне зависимости от того что мы пытаемся сделать, та задача, которая обёртывает
run_blocking()
не может быть прекращена, однако соответствующее задание
Исполнителя будет исполняться пока не завершится его внутренний time.sleep()
.
Давайте перейдём в нашей следующей идее.
Следующая, приводимая ниже идея, является слегка более хитрой. Основная стратегия состоит в сборе всех приостановленных задач, с последующим
прекращением только их, однако перед этим мы вызываем run_until_complete()
,
который мы добавили в соответствующий фьючерс из run_in_executor()
.
Пример 3-25. Вариант B: добавление соответствующего Future
Исполнителя для всех собираемых задач
# quickstart.py
import time
import asyncio
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking)(1)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)(2)
for t in tasks:
t.cancel()(3)
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
group = asyncio.gather(group_tasks, future)(4)
loop.run_until_complete(group)
loop.close()
-
(1) В точности как и в первоначальном примере с блокировкой, мы вызываем напрямую
run_in_executor()
, но мы гарантированно назначаем полученный результат идентификаторуfuture
. В скорости этот идентификатор будет применён. -
(2) Основной цикл остановлен, и теперь мы пребываем в соответствующей фазе останова. Для начала мы получаем все задачи. Отметим, что они не содержат само задание Исполнителя, так как
run_in_executor()
не создал какой- то задачи. -
(3) Прекращаем все задачи.
-
(4) Это основная уловка: мы создаём некую новую группу, которая объединяет все задачи и соответствующий фьючерс Исполнителя. Таким образом, наш Исполнитель завершится нормально, пока все задачи всё ещё претерпевают обычный процесс прекращения.
Это решение ведёт себя лучшим образом в процессе останова, но всё ещё имеет недостатки. В целом, будет достаточно неудобно собирать все имеющиеся фьючерсы Исполнителя на протяжении всей вашей программы в некую коллекцию, которую вы можете применять в своей последовательности останова; затем создавать некую группу задачи- плюс- фьючерсы; а потом запускать это вплоть до завершения. Это работает, но имеется лучший способ.
Пример 3-26. Вариант C: используйте ваш собственный Исполнитель и ждите
# quickstart.py
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor as Executor
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
executor = Executor()(1)
loop.set_default_executor(executor)(2)
loop.create_task(main())
future = loop.run_in_executor(None, blocking)(3)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
executor.shutdown(wait=True)(4)
loop.close()
-
(1) На этот раз мы создаём свой собственный экземпляр Исполнителя.
-
(2) Вам придётся настроить свой собственный индивидуальный Исполнитель в качестве установленного по умолчанию для основного цикла. Это означает, что делая вызов
run_in_executor()
из любого места своего кода, он применяться ваш индивидуальный экземпляр. -
(3) Как и ранее, запускаем соответствующую функцию с блокировкой.
-
(4) Наконец, мы можем в явном виде выполнять ожидание для всех своих заданий Исполнителя для завершения до закрытия основного цикла. Это исключит все видимые нами ранее сообщения "Event loop is closed". Мы можем сделать это потому что у нас имеется доступ к соответствующему объекту Исполнителя; установленный по умолчанию Исполнитель теперь не выставлен в соответствующем API
asyncio
и по этой причине мы не можем вызывать останов в нём.
Наконец, у нас есть некая стратегия с общей приемлемостью: вы можете вызвать run_in_executor()
где угодно, а ваша программа всё равно будет останавливаться чисто.
Я настоятельно рекомендую вам поэкспериментировать со всеми показанными здесь примерами кода и испытайте различные стратегии для создания задач и заданий Исполнителя, заменяйте их поочерёдно во времени попытайтесь выполнять завершения чисто.
В одном из предыдущих разделов я сказал, что вам не следует передавать некие переменные цикла внутри своей программы (если вам требуется доступ к
самому циклу), а вы просто можете применять asyncio.get_event_loop()
для получения данного цикла всякий раз
как он вам потребуется.
Однако, при написании проверок элементов, вы приходится работать со множеством циклов событий, так как каждая проверка элемента обычно требует некоторого нового экземпляра цикла. Вновь возникает основной вопрос: следует ли вам писать код, который требует передачи циклов событий в параметрах функций для поддержки проверок элементов?
Давайте рассмотрим некий пример, применяющий великолепные инструментальные средства проверки pytest. Для начала взглянем на ту функцию, которую мы хотим проверить:
Пример 3-27. Иногда вам приходится применять call_soon()
import asyncio
from typing import Callable
async def f(notify: Callable[[str], None]):(1)
# <...some code...>
loop = asyncio.get_event_loop()(2)
loop.call_soon(notify, 'Alert!')(3)
# <...some code...>
-
(1) Представим себе некую функцию сопрограммы
f
, внутри которой необходимо применитьloop.call_soon()
для того чтобы активировать другую функцию оповещения. (Она может выполнять протоколирование, записывать сообщения в Slack, короткие запоминания, либо что- то ещё, что вы можете придумать!) -
(2) В этой функции мы не получаем значение цикла через соответствующие параметры функции, поэтому они получаются через
get_event_loop()
; помните, данный вызов всегда возвращает тот цикл, который ассоциирован с данным текущим потоком. -
(3) Некий основной вызов предупреждения.
Лучший способ для переброски pytest в ваш код asyncio состоит в предоставлении некоторого цикла событий через какое- то приспособление. Pytest встраивает такое в каждый ваш тест в виде некоего параметра функции. Это звучит более ложно, чем есть на самом деле, поэтому вот пример приспособления для предоставления некоторого цикла событий:
Пример 3-28. Приспособление Pytest для предоставления какого- то цикла событий - с некоторой ошибкой!
# conftest.py(1)
import pytest
@pytest.fixture(scope='function')(2)
def loop():
loop = asyncio.new_event_loop()(3)
try:
yield loop
finally:
loop.close()(4)
-
(1) Pytest, по определению, автоматически будет импортирован и сделан доступным для всех ваших модулей проверок, причём все они определены в некотором файле с названием
conftest.py
. -
(2) Это создаёт некое приспособление. Установленный аргумент "scope" сообщает Pytest когда данное приспособление должно быть завершено и сделано некое новое. Для сферы "function", как и выше, каждый отдельный тест будет иметь изготовленным некий новый цикл.
-
(3) Создайте некий совершенно новый экземпляр цикла. Отметим, что мы не запускаем данный цикл. Мы оставляем это для каждого теста в котором будет использовано данное приспособление.
-
(4) Когда данное приспособление будет завершено (в нашем случае после каждого теста), закрываем основной цикл.
Приведённый выше код вызывает некую проблему, так что не применяйте его. Эта проблема едва уловима, но она также является основной сутью
данного раздела, поэтому в ближайшее время мы обсудим её подробно. Вначале давайте рассмотрим все проверки для функции сопрограммы
f
:
Пример 3-29. Соответствующие проверки
from somewhere import f(1)
def test_f(loop):(2)
collected_msgs = [](3)
def dummy_notify(msg):(4)
collected_msgs.append(msg)
loop.create_task(f(dummy_notify))(5)
loop.call_later(1, loop.stop)(6)
loop.run_forever()
assert collected_msgs[0] == 'Alert!'(7)
-
(1) Рассматривайте этот образец кода как псевдокод - нет никакого модуля
somewhere
! Здесьf
ссылается на функцию сопрограммы, определяемой далее. -
(2) Pytest распознаёт здесь соответствующий аргумент "loop", отыскивает это имя в соответствующем списке приспособлений и затем вызывает наш приведённый выше
test_f()
совместно с этим новым экземпляром цикла, выпускаемым из данного приспособления. -
(3) Мы собираемся передать некий обратный вызов "пустышки"
notify()
вf
, данный перечень соберёт все получаемые предупреждения (а также наш тест проверяет что мы получаем правильные сообщения обратно). -
(4) Это поддельная функция
notify()
, требующаясяf
. -
(5) Планируем некую сопрограмму из
f
, передавая свой поддельный обратный вызовnotify()
. -
(6) Для запуска своего цикла мы применяем
run_forever()
, следовательно это гарантирует нам что данный цикл будет на самом деле остановлен. В качестве альтернативы мы можем применятьrun_until_complete(f(...))
без некоторогоcall_later()
; однако что вы выберете зависит от того что вы проверяете. Для эффектов стороны проверки, как и в приводимом тесте , я обнаружил, что проще применятьrun_forever()
. С другой стороны, когда вы проверяете возвращаемые значения из сопрограмм, лучше воспользоватьсяrun_until_complete()
. [А ещё один вариант может быть очень полезным для ускорения проверок: я также могу вызыватьloop.stop()
изнутриfake_notify()
, непосредственно сразу после сбора всехmsg
в свой перечень. Это сберегает время, поскольку данный вызовrun_forever()
немедленно снимает блокировку и мы не выполняем ожидание дляf()
чтобы производить любую дальнейшую обработку. Конечно, это может означать что вы получаете предупредительный вывод об "отложенных задачах" когда данное приспособление вызываетloop.close()
в процессе завершения. Тестирование является хитроумным искусством и всякая ситуация имеет свои отличительные особенности.] -
(7) Наконец, проверяем!
Помните я сказал, что у ас была некая проблема? Вот она: внутри функции сопрограммы f
наш возвращаемый из
get_event_loop()
экземпляр цикла является иным по отношению к тому циклу
событий, который был предоставлен нам нашим приспособлением. Данная проверка завершается по этой причине неудачей; на самом деле данный цикл
из get_event_loop()
даже никогда не будет запущен.
Наше решение этой дилеммы состоит в добавлении некоторого параметра loop
в явном виде в нашу функцию
сопрограмму f
. Затем вы всегда можете удостовериться что был использован правильный экземпляр цикла. На
практике, однако, это может вызывать страшную головную боль, так как большие базовые коды потребуют гигантского числа функций для передачи по
имеющемуся циклу.
Вот некий лучший способ: когда в игре некий новый цикл, установите этот цикл чтобы он был "тем
самым" циклом связанным с нашим текущим потоком, что означает что всякий вызов к get_event_loop()
возвратит совершенно новый образец. Это работает очень хорошо для тестирования. Всё что вам требуется сделать, так это слегка изменить наше
приспособление pytest:
Пример 3-30. Более удобное приспособление для pytest
# conftest.py
import pytest
@pytest.fixture(scope='function')
def loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)(1)
try:
yield loop
finally:
loop.close()
-
(1) Это та самая магическая строка: после вызова
set_event_loop()
, всякий последующий вызовasyncio.get_event_loop()
будет возвращать тот экземпляр цикла, который создаётся в данном приспособлении, а вам не придётся в явном виде передавать такой экземпляр цикла через все вызовы вашей функции.
Мы затратили достаточно много страниц чтобы получить важное исправление одной строки, однако это было необходимо чтобы объяснить
почему эта строка была так важна. Как и в предыдущих разделах, моя цель состояла не в том чтобы рассказать вам
об "единственном верном пути", а вместо этого дать вам руководство в вашем понимании как вы можете включать
asyncio
в свой набор инструментов.