资讯专栏INFORMATION COLUMN

Swoole 源码分析——基础模块之 Pipe 管道

Tikitoo / 1663人阅读

摘要:并没有使用命名管道。的创建创建匿名管道就是调用函数,程序自动设置管道为非阻塞式。函数同样的获取管道文件描述符根据来决定。模块负责为进程创建与。当线程启动的时候,会将加入的监控当中。

前言

管道是进程间通信 IPC 的最基础的方式,管道有两种类型:命名管道和匿名管道,匿名管道专门用于具有血缘关系的进程之间,完成数据传递,命名管道可以用于任何两个进程之间。swoole 中的管道都是匿名管道。

swoole 中,有三种不同类型的管道,其中 swPipeBase 是最基础的管道,swPipeUnsock 是利用 socketpair 实现的管道,swPipeEventfdeventfd 实现的管道。swoole 并没有使用 FIFO 命名管道。

Pipe 数据结构

不管哪种类型的管道,其基础都是 swPipe,该结构体包含一个具体的 pipeobject,代表着是否阻塞的 blocking,超时时间 timeout,还有对管道的操作函数readwritegetfdclose

</>复制代码

  1. typedef struct _swPipe
  2. {
  3. void *object;
  4. int blocking;
  5. double timeout;
  6. int (*read)(struct _swPipe *, void *recv, int length);
  7. int (*write)(struct _swPipe *, void *send, int length);
  8. int (*getFd)(struct _swPipe *, int master);
  9. int (*close)(struct _swPipe *);
  10. } swPipe;
swPipeBase 匿名管道 swPipeBase 数据结构

数据结构非常简单,就是一个数组,存放着 pipe 的读端和写端。值得注意的是,swPipeBase 是半全工的管道,也就是说 pipes[0] 只能用于读,pipes[1] 只能用于写。

当多个进程共享这个管道的时候,所有的进程读取都需要 read 读端 pipes[0],进程写入消息都要 write 写端 pipes[1]

因此使用这个匿名管道的时候,一般情形是一个进程只负责写,另一个进程只负责读,只能单向传递消息,不能双向传递,否则很有可能读到了自己刚刚发送的消息。

</>复制代码

  1. typedef struct _swPipeBase
  2. {
  3. int pipes[2];
  4. } swPipeBase;
swPipeBase 的创建

创建匿名管道就是调用 pipe 函数,程序自动设置管道为非阻塞式。

</>复制代码

  1. int swPipeBase_create(swPipe *p, int blocking)
  2. {
  3. int ret;
  4. swPipeBase *object = sw_malloc(sizeof(swPipeBase));
  5. if (object == NULL)
  6. {
  7. return -1;
  8. }
  9. p->blocking = blocking;
  10. ret = pipe(object->pipes);
  11. if (ret < 0)
  12. {
  13. swWarn("pipe() failed. Error: %s[%d]", strerror(errno), errno);
  14. sw_free(object);
  15. return -1;
  16. }
  17. else
  18. {
  19. //Nonblock
  20. swSetNonBlock(object->pipes[0]);
  21. swSetNonBlock(object->pipes[1]);
  22. p->timeout = -1;
  23. p->object = object;
  24. p->read = swPipeBase_read;
  25. p->write = swPipeBase_write;
  26. p->getFd = swPipeBase_getFd;
  27. p->close = swPipeBase_close;
  28. }
  29. return 0;
  30. }
swPipeBase_read 管道的读

由于匿名管道被设置为非阻塞式,无法实现超时等待写入。如果想要阻塞式的向管道写入数据,设置一定超时时间,就需要利用 poll 函数。当 pipefd 可读时,poll 立刻返回,或者达到超时时间。

