Потоки и процессы. GIL. subprocess, concurrent.futures.

Потоки и процессы.

Сегодня мы будем пытаться понять следующие вещи:

  1. Как запустить стороннюю программу с помощью Python?
  2. Как запустить несколько функций параллельно?

Что такое многозадачность?

multitasking.PNG

Чем параллельное выполнение отличается от многозадачности?

parallelism.PNG

Исключает ли одно другое?

Нет.

multiparallelism.png

А что такое процесс?

htop.PNG

taskmgr.PNG

Чем отличается процесс от потока?

Много процессов

processes.PNG

Многопоточный процесс

threads.PNG

Общая память - классический источник непредсказуемых проблем! meme_ancientevil.jpg

Процессы/потоки могут запускать дочерние процессы/потоки.

Процесс первичен - потоки могут существовать только внутри процесса.

Простыми словами, процесс - это запущенная программа. Поток - это единица, которой ОС может назначать (allocate) процессорное время.

Выводы:

  1. Thread (поток) - это квазисамостоятельная часть программы, которую можно выделить в отдельную подзадачу (например, выполнение отдельной функции).
  2. Потоки внутри одного процесса имеют общую память.
  3. Программирование на потоках крайне, крайне трудоемко.
  4. Process (процесс) - это отдельная субпрограмма, имеющая отдельную же память, PID, сокеты, I/O, и т.д.
  5. Обмен информацией между процессами сложнее, чем между потоками (но в нем намного труднее что-то сломать). Мы разберем обмен информацией через Pipe.

Вопросы на понимание:

Что ограничивается числом процессоров?

Фактически, число ядер не ограничивает ни общее число процессов, ни общее число потоков. Ограничивается лишь максимальное количество одновременно выполняемых потоков во всех процессах (по умолчанию в каждом процессе есть 1 поток).

В какой реализации параллельные вычисления будут занимать меньше памяти?

Если вы сможете сделать это на потоках, то хорошо. Но в большинстве сучаев это не стоит трудов, тем более в Python.

Сначала разберемся с простым?

Запуск дочернего процесса с помощью subprocess

Иногда нам необходимо вызвать какую-то внешнюю программу и использовать результат ее выполнения.

In [1]:
import subprocess

Как просто запустить стороннюю программу?

Почти никогда нам не нужно просто запустить программу, но...

In [2]:
subprocess.call(["./gen_svg", "out.svg"])
Out[2]:
0
In [3]:
from IPython.core.display import SVG
SVG(filename='out.svg')
Out[3]:

Что возвращает нам программа?

Это называется "код возврата" (return code).

In [4]:
retcod = subprocess.call(["./gen_svg", "out.svg"])
In [5]:
import sys

retcod = subprocess.call(["./gen_svg"])
if retcod != 0:
    print("Error", file=sys.stderr)
Error

Как ругаться на ошибку?

Для этого есть специальная функция subprocess.check_call.

In [6]:
subprocess.check_call(["./gen_svg", "out.svg"])
Out[6]:
0
In [7]:
subprocess.check_call(["./gen_svg"])
---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
<ipython-input-7-fc4a6d22d950> in <module>
----> 1 subprocess.check_call(["./gen_svg"])

~/.anaconda3/lib/python3.7/subprocess.py in check_call(*popenargs, **kwargs)
    361         if cmd is None:
    362             cmd = popenargs[0]
--> 363         raise CalledProcessError(retcode, cmd)
    364     return 0
    365 

CalledProcessError: Command '['./gen_svg']' returned non-zero exit status 1.

Как удобно нарезать команду?

Мы привыкли набирать команду в виде строки, а тут надо извращаться со списком. Но можно облегчить себе жизнь! :^)

Для этого есть модуль shlex.

In [10]:
import shlex
In [11]:
cmd = "./gen_svg out.svg"
shlex.split(cmd)
Out[11]:
['./gen_svg', 'out.svg']

А почему бы просто не использовать str.split?

