Note: this is a translation. The original article is "Combining Coroutines with Threads and Processes".

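Many existing Python libraries are not yet ready to work with asyncio. They may block, or depend on concurrency features that the module does not provide. It is still possible to use these libraries in an asyncio-based application: use an executor from concurrent.futures to run the code in a separate thread or process.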

Threads

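The run_in_executor() method of the event loop takes an executor instance, a callable to invoke, and any arguments to pass to the callable. It returns a Future that can be used to wait for the function to finish and to retrieve its return value. If no executor is passed in (None), the loop's default executor is used, which is a ThreadPoolExecutor created on demand. The main example in this section explicitly creates an executor to limit the number of worker threads that can run concurrently.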
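
For contrast with the explicit executor used in this section, here is a minimal sketch of the default-executor case. The function name blocking_square is hypothetical and only stands in for a blocking library call. It assumes Python 3.7 or later for asyncio.run() and asyncio.get_running_loop().

```python
import asyncio
import threading
import time


def blocking_square(n):
    # Hypothetical stand-in for a blocking library call.
    time.sleep(0.05)
    return n ** 2, threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the event loop's default executor.
    future = loop.run_in_executor(None, blocking_square, 7)
    # Awaiting the future suspends this coroutine, not the event loop.
    return await future


value, thread_name = asyncio.run(main())
print(value, thread_name)
```

The returned thread name should differ from MainThread, which shows that the callable ran in a worker thread rather than in the thread that runs the event loop.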

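A ThreadPoolExecutor starts its own worker threads and then calls each function it is given in one of those threads. This example shows how to combine run_in_executor() and wait() so that the coroutine yields control to the event loop while the blocking functions run in separate threads, and is woken up again by the event loop when those functions finish.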

# asyncio_executor_thread.py
import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

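asyncio_executor_thread.py uses logging to conveniently indicate which thread and function produced each log message. Because a separate logger is used in each call to blocks(), the output also shows that threads are reused to finish the work.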

$ python asyncio_executor_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 blocks(0): running
ThreadPoolExecutor-0_1 blocks(1): running
ThreadPoolExecutor-0_2 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0 blocks(0): done
ThreadPoolExecutor-0_0 blocks(3): running
ThreadPoolExecutor-0_2 blocks(2): done
ThreadPoolExecutor-0_2 blocks(4): running
ThreadPoolExecutor-0_1 blocks(1): done
ThreadPoolExecutor-0_1 blocks(5): running
ThreadPoolExecutor-0_0 blocks(3): done
ThreadPoolExecutor-0_2 blocks(4): done
ThreadPoolExecutor-0_1 blocks(5): done
MainThread run_blocking_tasks: results: [4, 9, 0, 16, 25, 1]
MainThread run_blocking_tasks: exiting
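
The results list in the output above is not in submission order, because asyncio.wait() returns sets of completed and pending futures. When order matters, one option is asyncio.gather(). The sketch below is not part of the original article and assumes Python 3.7 or later. The name run_ordered is hypothetical, and blocks() is repeated without logging so the block is self-contained.

```python
import asyncio
import concurrent.futures
import time


def blocks(n):
    # Same blocking work as in the article, without the logging.
    time.sleep(0.05)
    return n ** 2


async def run_ordered(executor):
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, blocks, i) for i in range(6)]
    # gather() returns results in the order the awaitables were passed in,
    # regardless of which worker thread finishes first.
    return await asyncio.gather(*futures)


# The context manager shuts the pool down once the work is finished.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = asyncio.run(run_ordered(executor))

print(results)  # [0, 1, 4, 9, 16, 25]
```

wait() remains the better fit when the caller needs timeouts or wants to react to the first completed task.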

Processes

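A ProcessPoolExecutor works in much the same way, except that it creates a set of worker processes instead of threads. Separate processes require more system resources, but for computationally intensive operations they can make full use of every CPU core.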

# asyncio_executor_process.py
# changes from asyncio_executor_thread.py

if __name__ == '__main__':
    # Configure logging to show the id of the process
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited process pool.
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

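The only change needed to move from threads to processes is to create a different type of executor. This example also changes the logging format string to print the process id instead of the thread name, to show that the tasks really are running in separate processes.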

$ python asyncio_executor_process.py
PID 23876 run_blocking_tasks: starting
PID 23876 run_blocking_tasks: creating executor tasks
PID 23876 run_blocking_tasks: waiting for executor tasks
PID 23877 blocks(0): running
PID 23878 blocks(1): running
PID 23879 blocks(2): running
PID 23877 blocks(0): done
PID 23878 blocks(1): done
PID 23879 blocks(2): done
PID 23878 blocks(3): running
PID 23877 blocks(4): running
PID 23879 blocks(5): running
PID 23878 blocks(3): done
PID 23877 blocks(4): done
PID 23879 blocks(5): done
PID 23876 run_blocking_tasks: results: [4, 0, 25, 9, 16, 1]
PID 23876 run_blocking_tasks: exiting

My understanding and further notes

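The first part of the article, on threads, reads very much like a description of a thread pool. You create an executor, which is a kind of pool for which you only specify the maximum number of workers, and then you submit tasks to it. The difference is that await ties each thread's result back into the context of the event loop thread, which avoids writing callbacks.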
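
To make the callback point concrete, here is a rough sketch comparing the two styles on the same pool. The names with_callback and with_await are hypothetical, and the code assumes Python 3.7 or later.

```python
import asyncio
import concurrent.futures
import threading
import time


def blocks(n):
    time.sleep(0.05)
    return n ** 2


def with_callback(executor):
    # Plain concurrent.futures style: the result is delivered to a
    # callback, so the caller needs its own signalling to pick it up.
    done = threading.Event()
    collected = []

    def on_done(future):
        collected.append(future.result())
        done.set()

    executor.submit(blocks, 4).add_done_callback(on_done)
    done.wait()
    return collected[0]


async def with_await(executor):
    # asyncio style: the result comes back as the value of the await
    # expression, in the coroutine that asked for it.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocks, 4)


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    callback_result = with_callback(executor)
    await_result = asyncio.run(with_await(executor))

print(callback_result, await_result)  # 16 16
```

Both versions compute the same value. The await version simply keeps the control flow in one place.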

This is probably best seen as a transitional approach: it forces asyncio support onto libraries that do not yet support it.
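
As an illustration of wrapping such a library, the sketch below uses asyncio.to_thread(), which is not mentioned in the original article and requires Python 3.9 or later. It runs a function in a separate thread, much like run_in_executor() with the default executor. legacy_fetch is a hypothetical stand-in for a blocking library call, and the heartbeat task exists only to show that the event loop keeps running in the meantime.

```python
import asyncio
import time


def legacy_fetch(delay):
    # Hypothetical stand-in for a call into a blocking, non-asyncio library.
    time.sleep(delay)
    return "payload"


async def heartbeat(ticks):
    # Can only keep ticking if the event loop is not blocked.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # Run the blocking call in a worker thread and await its result.
    payload = await asyncio.to_thread(legacy_fetch, 0.2)
    beat.cancel()
    # Collect the cancelled task so nothing is left pending.
    await asyncio.gather(beat, return_exceptions=True)
    return payload, len(ticks)


payload, tick_count = asyncio.run(main())
print(payload, tick_count)
```

If legacy_fetch() were called directly inside main(), the heartbeat could not run until the call returned.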

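The official documentation's introduction to run_in_executor is perhaps more basic than the original blog post above. For normal use, following the examples above is the better fit.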