</>复制代码

  1. static int swPipeBase_read(swPipe *p, void *data, int length)
  2. {
  3. swPipeBase *object = p->object;
  4. if (p->blocking == 1 && p->timeout > 0)
  5. {
  6. if (swSocket_wait(object->pipes[0], p->timeout * 1000, SW_EVENT_READ) < 0)
  7. {
  8. return SW_ERR;
  9. }
  10. }
  11. return read(object->pipes[0], data, length);
  12. }
  13. int swSocket_wait(int fd, int timeout_ms, int events)
  14. {
  15. struct pollfd event;
  16. event.fd = fd;
  17. event.events = 0;
  18. if (events & SW_EVENT_READ)
  19. {
  20. event.events |= POLLIN;
  21. }
  22. if (events & SW_EVENT_WRITE)
  23. {
  24. event.events |= POLLOUT;
  25. }
  26. while (1)
  27. {
  28. int ret = poll(&event, 1, timeout_ms);
  29. if (ret == 0)
  30. {
  31. return SW_ERR;
  32. }
  33. else if (ret < 0 && errno != EINTR)
  34. {
  35. swWarn("poll() failed. Error: %s[%d]", strerror(errno), errno);
  36. return SW_ERR;
  37. }
  38. else
  39. {
  40. return SW_OK;
  41. }
  42. }
  43. return SW_OK;
  44. }
swPipeBase_write 管道的写入

管道的写入直接调用 write 即可,非阻塞式 IO 会立刻返回结果。

</>复制代码

  1. static int swPipeBase_write(swPipe *p, void *data, int length)
  2. {
  3. swPipeBase *this = p->object;
  4. return write(this->pipes[1], data, length);
  5. }
swPipeBase_getFd

本函数用于获取管道的读端或者写端。

</>复制代码

  1. static int swPipeBase_getFd(swPipe *p, int isWriteFd)
  2. {
  3. swPipeBase *this = p->object;
  4. return (isWriteFd == 0) ? this->pipes[0] : this->pipes[1];
  5. }
swPipeBase_close 关闭管道

</>复制代码

  1. static int swPipeBase_close(swPipe *p)
  2. {
  3. int ret1, ret2;
  4. swPipeBase *this = p->object;
  5. ret1 = close(this->pipes[0]);
  6. ret2 = close(this->pipes[1]);
  7. sw_free(this);
  8. return 0 - ret1 - ret2;
  9. }
swPipeEventfd 管道 swPipeEventfd 数据结构

数据结构中仅仅存放 eventfd 函数返回的文件描述符。

pipe 管道不同的是,eventfd 只有一个文件描述符,读和写都是对这个文件描述符进行操作。

该管道同样也是只适用于进程间单向通信。

</>复制代码

  1. typedef struct _swPipeEventfd
  2. {
  3. int event_fd;
  4. } swPipeEventfd;
swPipeEventfd_read 管道的读取

类似于匿名管道,eventfd 也不支持超时等待,因此还是利用 poll 函数进行超时等待。

由于 eventfd 可能是阻塞式,因此 read 时可能会被信号打断。

</>复制代码

  1. static int swPipeEventfd_read(swPipe *p, void *data, int length)
  2. {
  3. int ret = -1;
  4. swPipeEventfd *object = p->object;
  5. //eventfd not support socket timeout
  6. if (p->blocking == 1 && p->timeout > 0)
  7. {
  8. if (swSocket_wait(object->event_fd, p->timeout * 1000, SW_EVENT_READ) < 0)
  9. {
  10. return SW_ERR;
  11. }
  12. }
  13. while (1)
  14. {
  15. ret = read(object->event_fd, data, sizeof(uint64_t));
  16. if (ret < 0 && errno == EINTR)
  17. {
  18. continue;
  19. }
  20. break;
  21. }
  22. return ret;
  23. }
swPipeEventfd_write 管道的写入

写入和读取的过程类似,注意被信号打断后继续循环即可。

</>复制代码

  1. static int swPipeEventfd_write(swPipe *p, void *data, int length)
  2. {
  3. int ret;
  4. swPipeEventfd *this = p->object;
  5. while (1)
  6. {
  7. ret = write(this->event_fd, data, sizeof(uint64_t));
  8. if (ret < 0)
  9. {
  10. if (errno == EINTR)
  11. {
  12. continue;
  13. }
  14. }
  15. break;
  16. }
  17. return ret;
  18. }
swPipeEventfd_getFd

</>复制代码

  1. static int swPipeEventfd_getFd(swPipe *p, int isWriteFd)
  2. {
  3. return ((swPipeEventfd *) (p->object))->event_fd;
  4. }
swPipeEventfd_close 关闭管道

</>复制代码

  1. static int swPipeEventfd_close(swPipe *p)
  2. {
  3. int ret;
  4. ret = close(((swPipeEventfd *) (p->object))->event_fd);
  5. sw_free(p->object);
  6. return ret;
  7. }
