Python的进程、线程和协程的适用场景和使用技巧

网友投稿 1022 2022-05-28

1. 前言

前些日子写过几篇关于线程和进程的文章,概要介绍了Python内置的线程模块(threading)和进程模块(multiprocessing)的使用方法,侧重点是线程间同步和进程间同步。随后,陆续收到了不少读者的私信,咨询进程、线程和协程的使用方法,进程、线程和协程分别适用于何种应用场景,以及混合使用进程、线程和协程的技巧。归纳起来,核心的问题大致有以下几个:

使用线程是为了并行还是加速?

为什么我使用多线程之后,处理速度并没有预期的快,甚至更慢了?

我应该选择多进程处理还是多线程处理?

协程和线程有什么不同?

什么情况下使用协程?

在进程、线程和协程的使用上,初学者之所以感到困惑,最主要的原因是对任务的理解不到位。任务是由一个进程、或者线程、或者协程独立完成的、相对独立的一系列工作组合。通常,我们会把任务写成一个函数。任务有3种类型:

计算密集型任务:任务包含大量计算,CPU占用率高

IO密集型任务:任务包含频繁的、持续的网络IO和磁盘IO

混合型任务:既有计算也有IO

也有观点认为还有一种数据密集型任务,但我认为数据密集型任务一般出现在分布式系统或异构系统上,必定伴随着计算密集和IO密集,因此,仍然可以归类到混合型任务。

下面,我们就以几个实例来讲解演示进程、线程和协程的适用场景、使用方法,以及如何优化我们的代码。

2. 线程

2.1 线程的最大意义在于多任务并行

通常,代码是单线程顺序执行的,这个线程就是主线程。仅有主线程的话,在同一时刻就只能做一件事情;如果有多件事情要做,那也只能做完一件再去做另一件。这有点类似于过去的说书艺人,情节人物复杂时,只能“花开两朵,各表一枝”。下面这个题目,就是一个需要同时做两件事情的例子。

请写一段代码,提示用户从键盘输入任意字符,然后等待用户输入。如果用户在10秒钟完成输入(按回车键),则显示输入内容并结束程序;否则,不再等待用户输入,而是直接提示超时并结束程序。

我们知道,input()函数用于从键盘接收输入,time.sleep()函数可以令程序停止运行指定的时长。不过,在等待键盘输入的时候,sleep()函数就无法计时,而在休眠的时候,input()函数就无法接收键盘输入。不借助于线程,我们无法同时做这两件事情。如果使用线程技术的话,我们可以在主线程中接收键盘输入,在子线程中启动sleep()函数,一旦休眠结束,子线程就杀掉主线程,结束程序。

import os, time import threading def monitor(): time.sleep(10) print('\n超时退出!') os._exit(0) m = threading.Thread(target=monitor) m.setDaemon(True) m.start() s = input('请输入>>>') print('接收到键盘输入:%s'%s) print('程序正常结束。')

2.2 使用线程处理IO密集型任务

假如从100个网站抓取数据,使用单线程的话,就需要逐一请求这100个站点并处理应答结果,所花费时间就是每个站点花费时间的总和。如果使用多个线程来实现的话,结果会怎样呢?

import time import requests import threading urls = ['https://www.baidu.com', 'https://cn.bing.com'] def get_html(n): for i in range(n): url = urls[i%2] resp = requests.get(url) #print(resp.ok, url) t0 = time.time() get_html(100) # 请求100次 t1 = time.time() print('1个线程请求100次,耗时%0.3f秒钟'%(t1-t0)) for n_thread in (2,5,10,20,50): t0 = time.time() ths = list() for i in range(n_thread): ths.append(threading.Thread(target=get_html, args=(100//n_thread,))) ths[-1].setDaemon(True) ths[-1].start() for i in range(n_thread): ths[i].join() t1 = time.time() print('%d个线程请求100次,耗时%0.3f秒钟'%(n_thread, t1-t0))

