资讯专栏INFORMATION COLUMN

Python进程专题4:进程池Pool

Leo_chen / 1345人阅读

摘要:上一篇文章进程专题继承来创建进程下一篇文章进程专题进程间通信当我们需要创建大量的进程时,利用模块提供的来创建进程。关闭进程池,不再接受进的进程请求,但已经接受的进程还是会继续执行。

上一篇文章:Python进程专题3:继承Process来创建进程
下一篇文章:Python进程专题5:进程间通信

当我们需要创建大量的进程时,利用multiprocessing模块提供的Pool来创建进程。

进程初始化时,会指定一个最大进程数量,当有新的请求需要创建进程时,如果此时进程池还没有到达设置的最大进程数,该进程池就会创建新的进程来处理该请求,并把该进程放到进程池中,如果进程池已经达到最大数量,请求就会等待,知道进程池中进程数量减少,才会新建进程来执行请求。

语法

pool=Pool(numprocess,initializer,initargs)
numproxess:需要创建的进程个数,如果忽略将使用cpu_count()的值。即系统上的CPU数量。
initializer:每个进程启动时都要调用的对象。
initargs:为
initalizer传递的参数。

常用方法

multiprocessing.Pool常用函数解析:

apply_async(要调用的方法,参数列表,关键字参数列表):使用非阻塞方式调用指定方法,并行执行(同时执行)

apply(要调用的方法,参数列表,关键字参数列表):使用阻塞方式调用指定方法,,阻塞方式就是要等上一个进程退出后,下一个进程才开始运行。

close():关闭进程池,不再接受进的进程请求,但已经接受的进程还是会继续执行。

terminate():不管程任务是否完成,立即结束。

join():主进程堵塞(就是不执行join下面的语句),直到子进程结束,注意,该方法必须在close或terminate之后使用。

pool.map(func,iterable,chunksize):将可调用对象func应用给iterable的每一项,然后以列表形式返回结果,
通过将iterable划分为多块,并分配给工作进程,可以并行执行。chunksize指定每块中的项数,
如果数据量较大,可以增大chunksize的值来提升性能。

pool.map_async(func,iterable,chunksize,callback):与map方法不同之处是返回结果是异步的,
如果callback指定,当结果可用时,结果会调用callback。

pool.imap(func,iterable,chunksize):与map()方法的不同之处是返回迭代器而非列表。

pool.imap_unordered(func,iterable,chunksize):与imap()不同之处是:结果的顺序是根据从工作进程接收到的时间而定的。

pool.get(timeout):如果没有设置timeout,将会一直等待结果,
如果设置了timeout,超过timeout将引发multiprocessing.TimeoutError异常。

pool.ready():如果调用完成,返回True

pool.successful():如果调用完成并且没有引发异常,返回True,如果在结果就绪之前调用,jiang引发AssertionError异常。

pool.wait(timeout):等待结果变为可用,timeout为等待时间。

实例1:阻塞与非阻塞对比

#阻塞与非阻塞对比
from multiprocessing import Pool
import os
import time
import random#用来生成随机数

def test1(name):
    print("%s运行中,pid=%d,父进程:%d"%(name,os.getpid(),os.getppid()))
    t_start=time.time()
    #random.random()会生成一个0——1的浮点数
    time.sleep(random.random()*3)
    t_end=time.time()
    print("%s执行时间:%0.2f秒"%(name,t_end-t_start))

pool=Pool(5)#设置线程池中最大线程数量为5
for xx in range(0,7):
    #非阻塞运行
    pool.apply_async(test1,("mark"+str(id),))
print("--start1--")
pool.close()#关闭线程池,关闭后不再接受进的请求
pool.join()#等待进程池所有进程都执行完毕后,开始执行下面语句
print("--end1--")
print("*"*30)
pool=Pool(5)#设置线程池中最大线程数量为5
for xx in range(0,7):
    #阻塞运行
    pool.apply(test1,("mark"+str(id),))
print("--start2--")
pool.close()#关闭线程池,关闭后不再接受进的请求
pool.join()#等待进程池所有进程都执行完毕后,开始执行下面语句
print("--end2--")

结果:

