资讯专栏INFORMATION COLUMN

Swoole 源码分析——Client模块之Send

caozhijian / 2092人阅读

摘要:当此时的套接字不可写的时候,会自动放入缓冲区中。当大于高水线时,会自动调用回调函数。写就绪状态当监控到套接字进入了写就绪状态时,就会调用函数。如果为,说明此时异步客户端虽然建立了连接,但是还没有调用回调函数,因此这时要调用函数。

前言

上一章我们说了客户端的连接 connect,对于同步客户端来说,连接已经建立成功;但是对于异步客户端来说,此时可能还在进行 DNS 的解析,onConnect 回调函数还未执行。

本节中,我们将继续说明客户端发送数据的流程,同时我们可以看到 TCP 异步客户端执行 onConnect 回调函数的过程。

send 入口

本入口函数逻辑非常简单,从 PHP 函数中获取数据 data,然后调用 connect 函数。

</>复制代码

  1. static PHP_METHOD(swoole_client, send)
  2. {
  3. char *data;
  4. zend_size_t data_len;
  5. zend_long flags = 0;
  6. #ifdef FAST_ZPP
  7. ZEND_PARSE_PARAMETERS_START(1, 2)
  8. Z_PARAM_STRING(data, data_len)
  9. Z_PARAM_OPTIONAL
  10. Z_PARAM_LONG(flags)
  11. ZEND_PARSE_PARAMETERS_END();
  12. #else
  13. if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|l", &data, &data_len, &flags) == FAILURE)
  14. {
  15. return;
  16. }
  17. #endif
  18. swClient *cli = client_get_ptr(getThis() TSRMLS_CC);
  19. //clear errno
  20. SwooleG.error = 0;
  21. int ret = cli->send(cli, data, data_len, flags);
  22. if (ret < 0)
  23. {
  24. swoole_php_sys_error(E_WARNING, "failed to send(%d) %zd bytes.", cli->socket->fd, data_len);
  25. zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC);
  26. RETVAL_FALSE;
  27. }
  28. else
  29. {
  30. RETURN_LONG(ret);
  31. }
  32. }
swClient_tcp_send_sync 同步 TCP 客户端

对于同步客户端来说,发送数据是通过 swConnection_send 函数来进行阻塞式调用 send,当返回的错误是 EAGAIN 的时候调用 swSocket_wait 等待 1s。

</>复制代码

  1. static int swClient_tcp_send_sync(swClient *cli, char *data, int length, int flags)
  2. {
  3. int written = 0;
  4. int n;
  5. assert(length > 0);
  6. assert(data != NULL);
  7. while (written < length)
  8. {
  9. n = swConnection_send(cli->socket, data, length - written, flags);
  10. if (n < 0)
  11. {
  12. if (errno == EINTR)
  13. {
  14. continue;
  15. }
  16. else if (errno == EAGAIN)
  17. {
  18. swSocket_wait(cli->socket->fd, 1000, SW_EVENT_WRITE);
  19. continue;
  20. }
  21. else
  22. {
  23. SwooleG.error = errno;
  24. return SW_ERR;
  25. }
  26. }
  27. written += n;
  28. data += n;
  29. }
  30. return written;
  31. }
swClient_tcp_send_async 异步 TCP 客户端

由于异步客户端已经设置为非阻塞,并且加入了 reactor 的监控,因此发送数据只需要 reactor->write 函数即可。当此时的套接字不可写的时候,会自动放入 out_buff 缓冲区中。

out_buffer 大于高水线时,会自动调用 onBufferFull 回调函数。

</>复制代码

  1. static int swClient_tcp_send_async(swClient *cli, char *data, int length, int flags)
  2. {
  3. int n = length;
  4. if (cli->reactor->write(cli->reactor, cli->socket->fd, data, length) < 0)
  5. {
  6. if (SwooleG.error == SW_ERROR_OUTPUT_BUFFER_OVERFLOW)
  7. {
  8. n = -1;
  9. cli->socket->high_watermark = 1;
  10. }
  11. else
  12. {
  13. return SW_ERR;
  14. }
  15. }
  16. if (cli->onBufferFull && cli->socket->out_buffer && cli->socket->high_watermark == 0
  17. && cli->socket->out_buffer->length >= cli->buffer_high_watermark)
  18. {
  19. cli->socket->high_watermark = 1;
  20. cli->onBufferFull(cli);
  21. }
  22. return n;
  23. }
swClient_udp_send UDP 客户端

对于 UDP 客户端来说,如果 Socket 缓存区塞满,并不会像 TCP 进行等待 reactor 可写状态,而是直接返回了结果。对于异步客户端来说,仅仅是非阻塞调用 sendto 函数。

</>复制代码

  1. static int swClient_udp_send(swClient *cli, char *data, int len, int flags)
  2. {
  3. int n;
  4. n = sendto(cli->socket->fd, data, len, 0, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len);
  5. if (n < 0 || n < len)
  6. {
  7. return SW_ERR;
  8. }
  9. else
  10. {
  11. return n;
  12. }
  13. }
