Kodomo

Пользователь

Параллельное программирование в питоне

зачем нужно параллельное программирование?

Threads vs. Multi-processing в общем

Нити - выполняются в одном процессе, используют одну и ту же память (все объекты доступны одновременно всем нитям), нужна блокировка объектов и другие способы обеспечения взаимодействия нитей. Требует мало ресурсов для создания. Переключение между исполнением разных нитей быстрее, чем для процессов.

Процессы - разные процессы, каждый со своей памятью, нужен механизм коммуникации процессов. Соответственно, требуют много ресурсов для создания. Переключение между исполнением разных процессов медленнее, чем для нитей.

Реализация и тех, и других зависит от ОС.

балансировка нагрузки на нити/процессы: чем больше задача, тем больше толку от многих нитей, если задача небольшая, одна нить справляется с ней раньше, чем остальные успевают стартовать

threading

Нить - последовательность выполняемых действий. Обычно в программе одна нить, но можно создать несколько нитей, каждая из которых будет выполнять свою последовательность действий. "Нити похожи на процессы, только сильно отличаются." :-)

thread manager содержит таблицу активных нитей и по-очереди дает выполняться каждой нити, так же, как process manager в ОС. Нити могут быть в двух состояниях: run и sleep. Если нить находится в первом состоянии, thread manager может дать ей выполняться, если во втором - нить считается заблокированной и не будет выполняться, пока блок не будет снят.

Нити в питоне:

модули thread, threading, + модуль Queue("It is especially useful in threaded programming when information must be exchanged safely between multiple threads")

модуль thread

low-level, не удобен в использовании

модуль threading

содержит: класс Thread; функции, возвращающие объекты Lock, Condition, Semaphore, Event; функции active_count() и enumerate().

класс Thread

используется двумя способами:

   1 def some_function(foo, bar):
   2         print foo + bar
   3 my_thread = threading.Thread(target=some_function, args=(foo, bar), kwargs={})
   4 my_thread.start()

   1 class MyThread(threading.Thread):
   2         def __init__(self, foo, bar):
   3                 self.foo = foo
   4                 self.bar = bar
   5                 threading.Thread.__init__(self)
   6         def run(self):
   7                 global var
   8                 print self.foo + self.bar + var
   9 var = 10
  10 my_thread = MyThread(foo, bar)
  11 my_thread.start()

методы класса Thread

start() => run()

is_alive() - true если метод run() еще не завершился

join - some_thread.join() блокирует нить, в которой был вызван метод join (она считается alive, но не выполняется), до завершения нити some_thread.

Взаимодействие между нитями

Все объекты доступны всем нитям

Специальные сигнальные объекты Lock, RLock, Condition, Semaphore, Event

Lock - позволяет прервать выполнение нити до тех пор, пока lock'ом владеет другая нить.

   1 class MyThread_1(threading.Thread):
   2     def run():
   3         global lock
   4         lock.acquire()
   5         # do smth that takes 10 seconds
   6         lock.release()
   7 
   8 class MyThread_2(threading.Thread):
   9     def run():
  10         global lock
  11         time.sleep(5)
  12         lock.acquire()
  13         # do smth else
  14         lock.release()
  15 
  16 lock = threading.Lock()
  17 event = threading.Event()
  18 for thread in MyThread_1, MyThread_2:
  19         thread().start()

Начинают выполняться обе нити, но первая сразу захватывает лок и начинает делать долгую операцию, вторая ждет 5 секунд и пытается захватить лок, но он уже захвачен. Вторая нить блокируется до тех пор, пока первая не освободит лок. Таким образом smth else будет сделано через 10 секунд, а не через 5.

RLock - одна нить может несколько раз подряд вызвать метод Rlock.acquire(), тогда, чтобы jcвободить lock, та же нить дложна столько же раз вызвать метод release()

Semaphore - то же, что и lock, но имеет счетчик, то есть несколько нитей могут захватить lock одновременно

