Following system colour scheme - Python 增強提案 Selected dark colour scheme - Python 增強提案 Selected light colour scheme - Python 增強提案

Python 增強提案 (Python Enhancement Proposals)

PEP 525 – 非同步產生器

作者:
Yury Selivanov <yury at edgedb.com>
討論於:
Python-Dev 列表
狀態:
最終 (Final)
類型:
標準軌跡 (Standards Track)
建立日期:
2016年7月28日
Python 版本:
3.6
公告歷史:
2016年8月2日、2016年8月23日、2016年9月1日、2016年9月6日

目錄

摘要

PEP 492 為 Python 3.5 引入了對原生協同程式(coroutines)以及 async/await 語法的支援。本文提議透過增加對非同步產生器(asynchronous generators)的支援來擴充 Python 的非同步能力。

原理與目標

一般產生器(於 PEP 255 中引入)提供了一種編寫複雜資料產生器的優雅方式,並使其表現得像一個迭代器。

然而,目前對於非同步迭代協定async for)尚無對應的概念。這使得編寫非同步資料產生器變得過於複雜,因為開發者必須定義一個實作了 __aiter____anext__ 的類別,才能在 async for 陳述式中使用它。

本質上,PEP 255 的目標與基本原理,在應用於非同步執行時,同樣適用於本提案。

效能是本提案的另一個重點:在我們對參考實作的測試中,非同步產生器的速度比實作為非同步迭代器的對等程式碼快了 2 倍

為了說明程式碼品質的提升,考慮以下這個類別,它會在迭代時以給定的延遲印出數字

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

相同的邏輯可以實作為一個更簡單的非同步產生器

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

規範

本提案向 Python 引入了非同步產生器的概念。

本規範預設讀者具備 Python 中產生器與協同程式的實作知識(PEP 342, PEP 380PEP 492)。

非同步生成器 (Asynchronous Generators)

Python 產生器是指任何包含一個或多個 yield 運算式的函式

def func():            # a function
    return

def genfunc():         # a generator function
    yield

我們建議使用相同的方法來定義非同步產生器

async def coro():      # a coroutine function
    await smth()

async def asyncgen():  # an asynchronous generator function
    await smth()
    yield 42

呼叫非同步產生器函式的結果是一個非同步產生器物件,它實作了 PEP 492 中定義的非同步迭代協定。

在非同步產生器中使用非空的 return 陳述式會引發 SyntaxError

對非同步迭代協定的支援

該協定要求實作兩個特殊方法

  1. 一個傳回非同步迭代器__aiter__ 方法。
  2. 一個傳回可等待(awaitable)物件的 __anext__ 方法,該物件使用 StopIteration 例外來「yield」值,並使用 StopAsyncIteration 例外來標示迭代結束。

非同步產生器定義了這兩個方法。讓我們手動迭代一個簡單的非同步產生器

async def genfunc():
    yield 1
    yield 2

gen = genfunc()

assert gen.__aiter__() is gen

assert await gen.__anext__() == 1
assert await gen.__anext__() == 2

await gen.__anext__()  # This line will raise StopAsyncIteration.

終止處理

PEP 492 要求使用事件迴圈或排程器來執行協同程式。由於非同步產生器旨在於協同程式中使用,它們同樣需要事件迴圈來執行與完成(finalize)它們。

非同步產生器可以擁有 try..finally 區塊以及 async with。提供保證非常重要,即使產生器在部分迭代後被垃圾回收,它們也能夠安全地完成清理。例如

async def square_series(con, to):
    async with con.transaction():
        cursor = con.cursor(
            'SELECT generate_series(0, $1) AS i', to)
        async for row in cursor:
            yield row['i'] ** 2

async for i in square_series(con, 1000):
    if i == 100:
        break

上述程式碼定義了一個非同步產生器,它使用 async with 來在交易中迭代資料庫游標。接著該產生器透過 async for 進行迭代,該迴圈會在某個點中斷迭代。

square_series() 產生器隨後會被垃圾回收;如果沒有一種機制來非同步地關閉產生器,Python 直譯器將無法執行任何後續清理動作。

