Глава 6. Дополнения

Краткая история поддержки Async в Python

Несмотря на то, что модуль asyncore долгое время был частью стандартной библиотеки PythonВ, он страдает фундаментальными недостатками, проистекающими из не гибкого API, который не соответствует ожиданиям современного асинхронного сетевого модуля.

Более того, его подход черезчур упрощён для того чтобы предоставлять разработчикам все инструменты, необходимые им для применения полного потенциала построения асинхронных сетей.

Самое популярное решение в наши дни состоит в использовании при производстве библиотек сторонних производителей. Они зачастую предоставляют удовлетворительные решения, но между этими библиотеками отсутствует совместимость, что приводит к тому, что кодовые базы очень тесно переплетены с теми библиотеками, которые они применяют.[1]

-- Лоуренс ван Хутвен, PEP3153 (май 2012), Поддержка асинхронного ввода/ вывода

Основная цель данного раздела состоит в кратком описании всей истории, стоящей позади асинхронного программирования в Python, причём основной момент, который я хочу отметить - который всё ещё поражает меня когда я задумываюсь об этом - так это то, что тем ключевым нововведением, которого мы ожидали на протяжении 20 лет был синтаксис языка.

Многие люди могут быть удивлены этим, но Asyncio не самая первая попытка добавить поддержку для асинхронного сетевого программирования в Python, и именно это обсуждается следующим разделом.

В самом начале был asyncore

[При сопоставлении с asyncore,] Twisted лучше практически во всех возможных отношениях. Он более портативный, более функциональный, проще, более масштабируемый, облалает лучшим сопровождением, лучше документирован и он способен делать деликатесные омлет. Asyncore, по сути, устарел.[2]

-- Glyph ca. 2010

asyncore следует рассматривать как нечто исторически ископаемое, причём он на самом деле никогда не применялся.[3]

-- Жан Поль Кальдероун ca. 2013 в StackOverflow

Поддержка так называемой "асинхронной функциональности" была добавлена в Python давным- давно, в модуле asyncore. Как вам это могут подсказать цитаты к данной главе, приём asyncore был равнодушным, а использование низким. Что удивляет, по крайней мере автора этих строк, так это когда это было сделано: в Python 1.5.2! Вот что говорится в заголовке Lib/asyncore.py из исходного кода CPython:


# ­*­ Mode: Python ­*­
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>

# =============================================================
# Copyright 1996 by Sam Rushing
 	   

Более того, самый первый параграф документации Python для asyncore говорит следующее, и это запросто может появиться и в документации asyncio в наши дни:

Данный модуль предоставляет базовую инфраструктуру для написания служб клиентов и серверов асинхронных сокетов.

Существует всего два способа иметь некую программу в единственном процессоре выполняющей "более чем одну вещь в некий момент времени". Многопоточное программирование является самым простым и наиболее популярным способом осуществления этого, но существует и другая, очень отличающаяся техника, которая позволяет вам иметь почти все преимущества многопоточности без реального применения множества потоков. Это на самом деле представляет практическую ценность только если ваша программа в основном связана с вводом/ выводом. Если ваша программа привязана к процессору, тогда предварительно запланированные потоки, скорее всего, являются именно тем что вам требуется. Однако сетевые серверы редко привязаны к процессору.

1996, я не ослышался? Ясно, что в 1996 в Python уже можно было управлять множеством событий сокета в каком- то отдельном потоке (причём, на самом деле, намного раньше чем в прочих языках программирования). Так что изменилось за последние 20 лет, что делает Asyncio особенным теперь?

Ответом является синтаксис языка. Мы намерены рассмотреть это более пристально в своём следующем разделе, но прежде чем мы закроем это окно в прошлое, будет не лишним отметить некую небольшую деталь в документации для asyncore в наши дни:

Исходный код: Lib/asyncore.py

Заявлена устаревшей начиная с версии 3.6: Применяйте, пожалуйста, вместо него asyncio.[4]

-- Официальная документация стандартной библиотеки Python ca. декабрь 2016

Путь к естественным сопрограммам

Напомним, что мы используем термин "Asyncio" как для ссылки на собственно изменения в синтаксисе языка Python, так и для самого нового модуля asyncio в его стандартной библиотеке [asyncio был добавлен в Python 3.4]. Давайте слегка больше погрузимся в это отличие.

