资讯专栏INFORMATION COLUMN

Mix PHP V2 实例:协程池异步邮件发送守护程序

lauren_liuling / 1663人阅读

摘要:消费者开发使用本例时,请确保你使用的编译时开启了本例我们采用的守护程序协程池来完成一个超高性能的邮件发送程序。

去年 Mix PHP V1 发布时,我写了一个多进程的邮件发送实例: 使用 mixphp 打造多进程异步邮件发送,今年 Mix PHP V2 发布,全面的协程支持让我们可以使用一个进程就可达到之前多个进程都无法达到的更高 IO 性能,所以今天重写一个协程池版本的邮件发送实例。

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

异步

消息队列

守护进程

协程池

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

redis

rabbitmq

kafka

本次我们选用 Redis 来实现异步邮件发送,Redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);
架构设计

本实例由传统 MVC 框架投递邮件发送需求(生产者),Mix PHP 编写的守护程序执行发送任务(消费者)。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 Mix PHP 执行,所以 swiftmailer 是安装在 Mix PHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer
生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 Mix PHP 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new Redis();
if (!$redis->connect("127.0.0.1", 6379)) {
    throw new Exception("Redis connect failed.");
}
$redis->auth("");
$redis->select(0);
// 投递任务
$data = [
    "to"      => "***@qq.com",
    "body"    => "The message content",
    "subject" => "The title content",
];
$redis->lpush("queue:email", serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有在生产者中执行,而是待消息被消费者获取后才执行。

消费者开发
使用本例时,请确保你使用的 Swoole 编译时开启了 openssl

本例我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的邮件发送程序。

因为我们是开发一个守护程序,所以我们在 applications/daemon 模块中开发,首先我们在配置 applications/daemon/config/main.php 中注册一个命令:

// 命令
"commands"         => [

    "mailer" => ["Mailer", "description" => "Mailer daemon."],

],

注册的命令中指定的 Mailer 命令类,接下来我们编写一个 MailerCommand 类:

applications/daemon/src/Commands/MailerCommand.php

 */
class MailerCommand
{

    /**
     * 退出
     * @var bool
     */
    public $quit = false;

    /**
     * 主函数
     */
    public function main()
    {
        // 捕获信号
        ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) {
            $this->quit = true;
            ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null);
        });
        // 协程池执行任务
        xgo(function () {
            $maxWorkers = 20;
            $maxQueue   = 20;
            $jobQueue   = new Channel($maxQueue);
            $dispatch   = new Dispatcher([
                "jobQueue"   => $jobQueue,
                "maxWorkers" => $maxWorkers,
            ]);
            $dispatch->start(MailerWorker::class);
            // 投放任务
            $redis = app()->redisPool->getConnection();
            while (true) {
                if ($this->quit) {
                    $dispatch->stop();
                    return;
                }
                try {
                    $data = $redis->brPop(["queue:email"], 3);
                } catch (Throwable $e) {
                    $dispatch->stop();
                    return;
                }
                if (!$data) {
                    continue;
                }
                $data = array_pop($data); // brPop命令最后一个键才是值
                $jobQueue->push($data);
            }
        });
    }

}
$data = $redis->brPop(["queue:email"], 3); 外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisorpm2 等工具重启守护进程。

上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的邮件发送代码逻辑就在 MailerWorker 类中:

applications/daemon/src/Libraries/MailerWorker.php

 */
class MailerWorker extends AbstractWorker implements WorkerInterface
{

    /**
     * 邮件发送器
     * @var Mailer
     */
    public $mailer;

    /**
     * 初始化事件
     */
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 实例化一些需重用的对象
        $this->mailer = new Mailer();
    }

    /**
     * 处理
     * @param $data
     */
    public function handle($data)
    {
        // TODO: Implement handle() method.
        $data = unserialize($data);
        if (empty($data)) {
            return;
        }
        try {
            $this->mailer->send($data["to"], $data["subject"], $data["body"]);
            app()->log->info("Mail sent successfully:to {to} subject {subject}", $data);
        } catch (Throwable $e) {
            app()->log->error("Mail failed to send:to {to} subject {subject} error {error}", array_merge($data, ["error" => $e->getMessage()]));
        }
    }

}

由以上代码可见,Worker 在初始化时,新增了一个 Mailer 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Mailer 发送程序:

applications/daemon/src/Libraries/Mailer.php

 */