sendto UDP 客户端

类似于 send 函数,sendto 函数专门针对 UDP 客户端,与 send 函数不同的是,sendto 函数在底层套接字缓冲区塞满的时候,会调用 swSocket_wait 进行阻塞等待。

</>复制代码

  1. static PHP_METHOD(swoole_client, sendto)
  2. {
  3. char* ip;
  4. zend_size_t ip_len;
  5. long port;
  6. char *data;
  7. zend_size_t len;
  8. if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sls", &ip, &ip_len, &port, &data, &len) == FAILURE)
  9. {
  10. return;
  11. }
  12. swClient *cli = (swClient *) swoole_get_object(getThis());
  13. int ret;
  14. if (cli->type == SW_SOCK_UDP)
  15. {
  16. ret = swSocket_udp_sendto(cli->socket->fd, ip, port, data, len);
  17. }
  18. else if (cli->type == SW_SOCK_UDP6)
  19. {
  20. ret = swSocket_udp_sendto6(cli->socket->fd, ip, port, data, len);
  21. }
  22. else
  23. {
  24. swoole_php_fatal_error(E_WARNING, "only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6.");
  25. RETURN_FALSE;
  26. }
  27. SW_CHECK_RETURN(ret);
  28. }
  29. int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)
  30. {
  31. struct sockaddr_in addr;
  32. if (inet_aton(dst_ip, &addr.sin_addr) == 0)
  33. {
  34. swWarn("ip[%s] is invalid.", dst_ip);
  35. return SW_ERR;
  36. }
  37. addr.sin_family = AF_INET;
  38. addr.sin_port = htons(dst_port);
  39. return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));
  40. }
  41. int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)
  42. {
  43. struct sockaddr_in6 addr;
  44. bzero(&addr, sizeof(addr));
  45. if (inet_pton(AF_INET6, dst_ip, &addr.sin6_addr) < 0)
  46. {
  47. swWarn("ip[%s] is invalid.", dst_ip);
  48. return SW_ERR;
  49. }
  50. addr.sin6_port = (uint16_t) htons(dst_port);
  51. addr.sin6_family = AF_INET6;
  52. return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));
  53. }
  54. int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len)
  55. {
  56. int n = 0;
  57. while (1)
  58. {
  59. n = sendto(fd, __buf, __n, flag, __addr, __addr_len);
  60. if (n >= 0)
  61. {
  62. break;
  63. }
  64. else
  65. {
  66. if (errno == EINTR)
  67. {
  68. continue;
  69. }
  70. else if (swConnection_error(errno) == SW_WAIT)
  71. {
  72. swSocket_wait(fd, 1000, SW_EVENT_WRITE);
  73. continue;
  74. }
  75. else
  76. {
  77. break;
  78. }
  79. }
  80. }
  81. return n;
  82. }
swClient_onWrite 写就绪状态

reactor 监控到套接字进入了写就绪状态时,就会调用 swClient_onWrite 函数。

从上一章我们知道,异步客户端建立连接过程中 swoole 调用了 connect 函数,该返回 0,或者返回错误码 EINPROGRESS 都会对写就绪进行监控。无论哪种情况,swClient_onWrite 被调用就说明此时连接已经建立成功,三次握手已经完成,但是 cli->socket->active 还是 0。

如果 cli->socket->active 为 0,说明此时异步客户端虽然建立了连接,但是还没有调用 onConnect 回调函数,因此这时要调用 execute_onConnect 函数。如果使用了 SSL 隧道加密,还要进行 SSL 握手,并且设置 _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM

active 为 1 的时候,就可以调用 swReactor_onWrite 来发送数据。

</>复制代码

  1. static int swClient_onWrite(swReactor *reactor, swEvent *event)
  2. {
  3. swClient *cli = event->socket->object;
  4. swConnection *_socket = cli->socket;
  5. if (cli->socket->active)
  6. {
  7. #ifdef SW_USE_OPENSSL
  8. if (cli->open_ssl && _socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)
  9. {
  10. if (swClient_ssl_handshake(cli) < 0)
  11. {
  12. goto connect_fail;
  13. }
  14. else if (_socket->ssl_state == SW_SSL_STATE_READY)
  15. {
  16. goto connect_success;
  17. }
  18. else
  19. {
  20. if (_socket->ssl_want_read)
  21. {
  22. cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ);
  23. }
  24. return SW_OK;
  25. }
  26. }
  27. #endif
  28. if (swReactor_onWrite(cli->reactor, event) < 0)
  29. {
  30. return SW_ERR;
  31. }
  32. if (cli->onBufferEmpty && _socket->high_watermark && _socket->out_buffer->length <= cli->buffer_low_watermark)
  33. {
  34. _socket->high_watermark = 0;
  35. cli->onBufferEmpty(cli);
  36. }
  37. return SW_OK;
  38. }
  39. socklen_t len = sizeof(SwooleG.error);
  40. if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) < 0)
  41. {
  42. swWarn("getsockopt(%d) failed. Error: %s[%d]", event->fd, strerror(errno), errno);
  43. return SW_ERR;
  44. }
  45. //success
  46. if (SwooleG.error == 0)
  47. {
  48. //listen read event
  49. cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ);
  50. //connected
  51. _socket->active = 1;
  52. #ifdef SW_USE_OPENSSL
  53. if (cli->open_ssl)
  54. {
  55. if (swClient_enable_ssl_encrypt(cli) < 0)
  56. {
  57. goto connect_fail;
  58. }
  59. if (swClient_ssl_handshake(cli) < 0)
  60. {
  61. goto connect_fail;
  62. }
  63. else
  64. {
  65. _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM;
  66. }
  67. return SW_OK;
  68. }
  69. connect_success:
  70. #endif
  71. if (cli->onConnect)
  72. {
  73. execute_onConnect(cli);
  74. }
  75. }
  76. else
  77. {
  78. #ifdef SW_USE_OPENSSL
  79. connect_fail:
  80. #endif
  81. _socket->active = 0;
  82. cli->close(cli);
  83. if (cli->onError)
  84. {
  85. cli->onError(cli);
  86. }
  87. }
  88. return SW_OK;
  89. }
  90. static sw_inline void execute_onConnect(swClient *cli)
  91. {
  92. if (cli->timer)
  93. {
  94. swTimer_del(&SwooleG.timer, cli->timer);
  95. cli->timer = NULL;
  96. }
  97. cli->onConnect(cli);
  98. }
