资讯专栏INFORMATION COLUMN

从 async_call_method() 开始探索

light / 1032人阅读

摘要:定义一个闭包相关官方文档是一种用于并发编程的模式,首次引入是在的模块。对象是一个对于异步返回结果的占位符。一个对象包含了一次异步操作的结果。在同步编程中,被用于等待从一个线程池或进程池里返回的结果在中,通常被用在或者在一个函数中它们。

from tornado.concurrent import Future

def async_call_method(fun, *args, **kwargs):
    future = Future()
    // 定义一个闭包 finish
    def finish():
        try:
            result = fun(*args, **kwargs)
            if future._callbacks:
                IOLoop.current().add_callback(future.set_result, result)
            else:
                future.set_result(result)
        except:
            if future._callbacks:
                IOLoop.current().add_callback(future.set_exc_info, sys.exc_info())
            else:
                future.set_exc_info(sys.exc_info())
    child_gr = greenlet.greenlet(finish)
    child_gr.switch()
    return future
tornado 相关官方文档

Future 是一种用于并发编程的模式,首次引入是在 python 3.2 的 concurrent.futures 模块。

Future 对象是一个对于异步返回结果的占位符。

一个 Future 对象包含了一次异步操作的结果。在同步编程中,Futures 被用于等待从一个线程池或进程池里返回的结果;在 tornado 中,future 通常被用在 IOLoop.add_future 或者在一个 gen.coroutine 函数中 yielding 它们。

tornado.concurrent.Future 和 concurrent.futures.Future 相似,但是其不是线程安全的(因此,在单线程事件循环应用在速度更快)

async_call_method() 的来源

经过一番搜索,查询到 async_call_method() 这个函数来自于 github.com/snower/TorMySQL.

经过对该项目代码的仔细阅读,我发现了它是如何实现了 mysql 的异步操作。

tormysql.client.connect()
...
def connect(self):
    # 入口函数
    # 设置 future 占位符
    future = Future()

    # 定义回调函数
    def on_connected(connection_future):
        if connection_future._exc_info is None:
            future.set_result(self)
        else:
            future.set_exc_info(connection_future.exc_info())
    self._connection = Connection(defer_connect = True, *self._args, **self._kwargs)
    self._connection.set_close_callback(self.connection_close_callback)

    # 用 greenlet 包装 self._connection.connect 并返回 future
    # 要使 async_call_method 包装后的函数有非阻塞的特性,必须达成以下要求
    # 1. 函数可以访问 父greenlet
    # 2. 函数中所有 IO 操作均支持非阻塞(比如: 非阻塞由 socket 的 non-blocking 特性支持)
    # 3. 函数中执行 IO 操作后立即将运行权交还给主函数(父greenlet, 如:ioloop 时间循环)(greenlet.switch)
    # 4. 函数中所有 IO 操作均返回 Future
    # 5. Future.callback 运行后立即将运行权(greenlet.switch)返回给当前函数(greenlet.current),完成当前函数的剩余部分
    connection_future = async_call_method(self._connection.connect)

    # 当 connection_future 状态为 finished, 调用 on_connected()
    # finished => 调用 connection_future.set_result()
    IOLoop.current().add_future(connection_future, on_connected)
    return future
...
self._connection.connect()
...
        # IOStream 基于 tornado.iostream.IOStream
        sock = IOStream(sock)
        sock.set_close_callback(self.stream_close_callback)

        # getcurrent() 返回包装了当前函数的 greenlet
        child_gr = greenlet.getcurrent()
        # main 是指 父greenlet(主函数, 时间循环?)
        main = child_gr.parent
        assert main is not None, "Execut must be running in child greenlet"
