Глава 5. Асинхронное программирование

Помимо обсуждавшихся последовательной и параллельной моделей исполнения имеется некая третья, и именно она фундаментально важна совместно с понятием программирования событий: асинхронная модель.

Сама модель асинхронных задач может быть реализована посредством отдельного основного потока управления, причём как в системах с единственным процессором, так и в многопроцессорных системах. При одновременной асинхронной модели выполнения сами исполнения различных задач пересекаются по ходу временной линии и всё происходит под воздействием некого отдельного потока выполнения (ситуация с одним потоком). После запуска, само выполнение задач может приостанавливаться , а затем возобновляться по прошествию времени, перемежаясь с исполнением прочих текущих представленных задач.

Разработка кода для такой асинхронной модели целиком отличается от того что представляет многопоточное программирование. Значимое отличие между одновременной многопоточной параллельной моделью и однопотоковой одновременной моделью состоит в том факте, что в первом случае сама ОС принимает решение на временной линии в случае когда мы притормаживаем деятельность в одном потоке запускать другой.

Это остаётся вне поля зрения пишущего код, в отличии от асинхронной модели. Исполнение или прекращение некой здачи продолжается тогда, когда оно требуется в явном виде.

Самое основное свойство данного вида программирования состоит в том, что ваш код не выполняется во множестве потоков, как в некой классической одновременной программе, а в неком отдельном потоке. Таким образом, не всегда будет истинно что они выполняются почти в одно и то же время.

В частности, мы обсудим модуль Python asyncio, который был предложен в Python 3.4. Он делает для нас возможным применение сопрограмм (coroutines) и фьючерсов (futures) для упрощения написания асинхронного кода и делает его намного более читаемым.

В этой главе мы обсудим следующие рецепты:

  • Применение модуля Python concurrent.futures

  • Управление основным циклом событий при помощи asyncio

  • Обработка сопрограмм посредством asyncio

  • Манипуляции с задачами через asyncio

  • Работа с asyncio и фьючерсами

Применение модуля Python concurrent.futures

Модуль concurrent.futures, который является частью стандартной библиотеки Python, предоставляет некий уровень абстракции поверх потоков путём моделирования их в качестве асинхронных функций.

Этот модуль строится на основе двух основных классов:

  • concurrent.futures.Executor: Это некий класс абстракции, который предоставляет методы для асинхронного исполнения вызовов.

  • concurrent.futures.Future: Инкапсулирует некое доступное к вызову исполнение. Объекты фьючерсов создают экземпляры посредством представление задач (функции с необязательными параметрами) для Executor. {Прим. пер.: мне нравится в качестве синонима слово "местоблюститель", некий объект, создаваемый под последующее размещение в нём какого- то исполнителя в виде размещаемой позже ссылки на функцию, возможно без параметров. Спейсмейкер под мировой рекорд в марафоне Элиуда Кипчоге.}

Вот некоторые основные методы этого модуля:

  • submit(function,argument): Планирует своими аргументами само выполнение способной к вызову функции {подстановка}.

  • map(function,argument): Выполняет получаемые аргументами функции в неком асинхронном режиме {установка соответствия}.

  • shutdown(Wait=True): Выдаёт исполнителю (Executor) сигнал на освобождение всех ресурсов.

К необходимым исполнителям доступ осуществляется через их подклассы: ThreadPoolExecutor или ProcessPoolExecutor. По той причине, что размещение экземпляров потоков и процессов является требующей ресурсов задачей, лучше помещать такие ресурсы в некий пул и пользоваться ими в качестве повторных запусков или исполнителей (что и определяет само понятие Executor) для параллельных или одновременных задач.

Сам применяемый нами здесь подход предусматривает применение некого пула исполнителей. Мы будем помещать необходимые средства в свой пул (потоки и процессы) и получать фьючерсы, что в результате приводит к тому, что они будут нам доступны в будущем. Естественно, мы можем дождаться чтобы все фьючерсы принесли реальные результаты.

Некий пул потоков или процессов (также именуемый слиянием - pooling) обозначает некое программное обеспечение управления, которое используется для оптимизации и упрощения использования потоков и/ или процессов внутри некой программы. Через размещение в пулах мы можем подставлять необходимую задачу (или задачи) для их выполнения таким накопителем (pooler).

Такой пул снабжается некой внутренней очередью приостановленных задач и ряда потоков или процессов, которая и исполняет их. В пуле вторично применяется концепция периодичности: некий поток (или процесс) используется определённое число раз для различных задач в процессе своего жизненного цикла. Это снижает накладные расходы создания новых потоков или процессов и увеличивает общую производительность данной программы.

Повторное использование не догма, а всего лишь одна из основных причин, которая побуждает создателя программы применять слияние в пулы для своих задач.

Приготовление

Модуль concurrent.futures предоставляет два подкласса {Прим. пер.: в случае появления новых, они могут быть сюда добавлены} в своём классе Executor, который асинхронно манипулирует неким пулом потоков и пулом процессов. Эти два подкласса следующие:

  • concurrent.futures.ThreadPoolExecutor(max_workers)

  • concurrent.futures.ProcessPoolExecutor(max_workers)

Значение параметра max_workers указывает максимальное число исполнителей для выполнения асинхронных вызовов.

Как это сделать...

Здесь приводится некий пример применения пула потоков и процессов, в котором мы будем сравнивать время исполнения с тем временем, которое требуется для последовательного выполнения.