上面的代码用百度和必应两个网站来模拟100个站点,运行结果如下所示。单线程处理大约需要30秒钟。分别使用2、5、10个线程来处理的话,所耗时间与线程数量基本上保持反比关系。当线程数量继续增加20个时,速度不再有显著提升。若将线程数量增至50个,时间消耗反倒略有增加。

1个线程请求100次,耗时30.089秒钟

2个线程请求100次,耗时15.087秒钟

5个线程请求100次,耗时7.803秒钟

10个线程请求100次,耗时4.112秒钟

20个线程请求100次,耗时3.160秒钟

50个线程请求100次,耗时3.564秒钟

这个结果表明,对于IO密集型(本例仅测试网络IO,没有磁盘IO)的任务,适量的线程可以在一定程度上提高处理速度。随着线程数量的增加,速度的提升不再明显。

2.3 使用线程处理计算密集型任务

对于曝光不足或明暗变化剧烈的照片可以通过算法来修正。下图左是一张落日图,因为太阳光线较强导致暗区细节无法辨识,通过低端增强算法可以还原为下图右的样子。

低端增强算法(也有人叫做伽马矫正)其实很简单:对于

[

0

,

255

]

[0,255]

[0,255]区间内的灰度值

v

0

v_0

v0 ,指定矫正系数

γ

\gamma

γ,使用下面的公式,即可得到矫正后的灰度值

v

1

v_1

v1 ,其中

γ

\gamma

γ一般选择2或者3,上图右就是

γ

\gamma

γ为3的效果。

v

1

=

255

×

(

v

0

255

)

1

γ

v_1 = 255\times(\frac{v_0}{255})^{\frac{1}{\gamma}}

v1 =255×(255v0 )γ1

下面的代码,对于一张分辨率为4088x2752的照片实施低端增强算法,这是一项计算密集型的任务。代码中分别使用了广播和矢量计算、单线程逐像素计算、多线程逐像素计算等三种方法,以验证多线程对于计算密集型任务是否有提速效果。