--start1--
mark运行中,pid=28631,父进程:28626
mark运行中,pid=28632,父进程:28626
mark运行中,pid=28633,父进程:28626
mark运行中,pid=28634,父进程:28626
mark运行中,pid=28636,父进程:28626
mark执行时间:0.27秒
mark运行中,pid=28633,父进程:28626
mark执行时间:0.32秒
mark运行中,pid=28634,父进程:28626
mark执行时间:0.18秒
mark执行时间:0.55秒
mark执行时间:1.78秒
mark执行时间:1.92秒
mark执行时间:2.71秒
--end1--
******************************
mark运行中,pid=28647,父进程:28626
mark执行时间:0.70秒
mark运行中,pid=28648,父进程:28626
mark执行时间:1.66秒
mark运行中,pid=28649,父进程:28626
mark执行时间:2.87秒
mark运行中,pid=28650,父进程:28626
mark执行时间:2.68秒
mark运行中,pid=28651,父进程:28626
mark执行时间:1.42秒
mark运行中,pid=28647,父进程:28626
mark执行时间:1.20秒
mark运行中,pid=28648,父进程:28626
mark执行时间:2.01秒
--start2--
--end2--

实例2:利用进程池遍历目录

查看下面实例前,先来熟悉一下我们需要用到的一些知识。

os.walk(top,topdown=true,onerrorNone,followlinks=false):遍历目录地址,返回一个三元组(root,dirs,files)

top:想要遍历的目录

root:当前正在遍历的文件夹地址。

dirs:是一个list:当前文件夹下所有目录的名字(不包括子目录)

files:也是一个list,当前文件夹下所有文件的名字(不包括子目录文件)

topdown:默认为True:优先遍历top目录,为false会优先遍历top的子目录。

onerror:指定一个当方法执行异常时需要调用的方法。

followlinks:默认为True:会遍历目录环境下的快捷方式实际指向目录。

代码:

#遍历目录文件并求取SHA512的摘要值
import os
import multiprocessing
import hashlib

#定义进程大小
POOLSIZE=2 #工作进程的数量
#可以读取的缓冲区大小
BUFSIZE=8196

def mark(filename):
    try:
        f=open(filename,"rb")
    except IOError:
        return None
    digest=hashlib.sha512()
    while True:
        chunk=f.read(BUFSIZE)
        if not chunk:break
        digest.update(chunk)
    f.close()
    return filename,digest.digest()



def build_map(dir):
    #定义进程
    pool=multiprocessing.Pool(POOLSIZE)


    #os.path.join:拼接文件路径
    #根据文件目录和名称拼接
    all_files=(os.path.join(root,name)
               #循环遍历当前目录
               for root,dirs,files in os.walk(dir)
                   #取出当前目录下文件名
                   for name in files)

    #dict方法用于将结果转化成字典
    map=dict(pool.imap_unordered(mark,all_files,20))
    pool.close()
    return map


if __name__=="__main__":
    digest_map=build_map("/Users/zhaolixiang/Desktop/python/test1/进程")
    for item in  digest_map:
        print("文件目录:",item)
        print("SHA512摘要值:",digest_map[item])
    #个数
    print(len(digest_map))

结果

文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/1、fork创建子线程.py
SHA512摘要值: b"x9eYLSxe5xb6xd2xecxd4&xa9xff~m?x87xd2Nxea39Gxe1x8fx9cdx83@x06/Bnxddx1exb5xe0jx10xd1xc9=xfd;y]x8dnR)xbdtxb8xc8xb46rExf8xd7t.xaexbbxe9"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/8、JoinableQueue的生产者与消费者.py
SHA512摘要值: b"xfe4x18xee8xd1x97xe7vx86}xe6qnx13xf9Dxf1Xxe3xabx94xebx96xfbxedWx0bOn$x14d/+rx9bx0bxc1xd4x03xadxcbbxf7x8dxd5Ccx03yxd1xb4x801,?,rIXxa28xd7"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/10、管道:返回数据.py
SHA512摘要值: b"x85xb1Cxe56x80x1dsx84txf6x95xcbx1d[xdaxe4}n)Y文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/11、进程池.py
SHA512摘要值: b"bBxf4lxccxd5xaetrxc1x816xe5xfc;tx85x86xe5xd3x9e~SH]xe6xcbdxc9xfexe9xfbxfcxeb)Axd2ox8cOxbcx1etxe9xe92^xb5x10xe1xf4xc9\_x0cx8cxa7qxf4(x13Mxbe7"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/2、multiprocessing.py
SHA512摘要值: b"mxb2xb5x00M1=qx86xb9Gxf9x02xb1x18xfax08xe7,.xefxff5xebt,nx17xbfB-xc6xc2x9dVxf9x88xa7xd8x1ex9b)x86Nxd5xab x89xa0J}xa9xdcxbcx06mx03xa7x87.x17xcb"x93"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/6、验证:put方法会阻塞.py
SHA512摘要值: b"xa3x80x86xf6xc9xb8x0eOB 5xd0x949xfdxc2yx9axc7xcax9dxbc%xa1xe6_xfax84xb6x02$xf7x10xbbxb9Nxfbxdexc8xbe6F xd6x87xacxb2xf5x94x0cxedxe0.xf9Txdax91`;xd7x90x86"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/7、closse方法、get方法、put方法简单使用.py
SHA512摘要值: b"lx98wx8fTx15xdfTx19x91^:xc5xa3x8dxf4x1ex9cx91Exe4xe7xbfxecVx1ex1bx19xa0ix96Lxc6xc4r.x9bxecx88xe9rxfeOx8bxdfAx90x7f6?xe7x1d8xa6Nx07xdex8dxb6xe7#x02p"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/5、进程池.py
SHA512摘要值: b"xc2xf75x04xbaxa5Xxccx81x88xffxbazxd81@x0bQJ%xbbx15xf3`Qx9a4}xc0x07xa2&axc0x00x0c%xb4[xd2ex18x04x14ytx0cxb0xacx1dxf5xfbxe0xc7xb6$xd2xf1xd9sePx08"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/3、multiprocessing.py
SHA512摘要值: b"x04N4Dxd5xab}xcfx03xe6x0fVx0fx910x91xe4x81,xbb-xdfxb36Ixf9!x84Axdf.xf3HVxfdx86:x0bx81<+ex00xd1x17uxf5hxb34FxfexebFxf5x1bxc3x8d`Axa0Ax02x10"
文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/9、管道.py
SHA512摘要值: b"`xdbx8dx90Vx04=x0cxf9x9cxf7{x8fxcax9fxccxb8Dx97xecx82(xd4x9ax84xdb文件目录: /Users/zhaolixiang/Desktop/python/test1/进程/4、继承Process创建进程.py
SHA512摘要值: b"2xc1xa0x1fxd7xd6xb2x1c}x14xdedx8fxdbxedxd0x91mxc1,xb9xdd?TbXx04#2xfcxb8xbfurxabxfcFcx17x18xc7)sYx82x0exea{5x87xf3x8fcxbaPx91r0xefxabLxa8x1ex15"
11

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42357.html

