资讯专栏INFORMATION COLUMN

Swoft 源码剖析 - Swoole和Swoft的那些事(Task投递/定时任务篇)

vvpvvp / 2443人阅读

摘要:作为定时任务的执行者,通过每唤醒自身一次,然后把执行表遍历一次,挑选当下需要执行的任务,通过投递出去并更新该任务执行表中的状态。

作者:bromine
链接:https://www.jianshu.com/p/b44...
來源:简书
著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。
Swoft Github: https://github.com/swoft-clou...

</>复制代码

  1. Swoft源码剖析系列目录:https://segmentfault.com/a/11...
前言

Swoft的任务功能基于SwooleTask机制,或者说SwoftTask机制本质就是对SwooleTask机制的封装和加强。

任务投递

</>复制代码

  1. //SwoftTaskTask.php
  2. class Task
  3. {
  4. /**
  5. * Deliver coroutine or async task
  6. *
  7. * @param string $taskName
  8. * @param string $methodName
  9. * @param array $params
  10. * @param string $type
  11. * @param int $timeout
  12. *
  13. * @return bool|array
  14. * @throws TaskException
  15. */
  16. public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
  17. {
  18. $data = TaskHelper::pack($taskName, $methodName, $params, $type);
  19. if(!App::isWorkerStatus() && !App::isCoContext()){
  20. return self::deliverByQueue($data);//见下文Command章节
  21. }
  22. if(!App::isWorkerStatus() && App::isCoContext()){
  23. throw new TaskException("Please deliver task by http!");
  24. }
  25. $server = App::$server->getServer();
  26. // Delier coroutine task
  27. if ($type == self::TYPE_CO) {
  28. $tasks[0] = $data;
  29. $prifleKey = "task" . "." . $taskName . "." . $methodName;
  30. App::profileStart($prifleKey);
  31. $result = $server->taskCo($tasks, $timeout);
  32. App::profileEnd($prifleKey);
  33. return $result;
  34. }
  35. // Deliver async task
  36. return $server->task($data);
  37. }
  38. }

任务投递Task::deliver()将调用参数打包后根据$type参数通过Swoole$server->taskCo()$server->task()接口投递到Task进程
Task本身始终是同步执行的,$type仅仅影响投递这一操作的行为,Task::TYPE_ASYNC对应的$server->task()是异步投递,Task::deliver()调用后马上返回;Task::TYPE_CO对应的$server->taskCo()是协程投递,投递后让出协程控制,任务完成或执行超时后Task::deliver()才从协程返回。

任务执行