Event - имеет методы set(), clear(), wait(), is_set(). Нить вызывает метод wait и блокируется. Другая нить вызывает метод set(), что присваивает некой внутренней переменной значение 1, после этого все нити, которые ждали этого события, пробуждаются и начинают работать. Сбросить значение переменной можно методом clear().

   1 class MyThread_1(threading.Thread):
   2     def run():
   3         global lock, event
   4         lock.acquire()
   5         do smth
   6         event.set()
   7         event.clear()
   8         lock.release()
   9 
  10 class MyThread_2(threading.Thread):
  11     def run():
  12         global lock, event
  13         event.wait()
  14         lock.acquire()
  15         do smth else
  16         lock.release()
  17 
  18 lock = threading.Lock()
  19 event = threading.Event()

Инструкция with

   1 lock = threading.Lock()
   2 lock.acquire()
   3 # lock acquired
   4 # do smth
   5 lock.release()
   6 # lock relesed

то же самое, но с with

   1 lock = threading.Lock()
   2 with lock:
   3     # lock acquired
   4     # do smth
   5     pass
   6 # lock is released after leaving the with block

модуль Queue

три типа объектов, реализующих очереди: queue (FIFO queue), LifoQueue, PriorityQueue. Все три принимают аргумент maxsize.

методы qsize, empty, full, put, put_nowait, get, get_nowait

task_done() - сказать очереди, что предыдущий взятый элемент обработан

join() - заблокироваться до того, как все элементы очереди будут обработаны

GIL

thread manager. Бывает менеджер на уровне ядра, тогда ОС воспринимает нити «как» процессы, бывает user-level, то есть на уровне приложения, тогда ОС знает только о существовании процесса приложения. Во втором случае нити не могут использовать разные процессоры одновременно. В питоне — свой менеджер, надстроенный над менеджером ОС, то есть нити — нити, созданные ОС, но питон может ими управлять. Или как-то так.

GIL — global interpreter lock. Для каждой нити питон порождает нить ОС, но! исполняемая в данный момент нить ОС захватывает lock GIL, который не дает исполняться другим нитям той же питонской программы(на уровне ОС), поэтому когда ОС прерывает нить и хочет передать управление следующей, она не может этого сделать, потому что lock захвачен. Продолжает исполняться первая нить. Только когда она освобождает lock (после выполнения 100 инструкций байт-кода), могут исполняться другие нити (которые точно также захватывают GIL). Результат — нити не исполняются параллельно даже на многопроцессорных системах. GIL защищает от потери данных вследствие одновременного доступа на уровне инструкций байт-кода, но не от логических ошибок!! (Ппц непонятный абзац о_О)

Multiprocessing

модуль multiprocessing стандартной библиотеки и стопицот сторонних решений.

some features may not work in interactive interpreter. API is similar to threading module. Следует всегда иметь ввиду, что это отдельный процесс со своей памятью. Объекты синхронизации необходимо передавать функциям как аргументы, а не использовать global. Методы start, join и is_alive должны всегда вызываться процессом-родителем

Взаимодействие между процессами

Process pool

Можно создать пул процессов и отгружать им задачи (вызовы функций), и делать другие полезные вещи. Удобно для ускорения вычислений.

   1 pool = multiprocessing.Pool(processes=4)
   2 # можно указать сколько процессов использовать, иначе по числу процессоров в системе.
   3 pool.apply(func, (arg), {kwarg:8}) # blocks until the result is ready
   4 pool.apply_async(func, (arg), {kwarg:8}) # does not block
   5 pool.map(func, iterable) # parallel map! blocks until the result is ready
   6 pool.map_async(func, iterable) # parallel map! does not block
   7 pool.imap(func, iterable) # parallel itertools.imap (the difference is when several iterables are passed)
   8 pool.imap_unordered(func, iterable) # the result is unordered

Ко всем методам можно добавить аргумент chunksize, то есть размер части, на которые разбивается задача (по умолчанию 1, лучше ставить больше для длинных iterable).

async методы возвращают объект AsyncResult, у него есть методы get(), wait() - ;ждать, пока результат не будет готов, ready() - готов или нет? successful — готов без ошибок?

Еще между процессами можно устроить аутентификацию.

Интересные ссылки