Глава 10. Параллельность и одновременность

Содержание

Глава 10. Параллельность и одновременность
Модели параллельности и одновременности
Структура процесса
Многопроцессный параллелизм
Ветвление процесса в POSIX
Многопроцессность в Windows
Пакет multiprocessing
Относящиеся к делу файлы
Порождение и ветвление процессов
Создание дочернего процесса
Конвейеризация данных дочернему процессу
Исполнение дочернего процесса
Обмен данными при помощи очередей и конвейеров
Очереди
Конвейеры
Семафоры
Разделяемые состояния между процессами
Пример приложения
Аннотация мультипроцессности
Многопоточность
GIL
Относящиеся к делу исходные файлы
Старт потоков в Python
Состояние потока
Потоки POSIX
Потоки Windows
Аннотация многопоточности
Асинхронное программирование
Генераторы
Структура генератора
Относящиеся к делу исходные файлы
Создание генераторов
Выполнение генераторов
Сопрограммы
Относящиеся к делу исходные файлы
Циклы событий
Пример
Асинхронные генераторы
Подчинённые интерпретаторы
Относящиеся к делу исходные файлы
Пример
Выводы

Самые первые компьютеры проектировались для выполнения одной вещи за раз. Бо́льшая часть их работы состояла в области вычислительной математики. Время шло, и компьютерам потребовалось обрабатывать входные данные из разнообразных источников, некоторые из которых настолько далеки, как расстояния галактик.

Последствие этого состоит в том, что компьютерные приложения теперь тратят большую часть времени впустую на ожидание для отклика, будь он из шины, входных данных, расположения в памяти, вычислениях, неком API или в удалённом ресурсе.

Другим движением вперёд в вычислениях был уход от однопользовательского терминала к многозадачным операционным системам. Приложениям требуется исполняться в фоновом режиме для ожидания откликов из сетевой среды и обработки входных данных, таких как перемещения курсора мыши.

Многозадачность была хорошо востребована до наступления современных многоядерных ЦПУ, поэтому операционная система долгое время была способна разделять совместные ресурсы между множеством процессов.

В ядре любой операционной системы заложена регистрация запущенных процессов. Всякий процесс обладает неким владельцем, а также он способен потреблять ресурсы, такие как оперативная память и ЦПУ. В нашей последней главе вы изучили выделение памяти.

Для некого ЦПУ соответствующий процесс запрашивает время ЦПУ в виде подлежащих исполнению операций. Он делает это распределяя время ЦПУ и планируя процессы по приоритетам:

 

Рисунок 10-1


Одновременная модель

Некому отдельному процессу может потребоваться выполнять более одной вещи за раз. Например, когда вы пользуетесь текстовым процессором, он проверяет правильность написание в то время как вы выполняете набор. Современные приложения осуществляют это выполняя множество потоков одновременно и обрабатывая их собственные ресурсы.

Одновременность (concurrency) это исключительное решение для ведения дел с многозадачностью, однако ЦПУ обладают своими собственными пределами. Некоторые высокопроизводительные компьютеры для растягивания задач применяют развёртывание либо множества ЦПУ, либо множества ядер. Операционные системы предоставляют некий способ планирования процессов по множеству ЦПУ:

 

Рисунок 10-2


Параллельная модель

Суммируя, компьютеры для обработки проблемы многозадачности применяют параллельность и одновременность:

  • Чтобы обладать параллельностью, вам требуется множество вычислиьельных элементов. Вычислительные элементы могут быть центральными процессорами ил ядрами.

  • Для получения одновременности, вам нужен некий способ планирования задач с тем, чтобы простой одной не блокировал имеющиеся ресурсы.

Многие части архитектуры CPhyton абстрагируют сложность операционных систем чтобы предоставить разработчикам простой API. Подход CPhyton к параллельности и одновременности не является исключением.

Модели параллельности и одновременности

CPython предлагает большое число подходов к параллельности и одновременности. Ваш выбор зависит от нескольких факторов. Между моделями, в которые вовлечён CPython также имеются перекрытия.

Вы можете обнаружить, что определённой задачи существует множество реализаций одновременности из которых предстоит выбор, причём каждая из них обладает своими за и против.

Существует четыре модели, которыми снабжается CPython:

Таблица 10-1. Модели одновременности и параллельности CPython
Подход Модуль Одновременность Параллельность

Обработка в потоках

threading

Да

Нет

Множество процессов

multiprocessing

Да

Да

Асинхронная обработка

asyncio

Да

Нет

Подчинённые интерпретаторы

subinterpreters

Да

Да

Структура процесса

Одна из основных задач операционной системы, такой как Windows, macOS или Linux является контроль над исполняемыми процессами. Эти процессы могут быть приложениями с пользовательским интерфейсом, такие как браузер или IDE. Это также могут быть фоновые процессы, такие как сетевые службы или службы операционной системы.

Для управления этими процессами соответствующая операционная система предоставляет некий API для старта нового процесса. Когда процесс создан, он регистрируется этой операционной системой с тем, чтобы она знала какие процессы выполняются. Процессам придаётся некий уникальный идентификатор (PID). В зависимости от самой операционной системы они могут обладать некоторыми прочими свойствами.

Процессы POSIX обладают минимальным набором свойств, регистрируемых в их операционной системе:

  • Управляющий терминал

  • Текущий рабочий каталог

  • Эффективный групповой идентификатор и эффективный идентификатор пользователя

  • Дескрипторы файлов и маска режима создания файла

  • Групповой идентификатор процесса и идентификатор процесса

  • Идентификатор реальной группы и идентификатор реального пользователя

  • Корневой каталог

Вы можете видеть эти атрибуты для запущенных процессов в macOS или Linux, выполняя команду ps.

[Совет]Смотри также

Стандарт IEEE POSIX (1003.1-2017) определяет собственно взаимодействие и стандартное поведение для процессов и потоков.

Windows обладает аналогичным перечнем свойств, однако устанавливает свой собственный стандарт. Файловые полномочия, структуры каталогов и реестр процессов Windows сильно отличается от POSIX.

Представленные Win32_Process процессы Windows могут запрашиваться в WMI, Windows Management Instrumentation (инструментарии управления Windows) времени выполнения, или при помощи Диспетчера задач (Task Manager).

После того как процесс стартовал в некой операционной системе, придаются:

Сам ЦПУ вашего компьютера также удерживает некоторые дополнительные сведения при выполнении такого процесса, например:

  • Некий регистр, содержащий текущую подлежащую исполнению инструкцию или иные прочие данные, необходимые данному процессу для данной инструкции

  • Указатель инструкций или счётчике программы, указывающий какая инструкция в данной последовательности программы подлежит исполнению

Процесс CPython содержит в себе скомпилированный интерпретатор CPython и скомпилированные модули. Эти модули загружаются во время выполнения и преобразуются в инструкции циклом вычислений CPython:

 

Рисунок 10-3



Указанные регистр программы и счётчик программы указывают на единственную инструкцию в этом процессе. Это означает, что за один раз может исполняться лишь одна инструкция. Для CPython это означает, что в некий заданный момент времени может исполняться лишь одна инструкция байтового кода Python.

Существует два основных подхода для допуска параллельного выполнения инструкций в неком процессе:

  1. Ответсвить (fork) другой процесс

  2. Породить (spawn) некий поток

Тепеь, когда вы прикинули что составляет некий процесс, вы можете изучить ветвление и порождение дочернего процесса.

Многопроцессный параллелизм

Системы POSIX предоставляют некий API для того чтобы всякий процесс мог ответвлять (fork) дочерний процесс. Ветвление процесса это API вызова нижнего уровня к операционной системе, который может делаться любым исполняемым процессом.

Когда такой вызов сделан, сама операционная система клонирует все имеющиеся атрибуты исполняемого в данный момент процесса и создаст новый процесс. Такая операция клонирования включает кучу, регистр и положение счётчика родительского процесса. В момент ветвления такой дочерний процесс способен считывать любые переменные из своего родительского процесса.

  Ветвление процесса в POSIX

В качестве некого образца воспользуемся примером приложения преобразования градусов по Фаренгейту в градусы по Цельсию., применявшееся в самом начале раздела Динамическое выделение памяти в C. Вы можете приспособить его для порождения некого дочернего процесса под каждое значение градуса по Фаренгейту вместо вместо вычисления их в последовательности при помощи fork(). Каждый дочерний процесс продолжит работу с этого момента, cpython-book-samples/33/thread_celsius.c:


#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static const double five_ninths = 5.0/9.0;

double celsius(double fahrenheit){
    return (fahrenheit - 32) * five_ninths;
}

int main(int argc, char** argv) {
    if (argc != 2)
        return -1;
    int number = atoi(argv[1]);
    for (int i = 1 ; i <= number ; i++ ) {
        double f_value = 100 + (i*10);
        pid_t child = fork();
        if (child == 0) { // Is child process
            double c_value = celsius(f_value);
            printf("%f F is %f C (pid %d)\n", f_value, c_value, getpid());
            exit(0);
        }
    }
    printf("Spawned %d processes from %d\n", number, getpid());
    return 0;
}
 	   

Исполнение этой программы из командной строки предоставит вывод подобный следующему:


$ ./thread_celsius 4
110.000000 F is 43.333333 C (pid 57179)
120.000000 F is 48.888889 C (pid 57180)
Spawned 4 processes from 57178
130.000000 F is 54.444444 C (pid 57181)
140.000000 F is 60.000000 C (pid 57182)
		

Наш родительский процесс (57718) порождает четыре процесса. Для каждого дочернего процесса наша программа продолжается со строки child = fork(), где результирующим значением для child выступает 0. Она затем выполняет свои вычисления, выводит на печать полученное значение и выходит из своего процесса. Наконец, их родительский процесс выводит сколько процессов он породил и свой собственный PID.

