资讯专栏INFORMATION COLUMN

php+redis实现延迟队列

罗志环 / 256人阅读

摘要:基于有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等我是在框架上写的,实现起来很简单对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,不多说,贴代码,不回排版,见谅命令行脚本执行方法这

基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
不多说,贴代码,不回排版,见谅

1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)

namespace appcommand;

use appcommonlibdelayqueueDelayQueue;
use thinkconsoleCommand;
use thinkconsoleInput;
use thinkconsoleOutput;
use thinkDb;

class DelayQueueWorker extends Command
{
    const COMMAND_ARGV_1 = "queue";

    protected function configure()
    {
        $this->setName("delay-queue")->setDescription("延迟队列任务进程");
        $this->addArgument(self::COMMAND_ARGV_1);
    }

    protected function execute(Input $input, Output $output)
    {
        $queue = $input->getArgument(self::COMMAND_ARGV_1);
        //参数1 延迟队列表名,对应与redis的有序集key名
        while (true) {
            DelayQueue::getInstance($queue)->perform();
            usleep(300000);
        }
    }
}

库类目录结构

config.php 里是redis连接参数配置

RedisHandler.php只实现有序集的操作,重连机制还没有实现

namespace appcommonlibdelayqueue;

class RedisHandler
{
    public $provider;
    private static $_instance = null;

    private function __construct() {
        $this->provider = new Redis();
        //host port
        $config = require_once "config.php";
        $this->provider->connect($config["redis_host"], $config["redis_port"]);
    }

    final private function __clone() {}

    public static function getInstance() {
        if(!self::$_instance) {
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key, $score, $value)
    {
        return $this->provider->zAdd($key, $score, $value);
    }

    /**
     * 获取有序集数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withscores
     * @return array
     */
    public function zRange($key, $start, $end, $withscores = null)
    {
        return $this->provider->zRange($key, $start, $end, $withscores);
    }

    /**
     * 删除有序集数据
     * @param $key
     * @param $member
     * @return int
     */
    public function zRem($key,$member)
    {
        return $this->provider->zRem($key,$member);
    }

}

延迟队列类

namespace appcommonlibdelayqueue;

class DelayQueue
{

    private $prefix = "delay_queue:";

    private $queue;

    private static $_instance = null;

    private function __construct($queue) {
        $this->queue = $queue;
    }

    final private function __clone() {}

    public static function getInstance($queue = "") {
        if(!self::$_instance) {
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     * 添加任务信息到队列
     *
     * demo DelayQueue::getInstance("test")->addTask(
     *    "appcommonlibdelayqueuejobTest",
     *    strtotime("2018-05-02 20:55:20"),
     *    ["abc"=>111]
     * );
     *
     * @param $jobClass
     * @param int $runTime 执行时间
     * @param array $args
     */
    public function addTask($jobClass, $runTime, $args = null)
    {

        $key = $this->prefix.$this->queue;

        $params = [
            "class" => $jobClass,
            "args"  => $args,
            "runtime" => $runTime,
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     * 执行job
     * @return bool
     */
    public function perform()
    {
        $key = $this->prefix.$this->queue;
        //取出有序集第一个元素
        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);

        if (!$result) {
            return false;
        }

        $jobInfo = unserialize($result[0]);

        print_r("job: ".$jobInfo["class"]." will run at: ". date("Y-m-d H:i:s",$jobInfo["runtime"]).PHP_EOL);

        $jobClass = $jobInfo["class"];

        if(!@class_exists($jobClass)) {
            print_r($jobClass." undefined". PHP_EOL);
            RedisHandler::getInstance()->zRem($key, $result[0]);
            return false;
        }

        // 到时间执行
        if (time() >= $jobInfo["runtime"]) {
            $job = new $jobClass;
            $job->setPayload($jobInfo["args"]);
            $jobResult = $job->preform();
            if ($jobResult) {
                // 将任务移除
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return true;
            }
        }

        return false;
    }

}

异步任务基类:

namespace appcommonlibdelayqueue;

class DelayJob
{

    protected $payload;

    public function preform ()
    {
        // todo
        return true;
    }


    public function setPayload($args = null)
    {
        $this->payload = $args;
    }

}

所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务

如:

namespace appcommonlibdelayqueuejob;

use appcommonlibdelayqueueDelayJob;

class Test extends DelayJob
{

    public function preform()
    {
        // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
        print_r("test job".PHP_EOL);
        return true;
    }

}

使用方法:

假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:

DelayQueue::getInstance("close_order")->addTask(
     "appcommonlibdelayqueuejobCloseOrder", // 自己实现的job
     strtotime("2018-05-02 20:55:20"), // 订单失效时间
     ["order_id"=>123456] // 传递给job的参数
 );

close_order 是有序集的key

命令行启动进程

php think delay-queue close_order

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/30732.html

相关文章

  • Redis 实现队列

    摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...

    PascalXie 评论0 收藏0
  • Redis 实现队列

    摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...

    lifesimple 评论0 收藏0
  • Redis 实现队列

    摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...

    LoftySoul 评论0 收藏0

发表评论

0条评论

罗志环

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<