為了解決這個問題,我們建議執行以下操作

  1. 在非同步產生器上實作一個 aclose 方法,傳回一個特殊的可等待物件。當被等待(awaited)時,它會將 GeneratorExit 丟入暫停的產生器,並對其進行迭代,直到出現 GeneratorExitStopAsyncIteration

    這與 close() 方法對一般 Python 產生器的作用非常相似,差別在於執行 aclose() 需要事件迴圈。

  2. 當非同步產生器在其 finally 區塊中執行 yield 運算式時,引發 RuntimeError(使用 await 則是允許的)。
    async def gen():
        try:
            yield
        finally:
            await asyncio.sleep(1)   # Can use 'await'.
    
            yield                    # Cannot use 'yield',
                                     # this line will trigger a
                                     # RuntimeError.
    
  3. sys 模組中增加兩個新方法:set_asyncgen_hooks()get_asyncgen_hooks()

sys.set_asyncgen_hooks() 背後的構想是允許事件迴圈攔截非同步產生器的迭代與清理,這樣最終使用者就不需要擔心清理問題,一切都能順利運作。

sys.set_asyncgen_hooks() 接受兩個引數

  • firstiter:一個可呼叫物件,當非同步產生器第一次被迭代時會被呼叫。
  • finalizer:一個可呼叫物件,當非同步產生器即將被垃圾回收(GC)時會被呼叫。

當非同步產生器第一次被迭代時,它會儲存對當前 finalizer 的參照。

當非同步產生器即將被垃圾回收時,它會呼叫其快取的 finalizer。假設該 finalizer 會向迭代開始時啟用的迴圈排程一個 aclose() 呼叫。

例如,這裡是 asyncio 如何被修改以允許安全清理非同步產生器

# asyncio/base_events.py

class BaseEventLoop:

    def run_forever(self):
        ...
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
        try:
            ...
        finally:
            sys.set_asyncgen_hooks(*old_hooks)
            ...

    def _finalize_asyncgen(self, gen):
        self.create_task(gen.aclose())

第二個引數 firstiter 允許事件迴圈維護一個在其控制下實例化的非同步產生器的弱參照集合(weak set)。這使得實作「關機」機制成為可能,從而安全地完成所有開啟的產生器並關閉事件迴圈。

sys.set_asyncgen_hooks() 是執行緒特定的,因此在並行執行緒中執行的多個事件迴圈可以安全地使用它。

sys.get_asyncgen_hooks() 傳回一個類似 namedtuple 的結構,包含 firstiterfinalizer 欄位。

asyncio

asyncio 事件迴圈將使用 sys.set_asyncgen_hooks() API 來維護所有已排程非同步產生器的弱參照集合,並在產生器需要被 GC 時排程它們的 aclose() 協同程式方法。

為了確保 asyncio 程式能夠可靠地完成所有已排程的非同步產生器,我們提議增加一個新的事件迴圈協同程式方法 loop.shutdown_asyncgens()。該方法將排程所有目前開啟的非同步產生器,以 aclose() 呼叫進行關閉。

在呼叫 loop.shutdown_asyncgens() 方法後,若有新的非同步產生器第一次被迭代,事件迴圈將發出警告。這背後的考量是,在請求關閉所有非同步產生器後,程式不應再執行會迭代新產生器的程式碼。

一個如何使用 shutdown_asyncgens 協同程式的範例

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

非同步產生器物件

該物件是仿照標準 Python 產生器物件所設計。本質上,非同步產生器的行為旨在複製同步產生器的行為,唯一的差別在於其 API 是非同步的。

