subprocess
, concurrent.futures
.¶Сегодня мы будем пытаться понять следующие вещи:
Общая память - классический источник непредсказуемых проблем!
Простыми словами, процесс - это запущенная программа. Поток - это единица, которой ОС может назначать (allocate) процессорное время.
Фактически, число ядер не ограничивает ни общее число процессов, ни общее число потоков. Ограничивается лишь максимальное количество одновременно выполняемых потоков во всех процессах (по умолчанию в каждом процессе есть 1 поток).
Если вы сможете сделать это на потоках, то хорошо. Но в большинстве сучаев это не стоит трудов, тем более в Python.
subprocess
¶Иногда нам необходимо вызвать какую-то внешнюю программу и использовать результат ее выполнения.
import subprocess
Почти никогда нам не нужно просто запустить программу, но...
subprocess.call(["./gen_svg", "out.svg"])
0
from IPython.core.display import SVG
SVG(filename='out.svg')
Это называется "код возврата" (return code).
retcod = subprocess.call(["./gen_svg", "out.svg"])
import sys
retcod = subprocess.call(["./gen_svg"])
if retcod != 0:
print("Error", file=sys.stderr)
Error
Для этого есть специальная функция subprocess.check_call
.
subprocess.check_call(["./gen_svg", "out.svg"])
0
subprocess.check_call(["./gen_svg"])
--------------------------------------------------------------------------- CalledProcessError Traceback (most recent call last) <ipython-input-7-fc4a6d22d950> in <module> ----> 1 subprocess.check_call(["./gen_svg"]) ~/.anaconda3/lib/python3.7/subprocess.py in check_call(*popenargs, **kwargs) 361 if cmd is None: 362 cmd = popenargs[0] --> 363 raise CalledProcessError(retcode, cmd) 364 return 0 365 CalledProcessError: Command '['./gen_svg']' returned non-zero exit status 1.
Мы привыкли набирать команду в виде строки, а тут надо извращаться со списком. Но можно облегчить себе жизнь! :^)
Для этого есть модуль shlex
.
import shlex
cmd = "./gen_svg out.svg"
shlex.split(cmd)
['./gen_svg', 'out.svg']
str.split
?¶cmd = "./gen_svg 'bad name.svg'"
cmd.split()
['./gen_svg', "'bad", "name.svg'"]
shlex.split(cmd)
['./gen_svg', 'bad name.svg']
При переводе команды-строки в список важно использовать shlex
или другой специальный модуль.
Чаще нам бывает нужно не просто запустить программу, но еще и каким-то образом обработать ее вывод. Самый простой способ получить вывод - функция subprocess.check_output
.
cmd = "blastx -version"
output = subprocess.check_output(shlex.split(cmd))
output
b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
b
перед строкой?¶#вопросназасыпку
Python не знает, в какой кодировке будет писать ответ вызываемой программы. Если уверены, что кодировка - utf8
, то просто задайте это в параметре.
output.decode() # same as `output.decode(encoding='utf8')`
'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
output.decode(encoding='utf7')
'blastx: 2.9.0\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
Иначе нужно задать str.decode
другую кодировку.
subprocess
¶subprocess.run
¶Функционалом всех описанных выше команды из subprocess
можно пользоваться, но обычно вместо них используют более удобную функцию subprocess.run
и еще более гибкий класс subprocess.Popen
.
cmd = "blastx --VeRsIoN" # incorrect command line
output = subprocess.run(shlex.split(cmd), check=True)
output
--------------------------------------------------------------------------- CalledProcessError Traceback (most recent call last) <ipython-input-17-d39b61040bc9> in <module> 1 cmd = "blastx --VeRsIoN" # incorrect command line ----> 2 output = subprocess.run(shlex.split(cmd), check=True) 3 output ~/.anaconda3/lib/python3.7/subprocess.py in run(input, capture_output, timeout, check, *popenargs, **kwargs) 510 if check and retcode: 511 raise CalledProcessError(retcode, process.args, --> 512 output=stdout, stderr=stderr) 513 return CompletedProcess(process.args, retcode, stdout, stderr) 514 CalledProcessError: Command '['blastx', '--VeRsIoN']' returned non-zero exit status 1.
subprocess.Popen
¶Подробно параметры конструктора класса subprocess.Popen
можно посмотреть во встроенной документации.
?subprocess.Popen
Важное отличие subprocess.Popen
от subprocess.run
в том, что Popen
не дожидается завершения программы после вызова.
cmd = "./gen_svg out.svg"
process = subprocess.Popen(shlex.split(cmd))
process.wait()
0
Плохой способ:
cmd = "blastx -version"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE # catches output at stdout
)
process.wait()
process.stdout.read()
b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
Хороший способ:
cmd = "blastx -version"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE # catches output at stdout
)
stdout, _ = process.communicate()
stdout
b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
cmd = "blastx -version"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE, # catches output at stdout
text=True
)
stdout, _ = process.communicate()
stdout
'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n'
cmd = "blastx -version"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE, # catches output at stdout
stderr=subprocess.PIPE # catches output at stderr
)
stdout, stderr = process.communicate()
stdout, stderr
(b'blastx: 2.9.0+\n Package: blast 2.9.0, build Sep 30 2019 01:57:31\n', b'')
В stderr
могут содержаться важные сообщения, к примеру, разные Warnings (предупреждения, что не нашлось определенного файла, результат неточен и т.д.)
cmd = "seqret fasta::egfr.fasta phylipnon::stdout"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE, # catches output at stdout
stderr=subprocess.PIPE # catches output at stderr
)
stdout, stderr = process.communicate()
print("===STDOUT===")
print(stdout.decode())
print("===STDERR===")
print(stderr.decode())
===STDOUT=== 1 378 KP325210.1CCTCTGCCAT ATCAAGACGA TCAATTGGGA GGAAATAATT ACCGGTCCGG GAGGCCGGTA CTTTTACGTG TACAATTTTA CGTCGCCGGA ACGCAATTGT CCGGAATGCG ACGAGAGCTG CGAACAGGGT TGCTGGGGCG AGGGTCCGGA GAACTGTCAA AAGTACTCGA AGACGAACTG CTCGCCTCAG TGCTGGCAGG GCAGGTGTTT CGGTCCTAAT CCACGCGAGT GTTGCCATCT TTTTTGCGCC GGTGGCTGCA CCGGTCCCAA ACAGAGCGAC TGTCTTGCCT GTAAGAATTT CTTCGACGAT GGCGTGTGCA CCCAGGAATG CCCGCCCATG CAAAAGTAAG TAACATCACG ATTCTTCTGT CTTTTTGC ===STDERR=== Read and write (return) sequences
Также можно сразу заставлять Python записывать выдачу в отдельные файлы:
with open("egfr.phy", "w") as stdout_handle, open("seqret.log", "w") as stderr_handle:
cmd = "seqret fasta::egfr.fasta phylipnon::stdout"
process = subprocess.Popen(
shlex.split(cmd),
stdout=stdout_handle, # redirects output from stdout
stderr=stderr_handle # redirects output from stderr
)
!cat egfr.phy
!cat seqret.log
1 378 KP325210.1CCTCTGCCAT ATCAAGACGA TCAATTGGGA GGAAATAATT ACCGGTCCGG GAGGCCGGTA CTTTTACGTG TACAATTTTA CGTCGCCGGA ACGCAATTGT CCGGAATGCG ACGAGAGCTG CGAACAGGGT TGCTGGGGCG AGGGTCCGGA GAACTGTCAA AAGTACTCGA AGACGAACTG CTCGCCTCAG TGCTGGCAGG GCAGGTGTTT CGGTCCTAAT CCACGCGAGT GTTGCCATCT TTTTTGCGCC GGTGGCTGCA CCGGTCCCAA ACAGAGCGAC TGTCTTGCCT GTAAGAATTT CTTCGACGAT GGCGTGTGCA CCCAGGAATG CCCGCCCATG CAAAAGTAAG TAACATCACG ATTCTTCTGT CTTTTTGC Read and write (return) sequences
...или даже объединить (в хронологическом порядке!) выдачу из обоих потоков в один файл (некоторые программы выдают логи в оба потока вперемешку).
with open("seqret_full.log", "w") as handle:
cmd = "seqret fasta::egfr.fasta phylipnon::stdout"
process = subprocess.Popen(
shlex.split(cmd),
stdout=handle, # redirects output from stdout
stderr=subprocess.STDOUT # redirects output from stderr to stdout
)
process.communicate()
(None, None)
!cat seqret_full.log
Read and write (return) sequences 1 378 KP325210.1CCTCTGCCAT ATCAAGACGA TCAATTGGGA GGAAATAATT ACCGGTCCGG GAGGCCGGTA CTTTTACGTG TACAATTTTA CGTCGCCGGA ACGCAATTGT CCGGAATGCG ACGAGAGCTG CGAACAGGGT TGCTGGGGCG AGGGTCCGGA GAACTGTCAA AAGTACTCGA AGACGAACTG CTCGCCTCAG TGCTGGCAGG GCAGGTGTTT CGGTCCTAAT CCACGCGAGT GTTGCCATCT TTTTTGCGCC GGTGGCTGCA CCGGTCCCAA ACAGAGCGAC TGTCTTGCCT GTAAGAATTT CTTCGACGAT GGCGTGTGCA CCCAGGAATG CCCGCCCATG CAAAAGTAAG TAACATCACG ATTCTTCTGT CTTTTTGC
Верный ответ: встроить его в вашу программу :^)
Но часто на такое нет времени, сил и желания. Что ж, скрипты на Python тоже можно запускать как процессы!
!cat solution_f.py
import sys with open(sys.argv[1]) as inp: a = inp.readline() b = inp.readline() print(int(a) + int(b))
cmd = "python3 solution_f.py tests/test1.txt"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE, # catches output from stdout
text=True
)
stdout, _ = process.communicate()
print(stdout)
5
stdin
?¶!cat solution.py
a = input() b = input() print(int(a) + int(b))
!cat tests/test1.txt
2 3
with open("tests/test1.txt") as stdin_handle:
cmd = "python3 solution.py"
process = subprocess.Popen(
shlex.split(cmd),
stdin=stdin_handle,
stdout=subprocess.PIPE, # catches output from stdout
text=True
)
print(process.communicate()[0])
5
procs = []
for i in range(10):
cmd = "python solution_f.py tests/test1.txt"
process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE, # указываем, что мы хотим ловить stdout
stderr=subprocess.STDOUT, # перенаправить stderr в stdout
text=True,
cwd='./'
)
procs.append(process)
for p in procs:
print(p.communicate())
('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None) ('5\n', None)
Фактически это был самый простой способ запустить параллельно множество процессов!
Однако... у нас нет контроля за тем, сколько процессов звпускать одновременно. Если на общем сервере запустить сразу 200 тяжелых процессов (или 5-10 на вашем локальном компьютере), то он наверняка зависнет, и его, возможно, даже придется перезагружать.
Кроме того, не очень ясно, как контролировать, завершился процесс или упал/завис, и в зависимости от этого менять работу основной программы.
К счастью, эти фичи реализованы в модуле concurrent.futures
, который мы сейчас разберем.
subprocess
позволяет запускать сторонние программы в качестве дочерних процессов интерпретатора Python.subprocess.call
.subprocess.Popen
.subprocess.PIPE
.concurrent.futures
¶В модуле concurrent.futures
описано 2 класса, с помощью которых удобно запускать параллельные вычисления.
import concurrent.futures
Примеры?
Примеры?
...большинство программ относится к этому типу.
concurrent.futures
¶У нас есть 2 класса:
concurrent.futures.ThreadPoolExecutor
concurrent.futures.ProcessPoolExecutor
ThreadPoolExecutor
запускает потоки.
ProcessPoolExecutor
запускает процессы.
def factorial_recursive(x):
if x <= 1:
return 1
return x * factorial_recursive(x - 1)
def perform(x):
return factorial_recursive(x)
with concurrent.futures.ThreadPoolExecutor() as executor:
# or use ProcessPoolExecutor instead, works the same
futures = {executor.submit(perform, task_arg) for task_arg in range(7)}
for fut in concurrent.futures.as_completed(futures):
print(f"The outcome is {fut.result()}")
The outcome is 6 The outcome is 720 The outcome is 24 The outcome is 2 The outcome is 1 The outcome is 1 The outcome is 120
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(perform, task_arg) for task_arg in range(7)}
for fut in concurrent.futures.as_completed(futures):
print(f"The outcome is {fut.result()}")
The outcome is 120 The outcome is 1 The outcome is 720 The outcome is 24 The outcome is 1 The outcome is 2 The outcome is 6
Executor
executor.submit
(Executor
создает для их выполнения потоки или процессы)Executor
возвращает нам объекты класса Future
concurrent.futures.as_completed
принимает коллекцию из объектов класса Future
и выдает их в порядке их выполненияFuture.result
возвращает то, что вычислилось бы при запуске perform
с аргументом task_arg
Можно создать словарь:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(perform, task_arg): task_arg for task_arg in range(7)}
# Notice the order! Future objects are dict keys, not values!
for fut in concurrent.futures.as_completed(futures):
original_task_arg = futures[fut]
print(f"The result of {original_task_arg} is {fut.result()}")
The result of 4 is 24 The result of 2 is 2 The result of 5 is 120 The result of 0 is 1 The result of 6 is 720 The result of 3 is 6 The result of 1 is 1
Аналогично рассмотренной ранее функции map()
, метод executor.map()
применяет функцию к соответствующей коллекции!
with concurrent.futures.ProcessPoolExecutor() as executor:
for arg, res in zip(range(7), executor.map(perform, range(7))):
print(f"The result of {arg} is {res}")
The result of 0 is 1 The result of 1 is 1 The result of 2 is 2 The result of 3 is 6 The result of 4 is 24 The result of 5 is 120 The result of 6 is 720
В данном случае мы ждем окончания выполнения каждой из задач в нужном порядке.
Возможно, и потоки, и процессы?..
Опять и потоки, и процессы? Но тут хотя бы понятно, что потоки наверняка лучше - нам не нужно задумываться о том, как будут взаимодействовать задачи между собой, нужно просто ждать очереди от ОС.
Кажется, что процессы тратят дополнительную память и менее выгодны в обоих случаях?..
(спойлер: это не совсем так)
Давайте рассмотрим несколько боевых примеров, и попытаемся понять, когда какой тип параллелизации лучше использовать.
import urllib
def load_url(url, timeout):
"""Retrieve a single page and report its contents"""
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
%%time
for url in URLS:
try:
data = load_url(url, 60)
except Exception as exc:
print(f"{url} generated an exception: {exc}")
else:
print(f"{url} page is {len(data)} bytes")
http://www.foxnews.com/ page is 280944 bytes http://www.cnn.com/ page is 1117442 bytes http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden http://www.bbc.co.uk/ page is 312452 bytes http://some-made-up-domain.com/ generated an exception: <urlopen error [Errno -2] Name or service not known> CPU times: user 145 ms, sys: 42.9 ms, total: 188 ms Wall time: 5.71 s
~ 6 секунд!
%%time
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print(f"{url} generated an exception: {exc}")
else:
print(f"{url} page is {len(data)} bytes")
http://europe.wsj.com/ generated an exception: cannot serialize '_io.BufferedReader' object http://some-made-up-domain.com/ generated an exception: <urlopen error [Errno -2] Name or service not known> http://www.foxnews.com/ page is 280946 bytes http://www.bbc.co.uk/ page is 312452 bytes http://www.cnn.com/ page is 1117442 bytes CPU times: user 17.9 ms, sys: 22.6 ms, total: 40.4 ms Wall time: 3.61 s
Лучше, ~ 3.5 секунды. Но запуск самого процесса занимает много времени.
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print(f"{url} generated an exception: {exc}")
else:
print(f"{url} page is {len(data)} bytes")
http://some-made-up-domain.com/ generated an exception: <urlopen error [Errno -2] Name or service not known> http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden http://www.foxnews.com/ page is 280945 bytes http://www.bbc.co.uk/ page is 312452 bytes http://www.cnn.com/ page is 1118139 bytes CPU times: user 153 ms, sys: 52 ms, total: 205 ms Wall time: 2.95 s
Еще лучше, < 3 секунд!
Для I/O-bound задач лучше использовать потоки (ThreadPoolExecutor
).
import math
def is_prime(n):
if n == 2:
return True
if n < 2 or n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
PRIMES = [112272535095293, 112582705942171,
112272535095293, 115280095190773,
518463576809201, 535006138814359,
555554444332213, 578415690713087,
115797848077099, 777737777777777,
777773333999551, 777777355553753,
1099726899285411]
%%time
for prime in PRIMES:
prime_bool = is_prime(prime)
print(f"{prime} is {'prime' if prime_bool else 'not prime'}")
112272535095293 is prime 112582705942171 is prime 112272535095293 is prime 115280095190773 is prime 518463576809201 is prime 535006138814359 is prime 555554444332213 is prime 578415690713087 is prime 115797848077099 is prime 777737777777777 is prime 777773333999551 is prime 777777355553753 is prime 1099726899285411 is not prime CPU times: user 6.16 s, sys: 7.74 ms, total: 6.17 s Wall time: 6.17 s
~ 6.5 секунд!
%%time
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_prime = {executor.submit(is_prime, prime): prime for prime in PRIMES}
for future in concurrent.futures.as_completed(future_to_prime):
prime = future_to_prime[future]
prime_bool = future.result()
print(f"{prime} is {'prime' if prime_bool else 'not prime'}")
112272535095293 is prime 112272535095293 is prime 115280095190773 is prime 112582705942171 is prime 535006138814359 is prime 555554444332213 is prime 578415690713087 is prime 115797848077099 is prime 1099726899285411 is not prime 518463576809201 is prime 777737777777777 is prime 777773333999551 is prime 777777355553753 is prime CPU times: user 15.6 ms, sys: 12 ms, total: 27.6 ms Wall time: 2.07 s
Уже лучше, ~ 2 секунды!
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_prime = {executor.submit(is_prime, prime): prime for prime in PRIMES}
for future in concurrent.futures.as_completed(future_to_prime):
prime = future_to_prime[future]
prime_bool = future.result()
print(f"{prime} is {'prime' if prime_bool else 'not prime'}")
115280095190773 is prime 112582705942171 is prime 112272535095293 is prime 112272535095293 is prime 518463576809201 is prime 115797848077099 is prime 578415690713087 is prime 535006138814359 is prime 1099726899285411 is not prime 555554444332213 is prime 777773333999551 is prime 777737777777777 is prime 777777355553753 is prime CPU times: user 8.99 s, sys: 57.6 ms, total: 9.05 s Wall time: 9.64 s
Для CPU-bound задач лучше использовать процессы (ProcessPoolExecutor
).
Но почему с пятью одновременно запущенными потоками вышло медленнее, чем при последовательном запуске?
На это есть в Python злосчастный...
GIL - это ограничение интерпретатора Python, позволяющее внутри одного процесса одновременно выполнять только один поток.
Так выполняются три потока одновременно. Источник - Wikipedia Commons.
GIL обеспечивает, чтобы каждый выполняемый поток имел эксклюзивный контроль над всем интерпретатором.
Ah, here lies a tale... При блокирующих операциях (например, sleep
, ожидание I/O, и т.п.) GIL снимается.
Так что каждый раз, когда активный поток вынужден ждать, другие потоки могут выполняться.
В них ОС периодически (~ раз в 100 ticks) проводит проверку, занимает ли еще процесс CPU, или заблокирован I/O.
В этот момент происходит снятие и переназначение GIL, что может привести к переключению между процессами, но занимает совсем немного времени.
Самые большие потери времени возникают на многоядерных машинах: на них после снятия GIL происходит планирование выполнения (scheduling) потоков одновременно на разные ядра. При этом GIL может быть получен только одним потоком.
В этот момент между потоками происходит смертельная битва за GIL.
GIL действует не всегда. Ограничение не распространяется на:
Если код написан на C/C++ и поддерживает такую функцию, то он может выполняться параллельно в потоках.
В NumPy, который мы разберем в следующий раз, уже встроена нативная параллелизация (OpenBLAS), и базовые матричные процессы обычно выполняются настолько быстро, насколько могут. Но большинство других написанных на С/С++ пакетов можно удобно параллелизовать и с помощью потоков.
Как мы уже знаем, все потоки одного процесса имеют одно общее пространство памяти. Процессы в некоторых случаях также могут иметь общую память, но по умолчанию память между ними разделена.
import random
def append_random_int(lst):
number = random.randint(1, 100)
lst.append(number)
Обратите внимание, что функция append_random_int
изменяет переданный в нее объект.
LIST = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for _ in range(20):
executor.submit(append_random_int, LIST)
LIST
[33, 27, 28, 17, 35, 32, 84, 21, 58, 22, 31, 96, 43, 81, 3, 43, 51, 45, 41, 94]
LIST = list()
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
for _ in range(20):
executor.submit(append_random_int, LIST)
LIST
[]
При изменении внутри потока объект изменяется везде. При изменении внутри процесса объект копируется и происходит изменение копии, исходный объект остается неизменным.
concurrent.futures
реализовано 2 класса (и множество работающих с ними функций), предназначенных для удобной параллелизации задач.Несмотря на все плюсы параллелизации, она не всесильна. Если каждый последующий запуск определенного кода использует результат предыдущего запуска, то параллелизация здесь никак не получится. Есть множество других способов ускорить код, например, хэширование, прекомпиляция (numba
) или переписывание логики на другом языке (C/C++).
В большинстве случаев наибольшее количество времени экономит правильный выбор алгоритма выполнения задачи. Этому искусству будет посвящен целый курс в следующем семестре.