Время, которое потребовалось для выполнения третьего и четвёртого процессов было длиннее чем потребовалось для завершения их родительскому процессу. Именно по этой причине родительский процесс вывел на печать окончательный результат до того как третий и четвёртый процессы вывели на печать свои собственные.

Родительский процесс может выйти со своим собственным кодом выхода до дочернего процесса. Дочерние процессы добавляются в некую группу процессов самой операционной системой, что облегчает контроль над взаимосвязанными процессами:

 

Рисунок 10-4



Самым крупным негативным последствием такого подхода к параллельности является то, что дочерний процесс является точной копией своего родительского процесса.

В случае с CPython это означает, что вам придётся иметь запущенными два интерпретатора CPython, причём обоим придётся загружать соответствующие модули и все библиотеки. Это создаёт значительные накладные расходы. Применение множества процессов становится существенным, когда накладные расходы ветвления процесса перевешиваются размером подлежащих исполнению задач.

Другим главным недостатком ветвления процессов является то, что они имеют раздельные, изолированные от своего родительского процесса кучи. Это означает, что такой дочерний процесс не способен выполнять запись пространство памяти своего родительского процесса.

Когда такой дочерний процесс создаётся, куча его предка становится доступной этому дочернему процессу. Для отправки сведений обратно своему предку должна применяться некая форма межпроцессного взаимодействия (IPC, interprocess communication).

[Замечание]Замечание

Модуль os предлагает некую обёртку вокруг fork().

  Многопроцессность в Windows

До сих пор вы знакомились с моделью POSIX. Windows не предоставляет эквивалента fork() и Python обязан (настолько хорошо, насколько это возможно), обладать одним и тем же API для Linux, macOS и Windows.

Чтобы преодолеть это, для порождения другого процесса python.exe с аргументом командной строки -c используется API CreateProcessW(). Этот этап носит название порождения (spawn) процесса и он также возможен в POSIX. На протяжении данной главы вы найдёте ссылку на это.

  Пакет multiprocessing

CPython предоставляет некий API поверх работы с ветвлением процессов в операционной системе, которое делает простым создания параллельности со множеством процессов в Python.

Этот API доступен из пакета multiprocessing, который предоставляет расширенные возможности для сбора в пулы процессов, очередей, ветвлений, создания куч разделяемой памяти, соединения процессов между собой и многое иное.

  Относящиеся к делу файлы

Вот те исходные файлы, которые относятся к многопроцессности:

Таблица 10-2. Относящиеся к множеству процессов исходные файлы
Файл Назначение

Lib/multiprocessing

Исходный код Python для пакета multiprocessing

Modules/_posixsubprocess.c

Модуль расширения C обёртывающий системный вызов fork()

Modules/_winapi.c

Модуль расширения C обёртывающий API ядра Windows

Modules/_multiprocessing

Модуль расширения C используемый пакетом multiprocessing

PC/msvcrtmodule.c

Интерфейс Python для библиотеки времени исполнения Microsoft Visual C

  Порождение и ветвление процессов

Пакет multiprocessing предлагает три метода для запуска новых параллельных процессов:

  1. Ветвление интерпретатора (только для POSIX)

  2. Порождение нового процесса интерпретатора (POSIX и Windows)

  3. Запуск некого сервера ветвлений, в котором создаётся новый процесс, который затем ответвляет любое число процессов (только POSIX)

[Замечание]Замечание

Для Windows и macOS, установленным по умолчанию методом является порождение. В Linux по умолчанию установлено ветвление. Вы можете перекрывать установленный по умолчанию метод при помощи multiprocessing.set_start_method().

API Python для запуска нового процесса, принимает вызываемую, target, и кортеж аргументов, args.

Воспользуемся этим примером порождения нового процесса для преобразования градусов Фаренгейта в градусы Цельсия, cpython-book-samples/33/spawn_process_celsius.py:


import multiprocessing as mp
import os

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = os.getpid()
    print(f"{f}F is {c}C (pid {pid})")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    p = mp.Process(target=to_celsius, args=(110,))
    p.start()
 	   

Хотя вы можете запустить единственный процесс, API multiprocessing предполагает что вы желаете запустить множество. Существуют удобные методы порождения множества процессов и и наполнения их данными. Одним их таких методов является класс Pool.

Наш предыдущий пример может быть расширен для вычисления диапазона значений в отдельных интерпретаторах Python, cpython-book-samples/33/pool_process_celsius.py:


import multiprocessing as mp
import os

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = os.getpid()
    print(f"{f}F is {c}C (pid {pid})")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    with mp.Pool(4) as pool:
        pool.map(to_celsius, range(110, 150, 10))
 	   

Обратите внимание, что наш вывод показывает тот же самый PID. По той причине что процесс интерпретатора CPython обладает значительными накладными расходами, Pool будет рассматривать всякий процесс в пуле неким исполнителем (worker). Если исполнитель выполнен, он будет применяться повторно.

Вы можете изменить это установив следующую замену эту строку:


with mp.Pool(4) as pool:
 	   

Изменяя её следующим кодом:


with mp.Pool(4, maxtasksperchild=1) as pool:
 	   

Теперь наш пример многозадачности будет выводить на печать нечто аналогичное такому:


$ python pool_process_celsius.py
110F is 43.333333333333336C (pid 5654)
120F is 48.88888888888889C (pid 5653)
130F is 54.44444444444445C (pid 5652)
140F is 60.0C (pid 5655)
		

Данный вывод отображает значения идентификаторов процессов вновь порождённых процессов и вычисленных значений.

  Создание дочернего процесса

Оба этих сценария создадут новый процесс интерпретатора Python и передадут ему данные при помощи pickle.

[Совет]Совет

Этот модуль pickle является пакетом упорядочивания, применяемого для преобразования преобразования в упорядоченную последовательность объектов Python. Для дополнительных сведений отправляем вас к книге Real Python The Python pickle Module: How to Persist Objects in Python.

Для систем POSIX создание соответствующего подчинённого процесса модуля multiprocessing является эквивалентом данной команды, где <i> это описатель файлового дескриптора, а <j> это описатель дескриптора конвейера:


$ python -c 'from multiprocessing.spawn import spawn_main; \
  spawn_main(tracker_fd=<i>, pipe_handle=<j>)' --multiprocessing-fork
		

Для систем Windows вместо файлового дескриптороа отслеживания применяется значение родительского PID, как в той команде, где <k> это значение родительского PID, а <j> это значение описания дескриптора конвейера:


> python.exe -c 'from multiprocessing.spawn import spawn_main; \
  spawn_main(parent_pid=<k>, pipe_handle=<j>)' --multiprocessing-fork
		

  Конвейеризация данных дочернему процессу

Когда этот новый дочерний процесс был установлен экземпляром в своей операционной системы, он будет дожидаться инициализации данных от своего родительского процесса.

Этот родительский процесс записывает в файловый поток конвейера два объекта. Такой файловый поток конвейера является особым потоком ввода/ вывода, применяемый для пересылки данных между процессами в командной строке.

Самый первый записанный родительским процессом объект это объект данных подготовки. Этот объект является словарём, содержащим некие сведения относительно своего родителя, такие как значение каталога исполнения, метод запуска, все специальные аргументы командной строки, а также sys.path.

Вы можете посмотреть некий пример того что выработано запуском multiprocessing.spawn.get_preparation_data(name):


>>> import multiprocessing.spawn
>>> import pprint
>>> pprint.pprint(multiprocessing.spawn.get_preparation_data("example"))
{'authkey': b'\x90\xaa_\x22[\x18\ri\xbcag]\x93\xfe\xf5\xe5@[wJ\x99p#\x00'
            b'\xce\xd4)1j.\xc3c',
 'dir': '/Users/anthonyshaw',
 'log_to_stderr': False,
 'name': 'example',
 'orig_dir': '/Users/anthonyshaw',
 'start_method': 'spawn',
 'sys_argv': [''],
 'sys_path': [
    '/Users/anthonyshaw',
    ]}
		

Вторым записываемым объектом выступает экземпляр дочернего класса BaseProcess. В зависимости от того как была вызвана многопроцессность и в какой операционной системе это применялось, один из дочерних классов BaseProcess будет соответствующим упорядоченным экземпляром.

Оба объекта данных подготовки и объект процесса упорядочены при помощи модуля pickle и записаны в поток конвейера своего родительского процесса:

 

Рисунок 10-5



[Замечание]Замечание

Реализация порождения дочернего процесса и упорядочение процесса POSIX расположены в Lib/multiprocessing/popen_spawn_posix.py.

Аналогичная реализация Windows находится в Lib/multiprocessing/popen_spawn_win32.py.

  Исполнение дочернего процесса

Точка входа дочернего процесса, multiprocessing.spawn.spawn_main(), получает значене аргумента pipe_handle, а также либо parent_pid для Windows, либо tracked_fd для POSIX:


def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv), "Not forking"
 	   

Для Windows соответствующая функция вызовет API OpenProcess значения родительского PID. Это используется для создания файлового дескриптора, fd, конвейера своего родительского процесса:


    if sys.platform == 'win32':
        import msvcrt
        import _winapi

        if parent_pid is not None:
            source_process = _winapi.OpenProcess(
                _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
                False, parent_pid)
        else:
            source_process = None
        new_handle = reduction.duplicate(pipe_handle,
                                         source_process=source_process)
        fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
        parent_sentinel = source_process
 	   

Для POSIX файловым дескриптором, fd, становится pipe_handle и он дублируется чтобы стать значением parent_sentinel:


    else:
        from . import resource_tracker
        resource_tracker._resource_tracker._fd = tracker_fd
        fd = pipe_handle
        parent_sentinel = os.dup(pipe_handle)
 	   

