摘要:很简单,这个模块实现了开辟一块共享内存空间,就好比中的方法一样,有兴趣的同学可以去查阅。查了下资料,返回的对象控制了一个进程,可用于多进程之间的安全通信,其支持的类型有和等。
有关于 multiprocessing 中共享变量的问题
现在的cpu都很强大,比方我用的至强2620有24核可以同时工作,并行执行进程程序。这在计算密集型的程序是很需要的,如沙漠中的绿洲,令人重获新生。那么,问题接踵而来,python中多进程能否共享一个变量,因为我需要更新矩阵。
我的办法是用list存储三元组信息,信息包括矩阵位置以及value。那么首先我们设定一个全局变量叫result_list就可以了?
答案是NO.
进程间共享变量就需要独立开辟一块内存空间或是文件共享,在python里很方面,直接用一个模块可以解决这个问题,那就是 multiprocessing 里的 Manager。当然,这是针对我们需要的是list而言,如果我们只是共享一个简单的变量如一个整数,可以直接用 multiprocessing 里的 value。
下面的实例是怎么去共享变量。
from multiprocessing import Process, Manager, Lock
import os
lock = Lock()
manager = Manager()
sum = manager.list()
def testFunc(cc, lock):
with lock:
sum.append(1)
if __name__ == "__main__":
threads = []
for ll in range(1000):
t = Process(target=testFunc, args=(1, lock))
t.daemon = True
threads.append(t)
sum = manager.list()
for i in range(len(threads)):
threads[i].start()
for j in range(len(threads)):
threads[j].join()
print "------------------------"
print "process id:", os.getpid()
print sum
很简单,manager这个模块实现了开辟一块共享内存空间,就好比c中的 shmget 方法一样,有兴趣的同学可以去查阅。 传送门
这样简单的处理并不能满足我。
首先,我需要一个线程池,当然,实现线程池也是非常简单的。但是就会遇到一个问题。
lock = multiprocessing.Lock()
pool = multiprocessing.Pool(processes=3)
for i in range(0,3):
pool.apply_async(child_worker, ((my_parameter, lock),))
pool.close()
pool.join()
以上代码执行时会出错。
RuntimeError: Lock objects should only be shared between processes through inheritance
查了下资料,multiprocessing.Manager()返回的manager对象控制了一个server进程,可用于多进程之间的安全通信,其支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array等。
所以代码修改成这样后就可以正常运行了:
lock = multiprocessing.Manager().Lock()
pool = multiprocessing.Pool(processes=3)
for i in range(0,3):
pool.apply_async(child_worker, ((my_parameter, lock),))
pool.close()
pool.join()
所以,lock的问题解决了,真是厉害我们现在可以充分地(往死里)用我们的电脑了。
But,还不够,我想要多次执行这个并行化计算sum的函数。也就是说我需要每次去清空result_list的内容,这个可是一个很关键的细节,因为这个需要明白一个细节,你不能用sum = [] 这样的方式去重置,我个人认为是局部变量的原因,我后来找到了del sum[:]的方法,解决了我的大问题,so,final version 如下。
from multiprocessing import Process, Manager,Pool
import os
lock = Manager().Lock()
manager = Manager()
sum = manager.list()
def testFunc(cc, lock):
with lock:
sum.append(1)
# 配合 multiprocessing pool 对多参数的要求添加的函数
def multi_test(args):
testFunc(*args)
def testing():
threads = []
_pool = Pool(24)
del sum[:]
lst_vars = []
for shot in range(1000):
lst_vars.append((1,lock))
_pool.map(multi_test, lst_vars)
_pool.close()
_pool.join()
print "------------------------"
print "process id:", os.getpid()
print sum
if __name__ == "__main__":
testing()
testing()
这些实例是我方便写博客想的,其实我是在写一个大工程遇到了这些个问题,忙的我焦头烂额,但是总结出了人生经验,希望帮到你,让你多活几年~~
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/44435.html
摘要:首发于我的博客线程池进程池网络编程之同步异步阻塞非阻塞后端掘金本文为作者原创,转载请先与作者联系。在了解的数据结构时,容器可迭代对象迭代器使用进行并发编程篇二掘金我们今天继续深入学习。 Python 算法实战系列之栈 - 后端 - 掘金原文出处: 安生 栈(stack)又称之为堆栈是一个特殊的有序表,其插入和删除操作都在栈顶进行操作,并且按照先进后出,后进先出的规则进行运作。 如...
摘要:正文总所周知,和根本就是两个东西,每次因为这个兼容性的问题都会把自己搞疯。提供了模块,把下一个新版本的特性导入到当前版本,于是我们就可以在当前版本中测试一些新版本的特性。传送门不多,才个。 写在前面 我是在学习cs231n的assignment3的课程,发现里面的代码大量频繁出现了这个库,那我就很奇怪了,为什么有个future这个奇怪名字的库会出现呢?到底这个库又有什么用?下面就让我为...
阅读 2157·2021-09-30 09:47
阅读 3988·2021-09-22 15:05
阅读 3129·2021-08-30 09:44
阅读 3864·2019-08-30 15:55
阅读 1621·2019-08-30 13:08
阅读 1573·2019-08-29 16:40
阅读 822·2019-08-29 12:45
阅读 1607·2019-08-29 11:25