资讯专栏INFORMATION COLUMN

SocketServer 源码分析

Eric / 2463人阅读

摘要:版权出现则重新调用注册函数。中实例化,调用用户定义的函数服务循环监听端口处理请求调用监视请求,处理异常有请求进来停止循环通知外部,循环已经退出注意的用法,只设置一次,避免使用进行频繁的设置清除。

SocketServer.py

</>复制代码

  1. Creating network servers.
contents

SocketServer.py

contents

file head

BaseServer

BaseServer.serve_forever

BaseServer.shutdown

BaseServer.handle_request

BaseServer._handle_request_noblock

BaseServer Overridden functions

TCPServer

UDPServer

ForkingMixIn

ThreadingMixIn

BaseRequestHandler

StreamRequestHandler

DatagramRequestHandler

版权

file head

</>复制代码

  1. __version__ = "0.4"
  2. import socket
  3. import select
  4. import sys
  5. import os
  6. import errno
  7. try:
  8. import threading
  9. except ImportError:
  10. import dummy_threading as threading
  11. __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
  12. "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
  13. "StreamRequestHandler","DatagramRequestHandler",
  14. "ThreadingMixIn", "ForkingMixIn"]
  15. if hasattr(socket, "AF_UNIX"):
  16. __all__.extend(["UnixStreamServer","UnixDatagramServer",
  17. "ThreadingUnixStreamServer",
  18. "ThreadingUnixDatagramServer"])
  19. # 出现 EINTR 则重新调用
  20. def _eintr_retry(func, *args):
  21. """restart a system call interrupted by EINTR"""
  22. while True:
  23. try:
  24. return func(*args)
  25. except (OSError, select.error) as e:
  26. if e.args[0] != errno.EINTR:
  27. raise
BaseServer

RequestHandlerClass 注册 handle 函数。
finish_request 中实例化,调用用户定义的 handle 函数

</>复制代码

  1. class BaseServer:
  2. timeout = None
  3. def __init__(self, server_address, RequestHandlerClass):
  4. """Constructor. May be extended, do not override."""
  5. self.server_address = server_address
  6. self.RequestHandlerClass = RequestHandlerClass
  7. self.__is_shut_down = threading.Event()
  8. self.__shutdown_request = False
  9. def server_activate(self):
  10. """Called by constructor to activate the server.
  11. May be overridden.
  12. """
  13. pass
BaseServer.serve_forever

服务循环

监听端口

处理请求

</>复制代码

  1. def serve_forever(self, poll_interval=0.5):
  2. """Handle one request at a time until shutdown.
  3. Polls for shutdown every poll_interval seconds. Ignores
  4. self.timeout. If you need to do periodic tasks, do them in
  5. another thread.
  6. """
  7. self.__is_shut_down.clear()
  8. try:
  9. while not self.__shutdown_request:
  10. # 调用 select 监视请求,处理 EINTR 异常
  11. r, w, e = _eintr_retry(select.select, [self], [], [],
  12. poll_interval)
  13. # 有请求进来
  14. if self in r:
  15. self._handle_request_noblock()
  16. finally:
  17. self.__shutdown_request = False
  18. self.__is_shut_down.set()
BaseServer.shutdown

停止 serve_forever 循环.
__is_shut_down 通知外部,循环已经退出
注意 threading.Event() 的用法,只设置一次,避免使用 Event 进行频繁的设置/清除。
需要在与 serve_forever 不同的线程中调用.
因为调用 shutdown 后需要 wait 信号量,程序会 block,block 后 serve_forever 无法执行
serve_forever 收到请求后才能退出设置信号量

注意
self.__shutdown_request 的读写操作,属于原子操作,在多线程中使用是安全的

</>复制代码

  1. def shutdown(self):
  2. """Stops the serve_forever loop.
  3. Blocks until the loop has finished. This must be called while
  4. serve_forever() is running in another thread, or it will
  5. deadlock.
  6. """
  7. self.__shutdown_request = True
  8. self.__is_shut_down.wait()
BaseServer.handle_request

和 serve_forever 并列的函数
如果不调用 server_forever, 在外面循环调用 handle_request