相关文章

  • Python进程专题3:继承Process来创建进程

    摘要:上一篇文章进程专题创建进程下一篇文章进程专题进程池实例重新方法下面一句是调用父类方法,这一本尽量不要少,因为父类还有很多事情需要在方法内处理重写方法子进程运行中,,父进程子进程运行结束,耗时秒父进程开始执行父进程运行结束,耗时秒结果父进 上一篇文章:Python进程专题2:multiprocessing创建进程下一篇文章:Python进程专题4:进程池Pool 实例: from mu...

    zxhaaa 评论0 收藏0
  • Python进程专题5:进程间通信

    摘要:上一篇文章进程专题进程池下一篇文章进程专题共享数据与同步模块支持的进程间通信主要有两种管道和队列。队列底层使用管道和锁,同时运行支持线程讲队列中的数据传输到底层管道中,来实习进程间通信。 上一篇文章:Python进程专题4:进程池Pool下一篇文章:Python进程专题6:共享数据与同步 multiprocessing模块支持的进程间通信主要有两种:管道和队列。一般来说,发送较少的大...

    eccozhou 评论0 收藏0
  • 【暂时Over】Python 从零开始爬虫(十)给爬虫加速:多线程,多进程

    摘要:限制同时运行线程数使用类就行,在内部管理着一个计数器。当计数器到时,再调用就会阻塞,直到其他线程来调用,这样就限制了同时运行线程的数量。 事前最好了解一下什么是进程,什么是线程,什么是GIL,本文不再赘述,直接介绍模块的使用: 推荐1,推荐2,推荐3,更多自寻 普通的python爬虫是单进程单线程的,这样在遇到大量重复的操作时就只能逐个进行,我们就很难过了。举个栗子:你有1000个...

    wangdai 评论0 收藏0
  • python中简单好用的进程间数据通讯模块multiprocessing.Manager

    摘要:目前开发中有遇到进程间需要共享数据的情况所以研究了下主要会以为例子说明下进程间共享同一个父进程使用说明创建一个对象创建一个创建一个测试程序创建进程池进行测试简单的源码分析这时我们再看一个例子创建一个对象创建一个创建一个测试程序创建进程池进行 目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共...

    jeyhan 评论0 收藏0
  • Python 中 Ctrl+C 不能终止 Multiprocessing Pool 的解决方案

    摘要:解决方法有两种。代码然而这段代码只有在运行在处的时候才能用中断,即前你按有效,一旦后则完全无效建议先确认是否真的需要用到多进程,如果是多的程序建议用多线程或协程,计算特别多则用多进程。 本文理论上对multiprocessing.dummy的Pool同样有效。 python2.x中multiprocessing提供的基于函数进程池,join后陷入内核态,按下ctrl+c不能停止所有的进...

    lmxdawn 评论0 收藏0

发表评论

0条评论

Leo_chen

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<