In [12]:
cmd = "./gen_svg 'bad name.svg'"
cmd.split()
Out[12]:
['./gen_svg', "'bad", "name.svg'"]
In [13]:
shlex.split(cmd)
Out[13]:
['./gen_svg', 'bad name.svg']

При переводе команды-строки в список важно использовать shlex или другой специальный модуль.

Как не очень просто запустить стороннюю программу?..

Чаще нам бывает нужно не просто запустить программу, но еще и каким-то образом обработать ее вывод. Самый простой способ получить вывод - функция subprocess.check_output.

In [14]:
cmd = "blastx -version"
output = subprocess.check_output(shlex.split(cmd))
output
Out[14]:
b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'

Что это за b перед строкой?

#вопросназасыпку

Binary strings

Python не знает, в какой кодировке будет писать ответ вызываемой программы. Если уверены, что кодировка - utf8, то просто задайте это в параметре.

In [15]:
output.decode()  # same as `output.decode(encoding='utf8')`
Out[15]:
'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
In [16]:
output.decode(encoding='utf7')
Out[16]:
'blastx: 2.9.0\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'

Иначе нужно задать str.decode другую кодировку.

Продвинутый запуск дочернего процесса с помощью subprocess

subprocess.run

Функционалом всех описанных выше команды из subprocess можно пользоваться, но обычно вместо них используют более удобную функцию subprocess.run и еще более гибкий класс subprocess.Popen.

In [17]:
cmd = "blastx --VeRsIoN"  # incorrect command line
output = subprocess.run(shlex.split(cmd), check=True)
output
---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
<ipython-input-17-d39b61040bc9> in <module>
      1 cmd = "blastx --VeRsIoN"  # incorrect command line
----> 2 output = subprocess.run(shlex.split(cmd), check=True)
      3 output

~/.anaconda3/lib/python3.7/subprocess.py in run(input, capture_output, timeout, check, *popenargs, **kwargs)
    510         if check and retcode:
    511             raise CalledProcessError(retcode, process.args,
--> 512                                      output=stdout, stderr=stderr)
    513     return CompletedProcess(process.args, retcode, stdout, stderr)
    514 

CalledProcessError: Command '['blastx', '--VeRsIoN']' returned non-zero exit status 1.

subprocess.Popen

Подробно параметры конструктора класса subprocess.Popen можно посмотреть во встроенной документации.

In [18]:
?subprocess.Popen

Важное отличие subprocess.Popen от subprocess.run в том, что Popen не дожидается завершения программы после вызова.

Как просто запустить процесс?

In [19]:
cmd = "./gen_svg out.svg"
process = subprocess.Popen(shlex.split(cmd))
process.wait()
Out[19]:
0

Как запустить процесс и получить вывод?

Плохой способ:

In [20]:
cmd = "blastx -version"
process = subprocess.Popen(
    shlex.split(cmd),
    stdout=subprocess.PIPE  # catches output at stdout
)
process.wait()
process.stdout.read()
Out[20]:
b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'

Хороший способ:

In [21]:
cmd = "blastx -version"
process = subprocess.Popen(
    shlex.split(cmd),
    stdout=subprocess.PIPE  # catches output at stdout
)
stdout, _ = process.communicate()
stdout
Out[21]:
b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'

Если мы ожидаем текст, а не бинарный вывод:

In [22]:
cmd = "blastx -version"
process = subprocess.Popen(
    shlex.split(cmd),
    stdout=subprocess.PIPE,  # catches output at stdout
    text=True
)
stdout, _ = process.communicate()
stdout
Out[22]:
'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'

Как ловить информацию из обоих потоков (stdout, stderr)?

In [23]:
cmd = "blastx -version"
process = subprocess.Popen(
    shlex.split(cmd),
    stdout=subprocess.PIPE,  # catches output at stdout
    stderr=subprocess.PIPE   # catches output at stderr
)

