```python
import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    # Find all of the hosts that access robots.txt in a single log file
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    # Find all hosts across an entire collection of log files
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    for ipaddr in find_all_robots('logs'):
        print(ipaddr)
```
With this modification, the script produces the same result but runs about 3.5 times faster on a four-core machine. The actual performance gain will vary according to the number of CPUs on your machine.
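To gauge the speedup on your own hardware, a minimal timing harness along these lines can help (findrobots is a hypothetical module name for the script above; the 'logs' directory follows the example):

```python
import time
# Hypothetical: assumes the script above was saved as findrobots.py
from findrobots import find_all_robots

if __name__ == '__main__':
    start = time.perf_counter()
    robots = find_all_robots('logs')
    elapsed = time.perf_counter() - start
    print(f'Found {len(robots)} robot clients in {elapsed:.2f}s')
```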
## Discussion
Typical usage of a ProcessPoolExecutor is as follows:

```python
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    # do work in parallel using pool
    ...
```
Under the covers, a ProcessPoolExecutor creates N independent running Python interpreters, where N is the number of available CPUs detected on the system. You can change the number of workers by supplying an optional argument, as in ProcessPoolExecutor(N). The pool runs until the last statement in the with block executes, at which point the process pool is shut down. However, the program will wait until all submitted work has been processed.
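For example, the following sketch (crunch is a hypothetical stand-in for real CPU-bound work) shows an explicit worker count and the implicit wait at the end of the with block:

```python
import os
from concurrent.futures import ProcessPoolExecutor

def crunch(n):
    # Stand-in for a CPU-intensive computation
    return n * n

if __name__ == '__main__':
    print('CPUs detected:', os.cpu_count())
    # Cap the pool at two worker processes instead of one per CPU
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = list(pool.map(crunch, range(8)))
    # Control only reaches this point after all submitted work has finished
    print(results)
```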
Work to be submitted to a pool must be defined in the form of a function. There are two ways to submit it. If you want to parallelize a list comprehension or a map() operation, you use pool.map():

```python
# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
```
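Two details worth noting as an aside: pool.map() yields results in the same order as the inputs, and it accepts a chunksize argument that batches the items sent to each worker to reduce inter-process overhead. A small sketch (square is a hypothetical function):

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    # Hypothetical stand-in for real work
    return x * x

if __name__ == '__main__':
    data = list(range(100))
    with ProcessPoolExecutor() as pool:
        # chunksize=10 ships items to the workers in batches of ten
        results = list(pool.map(square, data, chunksize=10))
    assert results == [x * x for x in data]  # order matches the input
```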
Alternatively, you can use pool.submit() to manually submit single tasks:
```python
# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...
```
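If you would rather not block on result(), a Future also supports completion callbacks through add_done_callback(). A minimal sketch (work and when_done are illustrative names); the callback receives the finished Future:

```python
from concurrent.futures import ProcessPoolExecutor

def work(x):
    return x * x

def when_done(fut):
    # Called with the completed Future; fut.result() will not block here
    print('Got result:', fut.result())

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        future_result = pool.submit(work, 3)
        future_result.add_done_callback(when_done)
```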