摘要:消费者开发本例我们使用的多进程开发工具来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用的类型,模式。中进程负责执行邮件发送任务。此时终端将打印成功收到测试邮件官网
注意:这个是 MixPHP V1 的范例
邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。
传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。
下面演示一个异步邮件发送系统的开发过程,涉及知识点:
异步
消息队列
多进程
守护进程
如何使用消息队列实现异步PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:
redis
rabbitmq
kafka
本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);架构设计
本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。
邮件发送库选型以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。
由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:
composer require swiftmailer/swiftmailer生产者开发
在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。
在传统 MVC 框架的控制器中增加如下代码:
通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new Redis();
if (!$redis->connect("127.0.0.1", 6379)) {
throw new Exception("Redis Connect Failure");
}
$redis->auth("");
$redis->select(0);
// 投递任务
$data = [
"to" => ["***@qq.com" => "A name"],
"body" => "Here is the message itself",
"subject" => "The title content",
];
$redis->lpush("queue:email", serialize($data));
通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。
消费者开发本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。
TaskExecutor 的 MODE_PUSH 模式有二种进程:
左进程:负责从消息队列取出任务数据,投放给中进程。
中进程:负责执行邮件发送任务。
PushCommand.php 代码如下:
*/
class PushCommand extends BaseCommand
{
// 配置信息
const HOST = "smtpdm.aliyun.com";
const PORT = 465;
const SECURITY = "ssl";
const USERNAME = "****@email.***.com";
const PASSWORD = "****";
// 初始化事件
public function onInitialize()
{
parent::onInitialize(); // TODO: Change the autogenerated stub
// 获取程序名称
$this->programName = Input::getCommandName();
// 设置pidfile
$this->pidFile = "/var/run/{$this->programName}.pid";
}
/**
* 获取服务
* @return TaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 类路径
"class" => "mix askTaskExecutor",
// 服务名称
"name" => "mix-daemon: {$this->programName}",
// 执行类型
"type" => mix askTaskExecutor::TYPE_DAEMON,
// 执行模式
"mode" => mix askTaskExecutor::MODE_PUSH,
// 左进程数
"leftProcess" => 1,
// 中进程数
"centerProcess" => 5,
// 任务超时时间 (秒)
"timeout" => 5,
]
);
}
// 启动
public function actionStart()
{
// 预处理
if (!parent::actionStart()) {
return ExitCode::UNSPECIFIED_ERROR;
}
// 启动服务
$service = $this->getTaskService();
$service->on("LeftStart", [$this, "onLeftStart"]);
$service->on("CenterStart", [$this, "onCenterStart"]);
$service->start();
// 返回退出码
return ExitCode::OK;
}
// 左进程启动事件回调函数
public function onLeftStart(LeftProcess $worker)
{
try {
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = Redis::getInstance();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从消息队列中间件阻塞获取一条消息
$data = $queueModel->brpop("queue:email", 10);
if (empty($data)) {
continue;
}
list(, $data) = $data;
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data, false);
}
} catch (Exception $e) {
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 中进程启动事件回调函数
public function onCenterStart(CenterProcess $worker)
{
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从进程消息队列中抢占一条消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
// 处理消息
try {
// 处理消息,比如:发送短信、发送邮件、微信推送
var_dump($data);
$ret = self::sendEmail($data);
var_dump($ret);
} catch (Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
}
// 发送邮件
public static function sendEmail($data)
{
// Create the Transport
$transport = (new Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
->setUsername(self::USERNAME)
->setPassword(self::PASSWORD);
// Create the Mailer using your created Transport
$mailer = new Swift_Mailer($transport);
// Create a message
$message = (new Swift_Message($data["subject"]))
->setFrom([self::USERNAME => "**网"])
->setTo($data["to"])
->setBody($data["body"]);
// Send the message
$result = $mailer->send($message);
return $result;
}
}
测试
在 shell 中启动 push 常驻程序。
[root@localhost bin]# ./mix-daemon push start mix-daemon "push" start successed.
调用接口往消息队列投放任务。
此时 shell 终端将打印:
成功收到测试邮件:
MixPHPGitHub: https://github.com/mixstart/m...
官网:http://www.mixphp.cn/
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/28918.html
摘要:消费者开发使用本例时,请确保你使用的编译时开启了本例我们采用的守护程序协程池来完成一个超高性能的邮件发送程序。 去年 Mix PHP V1 发布时,我写了一个多进程的邮件发送实例: 使用 mixphp 打造多进程异步邮件发送,今年 Mix PHP V2 发布,全面的协程支持让我们可以使用一个进程就可达到之前多个进程都无法达到的更高 IO 性能,所以今天重写一个协程池版本的邮件发送实例。...
摘要:异步队列消费者开发只提供了模式下运行控制器方法,并未提供主进程多子进程的进程模型,并未提供多线程处理。多线程异步队列服务只需写好控制器方法,然后在配置文件中配置下路由命名空间进程线程数量,就可在模式中启动多进程多线程模型的异步队列处理程序。 最近业余时间一直在开发ExpressPHP的第二个版本 MixPHP,今天下班想起之前一个面试官的问题:你为什么还要再造一个轮子呢?仔细回想,第一...
摘要:如何使用优化高并发场景写库或者耗时计算在的接口中使用消息队列,把要入库的数据写入的类型中。高容错子进程异常奔溃时,主进程将重建子进程。高性能多进程运行,充分利用多个并行计算,性能强劲。 经常在群里听到一些朋友问:TP 的项目怎么迁移到 mixphp 来处理高并发,我通常都是回复需要重写,可是一个开发很久的 TP 项目,代码量巨大,又怎么可能会花大量时间成本来重写呢? 那么为何我们不尝试...
摘要:框架最新源代码行数行,因此可以很容易的改造它,成为你们公司的专属框架。也不同于其他基于的微服务框架,只聚焦于微服务治理,定位于开发的更多领域,覆盖从初创到亿元级体量的技术诉求。的授权全靠用户自愿购买,详情 MixPHP是什么 MixPHP 是秉承 普及 PHP 常驻内存型解决方案,促进 PHP 往更后端发展 的理念而创造,采用 Swoole 扩展作为底层引擎,围绕常驻内存的方式而设计,...
摘要:在多种环境中迁移,代码无需修改,是无缝迁移的。由于大部分用户开发是在中进行,因此开发阶段我们推荐使用部署方案,因为更简单快速,下面整体演示一下的环境搭建。安装解压至指定安装目录。先不要启动,这会启动会报错,没加环境变量。 MixPHP 是一款基于 Swoole 的常驻内存型 PHP 高性能框架。 MixPHP 同时支持多种环境中执行: Nginx + mix-httpd (使用到 S...
阅读 3086·2021-09-22 15:20
阅读 3210·2021-09-22 15:19
阅读 3803·2021-09-22 15:15
阅读 2988·2021-09-08 09:35
阅读 2567·2019-08-30 15:44
阅读 3216·2019-08-30 10:50
阅读 4038·2019-08-29 16:25
阅读 1776·2019-08-26 13:55