Подлежащая выполнению задача такова: у нас имеется список из 10 элементов. Каждый элемент этого списка создан для подсчёта 100 000 000 (просто чтобы занять время), а после этого, самое последнее число умножается на iй элемент из данного списка. В частности, мы выполняем оценку для таких случаев:

  • Последовательное выполнение

  • Пул потоков с пятью исполнителями

  • Пул процессов с пятью исполнителями

Теперь давайте рассмотрим как это сделать:

  1. Импортируем относящиеся к делу библиотеки:

    
    import concurrent.futures
    import time
    	   
  2. Задаём список чисел от 1 до 10:

    
    number_list = list(range(1, 11))
    	   
  3. Наша функция count(number) перечисляет числа от 1 до 100000000, а затем возвращает произведение number × 100,000,000:

    
    def count(number):
        for i in range(0,100000000):
            i += 1
        return i*number
    	   
  4. Следующая функция evaluate(item) вычисляет нашу функцию count для значения параметра item. Она выводит на печать само значение item и получаемый count(item) результат:

    
    def evaluate(item):
        result_item = count(item)
        print('Item %s, result %s' % (item, result_item))
    	   
  5. В __main__ осуществляются последовательное выполнение, пул потоков и пул процессов:

    
    if __name__ == '__main__':
    	   
  6. Для нашего последовательного исполнения имеющаяся функция evaluate исполняется для каждого элемента из number_list. После этого выводится на печать время исполнения:

    
    start_time = time.clock()
        for item in number_list:
            evaluate(item)
        print('Sequential Execution in %s seconds' % (time.clock() -\ 
            start_time))
    	   
  7. Вне зависимости от того исполняются ли потоки или процессы, используется одно и то же число исполнителей (max_workers=5). Естественно, для обоих пулов отображается время выполнения:

    
    start_time = time.clock()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as\ 
        executor:
            for item in number_list:
                executor.submit(evaluate, item)
        print('Thread Pool Execution in %s seconds' % (time.clock() -\ 
            start_time))
        start_time = time.clock()
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as\ 
        executor:
            for item in number_list:
                executor.submit(evaluate, item)
        print('Process Pool Execution in %s seconds' % (time.clock() -\ 
            start_time))
    	   

Как это работает...

Мы строим некий список чисел, хранимый в number_list:


number_list = list(range(1, 11))
 	   

Для каждого элемента из этого списка мы выполняем свою процедуру перечисления пока не достигнем 100000000 итераций, а затем умножаем полученное число на 100000000:


def count(number) : 
    for i in range(0, 100000000):
        i=i+1
    return i*number

def evaluate_item(x):
    result_item = count(x)
 	   

В своей программе main мы выполняем ту же самую задачу последовательно:


if __name__ == "__main__":
   for item in number_list:
       evaluate_item(item)
 	   

Затем в параллельном режиме применяем возможности слития в пул concurrent.futures для некого пула потоков:


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for item in number_list:
        executor.submit(evaluate, item)
 	   

И то же самое для пула процесов:


with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    for item in number_list:
        executor.submit(evaluate, item)
 	   
[Совет]Совет

Обратите внимание, что и пул потоков и пул процессов настроены с max_workers=5; более того, когда max_workers равен None, он будет определяться общим числом процессоров в вашей машине.

Для выполнения этой программы откройте приглашение Командной строки и в той же самой папке, где находится данный пример, наберите следующее:


> python concurrent_futures_pooling.py
		

Выполнив свой предыдущий пример мы можем наблюдать исполнение трёх моделей работы с сопоставлением времён:


Item 1, result 10000000
Item 2, result 20000000
Item 3, result 30000000
Item 4, result 40000000
Item 5, result 50000000
Item 6, result 60000000
Item 7, result 70000000
Item 8, result 80000000
Item 9, result 90000000
Item 10, result 100000000
Sequential Execution in 6.8109448 seconds
Item 2, result 20000000
Item 1, result 10000000
Item 4, result 40000000
Item 5, result 50000000
Item 3, result 30000000
Item 8, result 80000000
Item 7, result 70000000
Item 6, result 60000000
Item 10, result 100000000
Item 9, result 90000000
Thread Pool Execution in 6.805766899999999 seconds
Item 1, result 10000000
Item 4, result 40000000
Item 2, result 20000000
Item 3, result 30000000
Item 5, result 50000000
Item 6, result 60000000
Item 7, result 70000000
Item 9, result 90000000
Item 8, result 80000000
Item 10, result 100000000
Process Pool Execution in 4.166398899999999 seconds
		

Следует отметить, что хотя наш пример и не затратен в плане вычислений, последовательное исполнение и выполнение пула потоков сопоставимы по времени. Наивысший результат времени выполнения даёт пул процессов.

Этот пул распределяет все процессы (в нашем случае пять процессов) между всеми доступными ядрами (для примера этой машины мы применяли четыре ядра) в режиме FIFO (first in, first out, Первый пришёл, первым обработан).

Таким образом, для каждого из ядер все назначенные процессы выполняются последовательно. Сам пул планирует выполнение следующего процесса только после осуществления операций ввода/ вывода. Естественно, механизм будет тем же самым и при выполнении пула потоков.