swPipeUnsock 管道 swPipeUnsock 数据结构

不同于 pipe 的匿名管道,swPipeUnsock 管道是双向通信的管道。

因此两个进程利用 swPipeUnsock 管道进行通信的时候,独占一个 sock,也就是说 A 进程读写都是用 socks[0]B 进程读写都是用 socks[1]socks[0] 写入的消息会在 socks[1] 读出来,反之,socks[0] 读出的消息是 sock[1] 写入的,这样就实现了两个进程的双向通信。

</>复制代码

  1. typedef struct _swPipeUnsock
  2. {
  3. /**
  4. * master : socks[1]
  5. * worker : socks[0]
  6. */
  7. int socks[2];
  8. /**
  9. * master pipe is closed
  10. */
  11. uint8_t pipe_master_closed;
  12. /**
  13. * worker pipe is closed
  14. */
  15. uint8_t pipe_worker_closed;
  16. } swPipeUnsock;
swPipeUnsock 的创建

swPipeUnsock 的创建主要是调用 socketpair 函数,protocol 决定了创建的 socketSOCK_DGRAM 类型还是 SOCK_STREAM 类型。

</>复制代码

  1. int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
  2. {
  3. int ret;
  4. swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock));
  5. if (object == NULL)
  6. {
  7. swWarn("malloc() failed.");
  8. return SW_ERR;
  9. }
  10. bzero(object, sizeof(swPipeUnsock));
  11. p->blocking = blocking;
  12. ret = socketpair(AF_UNIX, protocol, 0, object->socks);
  13. if (ret < 0)
  14. {
  15. swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);
  16. sw_free(object);
  17. return SW_ERR;
  18. }
  19. else
  20. {
  21. //Nonblock
  22. if (blocking == 0)
  23. {
  24. swSetNonBlock(object->socks[0]);
  25. swSetNonBlock(object->socks[1]);
  26. }
  27. int sbsize = SwooleG.socket_buffer_size;
  28. swSocket_set_buffer_size(object->socks[0], sbsize);
  29. swSocket_set_buffer_size(object->socks[1], sbsize);
  30. p->object = object;
  31. p->read = swPipeUnsock_read;
  32. p->write = swPipeUnsock_write;
  33. p->getFd = swPipeUnsock_getFd;
  34. p->close = swPipeUnsock_close;
  35. }
  36. return 0;
  37. }
  38. int swSocket_set_buffer_size(int fd, int buffer_size)
  39. {
  40. if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size)) < 0)
  41. {
  42. swSysError("setsockopt(%d, SOL_SOCKET, SO_SNDBUF, %d) failed.", fd, buffer_size);
  43. return SW_ERR;
  44. }
  45. if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof(buffer_size)) < 0)
  46. {
  47. swSysError("setsockopt(%d, SOL_SOCKET, SO_RCVBUF, %d) failed.", fd, buffer_size);
  48. return SW_ERR;
  49. }
  50. return SW_OK;
  51. }
swPipeUnsock_getFd 函数

同样的获取管道文件描述符根据 master 来决定。

</>复制代码

  1. static int swPipeUnsock_getFd(swPipe *p, int master)
  2. {
  3. swPipeUnsock *this = p->object;
  4. return master == 1 ? this->socks[1] : this->socks[0];
  5. }
swPipeUnsock_close 关闭管道

关闭管道就是调用 close 来依次关闭两个 socket.

</>复制代码

  1. static int swPipeUnsock_close(swPipe *p)
  2. {
  3. swPipeUnsock *object = p->object;
  4. int ret = swPipeUnsock_close_ext(p, 0);
  5. sw_free(object);
  6. return ret;
  7. }
  8. int swPipeUnsock_close_ext(swPipe *p, int which)
  9. {
  10. int ret1 = 0, ret2 = 0;
  11. swPipeUnsock *object = p->object;
  12. if (which == SW_PIPE_CLOSE_MASTER)
  13. {
  14. if (object->pipe_master_closed)
  15. {
  16. return SW_ERR;
  17. }
  18. ret1 = close(object->socks[1]);
  19. object->pipe_master_closed = 1;
  20. }
  21. else if (which == SW_PIPE_CLOSE_WORKER)
  22. {
  23. if (object->pipe_worker_closed)
  24. {
  25. return SW_ERR;
  26. }
  27. ret1 = close(object->socks[0]);
  28. object->pipe_worker_closed = 1;
  29. }
  30. else
  31. {
  32. ret1 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_MASTER);
  33. ret2 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_WORKER);
  34. }
  35. return 0 - ret1 - ret2;
  36. }