stdout, stderr = process.communicate()
stdout, stderr
Out[23]:
(b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n', b'')

В stderr могут содержаться важные сообщения, к примеру, разные Warnings (предупреждения, что не нашлось определенного файла, результат неточен и т.д.)

In [24]:
cmd = "seqret fasta::egfr.fasta phylipnon::stdout"
process = subprocess.Popen(
    shlex.split(cmd),
    stdout=subprocess.PIPE,  # catches output at stdout
    stderr=subprocess.PIPE   # catches output at stderr
)
stdout, stderr = process.communicate()
print("===STDOUT===")
print(stdout.decode())
print("===STDERR===")
print(stderr.decode())
===STDOUT===
1 378
KP325210.1CCTCTGCCAT ATCAAGACGA TCAATTGGGA GGAAATAATT ACCGGTCCGG
          GAGGCCGGTA CTTTTACGTG TACAATTTTA CGTCGCCGGA ACGCAATTGT
          CCGGAATGCG ACGAGAGCTG CGAACAGGGT TGCTGGGGCG AGGGTCCGGA
          GAACTGTCAA AAGTACTCGA AGACGAACTG CTCGCCTCAG TGCTGGCAGG
          GCAGGTGTTT CGGTCCTAAT CCACGCGAGT GTTGCCATCT TTTTTGCGCC
          GGTGGCTGCA CCGGTCCCAA ACAGAGCGAC TGTCTTGCCT GTAAGAATTT
          CTTCGACGAT GGCGTGTGCA CCCAGGAATG CCCGCCCATG CAAAAGTAAG
          TAACATCACG ATTCTTCTGT CTTTTTGC

===STDERR===
Read and write (return) sequences

Также можно сразу заставлять Python записывать выдачу в отдельные файлы:

In [25]:
with open("egfr.phy", "w") as stdout_handle, open("seqret.log", "w") as stderr_handle:
    cmd = "seqret fasta::egfr.fasta phylipnon::stdout"
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=stdout_handle,  # redirects output from stdout
        stderr=stderr_handle   # redirects output from stderr
    )
In [26]:
!cat egfr.phy
!cat seqret.log
1 378
KP325210.1CCTCTGCCAT ATCAAGACGA TCAATTGGGA GGAAATAATT ACCGGTCCGG
          GAGGCCGGTA CTTTTACGTG TACAATTTTA CGTCGCCGGA ACGCAATTGT
          CCGGAATGCG ACGAGAGCTG CGAACAGGGT TGCTGGGGCG AGGGTCCGGA
          GAACTGTCAA AAGTACTCGA AGACGAACTG CTCGCCTCAG TGCTGGCAGG
          GCAGGTGTTT CGGTCCTAAT CCACGCGAGT GTTGCCATCT TTTTTGCGCC
          GGTGGCTGCA CCGGTCCCAA ACAGAGCGAC TGTCTTGCCT GTAAGAATTT
          CTTCGACGAT GGCGTGTGCA CCCAGGAATG CCCGCCCATG CAAAAGTAAG
          TAACATCACG ATTCTTCTGT CTTTTTGC
Read and write (return) sequences

...или даже объединить (в хронологическом порядке!) выдачу из обоих потоков в один файл (некоторые программы выдают логи в оба потока вперемешку).

In [27]:
with open("seqret_full.log", "w") as handle:
    cmd = "seqret fasta::egfr.fasta phylipnon::stdout"
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=handle,  # redirects output from stdout
        stderr=subprocess.STDOUT  # redirects output from stderr to stdout
    )
process.communicate()
Out[27]:
(None, None)
In [28]:
!cat seqret_full.log
Read and write (return) sequences
1 378
KP325210.1CCTCTGCCAT ATCAAGACGA TCAATTGGGA GGAAATAATT ACCGGTCCGG
          GAGGCCGGTA CTTTTACGTG TACAATTTTA CGTCGCCGGA ACGCAATTGT
          CCGGAATGCG ACGAGAGCTG CGAACAGGGT TGCTGGGGCG AGGGTCCGGA
          GAACTGTCAA AAGTACTCGA AGACGAACTG CTCGCCTCAG TGCTGGCAGG
          GCAGGTGTTT CGGTCCTAAT CCACGCGAGT GTTGCCATCT TTTTTGCGCC
          GGTGGCTGCA CCGGTCCCAA ACAGAGCGAC TGTCTTGCCT GTAAGAATTT
          CTTCGACGAT GGCGTGTGCA CCCAGGAATG CCCGCCCATG CAAAAGTAAG
          TAACATCACG ATTCTTCTGT CTTTTTGC