В наши дни поддержка асинхронного программирования в Python имеет три отличающихся компонента, причём изучая их также интересно наблюдать когда они были добавлены в Python:

  1. Синтаксис языка: Генераторы

    1. Ключевое слово yield, добавленное в Python 2.2 (2001) в PEP 255 и расширенное в Python 2.5 (2005) в PEP 342 при помощи методов send() и throw() в объектах генератора, что впервые сделало возможным использование генераторов в виде сопрограмм.

    2. Ключевое слово yield from, добавленное в Python 3.3 (2009) в PEP 380 чтобы сделать намного более простым работу с вложенными выдач (yields) генераторов, в частности, в том случае, когда генераторы применяются как сопрограммы временной замены [makeshift: прилагательное, "дейстующее как промежуточное или временное средство"].

  2. Синтаксис языка: Сопрограммы

    1. Ключевые слова async и await, добавленные в Python 3.5 (2015) в PEP 492, которые дали первоклассную поддержку сопрограммам в качестве собственного самостоятельного языкового свойства. Это также означало, что генераторы могут снова применяться в качестве генераторов даже внутри функций сопрограмм.

  3. Библиотечный модуль: asyncio добавленные в Python 3.4 (2012) в PEP 3156, предоставил поддержку "всё включено" как для проектировщиков инфраструктур, так и для конечных пользователей для работы напрямую с сопрограммами и сетевым вводом, выводом. Критически важно что сама архитектура имеющегося в asyncio цикла событий была предназначена для предоставления общей основы поверх которой могут быть приведены к стандарту прочие имеющиеся сторонние производители инфраструктур, таких как Tornado и Twisted.

Все эти три момента совершенно отличаются друг от друга, хотя вам простительна такая патакица, ибо всю историю данной разработки этих свойств в Python трудно было отследить.

Воздействие нового синтаксиса для async и await является существенным, причём оно оказало влияние также и на прочие языки программирования, такие как JavaScript, C#, Scala, Kotlin и Dart.

Тысячи вовлечённых в проект Python программистов потратили много времени, чтобы донести до нас этот момент.

Пример Столовых приборов с использованием Asyncio

В своём Примере: Роботы и Столовые приборы мы проанализировали ошибку условий состязательности вызываемую тем, что множество потоков изменяют записи столовых приборов в общем глобальном экземпляре объекта "кухня". Для придания законченного вида здесь приводится некая версия того как мы могли бы создать некий вариант asyncio той же самой задачи.

Имеется особый момент, который я бы хотел подчеркнуть, относительно наблюдаемости распараллеливания при таком подходе asyncio:

 

Пример 6-1. Управление столовыми приборами при помощи asyncio


import asyncio

class CoroBot(): (1)
  def __init__(self):
    self.cutlery = Cutlery(knives=0, forks=0)
    self.tasks = asyncio.Queue() (2)

  async def manage_table(self):
    while True:
      task = await self.tasks.get() (3)
      if task == 'prepare table':
        kitchen.give(to=self.cutlery, knives=4, forks=4)
      elif task == 'clear table':
        self.cutlery.give(to=kitchen, knives=4, forks=4)
      elif task == 'shutdown':
        return

from attr import attrs, attrib

@attrs
class Cutlery:
    knives = attrib(default=0)
    forks = attrib(default=0)

    def give(self, to: 'Cutlery', knives=0, forks=0):
        self.change(­knives, ­forks)
        to.change(knives, forks)

    def change(self, knives, forks):
        self.knives += knives
        self.forks += forks

kitchen = Cutlery(knives=100, forks=100)
bots = [CoroBot() for i in range(10)]

import sys
for b in bots:
    for i in range(int(sys.argv[1])):
        b.tasks.put_nowait('prepare table')
        b.tasks.put_nowait('clear table')
    b.tasks.put_nowait('shutdown')

print('Kitchen inventory before service:', kitchen)

loop = asyncio.get_event_loop()
tasks = []
for b in bots:
    t = loop.create_task(b.manage_table())
    tasks.append(t)

