Dnes se ponoříme hlouběji do paralelního programování a ukážeme si hned několik způsobů, jak lze souběžně vykonávané úlohy řídit a tím předcházet celkem složitě odhalitelným problémům.
Deadlock je situace, kdy dokončení jedné akce závisí na dokončení druhé akce, ale druhá akce zase naopak závisí na té první. Takové úlohy na sebe čekají nekonečně dlouho a dojde k tzv. deadlocku.
Typickým příkladem deadlocku z reálného světa je situace, kdy v dopravní špičce vjede do křižovatky spousta aut, aniž by ji mohli plynule projet. V takovém případě se celá křižovatka zasekne a pokud někdo nevzal rozum do hrsti a nevycouval na chvíli ven, stojí tam v deadlocku dodnes.
Vyhladovění je situace, kdy vlákno či proces čeká na nějaké prostředky, které potřebuje k úspěšnému splnění úlohy, ale protože tyto prostředky jsou neustále obsazené, nedočká se jich a program nikdy neskončí.
Představte si sami sebe stojící na křižovatce se stopkou. Pokud bude opravdu hustý provoz a nikdo vás nepustí, dojde k vyhladovění (a to i doslova :D), protože se nikdy nedostanete na hlavní cestu, která je neustále plně vytížená.
Souběh je situace, kdy více vláken najednou přistoupí ke sdíleným proměnným nebo zdrojům. Jednotlivá vlákna si pak mohou navzájem měnit data, což povede k nekonzistentním výsledkům.
Pečete sušenky pro své kamarády a při ukládání do misky je počítáte, aby se dostalo na každého. V nestřeženém okamžiku vám je ale někdo v průběhu počítání začne ujídat. Počet upečených sušenek sice sedí a miska se zdá plná, ale minimálně jeden kamarád bude bez sušenky.
Všechny výše zmíněné problémy se velmi těžko odhalují, protože se mimo jiné mohou objevit jen občas a za určitých specifických podmínek. Proto je daleko lepší jejich vzniku předcházet tak, že se naučíme vlákna a procesy řídit.
Nejdříve si na velmi jednoduchém příkladu ukážeme, jak to vypadá, když vznikne souběh. Pro přehlednost budeme všechna vlákna implementovat jako vlastní třídu, která dědí ze třídy Thread.
from threading import Thread
class Worker(Thread):
def __init__(self):
super().__init__()
def run(self):
for _ in range(10):
print('_', end='')
print('|', end='')
Implementace vlastního vlákna je velmi jednoduchá a není o moc delší než použití funkce. Hlavní výhodou je přehlednost a možnost funkcionalitu vlákna rozdělit do více metod.
Minimální vlákno musí implementovat metodu __init__
, která nám jej pomůže vytvořit, a metodu run
, která se zavolá při jeho spuštění.
Každé naše vlákno vypíše vedle sebe desetkrát podtržítko a svislítko a pak se ukončí. Představme si pro tentokrát, že je tohle naše kritická část aplikace a my potřebujeme, aby se tyto dvě operace (výpisy) provedly vždy najednou a nestalo se mezi nimi nic jiného.
Funguje to?
threads = []
for _ in range(100):
t = Worker()
t.start()
threads.append(t)
for t in threads:
t.join()
Nefunguje. I když pro menší množství vláken by se mohlo zdát, že je vše v naprostém pořádku a problém by nám zůstal skryt. Na výpisu je vidět, že se někdy operace více vláken pomíchají a znaky se vypíšou v jiném než žádaném pořadí. A co kdyby se něco podobného stalo při práci s databází?
S řešením přichází zámek.
Zámek je speciální objekt, který se určen ke sdílení mezi vlákny a který zajistí, že se pro danou sekci bude vykonávat vždy jen jedno vlákno. Volání lock.acquire()
nám zajistí zamknutí zámku a lock.release()
nám jej zase odemkne. Když bude chtít vlákno uzamknout zámek, který už uzamčený je, bude v tu chvíli zablokováno, dokud zámek nebude v jiném vláknu uvolněn.
Zámek se používá pro ohraničení nějaké kritické sekce, kterou nesmí v jednu chvíli vykonávat více vláken. Tato sekce by ale měla být co nejmenší, aby ostatní vlákna mohla dělat práci okolo a nemusela čekat na odemknutí zámku.
from threading import Thread
class Worker(Thread):
def __init__(self, lock):
Thread.__init__(self)
self.lock = lock
def run(self):
for _ in range(10):
self.lock.acquire()
print('_', end='')
print('|', end='')
self.lock.release()
Zámek se předává jako proměnná při vytváření vlákna a v rámci jeho běhu se popsaným způsobem použije.
from threading import Lock
threads = []
lock = Lock()
for _ in range(100):
t = Worker(lock)
t.start()
threads.append(t)
for t in threads:
t.join()
Zámek se pro všechna vlákna vytvoří jen jeden a předá se jim jako argument.
Na výsledku je vidět, že naše zámkem ohraničená kritická sekce se nyní opravdu vykonala vždy celá, aniž by byla přerušena jiným vláknem.
Co když se stane, že se v rámci jednoho vlákna zavolá vícekrát lock.acquire()
? Problém je na světě, protože zámek je uzamknut, další volání lock.acquire()
je tím pádem blokující a najednou je jediné vlákno, které bylo schopno zámek uvolnit zaseknuté v čekání samo na sebe.
Tento problém řeší Rlock
. Rlock
je speciální případ zámku, který je možné si v rámci jednoho vlákna uzamknout vícekrát. Je si jen třeba dát pozor na to, že jej v rámci toho samého vlákna musíme i odemknout tolikrát, kolikrát byl uzamčen.
Zámek je dobrý na ohraničení kritické sekce, ve které se chceme vyhnout souběhu. Dalším palčivým problémem je distribuce práce mezi jednotlivá vlákna tak, aby každý díl práce zpracovalo jen jedno vlákno a nelezly si do zelí. Od toho je tady fronta.
Fronta je komunikační prostředek, do kterého je možné dávat objekty ke zpracování a z druhé strany je vybírat. Navíc je vkládání i vybírání automaticky uzamčeno, takže si vlákna nelezou do zelí, a v neposlední řadě obsahuje informaci, zda byl daný záznam ve frontě již zpracován, což umožňuje snadno počkat do chvíle, kdy bude vše hotovo.
Pojďme si to ukázat na klasickém příkladu producer/consumer, ve kterém jedna skupina vláken generuje práci a ukládá ji do fronty a druhá skupina ji z fronty vybírá a zpracovává.
from threading import Thread
from random import randint, uniform
from time import sleep
class Producer(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
for _ in range(5):
integer = randint(100, 1000)
self.queue.put(integer)
print(f'Producer({self.name}): {integer} → Q')
class Consumer(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
integer = self.queue.get()
print(f'Consumer({self.name}): {integer} ← Q')
sleep(uniform(0.2, 1.0))
self.queue.task_done()
Obě vlákna dostávají frontu jako prostředek ke komunikaci. Producer do ní vkládá náhodná čísla a Consumer je vybírá ven a přitom o tom obě vlákna informují. Pokud není co z fronty vybrat, je volání metody get
blokující a vlákno se zastaví a čeká na data ke zpracování. Consumer navíc po zpracování čísla a výpisu informace dá vědět pomocí task_done()
, že je s danou úlohou hotov.
from queue import Queue
queue = Queue()
for _ in range(2):
t = Producer(queue)
t.start()
t = Consumer(queue)
t.start()
queue.join()
Na výstupu je vidět, jak to celé probíhá. Vlákna rozlišujeme automaticky generovaným jménem. Navíc, jelikož pracujeme s frontou úloh, nemusíme čekat na konec všech vláken, ale stačí nám počkat, až bude vše ve frontě vyřízeno.
Fronta se díky své ochraně při zápisu i čtení dá využít i jako prostředek pro sběr výsledků z vláken.
Nejpoužívanější fronta je typu FIFO (first in, first out) a tedy první vložený prvek je také jako první z fronty vytažen ven. V Pythonu máme k dispozici ještě zásobník neboli LifoQueue
frontu (last in, first out), což funguje stejně jako zásobník v pistoli, kde poslední vložený náboj letí jako první ven, a také prioritní frontu PriorityQueue
, kde můžeme jednotlivým úlohám zadat při vkládání prioritu a podlé ní je pak dostávají vlákna ke zpracování.
Všem těmto frontám můžeme také zadat maximální velikost, což způsobí, že vlákno, které se bude snažit něco vložit do fronty metodou put
, i když ta už bude plná, bude zastaveno, dokud se ve frontě neuvolní místo.
Zámek je fajn, pokud se chceme omezit jen na jedno běžící vlákno. Co když ale chceme limitovat počet vláken, ale přes to nechat fungovat více než jen jedno? Na to je tady Semaphore
.
Semaphore
je vlastně jednoduchý čítač, který má na začátku nějakou hodnotu. S každým uzamčením semaforu se tato hodnota sníží o jedničku. Když je semafor na nule, je jeho další uzamčení zablokováno a vlákno si musí počkat až jej jiné vlákno odemkne, čímž se počítadlo zase o jedničku zvýší.
Tím si můžeme zajistit, že se do určité kritické sekce našeho programu nedostane najednou více než určitý počet vláken daný počáteční hodnotou semaforu.
Implementace je zde téměř totožná jako v předchozích případech, jen místo fronty či zámku dostávají vlákna sdílený semafor. Na práci pak mají jen vypisovat informace o svém běhu a náhodný čas počkat mezi iteracemi.
from threading import Thread
from time import sleep
from random import uniform
class Worker(Thread):
def __init__(self, semaphore):
Thread.__init__(self)
self.semaphore = semaphore
def run(self):
print(f'{self.name} starting')
self.semaphore.acquire()
for _ in range(4):
print(f'{self.name} is working')
sleep(uniform(0.1, 0.5))
self.semaphore.release()
print(f'{self.name} is done')
Nastavením semaforu limitujeme počet pracujících vláken v naší kritické sekci na dvě, i když už od začátku máme nastartovaných všech šest.
from threading import Semaphore
threads = []
semaphore = Semaphore(2)
for _ in range(6):
t = Worker(semaphore)
t.start()
threads.append(t)
for t in threads:
t.join()
Dnes jsme si ještě měli povídat i o procesech přeci!
Ano, měli, ale není to potřeba, protože vše, co jsme si dnes vysvětlili, funguje velmi podobně i pro procesy. Je to podobná situace jako minule, kdy jsme mohli v implementaci snadno vyměnit proces za vlákno a vše fungovalo vesele dál. Jednotlivé mechanismy jsou samozřejmě pro procesy implementovány jinak, ale co se použití týče, jsou rozdílné jen minimálně.
multiprocessing.Queue
například neoperuje jako v případě vláken nad sdílenou pamětí, ale používá jednosměrnou komunikační rouru směrem od rodičovského procesu k potomkům. Kvůli tomu nemohou procesy označit úlohu z fronty za vyřešenou a jejich ukončení je třeba řešit jinak.
Zatímco u vláken v jednom procesu je Python svým pánem a operuje nad jedním kusem sdílené paměti, která procesu náleží, u řízení více procesů a komunikace mezi nimi je obecně podstatně větší závislost na tom, jak jsou jednotlivé mechanismy implementovány na úrovni operačního systému.