Nauč se Python > Kurzy > Python a jeho knihovny > AsyncIO > AsyncIO

AsyncIO

Po vláknem a procesech přichází na řadu AsyncIO. AsyncIO je relativně nový způsob, jak v Pythonu psát konkurenční programy. Ano, správně, konkurenční programy — v tomto případě totiž zůstaneme jen s jedním vláknem a jedním procesem, ale i přes to budeme mít relativně hodně možností, jak naši aplikaci zrychlit.

Pojďme ale pěkně popořádku.

Servery typicky při zpracovávání požadavku stráví většinu času síťovou komunikací. Proto se často spouští několik vláken nebo přímo procesů najednou, aby se mohl vytížit procesor. Při velkém množství vláken ale nastanou dva problémy. První je, že vláken nemůže být neomezeně mnoho. Každé vlákno potřebuje vlastní stack, tj. poměrně velkou část paměti; a počet vláken bývá omezen i jinak (na Linuxu je globální limit počtu procesů, do kterého se počítají i jednotlivá vlákna – viz cat /proc/sys/kernel/threads-max). Druhý problém je, že přepnutí z jednoho vlákna do druhého se může stát kdykoli. Ověřit si, že je na to program připravený, je poměrně složité a na zajištění správné funkčnosti je potřeba zamykání či jiné techniky. Ty bývají relativně pomalé, a tak se jim programátoři snaží vyhnout. A chyby vzniklé nesprávným ošetřením přepínání vláken bývají složité na odhalení a vyřešení.

Vlákna jsou příklad preemptivního multitaskingu, kdy operační systém rozhoduje, kdy přepne z jednoho vlákna do druhého, a tuto změnu si prakticky vynutí. Jednotlivá vlákna se s tím musí vyrovnat. Alternativou je kooperativní multitasking, kdy se jednotlivé úlohy umí samy vzdát procesorového času, když např. čekají na síťovou komunikaci. Programátor tak ví, že dokud takto nepředá kontrolu ostatním úlohám, žádná jiná úloha mu pod rukama nemůže měnit stav procesu. Na druhou stranu je ale potřeba dostatečně často kontrolu předávat, aby se všechny úlohy dostaly ke slovu. Tuto techniku tak nemůže používat operační systém, pod kterým můžou běžet i špatně napsané programy. V rámci jednoho procesu se to ale dá s úspěchem využít.

Souběžnost v Pythonu

V Pythonu existovala a existuje řada knihoven, které nám umožňují „dělat více věcí zároveň“. Pro preemptivní multitasking jsou tu threading, tedy podpora pro vlákna, a multiprocessing, tedy způsob jak spustit nový pythonní proces, ve kterém se provede určitá funkce (přičemž vstup a výstup se předává serializovaný přes pipes).

Další knihovna, kterou lze z PyPI nainstalovat, je greenlet. Ta nám dává k dispozici tzv. mikro-vlákna, která se mezi sebou přepínají v rámci jednoho procesu. Na rozdíl od systémových vláken nepotřebují tolik paměti navíc, ale stále jde (alespoň z pohledu programátora) o preemptivní strategii: k přepnutí může dojít kdykoli, je tedy potřeba zamykat a složitě hledat málo časté chyby.

Byly vyvinuty i knihovny pro kooperativní přepínání, založené na tzv. futures. Nejznámější jsou Twisted a Tornado. Obě jsou relativně staré (2002, resp. 2009), ale stále populární.

Ačkoli byly Twisted, Tornado a podobné knihovny užitečné, měly zásadní problém v tom, že každá má jiné API. Vznikaly tak kolem nich ekosystémy vázané na konkrétní knihovnu: server napsaný pro Tornado se nedal použít pod Twisted a aplikace využívající Twisted nemohla využít knihovnu pro Tornado.

Jak to vyřešit?

Jeden standard

xkcd 927

Komiks xkcd, © Randall Munroe, CC-BY-NC

