Python Asyncio 協程(二)

Jimmy Huang
13 min readMar 6, 2021

--

最近工作上有碰到asyncio的需求 所以又花了點時間複習一次

延續前篇的概念解說

這篇會再用不同的角度去切入一次 並比較著重於python的語法糖進行解說

為何要用asyncio舉例 — 西洋棋

2017 Pycon有個很精彩的舉例

https://www.youtube.com/watch?v=iG6fr81xHKA

假設小明是一名厲害的西洋棋手

他有12個對手 每一步小明就想5秒 對手55秒 一盤棋有60個回合

這樣如果是同步的方式

(5+55)*60 *12 = 12hr

這樣就是很直覺的 跟第一個人下棋完後 再下第二個

如果是非同步的方式

(5*12)*60 = 1hr

這樣就是跳過所有等待response的時間 同時和12個人下棋

這邊就是把12個下棋任務放在一個event loop

像是以下程式碼

async def play_with_alex():
while True:
think_and_play() # take 5 secs
await alex_play()
async def play_with_jimmy():
while True:
think_and_play() # take 5 secs
await jimmy_play()

三層概念 — coroutine / task / Event loop

第一層:coroutine (執行function)
第二層:task (並發單位)
第三層:event loop ( 一個thread只能有event loop)

一個event loop 可能含有許多個task

以下依序介紹每個名詞

1. coroutine

a coroutine is a function that can suspend its execution before reaching return, and it can indirectly pass control to another coroutine for some time

簡單來說 coroutine就是 可以暫停釋放資源的function 在python中 用async def去定義 await是釋放資源的

協程可以看做是“能在中途中斷、中途返回值給其他協程、中途恢復、中途傳入參數的函數”,和一般的函數只能在起始傳入參數,不能中斷,而且最後返回值給父函數之後就結束的概念不一樣。定義協程很簡單,只要在定義函數時再前面加入“async”這個關鍵字就行了

凡是用async def 定義的 都要用await去調用 不可以直接調用

async def main():
print('hello')
await asyncio.sleep(1)
print('world')

如果是這樣的function 我們必須使用await調用它 或者是用特別的方式call他

asyncio.run(main())

如果直接呼叫 main() 只會返回一個對象

<coroutine object main at 0x1053bb7c8>

2. task

task就像剛剛西洋棋裡面 跟一個對手下棋 我們就叫做一個task

當一個協程通過 asyncio.create_task() 等函數被封裝為一個任務

task 是 並發運行的單位 負責作為Event Loop和協程對象的溝通介面 經過task對象的包裝才能被Event Loop執行

import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
await say_after(1, 'hello')
await say_after(2, 'world')
asyncio.run(main())

這要需要三秒 似乎沒有節省到時間 為什麼呢 就是因為沒有經過task包裝

async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")asyncio.run(main())

這樣就只需要兩秒了
如果還是不了解 可以參考這篇
深入理解asyncio(一)

請善用 asyncio.gather 或者 asyncio.wait
或者asyncio.create_task

3. 事件循環 (Event loop)

client發出request後,等待server回應的時間 其實就是類似IO-Bound的問題

傳統的寫法如果要取得A,B,C,D四個資料,就要A->B->C->D按照順序執行

如果是異步的寫法 A–> B–> C–> D–> 基本上就是類似先丟出request 之後再去等response

基本上就是 事件循環 1.創建Event Loop 2.在Event Loop上註冊任務(Tasks)

既然異步程式可以在多個任務之間切換,一定有個list包含所有的任務,而這個task list和機制就稱為event loop

回調函數(Callback)

若是event loop在搜尋task list發現有事情要發生了,就會執行執行callback

以上圖為例 就會執行Callback_B

注意 這個Callback_B一定要是non-blocking的!! (等待IO時必須await給event loop)

以下棋為例 同時和12名棋手下棋 跟每個人的下棋就是一個task

全部跟12個人下棋就是event loop callback就是 對手下好棋了 按鈴叫你

Python 文檔資源

介紹完基本的概念後 以下會依照python特有的函式庫進行介紹

Coroutines and Tasks — Python 3.9.2 documentation

Async & Await

async / await 是 Python 3.5+ 之後出現的語法糖

  • async:用來宣告 function 能夠有異步的功能
  • await:用來標記 Coroutine 切換暫停和繼續的點

await 與 可等待對象

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

async def a_coroutine():
   # do something...
await listen_event_happend()
# do something after event happend...

因為當我們要在一個Coroutine裡中途監聽某一個Event發生後再執行後續行為時,只需要用await關鍵字來等待某個Event發生

await 後面必須接可等待的對象 只有三種種類

  1. coroutine協程
  2. task任務
  3. Future 對象

future是比較底層的對象 是一種特殊的低層級可等待對象 通常情況下沒有必要在應用層級的代碼中創建Future對象。

如何創造Task

asyncio.create_task( coroutine ) — python3.7後