Значения времён исполнения, которые меньше в случае нашего пула процессов, отсылают нас обратно к тому факту, что операции ввода/ вывода не значимы. Это позволяет нашему пулу процессов оказаться быстрее, так как в отличии от потоков они не требуют механизма синхронизации (как это пояснялось в рецепте Введение в параллельное программирование Python из Главе 1, Приступая к параллельному программированию и Python {Прим. пер.: так в авторском тексте. Более правильной причиной более плохого времени исполнения пула потоков по сравнению с пулом процессов, всё же кажется фактическое последовательное исполнение пула потоков.}.

Также ознакомьтесь...

Технологии пулов широко применяются в серверных приложениях, поскольку они требуются для управления множеством одновременных запросов от любого числа клиентов.

Многие иные приложения, тем не менее, требуют чтобы всякое действие выполнялось немедленно или чтобы вы имели больше контроля над тем потоком, который его выполняет: в этом случае слияние в пул будет не лучшим выбором.

Дополнительно

Занимательное руководство по concurrent.futures можно найти по следующей ссылке.

Управление основным циклом событий через asyncio

Модуль Python asyncio предоставляет средства для управления событиями , сопрограммами, задачами, а также потоками и примитивами синхронизации для написания одновременного кода.

Основные компоненты этого модуля таковы:

  • Цикл событий: Модуль asyncio делает возможным один цикл событий на процесс. Именно это тот логический элемент, который имеет дело с управлением исполнения различных задач и их распределением. В частности, он регистрирует имеющиеся задачи и управляет ими переключая свой поток управления с одной задачи на другую.

  • Сопрограммы: Это обобщение имеющегося понятия подпрограммы. Кроме того, некая сопрограмма способна приостанавливаться в процессе исполнения для ожидания внешней обработки (неких процедур с вводом/ выводом) и возобновляться с точки останова после внешней обработки.

  • Фьючерсы: Определяют объект Future в точности как это делает модуль concurrent.futures. Они представляют ещё не выполненные вычисления.

  • Задачи: Это подкласс asyncio, который применяется для инкапсуляции сопрограмм и управления ими в параллельном режиме.

В этом рецепте мы сосредоточимся на концепции событий и управление событием (а именно, на цикле событий) внутри некой программы.

Понимание циклов событий

В информатике некое событие (event) это прерванное данной программой действие, которое может управляться самой этой программой. В качестве примера таким событием может выступать соответствующее возможное нажатие на клавишу пользователем в процессе взаимодействия с его графическим интерфейсом, определённое нажатие на клавишу его физической клавиатуры, некий внешний сигнал прерывания или, более абстрактно, приём данных через имеющуюся сетевую среду. Однако в более общем случае может происходить событие в любом другом виде, которое может быть выявлено и обработано неким образом.

Внутри системы тот логический элемент, который способен вырабатывать события имеет название источника событий (event source), в то время как тот логический элемент, который имеет дело с обработкой какого- то сособного произойти события это обработчик события (event handler).

Программная конструкция Цикла событий (event loop) реализует необходимую функциональность управления событиями в рамках программы. Более точно, такой цикл событий работает циклически в процессе всего исполнения данной программы, отслеживая события, которые происходят внутри некой структуры данных постановки в очередь, а затем обрабатывают их по одному за раз вызывая надлежащий обработчик событий, когда его основной поток свободен.

Здесь отображён пример псевдокода такого цикла событий:


while (1) {
    events = getEvents()
    for (e in events)
        processEvent(e)
}
 	   

Все события, которые поданы в данный цикл while улавливаются и затем обрабатываются своим обработчиком событий. Соответствующий обработчик событий, который обрабатывает некое событие это всего лишь действие, имеющее место в данной системе. После завершения данного обработчика, управление передаётся определённому запланированному следующему событию.

Для управления неким циклом событий asyncio предоставляет следующие методы:

  • loop = get_event_loop(): Получает для текущего контекста надлежащий цикл событий.

  • loop.call_later(time_delay,callback,argument): Этим выставляется необходимый обратный вызов, который следует выполнить через заданный в секундах time_delay.

  • loop.call_soon(callback, argument): Выставляет обратный вызов, подлежащий настолько быстрому, насколько это возможно вызову . Такой вызываемый после call_soon() обратный вызов (подробнее...) возвращается после возврата управления в данный цикл событий.

  • loop.time(): Возвращает текущую величину времени в виде значения float (подробнее...), согласно внутренним часам данного цикла событий.

  • asyncio.set_event_loop(): Закольцовывает необходимый цикл событий для данного текущего контекста.

  • asyncio.new_event_loop(): Создаёт и возвращает некий новый объект цикла событий в соответствии с данными правилами политики.

  • loop.run_forever(): Выполняется вплоть до вызова stop() (подробнее...).

Как это сделать...

В данном примере мы рассмотрим как использовать предоставляемый библиотекой asyncio оператор цикла событий для построения неого работающего в асинхронном режиме приложения.

Для этого примера мы определяем три задачи. Каждая из задач имеет некое время исполнения, задаваемое случайным параметром времени. После завершения исполнения Task A вызывает Task B, Task B вызывает Task C, а Task C вызывает Task A.

Данный цикл событий будет продолжаться пока не осуществится некое условие прекращения. Как мы можем себе представить, этот пример следует такой асинхронной схеме:

 

Рисунок 5-1


Модель асинхронного программирования

Давайте рассмотрим следующие шаги:

  1. Начнём с импорта тех библиотек, которые потребуются в нашей реализации:

    
    import asyncio
    import time
    import random
    	   
  2. Далее мы определяем task_A, чьё время исполнения определяется случайным образом и может меняться от 1 до 5 секунд. По завершению её исполнения, если не было выполнено условие прекращения, управление переходит к task_B:

    
    def task_A(end_time, loop):
        print ("task_A called")
        time.sleep(random.randint(0, 5))
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, task_B, end_time, loop)
        else:
            loop.stop()
    	   
  3. Здесь задаётся task_B. Её время исполнения определяется случайным образом и может меняться от 4 до 7 секунд. В самом конце выполнения при не удовлетворении условия прекращения вычисление переходит к task_C:

    
    def task_B(end_time, loop):
        print ("task_B called ")
        time.sleep(random.randint(3, 7))
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, task_C, end_time, loop)
        else:
            loop.stop()
    	   
  4. После этого реализуется task_C. Её время выполнения определяется случайным образом в диапазоне от 6 до 10 секунд. По окончанию её исполнения, когда не удовлетворяется условие прекращения, управление переходит к task_A:

    
    def task_C(end_time, loop):
        print ("task_C called")
        time.sleep(random.randint(5, 10))
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, task_A, end_time, loop)
        else:
            loop.stop()
    	   
  5. Наш следующий оператор определяет необходимый параметр loop, который просто получает значение текущего цикла событий:

    
    loop = asyncio.get_event_loop()
    	   
  6. Значение величины end_loop определяет необходимое условие прекращения. Выполнение данного примера должно продолжаться 60 секунд:

    
    end_loop = loop.time() + 60
    	   
  7. После этого давайте запросим выполнение task_A:

    
    loop.call_soon(task_A, end_loop, loop)
    	   
  8. Теперь мы настраиваем цикл длительного выполнения, который продолжит откликаться на события пока они не будут остановлены:

    
    loop.run_forever()
    	   
  9. После этого мы закрываем свой цикл событий:

    
    loop.close()
    	   