Podobně jako přístup k různým SQL databázím je v Pythonu standardizovaný (knihovny pro SQLite, Postgres, MySQL atd. všechny podporují API definované v PEP 249) nebo je standardizované API webových serverů (WSGI, PEP 3333), tak vzniklo standardizované API pro kooperativní multitasking. Toto API je definováno v PEP 3156 a jeho referenční implementace, asyncio, je ve standardní knihovně Pythonu. (Ne že by to zabránilo vzniku dalších asynchronních knihoven jako Curio a Trio. Ty ovšem spíš experimentují s novými paradigmaty a osvědčené principy se z nich postupně dostávají do asyncio.)

Interně je asyncio postavené na konceptu futures inspirovaných Tornado/Twisted, ale jeho „hlavní“ API je postavené na coroutines podobných generátorům.

Jak vypadá taková asynchronní funkce? Definuje se pomocí async def místo def, a může používat příkaz await.

Ukažme si to na příkladu:

import asyncio

async def count(name, interval):
    """Prints numbers from 0 in regular intervals"""
    i = 0
    while True:
        print(name, 'counts', i)
        await asyncio.sleep(interval)
        i += 1


loop = asyncio.get_event_loop()
loop.run_until_complete(count('Counter', 1))
loop.close()

Co se tu děje? Příkazem await asyncio.sleep(interval) se asynchronní funkce pozastaví (podobně jako generátor při yield) a předá kontrolu knihovně asyncio s informací že za daný čas by kontrolu chtěla zase zpátky. Než daný interval uplyne, asyncio může spouštět jiné úlohy; po jeho uplynutí pozastavenou funkci „probudí“ a její algoritmus pokračuje dál.

Když žádné jiné úlohy neexistují, je pomalé počítání trochu nudné. Zkuste proto spustit počítadla dvě. (Detaily funkce ensure_future a příkazu await task vysvětlíme níže.)

import asyncio

async def count(name, interval):
    ...

async def run_two_counters():
    fast_task = asyncio.ensure_future(count('Fast', 0.3))
    slow_task = asyncio.ensure_future(count('Slow', 1))
    await fast_task
    await slow_task

loop = asyncio.get_event_loop()
loop.run_until_complete(run_two_counters())
loop.close()

Spouštění a ukončení – poslední tři řádky – je poněkud složité na zápis, ale typicky to v každém programu potřebujete napsat jen jednou.

V Pythonu verze 3.7 a vyšší lze ty tři poslední řádky nahradit jednodušším:

asyncio.run(run_two_counters())

V Pythonu verze 3.4 a nižší ještě neexistovala klíčová slova async a await; asynchronní funkce byly implementovány jako generátory. Ve starších verzích Pythonu bylo potřeba místo:

async def ...:
    await ...

psát:

@asyncio.coroutine
def ...:
    yield from ...

Starý způsob zatím funguje i v novějším Pythonu a dokonce se ještě někdy objeví i v dokumentaci. Od Pythonu 3.8 je ale deprecated.

Event Loop

Knihovna asyncio nám dává k dispozici smyčku událostí, která se, podobně jako pyglet.app.run v Pygletu, stará o plánování jednotlivých úloh. Smyček událostí může být více. Tradiční způsob je, že každé vlákno může mít vlastní smyčku událostí, kterou získáme pomocí asyncio.get_event_loop a pak ji můžeme spustit dvěma způsoby:

  • loop.run_forever() spustí smyčku na tak dlouho, dokud jsou nějaké úlohy naplánovány (to trochu odporuje názvu, ale většinou se nestává, že by se úlohy „vyčerpaly“), nebo
  • loop.run_until_complete(task) – tahle funkce skončí hned, jakmile je hotová daná úloha, a vrátí její výsledek.

Nakonec je smyčku potřeba uzavřít (loop.close()), což např. dá použitým knihovnám možnost korektně uzavřít zbylá síťová spojení.