定義了以下方法與屬性

  1. agen.__aiter__():傳回 agen
  2. agen.__anext__():傳回一個可等待物件,當被等待時執行一次非同步產生器迭代。
  3. agen.asend(val):傳回一個可等待物件,將 val 物件推入 agen 產生器。當 agen 尚未被迭代時,val 必須為 None

    範例

    async def gen():
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    
    g = gen()
    
    await g.asend(None)      # Will return 42 after sleeping
                             # for 0.1 seconds.
    
    await g.asend('hello')   # Will print 'hello' and
                             # raise StopAsyncIteration
                             # (after sleeping for 0.2 seconds.)
    
  4. agen.athrow(typ, [val, [tb]]):傳回一個可等待物件,將一個例外丟入 agen 產生器。

    範例

    async def gen():
        try:
            await asyncio.sleep(0.1)
            yield 'hello'
        except ZeroDivisionError:
            await asyncio.sleep(0.2)
            yield 'world'
    
    g = gen()
    v = await g.asend(None)
    print(v)                # Will print 'hello' after
                            # sleeping for 0.1 seconds.
    
    v = await g.athrow(ZeroDivisionError)
    print(v)                # Will print 'world' after
                            # sleeping 0.2 seconds.
    
  5. agen.aclose():傳回一個可等待物件,將一個 GeneratorExit 例外丟入產生器。該可等待物件可以傳回一個 yielded 的值(如果 agen 處理了該例外),或者 agen 將被關閉,例外將傳遞回呼叫者。
  6. agen.__name__agen.__qualname__:可讀寫的名稱與限定名稱屬性。
  7. agen.ag_awaitagen 目前正在等待的物件,若無則為 None。這類似於現有的產生器屬性 gi_yieldfrom 與協同程式屬性 cr_await
  8. agen.ag_frameagen.ag_runningagen.ag_code:定義方式與標準產生器的對應屬性相同。

StopIterationStopAsyncIteration 不會從非同步產生器中傳遞出去,而是會被替換為 RuntimeError

實作細節

非同步產生器物件(PyAsyncGenObject)與 PyGenObject 共享結構佈局。此外,參考實作引入了三個新物件

  1. PyAsyncGenASend:實作了 __anext__asend() 方法的可等待物件。
  2. PyAsyncGenAThrow:實作了 athrow()aclose() 方法的可等待物件。
  3. _PyAsyncGenWrappedValue:每個從非同步產生器直接 yield 出來的物件都會被隱式封裝(boxed)到此結構中。這是產生器實作區分「使用一般迭代協定 yield 的物件」與「使用非同步迭代協定 yield 的物件」的方式。

PyAsyncGenASendPyAsyncGenAThrow 是可等待物件(它們有傳回 self__await__ 方法)且是類似協同程式的物件(實作了 __iter____next__send()throw() 方法)。本質上,它們控制非同步產生器是如何被迭代的。

../_images/pep-0525-1.png

PyAsyncGenASend 與 PyAsyncGenAThrow

PyAsyncGenASend 是一個類似協同程式的物件,它驅動 __anext__asend() 方法,並實作了非同步迭代協定。

agen.asend(val)agen.__anext__() 傳回 PyAsyncGenASend 的實例(該實例保持對父物件 agen 的參照)。

資料流定義如下

  1. PyAsyncGenASend.send(val) 第一次被呼叫時,val 會被推入父物件 agen(使用 PyGenObject 的現有功能)。

    後續對 PyAsyncGenASend 物件的迭代,會將 None 推入 agen

    當一個 _PyAsyncGenWrappedValue 物件被 yield 出來時,它會被解封(unboxed),並引發一個帶有解封值的 StopIteration 例外。

  2. PyAsyncGenASend.throw(*exc) 第一次被呼叫時,*exc 會被丟入父物件 agen

    後續對 PyAsyncGenASend 物件的迭代,會將 None 推入 agen

    當一個 _PyAsyncGenWrappedValue 物件被 yield 出來時,它會被解封(unboxed),並引發一個帶有解封值的 StopIteration 例外。

  3. 非同步產生器中的 return 陳述式會引發 StopAsyncIteration 例外,該例外會透過 PyAsyncGenASend.send()PyAsyncGenASend.throw() 方法傳遞。

PyAsyncGenAThrowPyAsyncGenASend 非常相似。唯一的差別在於 PyAsyncGenAThrow.send() 在第一次呼叫時,會將一個例外丟入父物件 agen(而不是推入一個值)。