Как это работает...

Для управления выполнением нашими тремя задачами, task_A, task_B и task_C, нам требуется перехватить установленный цикл событий:


loop = asyncio.get_event_loop()
	   

Затем мы планируем самый первый вызов task_A при помощи конструкции call_soon:


end_loop = loop.time() + 60
loop.call_soon(function_1, end_loop, loop)
	   

Давайте рассмотрим имеющееся определение task_A:


def task_A(end_time, loop):
    print ("task_A called")
    time.sleep(random.randint(0, 5))
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, task_B, end_time, loop)
    else:
        loop.stop()
	   

Установленное асинхронное поведение нашего приложения определяется следующими параметрами:

  • time.sleep(random.randint(0, 5)): Определяет продолжительность времени выполнения данной задачи.

  • end_time: Задаёт верхний предел времени внутри task_A и выполняет вызов task_B через метод call_later.

  • loop: Это сам цикл событий, ранее перехваченный р помощи метода get_event_loop().

После выполнения этой задачи loop.time сравнивается с end_time. Если значение времени выполнения находится в рамках установленного максимального времени (60 секунд), тогда наше вычисление продолжается вызовом task_B, в противном случае наше выполнение завершается, закрывая и сам цикл событий:


if (loop.time() + 1.0) < end_time:
        loop.call_later(1, task_B, end_time, loop)
    else:
        loop.stop()
	   

Для всех остальных задач все действия практически те же самые, за исключением значения времени выполнения и вызова следующей задачи.

Теперь позвольте мне подытожить положение дел:

  • task_A вызывает task_B со случайным значением времени исполнения между 1 и 5 секундами.

  • task_B вызывает task_C со случайным значением времени исполнения между 4 и 7 секундами.

  • task_C вызывает task_A со случайным значением времени исполнения между 6 и 10 секундами.

По истечениию отведённого на выполнение времени наш цикл должен завершиться:


loop.run_forever()
loop.close()
	   

Возможный вывод данного примера выглядит следующим образом:


task_A called
task_B called
task_C called
task_A called
task_B called
task_C called
task_A called
task_B called
task_C called
task_A called
task_B called
task_C called
task_A called
task_B called
task_C called
		

Также ознакомьтесь...

Асинхронное программирование событий заменяет некий вид одновременного программирования при котором отдельные части определённой программы выполняются одновременно в различных потоках, которые имеют доступ к одним и тем же данным в памяти, тем самым порождая проблемы критичных запусков. В то же самое время возникла необходимость наличия возможности применения различных ядер современных ЦПУ, так как для определённых областей производительность, аналогичная предоставляемая процессорами уже не может достигаться в процессоре с единственным ядром.

Дополнительно

Вот неплохое введение в asyncio.

{Прим. пер.: обращаем также ваше внимание на наши переводы Полного руководство параллельного программирования на Python Куан Нгуена и Asyncio в Python 3 Цалеба Хаттингха.}

Обработка сопрограммам при помощи asyncio

На протяжении различных представленных примеров мы наблюдали, что когда программа становится слишком длинной и сложной, удобно делить её на подпрограммы, причём каждая из них реализует некую особую задачу. Однако подпрограммы не могут выполняться независимо, а лишь по определённому запросу от своей основной программы, которая отвечает за координацию самого применения подпрограмм.

В данном разделе мы введём некое обобщение имеющегося понятия подпрограмм, имеющее название сопрограмм (coroutines): также как и подпрограммы, сопрограммы выполняют некий отдельный вычислительный этап, однако в отличии от подпрограмм нет никакой программы main для координации получаемых результатов. Такие сопрограммы связывают себя самостоятельно в виде некого конвейера без какой бы то ни было функции наблюдения, которая бы отвечала за их вызов в каком- то определённом порядке.