import time import cv2 import numpy as np import threading def gamma_adjust_np(im, gamma, out_file): """伽马增强函数:使用广播和矢量化计算""" out = (np.power(im.astype(np.float32)/255, 1/gamma)*255).astype(np.uint8) cv2.imwrite(out_file, out) def gamma_adjust_py(im, gamma, out_file): """伽马增强函数:使用循环逐像素计算""" rows, cols = im.shape out = im.astype(np.float32) for i in range(rows): for j in range(cols): out[i,j] = pow(out[i,j]/255, 1/3)*255 cv2.imwrite(out_file, out.astype(np.uint8)) im = cv2.imread('river.jpg', cv2.IMREAD_GRAYSCALE) rows, cols = im.shape print('照片分辨率为%dx%d'%(cols, rows)) t0 = time.time() gamma_adjust_np(im, 3, 'river_3.jpg') t1 = time.time() print('借助NumPy广播特性,耗时%0.3f秒钟'%(t1-t0)) t0 = time.time() im_3 = gamma_adjust_py(im, 3, 'river_3_cycle.jpg') t1 = time.time() print('单线程逐像素处理,耗时%0.3f秒钟'%(t1-t0)) t0 = time.time() th_1 = threading.Thread(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg')) th_1.setDaemon(True) th_1.start() th_2 = threading.Thread(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg')) th_2.setDaemon(True) th_2.start() th_1.join() th_2.join() t1 = time.time() print('启用两个线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))

运行结果如下:

照片分辨率为4088x2752

借助NumPy广播特性,耗时0.381秒钟

单线程逐像素处理,耗时34.228秒钟

启用两个线程逐像素处理,耗时36.087秒钟

结果显示,对一张千万级像素的照片做低端增强,借助于NumPy的广播和矢量化计算,耗时0.38秒钟;单线程逐像素处理的话,耗时相当于NumPy的100倍;启用多线程的话,速度不仅没有加快,反倒是比单线程更慢。这说明,对于计算密集型的任务来说,多线程并不能提高处理速度,相反,因为要创建和管理线程,处理速度会更慢一些。

2.4 线程池

尽管多线程可以并行处理多个任务,但开启线程不仅花费时间,也需要占用系统资源。因此,线程数量不是越多越快,而是要保持在合理的水平上。线程池可以让我们用固定数量的线程完成比线程数量多得多的任务。下面的代码演示了使用 Python 的标准模块创建线程池,计算多个数值的平方。

>>> from concurrent.futures import ThreadPoolExecutor >>> def pow2(x): return x*x >>> with ThreadPoolExecutor(max_workers=4) as pool: # 4个线程的线程池 result = pool.map(pow2, range(10)) # 使用4个线程分别计算0~9的平方 >>> list(result) # result是一个生成器,转成列表才可以直观地看到计算结果 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

如果每个线程的任务各不相同,使用不同的线程函数,任务结束后的结果处理也不一样,同样可以使用这个线程池。下面的代码对多个数值中的奇数做平方运算,偶数做立方运算,线程任务结束后,打印各自的计算结果。

>>> from concurrent.futures import ThreadPoolExecutor >>> def pow2(x): return x*x >>> def pow3(x): return x*x*x >>> def save_result(task): # 保存线程计算结果 global result result.append(task.result()) >>> result = list() >>> with ThreadPoolExecutor(max_workers=3) as pool: for i in range(10): if i%2: # 奇数做平方运算 task = pool.submit(pow2, i) else: # 偶数做立方运算 task = pool.submit(pow3, i) task.add_done_callback(save_result) # 为每个线程指定结束后的回调函数 >>> result [0, 1, 8, 9, 64, 25, 216, 49, 512, 81]

3. 进程

3.1 使用进程处理计算密集型任务

和线程相比,进程的最大优势是可以充分例用计算资源——这一点不难理解,因为不同的进程可以运行在不同CPU的不同的核上。假如一台计算机的CPU共有16核,则可以启动16个或更多个进程来并行处理任务。对于上面的例子,我们改用进程来处理,效果会怎样呢?

import time import cv2 import numpy as np import multiprocessing as mp def gamma_adjust_py(im, gamma, out_file): """伽马增强函数:使用循环逐像素计算""" rows, cols = im.shape out = im.astype(np.float32) for i in range(rows): for j in range(cols): out[i,j] = pow(out[i,j]/255, 1/3)*255 cv2.imwrite(out_file, out.astype(np.uint8)) if __name__ == '__main__': mp.freeze_support() im_fn = 'river.jpg' im = cv2.imread(im_fn, cv2.IMREAD_GRAYSCALE) rows, cols = im.shape print('照片分辨率为%dx%d'%(cols, rows)) t0 = time.time() pro_1 = mp.Process(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg')) pro_1.daemon = True pro_1.start() pro_2 = mp.Process(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg')) pro_2.daemon = True pro_2.start() pro_1.join() pro_2.join() t1 = time.time() print('启用两个进程逐像素处理,耗时%0.3f秒钟'%(t1-t0))

运行结果如下:

照片分辨率为4088x2752

启用两个进程逐像素处理,耗时17.786秒钟

使用单个线程或两个线程的时候,耗时大约30+秒,改用两个进程后,耗时17.786秒,差不多快了一倍。如果使用4个进程(前提是运行代码的计算机至少有4个CPU核)的话,速度还能提高一倍,有兴趣的朋友可以试一下。这个测试表明,对于计算密集型的任务,使用多进程并行处理是有效的提速手段。通常,进程数量选择CPU核数的整倍数。

3.2 进程间通信示例

Python的进程、线程和协程的适用场景和使用技巧

多进程并行弥补了多线程技术的不足,我们可以在每一颗 CPU 上,或多核 CPU 的每一个核上启动一个进程。如果有必要,还可以在每个进程内再创建适量的线程,最大限度地使用计算资源来解决问题。不过,进程技术也有很大的局限性,因为进程不在同一块内存区域内,所以和线程相比,进程间的资源共享、通信、同步等都要麻烦得多,受到的限制也更多。