Od Pythonu 3.7 můžete použít asyncio.run(task), což vytvoří novou smyčku událostí, spustí v ní danou úlohu (pomocí run_until_complete) a zase ji zavře. Chcete-li ji použít (a tedy psát kód jen pro Python 3.7+), používejte pak všude místo ensure_future funkci create_task, která vás lépe ochrání před těžko nalezitelnými chybami.

Async funkce a Task

Smyčka událostí provádí úlohy a asynchronní funkce.

Asynchronní funkce se definují pomocí async def a umožňují použít příkaz (nebo operátor) await, kterým se provádění funkce pozastaví a kontrola se předá jiným úlohám do doby, než nastane nějaká událost (např. uplynul časový intervaluj, jsou dostupná nová data ze socketu…).

Pozastavení funguje podobně jako yield u generátoru.

Zavoláním asynchronní funkce dostaneme coroutine pozastavenou na začátku těla funkce:

>>> async def demo():
...     print('Demo')
...
>>> coroutine = demo()
>>> coroutine
<coroutine object demo at 0x7fda8be22b90>

Naplánujeme-li provádění coroutine na smyčce událostí (např. pomocí run_until_complete), tělo funkce se začne vykonávat:

>>> loop = asyncio.get_event_loop()
>>> result = loop.run_until_complete(coroutine)
Demo

V rámci jedné coroutine pak lze provedení jiné coroutine naplánovat a počkat na jejich skončení pomocí await. Jak run_until_complete tak await nám dají k dispozici návratovou hodnotu příslušné asynchronní funkce.

import asyncio

async def add(a, b):
    await asyncio.sleep(1)  # schedule a "sleep" and wait for it to finish
    return a + b

async def demo():
    coroutine = add(2, 3)
    result = await coroutine  # schedule "add" and wait for it to finish
    print('The result is:', result)

loop = asyncio.get_event_loop()
result = loop.run_until_complete(demo())
loop.close()

Nevýhoda čistých coroutines spočívá v tom, že na každé zavolání takové funkce lze použít jen jeden await. Výsledek se nikam neukládá, jen se po skončení jednou předá. Druhý await pro stejné zavolání asynchronní funkce skončí s chybou. Zkuste si to – v kódu výše přidejte další řádek s await coroutine:

    print('The result is:', (await coroutine))

Tenhle problém můžeme vyřešit tak, že asynchronní funkci „zabalíme“ jako úlohu, Task. V Pythonu 3.7 se Task tvoří pomocí asyncio.create_task; pro kompatibilitu se staršími verzemi ale použijeme ekvivalentní asyncio.ensure_future. Task se chová stejně jako coroutine – lze použít v await nebo run_until_complete, ale navíc:

  • výsledek je k dispozici kdykoli po ukončení funkce (např. pro druhý await) a
  • úloha se naplánuje hned po zavolání ensure_future.

Druhou vlastnost je lepší ukázat na příkladu:

import asyncio

async def print_and_wait():
    print('Async function starting')
    await asyncio.sleep(0.5)
    print('Async function done')
    return 'result'

async def demo_coro():
    coroutine = print_and_wait()
    await asyncio.sleep(1)
    print('Awaiting coroutine')
    print(await coroutine)     # schedule coroutine and wait for it to finish

async def demo_task():
    task = asyncio.ensure_future(print_and_wait())  # schedule the task
    await asyncio.sleep(1)
    print('Awaiting task')
    print(await task)  # task is finished at this point; retreive its result


loop = asyncio.get_event_loop()
print('Coroutine:')
result = loop.run_until_complete(demo_coro())
print('Task:')
result = loop.run_until_complete(demo_task())
loop.close()

Fan-Out a Fan-In

S pomocí asynchronních funkcí můžeme nad našimi programy přemýšlet tak, jako by to byly „normální“ procedurálně zapsané algoritmy: máme jedno „vlákno“, které se provádí od začátku do konce, jen na některých místech (označených await) se provádění přeruší a zatímco náš kód čeká na výsledek nějaké operace, může se spustit jiný kus kódu. Funkce, na které je takto potřeba čekat, bývají v dokumentaci patřičně označeny (v síťovém programování je to většinou čtení ze socketů nebo inicializace či ukončení serveru).

