资讯专栏INFORMATION COLUMN

python中简单好用的进程间数据通讯模块multiprocessing.Manager

jeyhan / 2698人阅读

摘要:目前开发中有遇到进程间需要共享数据的情况所以研究了下主要会以为例子说明下进程间共享同一个父进程使用说明创建一个对象创建一个创建一个测试程序创建进程池进行测试简单的源码分析这时我们再看一个例子创建一个对象创建一个创建一个测试程序创建进程池进行

目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共享(同一个父进程).
dict使用说明
import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

too simple.

简单的源码分析

这时我们再看一个例子

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict["test"] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict["test"][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

可以看到输出结果是奇怪的{"test": {}}
如果我们简单修改一下代码

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict["test"] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    row = test_dict["test"]
    row[idx] = idx
    test_dict["test"] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

这时输出结果就符合预期了.

为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.

def Manager():
    """
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    """
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m
    
...
    def start(self, initializer=None, initargs=()):
        """
        Spawn a server process for this manager object
        """
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, "__call__"):
            raise TypeError("initializer must be a callable")

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ":".join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + "-" + ident
        self._process.start()
...

上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.

回到上面的奇怪现象上, 这个操作test_dict["test"][idx] = idx实际上在拉取到server上的数据后进行了修改, 但并没有返回给server, 所以temp_dict的数据根本没有变化. 在第二段正常代码, 就相当于先向服务器请求数据, 再向服务器传送修改后的数据. 这样就可以解释这个现象了.

进程间数据安全

这个时候如果出现一种情况, 两个进程同时请求了一份相同的数据, 分别进行修改, 再提交到server上会怎么样呢? 那当然是数据产生异常. 基于此, 我们需要Manager的另一个对象, Lock(). 这个对象也不难理解, Manager本身就是一个server, dict跟lock都来自于这个server, 所以当你lock住的时候, 其他进程是不能取到数据, 自然也不会出现上面那种异常情况.

代码示例:

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict["test"] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
    lock.acquire()
    row = test_dict["test"]
    row[idx] = idx
    test_dict["test"] = row
    lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

切忌不要进程里自己新建lock对象, 要使用统一的lock对象.

終わり。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/43443.html

相关文章

  • python进阶笔记【1】--- 多进程

    摘要:很简单,这个模块实现了开辟一块共享内存空间,就好比中的方法一样,有兴趣的同学可以去查阅。查了下资料,返回的对象控制了一个进程,可用于多进程之间的安全通信,其支持的类型有和等。 有关于 multiprocessing 中共享变量的问题 现在的cpu都很强大,比方我用的至强2620有24核可以同时工作,并行执行进程程序。这在计算密集型的程序是很需要的,如沙漠中的绿洲,令人重获新生。那么,问...

    Wildcard 评论0 收藏0
  • Python进程专题7:托管对象

    摘要:连接带远程管理器对象,该对象的地址在构造函数中支出。在当前进程中运行管理器服务器。启动一个单的子进程,并在该子进程中启动管理器服务器。如果无法序列号对象将引发异常。 上一篇文章:Python进程专题6:共享数据与同步下一篇文章:Python进程专题8:分布集群的消息传递 进程不支持共享对象,上面描述的创建共享值和数组,但都是指定的特殊类型,对高级的Python对象(如:字典、列表、用...

    DevYK 评论0 收藏0
  • 实战案例分享:利用Python实现多任务进程

    摘要:效率高当然,对于爬虫这种密集型任务来说,多线程和多进程影响差别并不大。对于计算密集型任务来说,的多进程相比多线程,其多核运行效率会有成倍的提升。 一、进程介绍 进程...

    MudOnTire 评论0 收藏0
  • python并发4:使用thread处理并发

    摘要:如果某线程并未使用很多操作,它会在自己的时间片内一直占用处理器和。在中使用线程在和等大多数类系统上运行时,支持多线程编程。守护线程另一个避免使用模块的原因是,它不支持守护线程。 这一篇是Python并发的第四篇,主要介绍进程和线程的定义,Python线程和全局解释器锁以及Python如何使用thread模块处理并发 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条...

    joywek 评论0 收藏0
  • Python 并发编程

    摘要:本文最先发布在博客这篇文章将讲解并发编程的基本操作。并发是指能够多任务处理,并行则是是能够同时多任务处理。虽然自带了很好的类库支持多线程进程编程,但众所周知,因为的存在,很难做好真正的并行。 本文最先发布在博客:https://blog.ihypo.net/151628... 这篇文章将讲解 Python 并发编程的基本操作。并发和并行是对孪生兄弟,概念经常混淆。并发是指能够多任务处...

    happen 评论0 收藏0

发表评论

0条评论

jeyhan

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<