task_group = asyncio.gather(*tasks)
loop.run_until_complete(task_group)
print('Kitchen inventory after service:', kitchen)
 	   
  • (1) Вместо ThreadBot у нас теперь есть CoroBot. И это на самом деле бот! В данном примере кода имеется только один поток, причём этот поток будет управлять всеми десятью отдельными экземплярами "CoroBot", по одному для каждого стола в нашем ресторане.

  • (2) Вместо queue.Queue мы используем очередь с включённым asyncio.

  • (3) Именно это самый важный момент: единственным местом, в котором исполнение может переключаться между различными экземплярами CoroBot, это то, в котором появляется ключевое слово await. Не возможно иметь некое переключение контекста на протяжении всего остатка данной функции и именно по этой причине нет никакого условия состязательности на протяжении данного изменения инвентаризации кухонных столовых приборов.

Само присутствие ключевых слов await делает переключения контекста отслеживаемым. Это делает гораздо более простым делать заключение относительно потенциального условия состязательности в приложениях с одновременным исполнением. Для приведённого выше кода CoroBot, наш тест всегда выполняется вне зависимости от того сколько задач назначать:


$ python cutlery_test_corobot.py 100000
Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=100, forks=100)
		

Это на самом деле совсем не производит никакого впечатления: это абсолютно предсказуемый результат и он основан на том факте, что очевидно, что в данном коде отсутствует условие состязательности. И вся суть именно в этом.

Вспомогательные материалы для вытаскивания новостей с вебсайтов

Для запуска нашего кода в Примере: Вытаскивание новостей требуется такой файл index.html:


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF­8">
    <title>The News</title>
    <style>
        .wrapper {
            display: grid;
            grid­template­columns: 300px 300px 300px;
            grid­gap: 10px;
            width: 920px;
            margin: 0 auto;
        }

        .box {
            border­radius: 40px;
            padding: 20px;
            border: 1px solid slategray;
        } 

        .cnn {
            background­color: #cef;
        }

        .aljazeera {
            background­color: #fea;
        } 

        h1 {
            text­align: center;
            font­size: 60pt;
        }

        a {
            color: black;
            text­decoration: none;
        }

        span {
            text­align: center;
            font­size: 15pt;
            color: black;
        }
    </style>
</head>
<body>
<h1>The News</h1>
<div class="wrapper">
    $body
</div>
</body>
</html>
 	   

Это достаточно базовый шаблон с элементарными стилями.

Вспомогательные материалы для примера ZeroMQ

В разделе ØMQ, в Пример: APM (мониторинг производительности приложения), я упоминал, что вам потребуется соответствующий файл HTML, который обслуживает показ диаграмм измерений. Вот этот файл, charts.html, приводимый ниже. Вам следует получить некий URL для smoothie.min.js с smoothiecharts.org или из своей CDN и применить этот URL в качестве соответствующего атрибута src вместо применяемого:

 

Пример 6-2. charts.html


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF­8">
    <title>Server Performance</title>
    <script src="smoothie.min.js"></script>
    <script type="text/javascript">
        function createTimeline() {
            var cpu = {}; (1)
            var mem = {};

            var chart_props = {
                responsive: true,
                enableDpiScaling: false,
                millisPerPixel:100,
                grid: {
                    millisPerLine: 4000,
                    fillStyle: '#ffffff',
                    strokeStyle: 'rgba(0,0,0,0.08)',
                    verticalSections: 10
                },
                labels:{fillStyle:'#000000',fontSize:18},
                timestampFormatter:SmoothieChart.timeFormatter,
                maxValue: 100,
                minValue: 0
            };

            var cpu_chart = new SmoothieChart(chart_props); (2)
            var mem_chart = new SmoothieChart(chart_props);

            function add_timeseries(obj, chart, color) { (3)
                obj[color] = new TimeSeries();
                chart.addTimeSeries(obj[color], {
                    strokeStyle: color,
                    lineWidth: 4
                })
            }

            var evtSource = new EventSource("/feed"); (4)
            evtSource.onmessage = function(e) {
                var obj = JSON.parse(e.data); (5)
                if (!(obj.color in cpu)) {
                    add_timeseries(cpu, cpu_chart, obj.color);
                }
                if (!(obj.color in mem)) {
                    add_timeseries(mem, mem_chart, obj.color);
                }
                cpu[obj.color].append(
                    Date.parse(obj.timestamp), obj.cpu); (6)
                mem[obj.color].append(
                    Date.parse(obj.timestamp), obj.mem);
            };

            cpu_chart.streamTo(
                document.getElementById("cpu_chart"), 1000
            );
            mem_chart.streamTo(
                document.getElementById("mem_chart"), 1000
            );
        }
    </script>
    <style>
        h1 {
            text­align: center;
            font­family: sans­serif;
        }
    </style>