</>复制代码

  1. # The distinction between handling, getting, processing and
  2. # finishing a request is fairly arbitrary. Remember:
  3. #
  4. # - handle_request() is the top-level call. It calls
  5. # select, get_request(), verify_request() and process_request()
  6. # - get_request() is different for stream or datagram sockets
  7. # - process_request() is the place that may fork a new process
  8. # or create a new thread to finish the request
  9. # - finish_request() instantiates the request handler class;
  10. # this constructor will handle the request all by itself
  11. def handle_request(self):
  12. """Handle one request, possibly blocking.
  13. Respects self.timeout.
  14. """
  15. # Support people who used socket.settimeout() to escape
  16. # handle_request before self.timeout was available.
  17. # 如果用户使用 socket.settimeout() 设置了超时时间,则选取一个小的
  18. timeout = self.socket.gettimeout()
  19. if timeout is None:
  20. timeout = self.timeout
  21. elif self.timeout is not None:
  22. timeout = min(timeout, self.timeout)
  23. # select,监听连接,会阻塞直到超时
  24. fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
  25. if not fd_sets[0]:
  26. self.handle_timeout()
  27. return
  28. # 处理请求
  29. self._handle_request_noblock()
BaseServer._handle_request_noblock

真正的请求处理函数

get_request: 接收请求 accept

verify_request: 验证,做一些验证工作,比如 ip 过滤

process_request: 处理请求,子类重写该方法后,需要 调用 SocketServer.BaseServer.process_request,

BaseServer.process_request 中有 BaseRequestHandler 的回调动作,实例化用户定义的 handler, __init__ 中完成对 handle() 的调用

shutdown_reques: 关闭连接

</>复制代码

  1. def _handle_request_noblock(self):
  2. """Handle one request, without blocking.
  3. I assume that select.select has returned that the socket is
  4. readable before this function was called, so there should be
  5. no risk of blocking in get_request().
  6. """
  7. try:
  8. # 接收请求
  9. # get_request 由子类实现,一般为接收请求,返回 socket
  10. request, client_address = self.get_request()
  11. except socket.error:
  12. return
  13. if self.verify_request(request, client_address):
  14. try:
  15. self.process_request(request, client_address)
  16. except:
  17. self.handle_error(request, client_address)
  18. self.shutdown_request(request)
  19. else:
  20. self.shutdown_request(request)
BaseServer Overridden functions

</>复制代码

  1. def handle_timeout(self):
  2. """Called if no new request arrives within self.timeout.
  3. Overridden by ForkingMixIn.
  4. """
  5. pass
  6. def verify_request(self, request, client_address):
  7. """Verify the request. May be overridden.
  8. Return True if we should proceed with this request.
  9. """
  10. return True
  11. def process_request(self, request, client_address):
  12. """Call finish_request.
  13. Overridden by ForkingMixIn and ThreadingMixIn.
  14. """
  15. self.finish_request(request, client_address)
  16. self.shutdown_request(request)
  17. def server_close(self):
  18. """Called to clean-up the server.
  19. May be overridden.
  20. """
  21. pass
  22. def finish_request(self, request, client_address):
  23. """Finish one request by instantiating RequestHandlerClass."""
  24. self.RequestHandlerClass(request, client_address, self)
  25. def shutdown_request(self, request):
  26. """Called to shutdown and close an individual request."""
  27. self.close_request(request)
  28. def close_request(self, request):
  29. """Called to clean up an individual request."""
  30. pass
  31. def handle_error(self, request, client_address):
  32. """Handle an error gracefully. May be overridden.
  33. The default is to print a traceback and continue.
  34. """
  35. print "-"*40
  36. print "Exception happened during processing of request from",
  37. print client_address
  38. import traceback
  39. traceback.print_exc() # XXX But this goes to stderr!
  40. print "-"*40
TCPServer

shutdown_request 先调用 socket.shutdown 后调用 socket.close

</>复制代码

  1. close()releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, callshutdown() beforeclose().

  2. Shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed. If how is SHUT_WR, further sends are disallowed. Ifhow is SHUT_RDWR, further sends and receives are disallowed. Depending on the platform, shutting down one half of the connection can also close the opposite half (e.g. on Mac OS X, shutdown(SHUT_WR) does not allow further reads on the other end of the connection).