我们知道,线程间通信可以使用队列、互斥锁、信号量、事件和条件等多种同步方式,同样的,这些手段也可以应用在进程间。此外,multiprocessing 模块还提供了管道和共享内存等进程间通信的手段。下面仅演示一个进程间使用队列通信,更多的通信方式请参考由人民邮电出版社出版的拙著《Python高手修炼之道》。

这段代码演示了典型的生产者—消费者模式。进程 A 负责随机地往地上“撒钱”(写队列),进程 B 负责从地上“捡钱”(读队列)。

import os, time, random import multiprocessing as mp def sub_process_A(q): """A进程函数:生成数据""" while True: time.sleep(5*random.random()) # 在0-5秒之间随机延时 q.put(random.randint(10,100)) # 随机生成[10,100]之间的整数 def sub_process_B(q): """B进程函数:使用数据""" words = ['哈哈,', '天哪!', 'My God!', '咦,天上掉馅饼了?'] while True: print('%s捡到了%d块钱!'%(words[random.randint(0,3)], q.get())) if __name__ == '__main__': print('主进程(%s)开始,按回车键结束本程序'%os.getpid()) q = mp.Queue(10) p_a = mp.Process(target=sub_process_A, args=(q,)) p_a.daemon = True p_a.start() p_b = mp.Process(target=sub_process_B, args=(q,)) p_b.daemon = True p_b.start() input()

3.3 进程池

使用多进程并行处理任务时,处理效率和进程数量并不总是成正比。当进程数量超过一定限度后,完成任务所需时间反而会延长。进程池提供了一个保持合理进程数量的方案,但合理进程数量需要根据硬件状况及运行状况来确定,通常设置为 CPU 的核数。

multiprocessing.Pool(n) 可创建 n 个进程的进程池供用户调用。如果进程池任务不满,则新的进程请求会被立即执行;如果进程池任务已满,则新的请求将等待至有可用进程时才被执行。向进程池提交任务有以下两种方式。

apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交。即使进程池已满,也会接受新的任务,不会阻塞主进程。新任务将处于等待状态。

apply(func[, args[, kwds]]) :阻塞式提交。若进程池已满,则主进程阻塞,直至有空闲进程可以使用。

下面的代码演示了进程池的典型用法。读者可自行尝试阻塞式提交和非阻塞式提交两种方法的差异。

import time import multiprocessing as mp def power(x, a=2): """进程函数:幂函数""" time.sleep(1) print('%d的%d次方等于%d'%(x, a, pow(x, a))) def demo(): mpp = mp.Pool(processes=4) for item in [2,3,4,5,6,7,8,9]: mpp.apply_async(power, (item, )) # 非阻塞提交新任务 #mpp.apply(power, (item, )) # 阻塞提交新任务 mpp.close() # 关闭进程池,意味着不再接受新的任务 print('主进程走到这里,正在等待子进程结束') mpp.join() # 等待所有子进程结束 print('程序结束') if __name__ == '__main__': demo()

4. 协程

4.1 协程和线程的区别

如前文所述,线程常用于多任务并行。对于可以切分的IO密集型任务,将切分的每一小块任务分配给一个线程,可以显著提高处理速度。而协程,无论有多少个,都被限定在一个线程内执行,因此,协程又被称为微线程。

从宏观上看,线程任务和协程任务都是并行的。从微观上看,线程任务是分时切片轮流执行的,这种切换是系统自动完成的,无需程序员干预;而协程则是根据任务特点,在任务阻塞时将控制权交给其他协程,这个权力交接的时机和位置,由程序员指定。由此可以看出,参与协程管理的每一个任务,必须存在阻塞的可能,且阻塞条件会被其它任务破坏,从而得以在阻塞解除后继续执行。