В некой сопрограмме конкретная точка исполнения может быть приостановлена и возобновлена позднее, так как сама сопрограмма отслеживает имеющееся состояние выполнения. Имея некий пул сопрограмм становится возможным перекрытие необходимых вычислений: самая первая исполняется пока она не вернёт управление обратно, после этого запускается вторая и следует до установленной черты.

такое чередование управляется устанавливаемым циклом событий, который был описан в рецепте Управление основным циклом событий через asyncio. Он отслеживает все имеющиеся сопрограммы и планирует их выполнение.

Прочие важные стороны сопрограмм таковы:

  • Сопрограммы допускают множество точек входа, которые способны выполнять выдачу множество раз.

  • Сопрограммы могут передавать управление любой иной сопрограмме.

Сам термин выдача (yield) применяется здесь для описания приостановки сопрограммы и передаче потока управления в другую сопрограмму.

Приготовление

Для работы с сопрограммами мы будем пользоваться такой нотацией:


import asyncio 

@asyncio.coroutine
def coroutine_function(function_arguments):
    ............
    DO_SOMETHING
    ............
	   

Сопрограммы применяют синтаксис yield from, введённый PEP 380 для останова выполнения текущих вычислений и приостановки имеющегося внутреннего состояния сопрограммы.

На практике, в случае применения yield from future, данная сопрограмма приостанавливается до тех пор, пока делается future, затем будет распространён полученный результат future (либо возникнет исключительная ситуация); в случае же yield from coroutine, наша сопрограмма ожидает производства результата другой сопрограммой, который будет опубликован (либо возникнет исключительная ситуация).

Как мы увидим в своём следующем примере, в котором наши сопрограммы будут использованы для имитации некой машины конечных состояний, мы воспользуемся такой нотацией же yield from coroutine.

[Замечание]Замечание

Дополнительные сведения о сопрограммах в asyncio доступны по ссылке.

{Прим. пер.: обращаем также ваше внимание на наши переводы Полного руководство параллельного программирования на Python Куан Нгуена и Asyncio в Python 3 Цалеба Хаттингха.}

Как это сделать...

В данном примере мы рассмотрим как применять сопрограммы для имитации машины конечных состояний из пяти состояний.

Некая машина конечных состояний или автомат конечных состояний это математическая модель, которая широко используется в инженерных дисциплинах, а также в таких науках как математика и информатика.

Тот автомат, поведение которого мы намерены имитировать пр помощи сопрограмм выглядит так:

 

Рисунок 5-2


Машина конечных состояний

Состояниями нашей системы являются S0, S1, S2, S3 и S4 со значениями 0 и 1: теми значениями, при которых наш автомат способен переходить из одного состояния в своё следующее состояние (эта операция имеет название перехода - transition). итак, например, состояние S0 способно перейти в состояние S1, но только для своего значения 1, и состояние S0 может перейти в состояние S2 только для своего значения 0.

аш следующий код имитирует некий переход нашего автомата из состояния S0 (начальное состояние) вплоть до состояния S4 (конечное, терминальное состояние):

  1. Самый первый шаг, очевидно, импорт всех относящихся к делу библиотек:

    
    import asyncio
    import time
    from random import randint
    	   
  2. Далее мы определяем свою сопрограмму, относящуюся к start_state. Её параметр input_value вычисляется случайным образом; он может принимать значения 0 или 1. Когда он равен 0, тогда управление переходит сопрограмме state2; в противном случае она меняется на state1:

    
    @asyncio.coroutine
    def start_state():
        print('Start State called\n')
        input_value = randint(0, 1)
        time.sleep(1)
        if input_value == 0:
            result = yield from state2(input_value)
        else:
            result = yield from state1(input_value)
        print('Resume of the Transition:\nStart State calling'+ result)
    	   
  3. Вот сопрограмма для state1 имеет параметр input_value, который позволяет передачу значения состояния. Как и в случае с input_value, он вычисляется случайным образом. Когда он равен 0, тогда управление переходит сопрограмме state2; в противном случае она меняется на state3:

    
    @asyncio.coroutine
    def state1(transition_value):
        output_value ='State 1 with transition value = %s\n'% \
                                                 transition_value
        input_value = randint(0, 1)
        time.sleep(1)
        print('...evaluating...')
        if input_value == 0:
            result = yield from state3(input_value)
        else:
            result = yield from state2(input_value)
        return output_value + 'State 1 calling %s' % result
    	   
  4. Аналогично и для сопрограммы state2 с параметром input_value. Когда он равен 0, тогда управление переходит сопрограмме state3; в противном случае она меняется на state1:

    
    @asyncio.coroutine
    def state2(transition_value):
        output_value = 'State 2 with transition value = %s\n' %\
                                                 transition_value
        input_value = randint(0, 1)
        time.sleep(1)
        print('...evaluating...')
        if input_value == 0:
            result = yield from state1(input_value)
        else:
            result = yield from state3(input_value)
        return output_value + 'State 2 calling %s' % result
    	   
  5. И, наконец у сопрограммы state2 параметр input_value также позволяет передавать управление. Когда он равен 0, тогда управление переходит сопрограмме state1; в противном случае управление отдаётся в end_state:

    
    @asyncio.coroutine
    def state3(transition_value):
        output_value = 'State 3 with transition value = %s\n' %\
                                                     transition_value
        input_value = randint(0, 1)
        time.sleep(1)
        print('...evaluating...')
        if input_value == 0:
            result = yield from state1(input_value)
        else:
            result = yield from end_state(input_value)
        return output_value + 'State 3 calling %s' % result
    	   
  6. end_state выводит на печать значение параметра transition_value, который сделал возможным переход в это состояние и затем останавливает вычисления:

    
    @asyncio.coroutine
    def end_state(transition_value):
        output_value = 'End State with transition value = %s\n'%\
                                                    transition_value
        print('...stop computation...')
        return output_value
    	   
  7. В своей функции __main__ получается необходимый цикл событий, а затем мы начинаем свою имитацию машины конечных состояний, вызывая start_state своего автомата:

    
    if __name__ == '__main__':
        print('Finite State Machine simulation with Asyncio Coroutine')
        loop = asyncio.get_event_loop()
        loop.run_until_complete(start_state())
    	   