</head>
<body onload="createTimeline()">
    <h1>CPU (%)</h1>
    <canvas id="cpu_chart" style="width:100%; height:300px">
    </canvas>
    <hr>
    <h1>Memory usage (MB)</h1>
    <canvas id="mem_chart" style="width:100%; height:300px">
    </canvas>
 	   
  • (1) Каждый из cpu и mem соответствует объектам TimeSeries(). Каждое свойство будет выделено неким цветом, а его значением будет экземпляр TimeSeries().

  • (2) Один экземпляр диаграммы создаётся для использования ЦПУ, а второй для оперативной памяти.

  • (3) Мы создаём экземпляр TimeSeries() внутри соответствующего события onmessage() для текущего экземпляра EventSource. Это означает, что для каждых приходящих новых данных (например, для различных названий цветов) будет автоматически получаться новая временная последовательность, созданная под него. Наша функция add_timeseries() одновременно и создаёт соответствующий экземпляр временной последовательности, и добавляет его к заданному экземпляру chart.

  • (4) Создаём новый экземпляр EventSource в соответствующем URL /feed. Наш браузер подключится к этой оконечной точке в нашем сервере, metric­server.py. Отметим, что сам браузер попытается автоматически выполнить повторное подключение в случае его утраты. SSE (Server­sent events, отправляемые сервером события) часто остаются недооценёнными, однако имеется множество ситуаций при которых внутренняя простота SSE более предпочтительна чем веб сокеты.

  • (5) Соответствующее событие onmessage() зажигается всякий раз, когда сервер отправляет данные. В ланном месте выполняется разбор полученных данных в виде JSON.

  • (6) Повторный вызов данного идентификатора cpu устанавливает некое соответствие цвета экземпляру TimeSeries(). В этом месте мы получаем такую временную последовательность и добавляем её в конец. Такде мы получаем необходимый временной штамп и выполняем его разбор для получения правильного формата, требуемого со стороны данной диаграммы.

Обработка переключателя базы данных для примера asyncpg

В разделе asyncpg, Пример: несостоятельность кэширования, в интересах экономии места был опущен один из необходимых исходных файлов Python. Это файл представлен ниже с пояснениями:

 

Пример 6-3. triggers.py


# triggers.py
from asyncpg.connection import Connection (1)

async def create_notify_trigger( (2)
        conn: Connection,
        trigger_name: str = 'table_update_notify',
        channel: str = 'table_change') ­> None:
    await conn.execute(
        'CREATE EXTENSION IF NOT EXISTS hstore') (3)
    await conn.execute(
            SQL_CREATE_TRIGGER.format(
                trigger_name=trigger_name,
                channel=channel)) (4)

async def add_table_triggers( (5)
        conn: Connection,
        table: str,
        trigger_name: str = 'table_update_notify',
        schema: str = 'public') ­> None:
    templates = (SQL_TABLE_INSERT, SQL_TABLE_UPDATE,
                 SQL_TABLE_DELETE) (6)
    for template in templates:
        await conn.execute(
            template.format(
                table=table,
                trigger_name=trigger_name,
                schema=schema)) (7)

SQL_CREATE_TRIGGER = """\
CREATE OR REPLACE FUNCTION {trigger_name}()
  RETURNS trigger AS $$
DECLARE
  id integer; ­­ or uuid
  data json;
BEGIN
  data = json 'null';
  IF TG_OP = 'INSERT' THEN
    id = NEW.id;
    data = row_to_json(NEW);
  ELSIF TG_OP = 'UPDATE' THEN
    id = NEW.id;
    data = json_build_object(
      'old', row_to_json(OLD),
      'new', row_to_json(NEW),
      'diff', hstore_to_json(hstore(NEW) ­ hstore(OLD))
    );
  ELSE
    id = OLD.id;
    data = row_to_json(OLD);
  END IF;
  PERFORM
    pg_notify(
      '{channel}',
      json_build_object(
        'table', TG_TABLE_NAME,
        'id', id,
        'type', TG_OP,
        'data', data
      )::text
    );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""" (8)

SQL_TABLE_UPDATE = """\
DROP TRIGGER IF EXISTS
  {table}_notify_update ON {schema}.{table};
CREATE TRIGGER {table}_notify_update
  AFTER UPDATE ON {schema}.{table}
    FOR EACH ROW
      EXECUTE PROCEDURE {trigger_name}();
""" (9)