</>复制代码

  1. //SwoftTaskBootstrapListenersTaskEventListener
  2. /**
  3. * The listener of swoole task
  4. * @SwooleListener({
  5. * SwooleEvent::ON_TASK,
  6. * SwooleEvent::ON_FINISH,
  7. * })
  8. */
  9. class TaskEventListener implements TaskInterface, FinishInterface
  10. {
  11. /**
  12. * @param SwooleServer $server
  13. * @param int $taskId
  14. * @param int $workerId
  15. * @param mixed $data
  16. * @return mixed
  17. * @throws InvalidArgumentException
  18. */
  19. public function onTask(Server $server, int $taskId, int $workerId, $data)
  20. {
  21. try {
  22. /* @var TaskExecutor $taskExecutor*/
  23. $taskExecutor = App::getBean(TaskExecutor::class);
  24. $result = $taskExecutor->run($data);
  25. } catch (Throwable $throwable) {
  26. App::error(sprintf("TaskExecutor->run %s file=%s line=%d ", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
  27. $result = false;
  28. // Release system resources
  29. App::trigger(AppEvent::RESOURCE_RELEASE);
  30. App::trigger(TaskEvent::AFTER_TASK);
  31. }
  32. return $result;
  33. }
  34. }

此处是swoole.onTask的事件回调,其职责仅仅是将将Worker进程投递来的打包后的数据转发给TaskExecutor

SwooleTask机制的本质是Worker进程将耗时任务投递给同步的Task进程(又名TaskWorker)处理,所以swoole.onTask的事件回调是在Task进程中执行的。上文说过,Worker进程是你大部分HTTP服务代码执行的环境,但是从TaskEventListener.onTask()方法开始,代码的执行环境都是Task进程,也就是说,TaskExecutor和具体的TaskBean都是执行在Task进程中的。

</>复制代码

  1. //SwoftTaskTaskExecutor
  2. /**
  3. * The task executor
  4. *
  5. * @Bean()
  6. */
  7. class TaskExecutor
  8. {
  9. /**
  10. * @param string $data
  11. * @return mixed
  12. */
  13. public function run(string $data)
  14. {
  15. $data = TaskHelper::unpack($data);
  16. $name = $data["name"];
  17. $type = $data["type"];
  18. $method = $data["method"];
  19. $params = $data["params"];
  20. $logid = $data["logid"] ?? uniqid("", true);
  21. $spanid = $data["spanid"] ?? 0;
  22. $collector = TaskCollector::getCollector();
  23. if (!isset($collector["task"][$name])) {
  24. return false;
  25. }
  26. list(, $coroutine) = $collector["task"][$name];
  27. $task = App::getBean($name);
  28. if ($coroutine) {
  29. $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
  30. } else {
  31. $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
  32. }
  33. return $result;
  34. }
  35. }

任务执行思路很简单,将Worker进程发过来的数据解包还原成原来的调用参数,根据$name参数找到对应的TaskBean并调用其对应的task()方法。其中TaskBean使用类级别注解@Task(name="TaskName")或者@Task("TaskName")声明。

值得一提的一点是,@Task注解除了name属性,还有一个coroutine属性,上述代码会根据该参数选择使用协程的runCoTask()或者同步的runSyncTask()执行Task。但是由于而且由于SwooleTask进程的执行是完全同步的,不支持协程,所以目前版本请该参数不要配置为true。同样的在TaskBean中编写的任务代码必须的同步阻塞的或者是要能根据环境自动将异步非阻塞和协程降级为同步阻塞的

从Process中投递任务

前面我们提到:

</>复制代码

  1. SwooleTask机制的本质是Worker进程将耗时任务投递给同步的Task进程(又名 TaskWorker)处理。

换句话说,Swoole$server->taskCo()$server->task()都只能在Worker进程中使用。
这个限制大大的限制了使用场景。 如何能够为了能够在Process中投递任务呢?Swoft为了绕过这个限制提供了Task::deliverByProcess()方法。其实现原理也很简单,通过Swoole$server->sendMessage()方法将调用信息从Process中投递到Worker进程中,然后由Worker进程替其投递到Task进程当中,相关代码如下:

</>复制代码

  1. //SwoftTaskTask.php
  2. /**
  3. * Deliver task by process
  4. *
  5. * @param string $taskName
  6. * @param string $methodName
  7. * @param array $params
  8. * @param string $type
  9. * @param int $timeout
  10. * @param int $workId
  11. *
  12. * @return bool
  13. */
  14. public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
  15. {
  16. /* @var PipeMessageInterface $pipeMessage */
  17. $server = App::$server->getServer();
  18. $pipeMessage = App::getBean(PipeMessage::class);
  19. $data = [
  20. "name" => $taskName,
  21. "method" => $methodName,
  22. "params" => $params,
  23. "timeout" => $timeout,
  24. "type" => $type,
  25. ];
  26. $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
  27. return $server->sendMessage($message, $workId);
  28. }

数据打包后使用$server->sendMessage()投递给Worker:

</>复制代码

  1. //SwoftBootstrapServerServerTrait.php
  2. /**
  3. * onPipeMessage event callback
  4. *
  5. * @param SwooleServer $server
  6. * @param int $srcWorkerId
  7. * @param string $message
  8. * @return void
  9. * @throws InvalidArgumentException
  10. */
  11. public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
  12. {
  13. /* @var PipeMessageInterface $pipeMessage */
  14. $pipeMessage = App::getBean(PipeMessage::class);
  15. list($type, $data) = $pipeMessage->unpack($message);
  16. App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
  17. }

$server->sendMessage后,Worker进程收到数据时会触发一个swoole.pipeMessage事件的回调,Swoft会将其转换成自己的swoft.pipeMessage事件并触发.

</>复制代码

  1. //SwoftTaskEventListenersPipeMessageListener.php
  2. /**
  3. * The pipe message listener
  4. *
  5. * @Listener(event=AppEvent::PIPE_MESSAGE)
  6. */
  7. class PipeMessageListener implements EventHandlerInterface
  8. {
  9. /**
  10. * @param SwoftEventEventInterface $event
  11. */
  12. public function handle(EventInterface $event)
  13. {
  14. $params = $event->getParams();
  15. if (count($params) < 3) {
  16. return;
  17. }
  18. list($type, $data, $srcWorkerId) = $params;
  19. if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
  20. return;
  21. }
  22. $type = $data["type"];
  23. $taskName = $data["name"];
  24. $params = $data["params"];
  25. $timeout = $data["timeout"];
  26. $methodName = $data["method"];
  27. // delever task
  28. Task::deliver($taskName, $methodName, $params, $type, $timeout);
  29. }
  30. }

swoft.pipeMessage事件最终由PipeMessageListener处理。在相关的监听其中,如果发现swoft.pipeMessage事件由Task::deliverByProcess()产生的,Worker进程会替其执行一次Task::deliver(),最终将任务数据投递到TaskWorker进程中。

一道简单的回顾练习:从Task::deliverByProcess()到某TaskBean 最终执行任务,经历了哪些进程,而调用链的哪些部分又分别是在哪些进程中执行?

从Command进程或其子进程中投递任务

</>复制代码

  1. //SwoftTaskQueueTask.php
  2. /**
  3. * @param string $data
  4. * @param int $taskWorkerId
  5. * @param int $srcWorkerId
  6. *
  7. * @return bool
  8. */
  9. public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
  10. {
  11. if ($taskWorkerId === null) {
  12. $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
  13. }
  14. if ($srcWorkerId === null) {
  15. $srcWorkerId = mt_rand(0, $this->workerNum - 1);
  16. }
  17. $this->check();
  18. $data = $this->pack($data, $srcWorkerId);
  19. $result = msg_send($this->queueId, $taskWorkerId, $data, false);
  20. if (!$result) {
  21. return false;
  22. }
  23. return true;
  24. }

对于Command进程的任务投递,情况会更复杂一点。
上文提到的Process,其往往衍生于Http/Rpc服务,作为同一个Manager的子孙进程,他们能够拿到SwooleServer的句柄变量,从而通过$server->sendMessage(),$server->task()等方法进行任务投递。

但在Swoft的体系中,还有一个十分路人的角色: Command
Command的进程从shellcronb独立启动,和Http/Rpc服务相关的进程没有亲缘关系。因此Command进程以及从Command中启动的Process进程是没有办法拿到SwooleServer的调用句柄直接通过UnixSocket进行任务投递的。
为了为这种进程提供任务投递支持,Swoft利用了SwooleTask进程的一个特殊功能----消息队列

同一个项目中CommandHttpRpcServer 通过约定一个message_queue_key获取到系统内核中的同一条消息队列,然后Comand进程就可以通过该消息队列向Task进程投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()方法中,Swoft会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore拓展,如果你想使用,需要在编译PHP时加上--enable-sysvmsg参数。

定时任务

除了手动执行的普通任务,Swoft还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab功能.

Swoft用两个前置Process---任务计划进程:CronTimerProcess和任务执行进程CronExecProcess
,和两张内存数据表-----RunTimeTable(任务(配置)表)OriginTable((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:

</>复制代码

  1. SwoftTaskCrontabTableCrontab.php
  2. /**
  3. * 任务表,记录用户配置的任务信息
  4. * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
  5. * @var array $originStruct
  6. */
  7. private $originStruct = [
  8. "rule" => [SwooleTable::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
  9. "taskClass" => [SwooleTable::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
  10. "taskMethod" => [SwooleTable::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
  11. "add_time" => [SwooleTable::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
  12. ];
  13. /**
  14. * 执行表,记录短时间内要执行的任务列表及其执行状态
  15. * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
  16. * @var array $runTimeStruct
  17. */
  18. private $runTimeStruct = [
  19. "taskClass" => [SwooleTable::TYPE_STRING, 255],//同上
  20. "taskMethod" => [SwooleTable::TYPE_STRING, 255],//同上
  21. "minute" => [SwooleTable::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date("YmdHi")
  22. "sec" => [SwooleTable::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
  23. "runStatus" => [SwooleTABLE::TYPE_INT, 4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。
  24. //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
  25. ];
此处为何要使用Swoole的内存Table?

Swoft的的定时任务管理是分别由 任务计划进程任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table(本文的Table,除非特别说明,都指SwooleSwooleTable结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。

为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存。具体代码在SwoftTaskBootstrapListeners->onBeforeStart()中,比较简单,有兴趣的可以自行阅读。

背景介绍完了,我们来看看这两个定时任务进程的行为

</>复制代码

  1. //SwoftTaskBootstrapProcessCronTimerProcess.php
  2. /**
  3. * Crontab timer process
  4. *
  5. * @Process(name="cronTimer", boot=true)
  6. */
  7. class CronTimerProcess implements ProcessInterface
  8. {
  9. /**
  10. * @param SwoftProcessProcess $process
  11. */
  12. public function run(SwoftProcess $process)
  13. {
  14. //code....
  15. /* @var SwoftTaskCrontabCrontab $cron*/
  16. $cron = App::getBean("crontab");
  17. // Swoole/HttpServer
  18. $server = App::$server->getServer();
  19. $time = (60 - date("s")) * 1000;
  20. $server->after($time, function () use ($server, $cron) {
  21. // Every minute check all tasks, and prepare the tasks that next execution point needs
  22. $cron->checkTask();
  23. $server->tick(60 * 1000, function () use ($cron) {
  24. $cron->checkTask();
  25. });
  26. });
  27. }
  28. }

</>复制代码

  1. //SwoftTaskCrontabCrontab.php
  2. /**
  3. * 初始化runTimeTable数据
  4. *
  5. * @param array $task 任务
  6. * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
  7. * @return bool
  8. */
  9. private function initRunTimeTableData(array $task, array $parseResult): bool
  10. {
  11. $runTimeTableTasks = $this->getRunTimeTable()->table;
  12. $min = date("YmdHi");
  13. $sec = strtotime(date("Y-m-d H:i"));
  14. foreach ($parseResult as $time) {
  15. $this->checkTaskQueue(false);
  16. $key = $this->getKey($task["rule"], $task["taskClass"], $task["taskMethod"], $min, $time + $sec);
  17. $runTimeTableTasks->set($key, [
  18. "taskClass" => $task["taskClass"],
  19. "taskMethod" => $task["taskMethod"],
  20. "minute" => $min,
  21. "sec" => $time + $sec,
  22. "runStatus" => self::NORMAL
  23. ]);
  24. }
  25. return true;
  26. }

CronTimerProcessSwoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
该进程使用了Swoole的定时器功能,通过SwooleTimer在每分钟首秒时执行的回调,CronTimerProcess每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。

</>复制代码

  1. //SwoftTaskBootstrapProcess
  2. /**
  3. * Crontab process
  4. *
  5. * @Process(name="cronExec", boot=true)
  6. */
  7. class CronExecProcess implements ProcessInterface
  8. {
  9. /**
  10. * @param SwoftProcessProcess $process
  11. */
  12. public function run(SwoftProcess $process)
  13. {
  14. $pname = App::$server->getPname();
  15. $process->name(sprintf("%s cronexec process", $pname));
  16. /** @var SwoftTaskCrontabCrontab $cron */
  17. $cron = App::getBean("crontab");
  18. // Swoole/HttpServer
  19. $server = App::$server->getServer();
  20. $server->tick(0.5 * 1000, function () use ($cron) {
  21. $tasks = $cron->getExecTasks();
  22. if (!empty($tasks)) {
  23. foreach ($tasks as $task) {
  24. // Diliver task
  25. Task::deliverByProcess($task["taskClass"], $task["taskMethod"]);
  26. $cron->finishTask($task["key"]);
  27. }
  28. }
  29. });
  30. }
  31. }

CronExecProcess作为定时任务的执行者,通过SwooleTimer0.5s唤醒自身一次,然后把 执行表 遍历一次,挑选当下需要执行的任务,通过sendMessage()投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程中由TaskExecutor处理。

定时任务的宏观执行情况如下:

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29080.html

相关文章

  • Swoft 源码剖析 - 目录

    摘要:作者链接來源简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。同时顺手整理个人对源码的相关理解,希望能够稍微填补学习领域的空白。系列文章只会节选关键代码辅以思路讲解,请自行配合源码阅读。 作者:bromine链接:https://www.jianshu.com/p/2f6...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft...

    qpwoeiru96 评论0 收藏0
  • Swoft 源码剖析 - SwooleSwoft那些 (Http/Rpc服务)

    摘要:和服务关系最密切的进程是中的进程组,绝大部分业务处理都在该进程中进行。随后触发一个事件各组件通过该事件进行配置文件加载路由注册。事件每个请求到来时仅仅会触发事件。服务器生命周期和服务基本一致,详情参考源码剖析功能实现 作者:bromine链接:https://www.jianshu.com/p/4c0...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。S...

    张汉庆 评论0 收藏0
  • SwooleSwoft应用

    摘要:在中的应用官网源码解读号外号外欢迎大家我们开发组定了一个就线下聚一次的小目标上一篇源码解读反响还不错不少同学推荐再加一篇讲解一下中使用到的功能帮助大家开启的实战之旅服务器开发涉及到的相关技术领域的知识非常多不日积月累打好基础是很难真正 date: 2017-12-14 21:34:51title: swoole 在 swoft 中的应用 swoft 官网: https://www.sw...

    EscapedDog 评论0 收藏0
  • Swoft 源码解读

    摘要:官网源码解读号外号外欢迎大家我们开发组定了一个就线下聚一次的小目标里面的框架算是非常重的了这里的重先不具体到性能层面主要是框架的设计思想和框架集成的服务让框架可以既可以快速解决很多问题又可以轻松扩展中的框架有在应该无出其右了这次解读的源码 官网: https://www.swoft.org/源码解读: http://naotu.baidu.com/file/8... 号外号外, 欢迎大...

    weij 评论0 收藏0
  • swoft中Crontab定时

    摘要:我们项目使用的是框架,所以我就想到用框架的定时器。,以及的结构注在定时器这块使用到两个一个是用于存储任务的实例。 这两天老大给了个需求想把商城热点数据同步到redis缓存。我们项目使用的是swoft框架,所以我就想到用框架的Crontab定时器。但是在测试的时候发现把Table的size设置为1024时(实际上设置为任何大小都一样,贴上swoole的解释)发现内存溢出了 showImg...

    CarterLi 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<