</>复制代码

  1. class TCPServer(BaseServer):
  2. address_family = socket.AF_INET
  3. socket_type = socket.SOCK_STREAM
  4. request_queue_size = 5
  5. allow_reuse_address = False
  6. def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
  7. """Constructor. May be extended, do not override."""
  8. BaseServer.__init__(self, server_address, RequestHandlerClass)
  9. self.socket = socket.socket(self.address_family,
  10. self.socket_type)
  11. if bind_and_activate:
  12. try:
  13. self.server_bind()
  14. self.server_activate()
  15. except:
  16. self.server_close()
  17. raise
  18. def server_bind(self):
  19. """Called by constructor to bind the socket.
  20. May be overridden.
  21. """
  22. if self.allow_reuse_address:
  23. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  24. self.socket.bind(self.server_address)
  25. self.server_address = self.socket.getsockname()
  26. def server_activate(self):
  27. """Called by constructor to activate the server.
  28. May be overridden.
  29. """
  30. self.socket.listen(self.request_queue_size)
  31. def server_close(self):
  32. """Called to clean-up the server.
  33. May be overridden.
  34. """
  35. self.socket.close()
  36. def fileno(self):
  37. """Return socket file number.
  38. Interface required by select().
  39. """
  40. return self.socket.fileno()
  41. def get_request(self):
  42. """Get the request and client address from the socket.
  43. May be overridden.
  44. """
  45. return self.socket.accept()
  46. # 调用 shutdown 后调用 close,立即关闭并释放资源
  47. def shutdown_request(self, request):
  48. """Called to shutdown and close an individual request."""
  49. try:
  50. #explicitly shutdown. socket.close() merely releases
  51. #the socket and waits for GC to perform the actual close.
  52. request.shutdown(socket.SHUT_WR)
  53. except socket.error:
  54. pass #some platforms may raise ENOTCONN here
  55. self.close_request(request)
  56. def close_request(self, request):
  57. """Called to clean up an individual request."""
  58. request.close()
UDPServer

UDPServer get_request 返回的是一个 (data, socket) 的 tuple,而 TCPServer 返回的是 socket
handle 中要区分处理
msg, sock = self.request
msg 已经获取,无需额外 recv

</>复制代码

  1. 对于数据的传送, 你应该使用 socket 的 sendto() 和 recvfrom() 方法。 尽管传统的 send() 和 recv() 也可以达到同样的效果, 但是前面的两个方法对于 UDP 连接而言更普遍。
    from python3-cookbook

</>复制代码

  1. from SocketServer import BaseRequestHandler, UDPServer
  2. import time
  3. class TimeHandler(BaseRequestHandler):
  4. def handle(self):
  5. print("Got connection from", self.client_address)
  6. # Get message and client socket
  7. msg, sock = self.request
  8. resp = time.ctime()
  9. sock.sendto(resp.encode("ascii"), self.client_address)
  10. if __name__ == "__main__":
  11. serv = UDPServer(("", 20000), TimeHandler)
  12. serv.serve_forever()
  13. #-----------------------------
  14. >>> from socket import socket, AF_INET, SOCK_DGRAM
  15. >>> s = socket(AF_INET, SOCK_DGRAM)
  16. >>> s.sendto(b"", ("localhost", 20000))
  17. 0
  18. >>> s.recvfrom(8192)
  19. ("Thu Dec 20 10:01:01 2018", ("127.0.0.1", 20000))

</>复制代码

  1. class UDPServer(TCPServer):
  2. """UDP server class."""
  3. allow_reuse_address = False
  4. socket_type = socket.SOCK_DGRAM
  5. max_packet_size = 8192
  6. def get_request(self):
  7. data, client_addr = self.socket.recvfrom(self.max_packet_size)
  8. return (data, self.socket), client_addr
  9. def server_activate(self):
  10. # No need to call listen() for UDP.
  11. pass
  12. def shutdown_request(self, request):
  13. # No need to shutdown anything.
  14. self.close_request(request)
  15. def close_request(self, request):
  16. # No need to close anything.
  17. pass
ForkingMixIn