Как запустить внешний скрипт на Python?

Верный ответ: встроить его в вашу программу :^)

Но часто на такое нет времени, сил и желания. Что ж, скрипты на Python тоже можно запускать как процессы!

In [29]:
!cat solution_f.py
import sys

with open(sys.argv[1]) as inp:
    a = inp.readline()
    b = inp.readline()
print(int(a) + int(b))
In [30]:
cmd = "python3 solution_f.py tests/test1.txt"
process = subprocess.Popen(
    shlex.split(cmd),
    stdout=subprocess.PIPE,  # catches output from stdout
    text=True
)
stdout, _ = process.communicate()
print(stdout)
5

Как передать запущенной программе что-то на stdin?

In [31]:
!cat solution.py
a = input()
b = input()
print(int(a) + int(b))
In [32]:
!cat tests/test1.txt
2
3
In [33]:
with open("tests/test1.txt") as stdin_handle:
    cmd = "python3 solution.py"
    process = subprocess.Popen(
        shlex.split(cmd),
        stdin=stdin_handle,
        stdout=subprocess.PIPE,  # catches output from stdout
        text=True
    )
print(process.communicate()[0])
5

Как... запустить много процессов сразу?

In [34]:
procs = []
for i in range(10):
    cmd = "python solution_f.py tests/test1.txt"
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=subprocess.PIPE, # указываем, что мы хотим ловить stdout
        stderr=subprocess.STDOUT, # перенаправить stderr в stdout
        text=True,
        cwd='./'
    )
    procs.append(process)
In [35]:
for p in procs:
    print(p.communicate())
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)
('5\n', None)

Фактически это был самый простой способ запустить параллельно множество процессов!

Однако... у нас нет контроля за тем, сколько процессов звпускать одновременно. Если на общем сервере запустить сразу 200 тяжелых процессов (или 5-10 на вашем локальном компьютере), то он наверняка зависнет, и его, возможно, даже придется перезагружать.

Кроме того, не очень ясно, как контролировать, завершился процесс или упал/завис, и в зависимости от этого менять работу основной программы.

К счастью, эти фичи реализованы в модуле concurrent.futures, который мы сейчас разберем.

Выводы:

  1. subprocess позволяет запускать сторонние программы в качестве дочерних процессов интерпретатора Python.
  2. Простой запуск делается с помощью subprocess.call.
  3. Если вы хотите использовать операции ввода/вывода для процесса, то используйте параметры subprocess.Popen.
  4. Для перенаправления вывода в родительский процесс (вашу программу) используйте subprocess.PIPE.

concurrent.futures

В модуле concurrent.futures описано 2 класса, с помощью которых удобно запускать параллельные вычисления.

In [36]:
import concurrent.futures

Но сначала немного теории

meme_bored.jpg

Люди задачи бывают двух типов:

CPU bound

bound_cpu.PNG Примеры?

  • Процессинг изображений
  • Матричные операции

I/O bound

bound_io.PNG Примеры?

  • Чтение файла/запись в файл
  • Загрузка данных из интернета
  • Ожидание ввода пользователя
  • Ожидание работы другой программы

...большинство программ относится к этому типу.

Классы в concurrent.futures

У нас есть 2 класса:

  • concurrent.futures.ThreadPoolExecutor
  • concurrent.futures.ProcessPoolExecutor

ThreadPoolExecutor запускает потоки.

ProcessPoolExecutor запускает процессы.

Как запускать пул потоков/процессов?

In [37]:
def factorial_recursive(x):
    if x <= 1:
        return 1
    return x * factorial_recursive(x - 1)

def perform(x):
    return factorial_recursive(x)
In [38]:
with concurrent.futures.ThreadPoolExecutor() as executor:
    # or use ProcessPoolExecutor instead, works the same
    futures = {executor.submit(perform, task_arg) for task_arg in range(7)}

    for fut in concurrent.futures.as_completed(futures):
        print(f"The outcome is {fut.result()}")