Как это работает...

Каждое состояние нашего автомата было определено с использованием необходимого декоратора:


@asyncio.coroutine
	   

Например, так определяется состояние S0:


@asyncio.coroutine
def StartState():
    print ("Start State called \n")
    input_value = randint(0,1)
    time.sleep(1)
    if (input_value == 0):
        result = yield from State2(input_value)
    else :
        result = yield from State1(input_value)
	   

переход на следующее состояние задаётся input_value, которое определяется функцией randint (0,1) модуля Python random. Эта функция случайным образом производит некое значение 0 или 1.

Тем самым, randint случайно определяет то состояние, в которое перейдёт наша машина конечных состояний:


input_value = randint(0,1)
	   

После определения передаваемых значений, текущая сопрограмма вызывает свою следующую сопрограмму при помощи команды yield from:


if (input_value == 0):
        result = yield from State2(input_value)
    else :
        result = yield from State1(input_value)
	   

Значением переменной result является то значение, которое возвращается каждой сопрограммой. Это некая строка и, по завершению вычислений, мы имеем возможность реконструкции всего перехода от начального состояния своего автомата, start_state, вплоть до end_state.

Наша программа main запускает вычисления внутри установленного цикла событий:


if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())
	   

Исполнив полученный код мы получаем вывод, подобный следующему:


Finite State Machine simulation with Asyncio Coroutine
Start State called
...evaluating...
...evaluating...
...evaluating...
...evaluating...
...stop computation...
Resume of the Transition : 
Start State calling State 1 with transition value = 1
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling End State with transition value = 1
		

Также ознакомьтесь...

Вплоть до выхода Python 3.5, его модуль asyncio применял генераторы для подражания асинхронным вызовам, а следовательно, имел иной синтаксис чем у текущей версии Python 3.5.

Python 3.5 ввёл ключевые слова async и await. Обратите внимание на отсутствие скобок вокруг вызова await func().

Ниже приводится некий образец "Hello, world!", применяющий такой новый синтаксис asyncio, введённый Python 3.5+:


import asyncio
 
async def main():
    print(await func())
 
async def func():
    # Do time intensive stuff...
    return "Hello, world!"
 
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
	   

Дополнительно

Хорошее описание сопрограмм в Python имеется в следующем блоге.

{Прим. пер.: обращаем также ваше внимание на наши переводы Полного руководство параллельного программирования на Python Куан Нгуена и Asyncio в Python 3 Цалеба Хаттингха.}

Манипулирование задачами при помощи asyncio

Модуль asyncio спроектирован для обработки асинхронных процессов и исполнения одновременных задач поверх некого цикла событий. Он также предоставляет свой класс asyncio.Task() для целей обёртывания сопрограмм в некой задаче (подробнее...). Его использование делает возможным независимо запускаемым задачам исполняться одновременно с прочими задачами в одном и том же цикле событий.

Когда некая сопрограмма обёртывается в какую- то задачу, он подключает Task к имеющемуся циклу задач и затем запускается автоматически после старта самого цикла, тем самым предоставляя некий механизм автоматического продвижения такой сопрограммы.

Модуль asyncio предоставляет метод asyncio.Task(coroutine) для обработки вычислений посредством задач; более того, asyncio.Task(coroutine) составляет расписание соответствующего исполнения сопрограммы (подробнее...)

.

Задача отвечает за выполнение объекта сопрограммы в каком- то цикле событий.

Когда такая обёрнутая сопрограмма применяет установленную нотацию yields from future, как это уже описывалось в нашем разделе Обработка сопрограммам при помощи asyncio, тогда такая задача приостанавливает своё выполнение обёрнутой сопрограммы и дожидается (awaits) завершение соответствующего фьючерса (future).

По окончанию фьючерса, возобновляется прерванное исполнение такой обёрнутой сопрограммы с полученным результатом фьючерса или вызванной в нём исключительной ситуацией. Кроме того, мы должны обратить внимание что некий цикл событий за раз запускает лишь одну задачу. Прочие задачи могут исполняться параллельно когда прочие циклы событий запускаются в разных потоках.

Пока некая задача дожидается завершения какого- то фьючерса, её цикл событий исполняет новую задачу.

Как это сделать...