SQL_TABLE_INSERT = """\
DROP TRIGGER IF EXISTS
  {table}_notify_insert ON {schema}.{table};
CREATE TRIGGER {table}_notify_insert
  AFTER INSERT ON {schema}.{table}
    FOR EACH ROW
      EXECUTE PROCEDURE {trigger_name}();
"""

SQL_TABLE_DELETE = """\
DROP TRIGGER IF EXISTS
  {table}_notify_delete ON {schema}.{table};
CREATE TRIGGER {table}_notify_delete
  AFTER DELETE ON {schema}.{table}
    FOR EACH ROW
      EXECUTE PROCEDURE {trigger_name}();
"""
 	   
  • (1) Данная функция требует asyncpg, хотя данный импорт используется только чтобы разрешить применение Connection в типе аннотаций.

  • (2) Эта функция сопрограммы create_notify_trigger() создаст необходимую функцию триггера саму по себе в нашей базе данных. Такая функция триггера будет содержать необходимое название канала, в который будут отправляться обновления. Необходимый код для самой функуии содержится в идентификаторе SQL_CREATE_TRIGGER и он устанавливается как некая форматированная строка.

  • (3) Вспоминаем из своего образца примера, что уведомления изменения содержат некий раздел "diff" в котором отображается само отличие между старыми и новыми данными. Мы применяем свойство hstore PostgreSQL для вычисления данного отличия. Оно представляет нечто близкое к семантике наборов. Требуемое расширение hstore не включено по умолчанию, поэтому мы разрешаем его здесь.

  • (4) В данный шаблон подставляются желаемые названия триггера и канала, а затем он исполняется.

  • (5) Наша вторая функция, add_table_triggers(), подключит полученную функцию триггера к таким событиям таблицы как вставка, обновление и удаление.

  • (6) Имеются три формата строк для каждого из трёх методов.

  • (7) Желаемый переменные подставляются в соответствующие шаблоны и после этого исполняются.

  • (8) Зтот SQL отнял у меня слегка больше времени чем ожидалось, прежде чем стал в точности правильным! Данная процедура PostgreSQL вызывается для вставки, обновления и удаления; тот способ, которым мы определяем что именно проверяется в соответствующей переменной TG_OP. Если данной операцией является INSERT, тогда будет определён NEWOLD не будет задан). Для DELETE будет определён OLD, а не NEW. Для UPDATE определены оба, что позволяет нам вычислить их разницу. Мы также можем воспользоваться встроенной поддержкой PostreSQL для JSON при помощи функций row_to_json() и hstore_to_json(): это означает, что наш обработчик обратного вызова получит допустимый JSON.

    Наконец, наш вызыов соответствующей функции pg_notify() именно то, что на самом деле отправляет необходимое событие. Все подписчики на {channel} получат необходимое уведомление.

  • (9) Это достаточно стандартныц код триггера: он просту устанавливает некий триггер для вызова какой- то определённой процедуры {trigger_name}() в случае возникновения определённого события, например, INSERT или UPDATE.

Наверняка может иметься множество полезных приложений, которые можно построить на основе уведомлений, получаемых от PostgreSQL.

Вспомогательные материалы для примера Sanic: aelapsed и aprofiler

 

Пример 6-4. perf.py


# perf.py
import logging
from time import perf_counter
from inspect import iscoroutinefunction

logger = logging.getLogger('perf')

def aelapsed(corofn, caption=''): (1)
    async def wrapper(*args, **kwargs):
        t0 = perf_counter()
        result = await corofn(*args, **kwargs)
        delta = (perf_counter() ­ t0) * 1e3
        logger.info(
            f'{caption} Elapsed: {delta:.2f} ms')
        return result
    return wrapper

def aprofiler(cls, bases, members): (2)
    for k, v in members.items():
        if iscoroutinefunction(v):
            members[k] = aelapsed(v, k)
        return type.__new__(type, cls, bases, members)
 	   
  • (1) Наш декоратор aelapsed() запишет значение времени, которое заняло исполнение обёрнутой им сопрограммы.

  • (2) Наш метакласс aprofiler()обеспечит, что все участники этого класса, а именно функции сопрограмм, получат обёртку в соответствующем декораторе aelapsed().