PEP 525 – 非同步產生器
- 作者:
- Yury Selivanov <yury at edgedb.com>
- 討論於:
- Python-Dev 列表
- 狀態:
- 最終 (Final)
- 類型:
- 標準軌跡 (Standards Track)
- 建立日期:
- 2016年7月28日
- Python 版本:
- 3.6
- 公告歷史:
- 2016年8月2日、2016年8月23日、2016年9月1日、2016年9月6日
摘要
PEP 492 為 Python 3.5 引入了對原生協同程式(coroutines)以及 async/await 語法的支援。本文提議透過增加對非同步產生器(asynchronous generators)的支援來擴充 Python 的非同步能力。
原理與目標
一般產生器(於 PEP 255 中引入)提供了一種編寫複雜資料產生器的優雅方式,並使其表現得像一個迭代器。
然而,目前對於非同步迭代協定(async for)尚無對應的概念。這使得編寫非同步資料產生器變得過於複雜,因為開發者必須定義一個實作了 __aiter__ 與 __anext__ 的類別,才能在 async for 陳述式中使用它。
本質上,PEP 255 的目標與基本原理,在應用於非同步執行時,同樣適用於本提案。
效能是本提案的另一個重點:在我們對參考實作的測試中,非同步產生器的速度比實作為非同步迭代器的對等程式碼快了 2 倍。
為了說明程式碼品質的提升,考慮以下這個類別,它會在迭代時以給定的延遲印出數字
class Ticker:
"""Yield numbers from 0 to `to` every `delay` seconds."""
def __init__(self, delay, to):
self.delay = delay
self.i = 0
self.to = to
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= self.to:
raise StopAsyncIteration
self.i += 1
if i:
await asyncio.sleep(self.delay)
return i
相同的邏輯可以實作為一個更簡單的非同步產生器
async def ticker(delay, to):
"""Yield numbers from 0 to `to` every `delay` seconds."""
for i in range(to):
yield i
await asyncio.sleep(delay)
規範
本提案向 Python 引入了非同步產生器的概念。
本規範預設讀者具備 Python 中產生器與協同程式的實作知識(PEP 342, PEP 380 與 PEP 492)。
非同步生成器 (Asynchronous Generators)
Python 產生器是指任何包含一個或多個 yield 運算式的函式
def func(): # a function
return
def genfunc(): # a generator function
yield
我們建議使用相同的方法來定義非同步產生器
async def coro(): # a coroutine function
await smth()
async def asyncgen(): # an asynchronous generator function
await smth()
yield 42
呼叫非同步產生器函式的結果是一個非同步產生器物件,它實作了 PEP 492 中定義的非同步迭代協定。
在非同步產生器中使用非空的 return 陳述式會引發 SyntaxError。
對非同步迭代協定的支援
該協定要求實作兩個特殊方法
- 一個傳回非同步迭代器的
__aiter__方法。 - 一個傳回可等待(awaitable)物件的
__anext__方法,該物件使用StopIteration例外來「yield」值,並使用StopAsyncIteration例外來標示迭代結束。
非同步產生器定義了這兩個方法。讓我們手動迭代一個簡單的非同步產生器
async def genfunc():
yield 1
yield 2
gen = genfunc()
assert gen.__aiter__() is gen
assert await gen.__anext__() == 1
assert await gen.__anext__() == 2
await gen.__anext__() # This line will raise StopAsyncIteration.
終止處理
PEP 492 要求使用事件迴圈或排程器來執行協同程式。由於非同步產生器旨在於協同程式中使用,它們同樣需要事件迴圈來執行與完成(finalize)它們。
非同步產生器可以擁有 try..finally 區塊以及 async with。提供保證非常重要,即使產生器在部分迭代後被垃圾回收,它們也能夠安全地完成清理。例如
async def square_series(con, to):
async with con.transaction():
cursor = con.cursor(
'SELECT generate_series(0, $1) AS i', to)
async for row in cursor:
yield row['i'] ** 2
async for i in square_series(con, 1000):
if i == 100:
break
上述程式碼定義了一個非同步產生器,它使用 async with 來在交易中迭代資料庫游標。接著該產生器透過 async for 進行迭代,該迴圈會在某個點中斷迭代。
square_series() 產生器隨後會被垃圾回收;如果沒有一種機制來非同步地關閉產生器,Python 直譯器將無法執行任何後續清理動作。
為了解決這個問題,我們建議執行以下操作
- 在非同步產生器上實作一個
aclose方法,傳回一個特殊的可等待物件。當被等待(awaited)時,它會將GeneratorExit丟入暫停的產生器,並對其進行迭代,直到出現GeneratorExit或StopAsyncIteration。這與
close()方法對一般 Python 產生器的作用非常相似,差別在於執行aclose()需要事件迴圈。 - 當非同步產生器在其
finally區塊中執行yield運算式時,引發RuntimeError(使用await則是允許的)。async def gen(): try: yield finally: await asyncio.sleep(1) # Can use 'await'. yield # Cannot use 'yield', # this line will trigger a # RuntimeError.
- 在
sys模組中增加兩個新方法:set_asyncgen_hooks()與get_asyncgen_hooks()。
sys.set_asyncgen_hooks() 背後的構想是允許事件迴圈攔截非同步產生器的迭代與清理,這樣最終使用者就不需要擔心清理問題,一切都能順利運作。
sys.set_asyncgen_hooks() 接受兩個引數
firstiter:一個可呼叫物件,當非同步產生器第一次被迭代時會被呼叫。finalizer:一個可呼叫物件,當非同步產生器即將被垃圾回收(GC)時會被呼叫。
當非同步產生器第一次被迭代時,它會儲存對當前 finalizer 的參照。
當非同步產生器即將被垃圾回收時,它會呼叫其快取的 finalizer。假設該 finalizer 會向迭代開始時啟用的迴圈排程一個 aclose() 呼叫。
例如,這裡是 asyncio 如何被修改以允許安全清理非同步產生器
# asyncio/base_events.py
class BaseEventLoop:
def run_forever(self):
...
old_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
try:
...
finally:
sys.set_asyncgen_hooks(*old_hooks)
...
def _finalize_asyncgen(self, gen):
self.create_task(gen.aclose())
第二個引數 firstiter 允許事件迴圈維護一個在其控制下實例化的非同步產生器的弱參照集合(weak set)。這使得實作「關機」機制成為可能,從而安全地完成所有開啟的產生器並關閉事件迴圈。
sys.set_asyncgen_hooks() 是執行緒特定的,因此在並行執行緒中執行的多個事件迴圈可以安全地使用它。
sys.get_asyncgen_hooks() 傳回一個類似 namedtuple 的結構,包含 firstiter 與 finalizer 欄位。
asyncio
asyncio 事件迴圈將使用 sys.set_asyncgen_hooks() API 來維護所有已排程非同步產生器的弱參照集合,並在產生器需要被 GC 時排程它們的 aclose() 協同程式方法。
為了確保 asyncio 程式能夠可靠地完成所有已排程的非同步產生器,我們提議增加一個新的事件迴圈協同程式方法 loop.shutdown_asyncgens()。該方法將排程所有目前開啟的非同步產生器,以 aclose() 呼叫進行關閉。
在呼叫 loop.shutdown_asyncgens() 方法後,若有新的非同步產生器第一次被迭代,事件迴圈將發出警告。這背後的考量是,在請求關閉所有非同步產生器後,程式不應再執行會迭代新產生器的程式碼。
一個如何使用 shutdown_asyncgens 協同程式的範例
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
非同步產生器物件
該物件是仿照標準 Python 產生器物件所設計。本質上,非同步產生器的行為旨在複製同步產生器的行為,唯一的差別在於其 API 是非同步的。
定義了以下方法與屬性
agen.__aiter__():傳回agen。agen.__anext__():傳回一個可等待物件,當被等待時執行一次非同步產生器迭代。agen.asend(val):傳回一個可等待物件,將val物件推入agen產生器。當agen尚未被迭代時,val必須為None。範例
async def gen(): await asyncio.sleep(0.1) v = yield 42 print(v) await asyncio.sleep(0.2) g = gen() await g.asend(None) # Will return 42 after sleeping # for 0.1 seconds. await g.asend('hello') # Will print 'hello' and # raise StopAsyncIteration # (after sleeping for 0.2 seconds.)
agen.athrow(typ, [val, [tb]]):傳回一個可等待物件,將一個例外丟入agen產生器。範例
async def gen(): try: await asyncio.sleep(0.1) yield 'hello' except ZeroDivisionError: await asyncio.sleep(0.2) yield 'world' g = gen() v = await g.asend(None) print(v) # Will print 'hello' after # sleeping for 0.1 seconds. v = await g.athrow(ZeroDivisionError) print(v) # Will print 'world' after # sleeping 0.2 seconds.
agen.aclose():傳回一個可等待物件,將一個GeneratorExit例外丟入產生器。該可等待物件可以傳回一個 yielded 的值(如果agen處理了該例外),或者agen將被關閉,例外將傳遞回呼叫者。agen.__name__與agen.__qualname__:可讀寫的名稱與限定名稱屬性。agen.ag_await:agen目前正在等待的物件,若無則為None。這類似於現有的產生器屬性gi_yieldfrom與協同程式屬性cr_await。agen.ag_frame、agen.ag_running與agen.ag_code:定義方式與標準產生器的對應屬性相同。
StopIteration 與 StopAsyncIteration 不會從非同步產生器中傳遞出去,而是會被替換為 RuntimeError。
實作細節
非同步產生器物件(PyAsyncGenObject)與 PyGenObject 共享結構佈局。此外,參考實作引入了三個新物件
PyAsyncGenASend:實作了__anext__與asend()方法的可等待物件。PyAsyncGenAThrow:實作了athrow()與aclose()方法的可等待物件。_PyAsyncGenWrappedValue:每個從非同步產生器直接 yield 出來的物件都會被隱式封裝(boxed)到此結構中。這是產生器實作區分「使用一般迭代協定 yield 的物件」與「使用非同步迭代協定 yield 的物件」的方式。
PyAsyncGenASend 與 PyAsyncGenAThrow 是可等待物件(它們有傳回 self 的 __await__ 方法)且是類似協同程式的物件(實作了 __iter__、__next__、send() 與 throw() 方法)。本質上,它們控制非同步產生器是如何被迭代的。
PyAsyncGenASend 與 PyAsyncGenAThrow
PyAsyncGenASend 是一個類似協同程式的物件,它驅動 __anext__ 與 asend() 方法,並實作了非同步迭代協定。
agen.asend(val) 與 agen.__anext__() 傳回 PyAsyncGenASend 的實例(該實例保持對父物件 agen 的參照)。
資料流定義如下
- 當
PyAsyncGenASend.send(val)第一次被呼叫時,val會被推入父物件agen(使用PyGenObject的現有功能)。後續對
PyAsyncGenASend物件的迭代,會將None推入agen。當一個
_PyAsyncGenWrappedValue物件被 yield 出來時,它會被解封(unboxed),並引發一個帶有解封值的StopIteration例外。 - 當
PyAsyncGenASend.throw(*exc)第一次被呼叫時,*exc會被丟入父物件agen。後續對
PyAsyncGenASend物件的迭代,會將None推入agen。當一個
_PyAsyncGenWrappedValue物件被 yield 出來時,它會被解封(unboxed),並引發一個帶有解封值的StopIteration例外。 - 非同步產生器中的
return陳述式會引發StopAsyncIteration例外,該例外會透過PyAsyncGenASend.send()與PyAsyncGenASend.throw()方法傳遞。
PyAsyncGenAThrow 與 PyAsyncGenASend 非常相似。唯一的差別在於 PyAsyncGenAThrow.send() 在第一次呼叫時,會將一個例外丟入父物件 agen(而不是推入一個值)。
新的標準函式庫函式與型別
types.AsyncGeneratorType– 非同步產生器物件的型別。sys.set_asyncgen_hooks()與sys.get_asyncgen_hooks()方法,用於設定事件迴圈中的非同步產生器 finalizer 與迭代攔截器。inspect.isasyncgen()與inspect.isasyncgenfunction()內省函式。- asyncio 事件迴圈的新方法:
loop.shutdown_asyncgens()。 - 新的
collections.abc.AsyncGenerator抽象基底類別。
回溯相容性
本提案完全向後相容。
在 Python 3.5 中,定義一個內部含有 yield 運算式的 async def 函式是 SyntaxError,因此在 3.6 引入非同步產生器是安全的。
效能
一般產生器
對一般產生器沒有效能損耗。以下微基準測試在 CPython 上,有無非同步產生器時的執行速度相同
def gen():
i = 0
while i < 100000000:
yield i
i += 1
list(gen())
相對於非同步迭代器的改進
以下微基準測試顯示,非同步產生器的速度比以純 Python 實作的非同步迭代器快約 2.3 倍
N = 10 ** 7
async def agen():
for i in range(N):
yield i
class AIter:
def __init__(self):
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= N:
raise StopAsyncIteration
self.i += 1
return i
設計考量
aiter() 與 anext() 內建函式
最初,PEP 492 將 __aiter__ 定義為一個應傳回可等待物件的方法,進而產生一個非同步迭代器。
然而,在 CPython 3.5.2 中,__aiter__ 被重新定義為直接傳回非同步迭代器。為了避免破壞向後相容性,決定 Python 3.6 將同時支援兩種方式:__aiter__ 仍然可以傳回一個可等待物件,但會發出 DeprecationWarning。
由於 Python 3.6 中 __aiter__ 的這種雙重性質,我們無法增加同步版本的 aiter() 內建函式。因此,提議等到 Python 3.7 再進行。
非同步列表/字典/集合綜合運算式
非同步綜合運算式的語法與非同步產生器機制無關,應在單獨的 PEP 中考慮。
非同步 yield from
雖然理論上可以在非同步產生器中實作 yield from 的支援,但這需要對產生器實作進行重大的重新設計。
yield from 對非同步產生器而言也不太重要,因為不需要在協同程式之上再提供一種實作其他協同程式協定的機制。且若要組合非同步產生器,可以使用簡單的 async for 迴圈。
async def g1():
yield 1
yield 2
async def g2():
async for v in g1():
yield v
為何 asend() 與 athrow() 方法是必要的
它們使得利用非同步產生器實作類似 contextlib.contextmanager 的概念成為可能。例如,透過此提案的設計,可以實作以下模式
@async_context_manager
async def ctx():
await open()
try:
yield
finally:
await close()
async with ctx():
await ...
另一個原因在於,雖然可以使用從 __anext__ 物件傳回的物件將資料與例外丟入非同步產生器,但要正確做到這點相當困難。增加明確的 asend() 與 athrow() 將為達成此目標鋪平安全之路。
就實作而言,asend() 是 __anext__ 的稍微通用版本,而 athrow() 與 aclose() 非常相似。因此,為非同步產生器定義這些方法不會增加任何額外的複雜性。
範例
一個使用目前參考實作的可執行範例(將以一秒間隔印出 0 到 9 的數字)
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def run():
async for i in ticker(1, 10):
print(i)
import asyncio
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run())
finally:
loop.close()
接受
實作
參考文獻
致謝
感謝 Guido van Rossum、Victor Stinner、Elvis Pranskevichus、Nathaniel Smith、Łukasz Langa、Andrew Svetlov 以及許多其他人對本 PEP 提供的回饋、程式碼審查與討論。
版權
此文件已歸入公有領域 (public domain)。
原始來源:https://github.com/python/peps/blob/main/peps/pep-0525.rst
最後修改時間:2025-02-01 08:59:27 GMT