Глава 6. Дополнения
Содержание
- Глава 6. Дополнения
- Краткая история поддержки Async в Python
- Пример Столовых приборов с использованием Asyncio
- Вспомогательные материалы для вытаскивания новостей с вебсайтов
- Вспомогательные материалы для примера ZeroMQ
- Обработка переключателя базы данных для примера asyncpg
- Вспомогательные материалы для примера Sanic: aelapsed и aprofiler
Несмотря на то, что модуль asyncore долгое время был частью стандартной библиотеки PythonВ, он страдает фундаментальными недостатками, проистекающими из не гибкого API, который не соответствует ожиданиям современного асинхронного сетевого модуля.
Более того, его подход черезчур упрощён для того чтобы предоставлять разработчикам все инструменты, необходимые им для применения полного потенциала построения асинхронных сетей.
Самое популярное решение в наши дни состоит в использовании при производстве библиотек сторонних производителей. Они зачастую предоставляют удовлетворительные решения, но между этими библиотеками отсутствует совместимость, что приводит к тому, что кодовые базы очень тесно переплетены с теми библиотеками, которые они применяют.[1]
-- Лоуренс ван Хутвен, PEP3153 (май 2012), Поддержка асинхронного ввода/ вывода
Основная цель данного раздела состоит в кратком описании всей истории, стоящей позади асинхронного программирования в Python, причём основной момент, который я хочу отметить - который всё ещё поражает меня когда я задумываюсь об этом - так это то, что тем ключевым нововведением, которого мы ожидали на протяжении 20 лет был синтаксис языка.
Многие люди могут быть удивлены этим, но Asyncio не самая первая попытка добавить поддержку для асинхронного сетевого программирования в Python, и именно это обсуждается следующим разделом.
[При сопоставлении с asyncore,] Twisted лучше практически во всех возможных отношениях. Он более портативный, более функциональный, проще, более масштабируемый, облалает лучшим сопровождением, лучше документирован и он способен делать деликатесные омлет. Asyncore, по сути, устарел.[2]
-- Glyph ca. 2010
asyncore следует рассматривать как нечто исторически ископаемое, причём он на самом деле никогда не применялся.[3]
-- Жан Поль Кальдероун ca. 2013 в StackOverflow
Поддержка так называемой "асинхронной функциональности" была добавлена в Python давным- давно, в модуле
asyncore
. Как вам это могут подсказать цитаты к данной главе, приём asyncore
был равнодушным, а использование низким. Что удивляет, по крайней мере автора этих строк, так это когда
это было сделано: в Python 1.5.2! Вот что говорится в заголовке Lib/asyncore.py
из исходного кода
CPython:
# * Mode: Python *
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# =============================================================
# Copyright 1996 by Sam Rushing
Более того, самый первый параграф документации Python для asyncore
говорит следующее, и это запросто может
появиться и в документации asyncio
в наши дни:
Данный модуль предоставляет базовую инфраструктуру для написания служб клиентов и серверов асинхронных сокетов.
Существует всего два способа иметь некую программу в единственном процессоре выполняющей "более чем одну вещь в некий момент времени". Многопоточное программирование является самым простым и наиболее популярным способом осуществления этого, но существует и другая, очень отличающаяся техника, которая позволяет вам иметь почти все преимущества многопоточности без реального применения множества потоков. Это на самом деле представляет практическую ценность только если ваша программа в основном связана с вводом/ выводом. Если ваша программа привязана к процессору, тогда предварительно запланированные потоки, скорее всего, являются именно тем что вам требуется. Однако сетевые серверы редко привязаны к процессору.
1996, я не ослышался? Ясно, что в 1996 в Python уже можно было управлять множеством событий сокета в каком- то отдельном потоке (причём, на самом деле, намного раньше чем в прочих языках программирования). Так что изменилось за последние 20 лет, что делает Asyncio особенным теперь?
Ответом является синтаксис языка. Мы намерены рассмотреть это более пристально в своём следующем разделе, но
прежде чем мы закроем это окно в прошлое, будет не лишним отметить некую небольшую деталь в документации для asyncore
в наши дни:
Исходный код: Lib/asyncore.py
Заявлена устаревшей начиная с версии 3.6: Применяйте, пожалуйста, вместо него asyncio.[4]
-- Официальная документация стандартной библиотеки Python ca. декабрь 2016
Напомним, что мы используем термин "Asyncio" как для ссылки на собственно изменения в синтаксисе языка Python, так и для самого нового
модуля asyncio
в его стандартной библиотеке [asyncio
был добавлен в
Python 3.4]. Давайте слегка больше погрузимся в это отличие.
В наши дни поддержка асинхронного программирования в Python имеет три отличающихся компонента, причём изучая их также интересно наблюдать когда они были добавлены в Python:
-
Синтаксис языка: Генераторы
-
Ключевое слово
yield
, добавленное в Python 2.2 (2001) в PEP 255 и расширенное в Python 2.5 (2005) в PEP 342 при помощи методовsend()
иthrow()
в объектах генератора, что впервые сделало возможным использование генераторов в виде сопрограмм. -
Ключевое слово
yield from
, добавленное в Python 3.3 (2009) в PEP 380 чтобы сделать намного более простым работу с вложенными выдач (yields) генераторов, в частности, в том случае, когда генераторы применяются как сопрограммы временной замены [makeshift: прилагательное, "дейстующее как промежуточное или временное средство"].
-
-
Синтаксис языка: Сопрограммы
-
Ключевые слова
async
иawait
, добавленные в Python 3.5 (2015) в PEP 492, которые дали первоклассную поддержку сопрограммам в качестве собственного самостоятельного языкового свойства. Это также означало, что генераторы могут снова применяться в качестве генераторов даже внутри функций сопрограмм.
-
-
Библиотечный модуль:
asyncio
добавленные в Python 3.4 (2012) в PEP 3156, предоставил поддержку "всё включено" как для проектировщиков инфраструктур, так и для конечных пользователей для работы напрямую с сопрограммами и сетевым вводом, выводом. Критически важно что сама архитектура имеющегося вasyncio
цикла событий была предназначена для предоставления общей основы поверх которой могут быть приведены к стандарту прочие имеющиеся сторонние производители инфраструктур, таких как Tornado и Twisted.
Все эти три момента совершенно отличаются друг от друга, хотя вам простительна такая патакица, ибо всю историю данной разработки этих свойств в Python трудно было отследить.
Воздействие нового синтаксиса для async
и await
является
существенным, причём оно оказало влияние также и на прочие языки программирования, такие как JavaScript, C#, Scala, Kotlin и Dart.
Тысячи вовлечённых в проект Python программистов потратили много времени, чтобы донести до нас этот момент.
В своём Примере: Роботы и Столовые приборы мы проанализировали ошибку условий
состязательности вызываемую тем, что множество потоков изменяют записи столовых приборов в
общем глобальном экземпляре объекта "кухня". Для придания законченного вида здесь приводится некая версия того как мы могли бы создать
некий вариант asyncio
той же самой задачи.
Имеется особый момент, который я бы хотел подчеркнуть, относительно наблюдаемости распараллеливания при
таком подходе asyncio
:
Пример 6-1. Управление столовыми приборами при помощи asyncio
import asyncio
class CoroBot(): (1)
def __init__(self):
self.cutlery = Cutlery(knives=0, forks=0)
self.tasks = asyncio.Queue() (2)
async def manage_table(self):
while True:
task = await self.tasks.get() (3)
if task == 'prepare table':
kitchen.give(to=self.cutlery, knives=4, forks=4)
elif task == 'clear table':
self.cutlery.give(to=kitchen, knives=4, forks=4)
elif task == 'shutdown':
return
from attr import attrs, attrib
@attrs
class Cutlery:
knives = attrib(default=0)
forks = attrib(default=0)
def give(self, to: 'Cutlery', knives=0, forks=0):
self.change(knives, forks)
to.change(knives, forks)
def change(self, knives, forks):
self.knives += knives
self.forks += forks
kitchen = Cutlery(knives=100, forks=100)
bots = [CoroBot() for i in range(10)]
import sys
for b in bots:
for i in range(int(sys.argv[1])):
b.tasks.put_nowait('prepare table')
b.tasks.put_nowait('clear table')
b.tasks.put_nowait('shutdown')
print('Kitchen inventory before service:', kitchen)
loop = asyncio.get_event_loop()
tasks = []
for b in bots:
t = loop.create_task(b.manage_table())
tasks.append(t)
task_group = asyncio.gather(*tasks)
loop.run_until_complete(task_group)
print('Kitchen inventory after service:', kitchen)
-
(1) Вместо
ThreadBot
у нас теперь естьCoroBot
. И это на самом деле бот! В данном примере кода имеется только один поток, причём этот поток будет управлять всеми десятью отдельными экземплярами "CoroBot", по одному для каждого стола в нашем ресторане. -
(2) Вместо
queue.Queue
мы используем очередь с включённым asyncio. -
(3) Именно это самый важный момент: единственным местом, в котором исполнение может переключаться между различными экземплярами
CoroBot
, это то, в котором появляется ключевое словоawait
. Не возможно иметь некое переключение контекста на протяжении всего остатка данной функции и именно по этой причине нет никакого условия состязательности на протяжении данного изменения инвентаризации кухонных столовых приборов.
Само присутствие ключевых слов await
делает переключения контекста
отслеживаемым. Это делает гораздо более простым делать заключение относительно потенциального условия
состязательности в приложениях с одновременным исполнением. Для приведённого выше кода CoroBot
, наш тест
всегда выполняется вне зависимости от того сколько задач назначать:
$ python cutlery_test_corobot.py 100000
Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=100, forks=100)
Это на самом деле совсем не производит никакого впечатления: это абсолютно предсказуемый результат и он основан на том факте, что очевидно, что в данном коде отсутствует условие состязательности. И вся суть именно в этом.
Для запуска нашего кода в Примере: Вытаскивание новостей требуется такой файл
index.html
:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF8">
<title>The News</title>
<style>
.wrapper {
display: grid;
gridtemplatecolumns: 300px 300px 300px;
gridgap: 10px;
width: 920px;
margin: 0 auto;
}
.box {
borderradius: 40px;
padding: 20px;
border: 1px solid slategray;
}
.cnn {
backgroundcolor: #cef;
}
.aljazeera {
backgroundcolor: #fea;
}
h1 {
textalign: center;
fontsize: 60pt;
}
a {
color: black;
textdecoration: none;
}
span {
textalign: center;
fontsize: 15pt;
color: black;
}
</style>
</head>
<body>
<h1>The News</h1>
<div class="wrapper">
$body
</div>
</body>
</html>
Это достаточно базовый шаблон с элементарными стилями.
В разделе ØMQ, в Пример: APM (мониторинг производительности приложения), я упоминал,
что вам потребуется соответствующий файл HTML, который обслуживает показ диаграмм измерений. Вот этот файл, charts.html
,
приводимый ниже. Вам следует получить некий URL для smoothie.min.js
с
smoothiecharts.org или из своей
CDN и применить этот URL в качестве соответствующего
атрибута src
вместо применяемого:
Пример 6-2. charts.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF8">
<title>Server Performance</title>
<script src="smoothie.min.js"></script>
<script type="text/javascript">
function createTimeline() {
var cpu = {}; (1)
var mem = {};
var chart_props = {
responsive: true,
enableDpiScaling: false,
millisPerPixel:100,
grid: {
millisPerLine: 4000,
fillStyle: '#ffffff',
strokeStyle: 'rgba(0,0,0,0.08)',
verticalSections: 10
},
labels:{fillStyle:'#000000',fontSize:18},
timestampFormatter:SmoothieChart.timeFormatter,
maxValue: 100,
minValue: 0
};
var cpu_chart = new SmoothieChart(chart_props); (2)
var mem_chart = new SmoothieChart(chart_props);
function add_timeseries(obj, chart, color) { (3)
obj[color] = new TimeSeries();
chart.addTimeSeries(obj[color], {
strokeStyle: color,
lineWidth: 4
})
}
var evtSource = new EventSource("/feed"); (4)
evtSource.onmessage = function(e) {
var obj = JSON.parse(e.data); (5)
if (!(obj.color in cpu)) {
add_timeseries(cpu, cpu_chart, obj.color);
}
if (!(obj.color in mem)) {
add_timeseries(mem, mem_chart, obj.color);
}
cpu[obj.color].append(
Date.parse(obj.timestamp), obj.cpu); (6)
mem[obj.color].append(
Date.parse(obj.timestamp), obj.mem);
};
cpu_chart.streamTo(
document.getElementById("cpu_chart"), 1000
);
mem_chart.streamTo(
document.getElementById("mem_chart"), 1000
);
}
</script>
<style>
h1 {
textalign: center;
fontfamily: sansserif;
}
</style>
</head>
<body onload="createTimeline()">
<h1>CPU (%)</h1>
<canvas id="cpu_chart" style="width:100%; height:300px">
</canvas>
<hr>
<h1>Memory usage (MB)</h1>
<canvas id="mem_chart" style="width:100%; height:300px">
</canvas>
-
(1) Каждый из
cpu
иmem
соответствует объектамTimeSeries()
. Каждое свойство будет выделено неким цветом, а его значением будет экземплярTimeSeries()
. -
(2) Один экземпляр диаграммы создаётся для использования ЦПУ, а второй для оперативной памяти.
-
(3) Мы создаём экземпляр
TimeSeries()
внутри соответствующего событияonmessage()
для текущего экземпляраEventSource
. Это означает, что для каждых приходящих новых данных (например, для различных названий цветов) будет автоматически получаться новая временная последовательность, созданная под него. Наша функцияadd_timeseries()
одновременно и создаёт соответствующий экземпляр временной последовательности, и добавляет его к заданному экземпляруchart
. -
(4) Создаём новый экземпляр
EventSource
в соответствующем URL/feed
. Наш браузер подключится к этой оконечной точке в нашем сервере,metricserver.py
. Отметим, что сам браузер попытается автоматически выполнить повторное подключение в случае его утраты. SSE (Serversent events, отправляемые сервером события) часто остаются недооценёнными, однако имеется множество ситуаций при которых внутренняя простота SSE более предпочтительна чем веб сокеты. -
(5) Соответствующее событие
onmessage()
зажигается всякий раз, когда сервер отправляет данные. В ланном месте выполняется разбор полученных данных в виде JSON. -
(6) Повторный вызов данного идентификатора
cpu
устанавливает некое соответствие цвета экземпляруTimeSeries()
. В этом месте мы получаем такую временную последовательность и добавляем её в конец. Такде мы получаем необходимый временной штамп и выполняем его разбор для получения правильного формата, требуемого со стороны данной диаграммы.
В разделе asyncpg
, Пример: несостоятельность
кэширования, в интересах экономии места был опущен один из необходимых исходных файлов Python. Это файл представлен ниже с пояснениями:
Пример 6-3. triggers.py
# triggers.py
from asyncpg.connection import Connection (1)
async def create_notify_trigger( (2)
conn: Connection,
trigger_name: str = 'table_update_notify',
channel: str = 'table_change') > None:
await conn.execute(
'CREATE EXTENSION IF NOT EXISTS hstore') (3)
await conn.execute(
SQL_CREATE_TRIGGER.format(
trigger_name=trigger_name,
channel=channel)) (4)
async def add_table_triggers( (5)
conn: Connection,
table: str,
trigger_name: str = 'table_update_notify',
schema: str = 'public') > None:
templates = (SQL_TABLE_INSERT, SQL_TABLE_UPDATE,
SQL_TABLE_DELETE) (6)
for template in templates:
await conn.execute(
template.format(
table=table,
trigger_name=trigger_name,
schema=schema)) (7)
SQL_CREATE_TRIGGER = """\
CREATE OR REPLACE FUNCTION {trigger_name}()
RETURNS trigger AS $$
DECLARE
id integer; or uuid
data json;
BEGIN
data = json 'null';
IF TG_OP = 'INSERT' THEN
id = NEW.id;
data = row_to_json(NEW);
ELSIF TG_OP = 'UPDATE' THEN
id = NEW.id;
data = json_build_object(
'old', row_to_json(OLD),
'new', row_to_json(NEW),
'diff', hstore_to_json(hstore(NEW) hstore(OLD))
);
ELSE
id = OLD.id;
data = row_to_json(OLD);
END IF;
PERFORM
pg_notify(
'{channel}',
json_build_object(
'table', TG_TABLE_NAME,
'id', id,
'type', TG_OP,
'data', data
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""" (8)
SQL_TABLE_UPDATE = """\
DROP TRIGGER IF EXISTS
{table}_notify_update ON {schema}.{table};
CREATE TRIGGER {table}_notify_update
AFTER UPDATE ON {schema}.{table}
FOR EACH ROW
EXECUTE PROCEDURE {trigger_name}();
""" (9)
SQL_TABLE_INSERT = """\
DROP TRIGGER IF EXISTS
{table}_notify_insert ON {schema}.{table};
CREATE TRIGGER {table}_notify_insert
AFTER INSERT ON {schema}.{table}
FOR EACH ROW
EXECUTE PROCEDURE {trigger_name}();
"""
SQL_TABLE_DELETE = """\
DROP TRIGGER IF EXISTS
{table}_notify_delete ON {schema}.{table};
CREATE TRIGGER {table}_notify_delete
AFTER DELETE ON {schema}.{table}
FOR EACH ROW
EXECUTE PROCEDURE {trigger_name}();
"""
-
(1) Данная функция требует
asyncpg
, хотя данный импорт используется только чтобы разрешить применениеConnection
в типе аннотаций. -
(2) Эта функция сопрограммы
create_notify_trigger()
создаст необходимую функцию триггера саму по себе в нашей базе данных. Такая функция триггера будет содержать необходимое название канала, в который будут отправляться обновления. Необходимый код для самой функуии содержится в идентификатореSQL_CREATE_TRIGGER
и он устанавливается как некая форматированная строка. -
(3) Вспоминаем из своего образца примера, что уведомления изменения содержат некий раздел "diff" в котором отображается само отличие между старыми и новыми данными. Мы применяем свойство
hstore
PostgreSQL для вычисления данного отличия. Оно представляет нечто близкое к семантике наборов. Требуемое расширениеhstore
не включено по умолчанию, поэтому мы разрешаем его здесь. -
(4) В данный шаблон подставляются желаемые названия триггера и канала, а затем он исполняется.
-
(5) Наша вторая функция,
add_table_triggers()
, подключит полученную функцию триггера к таким событиям таблицы как вставка, обновление и удаление. -
(6) Имеются три формата строк для каждого из трёх методов.
-
(7) Желаемый переменные подставляются в соответствующие шаблоны и после этого исполняются.
-
(8) Зтот SQL отнял у меня слегка больше времени чем ожидалось, прежде чем стал в точности правильным! Данная процедура PostgreSQL вызывается для вставки, обновления и удаления; тот способ, которым мы определяем что именно проверяется в соответствующей переменной
TG_OP
. Если данной операцией являетсяINSERT
, тогда будет определёнNEW
(аOLD
не будет задан). ДляDELETE
будет определёнOLD
, а неNEW
. ДляUPDATE
определены оба, что позволяет нам вычислить их разницу. Мы также можем воспользоваться встроенной поддержкой PostreSQL для JSON при помощи функцийrow_to_json()
иhstore_to_json()
: это означает, что наш обработчик обратного вызова получит допустимый JSON.Наконец, наш вызыов соответствующей функции
pg_notify()
именно то, что на самом деле отправляет необходимое событие. Все подписчики на{channel}
получат необходимое уведомление. -
(9) Это достаточно стандартныц код триггера: он просту устанавливает некий триггер для вызова какой- то определённой процедуры
{trigger_name}()
в случае возникновения определённого события, например,INSERT
илиUPDATE
.
Наверняка может иметься множество полезных приложений, которые можно построить на основе уведомлений, получаемых от PostgreSQL.
Пример 6-4. perf.py
# perf.py
import logging
from time import perf_counter
from inspect import iscoroutinefunction
logger = logging.getLogger('perf')
def aelapsed(corofn, caption=''): (1)
async def wrapper(*args, **kwargs):
t0 = perf_counter()
result = await corofn(*args, **kwargs)
delta = (perf_counter() t0) * 1e3
logger.info(
f'{caption} Elapsed: {delta:.2f} ms')
return result
return wrapper
def aprofiler(cls, bases, members): (2)
for k, v in members.items():
if iscoroutinefunction(v):
members[k] = aelapsed(v, k)
return type.__new__(type, cls, bases, members)