asyncio.ensure_future( coroutine ) -python3.6前

大意: 將coroutine轉換成task 並放到當前的Event Loop

這函數把協程對象封裝成一個task對象 但經過task對象的包裝才能被Event Loop執行,所以說task對象負責作為Event Loop和協程對象的溝通介面

該任務會在get_running_loop()返回的循環中執行,如果當前線程沒有在運行的循環則會引發RuntimeError。

asyncio.gather( *aws)

使用 asyncio.gather( ) ,可同時放入多個 Coroutines 或 awaitable object 進入事件循環 (event loop),等 Coroutines 都結束後,並依序收集其回傳值。

asyncio.wait( *aws)

這函數的用處在於把兩個example1和example2的兩個協程對象包成一個大的協程對象,就是把兩個小任務包成一個大任務。

延伸閱讀: gather與wait的不同

深入理解asyncio(二)

  1. asyncio.gather 封装的 Task 全程黑盒,只告诉你协程结果。
  2. asyncio.wait 会返回封装的 Task (包含已完成和挂起的任务),如果你关注协程执行结果你需要从对应 Task 实例里面用 result 方法自己拿。

在大部分情况下,用 asyncio.gather 是足够的,如果你有特殊需求,可以选择 asyncio.wait,举 2 个例子: 需要拿到封装好的 Task,以便取消或者添加成功回调等 业务上需要 FIRST_COMPLETED/FIRST_EXCEPTION 即返回的

import asyncio
import time
async def test():
print('start')
await asyncio.sleep(1)
print('end')
async def main():
await asyncio.gather(test(),test(),test())
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import time
now = lambda: time.time()async def dosomething(num):
print('第 {} 任務,第一步'.format(num))
await asyncio.sleep(2)
print('第 {} 任務,第二步'.format(num))
return '第 {} 任務完成'.format(num)
async def raise_error(num):
raise ValueError
print(‘這邊不會執行到’)
async def main():
tasks = [dosomething(i) for I in range(5)]
tasks1 = [raise_error(i) for I in range(5)]
results = await asyncio.gather(*tasks, *tasks1, return_exceptions=True)
print(results)
if __name__ == “__main__”: start = now()
asyncio.run(main())
print(‘TIME: ‘, now() - start)

asyncio.sleep(seconds)

await asyncio.sleep(1)簡單來說就是啟動一個只有一秒的倒數計時器

在Event Loop 運行asyncio程序

記住最上層是事件循環 要使用async function必須建立event loop

Eventloop 可以註冊的是前面提到 awaitable的 三種類型 coroutine, task or future

Python 3.5+ 使用 asyncio.get_event_loop

先建立一個 event_loop,然後再將 Coroutine 放進 run_until_complete() 裡面,直到所有 Coroutine 運行結束。

import asyncioasync def hello_world(x):
print('hello_world x' + str(x))
await asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world(3))
loop.close()
import asyncioasync def example1(): # 定義一個中間會被中斷的協程
print("Start example1 coroutin.")
await asyncio.sleep(1) # 中斷協程一秒
print("Finish example1 coroutin.")
async def example2(): # 定義一個協程
print("Start example2 coroutin.")
await asyncio.sleep(2)
print("Finish example2 coroutin.")
tasks = [ # 建立一個任務列表
asyncio.ensure_future(example1()),
asyncio.ensure_future(example2()),
]
loop = asyncio.get_event_loop() #建立一個Event Loop
loop.run_until_complete(asyncio.wait(tasks))

asyncio.get_event_loop()

在當前thread 尋找是否已經有註冊的event loop 如果有 就返回 如果沒有 就創立一個新的

Event Loop — Python 3.9.2 documentation

  • asyncio.get_running_loop( ) 返回當前的loop 如果沒有 就拋error
  • asyncio.set_event_loop(loop) 將loop 設定為當前thread的loop

請記得這邊的關鍵是 請在一條thread中 只設定一個event loop

When to use multiple event loops?

loop.run_until_complete(coroutine)

就是讓註冊參數裡的任務並執行,等到任務完成就關閉Event Loop

loop.run_forever()

這個函數一執行,Event Loop就會永遠執行不會被關閉,除非在程式中出現loop.stop()就停止

Python 3.7 推出更簡潔的方法:asyncio.run()

Python 3.7+ 之後將 loop 封裝,只需要使用 asyncio.run() 一行程式就結束,不用在建立 event_loop 結束時也不需要 loop.close,因為他都幫你做完了

結語

隨著python的版本快速更迭 調用的語法糖也快速變化著
不過基本上概念還是分成
event loop / task / coroutine
下一篇應該會直接著重於實作範例上面
喜歡的話幫我按個讚 謝謝

參考資源

https://ithelp.ithome.com.tw/articles/10199408

https://www.maxlist.xyz/2020/03/29/python-coroutine/

深入理解asyncio(一)

https://realpython.com/async-io-python/

--

--