The outcome is 6
The outcome is 720
The outcome is 24
The outcome is 2
The outcome is 1
The outcome is 1
The outcome is 120

Что происходит?

In [39]:
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(perform, task_arg) for task_arg in range(7)}
    
    for fut in concurrent.futures.as_completed(futures):
        print(f"The outcome is {fut.result()}")
The outcome is 120
The outcome is 1
The outcome is 720
The outcome is 24
The outcome is 1
The outcome is 2
The outcome is 6
  • Создаем Executor
  • Отсылаем ему задачи методом executor.submit (Executor создает для их выполнения потоки или процессы)
  • Executor возвращает нам объекты класса Future
  • Функция concurrent.futures.as_completed принимает коллекцию из объектов класса Future и выдает их в порядке их выполнения
  • Метод Future.result возвращает то, что вычислилось бы при запуске perform с аргументом task_arg

А как сопоставить результаты исходным данным?

Можно создать словарь:

In [40]:
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(perform, task_arg): task_arg for task_arg in range(7)}
    # Notice the order! Future objects are dict keys, not values!
    
    for fut in concurrent.futures.as_completed(futures):
        original_task_arg = futures[fut]
        print(f"The result of {original_task_arg} is {fut.result()}")
The result of 4 is 24
The result of 2 is 2
The result of 5 is 120
The result of 0 is 1
The result of 6 is 720
The result of 3 is 6
The result of 1 is 1

Еще один способ раскидать данные по процессам/потокам

Аналогично рассмотренной ранее функции map(), метод executor.map() применяет функцию к соответствующей коллекции!

In [41]:
with concurrent.futures.ProcessPoolExecutor() as executor:
    for arg, res in zip(range(7), executor.map(perform, range(7))):
        print(f"The result of {arg} is {res}")
The result of 0 is 1
The result of 1 is 1
The result of 2 is 2
The result of 3 is 6
The result of 4 is 24
The result of 5 is 120
The result of 6 is 720

В данном случае мы ждем окончания выполнения каждой из задач в нужном порядке.

Вопрос на непонимание:

Исходя из ваших знаний, что больше подходит для трудоемких вычислительных задач?

Возможно, и потоки, и процессы?..

А что больше подходит для задачи одновременной загрузки множества файлов?

Опять и потоки, и процессы? Но тут хотя бы понятно, что потоки наверняка лучше - нам не нужно задумываться о том, как будут взаимодействовать задачи между собой, нужно просто ждать очереди от ОС.

Кажется, что процессы тратят дополнительную память и менее выгодны в обоих случаях?..

(спойлер: это не совсем так)

Давайте рассмотрим несколько боевых примеров, и попытаемся понять, когда какой тип параллелизации лучше использовать.

Пример I/O-bound задачи

In [42]:
import urllib
In [43]:
def load_url(url, timeout):
    """Retrieve a single page and report its contents"""
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
In [44]:
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

Запускаем просто так:

In [45]:
%%time
for url in URLS:
    try:
        data = load_url(url, 60)
    except Exception as exc:
        print(f"{url} generated an exception: {exc}")
    else:
        print(f"{url} page is {len(data)} bytes")
http://www.foxnews.com/ page is 280944 bytes
http://www.cnn.com/ page is 1117442 bytes
http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden
http://www.bbc.co.uk/ page is 312452 bytes
http://some-made-up-domain.com/ generated an exception: <urlopen error [Errno -2] Name or service not known>
CPU times: user 145 ms, sys: 42.9 ms, total: 188 ms
Wall time: 5.71 s

~ 6 секунд!

Запускаем с процессами:

In [46]:
%%time
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(f"{url} page is {len(data)} bytes")
http://europe.wsj.com/ generated an exception: cannot serialize '_io.BufferedReader' object
http://some-made-up-domain.com/ generated an exception: <urlopen error [Errno -2] Name or service not known>
http://www.foxnews.com/ page is 280946 bytes
http://www.bbc.co.uk/ page is 312452 bytes
http://www.cnn.com/ page is 1117442 bytes
CPU times: user 17.9 ms, sys: 22.6 ms, total: 40.4 ms
Wall time: 3.61 s

