资讯专栏INFORMATION COLUMN

PHP 命令行方式实现异步多进程模式的任务处理

Invoker / 787人阅读

摘要:定义任务处理方法。读取来自命令行的参数,开始执行任务。该函数有两个参数和,是引用类型,用来存储子进程的状态,有两个可选常量,分别表示不等待子进程结束立即返回和等待子进程结束。

用PHP来实现异步任务一直是个难题,现有的解决方案中:PHP知名的异步框架有 swooleWorkerman,但都是无法在 web 环境中直接使用的,即便强行搭建 web 环境,异步调用也是使用多进程模式实现的。但有时真的不需要用启动服务的方式,让服务端一直等待客户端消息,何况中间还不能改动服务端代码。本文就介绍一下不使用任何框架和第三方库的情况下,在 CLI 环境中如何实现多进程以及在web环境中的异步调用。
web 环境的异步调用

常用的方式有两种

1. 使用 socket 连接

这种方式就是典型的C/S架构,需要有服务端支持。

// 1. 创建socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 进行socket连接
socket_connect($socket, "127.0.0.1", "3939");
//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑
// 3. 向服务端发送请求
socket_write($socket, $request, strlen($request));
// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)
$recv = socket_read($socket, 2048);
// 5. 关闭socket连接
socket_close($socket);
2. 使用 popen 打开进程管道

这种方式是使用操作系统命令,由操作系统直接执行。
本文讨论的异步调用就是使用这种方式。

$sf = "/path/to/cli_async_task.php"; //要执行的脚本文件
$op = "call"; //脚本文件接收的参数1
$data = base64_encode(serialize(["TestTask", "arg1", "arg2"])); //脚本文件接收的参数2
pclose(popen("php "$sf" --op $op --data $data &", "r")); //打开之后接着就关闭进程管道,让该进程以守护模式运行
echo PHP_EOL."异步任务已执行。".PHP_EOL;

这种方式的优点就是:一步解决,当前进程不需要任何开销。
缺点也很明显:无法跟踪任务脚本的运行状态。
所以重头戏会是在执行任务的脚本文件上,下面就介绍任务处理和多进程的实现方式。

CLI 环境的多进程任务处理

注意:多进程模式仅支持Linux,不支持Windows!!

这里会从0开始(未使用任何框架和类库)介绍每一个步骤,最后会附带一份完整的代码

1. 创建脚本

任何脚本不可忽视的地方就是错误处理。所以写一个任务处理脚本首先就是写错误处理方式。

在PHP中就是调用 set_exception_handler set_error_handler register_shutdown_function 这三个函数,然后写上自定义的处理方法。

接着是定义自动加载函数 spl_autoload_register 免去每使用一个新类都要 require / include 的烦恼。

定义日志操作方法。

定义任务处理方法。

读取来自命令行的参数,开始执行任务。

2. 多进程处理

PHP 创建多进程是使用 pcntl_fork 函数,该函数会 fork 一份当前进程(影分身术),于是就有了两个进程,当前进程是主进程(本体),fork 出的进程是子进程(影分身)。需要注意的是两个进程代码环境是一样的,两个进程都是执行到了 pcntl_fork 函数位置。区别就是 getmypid 获得的进程号不一样,最重要的区分是当调用 pcntl_fork函数时,子进程获得的返回值是 0,而主进程获得的是子进程的进程号 pid

好了,当我们知道谁是子进程后,就可以让该子进程执行任务了。

那么主进程是如何得知子进程的状态呢?
使用 pcntl_wait。该函数有两个参数 $status$options$status 是引用类型,用来存储子进程的状态,$options 有两个可选常量WNOHANG| WUNTRACED,分别表示不等待子进程结束立即返回和等待子进程结束。很明显使用WUNTRACED会阻塞主进程。(也可以使用 pcntl_waitpid 函数获取特定 pid 子进程状态)

在多进程中,主进程要做的就是管理每个子进程的状态,否则子进程很可能无法退出而变成僵尸进程。

关于多进程间的消息通信
这一块需要涉及具体的业务逻辑,所以只能简单的提一下。不考虑使用第三方比如 redis 等服务的情况下,PHP原生可以实现就是管道通信共享内存等方式。实现起来都比较简单,缺点就是可使用的数据容量有限,只能用简单文本协议交换数据。

