鍍金池/ 問答/Python  網(wǎng)絡(luò)安全/ Python3 asyncio 在線程中使用

Python3 asyncio 在線程中使用

下面代碼中模擬了使用 2 個(gè)線程管理 event loop 的情況:

import time
import asyncio
import threading

async def task(c, i):
    for _ in range(i):
        print(c)
        await asyncio.sleep(1)
    return i

def thread(loop):  # 異步程序
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(task('sub thread', 999))
    loop.run_forever()

def main():
    threading.Thread(target=thread, args=(asyncio.get_event_loop(), )).start()

    # 同步代碼開始
    future = asyncio.ensure_future(task('main thread', 5))
    while not future.done():
        time.sleep(1)
    print('main done: %s' % future.result())


if __name__ == '__main__':
    main()

其中子線程(thread 函數(shù))設(shè)定并啟用了一個(gè) 999 次的 task 任務(wù)。

而在主線程中,添加了一個(gè) 5 次的 task 任務(wù)。我設(shè)置了一個(gè) While 循環(huán)來檢查這項(xiàng)任務(wù)是否已經(jīng)運(yùn)行完畢,如若完畢則打印出 main done: 5 。


提出的問題:

  1. 像這樣的多線程 event loop 是否有其他方案可以實(shí)現(xiàn)?
  2. 在 main 函數(shù)的 5 個(gè) task 的任務(wù),最初嘗試使用 run_until_complete 來等待執(zhí)行結(jié)束,但是與 run_forever 沖突導(dǎo)致拋出 RuntimeError: This event loop is already running 的錯(cuò)誤,那么除了使用 While 循環(huán)外,還有其他方法阻塞后面代碼執(zhí)行嗎?

實(shí)際情況描述:

我有一個(gè)使用了 同步+異步 的程序,其中同步程序是運(yùn)行在主線程上的,異步是運(yùn)行在一個(gè)子線程中的。
同步與異步獨(dú)立運(yùn)行,各司其職。但是有時(shí)候需要在同步中使用異步的函數(shù)或方法并取得結(jié)果。

回答
編輯回答
風(fēng)清揚(yáng)

你不需要循環(huán)調(diào)用 future.done(),用 future.result() 便可。

我建議把 eventloop 放在主線程,其它工作視類型可以放入

  1. 同(主)線程
    非阻塞(非CPU運(yùn)算型)動(dòng)作,例如 asyncio.sleep
  2. 從線程(池)
    阻塞(非CPU運(yùn)算型)動(dòng)作,例如 time.sleep
  3. 單獨(dú)進(jìn)程
    CPU運(yùn)算型動(dòng)作,例如計(jì)算質(zhì)數(shù)

參考

https://docs.python.org/3/lib...
https://wiki.python.org/moin/...

例子

# -*- coding: utf-8 -*-
import asyncio
from datetime import datetime


async def add(a, b):
    await asyncio.sleep(1)
    return a + b


async def master_thread(loop):
    print("{} master: 1+2={}".format(datetime.now(), await add(1, 2)))


def slave_thread(loop):
    # 注意:這不是 coroutine 函數(shù)
    import time
    time.sleep(2)

    f = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
    print("{} slave: 1+2={}".format(datetime.now(), f.result()))


async def main(loop):
    await asyncio.gather(
        master_thread(loop),
        # 線程池內(nèi)執(zhí)行
        loop.run_in_executor(None, slave_thread, loop),
    )


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
2017年2月27日 23:13