client_onConnect

</>复制代码

  1. static void client_onConnect(swClient *cli)
  2. {
  3. zval *zobject = (zval *) cli->object;
  4. #ifdef SW_USE_OPENSSL
  5. if (cli->ssl_wait_handshake)
  6. {
  7. client_execute_callback(zobject, SW_CLIENT_CB_onSSLReady);
  8. }
  9. else
  10. #endif
  11. if (!cli->redirect)
  12. {
  13. client_execute_callback(zobject, SW_CLIENT_CB_onConnect);
  14. }
  15. else
  16. {
  17. client_callback *cb = (client_callback *) swoole_get_property(zobject, 0);
  18. if (!cb || !cb->onReceive)
  19. {
  20. swoole_php_fatal_error(E_ERROR, "has no "onReceive" callback function.");
  21. }
  22. }
  23. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29498.html

相关文章

  • Swoole 源码分析——Server模块Stream 模式

    摘要:新建可以看到,自动采用包长检测的方法该函数主要功能是设置各种回调函数值得注意的是第三个参数代表是否异步。发送数据函数并不是直接发送数据,而是将数据存储在,等着写事件就绪之后调用发送数据。 swReactorThread_dispatch 发送数据 reactor 线程会通过 swReactorThread_dispatch 发送数据,当采用 stream 发送数据的时候,会调用 sw...

    wums 评论0 收藏0
  • Swoole 源码分析——Reactor模块ReactorBase

    前言 作为一个网络框架,最为核心的就是消息的接受与发送。高效的 reactor 模式一直是众多网络框架的首要选择,本节主要讲解 swoole 中的 reactor 模块。 UNP 学习笔记——IO 复用 Reactor 的数据结构 Reactor 的数据结构比较复杂,首先 object 是具体 Reactor 对象的首地址,ptr 是拥有 Reactor 对象的类的指针, event_nu...

    baukh789 评论0 收藏0
  • Swoole 源码分析——Client模块Connect

    摘要:两个函数是可选回调函数。附带了一组可信任证书。应该注意的是,验证失败并不意味着连接不能使用。在对证书进行验证时,有一些安全性检查并没有执行,包括证书的失效检查和对证书中通用名的有效性验证。 前言 swoole_client 提供了 tcp/udp socket 的客户端的封装代码,使用时仅需 new swoole_client 即可。 swoole 的 socket client 对比...

    Charles 评论0 收藏0
  • Swoole 源码分析——Server模块OpenSSL(下)

    摘要:对于服务端来说,缓存默认是不能使用的,可以通过调用函数来进行设置生效。在回调函数中,首先申请一个大数数据结构,然后将其设定为,该值表示公钥指数,然后利用函数生成秘钥。此时需要调用函数将新的连接与绑定。 前言 上一篇文章我们讲了 OpenSSL 的原理,接下来,我们来说说如何利用 openssl 第三方库进行开发,来为 tcp 层进行 SSL 隧道加密 OpenSSL 初始化 在 sw...

    LiuRhoRamen 评论0 收藏0
  • Swoole 源码分析——Server模块ReactorThread事件循环(下)

    摘要:之后如果仍然有剩余未发送的数据,那么就如果已经没有剩余数据了,继续去取下一个数据包。拿到后,要用函数转化为相应的类型即可得到包长值。 swPort_onRead_check_eof EOF 自动分包 我们前面说过,swPort_onRead_raw 是最简单的向 worker 进程发送数据包的方法,swoole 会将从客户端接受到的数据包,立刻发送给 worker 进程,用户自己把...

    Maxiye 评论0 收藏0

发表评论

0条评论

caozhijian

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<