管道的应用 tasker 模块

当调用 taskwait 函数后,投递的 worker 进程会阻塞在 serv->task_notify[SwooleWG.id] 管道的读取中,tasker 模块处理完毕后,会向 serv->task_notify[source_worker_id] 管道写入数据。

这个就是 pipe 函数或者 eventfd 创建的匿名管道的用途,用于单向的进程通信(tasker 进程向 worker 进程传递数据)。

</>复制代码

  1. static inline int swPipeNotify_auto(swPipe *p, int blocking, int semaphore)
  2. {
  3. #ifdef HAVE_EVENTFD
  4. return swPipeEventfd_create(p, blocking, semaphore, 0);
  5. #else
  6. return swPipeBase_create(p, blocking);
  7. #endif
  8. }
worker 模块

manager 负责为 worker 进程创建 pipe_masterpipe_worker。用于 reactor 线程与 worker 进程直接进行通信。

</>复制代码

  1. int swManager_start(swFactory *factory)
  2. {
  3. ...
  4. for (i = 0; i < serv->worker_num; i++)
  5. {
  6. if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0)
  7. {
  8. return SW_ERR;
  9. }
  10. serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER);
  11. serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_WORKER);
  12. serv->workers[i].pipe_object = &object->pipes[i];
  13. swServer_store_pipe_fd(serv, serv->workers[i].pipe_object);
  14. }
  15. ...
  16. }

reactor 线程启动的时候,会将 pipe_master 加入 reactor 的监控当中。

</>复制代码

  1. static int swReactorThread_loop(swThreadParam *param)
  2. {
  3. ...
  4. for (i = 0; i < serv->worker_num; i++)
  5. {
  6. if (i % serv->reactor_num == reactor_id)
  7. {
  8. pipe_fd = serv->workers[i].pipe_master;
  9. swSetNonBlock(pipe_fd);
  10. reactor->add(reactor, pipe_fd, SW_FD_PIPE);
  11. if (thread->notify_pipe == 0)
  12. {
  13. thread->notify_pipe = serv->workers[i].pipe_worker;
  14. }
  15. }
  16. }
  17. ...
  18. }

worker 进程中,会将 pipe_worker 作为另一端 socket 放入 workerreactor 事件循环中进行监控。

</>复制代码

  1. int swWorker_loop(swFactory *factory, int worker_id)
  2. {
  3. ...
  4. int pipe_worker = worker->pipe_worker;
  5. swSetNonBlock(pipe_worker);
  6. SwooleG.main_reactor->ptr = serv;
  7. SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ);
  8. SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive);
  9. SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite);
  10. ...
  11. }
tasker 进程

tasker 进程中管道的创建是 swProcessPool_create 函数完成的。

</>复制代码

  1. int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key, int ipc_mode)
  2. {
  3. ...
  4. else if (ipc_mode == SW_IPC_UNIXSOCK)
  5. {
  6. pool->pipes = sw_calloc(worker_num, sizeof(swPipe));
  7. if (pool->pipes == NULL)
  8. {
  9. swWarn("malloc[2] failed.");
  10. return SW_ERR;
  11. }
  12. swPipe *pipe;
  13. int i;
  14. for (i = 0; i < worker_num; i++)
  15. {
  16. pipe = &pool->pipes[i];
  17. if (swPipeUnsock_create(pipe, 1, SOCK_DGRAM) < 0)
  18. {
  19. return SW_ERR;
  20. }
  21. pool->workers[i].pipe_master = pipe->getFd(pipe, SW_PIPE_MASTER);
  22. pool->workers[i].pipe_worker = pipe->getFd(pipe, SW_PIPE_WORKER);
  23. pool->workers[i].pipe_object = pipe;
  24. }
  25. }
  26. ...
  27. }

tasker 进程发布任务的时候,会调用 swProcessPool_dispatch 函数,进而会向 pipe_master 管道写入任务数据。