典型的 fork 使用,这里我们能看到 fork 多进程的典型使用

限定最大进程数,保证系统资源不至于耗尽

父进程 wait defunct 进程

fork 后父进程返回

子进程处理请求后 _exit()

</>复制代码

  1. class ForkingMixIn:
  2. """Mix-in class to handle each request in a new process."""
  3. timeout = 300
  4. active_children = None
  5. max_children = 40
  6. def collect_children(self):
  7. """Internal routine to wait for children that have exited."""
  8. if self.active_children is None:
  9. return
  10. while len(self.active_children) >= self.max_children:
  11. try:
  12. pid, _ = os.waitpid(-1, 0)
  13. self.active_children.discard(pid)
  14. except OSError as e:
  15. if e.errno == errno.ECHILD:
  16. # we don"t have any children, we"re done
  17. self.active_children.clear()
  18. elif e.errno != errno.EINTR:
  19. break
  20. # Now reap all defunct children.
  21. for pid in self.active_children.copy():
  22. try:
  23. pid, _ = os.waitpid(pid, os.WNOHANG)
  24. # if the child hasn"t exited yet, pid will be 0 and ignored by
  25. # discard() below
  26. self.active_children.discard(pid)
  27. except OSError as e:
  28. if e.errno == errno.ECHILD:
  29. # someone else reaped it
  30. self.active_children.discard(pid)
  31. def handle_timeout(self):
  32. """Wait for zombies after self.timeout seconds of inactivity.
  33. May be extended, do not override.
  34. """
  35. self.collect_children()
  36. def process_request(self, request, client_address):
  37. """Fork a new subprocess to process the request."""
  38. self.collect_children()
  39. pid = os.fork()
  40. if pid:
  41. # Parent process
  42. if self.active_children is None:
  43. self.active_children = set()
  44. self.active_children.add(pid)
  45. self.close_request(request) #close handle in parent process
  46. return
  47. else:
  48. # Child process.
  49. # This must never return, hence os._exit()!
  50. try:
  51. self.finish_request(request, client_address)
  52. self.shutdown_request(request)
  53. os._exit(0)
  54. except:
  55. try:
  56. self.handle_error(request, client_address)
  57. self.shutdown_request(request)
  58. finally:
  59. os._exit(1)
ThreadingMixIn

ThreadingMixIn 重载了 process_request 函数

创建一个线程

在线程中处理请求

启动线程

</>复制代码

  1. class ThreadingMixIn:
  2. """Mix-in class to handle each request in a new thread."""
  3. # Decides how threads will act upon termination of the
  4. # main process
  5. daemon_threads = False
  6. def process_request_thread(self, request, client_address):
  7. """Same as in BaseServer but as a thread.
  8. In addition, exception handling is done here.
  9. """
  10. try:
  11. self.finish_request(request, client_address)
  12. self.shutdown_request(request)
  13. except:
  14. self.handle_error(request, client_address)
  15. self.shutdown_request(request)
  16. def process_request(self, request, client_address):
  17. """Start a new thread to process the request."""
  18. t = threading.Thread(target = self.process_request_thread,
  19. args = (request, client_address))
  20. t.daemon = self.daemon_threads
  21. t.start()

</>复制代码

  1. class ForkingUDPServer(ForkingMixIn, UDPServer): pass
  2. class ForkingTCPServer(ForkingMixIn, TCPServer): pass
  3. class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
  4. class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
  5. if hasattr(socket, "AF_UNIX"):
  6. class UnixStreamServer(TCPServer):
  7. address_family = socket.AF_UNIX
  8. class UnixDatagramServer(UDPServer):
  9. address_family = socket.AF_UNIX
  10. class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
  11. class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
BaseRequestHandler

基础请求类,对外提供三个接口

setup()

handle()

finish()

使用时继承该类,通过 BaseServer 注册
BaseServer.finish_request 中实例化 BaseRequestHandler 类,在 __init__函数调用中完成继承类重载的 handle() 接口的调用

