Python多线程Concurrent

06-29 1240阅读

背景

从 Python3.2 开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor两个类,实现了对 threading 和 multiprocessing 的进一步抽象(这里主要关注线程池),不仅可以自动调度线程,还可以做到:

Python多线程Concurrent
(图片来源网络,侵删)
  1. 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
  2. 当一个线程完成的时候,主线程能够立即知道。
  3. 让多线程和多进程的编码接口一致。

总结:实现更容易,效率更高

ThreadPoolExecutor和ProcessPoolExecutor实现 互换很便捷,搞懂一个即可

看到 Pool 单词,我们就能想到 这是一个 池子,池子的概念 是 大小是有上限的,满足最大的数值以后就开始排队

1. 线程池ThreadPoolExecutor

使用 ThreadPoolExecutor 来实例化线程池对象。传入max_workers参数来设置线程池中最多能同时运行的线程数目。

from concurrent.futures import ThreadPoolExecutor
  
executor = ThreadPoolExecutor(max_workers=2)    # 表示在这个线程池中同时运行的线程有2个线程

2. 线程池Submit

使用 submit 函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的返回值,注意 submit() 不是阻塞的,而是立即返回,即无序返回。通过 submit 函数返回的任务句柄,能够使用 done() 方法判断任务是否结束,可以使用result() 方法获得返回值。

import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def func(num):
    for i in range(num):
        time.sleep(1)
    return num * num
task = executor.submit(func, i)
print(future.done())

3. 线程池Cancel

使用 cancel() 方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。举个例子,线程池的大小设置为2,如果只有2个任务,那么任务已经在运行了,会取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。

import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def func(num):
    for i in range(num):
        time.sleep(1)
    return num * num
task1 = executor.submit(func, i)
task2 = executor.submit(func, i)
print(task1.done())
print(task2.cancel()) # 如果到这里task1没有完成,就会取消

4. 线程池As_completed()

按照上面的逻辑如果我们在线程完之后需要一个一个判断是否完成 是非常不合理的,as_completed() 方法可以一次取出所有任务的结果。as_completed() 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会 yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先进去到as_completed。

import concurrent.futures
import time
from concurrent.futures import as_completed
from tqdm import trange
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def func(num):
    for i in range(num):
        time.sleep(1)
    return num * num
future = []
for i in range(8, -1, -1):
    print(i)
    start = time.time()
    future.append(executor.submit(func, i))
    print(time.time() - start)
for ft in as_completed(future):
    print(ft.done())
    print(ft.result())

5. 线程池Map

map方法和2中submit方法是一样的功能,但是区别在于map返回出来的结果顺序与输入顺序,而submit是无序的,没有阻塞操作。

summit方法中args是按参数传输的,而map方法中args是一个List传输

import concurrent.futures
import time
from concurrent.futures import as_completed
from tqdm import trange
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def func(num):
    for i in range(num):
        time.sleep(1)
    return num * num
for data in executor.map(func, range(8, -1, -1)):
    print(data)

输出顺序一定是 88, 77, …11顺序,

但是用submit方法会先打印出77…

6. 线程池Wait

wait 方法可以让主线程阻塞,直到满足设定的要求。wait 方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件 reture_when 默认为 ALL_COMPLETED,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出 main。等待条件还可以设置为 FIRST_COMPLETED,表示第一个任务完成就停止等待。

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
executor = ThreadPoolExecutor(max_workers=2)
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)

总结:

  1. future的设计理念很棒,在线程池/进程池和携程中都存在future对象,是异步编程的核心。
  2. ThreadPoolExecutor 让线程的使用更加方便,减小了线程创建/销毁的资源损耗,无需考虑线程间的复杂同步,方便主线程与子线程的交互。
  3. 线程池的抽象程度很高,多线程和多进程的编码接口一致。

参考文章

https://blog.csdn.net/xiaoyu_wu/article/details/102820384

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]