Лучше, ~ 3.5 секунды. Но запуск самого процесса занимает много времени.

Запускаем с потоками:

In [47]:
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(f"{url} page is {len(data)} bytes")
http://some-made-up-domain.com/ generated an exception: <urlopen error [Errno -2] Name or service not known>
http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden
http://www.foxnews.com/ page is 280945 bytes
http://www.bbc.co.uk/ page is 312452 bytes
http://www.cnn.com/ page is 1118139 bytes
CPU times: user 153 ms, sys: 52 ms, total: 205 ms
Wall time: 2.95 s

Еще лучше, < 3 секунд!

Какой можно сделать вывод?

Для I/O-bound задач лучше использовать потоки (ThreadPoolExecutor).

Пример CPU-bound задачи

In [48]:
import math
In [49]:
def is_prime(n):
    if n == 2:
        return True
    if n < 2 or n % 2 == 0:
        return False
    
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
In [50]:
PRIMES = [112272535095293, 112582705942171,
          112272535095293, 115280095190773,
          518463576809201, 535006138814359,
          555554444332213, 578415690713087,
          115797848077099, 777737777777777,
          777773333999551, 777777355553753,
          1099726899285411]

Запускаем просто так:

In [51]:
%%time
for prime in PRIMES:
    prime_bool = is_prime(prime)
    print(f"{prime} is {'prime' if prime_bool else 'not prime'}")
112272535095293 is prime
112582705942171 is prime
112272535095293 is prime
115280095190773 is prime
518463576809201 is prime
535006138814359 is prime
555554444332213 is prime
578415690713087 is prime
115797848077099 is prime
777737777777777 is prime
777773333999551 is prime
777777355553753 is prime
1099726899285411 is not prime
CPU times: user 6.16 s, sys: 7.74 ms, total: 6.17 s
Wall time: 6.17 s

~ 6.5 секунд!

Запускаем с процессами:

In [52]:
%%time
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    future_to_prime = {executor.submit(is_prime, prime): prime for prime in PRIMES}
    for future in concurrent.futures.as_completed(future_to_prime):
        prime = future_to_prime[future]
        prime_bool = future.result()
        print(f"{prime} is {'prime' if prime_bool else 'not prime'}")
112272535095293 is prime
112272535095293 is prime
115280095190773 is prime
112582705942171 is prime
535006138814359 is prime
555554444332213 is prime
578415690713087 is prime
115797848077099 is prime
1099726899285411 is not prime
518463576809201 is prime
777737777777777 is prime
777773333999551 is prime
777777355553753 is prime
CPU times: user 15.6 ms, sys: 12 ms, total: 27.6 ms
Wall time: 2.07 s

Уже лучше, ~ 2 секунды!

Запускаем с потоками:

In [53]:
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_prime = {executor.submit(is_prime, prime): prime for prime in PRIMES}
    for future in concurrent.futures.as_completed(future_to_prime):
        prime = future_to_prime[future]
        prime_bool = future.result()
        print(f"{prime} is {'prime' if prime_bool else 'not prime'}")
115280095190773 is prime
112582705942171 is prime
112272535095293 is prime
112272535095293 is prime
518463576809201 is prime
115797848077099 is prime
578415690713087 is prime
535006138814359 is prime
1099726899285411 is not prime
555554444332213 is prime
777773333999551 is prime
777737777777777 is prime
777777355553753 is prime
CPU times: user 8.99 s, sys: 57.6 ms, total: 9.05 s
Wall time: 9.64 s

...

Какой можно сделать вывод?

wtf.gif

А вывод какой?..

Для CPU-bound задач лучше использовать процессы (ProcessPoolExecutor).

Но почему с пятью одновременно запущенными потоками вышло медленнее, чем при последовательном запуске?

На это есть в Python злосчастный...

GIL, или Global Interpreter Lock