如何手动结束所有进程任务
如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk "{print $2}"|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。


未完待续...


以下是完整的任务执行脚本代码:
可能无法直接使用,需要修改的地方有:

脚本目录和日志目录常量

自动加载任务类的方法(默认是加载脚本目录中以Task结尾的文件)

其他的如:错误和日志处理方式和文本格式就随意吧...

如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。

多进程的例子:execAsyncTask("multi", [ "test" => ["a", "b", "c"], "grab" => [["url" => "https://www.baidu.com", "callback" => "http://localhost"]] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。

[%s] %s (%s)". "
". "
%s
", $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString() ); file_put_contents(TASK_LOGS_PATH ."/exception-".date("Ymd").".log", $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() & $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date("Y-m-d H:i:s", time()); $msg = << $header) { if (!is_numeric($_k)) $header = sprintf("%s: %s", $_k, $header); $_headers .= $header . " "; } } $headers = "Connection: close " . $_headers; $opts = array( "http" => array( "method" => strtoupper(@$job["method"] ?: "get"), "content" => @$job["data"] ?: null, "header" => $headers, "user_agent" => @$job["args"]["user_agent"] ?: "HTTPGRAB/1.0 (compatible)", "proxy" => @$job["args"]["proxy"] ?: null, "timeout" => intval(@$job["args"]["timeout"] ?: 120), "protocol_version" => @$job["args"]["protocol_version"] ?: "1.1", "max_redirects" => 3, "ignore_errors" => true ) ); $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url." -->".strlen($ret)); if ($ret and isset($job["callback"])) { $postdata = http_build_query(array( "msg_id" => @$job["msg_id"] ?: 0, "url" => @$job["url"], "result" => $ret )); $opts = array( "http" => array( "method" => "POST", "header" => "Content-type:application/x-www-form-urlencoded". " ", "content" => $postdata, "timeout" => 30 ) ); file_get_contents($job["callback"], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job["callback"]." -->".$ret2); } return $ret; } function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR."*") as $_file) { if (fileatime($_file) < (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; } } } return $ret; } function backup($file, $dest) { $zip = new ipArchive(); if (!$zip->open($file, ipArchive::CREATE)) { return false; } _backup_dir($zip, $dest); $zip->close(); return $file; } function _backup_dir($zip, $dest, $sub="") { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip->addFile($dest . $file, $sub . $file); } else { if ($file != "." and $file != ".." and is_dir($dest . $file)) { //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); } } } closedir($dir); return true; } function execute_task($op, $data) { debug_log("Start..."); $t1 = microtime(true); switch($op) { case "call": //执行任务脚本类 $cmd = $data; if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; } $ret = call($cmd); break; case "grab": //抓取网页 if (is_string($data)) $data = ["url" => $data]; if (is_array($data)) $ret = grab($data); else throw new Exception("无效的命令参数!"); break; case "clean": //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天) if (isset($data["dirs"])) { $ret = clean($data["dirs"], @$data["expires"]); } else { $ret = clean($data); } break; case "backup": //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹 if (isset($data["zip"]) and is_dir($data["dest"])) $ret = backup($data["zip"], $data["dest"]); else throw new Exception("没有指定需要备份的文件!"); break; case "require": //加载脚本文件 if (is_file($data)) $ret = require($data); else throw new Exception("不是可请求的文件!"); break; case "test": sleep(rand(1, 5)); $ret = ucfirst(strval($data)). ".PID:". getmypid(); break; case "multi": //多进程处理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . "pipe.". posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道 throw new Exception("make pipe failed!"); } } //$shmid = shmop_open(ftok(__FILE__, "h"), "c", 0644, 4096); //共享内存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op => $_datas) { $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子进程中执行任务 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, "w"); //写 //stream_set_blocking($pipe, false); $_ret = serialize(["pid" => $_pid, "op" => $_op, "args" => $_data, "result" => $_ret]); if (strlen($_ret) > 4096) //写入管道的数据最大4K $_ret = serialize(["pid" => $_pid, "op" => $_op, "args" => $_data, "result" => "[RESPONSE_TOO_LONG]"]); //debug_log("write pipe: ".$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子进程 } elseif ($pid > 0) { //主进程中记录任务 $childs[] = $pid; $results[$pid] = 0; debug_log("fork by child: ".$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception("could not fork at ". getmygid()); } } } $pipe = fopen($fifo, "r+"); //读 stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。 $n = 0; while(count($childs) > 0) { foreach($childs as $i => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res > 0) { $_ret = @unserialize(fgets($pipe)); //读取管道数据 $results[$pid] = $_ret; unset($childs[$i]); debug_log("read child: ".$pid . " - " . json_encode($_ret, 64|256)); } if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程 } usleep(200000); $n++; } debug_log("child process completed."); @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new Exception("没有可执行的任务!"); break; } $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf("[%s] %s --> (%s) %sms", strtoupper($op), @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times); debug_log($log); return $ret; } // 读取 CLI 命令行参数 $params = getopt("", array("op:", "data:")); $op = $params["op"]; $data = unserialize(base64_decode($params["data"])); // 开始执行任务 execute_task($op, $data); function __autoload($classname) { $parts = explode("", ltrim($classname, "")); if (false !== strpos(end($parts), "_")) { array_splice($parts, -1, 1, explode("_", current($parts))); } $filename = implode(DIRECTORY_SEPARATOR, $parts) . ".php"; if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match("/.*Task$/", $classname)) { //查找以Task结尾的任务脚本类 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . ".php"; } else { return false; } }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/30016.html

相关文章

  • 使用 mixphp 打造进程异步邮件发送

    摘要:消费者开发本例我们使用的多进程开发工具来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用的类型,模式。中进程负责执行邮件发送任务。此时终端将打印成功收到测试邮件官网 注意:这个是 MixPHP V1 的范例 邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。 传统 MVC 框架由于缺少多进程...

    EdwardUp 评论0 收藏0
  • swoole进程结构

    摘要:管理进程会监视所有子进程的退出事件,当进程发生致命错误或者运行生命周期结束时,管理进程会回收此进程,并创建新的进程。换句话也就是说,对于进程的创建回收等操作全权有保姆进程进行管理。跟的交互请求到达实际上是与进程中的某个线程发生了连接。 showImg(https://segmentfault.com/img/bVbrhb2?w=600&h=360); 一、进程的基本知识 什么是进程,所...

    546669204 评论0 收藏0
  • PHP并发IO编程之路

    摘要:下文如无特殊声明将使用进程同时表示进程线程。收到数据后服务器程序进行处理然后使用向客户端发送响应。现在各种高并发异步的服务器程序都是基于实现的,比如。 并发 IO 问题一直是服务器端编程中的技术难题,从最早的同步阻塞直接 Fork 进程,到 Worker 进程池/线程池,到现在的异步IO、协程。PHP 程序员因为有强大的 LAMP 框架,对这类底层方面的知识知之甚少,本文目的就是详细介...

    Riddler 评论0 收藏0
  • nodejs中进程,深入解析child_process模块和cluster模块

    摘要:严格来说,并不是单线程的。其他异步和事件驱动相关的线程通过来实现内部的线程池和线程调度。线程是最小的进程,因此也是单进程的。子进程中执行的是非程序,提供一组参数后,执行的结果以回调的形式返回。在子进程中通过和的机制来接收和发送消息。   node遵循的是单线程单进程的模式,node的单线程是指js的引擎只有一个实例,且在nodejs的主线程中执行,同时node以事件驱动的方式处理IO...

    JinB 评论0 收藏0
  • Gearman安装和使用

    摘要:启动和如下信息则表示成功查看版本安装扩展从下载最新扩展需下载最新源码包,并解压缩安装安装成功后信息然后,配置文件增加内容重启后,出现如下信息则表示安装扩展成功。 首发于 樊浩柏科学院 Gearman 是一个分布式任务分发系统,通过程序调用(API,跨语言)分布式地把工作委派给更适合做某项工作的机器,且这些机器可以以并发的、负载均衡的形式来共同完成某项工作。当计算密集型场景时,适合在后...

    U2FsdGVkX1x 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<