Pomocí ensure_future a await můžeme k tomu dělat něco navíc: rozdělit běh našeho programu na víc úloh, které se budou vykonávat „souběžně“ – například autor scraperu chce stáhnout několik stránek najednou nebo server souběžně odpovídá na několik požadavků. Tomuto rozdělení se říká fan-out.

Opačná operace je fan-in, kdy několik úloh opět spojíme do jedné. Výše uvedený scraper může počkat, než jsou všechny stránky stažené – třeba pomocí jednoho await pro každý Task nebo asynchronní funkce gather. Poté může pokračovat zpracováním získaných dat.

Počkáním na úlohu (pomocí await, gather, run_until_complete atp.) získáte její návratovou hodnotu. Ale na všechny úlohy, i na ty, které nic zajímavého nevrací, je důležité počkat. Neuděláte-li to, bude asyncio vypisovat varovné hlášky:

>>> import asyncio
>>> asyncio.sleep(1)  # no await
>>> exit()
sys:1: RuntimeWarning: coroutine 'sleep' was never awaited

Toto varování nikdy neignorujte. Kdyby váš program nedělal co má, spolu s varováním byste ignorovali výjimky v této coroutine.

Asynchronní cykly a kontexty

Až budete používat některé „asynchronní“ knihovny, setkáte se pravděpodobně se dvěma novými konstrukcemi: async for a async with.

Fungují jako jejich „ne-async“ varianty, jen na začátku a konci každé iterace (resp. na začátku a konci bloku) můžou přerušit vykonávání funkce – podobně jako await.

Typický příklad je u databází: začátek a konec transakce i získávání jednotlivých řádků pravděpodobně potřebují komunikaci po síti, takže hypotetická databázová knihovna by se mohla používat nějak takto:

async with database.transaction_context():
    await database.execute('UPDATE ...')
    async for row in (await database.execute('SELECT ...')):
        handle(row)

Praktická ukázka s aiohttp

Výše popsané principy si ukážeme na praktickém příkladu. Knihovna aiohttp je navržena tak, aby nám umožnila komunikaci přes HTTP protokol s využitím všech možností asyncio a my ji porovnáme se známou knihovnou requests.

Zde je k dispozici hezká sbírka knihoven implementující asynchronní komunikaci pro mnoho různých protokolů a databází.

Mějme jednoduchý program, který provede celkem 10 HTTP požadavků, jejichž vyřízení bude trvat různě dlouhou dobu. Server httpbin.org je k takovým hrátkám ideální, protože nám umožňuje do značné míry ovlivnit jeho chování.

import requests
from random import shuffle

seconds = [1] * 3 + [2] * 3 + [3] * 3 + [4]
shuffle(seconds)

def fetch(session, index, delay):
    response = session.get(f"https://httpbin.org/delay/{delay}")
    html = response.text
    print(index, delay, response.status_code)

def main():
    with requests.Session() as session:
        for index, delay in enumerate(seconds):
            fetch(session, index, delay)

main()

Z celkem deseti požadavků budou po třech trvat jednu, dvě nebo tři vteřiny a poslední pak zabere vteřiny čtyři. Hlavní funkce nám vytvoří jednu společnou Session a zavolá desetkrát funkci fetch, která se postará o samotný HTTP požadavek a výpis informací. Výsledek je následující:

0 3 200
1 1 200
2 1 200
3 2 200
4 3 200
5 1 200
6 4 200
7 3 200
8 2 200
9 2 200

I když některé požadavky trvají jen jednu vteřinu, provedou se všechny sekvenčně bez ohledu na svou časovou náročnost. Celkový čas čekání na odpovědi testovacího serveru je 22 s a celkový běh ukázkového programu je pak ještě o nějakou tu vteřinu delší.