Затем вызывается _main() со значением файлового дескриптора родительского конвейера, fd, а также сигнальной меткой родительского процесса, parent_sentinel. Возвращаемое значение _main() становится кодом выхода для данного процесса и его интерпретатор прекращается:


exitcode = _main(fd, parent_sentinel)
sys.exit(exitcode)
 	   

_main() вызывается с fd и parent_sentinel для проверки того что из родительского процесса был выполнен выход в процессе исполненния его потомка.

_main() преобразует упорядоченные двоичные данные байтового потока fd в параллельные . Помните, это файловый дескриптор соответствующего конвейера. Этот процесс применяет ту же самую библиотеку pickle, которой пользуется его родительский процесс:

 

Рисунок 10-6



Самым первым значением является dict, содержащий соответствующие данные подгоовки. Вторым значением выступает некий экземпляр SpawnProcess, который затем применяется как некий экземпляр для _bootstrap() при:


def _main(fd, parent_sentinel):
    with os.fdopen(fd, 'rb', closefd=True) as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = reduction.pickle.load(from_parent)
            prepare(preparation_data)
            self = reduction.pickle.load(from_parent)
        finally:
            del process.current_process()._inheriting
    return self._bootstrap(parent_sentinel)
 	   

_bootstrap() обрабатывает конкретизацию экземпляра BaseProcess из превращённых в параллельные из последовательных данных, а затем вызывается целевая функция с соответствующими аргументами и аргументами ключевых слов. Эта завершающая задача выполняется BaseProcess.run():


    def run(self):
        '''
        Method to be run in subprocess; can be overridden in subclass
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)
 	   

Код выхода, self._bootstrap() устанавливается как код выхода и этот дочерний процесс прекращается.

Такой процесс позволяет своему родительскому процессу упорядочить сам модуль и его исполняемую функцию. Это также позволяет его дочернему процессу разобрать обратно этот экземпляр, исполнение этой функции с аргументами и возврат.

Это не позволяет обмениваться данными после запуска соответствующего дочернего процесса. Такая задача выполняется с применением расширений Queue и Pipe объектов.

Когда процессы подлежат созданию в пуле, тогда самый первый процесс будет готов и будет пребывать состоянии ожидания. Его родительский процесс повторяет этот процесс и отправляет необходимые данные своему следующему исполнителю:

 

Рисунок 10-7



Следующий исполнитель получает свои данные и инициализирует своё состояние и запускает свою целевую функцию:

 

Рисунок 10-8



Для совместного использования данных вне инициализации следует применять очереди и конвейеры.

  Обмен данными при помощи очередей и конвейеров

В нашем предыдущем раздели вы видели как порождаются дочерние процессы и затем применяются конвейеры в качестве упорядоченных потоков для сообщения своему дочернему процессу какую функцию вызывать с аргументами.

Существуют два типа взаимодействия между процессами, в зависимости от природы своих задач: очередей и конвейеров. Прежде чем изучить каждое из них, вы быстро взглянете на то как операционные системы защищают доступ к ресурсам при помощи переменных, носящих название семафоров.

 

Семафоры

Многие механизмы при работе со множеством процессов применяют семафоры в качестве некого способа сигнализации о том что данный ресурс заблокирован, пребывает в состоянии ожидания или не применяется. Для блокирования ресурсов, подобных файлам, сокетов и прочего операционные системы пользуются в качестве простого типа бинарные семафоры.

Когда один процесс выполняет запись в некий файл или сетевой сокет, тогда вы не желаете чтобы другой процесс внезапно начал запись в тот же файл. Такие данные немедленно были бы разрушены.

Вместо этого операционные системы помещают на ресурс некое блокирование при помощи светофоров. Процессы Могут также сигнализировать что они ожидают высвобождения такой блокировки с тем, чтобы когда оно произойдёт, они получат сообщение о том, что они готовы и процессы могут приступать к их использованию.

В нашем реальном мире семафоры это метод сигналов, которые применяют флаги для передачи сообщений. Поэтому вы можете представлять сигналы семафора для состояний ожидания, блокировки и отсутствия использования ресурса подобным образом:

 

Рисунок 10-9



В операционных системах API семафора отличаются, поэтому имеется некий класс абстракции, multiprocessing.synchronize.Semaphore

Для многопроцессности применяются семафоры потому, что они поддерживают безопасность как потоков, так и процессов. Сама операционная система обрабатывает взаимные блокировки чтения или записи для одного и того же семафора.

Реализация такий функций API семафоров располагается в модуле расширения C Modules/_multiprocessing/semaphore.c. Этот модуль расширения предлагаеи метод сигналов для создания, блокирования и высвобождения семафоров с прочими операциями.

Соответствующий вызов к самой операционной системе выполняется последовательностью макросов, которые компилируются в различных реализациях в зависимости от своей операционной системы платформы.

Для Windows этот макрос пользуется API функциями <winbase.h> для семафоров:


#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
#define SEM_UNLINK(name) 0
 	   

Для POSIX соответствующий макрос применяет API <semaphore.h>:


#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600,...
#define SEM_CLOSE(sem) sem_close(sem)
#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
#define SEM_UNLINK(name) sem_unlink(name)
 	   
 

Очереди

Очереди это великолепный способ отправки небольших данных от или к множеству процессов.

Вы можете приспособить наш более ранний пример со множеством процессов для применения экземпляра multiprocessing Manager() и создания двух очередей:

  1. inputs для удержания входных значений градусов по Фаренгейту

  2. outputs для удержания получаемых в результате значений градусов по Цельсию

Измените значение размера пула на 2, чтобы у вас было два исполнителя, cpython-book-samples/33/pool_queue_celsius.py:


import multiprocessing as mp

def to_celsius(input: mp.Queue, output: mp.Queue):
    f = input.get()
    # Time-consuming task ...
    c = (f - 32) * (5/9)
    output.put(c)

if __name__ == '__main__':
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(2) as pool:
        inputs = pool_manager.Queue()
        outputs = pool_manager.Queue()
        input_values = list(range(110, 150, 10))
        for i in input_values:
            inputs.put(i)
            pool.apply(to_celsius, (inputs, outputs))

        for f in input_values:
            print(outputs.get(block=False))
 	   

Это выводит на печать список кортежей своей очереди outputs:


$ python pool_queue_celsius.py
43.333333333333336
48.88888888888889
54.44444444444445
60.0
		

Наш родительский процесс вначале помещает свои входные значения в очередь inputs. Самый первый исполнитель затем получает из этой очереди некий элемент. Всякий раз, когда некий элемент берётся из этой очереди при помощи .get(), в объекте этой очереди применяется блокирующий семафор:

 

Рисунок 10-10



Пока этот исолнитель занят, затем второй исполнитель берёт из этой очереди другое значение:

 

Рисунок 10-11



Первый исполнитель завершил свои вычисления и помещает полученные результаты в очередь outputs:

 

Рисунок 10-12



Две очереди применяются для разделения входных и выходных значений. В конечном счёте, обрабатываются все данные на входе и очередь outputs заполнена. Полученные значения затем выводятся на печать своим родительским процессом:

 

Рисунок 10-13



Этот пример показывает как некий пул исполнителей может получать очередь небольших, дискретных значений и обрабатывать их параллельно для отправки обратно полученных результатов в свой процесс хоста.

На практике, преобразование градусов по Цельсию из градусов по Фаренгейту это маленькое, тривиальное вычисление, не подходящее для параллельного выполнения. Когда процесс исполнителя выполнялся бы для иного, интенсивно использующего ЦПУ вычисления, это привело бы к значительному увеличению производительности в компьютере со множеством ЦПУ или ядер.

Для поточных данных вместо дискретных очередей вы можете пользоваться вместо этого конвейерами.

 

Конвейеры

Внутри пакета multiprocessing имеется тип Pipe. Установленный конвейер возвращает два соединения, родителя и потомка. Оба способны отправлять и получать данные:

 

Рисунок 10-14



В примере очереди, в саму очередь неявным образом помещалась некая блокировка при отправке или получении данных. Конвейеры не обладают таким поведением, поэтому вам надлежит быть аккуратными в том, чтобы два процесса не пытались выполнять запись в одном и том же конвейере в одно и то же время.

Чтобы приспособить наш последний пример на работу с конвейером, потребуется заменить pool.apply() на pool.apply_async(). Это изменяет наше выполнение следующего процесса в некую операцию без блокирования, cpython-book-samples/33/pool_pipe_celsius.py:


import multiprocessing as mp

def to_celsius(child_pipe: mp.Pipe):
    f = child_pipe.recv()
    # time-consuming task ...
    c = (f - 32) * (5/9)
    child_pipe.send(c)


if __name__ == '__main__':
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(2) as pool:
        parent_pipe, child_pipe = mp.Pipe()
        results = []
        for input in range(110, 150, 10):
            parent_pipe.send(input)
            results.append(pool.apply_async(to_celsius, args=(child_pipe,)))
            print("Got {0:}".format(parent_pipe.recv()))
        parent_pipe.close()
        child_pipe.close()
 	   

Существует некий риск, что два или более процессов попытаются выполнить чтение из своего родительского конвейера одновременно в следующей строке:


f = child_pipe.recv()
 	   

Имеется также риск, что два или более процесса попробуют одновременно выполнить запись в свой дочерний конвейер:


child_pipe.send(c)
 	   

Если это произойдёт, данные будут разрушены либо в операции получения, либо в операции отправки:

 

Рисунок 10-15



Чтобы избежать этого, вы можете реализовать блокировку семафором операционной системы. Тогда все дочерние процессы будут проверять наличие блокирования перед чтением или записью в один и тот же конвейер.

Требуются две блокировки, одна на стороне получения родительского конвейера и другая на конце отправки дочернего конвейера, cpython-book-samples/33/pool_pipe_locks_celsius.py:


import multiprocessing as mp

def to_celsius(child_pipe: mp.Pipe, child_lock: mp.Lock):
    child_lock.acquire(blocking=False)
    try:
        f = child_pipe.recv()
    finally:
        child_lock.release()
    # time-consuming task ... release lock before processing
    c = (f - 32) * (5/9)
    # reacquire lock when done
    child_lock.acquire(blocking=False)
    try:
        child_pipe.send(c)
    finally:
        child_lock.release()

if __name__ == '__main__':
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(2) as pool:
        parent_pipe, child_pipe = mp.Pipe()
        child_lock = pool_manager.Lock()
        results = []
        for i in range(110, 150, 10):
            parent_pipe.send(i)
            results.append(pool.apply_async(
                to_celsius, args=(child_pipe, child_lock)))
            print(parent_pipe.recv())
        parent_pipe.close()
        child_pipe.close()
 	   

Теперь соответствующий процесс исполнителя будет ожидать получения блокировки перед отправкой данных и снова дожидаться овладения другой блокировкой на отправку данных:

 

Рисунок 10-16



Этот пример подходит для тех ситуаций, когда проходящие по конвейеру данные велики, поскольку вероятность коллизий тут выше.

  Разделяемые состояния между процессами

До сих пор вы наблюдали как данные совместно используются процессами потомков и родителей.

Могут существовать ситуации, при которых вы пожелаете разделять данные между дочерними процессами. В таком случае пакет multiprocessing предоставляет два решения:

 

Пример приложения

В качестве демонстрационного примера на протяжении остающейся части этой главы, вы будете перестраивать некий сканер портов TCP для различных технологий одновременности и параллельности.

Поверх сетевой среды некий хост способен контактировать в портах, которые нумеруются с 1 до 65535. Распространённые службы обладают стандартными портами. Например, HTTP применяет порт 80, а HTTPS работает в 443. Сканеры портов TCP это распространённый инструмент тестирования для проверки того, что пакеты способны отправляться в некой сетевой среде.

Данный пример кода пользуется интерфейсом Queue, реализацию очереди в безопасном потоке, аналогичную той, что мы применяли в примерах со множеством процессов. Наш код также пользуется пакетом socket для пробы соединения к удалённому порту с коротким таймаутом в одну секунду.

check_port() обнаружит отклик от своего host по заданному port при его возникновении. Если это происходит, тогда check_port() добавит этот номер порта в очередь results.

При выполнении данного сценария check_port() вызывается последовательно для портов с номерами от 80 до 100. После его выполнения извлекается очередь results и полученные результаты выводятся на печать в командной строке. С тем чтобы вы могли сопоставлять отличия, здесь будет в самом конце выводиться на печать время исполнения, cpython-book-samples/33/portscanner.py:


from queue import Queue
import socket
import time
timeout = 1.0

def check_port(host: str, port: int, results: Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

if __name__ == '__main__':
    start = time.time()
    host = "localhost"  # Replace with a host you own
    results = Queue()
    for port in range(80, 100):
        check_port(host, port, results)
    while not results.empty():
        print("Port {0} is open".format(results.get()))
    print("Completed scan in {0} seconds".format(time.time() - start))
 	   

Наше выполнение выведет на печать полученные открытые порты и значение времени, которое ушло на это:


$ python portscanner.py
Port 80 is open
Completed scan in 19.623435020446777 seconds
		

Вы можете перестроить этот пример на использование множества процессов. Переставьте свой интерфейс Queue для multiprocessing.Queue и отсканируйте все порты вместе применяя исполнитель pool, cpython-book-samples/33/portscanner_mp_queue.py:


import multiprocessing as mp
import time
import socket

timeout = 1

def check_port(host: str, port: int, results: mp.Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

if __name__ == '__main__':
    start = time.time()
    processes = []
    scan_range = range(80, 100)
    host = "localhost"  # Replace with a host you own
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(len(scan_range)) as pool:
        outputs = pool_manager.Queue()
        for port in scan_range:
            processes.append(pool.apply_async(check_port, 
                                              (host, port, outputs)))
        for process in processes:
            process.get()
        while not outputs.empty():
            print("Port {0} is open".format(outputs.get()))
        print("Completed scan in {0} seconds".format(time.time() - start))
 	   

Как вы могли ожидать, это приложение намного быстрее, потому как оно тестирует все порты параллельно:


$ python portscanner_mp_queue.py
Port 80 is open
Completed scan in 1.556523084640503 seconds
		

  Аннотация мультипроцессности

Работа со множеством процессов предлагает некий масштабируемый параллельный API для Python. Данные могут совместно применяться процессами, а интенсивно использующие ЦПУ исполнители могут разбиваться на параллельные задачи для получения преимуществ компьютеров со множеством ядер или множеством ЦПУ.

Обработка множеством процессов не подходящее решение когда ваша подлежащая выполнению задача ограничена вводом/ выводом больше чем интенсивным использованием ЦПУ. Например, когда вы расширяете на четыре исполнительных процесса чтения и запись в одни и те же файлы, затем один выполнял бы всю имеющуюся работу, а оставшиеся три ожидали бы высвобождения имеющегося блокирования.

Работа со множеством процессов также не подходит для варианта с задачами с коротким временем жизни по причине накладных расходов времени и обработки, требующихся для запуска нового интерпретатора Python.

В обоих подобных ситуациях вы можете обнаружить более удобными наши следующие подходы.

Многопоточность

CPython предоставляет для создания, порождения и управления потоков из Python API как верхнего уровня, так и нижнего уровня.

Чтобы разобраться с потоками Python, вам вначале следует понять как работают потоки операционной системы. В CPython имеются две реализации потоков:

  1. pthreads: потоки POSIX для Linux и macOS

  2. nt threads: потоки NT для Windows

В нашем разделе Структура процесса вы видели, что процесс обладает следующими функциональными возможностями:

  • Стеком подпрограмм

  • Кучей памяти

  • Доступом к файлам, блокировкам и сокетам своей операционной системы

Самым большим ограничением на масштабирование отдельного процесса является то, что его операционная система будет иметь для такого исполнения единственный программный счётчик.

Чтобы обойти это, современные операционные системы позволяют процессам сигнализировать своей операционной системе ветвить их исполнение по множеству потоков.

Каждый поток будет обладать своим собственным счётчиком программы, но применять те же самые ресурсы, что и его процесс хоста. Каждый поток также обладает своим собственным стеком вызовов, поэтому он способен исполнять различные функции.

Поскольку множество потоков способны выполнять чтение и запись в одном и том же пространстве памяти, могут возникать столкновения. Основным решением этого выступает безопасность потоков, которая вовлекает обеспечение уверенности что пространство памяти блокируется единственным потоком перед доступом к нему.

Некий отдельный процесс с тремя потоками будет обладать такой структурой:

 

Рисунок 10-17



[Совет]Смотри также

В качестве вводного руководства в API потоков Python обратитесь к книге Real Python Intro to Python Threading.

  GIL

Если вы знакомы с потоками NT или потоками POSIX в C, либо вы пользовались другим языком программирования верхнего уровня, тогда вы могли бы ожидать параллельности от множества потоков.

В CPython имеющиеся потоки основываются на API C, но являются потоками Python. Это означает,что всякий поток требует исполнения байтового кода Python через его цикл вычислений.

Имеющийся цикл вычислений Python не предоставляет безопасность потокам. Существует множество частей состояния интерпретатора, например, сборщик мусора, которые используются совместно и глобально. Чтобы обойти это, разработчики CPython реализовали некую мега- блокировку с названием GIL (global interpreter lock, глобальную блокировку интерпретатора). Прежде чем какой бы то ни было код выполняется в своём цикле вычисления кадра, имеющийся GIL блокируется его потоком. По окончанию выполнения конкретного кода операции, эта GIL высвобождается.

Хотя это и предоставляет глобальную сохранность потока для каждой операции в Python, такой подход обладает главным недостатком. Все операции, которые занимают продолжительное время исполнения, будут оставлять остальные потоки ждущими высвобождения GIL перед своим исполнением. Это означает, что в любой заданный момент времени только один поток способен исполнять некую операцию байтового кода.

Для овладения GIL выполняется вызов к take_gil(). Чтобы освободить её, вызывается drop_gil(). Овладение GIL выполняется внутри самого кадра ядра цикла вычисления, _PyEval_EvalFrameDefault().

Чтобы прекратить исполнение отдельным кадром от непрерывного удерживания имеющейся GIL, состояние цикла вычислений хранит некий флаг, gil_drop_request. После выполнения в неком кадре каждой операции байтового кода, проверяется этот флаг и GIL временно высвобождается перед повторным овладением ею:


    if (_Py_atomic_load_relaxed(&ceval->gil_drop_request)) {
        /* Give another thread a chance */
        if (_PyThreadState_Swap(&runtime->gilstate, NULL) != tstate) {
            Py_FatalError("ceval: tstate mix-up");
        }
        drop_gil(ceval, tstate);

        /* Other threads may run now */

        take_gil(ceval, tstate);

        /* Check if we should make a quick exit. */
        exit_thread_if_finalizing(tstate);

        if (_PyThreadState_Swap(&runtime->gilstate, tstate) != NULL) {
            Py_FatalError("ceval: orphan tstate");
        }
    }
...
 	   

Несмотря на те ограничения, которые налагает GIL на параллельное исполнение, она превращает многопоточность в Python в очень безопасное и идеальное для одновременного выполнения задач, связанных с вводом/ выводом.

  Относящиеся к делу исходные файлы

Вот те исходные файлы, которые относятся к потокам:

Таблица 10-3. Относящиеся к множеству процессов исходные файлы
Файл Назначение

Include/pythread.h

API и определение PyThread

Lib/threading.py

API построения потоков и модуль стандартной библиотеки верхнего уровня

Modules/_threadmodule.c

API построения потоков и модуль стандартной библиотеки нижнего уровня

Python/thread.c

Расширение C для модуля thread

Python/thread_nt.h

API построения модулей Windows

Python/thread_pthread.h

API построения модулей POSIX

Python/ceval_gil.h

Реализация блокировки GIL

  Старт потоков в Python

Чтобы продемонстрировать получаемый выигрыш производительности от обладания многопоточным кодом (несмотря на имеющуюся GIL), вы можете реализовать простое сканирование портов в Python.

Вы начнёте с клонирования нашего предыдущего сценария, но с заменой его логики на порождение некого потока для каждого порта с применением threading.Thread(). Это аналогично предыдущему API multiprocessing, где получались вызываемая, target, и кортеж, args.

Запускайте потоки внутри соответствующего цикла, однако не дожидайтесь их завершения. Вместо этого, добавляйте в конец некого списка, threads, соответствующий экземпляр потока:


    for port in range(80, 100):
        t = Thread(target=check_port, args=(host, port, results))
        t.start()
        threads.append(t)
 	   

После того как все потоки были созданы, выполнит итеративно проход по списку threads и вызывайте .join() чтобы дожидаться из выполнения:


    for t in threads:
        t.join()
 	   

Затем исчерпайте полностью все элементы из своей очереди results и выведите их на печать на экране:


    while not results.empty():
        print("Port {0} is open".format(results.get()))
 	   

Вот весь сценарий целиком, cpython-book-samples/33/portscanner_threads.py:


from threading import Thread
from queue import Queue
import socket
import time

timeout = 1.0

def check_port(host: str, port: int, results: Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

def main():
    start = time.time()
    host = "localhost"  # Replace with a host you own
    threads = []
    results = Queue()
    for port in range(80, 100):
        t = Thread(target=check_port, args=(host, port, results))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    while not results.empty():
        print("Port {0} is open".format(results.get()))
    print("Completed scan in {0} seconds".format(time.time() - start))

if __name__ == '__main__':
    main()
 	   

Когда вы вызовете этот сценарий с построением потоков из своей командной строки, он исполнится быстрее более чем в десять раз чем пример с единственным потоком:


$ python portscanner_threads.py
Port 80 is open
Completed scan in 1.0101029872894287 seconds
		

Он также исполняется на от 50 до 60 процентов быстрее чем пример со множеством процессов. Помните, что подход со множеством процессов обладает накладными расходами для запуска своих новых процессов. Потоки также обладают накладными расходами, однако они намного меньше.

Вы можете удивиться, Раз GIL означает что лишь одна операция способна выполняться за раз, тогда почему это может быть быстрее?

Вот предложение, которое отнимает 1 - 1000мс:


result = sock.connect_ex((host, port))
		

В модуле расширения C, Modules/socketmodule.c, эта функция реализует такое соединение, строка 3245 Modules/socketmodule.c:


static int
internal_connect(PySocketSockObject *s, struct sockaddr *addr, int addrlen,
                 int raise)
{
    int res, err, wait_connect;

    Py_BEGIN_ALLOW_THREADS
    res = connect(s-gt;sock_fd, addr, addrlen);
    Py_END_ALLOW_THREADS
		

Системный вызов connect() окружают макросы Py_BEGIN_ALLOW_THREADS и Py_END_ALLOW_THREADS. Эти макросы определены следующим образом в Include/ceval.h:


#define Py_BEGIN_ALLOW_THREADS { \
                        PyThreadState *_save; \
                        _save = PyEval_SaveThread();
#define Py_BLOCK_THREADS        PyEval_RestoreThread(_save);
#define Py_UNBLOCK_THREADS      _save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS    PyEval_RestoreThread(_save); \
                 }
		

Итак, при обращении к Py_BEGIN_ALLOW_THREADS, он вызывает PyEval_SaveThread(). Эта функция изменяет состояние текущего потока на NULL и отбрасывает (drops) GIL, строка 444 Python/ceval.c:


PyThreadState *
PyEval_SaveThread(void)
{
    PyThreadState *tstate = PyThreadState_Swap(NULL);
    if (tstate == NULL)
        Py_FatalError("PyEval_SaveThread: NULL tstate");
    assert(gil_created());
    drop_gil(tstate);
    return tstate;
}
		

Поскольку GIL отброшен, может продолжаться любой прочий выполняющийся поток. Этот поток будет покоиться и дожидаться системного вызова без блокирования своего цикла выполнения.

После успешного завершения connect() или выхода по таймауту, макро Py_END_ALLOW_THREADS исполняет PyEval_RestoreThread() с изначальным состоянием потока. Значение состояния потока восстанавливается и повторно берётся GIL. Вызов take_gil() это блокирующий вызов, дожидающийся семафора, строка 458 Python/ceval.c:


void
PyEval_RestoreThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_RestoreThread: NULL tstate");
    assert(gil_created());

    int err = errno;
    take_gil(tstate);
    /* _Py_Finalizing is protected by the GIL */
    if (_Py_IsFinalizing() && !_Py_CURRENTLY_FINALIZING(tstate)) {
        drop_gil(tstate);
        PyThread_exit_thread();
        Py_UNREACHABLE();
    }
    errno = err;

    PyThreadState_Swap(tstate);
}
		

Это не просто системный вызов, обёрнутый не блокирующей GIL парой Py_BEGIN_ALLOW_THREADS и Py_END_ALLOW_THREADS. Существует более трёх сотен его применений в стандартной библиотеке, включающих:

  • Выполнение запросв HTTP

  • Взаимодействие с локальным оборудованием

  • Шифрование данных

  • Чтение и запись файлов

  Состояние потока

CPython предоставляет свою собственную реализацию управления потоком. Поскольку потоками требуется исполнять байтовый код Python в своём цикле вычислений, запуск некого потока в CPython это не простое порождение некого потока операционной системы.

Потоки Python носят название PyThread. Вы кратко рассматривали их в Главе 8, Цикл расчёта.

Потоки Python исполняют код объектов и порождаются своим интерпретатором.

Чтобы напомнить:

  • CPython обладает единственной средой времени исполнения, которая обладает своим собственным состоянием времени исполнения.

  • CPython способен обладать одним или множеством интерпертаторов.

  • Интерпретатор обладает состоянием с названием состояние интерпетатора.

    Некий интерпретатор возьмёт объект кода и преобразует его в последовательность объектов кадров.

  • Некий интерпретатор обладает по крайней мере одним потоком и каждый поток обладает состоянием потока.

  • Объекты кадров исполняются в стеке, носящем название стека кадров.

  • CPython ссылается на переменные из стека значений.

  • Значение состояния интерпретатора содержит связанный список своих потоков.

Среда времени исполнения с единственным потоком, единственным интерпретатором будет обладать следующими состояниями:

 

Рисунок 10-18



Тип состояния соответствующего потока, PyThreadState, обладает более чем тридцатью свойствами, включая:

  • Уникальный идентификатор

  • Связный список с прочими состояниями потоков

  • Значение состояния породившего его интерпретатора

  • Значение исполняемого в данный момент кадра

  • Значение текущей глубины рекурсии

  • Не обязательные функции отслеживания

  • Обрабатываемая в данный момент исключительная ситуация

  • Все обрабатываемые в данный момент асинхронные исключительные ситуации

  • Стек возбуждённых исключительных ситуаций

  • Счётчик GIL

  • Счётчики асинхронного генератора

Как и подготовительные данные обработки множества процессов, потоки обладают неким состоянием запуска. Тем не менее, потоки совместно используют одно и то же пространство памяти, поэтому нет необходимости упорядочения данных и отправки их через некий файловый поток.

Потоки конкретизируются при помощи своего типа threading.Thread . Это модуль верхнего уровня, который абстрагирует тип PyThread. Экземпляры PyThread управляются модулем расширения C _thread.

Этот модуль _thread обладает точкой входа для запуска нового потока, thread_PyThread_start_new_thread(). start_new_thread() это метод в экземпляре с типом Thread.

Новые потоки конкретизируются в такой последовательности:

  1. Создаётся bootstate, связывается со своей target, причём с аргументами args и kwargs.

  2. Значение bootstate привязывается к состоянию его интерпретатора.

  3. Создаётся новое PyThreadState, которое привязывается к своему текущему интерпретатору.

  4. Включается его GIL, если она ещё не разрешена, при помощи вызова к PyEval_InitThreads().

  5. Полученный новый поток запускается в зависимой от своей операционной системы реализации PyThread_start_new_thread.

 

Рисунок 10-19



Значение bootstate потока обладает следующими свойствами:

Таблица 10-4. Свойства bootstate потока
Поле Тип Назначение

interp

PyInterpreterState*

Ссылка на управляющий этим потоком интерпретатор

func

PyObject * (callable)

Ссылка на свой callable для исполнения в процессе исполнения этого потока

args

PyObject * (tuple)

Аргументы для вызова с ними func

keyw

PyObject * (dict)

Аргументы ключевых слов для вызова с ними func

tstate

PyThreadState *

Состояние потока для этого нового потока

Для bootstate соответствующего потока имеются две реализации PyThread:

  1. Потоки POSIX для Linux и macOS

  2. Потоки NT для Windows

Обе эти реализации создают необходимый поток операционной системы, устанавливают его атрибуты и затем выполняют соответствующий обратный вызов t_bootstrap() изнутри вновь созданного потока.

Эта функция вызывается с единственным аргументом, boot_raw, назначаемом bootstate, построенном в thread_PyThread_start_new_thread().

Эта функция t_bootstrap() выступает интерфейсом между потоком нижнего уровня и средой времени исполнения Python. Такой начальный самораскрутчик проинициализирует свой поток, а затем исполнит вызываемый target при помощи PyObject_Call().

После исполнения свей вызываемой цели соответствующий поток выполняет выход:

 

Рисунок 10-20



  Потоки POSIX

Потоки POSIX, носящие название pthreads, обладают реализацией в Python/thread_pthread.h. Эта реализация абстрагирует API C <pthread.h> при помощи некоторых дополнительных мер безопасности и оптимизаций.

Потоки способны обладать неким настраиваемым размером стека. Python обладает своей собственной конструкцией кадра стека, который вы изучили в Главе 8, Цикл расчёта. Когда существует некая проблема, вызываемая каким- то рекурсивным циклом, а сам исполняемый кадр достигает значения предела глубины, тогда Python возбуждает RecursionError, которую вы способны обрабатывать при помощи блока try...except в коде Python.

Поскольку pthreads обладают своим собственным размером стека, значение максимальной глубины размеров стека Python и pthread могут вступать в конфликт. Когда значение размера стека потока меньше чем величина максимальной глубины кадра в Python, тогда весь процесс Python целиком может испытать крушение перед возбуждением некой RecursionError.

Во время исполнения значение максимальной глубины в Python можно настраивать при помощи sys.setrecursionlimit(). Во избежание крушений реализация pthread CPython устанавливает величину размера стеа в значение pythread_stacksize состояния своего интерпретатора.

Большинство современных совместимых с POSIX операционных систем поддерживают системное планирование pthreads. Если PTHREAD_SYSTEM_SCHED_SUPPORTED определяется в pyconfig.h, тогда значение pthread устанавливается в PTHREAD_SCOPE_SYSTEM, что означает, что значение приоритета этого потока в планировщике операционной системы принимается на основе сопоставления с прочими потоками в этой системе, а не только с потоками внутри самого процесса Python.

После того как были настроены свойства потоков, при помощи API pthread_create() создаётся необходимый поток. Это запускает функцию самораскрутки изнутри такого нового потока.

Позднее, соответствующий обработчик потока, pthread_t, выполняет выравнивание на unsigned long возвращается чтобы превратиться в надлежащий идентификатор потока.

  Потоки Windows

Потоки Windows реализуются в Python/thread_nt.h, следуя аналогичным, но более простым шаблоном.

Значение размера стека своего нового потока настраивается на значение интерпретатора pythread_stacksize (когда оно установлено). Этот поток затем создаётся при помощи API Windows _beginthreadex() с применением его функции самораскрутки в качестве обратного вызова. Наконец, возвращается значение идентификатора потока.

  Аннотация многопоточности

Это не является исчерпывающим руководством по потокам Pytho. Реализация потока Python обширна и предлагает множество механизмов для совместного использования данных между потоками, блокировок объектов и ресурсов.

Потоки это великолепный, действенный способ улучшения времени выполнения вашего приложения Python когда вы ограничены операциями ввода/ вывода. В этом разделе вы рассмотрели чем является GIL, зачем она существует и какие части общей стандартной библиотеки могут выводиться из- под её ограничений.

Асинхронное программирование

Python предлагает множество способов одновременного программирования без применения потоков и множества процессов. Эти функциональные возможности добавлялись, расширялись и часто заменялись лучшими альтернативами.

Для нашей текущей версии в этой книге, 3.9, считается устаревшим декоратор @coroutine.

Всё ещё доступны следующие системы:

  • Создание фьючерсов из ключевых слов async

  • Запуск сопрограмм при помощи ключевых слов yield from

Генераторы

Генераторы Python это возвращающие предложение yield функции и способные постоянно вызываться для выработки последующих значений.

Генераторы зачастую применяются как более действенный в отношении памяти способ зацикливания по значениям в большом блоке данных, таком как некий файл, база данных или при работе с сетевой средой. Объекты генератора возвращаются на месте значения при использовании yield вместо return. Такой объект генератора создаётся из предложения yield и возвращается вызывавшей его стороне.

Эта простая функция генератора даст в результате буквы с a по z, cpython-book-samples/33/letter_generator.py:


def letters():
    i = 97  # Letter 'a' in ASCII
    end = 97 + 26  # Letter 'z' in ASCII
    while i < end:
        yield chr(i)
        i += 1
 	   

Если вы вызовете letters(), тогда она не возвратит вам некое значение. Вместо этого она будет возвращать некий объект генератора:


>>> from letter_generator import letters
>>> letters()
<generator object letters at 0x1004d39b0> 
		

В имеющийся синтаксис предложения for встроена возможность перебирать объект генератора пока он не прекратит выдавать значения:


>>> for letter in letters():
...     print(letter)
a 
b 
c 
d
... 
		

Эта реализация использует протокол самого итератора. Объекты, обладающие методом __next__() могут применяться в циклах for и while, либо при помощи встроенного next().

Все типы контейнеров (такие как списки, наборы и кортежи) в Python реализуют этот протокол итератора. Генераторы уникальны, поскольку сама реализация его метода __next__() повторно вызывает эту функцию генератора из её последнего состояния.

Генераторы не исполняются в своём фоновом режиме - они приостанавливаются. Когда вы запрашиваете другое значение, они восстанавливают исполнение. Внутри структуры самого объекта генератора имеется соответствующий объект кадра в том виде, каким он был в самом последнем предложении yield.

  Структура генератора

Объекты генератора создаются шаблонным макро, _PyGenObject_HEAD(prefix).

Этот макро применяется со следующими типами префиксов:

  • Объекты генератора: PyGenObject(gi_)

  • Объекты сопрограммы: PyCoroObject(cr_)

  • Объекты асинхронного генератора: PyAsyncGenObject(ag_)

Позднее в этой главе вы рассмотрите объекты сопрограмм и асинхронного генератора.

Сам тип PyGenObject обладает такими базовыми свойствами:

Таблица 10-5. Базовые свойства PyGenObject
Поле Тип Назначение

[x]_code

PyObject * (PyCodeObject*)

Скомпилированная функция, которую выдаёт этот генератор

[x]_exc_state

_PyErr_StackItem

Сведения исключительной ситуации если вызов этого генератора возбуждает исключительную ситуацию

[x]_frame

PyFrameObject*

Текущий объект кадра для данного генератора

[x]_name

PyObject * (str)

Название этого генератора

[x]_qualname

PyObject * (str)

Полностью определённое имя этого генератора

[x]_running

char

Устанавливается в 0 или в 1, когда этот генератор в текущий момент исполняется

[x]_weakreflist

PyObject * (list)

Список слабых ссылок на объекты внутри функции этого генератора

Поверх этих базовых свойств, PyCoroObject обладает таким свойством:

Таблица 10-6. Дополнительное свойство PyCoroObject
Поле Тип Назначение

cr_origin

PyObject * (tuple)

Кортеж, содержащий свой исходный кадр и вызывающую сторону

Поверх имеющихся базовых свойств, PyAsyncGenObject обладает такими свойствами:

Таблица 10-7. Дополнительные свойства PyAsyncGenObject
Поле Тип Назначение

ag_closed

int

Флаг маркировки того, что этот генератор закрыт

ag_finalizer

PyObject *

Ссылка на финализирующий метод

ag_hooks_inited

int

Флаг маркировки того, что соответствующие специальные точки входа проинициализированы

ag_running_async

int

Флаг маркировки того, что этот генератор исполняется

  Относящиеся к делу исходные файлы

Вот те исходные файлы, которые относятся к генераторам:

Таблица 10-8. Относящиеся к генераторам исходные файлы
Файл Назначение

Include/genobject.h

API генератора и определение PyGenObject

Objects/genobject.c

Реализация объекта генератора

  Создание генераторов

Когда компилируется функция, содержащая предложение yield, результирующий объект кода обладает неким дополнительным флагом, CO_GENERATOR.

В разделе Построение объектов кадров Главы 8, Цикл расчёта вы изучали как компилированный объект кода преобразуется в объект кадра при его исполнении. В этом процессе имеется специальный вариант для генераторов, сопрограмм и асинхронных генераторов.

_PyEval_EvalCode() проверяет соответствующий объект кода на флаги CO_GENERATOR, CO_COROUTINE и CO_ASYNC_GENERATOR. Когда он обнаруживает один из этих флагов, тогда вместо вычисления этого объекта кода в реальном времени, его функция создаёт некий кадр и возвращает его в объект генератора, сопрограммы или асинхронного генератора применяя, соответственно, PyGen_NewWithQualName(), PyCoro_New() или PyAsyncGen_New():


PyObject *
_PyEval_EvalCode(PyObject *_co, PyObject *globals, PyObject *locals, ...
...
    /* Handle generator/coroutine/asynchronous generator */
    if (co->co_flags & (CO_GENERATOR | CO_COROUTINE | CO_ASYNC_GENERATOR)) {
        PyObject *gen;
        PyObject *coro_wrapper = tstate->coroutine_wrapper;
        int is_coro = co->co_flags & CO_COROUTINE;
        ...
        /* Create a new generator that owns the ready-to-run frame
         * and return that as the value. */
        if (is_coro) {
>>>         gen = PyCoro_New(f, name, qualname);
        } else if (co->co_flags & CO_ASYNC_GENERATOR) {
>>>         gen = PyAsyncGen_New(f, name, qualname);
        } else {
>>>         gen = PyGen_NewWithQualName(f, name, qualname);
        }
        ...
        return gen;
    }
...
 	   

Имеющаяся фабрика генератора, PyGen_NewWithQualName(), получает соответствующий кадр и выполняет некие шаги по заполнению полей объекта этого генератора:

  • Устанавливает свойство gi_code в значение скомпилированного объекта кода

  • Устанавливает этот генератор как не исполняющийся (gi_running = 0)

  • Устанавливает списки исключительных ситуаций и слабых ссылок в NULL

Вы можете рассмотреть этот скомпилированный объект кода gi_code для своей функции генератора импортировав модуль dis и дизассемблировав полученный внутри байтовый код:


>>> from letter_generator import letters
>>> gen = letters()
>>> import dis
>>> dis.disco(gen.gi_code)
  2           0 LOAD_CONST               1 (97)
              2 STORE_FAST               0 (i)
...
		

В Главе 8, Цикл расчёта вы изучили Тип объекта кадра. Объекты кадра содержат локальные и глобальные элементы, самые последние исполненные инструкции и подлежащий выполнению код.

Встроенное поведение и состояние объектов кадра делает возможным для генераторов приостанавливаться и возобновляться по запросу.

  Выполнение генераторов

Всякий раз когда в неком объекте генератора вызывается __next__(), с этим экземпляром генератора вызывается gen_iternext(), который немедленно вызывает gen_send_ex() внутри Objects/genobject.c.

gen_send_ex() это функция, которая преобразует объект генератора в выдаваемый следующим результат. Вы обнаружите много аналогичного с тем способом, которым конструируются кадры из объекта кода, поскольку эти функции обладают схожими задачами.

gen_send_ex() совместно применяется с генераторами, сопрограммами и асинхронными генераторами и обладает следующими этапами:

  1. Выполняется выборка текущего состояния потока.

  2. Выполняется выборка необходимого объекта кадра из данного объекта генератора.

  3. Если этот генератор выполняется при вызове __next__(), тогда возбуждается ValueError.

  4. Когда этот кадр внутри самого генератора расположен в верху стека:

    • Если это сопрограмма и эта сопрограмма уже не помечена как закрытая, тогда возбуждается RuntimeError.

    • Если это некий асинхронный генератор, возбуждается StopAsyncIteration.

    • Если это стандартный генератор, возбуждается StopIteration.

  5. Когда самая последняя инструкция в этом кадре (f->f_lasti) всё ещё -1 по причине того что она только что была запущена, и если это сопрограмма или асинхронный генератор, тогда любое значение, за исключением None не может быть передано в качестве некого аргумента и возбуждается некая исключительная ситуация.

  6. В противном случае это самый первый случай её вызова и аргументы доступны. Значение этого аргумента выдаётся в стек значений этого кадра.

  7. Поле f_back данного кадра это вызывающая сторона, которой возвращаются отправленные значения, поэтому оно устанавливается в значение текущего кадра в этом потоке. Это означает, что такое возвращаемое отправляется вызываемой стороне, не самому создателю этого генератора.

  8. Данный генератор помечается как исполняемый.

  9. Самая последняя исключительная ситуация из имеющихся сведений исключительных ситуаций генератора копируется из самой последней исключительной ситуации состояния его потока.

  10. Сведения об исключительной ситуации состояния этого потока устанавливаются в адрес сведений исключительной ситуации данного генератора. Это означает, что если вызывающая сторона входит в некую точку прерывания в пределах исполнения некого генератора, тогда отслеживание стека проходит через данный генератор и проблемный код очищается.

  11. Сам код внутри этого генератора выполняется внутри основного цикла выполнения Python/ceval.c и возвращается полученное значение.

  12. Сведения о последней исключительной ситуации состояния потока сбрасываются в значение до вызова этого кадра.

  13. Данный генератор помечается как не выполняющийся.

  14. Затем следующие варианты ставят в соответствие возвращаемое значение и все исключительные ситуации перемещаются вызовом в сам генератор. Помните, что генераторы должны возбуждать по своему исчерпанию StopIteration, либо вручную, любо не выдавая некое значение:

    • Если из этого кадра не возвращается никакое значение, тогда для генераторов возбуждается StopIteration и StopAsyncIteration возбуждается для асинхронных генераторов.

    • Если StopIteration было возбуждено в явном виде, но это сопрограмма или асинхронный генератор, тогда возбуждается RuntimeError, поскольку это недопустимо.

    • Если было возбуждено StopAsyncIteration и это некий асинхронный генератор, тогда возбуждается RuntimeError, поскольку это недопустимо.

  15. В конце концов полученный результат возвращается обратно вызывающей стороне __next__().

Собрав всё это воедино, вы можете увидеть что выражение генератора это мощный синтаксис, в котором единственное ключевое слово, yield, включает целый поток создания уникального объекта, копирование некого скомпилированного объекта кода в качестве свойства, установки кадра и сохранения списка переменных в его локальной области действия.

Сопрограммы

Генераторы обладают основным ограничением: они способны выдавать значения только своей непосредственно вызывающей стороне.

Для обхода этого ограничения в Python был добавлен некий дополнительный синтаксис, предложение yield from. Применяя такой синтаксис вы можете повторно собирать генераторы в функции утилит и затем yield from их.

Например, ваш генератор букв можно повторно собрать в некую функцию утилиты, в которой значением начальной буквы выступает некий аргумент. Применяя yield from вы можете выбирать какой объект генератора возвращать, cpython-book-samples/33/letter_coroutines.py:


def gen_letters(start, x):
    i = start
    end = start + x
    while i < end:
        yield chr(i)
        i += 1

def letters(upper):
    if upper:
        yield from gen_letters(65, 26)  # A--Z
    else:
        yield from gen_letters(97, 26)  # a--z

for letter in letters(False):
    # Lowercase a--z
    print(letter)

for letter in letters(True):
    # Uppercase A--Z
    print(letter)
 	   

Генераторы также отлично походят для отложенных последовательностей, в которых они могут вызываться множество раз.

Опираясь на такое поведение генераторов как их способность приостанавливать и возобновлять исполнение, во множестве API Python вновь и вновь появлялось понятие сопрограммы.

Генераторы являются ограниченной формой сопрограммы, потому как вы можете отправлять данные им при помощи метода .send(). Имеется возможность отправки сообщений в двух направлениях между вызывающей стороной и её целью. Сопрограммы также запоминают вызывающую сторону в своём атрибуте cr_origin.

Сопрограммы изначально были доступны через некий декоратор, но с тех пор это было признано устаревшим в пользу "естественных" сопрограмм, применяющих ключевые слова async и await.

Чтобы пометить, что некая функция возвращает какую- то сопрограмму, вы обязаны предварить эту функцию ключевым словом async. Такое ключевое слово async в явном виде указывает на то, что в отличие от генераторов, данная функция возвращает некую сопрограмму вместо какого- то значения.

Для создания сопрограммы, вы определяете некую функцию ключевым словом async def. В данном примере вы добавляете таймер при помощи функции asyncio.sleep() и возвращаете некую строку после пробуждения:


>>> import asyncio
>>> async def sleepy_alarm(time):
...    await asyncio.sleep(time)
...    return "wake up!"
>>> alarm = sleepy_alarm(10)
>>> alarm
<coroutine object sleepy_alarm at 0x1041de340> 
		

Когда вы вызовете эту функцию, она возвратит некий объект сопрограммы.

Существует множество способов исполнения сопрограммы. Самый простой состоит в применении asyncio.run(coro). Исполнив asyncio.run() со своим объектом сопрограммы, далее, после 10 секунд, вы услышите будильник:


>>> asyncio.run(alarm)
'wake up'
		

Основное преимущество сопрограмм состоит в том, что можете исполнять их одновременно. По той причине, что сам объект сопрограммы это некая переменная, которую вы способны передавать в некую функцию, эти объекты можно связывать воедино и в цепочки, либо создавать некую последовательность.

К примеру, если вы желаете получить десять предупреждений с различными интервалами, а запустить их одновременно, тогда могли бы преобразовать эти объекты сопрограмм в задачи.

API задачи применяется для планирования и исполнения множества сопрограмм одновременно. Перед планированием задач надлежит запустить некий цикл событий. Основным заданием этого цикла событий является планирование одновременных задач и соединение событий, таких как окончание (completion), отмена (cancellation) и исключительные ситуации (exceptions), с обратными вызовами.

Когда вы вызываете asyncio.run() (из Lib/asyncio/runners.py), эта функция выполняет для вас такие задачи:

  1. Запускает новый цикл событий.

  2. Обёртывает соответствующий объект сопрограммы в некую задачу.

  3. Устанавливает некий обратный вызов по окончанию этой задачи.

  4. Выполняет цикл по всем задачам пока они не завершатся.

  5. Возвращает полученный результат.

  Относящиеся к делу исходные файлы

Вот тот исходный файл, который относится к сопрограммам:

Таблица 10-9. Относящийся к сопрограммам исходный файл
Файл Назначение

Lib/asyncio

Реализация стандартной библиотеки Python для asyncio

  Циклы событий

Циклы событий выступают тем клеем, который совместно удерживает асинхронный код. Написанные на чистом Python, циклы событий это содержащие задачи объекты.

Все имеющиеся в этом цикле задачи могут обладать обратными вызовами. Данный цикл будет запускать такие обратные вызовы в случае окончаний или отказов задачи:


loop = asyncio.new_event_loop()
 	   

Внутри некого цикла имеется последовательность задач, представленных типом asyncio.Task. Задачи планируются в неком цикле, а затем, когда этот цикл запущен, он обходит циклом все эти задачи до тех пор, пока они не окончатся.

Вы можете преобразовать отдельный таймер в некий цикл задач, cpython-book-samples/33/sleepy_alarm.py:


import asyncio

async def sleepy_alarm(person, time):
   await asyncio.sleep(time)
   print(f"{person} -- wake up!")

async def wake_up_gang():
    tasks = [
        asyncio.create_task(sleepy_alarm("Bob", 3), name="wake up Bob"),
        asyncio.create_task(sleepy_alarm("Yudi", 4), name="wake up Yudi"),
        asyncio.create_task(sleepy_alarm("Doris", 2), name="wake up Doris"),
        asyncio.create_task(sleepy_alarm("Kim", 5), name="wake up Kim")
    ]
    await asyncio.gather(*tasks)

asyncio.run(wake_up_gang())
 	   

Это выдаст на печать следующий вывод:


Doris -- wake up!
Bob -- wake up!
Yudi -- wake up!
Kim -- wake up!
		

Запущенный цикл событий исполняется по всем своим сопрограммам наблюдая их окончание. Аналогично тому как ключевое слово yield способно возвращать множество значений из одного и того же кадра, ключевое слово await может возвращать множество состояний.

Этот цикл событий будет выполнять имеющиеся объекты сопрограмм sleepy_alarm() снова и снова до тех пор, пока их await asyncio.sleep() не выдаст выполненный результат и не будет способен выполниться print().

Для этой работы вам надлежит воспользоваться asyncio.sleep() вместо блокирующего (и не осведомлённого об асинхронности) time.sleep().

  Пример

Вы можете преобразовать пример сканирования портов со множеством процессов в asyncio следующими шагами:

  • Заменить check_port() на применение соединения с сокетом из asyncio.open_connection(), что создаст некий фьючерс вместо какого- то немедленного подключения.

  • Воспользоваться этим фьючерсом подключения к сокету по событию таймера при помощи asyncio.open_connection().

  • В случае успеха добавить полученный порт в общий список результатов.

  • Добавить новую функцию, scan(), чтобы создавать сопрограммы check_port() для каждого порта и добавлять их в список tasks.

  • Слить все полученные tasks в некую новую сопрограмму при помощи asyncio.gather().

  • Запустить своё сканирование при помощи asyncio.run().

Вот этот код, cpython-book-samples/33/portscanner_async.py:


import time
import asyncio

timeout = 1.0

async def check_port(host: str, port: int, results: list):
    try:
        future = asyncio.open_connection(host=host, port=port)
        r, w = await asyncio.wait_for(future, timeout=timeout)
        results.append(port)
        w.close()
    except OSError:  # pass on port closure
        pass
    except asyncio.TimeoutError:
        pass  # Port is closed, skip and continue

async def scan(start, end, host):
    tasks = []
    results = []
    for port in range(start, end):
        tasks.append(check_port(host, port, results))
    await asyncio.gather(*tasks)
    return results

if __name__ == '__main__':
    start = time.time()
    host = "localhost"  # Pick a host you own
    results = asyncio.run(scan(80, 100, host))
    for result in results:
        print("Port {0} is open".format(result))
    print("Completed scan in {0} seconds".format(time.time() - start))
 	   

Данное сканирование оканчивается чуть больше чем за секунду:


$ python portscanner_async.py
Port 80 is open
Completed scan in 1.0058400630950928 seconds
		

Асинхронные генераторы

Те понятия, которые вы изучали до сих пор, генераторы и сопрограммы, могут быть скомбинированы в асинхронные генераторы.

Когда некая функция объявляется с ключевым словом async и содержит предложение yield, тогда она при вызове преобразуется в объект асинхронного генератора.

Как и генераторы, асинхронные генераторы должны исполнятся чем- то, что понимает протоколы. На месте __next__(), асинхронные генераторы обладают методом __anext__().

Обычный цикл for не понимает асинхронный генератор, поэтому вместо него пользуйтесь предложением async for.

Вы можете переделать check_port() в некий асинхронный генератор, который выдаёт значение следующего открытого порта пока он не столкнётся с самым последним портом или не обнаружит предписанное число открытых портов:


async def check_ports(host: str, start: int, end: int, max=10):
    found = 0
    for port in range(start, end):
        try:
            future = asyncio.open_connection(host=host, port=port)
            r, w = await asyncio.wait_for(future, timeout=timeout)
            yield port
            found += 1
            w.close()
            if found >= max:
                return
        except asyncio.TimeoutError:
            pass # Closed
 	   

Для выполнения этого воспользуйтесь предложением async for:


async def scan(start, end, host):
    results = []
    async for port in check_ports(host, start, end, max=1):
        results.append(port)
    return results
 	   

Весь пример полностью вы можете найти в cpython-book-samples/33/portscanner_async_generators.py.

Подчинённые интерпретаторы

На данный момент вы рассмотрели:

  • Параллельное исполнение с применением множества процессов

  • Одновременное выполнение при помощи потоков и асинхронности

Негативной стороной взаимодействия множества процессов выступает то, что применяемые межпроцессным взаимодействием конвейеры и очереди медленнее совместно используемой памяти, а также значительны накладные расходы на запуск нового процесса.

Потоки и асинхронное взаимодействие обладают малыми накладными расходами, но не предлагают реального параллельного исполнения по причине гарантий безопасности потока в своей GIL.

Четвёртым вариантом является subinterpreters, который обладает меньшими накладными расходами, нежели multiprocessing и делает возможным некий GIL для каждого подчинённого интерпретатора. В конце концов, это глобальная блокировка интерпретатора.

Внутри среды времени исполнения CPython всегда имеется один интерпретатор. Этот интерпретатор удерживает значение состояние интерпретатора, а также внутри интерпретатора у вас есть возможность обладать одним или более потоками Python. Такой интерпретатор выступает надлежащим контейнером для своего цикла вычислений. Он также управляет своей собственной памятью, счётчиком ссылок и сборкой мусора.

CPython обладает API C нижнего уровня для создания интерпретаторов, например, Py_NewInterpreter():

 

Рисунок 10-21



[Замечание]Замечание

Модуль всё ещё экспериментальный в 3.9, а потому его API выступает предметом изменений и его реализация всё ещё содержит большое число ошибок.

По той причине, что значение состояния интерпретатора содержит распределитель памяти арены - коллекцию всех указателей на объекты Python (как локальных, так и глобальных) - подчинённые интерпретаторы не могут получать доступ к имеющимся глобальным переменным прочих интерпретаторов.

Аналогично работе со множеством процессов, для совместного использования объектов между интерпретаторами вам надлежит упорядочивать их или применять ctypes и пользоваться некой формой IPC (сетевая среда, диск, либо совместная память).

  Относящиеся к делу исходные файлы

Вот те исходные файлы, которые относятся к подчинённым интерпретаторам:

Таблица 10-8. Относящиеся к подчинённым интерпретаторам исходные файлы
Файл Назначение

Lib/_xxsubinterpreters.c

Реализация C модуля subinterpreters

Python/pylifecycle.c

Реализация C API управления самим интерпретатором

  Пример

В финальном примере приложения, наш код реального подключения должен заключаться в некой строке. В 3.9, подчинённый интерпретатор может исполнять лишь строку кода.

Для запуска каждого подчинённого интерпретатора, запускается некий список потоков с обратным вызовом к функции, run().

Эта функция будет:

  • Создавать канал взаимодействия

  • Запускать новый интерпретатор

  • Отправлять в подчинённый интерпретатор необходимый для исполнения код

  • Получать данные через установленный канал взаимодействия

  • В случае успешного подключения к порту, добавлять его в очередь с безопасным потоком, cpython-book-samples/33/portscanner_subinterpreters.py:


import time
import _xxsubinterpreters as subinterpreters
from threading import Thread
import textwrap as tw
from queue import Queue

timeout = 1  # In seconds

def run(host: str, port: int, results: Queue):
    # Create a communication channel
    channel_id = subinterpreters.channel_create()
    interpid = subinterpreters.create()
    subinterpreters.run_string(
        interpid,
        tw.dedent(
    """
    import socket; import _xxsubinterpreters as subinterpreters
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    subinterpreters.channel_send(channel_id, result)
    sock.close()
    """),
        shared=dict(
            channel_id=channel_id,
            host=host,
            port=port,
            timeout=timeout
        ))
    output = subinterpreters.channel_recv(channel_id)
    subinterpreters.channel_release(channel_id)
    if output == 0:
        results.put(port)

if __name__ == '__main__':
    start = time.time()
    host = "127.0.0.1"  # Pick a host you own
    threads = []
    results = Queue()
    for port in range(80, 100):
        t = Thread(target=run, args=(host, port, results))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    while not results.empty():
        print("Port {0} is open".format(results.get()))
    print("Completed scan in {0} seconds".format(time.time() - start))
 	   

По причине снижения накладных расходов по сравнению с работой со множеством процессов, этот пример должен исполняться на от 30 до 40 процентов быстрее и с меньшими ресурсами памяти:


$ python portscanner_subinterpreters.py
Port 80 is open
Completed scan in 1.3474230766296387 seconds
		

Выводы

Поздравляем с освоением самой большой главы в этой книге! Вы прошли большой путь. Давайте вспомним некоторые понятия и способы их применения.

Для действительного параллельного выполнения вам необходимо множество ЦПУ и ядер. Вам также требуется применять либо пакет multiprocessing, либо пакет subinterpreters с тем, чтобы интерпретатор Python был способен исполняться параллельно.

Помните, что время запуска значительно, а потому всякий интерпретатор обладает большими накладными расходами памяти. Когда та задача, которую вы желаете исполнить, имеет короткое время жизни, пользуйтесь пулом исполнителей и очередью задач.

Если у вас имеется большое число задач, ограниченных вводом/ выводом и вы желаете исполнять их одновременно, тогда вам следует применять работу со множеством потоков или сопрограммы из пакета asyncio.

Все эти четыре подхода требуют некого понимания того как безопасно и действенно обмениваться данными между процессами и потоками. Наилучший способ закрепления изученного вами материала состоит в просмотре написанного вами приложения и поиске того, как оно может быть воспроизведено с целью применения этих технологий.