GIL - это ограничение интерпретатора Python, позволяющее внутри одного процесса одновременно выполнять только один поток.

gil.gif

Так выполняются три потока одновременно. Источник - Wikipedia Commons.

GIL обеспечивает, чтобы каждый выполняемый поток имел эксклюзивный контроль над всем интерпретатором.

А почему потоки работают быстрее на I/O-bound задачах?

Ah, here lies a tale... При блокирующих операциях (например, sleep, ожидание I/O, и т.п.) GIL снимается.

Так что каждый раз, когда активный поток вынужден ждать, другие потоки могут выполняться.

И что происходит при CPU-bound задачах?

В них ОС периодически (~ раз в 100 ticks) проводит проверку, занимает ли еще процесс CPU, или заблокирован I/O.

В этот момент происходит снятие и переназначение GIL, что может привести к переключению между процессами, но занимает совсем немного времени.

Самые большие потери времени возникают на многоядерных машинах: на них после снятия GIL происходит планирование выполнения (scheduling) потоков одновременно на разные ядра. При этом GIL может быть получен только одним потоком.

В этот момент между потоками происходит смертельная битва за GIL.

meme_mortal_combat.jpg

GIL действует не всегда. Ограничение не распространяется на:

  • Блокирующие операции (I/O, ожидание)
  • C/C++ extensions

Если код написан на C/C++ и поддерживает такую функцию, то он может выполняться параллельно в потоках.

В NumPy, который мы разберем в следующий раз, уже встроена нативная параллелизация (OpenBLAS), и базовые матричные процессы обычно выполняются настолько быстро, насколько могут. Но большинство других написанных на С/С++ пакетов можно удобно параллелизовать и с помощью потоков.

Совместное использование памяти

Как мы уже знаем, все потоки одного процесса имеют одно общее пространство памяти. Процессы в некоторых случаях также могут иметь общую память, но по умолчанию память между ними разделена.

Пример совместного использования памяти

In [54]:
import random

def append_random_int(lst):
    number = random.randint(1, 100)
    lst.append(number)

Обратите внимание, что функция append_random_int изменяет переданный в нее объект.

Запускаем с потоками:

In [55]:
LIST = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for _ in range(20):
        executor.submit(append_random_int, LIST)
LIST
Out[55]:
[33, 27, 28, 17, 35, 32, 84, 21, 58, 22, 31, 96, 43, 81, 3, 43, 51, 45, 41, 94]

Запускаем с процессами:

In [56]:
LIST = list()
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    for _ in range(20):
        executor.submit(append_random_int, LIST)
LIST
Out[56]:
[]

При изменении внутри потока объект изменяется везде. При изменении внутри процесса объект копируется и происходит изменение копии, исходный объект остается неизменным.

Выводы:

  1. В модуле concurrent.futures реализовано 2 класса (и множество работающих с ними функций), предназначенных для удобной параллелизации задач.
  2. Параллелизация процессами необходима для вычислительных (CPU bound) задач.
  3. Параллелизация потоками необходима для задач, блокирующихся вводом-выводом (I/O bound).
  4. С помощью параллелизации можно ускорить выполнение многих задач в десятки раз.

Несмотря на все плюсы параллелизации, она не всесильна. Если каждый последующий запуск определенного кода использует результат предыдущего запуска, то параллелизация здесь никак не получится. Есть множество других способов ускорить код, например, хэширование, прекомпиляция (numba) или переписывание логики на другом языке (C/C++).

В большинстве случаев наибольшее количество времени экономит правильный выбор алгоритма выполнения задачи. Этому искусству будет посвящен целый курс в следующем семестре.

Итог занятия

Выводы:

  1. Есть 2 типа задач: I/O bound и CPU bound.
  2. Для I/O bound используйте преимущественно параллелизацию потоками. Если не получается, то процессами.
  3. Для CPU bound используйте параллелизацию процессами. Если не получается, то проверьте параллелизацию потоками (может быть хуже или лучше).
  4. Параллелизация не серебряная пуля. Во многих случаях ускорить работу программы можно намного более простыми способами.

thatsallfolks.gif