В данном примере мы покажем как наш оператор asyncio.Task() способен выполнять одновременно три математические функции:

  1. Естественно, давайте начнём с импорта необходимой библиотеки asyncio:

    
    import asyncio
    	   
  2. В самой первой сопрограмме определяется функция factorial:

    
    @asyncio.coroutine
    def factorial(number):
        f = 1
        for i in range(2, number + 1):
            print("Asyncio.Task: Compute factorial(%s)" % (i))
            yield from asyncio.sleep(1)
            f *= i
        print("Asyncio.Task - factorial(%s) = %s" % (number, f))
    	   
  3. После которой определяется наша вторая функция - функция fibonacci:

    
    @asyncio.coroutine
    def fibonacci(number):
        a, b = 0, 1
        for i in range(number):
            print("Asyncio.Task: Compute fibonacci (%s)" % (i))
            yield from asyncio.sleep(1)
            a, b = b, a + b
        print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))
    	   
  4. И самая последняя подлежащая одновременному исполнению функция это вычисление биноминального коэффициента:

    
    @asyncio.coroutine
    def binomial_coefficient(n, k):
        result = 1
        for i in range(1, k + 1):
            result = result * (n - i + 1) / i
            print("Asyncio.Task: Compute binomial_coefficient (%s)" % 
                (i))
            yield from asyncio.sleep(1)
        print("Asyncio.Task - binomial_coefficient(%s , %s) = %s" % 
            (n,k,result))
    	   
  5. В нашей программе __main__ task_list содержит все функции, которые должны выполняться параллельно при помощи функции asyncio.Task:

    
    if __name__ == '__main__':
        task_list = [asyncio.Task(factorial(10)),
                     asyncio.Task(fibonacci(10)),
                     asyncio.Task(binomial_coefficient(20, 10))]
    	   
  6. Наконец, мы обзаводимся необходимым циклом событий и запускаем свои вычисления:

    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(task_list))
    loop.close()
    	   

Как это работает...

Все сопрограммы определяются с помощью необходимой аннотации @asyncio.coroutine (имеющей название декоратора):


@asyncio.coroutine
def function (args):
    do something
	   

Для параллельного запуска каждая функция выступает неким аргументом в соответствующем модуле asyncio.Task и таким образом они включаются в task_list:


if __name__ == '__main__':
    task_list = [asyncio.Task(factorial(10)),
                 asyncio.Task(fibonacci(10)),
                 asyncio.Task(binomial_coefficient(20, 10))]
	   

Затем мы получаем необходимый цикл событий:


loop = asyncio.get_event_loop()
	   

Наконец, мы добавляем это выполнение task_list в полученный цикл событий:


loop.run_until_complete(asyncio.wait(task_list))
loop.close()
	   
[Совет]Совет

Обратите внимание, что наш оператор asyncio.wait(task_list) дожидается завершения приданных ему сопрограмм.

Вывод нашего предыдущего кода выглядит как- то так:


Asyncio.Task: Compute factorial(2)
Asyncio.Task: Compute fibonacci(0)
Asyncio.Task: Compute binomial_coefficient(1)
Asyncio.Task: Compute factorial(3)
Asyncio.Task: Compute fibonacci(1)
Asyncio.Task: Compute binomial_coefficient(2)
Asyncio.Task: Compute factorial(4)
Asyncio.Task: Compute fibonacci(2)
Asyncio.Task: Compute binomial_coefficient(3)
Asyncio.Task: Compute factorial(5)
Asyncio.Task: Compute fibonacci(3)
Asyncio.Task: Compute binomial_coefficient(4)
Asyncio.Task: Compute factorial(6)
Asyncio.Task: Compute fibonacci(4)
Asyncio.Task: Compute binomial_coefficient(5)
Asyncio.Task: Compute factorial(7)
Asyncio.Task: Compute fibonacci(5)
Asyncio.Task: Compute binomial_coefficient(6)
Asyncio.Task: Compute factorial(8)
Asyncio.Task: Compute fibonacci(6)
Asyncio.Task: Compute binomial_coefficient(7)
Asyncio.Task: Compute factorial(9)
Asyncio.Task: Compute fibonacci(7)
Asyncio.Task: Compute binomial_coefficient(8)
Asyncio.Task: Compute factorial(10)
Asyncio.Task: Compute fibonacci(8)
Asyncio.Task: Compute binomial_coefficient(9)
Asyncio.Task - factorial(10) = 3628800
Asyncio.Task: Compute fibonacci(9)
Asyncio.Task: Compute binomial_coefficient(10)
Asyncio.Task - fibonacci(10) = 55
Asyncio.Task - binomial_coefficient(20, 10) = 184756.0
		

Также ознакомьтесь...

asyncio предоставляет и иные способы планирования задач при помощи методов ensure_future() или AbstractEventLoop.create_task(), причём оба они получают некий объект сопрограммы.

Дополнительно

Дополнительные сведения относительно задач asyncio можно почерпнуть в следующем руководстве.

{Прим. пер.: обращаем также ваше внимание на наши переводы Полного руководство параллельного программирования на Python Куан Нгуена и Asyncio в Python 3 Цалеба Хаттингха.}

Работа с asyncio и фьючерсы

Другим ключевым компонентом модуля asyncio выступает его класс asyncio.Future. Он очень похож на concurrent.Futures, но, конечно же, он адаптирован под самый основной механизм asyncio: его цикл событий.

Наш класс asyncio.Future представляет некий результат (но это также может быть некая исключительная ситуация), который пока ещё не доступен.

Тем самым он представляет некую абстракцию чего- то что ещё не достигнуто. Те обратные вызовы, которые и должны вырабатывать все результаты, по существу, добавляются в экземпляры этого класса.

Приготовление

Для определения некого объекта future следует применять такой синтаксис:


future = asyncio.Future
	   

Основные методу управления данным объектом такие:

  • cancel(): Отменяет данный объект future и планирует обратные вызовы.

  • result(): Возвращает полученные результаты, представленные данным future.

  • exception(): Возвращает значение исключительной ситуации, установленной в данном future.

  • add_done_callback(fn): Добавляет некий обратный вызов, подлежащий исполнению после выполнения future.

  • remove_done_callback(fn): Удаляет все экземпляры некого обратного вызова из данного вызова по завершению.

  • set_result(result): Помечает future как выполненный и устанавливает его результат.

  • set_exception(exception): Помечает future как выполненный и устанавливает его исключительную ситуацию.

Как это сделать...

Наш следующий пример показывает как применять класс asyncio.Future для управления двумя сопрограммами: first_coroutine и second_coroutine, которые выполняют такие задачи. first_coroutine выполняет суммирование первых N целых, а second_coroutine вычисляет значение факториала N:

  1. Сейчас давайте импортируем относящиеся к делу библиотеки:

    
    import asyncio
    import sys
    	   
  2. first_coroutine реализует необходимую функцию sum для первых N целых:

    
    @asyncio.coroutine
    def first_coroutine(future, num):
        count = 0
        for i in range(1, num + 1):
            count += i
        yield from asyncio.sleep(1)
        future.set_result('First coroutine (sum of N integers)\
                          result = %s' % count)
    	   
  3. В second_coroutine мы всё ещё реализуем соответствующую функцию factorial:

    
    @asyncio.coroutine
    def second_coroutine(future, num):
        count = 1
        for i in range(2, num + 1):
            count *= i
        yield from asyncio.sleep(2)
        future.set_result('Second coroutine (factorial) result = %s' %\ 
                          count)
    	   
  4. Воспользовавшись функцией got_result мы выводим на печать полученное в результате вычислений:

    
    def got_result(future):
        print(future.result())
    	   
  5. В нашей функции main параметры num1 должны быть установлены пользователем. Они будут применяться в качестве параметров для тех функций, которые реализованы в первой и второй сопрограммах:num2

    
    if __name__ == "__main__":
        num1 = int(sys.argv[1])
        num2 = int(sys.argv[2])
    	   
  6. Теперь давайте получим необходимый цикл событий:

    
    loop = asyncio.get_event_loop()
    	   
  7. Здесь посредством функции asyncio.future определяются наши фьючерсы:

    
    future1 = asyncio.Future()
    future2 = asyncio.Future()
    	   
  8. Наши две сопрограммы - first_couroutine и second_couroutine - включённые в установленный список tasks имеют фьючерсы future1 и future2, определённые пользователем аргумента и значения параметров num1 и num2:

    
    tasks = [first_coroutine(future1, num1),
            second_coroutine(future2, num2)]
    	   
  9. Данные фьючерсы добавляются как некий обратный вызов:

    
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    	   
  10. Затем установленный список tasks добавляется к имеющемуся циклу событий чтобы можно было приступить к вычислениям:

    
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    	   

Как это работает...

В своей программе main мы определяем объекты future, соответственно future1 и future2, воспользовавшись директивой asyncio.Future():


if __name__ == "__main__":
        future1 = asyncio.Future()
        future2 = asyncio.Future()
	   

При определении своих задач, мы передаём необходимые объекты future как аргументы своих двух сопрограмм first_couroutine и second_couroutine:


tasks = [first_coroutine(future1,num1), 
         second_coroutine(future2,num2)]
	   

Наконец, мы добавляем некий обратный вызов, который надлежит исполнить после выполнения future:


future1.add_done_callback(got_result)
future2.add_done_callback(got_result)
	   

Здесь got_result это некая функция, которая выводит на печать полученные future результаты:


def got_result(future):
    print(future.result())
	   

В саму сопрограмму мы передаём необходимый объект future в качестве какого- то аргумента. После его вычисления мы устанавливаем время sleep в 3 секунды для своей первой сопрограммы и в 4 секунды для второй:


yield from asyncio.sleep(sleep_time)
	   

Приводимый ниже вывод получен путём вычисления данной команды с различными значениями:


> python asyncio_and_futures.py 1 1
First coroutine (sum of N integers) result = 1
Second coroutine (factorial) result = 1

> python asyncio_and_futures.py 2 2
First coroutine (sum of N integers) result = 2
Second coroutine (factorial) result = 2

> python asyncio_and_futures.py 3 3
First coroutine (sum of N integers) result = 6
Second coroutine (factorial) result = 6

> python asyncio_and_futures.py 5 5
First coroutine (sum of N integers) result = 15
Second coroutine (factorial) result = 120

> python asyncio_and_futures.py 50 50
First coroutine (sum of N integers) result = 1275
Second coroutine (factorial) result = 30414093201713378043612608166064768844377641568960512000000000000 
First coroutine (sum of N integers) result = 1275
		

Также ознакомьтесь...

Мы можем поменять местами получаемые результаты, то есть первым получить вывод second_coroutine, простой перестановкой времён сна между своими сопрограммами: yield from asyncio.sleep(2) в определении first_coroutine и yield from asyncio.sleep(1) в определении second_coroutine. Это можно показать в таком примере:


> python asyncio_and_future.py 1 10
second coroutine (factorial) result = 3628800
first coroutine (sum of N integers) result = 1
		

Дополнительно

Дополнительные примеры asyncio и фьючерсов можно найти в следующем блоге.