class Mailer
{

    /**
     * 配置信息
     */
    const HOST = "smtpdm.aliyun.com";
    const PORT = 465;
    const SECURITY = "ssl";
    const USERNAME = "***";
    const PASSWORD = "***";

    /**
     * Mailer constructor.
     */
    public function __construct()
    {
        // 开启协程钩子
        Coroutine::enableHook();
    }

    /**
     * 发送
     * @param $to
     * @param $subject
     * @param $body
     * @return int
     */
    public function send($to, $subject, $body)
    {
        // Create the Transport
        $transport = (new Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new Swift_Mailer($transport);
        // Create a message
        $message = (new Swift_Message($subject))
            ->setFrom([self::USERNAME => "**网"])
            ->setTo($to)
            ->setBody($body);
        // Send the message
        return $mailer->send($message);
    }

}

在 Mailer 发送程序中我们使用了前面 composer 安装的 swiftmailer 库来发送邮件,以上就完成了全部的代码逻辑,现在我们开始测试。

先启动消费者守护程序:

[root@localhost bin]# ./mix-daemon mailer

将上文的生产者脚本命名为 push.php 然后在 CLI 中执行 (开一个新终端):

[root@localhost bin]# php /tmp/push.php

消费者守护程序结果:

[root@localhost bin]# ./mix-daemon mailer
[info] 2019-04-15 11:48:36 [message] Mail sent successfully:to ***@qq.com subject The title content

命令行终端打印了发送成功的日志,发送完成。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/31259.html

相关文章

  • Mix PHP V2 实例:AliCloud 短信程池异步发送守护程序

    摘要:前些时间我们发布了实例协程池异步邮件发送守护程序范例,这一次我们提供一个使用大厂通过协程化来并行执行短信发送任务,本文是一个代码简单性能极强的范例。 前些时间我们发布了 Mix PHP V2 实例:协程池异步邮件发送守护程序 范例,这一次我们提供一个使用大厂 SDK 通过 Swoole Hook 协程化来并行执行短信发送任务,本文是一个代码简单、IO 性能极强的范例。 请先升级到 m...

    qc1iu 评论0 收藏0
  • Mix PHP V2 新特性:协程、定时器

    摘要:主函数查询数据不手动释放的连接不会归还连接池,会在析构时丢弃执行结果为,说明是并行执行的。主函数查询数据即便抛出了异常,仍然能执行到,没有导致内的一直处于阻塞状态。主函数一次性定时持续定时停止定时 协程 Mix PHP V2 基于 Swoole 4 的 PHP Stream Hook 协程技术开发,协程使用方式与 Golang 几乎一致,包括框架封装的协程池、连接池、命令行处理都大量参...

    Nosee 评论0 收藏0
  • Mix PHP V2 生态:让 Guzzle 支持 Swoole 的 Hook 协程

    摘要:是一个非常流行的的客户端,现在各大厂的也都开始基于开发,因为只支持的协程,而默认是使用扩展的,所以开发了,能在不修改源码的情况下让协程化。 Guzzle 是一个非常流行的 PHP 的 HTTP 客户端,现在各大厂的 SDK 也都开始基于 Guzzle 开发,因为 Swoole 只支持 PHP Stream 的协程 Hook ,而 Guzzle 默认是使用 cURL 扩展的,所以 Mix...

    Flands 评论0 收藏0
  • swoole通用程池的实现

    摘要:之前过行代码实现通用协程池今天看了下相关文档,用也实现了一个,由于没有的,所以实现的有点简单,但是实用性还可以,通过工厂函数实现了通用性。官方的协程池是用只能用在。因为协程池代码层耦合了实例化逻辑。 之前过golang40行代码实现通用协程池 今天看了下swoole相关文档,用PHP也实现了一个,由于swoole没有golang的select,所以实现的有点简单,但是实用性还可以,通过...

    wanghui 评论0 收藏0
  • PHP回顾之协程

    摘要:本文先回顾生成器,然后过渡到协程编程。其作用主要体现在三个方面数据生成生产者,通过返回数据数据消费消费者,消费传来的数据实现协程。解决回调地狱的方式主要有两种和协程。重点应当关注控制权转让的时机,以及协程的运作方式。 转载请注明文章出处: https://tlanyan.me/php-review... PHP回顾系列目录 PHP基础 web请求 cookie web响应 sess...

    Java3y 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<