PEP 342 – 透過增強型生成器實現協同程式 (Coroutines)
- 作者:
- Guido van Rossum, Phillip J. Eby
- 狀態:
- 最終 (Final)
- 類型:
- 標準軌跡 (Standards Track)
- 建立日期:
- 2005年5月10日
- Python 版本:
- 2.5
- 公告歷史:
簡介
本 PEP 提議對生成器的 API 和語法進行一些增強,使其能作為簡單的協同程式使用。這基本上是整合了以下兩個 PEP 的想法;若本 PEP 被採納,那兩個 PEP 可能會被視為冗餘:
動機
協同程式是表達許多演算法(如模擬、遊戲、非同步 I/O 以及其他事件驅動程式設計或協同式多工處理)的自然方式。Python 的生成器函式幾乎就是協同程式,但還差了一點——它們允許暫停執行以產生一個值,但卻不提供在恢復執行時傳入值或異常的功能。它們也不允許在 try/finally 區塊的 try 部分內暫停執行,因此導致中止的協同程式難以執行自我清理。
此外,生成器無法在其他函式執行時讓出控制權,除非這些函式本身也被定義為生成器,並且外部生成器被編寫為根據內部生成器產生的值來進行 yield。這使得即使是相對簡單的使用情境(例如非同步通訊)的實作也變得複雜,因為呼叫任何函式,要麼需要生成器「阻塞」(即無法讓出控制權),要麼必須在每個需要的函式呼叫周圍添加大量的樣板迴圈程式碼。
然而,如果能夠在生成器暫停的位置將值或異常「傳入」生成器,那麼一個簡單的協同程式排程器或「彈跳函式」(trampoline function) 將允許協同程式在不阻塞的情況下互相「呼叫」——這對非同步應用程式而言是一個巨大的福音。這樣的應用程式可以編寫協同程式來執行非阻塞 Socket I/O,方法是將控制權讓給 I/O 排程器,直到資料發送完畢或可用為止。同時,執行 I/O 的程式碼只需要簡單地執行如下操作:
data = (yield nonblocking_read(my_socket, nbytes))
以便暫停執行,直到 nonblocking_read() 協同程式產生一個值為止。
換句話說,透過對語言及生成器迭代器類型的實作進行一些相對較小的增強,Python 將能夠支援執行非同步操作,而無需將整個應用程式編寫成一系列的回呼 (callbacks),也無需為需要成百上千個協同式多工偽執行緒的程式使用資源密集型執行緒。因此,這些增強功能將賦予標準 Python 許多 Stackless Python 分支的好處,且無需對 CPython 核心或其 API 進行任何重大修改。此外,這些增強功能應可由任何已經支援生成器的 Python 實作(如 Jython)輕鬆實作。
規範摘要
透過在生成器迭代器類型中添加幾個簡單的方法,並進行兩處小的語法調整,Python 開發者將能夠使用生成器函式來實作協同程式及其他形式的協同式多工處理。這些方法與調整如下:
- 將
yield重新定義為一個表達式,而非陳述式。目前的 yield 陳述式將變為一個 yield 表達式,其值將被丟棄。每當生成器由一般的next()呼叫恢復時,yield 表達式的值為None。 - 為生成器迭代器新增一個
send()方法,該方法會恢復生成器並「傳送」一個值,該值成為當前 yield 表達式的結果。send()方法會傳回生成器產生的下一個值;若生成器未產生下一個值即結束,則引發StopIteration。 - 為生成器迭代器新增一個
throw()方法,該方法會在生成器暫停的位置引發一個異常,並傳回生成器產生的下一個值;若生成器未產生下一個值即結束,則引發StopIteration。(若生成器未捕捉傳入的異常,或引發了不同的異常,則該異常會傳播至呼叫者。) - 為生成器迭代器新增一個
close()方法,該方法會在生成器暫停的位置引發GeneratorExit。若生成器隨後引發StopIteration(透過正常退出,或因為已關閉)或GeneratorExit(透過未捕捉該異常),則close()返回至其呼叫者。若生成器產生了一個值,則引發RuntimeError。若生成器引發任何其他異常,則該異常會傳播至呼叫者。若生成器已因異常或正常退出而結束,close()則不執行任何操作。 - 新增支援,以確保當生成器迭代器被垃圾回收時會呼叫
close()。 - 允許在
try/finally區塊中使用yield,因為現在垃圾回收或明確呼叫close()將允許執行finally子句。
一個針對當前 Python CVS HEAD 實作了所有這些更改的原型修補程式,可從 SourceForge 取得,編號 #1223381 (https://bugs.python.org/issue1223381)。
規範:向生成器發送值
新增生成器方法:send(value)
提議為生成器迭代器新增一個名為 send() 的新方法。它接收唯一的一個參數,即應該「傳入」生成器的值。呼叫 send(None) 與呼叫生成器的 next() 方法完全等價。使用任何其他值呼叫 send() 也是一樣的,差別僅在於生成器當前 yield 表達式產生的值會有所不同。
由於生成器迭代器從生成器函式體的頂部開始執行,當生成器剛被建立時,沒有 yield 表達式可接收值。因此,當生成器迭代器剛啟動時,禁止使用非 None 的參數呼叫 send(),若發生此情況將引發 TypeError(通常是因為某種邏輯錯誤)。因此,在與協同程式通訊之前,必須先呼叫 next() 或 send(None) 將其執行推進至第一個 yield 表達式。
與 next() 方法一樣,send() 方法會傳回生成器迭代器產生的下一個值,或若生成器正常退出或已經退出,則引發 StopIteration。若生成器引發未捕捉的異常,則會傳播至 send() 的呼叫者。
新增語法:Yield 表達式 (Yield Expressions)
yield 陳述式將被允許用於賦值語句的右側;在這種情況下,它被稱為 yield 表達式。此 yield 表達式的值為 None,除非使用非 None 的參數呼叫了 send();詳見下文。
yield 表達式必須始終加上括號,除非它出現在賦值語句右側的最頂層表達式中。因此:
x = yield 42
x = yield
x = 12 + (yield 42)
x = 12 + (yield)
foo(yield 42)
foo(yield)
這些都是合法的,但:
x = 12 + yield 42
x = 12 + yield
foo(yield 42, 12)
foo(yield, 12)
這些都是非法的。(一些邊緣情況是基於當前 yield 12, 42 的合法性而考量的。)
請注意,不帶表達式的 yield 陳述式或 yield 表達式現在也是合法的。這很合理:當 next() 呼叫中的資訊流反轉時,應該可以不傳遞明確的值進行 yield(當然,yield 等同於 yield None)。
當呼叫 send(value) 時,它恢復的 yield 表達式將傳回傳入的值。當呼叫 next() 時,恢復的 yield 表達式將傳回 None。若 yield 表達式是一個 yield 陳述式,則忽略此回傳值,類似於忽略作為陳述式使用的函式呼叫所傳回的值。
實際上,yield 表達式就像一個反轉的函式呼叫;yield 的參數實際上是從當前執行函式傳回(yield 出)的,而 yield 的「回傳值」則是透過 send() 傳入的參數。
注意:yield 的語法擴充使其用法與 Ruby 非常相似。這是刻意為之。請注意,在 Python 中,區塊是使用 send(EXPR) 而非 return EXPR 將值傳遞給生成器,且在生成器與區塊之間傳遞控制權的底層機制完全不同。Python 中的區塊不會被編譯為 thunks;相反,yield 會暫停生成器幀 (frame) 的執行。一些邊緣情況的運作方式不同;在 Python 中,你不能儲存區塊以供後續使用,也無法測試是否存在區塊。(XXX - 這些關於區塊的內容現在似乎顯得不合時宜,或許 Guido 可以編輯釐清。)
規範:異常與清理
設一個生成器物件為呼叫生成器函式所產生的迭代器。在下文中,*g* 始終指代一個生成器物件。
新增語法:允許在 try-finally 內部使用 yield
生成器函式的語法已擴充,允許在 try-finally 陳述式內部使用 yield 陳述式。
新增生成器方法:throw(type, value=None, traceback=None)
g.throw(type, value, traceback) 會導致指定的異常在生成器 *g* 當前暫停的位置(即在 yield 陳述式處,或若尚未呼叫 next() 則在函式體開始處)拋出。若生成器捕捉到該異常並產生另一個值,該值即為 g.throw() 的回傳值。若生成器未捕捉到該異常,則 throw() 看起來就像引發了傳入的同一個異常(它會「穿透」)。若生成器引發了另一個異常(包含它返回時產生的 StopIteration),該異常會由 throw() 呼叫引發。總之,throw() 的行為就像 next() 或 send(),只是它會在暫停點引發異常。若生成器已經處於關閉狀態,throw() 只會引發傳入的異常,而不執行生成器的任何程式碼。
引發異常的效果完全等同於在暫停點執行了以下語句:
raise type, value, traceback
type 參數不能為 None,且 type 與 value 必須相容。若 value 不是該類型的實例,則會使用該 value 建立一個新的異常實例,遵循 raise 陳述式建立異常實例時所用的相同規則。若提供了 traceback,則必須是有效的 Python traceback 物件,否則會引發 TypeError。
注意:throw() 方法的名稱是基於多個原因選擇的。Raise 是一個關鍵字,因此不能用作方法名稱。與 raise(它會立即從當前執行點引發異常)不同,throw() 會先恢復生成器,然後再引發異常。「throw」一詞暗示將異常放入另一個位置,並且在其他語言中已經與異常關聯。
曾考慮過其他方法名稱:resolve()、signal()、genraise()、raiseinto() 以及 flush()。似乎沒有任何一個比 throw() 更合適。
新增標準異常:GeneratorExit
定義了一個新的標準異常 GeneratorExit,繼承自 Exception。生成器應透過重新引發它(或乾脆不捕捉它)或引發 StopIteration 來處理此異常。
新增生成器方法:close()
g.close() 由以下虛擬碼定義:
def close(self):
try:
self.throw(GeneratorExit)
except (GeneratorExit, StopIteration):
pass
else:
raise RuntimeError("generator ignored GeneratorExit")
# Other exceptions are not caught
新增生成器方法:__del__()
g.__del__() 是 g.close() 的封裝器。當生成器物件被垃圾回收時(在 CPython 中,這指其參照計數歸零時),它將被呼叫。若 close() 引發異常,該異常的 traceback 將被列印至 sys.stderr 並被忽略;它不會傳播回觸發垃圾回收的地方。這與類別實例上 __del__() 方法中異常的處理方式一致。
若生成器物件參與了循環參照,g.__del__() 可能不會被呼叫。這是 CPython 當前垃圾回收器的行為。此限制的原因是 GC 程式碼需要在任意點「中斷」循環以進行回收,此後不應允許任何 Python 程式碼查看構成循環的物件,因為它們可能處於無效狀態。不屬於循環一部份的「懸掛」物件不受此限制。
請注意,在實務中幾乎不可能看到生成器物件參與循環參照。然而,將生成器物件儲存在全域變數中會透過生成器幀的 f_globals 指標建立循環。建立循環的另一種方式是在作為參數傳遞給生成器的資料結構中儲存對生成器物件的參照(例如,若一個物件有一個方法是生成器,且它保留了對該方法所建立的執行中迭代器的參照)。考慮到生成器的典型使用模式,這兩種情況都不太可能發生。
此外,在本 PEP 的 CPython 實作中,生成器所使用的幀物件應在因錯誤或正常退出而終止執行時釋放。這將確保無法恢復的生成器不會成為不可回收參照循環的一部分。這使得其他程式碼可以在 try/finally 或 with 區塊(參見 PEP 343)中使用 close(),從而確保給定的生成器被正確地終結。
選用擴充功能
擴充後的 continue 陳述式
本 PEP 的早期草案曾提議在 for 迴圈中使用一種新的 continue EXPR 語法(延續自 PEP 340),該語法會將 *EXPR* 的值傳入正在進行迴圈的迭代器中。此功能目前已撤回,因為本 PEP 的範圍已縮小為僅專注於將值傳入生成器迭代器,而非其他類型的迭代器。Python-Dev 列表上的一些人也認為,為此特定功能新增語法至少在目前看來為時尚早。
待決問題
在 python-dev 上的討論揭示了一些懸而未決的問題。我在這裡列出它們,並附上我首選的解決方案及其動機。目前編寫的 PEP 反映了這些首選解決方案。
- 當生成器產生另一個值以回應
GeneratorExit異常時,close()應該引發什麼異常?我最初選擇了
TypeError,因為它代表了生成器函式的嚴重不當行為,應該透過修改程式碼來修復。但 PEP 343 中的with_template裝飾器類別對類似的違規行為使用了RuntimeError。可以說,它們都應該使用相同的異常。我不想僅為了這個目的引入一個新的異常類別,因為我不希望人們捕捉這個異常:我希望它轉化為 traceback,讓開發者看到後修正程式碼。因此,我現在認為它們都應該引發RuntimeError。這是有先例的:核心 Python 程式碼在檢測到無限遞迴、未初始化物件(以及各種其他雜項條件)時會引發該異常。 - Oren Tirosh 提議將
send()方法重新命名為feed(),以相容於 *consumer 介面*(參見 http://effbot.org/zone/consumer.htm 規範)。然而,仔細研究 consumer 介面後,似乎
feed()所需的語意與send()不同,因為send()不能在剛啟動的生成器上被有意義地呼叫。此外,當前定義的 consumer 介面未包含對StopIteration的處理。因此,建立一個簡單的裝飾器來封裝生成器函式,使其符合 consumer 介面,可能會更有用。例如,它可以用初始的
next()呼叫來「預熱」生成器、捕捉 StopIteration,甚至可能透過重新呼叫生成器函式來提供reset()功能。
範例
- 一個簡單的 *consumer* 裝飾器,使得生成器函式在最初被呼叫時會自動推進至其第一個 yield 點:
def consumer(func): def wrapper(*args,**kw): gen = func(*args, **kw) gen.next() return gen wrapper.__name__ = func.__name__ wrapper.__dict__ = func.__dict__ wrapper.__doc__ = func.__doc__ return wrapper
- 使用 *consumer* 裝飾器建立 *反向生成器* 的範例,它接收影像並建立縮圖頁面,然後將它們發送給另一個 consumer。像這樣的函式可以串聯起來,形成高效的 *consumer* 處理管道,每個 consumer 都可以擁有複雜的內部狀態:
@consumer def thumbnail_pager(pagesize, thumbsize, destination): while True: page = new_image(pagesize) rows, columns = pagesize / thumbsize pending = False try: for row in xrange(rows): for column in xrange(columns): thumb = create_thumbnail((yield), thumbsize) page.write( thumb, col*thumbsize.x, row*thumbsize.y ) pending = True except GeneratorExit: # close() was called, so flush any pending output if pending: destination.send(page) # then close the downstream consumer, and exit destination.close() return else: # we finished a page full of thumbnails, so send it # downstream and keep on looping destination.send(page) @consumer def jpeg_writer(dirname): fileno = 1 while True: filename = os.path.join(dirname,"page%04d.jpg" % fileno) write_jpeg((yield), filename) fileno += 1 # Put them together to make a function that makes thumbnail # pages from a list of images and other parameters. # def write_thumbnails(pagesize, thumbsize, images, output_dir): pipeline = thumbnail_pager( pagesize, thumbsize, jpeg_writer(output_dir) ) for image in images: pipeline.send(image) pipeline.close()
- 一個簡單的協同程式排程器或 *彈跳器*,允許協同程式透過 yield 它們想要呼叫的協同程式來「呼叫」其他協同程式。協同程式產生的任何非生成器值都會傳回給「呼叫」該產生值協同程式的協同程式。同樣地,若一個協同程式引發異常,該異常會傳播至其「呼叫者」。實際上,只要你使用 yield 表達式來呼叫原本會「阻塞」的常式,這個範例就能模擬 Stackless Python 中使用的簡單 tasklets。這只是一個非常簡單的範例,還有更多精密的排程器是可行的。(例如,現有的 Python GTasklet 框架 (http://www.gnome.org/~gjc/gtasklet/gtasklets.html) 和 peak.events 框架 (http://peak.telecommunity.com/) 已經實作了類似的排程功能,但目前必須為無法傳入值或異常給生成器的限制使用笨拙的變通方法。)
import collections class Trampoline: """Manage communications between coroutines""" running = False def __init__(self): self.queue = collections.deque() def add(self, coroutine): """Request that a coroutine be executed""" self.schedule(coroutine) def run(self): result = None self.running = True try: while self.running and self.queue: func = self.queue.popleft() result = func() return result finally: self.running = False def stop(self): self.running = False def schedule(self, coroutine, stack=(), val=None, *exc): def resume(): value = val try: if exc: value = coroutine.throw(value,*exc) else: value = coroutine.send(value) except: if stack: # send the error back to the "caller" self.schedule( stack[0], stack[1], *sys.exc_info() ) else: # Nothing left in this pseudothread to # handle it, let it propagate to the # run loop raise if isinstance(value, types.GeneratorType): # Yielded to a specific coroutine, push the # current one on the stack, and call the new # one with no args self.schedule(value, (coroutine,stack)) elif stack: # Yielded a result, pop the stack and send the # value to the caller self.schedule(stack[0], stack[1], value) # else: this pseudothread has ended self.queue.append(resume)
- 一個簡單的 *echo* 伺服器,以及使用彈跳器執行它的程式碼(假設存在
nonblocking_read、nonblocking_write以及其他 I/O 協同程式,例如若連線關閉則引發ConnectionLost):# coroutine function that echos data back on a connected # socket # def echo_handler(sock): while True: try: data = yield nonblocking_read(sock) yield nonblocking_write(sock, data) except ConnectionLost: pass # exit normally if connection lost # coroutine function that listens for connections on a # socket, and then launches a service "handler" coroutine # to service the connection # def listen_on(trampoline, sock, handler): while True: # get the next incoming connection connected_socket = yield nonblocking_accept(sock) # start another coroutine to handle the connection trampoline.add( handler(connected_socket) ) # Create a scheduler to manage all our coroutines t = Trampoline() # Create a coroutine instance to run the echo_handler on # incoming connections # server = listen_on( t, listening_socket("localhost","echo"), echo_handler ) # Add the coroutine to the scheduler t.add(server) # loop forever, accepting connections and servicing them # "in parallel" # t.run()
參考實作
一個實作了本 PEP 中描述的所有特性的原型修補程式,可從 SourceForge 取得,編號 #1223381 (https://bugs.python.org/issue1223381)。
該修補程式已於 2005 年 8 月 1 日至 2 日合併至 CVS。
致謝
Raymond Hettinger (PEP 288) 和 Samuele Pedroni (PEP 325) 最先正式提出了將值或異常傳入生成器的想法,以及「關閉」生成器的能力。Timothy Delaney 建議了本 PEP 的標題,Steven Bethard 協助編輯了之前的版本。另請參閱 PEP 340 的致謝部分。
參考文獻
待定 (TBD)。
版權
此文件已歸入公有領域 (public domain)。
來源:https://github.com/python/peps/blob/main/peps/pep-0342.rst
最後修改時間:2025-02-01 08:59:27 GMT