...
    def connected(future):
        if self._loop_connect_timeout:
            self._loop.remove_timeout(self._loop_connect_timeout)
            self._loop_connect_timeout = None
    
        if future._exc_info is not None:
            child_gr.throw(future.exception())
        else:
            self._sock = sock
            # 将运行权交还给当前 greenlet
            child_gr.switch()
    
    # IOStream.connect 是 no-blocking 的 socket 操作
    future = sock.connect(address)
    # 给 sock.connect 操作添加回调函数
    self._loop.add_future(future, connected)
    # 然后把运行权交还给 父greenlet
    # 直到连接成功,connected() 中会将运行权交还给 当前greenlet
    main.switch()
...
结论

要使 async_call_method 包装后的函数有非阻塞的特性,必须达成以下要求

函数可以访问 父greenlet

函数中所有 IO 操作均支持非阻塞(比如: 非阻塞由 socket 的 non-blocking 特性支持)

函数中执行 IO 操作后立即将运行权交还给主函数(父greenlet, 如:ioloop 时间循环)(greenlet.switch)

函数中所有 IO 操作均返回 Future

Future.callback 运行后立即将运行权(greenlet.switch)返回给当前函数(greenlet.current),完成当前函数的剩余部分

async_call_method 包装后的函数要实现非阻塞,最终还是依赖于 socket 的非阻塞
=> socket.setblocking(False)

github.com/snower/TorMySQL 中于 mysql 的交互全部通过 IOStream() 的以下方法实现:

* def _handle_events(self, fd, events):  # ioloop 在事件发生时调用 _handle_events
* def _handle_connect(self): 
* def _handle_read(self):  # 当事件为读取事件时,读取数据到 buffer, 然后 future.set_result(data)
* def _handle_write(self):  # 当事件为写事件时,读取数据到 buffer, 然后 future.set_result(data)
* def read(self, num_bytes):
* def write(self, data):

通过对上述方法进行 设置 future 占位符,并基于 non-blocking socket 实现上述方法的非阻塞。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40725.html

相关文章

  • 基于Tableau探索分析世界银行提供的关于科学技术的数据

    摘要:并且在年中国第一次在专利总量超过美国,跃升至第一位,成为提交国际专利申请量最多的国家,从而打破了美国长达年的专利申请霸主地位。中国的技术创新仍处于大部分技术靠从外国购买技术专利的阶段。 要求                 对 CSE512: Data Visualization 完成探索性...

    alighters 评论0 收藏0
  • [译] 解密 Uber 数据部门的数据可视化最佳实践

    摘要:让我们看看都做了哪些工作可视化分析增强数据可操作性测试平台的表格和置信区间可视化可视化分析主要都是由抽象数据可视化组成的。大多数有效的可视化分析在这种情况下都是关于报告仪表盘实时分析的图标和网络图。 showImg(https://segmentfault.com/img/remote/1460000006771644); 概述 在2015年初,我们在Uber规划了一个官方的数据科学团...

    darkbug 评论0 收藏0
  • [译] 解密 Uber 数据部门的数据可视化最佳实践

    摘要:让我们看看都做了哪些工作可视化分析增强数据可操作性测试平台的表格和置信区间可视化可视化分析主要都是由抽象数据可视化组成的。大多数有效的可视化分析在这种情况下都是关于报告仪表盘实时分析的图标和网络图。 showImg(https://segmentfault.com/img/remote/1460000006771644); 概述 在2015年初,我们在Uber规划了一个官方的数据科学团...

    susheng 评论0 收藏0
  • 简洁明了探索浏览器Event loop

    摘要:前段时间我对于浏览器中的和哪个先执行有所困惑,苦于搜索也没有发现很明确的答案,于是决定深入探索浏览器,现有所愚见,想与大家分享,希望能帮助到那些还在爬坑的人。浏览器端中的异步队列有两种队列和队列。浏览器会不断从队列中按顺序取执行。 前段时间我对于浏览器Event loop中的MacroTask和MicroTask哪个先执行有所困惑,苦于搜索也没有发现很明确的答案,于是决定深入探索浏览器...

    DrizzleX 评论0 收藏0

发表评论

0条评论

light

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<