尽管协程难以驾驭,但是由于是在一个线程内运行,免除了线程或进程的切换开销,因而协程的运行效率高,在特定场合下仍然被广泛使用。

4.2 协程演进史

Py2时代,Python并不支持协程,仅可通过yield实现部分的协程功能。另外,还可以通过gevent等第三方库实现协程。gevent最好玩的,莫过于monkey_patch(猴子补丁),曾经有一段时间,我特别喜欢使用它。

从Py3.4开始,Python内置asyncio标准库,正式原生支持协程。asyncio的异步操作,需要在协程中通过yield from完成,协程函数则需要使用@asyncio.coroutine装饰器。

不理解生成器的同学,很难驾驭yield这个反人类思维的东西,为了更贴近人类思维,Py3.5引入了新的语法async和await,可以让协程的代码稍微易懂一点点。如果此前没有接触过协程,我建议你只学习async和await的用法就足够了,不需要去了解早期的yield和后来的yield from。本质上,async就是@asyncio.coroutine,await就是yield from,换个马甲,看起来就顺眼多了。

4.3 协程应用示例

作为基础知识,在介绍协程应用示例前,先来介绍一下队列。在进程、线程、协程模块中,都有队列(Queue)对象。队列作为进程、线程、协程间最常用的通信方式,有一个不容忽视的特性:阻塞式读和写。当队列为空时,读会被阻塞,直到读出数据;当队列满时,写会被阻塞,直到队列空出位置后写入成功。因为队列具有阻塞式读写的特点,正好可以在协程中利用阻塞切换其他协程任务。

我们来构思一个这样的应用:某个富豪(rich)手拿一沓钞票,随机取出几张,撒在地上(如果地上已经有钞票的话,就等有人捡走了再撒);另有名为A、B、C的三个幸运儿(lucky),紧盯着撒钱的富豪,只要富豪把钱撒到地上,他们立刻就去捡起来。

如果用协程实现上述功能的话,我们可以用长度为1的协程队列来存放富豪每一次抛撒的钱。一旦队列中有钱(队列满),富豪就不能继续抛撒了,抛撒被阻塞,协程控制权转移。三个幸运儿中的某一个获得控制权,就去读队列(捡钱),如果队列中没有钱(队列空),捡钱被阻塞,协程控制权转移。依靠队列的阻塞和解除阻塞,一个富豪和三个幸运儿可以顺利地分配完富豪手中的钞票。为了让这个过程可以慢到适合观察,可以在富豪抛钱之前,再增加一个随机延时。当然,这个延时不能使用time模块的sleep()函数,而是使用协程模块asyncio的sleep()函数。下面是完整的撒钱-捡钱代码。

import asyncio, random async def rich(q, total): """任性的富豪,随机撒钱""" while total > 0: money = random.randint(10,100) total -= money await q.put(money) # 随机生成[10,100]之间的整数 print('富豪潇洒地抛了%d块钱'%money) await asyncio.sleep(3*random.random()) # 在0-3秒之间随机延时 async def lucky(q, name): """随时可以捡到钱的幸运儿""" while True: money = await q.get() q.task_done() print('%s捡到了%d块钱!'%(name, money)) async def run(): q = asyncio.Queue(1) producers = [asyncio.create_task(rich(q, 300))] consumers = [asyncio.create_task(lucky(q, name)) for name in 'ABC'] await asyncio.gather(*producers,) await q.join() for c in consumers: c.cancel() if __name__ == '__main__': asyncio.run(run())

运行结果如下:

富豪潇洒地抛了42块钱

A捡到了42块钱!

富豪潇洒地抛了97块钱

A捡到了97块钱!

富豪潇洒地抛了100块钱

B捡到了100块钱!

富豪潇洒地抛了35块钱

C捡到了35块钱!

富豪潇洒地抛了17块钱

A捡到了17块钱!

富豪抛完了手中的钱,转身离去

Python 任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Node.js安装教程(图文版)
下一篇:Linux 进程管理之四大名捕
相关文章