</>复制代码

  1. int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_worker_id)
  2. {
  3. ...
  4. ret = swWorker_send2worker(worker, data, sendn, SW_PIPE_MASTER | SW_PIPE_NONBLOCK);
  5. ...
  6. }
  7. int swWorker_send2worker(swWorker *dst_worker, void *buf, int n, int flag)
  8. {
  9. int pipefd, ret;
  10. if (flag & SW_PIPE_MASTER)
  11. {
  12. pipefd = dst_worker->pipe_master;
  13. }
  14. else
  15. {
  16. pipefd = dst_worker->pipe_worker;
  17. }
  18. ...
  19. if ((flag & SW_PIPE_NONBLOCK) && SwooleG.main_reactor)
  20. {
  21. return SwooleG.main_reactor->write(SwooleG.main_reactor, pipefd, buf, n);
  22. }
  23. else
  24. {
  25. ret = swSocket_write_blocking(pipefd, buf, n);
  26. }
  27. return ret;
  28. }

tasker 进程并没有 reactor 事件循环,只会阻塞在某个系统调用中,如果 tasker 进程采用的是 unix socket 进行投递任务的时候,就会阻塞在对管道的 read 当中。

</>复制代码

  1. static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker)
  2. {
  3. ...
  4. while (SwooleG.running > 0 && task_n > 0)
  5. {
  6. ...
  7. else
  8. {
  9. n = read(worker->pipe_worker, &out.buf, sizeof(out.buf));
  10. if (n < 0 && errno != EINTR)
  11. {
  12. swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker);
  13. }
  14. }
  15. ...
  16. }
  17. ...
  18. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/30864.html

相关文章

  • Swoole 源码分析——进程管理 Swoole_Process

    摘要:清空主进程残留的定时器与信号。设定为执行回调函数如果在回调函数中调用了异步系统,启动函数进行事件循环。因此为了区分两者,规定并不允许两者同时存在。 前言 swoole-1.7.2 增加了一个进程管理模块,用来替代 PHP 的 pcntl 扩展。 PHP自带的pcntl,存在很多不足,如 pcntl 没有提供进程间通信的功能 pcntl 不支持重定向标准输入和输出 pcntl 只...

    pepperwang 评论0 收藏0
  • Swoole 源码分析——Server模块Start

    摘要:是缓存区高水位线,达到了说明缓冲区即将满了创建线程函数用于将监控的存放于中向中添加监听的文件描述符等待所有的线程开启事件循环利用创建线程,线程启动函数是保存监听本函数将用于监听的存放到当中,并设置相应的属性 Server 的启动 在 server 启动之前,swoole 首先要调用 php_swoole_register_callback 将 PHP 的回调函数注册到 server...

    3fuyu 评论0 收藏0
  • Swoole源码研究】浅析swoole中server的实现

    摘要:的部分是基于以及协议的。例如父进程向中写入子进程从中读取子进程向中写入父进程从中读取。默认使用对进程进行分配交给对应的线程进行监听线程收到某个进程的数据后会进行处理值得注意的是这个线程可能并不是发送请求的那个线程。 作者:施洪宝 一. 基础知识 1.1 swoole swoole是面向生产环境的php异步网络通信引擎, php开发人员可以利用swoole开发出高性能的server服务。...

    rainyang 评论0 收藏0
  • Swoole笔记(四)

    摘要:配合模块,创建的子进程可以异步的事件驱动模式。默认为阻塞读取。函数用于将一个加入到的事件监听中。为事件类型的掩码,可选择关闭开启可读可写事件,如,,或者。在程序中使用,可以理解为在进程中将此注册到事件中。 Process Process是swoole内置的进程管理模块,用来替代PHP的pcntl扩展。 swoole_process支持重定向标准输入和输出,在子进程内echo不会打印屏...

    yzd 评论0 收藏0
  • Swoole 源码分析——Server模块Worker事件循环

    摘要:如果为,就不断循环,杀死或者启动相应的进程,如果为,那么就关闭所有的进程,调用函数退出程序。调用函数,监控已结束的进程如果函数返回异常,很有可能是被信号打断。函数主要用于调用函数,进而调用函数 swManager_loop 函数 manager 进程管理 manager 进程开启的时候,首先要调用 onManagerStart 回调 添加信号处理函数 swSignal_add,S...

    BDEEFE 评论0 收藏0

发表评论

0条评论

Tikitoo

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<