Následuje implementace stejného problému s využitím asyncio a aiohttp:

import aiohttp
import asyncio
from random import shuffle

seconds = [1] * 3 + [2] * 3 + [3] * 3 + [4]
shuffle(seconds)

async def fetch(session, index, delay):
    async with session.get(f"https://httpbin.org/delay/{delay}") as response:
        html = await response.text()
        print(index, delay, response.status)

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for index, delay in enumerate(seconds):
            tasks.append(asyncio.create_task(fetch(session, index, delay)))
        await asyncio.gather(*tasks)

asyncio.run(main())

Základ je stejný, jen je tam navíc trochu async a await. Opět máme jednu ClientSession pro všechny požadavky a jednu hlavní asynchronní funkci, která se stará o všechny ostatní. V tomto příkladu nám funkce main naplánuje jednotlivé požadavky jako samostatné úlohy a na konci počká na jejich dokončení.

Jednotlivé async with nám zajistí nejen to, že provádění požadavků je asynchronní a tedy je možné v tomto okamžiku předat kontrolu jiné úloze, ale také to, že bude vše po provedení požadavku resp. ukončení práce s ClientSession správně ukončeno.

Zajímavý je i rozdíl v získávání obsahu odpovědi od serveru. requests v rámci volání metody get provede stažení hlaviček i obsahu odpovědi a její dekódování — vše najednou. aiohttp stáhne v rámci volání get jen hlavičky odpovědi od serveru a stažení obsahu odpovědi je zcela samostatná asynchronní operace.

Výstup asynchronního programu je následující:

3 1 200
0 1 200
7 1 200
2 2 200
9 2 200
1 2 200
4 3 200
8 3 200
5 3 200
6 4 200

Jednotlivé požadavky jsou samostatné úlohy a tak je jejich zpracování závislé jen na rychlosti odpovědi od serveru. Celkem se v jednotlivých požadavcích dohromady čekalo na odpovědi od serveru stejných 22 vteřin, ale celkový běh programu skončil za méně než 5 vteřin.

Obsáhlejší popis a další příklady naleznete samozřejmě v dokumentaci.

A další

Nakonec několik tipů, o kterých je dobré vědět.

V asyncio najdeme synchronizační mechanismy známé z vláknového programování, např. Lock a Semaphore – viz dokumentace.

Musíme-li použít blokující funkci, která např. komunikuje po síti bez await a která by tedy zablokovala i všechny ostatní úlohy, můžeme použít loop.run_in_executor(), a tím danou funkci zavolat ve vlákně nebo podprocesu, ale výsledek zpřístupnit jako objekt, na který lze počkat pomocí await. Použití je opět popsáno v dokumentaci.

Občas vás při programování s asyncio zaskočí zrádná chyba. V takových případech je dobré zapnout debug režim pomocí proměnné prostředí PYTHONASYNCIODEBUG=1. V tomto režimu asyncio upozorňuje na časté chyby, do některých chybových výpisů přidává informaci o tom, kde aktuální Task vznikl, apod. Více informací je zase v dokumentaci.

Alternativní smyčky událostí

Jak bylo zmíněno na začátku, hlavní cíl asyncio je definovat společné rozhraní pro různé asynchronní knihovny, aby bylo možné např. kombinovat knihovny pro Tornado se smyčkou událostí v Twisted. Samotné asyncio je jen jedna z mnoha implementací tohoto rozhraní. Zajímavá je například knihovna [uvloop], která je asi 2-4× rychlejší než asyncio (ale má závislosti, které se pro součást standardní knihovny nehodí).

Další zajímavá implementace je [Quamash], která pod standardním asyncio API používá smyčku událostí z Qt. Umožňuje tak efektivně zpracovávat Qt události zároveň s asynchronními funkcemi známými z asyncio.


Toto je stránka lekce z kurzu, který probíhá nebo proběhl naživo s instruktorem.