摘要:序言最近闲暇无事阅读了一下的源码对整体的结构有了初步认识与大家分享不知道为什么右边的目录一直出不来非常不舒服不如移步到吧是的核心模块也是个调度模块各种异步事件都是由他调度的所以必须弄清他的执行逻辑源码分析而的核心部分则是这个循环内部的逻辑贴
序言
</>复制代码
最近闲暇无事,阅读了一下tornado的源码,对整体的结构有了初步认识,与大家分享
不知道为什么右边的目录一直出不来,非常不舒服.
不如移步到oschina吧....[http://my.oschina.net/abc2001x/blog/476349][1]
ioloop
</>复制代码
`ioloop`是`tornado`的核心模块,也是个调度模块,各种异步事件都是由他调度的,所以必须弄清他的执行逻辑
源码分析
</>复制代码
而`ioloop`的核心部分则是 `while True`这个循环内部的逻辑,贴上他的代码如下
</>复制代码
def start(self):
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
old_wakeup_fd = None
if hasattr(signal, "set_wakeup_fd") and os.name == "posix":
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != -1:
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd = None
except ValueError:
old_wakeup_fd = None
try:
while True:
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512
and self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
for callback in callbacks:
self._run_callback(callback)
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
callbacks = callback = due_timeouts = timeout = None
if self._callbacks:
poll_timeout = 0.0
elif self._timeouts:
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
if not self._running:
break
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
finally:
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)
</>复制代码
除去注释,代码其实没多少行. 由while 内部代码可以看出ioloop主要由三部分组成:
1.回调 callbacks
他是ioloop回调的基础部分,通过IOLoop.instance().add_callback()添加到self._callbacks
他们将在每一次loop中被运行.
主要用途是将逻辑分块,在适合时机将包装好的callback添加到self._callbacks让其执行.
例如ioloop中的add_future
</>复制代码
def add_future(self, future, callback):
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.
The callback is invoked with one argument, the
`.Future`.
"""
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))
future对象得到result的时候会调用future.add_done_callback添加的callback,再将其转至ioloop执行
2.定时器 due_timeouts这是定时器,在指定的事件执行callback.
跟1中的callback类似,通过IOLoop.instance().add_callback
在每一次循环,会计算timeouts回调列表里的事件,运行已到期的callback.
当然不是无节操的循环.
因为poll操作会阻塞到有io操作发生,所以只要计算最近的timeout,
然后用这个时间作为self._impl.poll(poll_timeout) 的 poll_timeout ,
就可以达到按时运行了
但是,假设poll_timeout的时间很大时,self._impl.poll一直在堵塞中(没有io事件,但在处理某一个io事件),
那添加刚才1中的callback不是要等很久才会被运行吗? 答案当然是不会.
ioloop中有个waker对象,他是由两个fd组成,一个读一个写.
ioloop在初始化的时候把waker绑定到epoll里了,add_callback时会触发waker的读写.
这样ioloop就会在poll中被唤醒了,接着就可以及时处理timeout callback了
用这样的方式也可以自己封装一个小的定时器功能玩玩
3.io事件的event loop处理epoll事件的功能
通过IOLoop.instance().add_handler(fd, handler, events)绑定fd event的处理事件
在httpserver.listen的代码内,
netutil.py中的netutil.py的add_accept_handler绑定accept handler处理客户端接入的逻辑
如法炮制,其他的io事件也这样绑定,业务逻辑的分块交由ioloop的callback和future处理
关于epoll的用法的内容.详情见我第一篇文章吧,哈哈
总结ioloop由callback(业务分块), timeout callback(定时任务) io event(io传输和解析) 三块组成,互相配合完成异步的功能,构建gen,httpclient,iostream等功能
串联大致的流程是,tornado 绑定io event,处理io传输解析,传输完成后(结合Future)回调(callback)业务处理的逻辑和一些固定操作 . 定时器则是较为独立的模块
Futrue个人认为Future是tornado仅此ioloop重要的模块,他贯穿全文,所有异步操作都有他的身影
顾名思义,他主要是关注日后要做的事,类似jquery的Deferred吧
一般的用法是通过ioloop的add_future定义future的done callback,
当future被set_result的时候,future的done callback就会被调用.
从而完成Future的功能.
具体可以参考gen.coroutine的实现,本文后面也会讲到
他的组成不复杂,只有几个重要的方法
最重要的是 add_done_callback , set_result
tornado用Future和ioloop,yield实现了gen.coroutine
1. add_done_callback跟ioloop的callback类似 , 存储事件完成后的callback在self._callbacks里
</>复制代码
def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)
2.set_result
设置事件的结果,并运行之前存储好的callback
</>复制代码
def set_result(self, result):
self._result = result
self._set_done()
def _set_done(self):
self._done = True
for cb in self._callbacks:
try:
cb(self)
except Exception:
app_log.exception("Exception in callback %r for %r",
cb, self)
self._callbacks = None
为了验证之前所说的,上一段测试代码
</>复制代码
#! /usr/bin/env python
#coding=utf-8
import tornado.web
import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future
def test():
def pp(s):
print s
future = Future()
iol = tornado.ioloop.IOLoop.instance()
print "init future %s"%future
iol.add_future(future, lambda f: pp("ioloop callback after future done,future is %s"%f))
#模拟io延迟操作
iol.add_timeout(iol.time()+5,lambda:future.set_result("set future is done"))
print "init complete"
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
test()
运行结果:
gen.coroutine接着继续延伸,看看coroutine的实现
gen.coroutine实现的功能其实是将原来的callback的写法,用yield的写法代替. 即以yield为分界,将代码分成两部分.
如:
</>复制代码
#! /usr/bin/env python
#coding=utf-8
import tornado.ioloop
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
@coroutine
def cotest():
client = AsyncHTTPClient()
res = yield client.fetch("http://www.segmentfault.com/")
print res
if __name__ == "__main__":
f = cotest()
print f #这里返回了一个future哦
tornado.ioloop.IOLoop.instance().start()
运行结果:
源码分析接下来分析下coroutine的实现
</>复制代码
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()
if replace_callback and "callback" in kwargs:
callback = kwargs.pop("callback")
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = getattr(e, "value", None)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, types.GeneratorType):
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
"stack_context inconsistency (probably caused "
"by yield within a "with StackContext" block)"))
except (StopIteration, Return) as e:
future.set_result(getattr(e, "value", None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper
如源码所示,func运行的结果是GeneratorType ,yielded = next(result),
运行至原函数的yield位置,返回的是原函数func内部 yield 右边返回的对象(必须是Future或Future的list)给yielded.
经过Runner(result, future, yielded) 对yielded进行处理.
在此就 贴出Runner的代码了.
Runner初始化过程,调用handle_yield, 查看yielded是否已done了,否则add_future运行Runner的run方法,
run方法中如果yielded对象已完成,用对它的gen调用send,发送完成的结果.
所以yielded在什么地方被set_result非常重要,
当被set_result的时候,才会send结果给原func,完成整个异步操作
详情可以查看tornado 中重要的对象 iostream,源码中iostream的 _handle_connect,如此设置了连接的result.
</>复制代码
def _handle_connect(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
self.error = socket.error(err, os.strerror(err))
if self._connect_future is None:
gen_log.warning("Connect error on fd %s: %s",
self.socket.fileno(), errno.errorcode[err])
self.close()
return
if self._connect_callback is not None:
callback = self._connect_callback
self._connect_callback = None
self._run_callback(callback)
if self._connect_future is not None:
future = self._connect_future
self._connect_future = None
future.set_result(self)
self._connecting = False
最后贴上一个简单的测试代码,演示coroutine,future的用法
</>复制代码
import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future
@coroutine
def asyn_sum(a, b):
print("begin calculate:sum %d+%d"%(a,b))
future = Future()
future2 = Future()
iol = tornado.ioloop.IOLoop.instance()
print future
def callback(a, b):
print("calculating the sum of %d+%d:"%(a,b))
future.set_result(a+b)
iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2)
iol.add_timeout(iol.time()+3,callback, a, b)
result = yield future
print("after yielded")
print("the %d+%d=%d"%(a, b, result))
yield future2
print "after future2"
def main():
f = asyn_sum(2,3)
print ""
print f
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
运行结果:
为什么代码中个yield都起作用了? 因为Runner.run里,最后继续用handle_yield处理了send后返回的yielded对象,意思是func里可以有n干个yield操作
</>复制代码
if not self.handle_yield(yielded):
return
总结
至此,已完成tornado中重要的几个模块的流程,其他模块也是由此而来.写了这么多,越写越卡,就到此为止先吧,
最后的最后的最后啊~~~~~~好想有份工作 和女朋友啊~~~~~
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/37553.html
摘要:学习笔记七数学形态学关注的是图像中的形状,它提供了一些方法用于检测形状和改变形状。学习笔记十一尺度不变特征变换,简称是图像局部特征提取的现代方法基于区域图像块的分析。本文的目的是简明扼要地说明的编码机制,并给出一些建议。 showImg(https://segmentfault.com/img/bVRJbz?w=900&h=385); 前言 开始之前,我们先来看这样一个提问: pyth...
摘要:软件开发者通常依据特定的框架实现更为复杂的商业运用和业务逻辑。所有,做开发,要用一个框架。的性能是相当优异的,因为它师徒解决一个被称之为问题,就是处理大于或等于一万的并发。 One does not live by bread alone,but by every word that comes from the mouth of God --(MATTHEW4:4) 不...
摘要:初步分析提升可从两方面入手,一个是增加并发数,其二是减少平均响应时间。大部分的时间花在系统与数据库的交互上,到这,便有了一个优化的主题思路最大限度的降低平均响应时间。不要轻易否定一项公认的技术真理,要拿数据说话。 本文最早发表于个人博客:PylixmWiki 应项目的需求,我们使用tornado开发了一个api系统,系统开发完后,在8核16G的虚机上经过压测qps只有200+。与我们当...
摘要:特别提醒,看官不要自宫,因为本教程不是辟邪剑谱,也不是葵花宝典,撰写本课程的人更是生理健全者。直到目前,科学上尚未有证实或证伪自宫和写程序之间是否存在某种因果关系。和是中用的最多的方法啦。 Do not store up for yourselves treasures on earth, where moth and rust consume and where thieves...
阅读 2834·2021-11-17 09:33
阅读 3175·2021-10-25 09:44
阅读 1368·2021-10-11 10:59
阅读 2571·2021-09-27 13:34
阅读 2976·2021-09-07 10:19
阅读 2246·2019-08-29 18:46
阅读 1606·2019-08-29 12:55
阅读 1002·2019-08-23 17:11