新的標準函式庫函式與型別

  1. types.AsyncGeneratorType – 非同步產生器物件的型別。
  2. sys.set_asyncgen_hooks()sys.get_asyncgen_hooks() 方法,用於設定事件迴圈中的非同步產生器 finalizer 與迭代攔截器。
  3. inspect.isasyncgen()inspect.isasyncgenfunction() 內省函式。
  4. asyncio 事件迴圈的新方法:loop.shutdown_asyncgens()
  5. 新的 collections.abc.AsyncGenerator 抽象基底類別。

回溯相容性

本提案完全向後相容。

在 Python 3.5 中,定義一個內部含有 yield 運算式的 async def 函式是 SyntaxError,因此在 3.6 引入非同步產生器是安全的。

效能

一般產生器

對一般產生器沒有效能損耗。以下微基準測試在 CPython 上,有無非同步產生器時的執行速度相同

def gen():
    i = 0
    while i < 100000000:
        yield i
        i += 1

list(gen())

相對於非同步迭代器的改進

以下微基準測試顯示,非同步產生器的速度比以純 Python 實作的非同步迭代器快約 2.3 倍

N = 10 ** 7

async def agen():
    for i in range(N):
        yield i

class AIter:
    def __init__(self):
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= N:
            raise StopAsyncIteration
        self.i += 1
        return i

設計考量

aiter()anext() 內建函式

最初,PEP 492__aiter__ 定義為一個應傳回可等待物件的方法,進而產生一個非同步迭代器。

然而,在 CPython 3.5.2 中,__aiter__ 被重新定義為直接傳回非同步迭代器。為了避免破壞向後相容性,決定 Python 3.6 將同時支援兩種方式:__aiter__ 仍然可以傳回一個可等待物件,但會發出 DeprecationWarning

由於 Python 3.6 中 __aiter__ 的這種雙重性質,我們無法增加同步版本的 aiter() 內建函式。因此,提議等到 Python 3.7 再進行。

非同步列表/字典/集合綜合運算式

非同步綜合運算式的語法與非同步產生器機制無關,應在單獨的 PEP 中考慮。

非同步 yield from

雖然理論上可以在非同步產生器中實作 yield from 的支援,但這需要對產生器實作進行重大的重新設計。

yield from 對非同步產生器而言也不太重要,因為不需要在協同程式之上再提供一種實作其他協同程式協定的機制。且若要組合非同步產生器,可以使用簡單的 async for 迴圈。

async def g1():
    yield 1
    yield 2

async def g2():
    async for v in g1():
        yield v

為何 asend()athrow() 方法是必要的

它們使得利用非同步產生器實作類似 contextlib.contextmanager 的概念成為可能。例如,透過此提案的設計,可以實作以下模式

@async_context_manager
async def ctx():
    await open()
    try:
        yield
    finally:
        await close()

async with ctx():
    await ...

另一個原因在於,雖然可以使用從 __anext__ 物件傳回的物件將資料與例外丟入非同步產生器,但要正確做到這點相當困難。增加明確的 asend()athrow() 將為達成此目標鋪平安全之路。

就實作而言,asend()__anext__ 的稍微通用版本,而 athrow()aclose() 非常相似。因此,為非同步產生器定義這些方法不會增加任何額外的複雜性。

範例

一個使用目前參考實作的可執行範例(將以一秒間隔印出 0 到 9 的數字)

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def run():
    async for i in ticker(1, 10):
        print(i)


import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

接受

PEP 525 已於 2016 年 9 月 6 日被 Guido 接受 [2]

實作

實作進度追蹤於 issue 28003 [3]。參考實作的 git 儲存庫可於此處存取 [1]

參考文獻

致謝

感謝 Guido van Rossum、Victor Stinner、Elvis Pranskevichus、Nathaniel Smith、Łukasz Langa、Andrew Svetlov 以及許多其他人對本 PEP 提供的回饋、程式碼審查與討論。


原始來源:https://github.com/python/peps/blob/main/peps/pep-0525.rst

最後修改時間:2025-02-01 08:59:27 GMT