</>复制代码

  1. class BaseRequestHandler:
  2. def __init__(self, request, client_address, server):
  3. self.request = request
  4. self.client_address = client_address
  5. self.server = server
  6. self.setup()
  7. try:
  8. self.handle()
  9. finally:
  10. self.finish()
  11. def setup(self):
  12. pass
  13. def handle(self):
  14. pass
  15. def finish(self):
  16. pass
StreamRequestHandler

提供文件操作接口

</>复制代码

  1. class StreamRequestHandler(BaseRequestHandler):
  2. """Define self.rfile and self.wfile for stream sockets."""
  3. # Default buffer sizes for rfile, wfile.
  4. # We default rfile to buffered because otherwise it could be
  5. # really slow for large data (a getc() call per byte); we make
  6. # wfile unbuffered because (a) often after a write() we want to
  7. # read and we need to flush the line; (b) big writes to unbuffered
  8. # files are typically optimized by stdio even when big reads
  9. # aren"t.
  10. rbufsize = -1
  11. wbufsize = 0
  12. # A timeout to apply to the request socket, if not None.
  13. timeout = None
  14. # Disable nagle algorithm for this socket, if True.
  15. # Use only when wbufsize != 0, to avoid small packets.
  16. disable_nagle_algorithm = False
  17. def setup(self):
  18. self.connection = self.request
  19. if self.timeout is not None:
  20. self.connection.settimeout(self.timeout)
  21. if self.disable_nagle_algorithm:
  22. self.connection.setsockopt(socket.IPPROTO_TCP,
  23. socket.TCP_NODELAY, True)
  24. self.rfile = self.connection.makefile("rb", self.rbufsize)
  25. self.wfile = self.connection.makefile("wb", self.wbufsize)
  26. def finish(self):
  27. if not self.wfile.closed:
  28. try:
  29. self.wfile.flush()
  30. except socket.error:
  31. # A final socket error may have occurred here, such as
  32. # the local error ECONNABORTED.
  33. pass
  34. self.wfile.close()
  35. self.rfile.close()
DatagramRequestHandler

</>复制代码

  1. class DatagramRequestHandler(BaseRequestHandler):
  2. """Define self.rfile and self.wfile for datagram sockets."""
  3. def setup(self):
  4. try:
  5. from cStringIO import StringIO
  6. except ImportError:
  7. from StringIO import StringIO
  8. self.packet, self.socket = self.request
  9. self.rfile = StringIO(self.packet)
  10. self.wfile = StringIO()
  11. def finish(self):
  12. self.socket.sendto(self.wfile.getvalue(), self.client_address)
版权

作者:bigfish
许可协议:许可协议 知识共享署名-非商业性使用 4.0 国际许可协议

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42860.html

相关文章

  • flask源码分析,run函数启动分析

    摘要:对背后运行机制感兴趣,参考网上资料,结合源码分析函数运行时的机制,主要整理出函数调用栈。以分析首先官方文档经典示例现在来分析启动时发生了什么代码只列出用到的函数,去掉注释等函数导入运行函数主要运行调用返回类,然后调用返回类的。 对flask背后运行机制感兴趣,参考网上资料,结合源码分析run函数运行时的机制,主要整理出函数调用栈。以flask0.1分析 首先Flask官方文档经典示例 ...

    Tony 评论0 收藏0
  • flask源码走读

    摘要:另外,如果你对模板渲染部分的内容感兴趣,也可以考虑阅读文档文档文档源码阅读,可以参考下面的函数打断点,再测试一个请求,理清过程。 Flask-Origin 源码版本 一直想好好理一下flask的实现,这个项目有Flask 0.1版本源码并加了注解,挺清晰明了的,我在其基础上完成了对Werkzeug的理解部分,大家如果想深入学习的话,可以参考werkzeug_flow.md. 阅读前 为...

    Coly 评论0 收藏0
  • 对python socket编程的初探

    摘要:对于网络编程来说,免不了要用到模块。表示另一端的地址。以上主要是针对流数据的编程。对于协议的数据,处理略有不同。通过传入对象调用来监听对象的文件描述符,一旦发现对象就绪,就通知应用程序进行相应的读写操作。 对于python网络编程来说,免不了要用到socket模块。下面分享一下个人对python socket的一些理解。 socket编程步骤 服务端创建一个socket,绑定地址和端...

    stormgens 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<