摘要:基于有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等我是在框架上写的,实现起来很简单对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,不多说,贴代码,不回排版,见谅命令行脚本执行方法这
基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
不多说,贴代码,不回排版,见谅
1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)
namespace appcommand;
use appcommonlibdelayqueueDelayQueue;
use thinkconsoleCommand;
use thinkconsoleInput;
use thinkconsoleOutput;
use thinkDb;
class DelayQueueWorker extends Command
{
const COMMAND_ARGV_1 = "queue";
protected function configure()
{
$this->setName("delay-queue")->setDescription("延迟队列任务进程");
$this->addArgument(self::COMMAND_ARGV_1);
}
protected function execute(Input $input, Output $output)
{
$queue = $input->getArgument(self::COMMAND_ARGV_1);
//参数1 延迟队列表名,对应与redis的有序集key名
while (true) {
DelayQueue::getInstance($queue)->perform();
usleep(300000);
}
}
}
库类目录结构
config.php 里是redis连接参数配置
RedisHandler.php只实现有序集的操作,重连机制还没有实现
namespace appcommonlibdelayqueue;
class RedisHandler
{
public $provider;
private static $_instance = null;
private function __construct() {
$this->provider = new Redis();
//host port
$config = require_once "config.php";
$this->provider->connect($config["redis_host"], $config["redis_port"]);
}
final private function __clone() {}
public static function getInstance() {
if(!self::$_instance) {
self::$_instance = new RedisHandler();
}
return self::$_instance;
}
/**
* @param string $key 有序集key
* @param number $score 排序值
* @param string $value 格式化的数据
* @return int
*/
public function zAdd($key, $score, $value)
{
return $this->provider->zAdd($key, $score, $value);
}
/**
* 获取有序集数据
* @param $key
* @param $start
* @param $end
* @param null $withscores
* @return array
*/
public function zRange($key, $start, $end, $withscores = null)
{
return $this->provider->zRange($key, $start, $end, $withscores);
}
/**
* 删除有序集数据
* @param $key
* @param $member
* @return int
*/
public function zRem($key,$member)
{
return $this->provider->zRem($key,$member);
}
}
延迟队列类
namespace appcommonlibdelayqueue;
class DelayQueue
{
private $prefix = "delay_queue:";
private $queue;
private static $_instance = null;
private function __construct($queue) {
$this->queue = $queue;
}
final private function __clone() {}
public static function getInstance($queue = "") {
if(!self::$_instance) {
self::$_instance = new DelayQueue($queue);
}
return self::$_instance;
}
/**
* 添加任务信息到队列
*
* demo DelayQueue::getInstance("test")->addTask(
* "appcommonlibdelayqueuejobTest",
* strtotime("2018-05-02 20:55:20"),
* ["abc"=>111]
* );
*
* @param $jobClass
* @param int $runTime 执行时间
* @param array $args
*/
public function addTask($jobClass, $runTime, $args = null)
{
$key = $this->prefix.$this->queue;
$params = [
"class" => $jobClass,
"args" => $args,
"runtime" => $runTime,
];
RedisHandler::getInstance()->zAdd(
$key,
$runTime,
serialize($params)
);
}
/**
* 执行job
* @return bool
*/
public function perform()
{
$key = $this->prefix.$this->queue;
//取出有序集第一个元素
$result = RedisHandler::getInstance()->zRange($key, 0 ,0);
if (!$result) {
return false;
}
$jobInfo = unserialize($result[0]);
print_r("job: ".$jobInfo["class"]." will run at: ". date("Y-m-d H:i:s",$jobInfo["runtime"]).PHP_EOL);
$jobClass = $jobInfo["class"];
if(!@class_exists($jobClass)) {
print_r($jobClass." undefined". PHP_EOL);
RedisHandler::getInstance()->zRem($key, $result[0]);
return false;
}
// 到时间执行
if (time() >= $jobInfo["runtime"]) {
$job = new $jobClass;
$job->setPayload($jobInfo["args"]);
$jobResult = $job->preform();
if ($jobResult) {
// 将任务移除
RedisHandler::getInstance()->zRem($key, $result[0]);
return true;
}
}
return false;
}
}
异步任务基类:
namespace appcommonlibdelayqueue;
class DelayJob
{
protected $payload;
public function preform ()
{
// todo
return true;
}
public function setPayload($args = null)
{
$this->payload = $args;
}
}
所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务
如:
namespace appcommonlibdelayqueuejob;
use appcommonlibdelayqueueDelayJob;
class Test extends DelayJob
{
public function preform()
{
// payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
print_r("test job".PHP_EOL);
return true;
}
}
使用方法:
假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:
DelayQueue::getInstance("close_order")->addTask(
"appcommonlibdelayqueuejobCloseOrder", // 自己实现的job
strtotime("2018-05-02 20:55:20"), // 订单失效时间
["order_id"=>123456] // 传递给job的参数
);
close_order 是有序集的key
命令行启动进程
php think delay-queue close_order
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/30732.html
摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...
摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...
摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...
阅读 2561·2021-09-22 15:25
阅读 3800·2019-08-30 12:48
阅读 2464·2019-08-30 11:25
阅读 2543·2019-08-30 11:05
阅读 938·2019-08-29 17:28
阅读 3444·2019-08-26 12:16
阅读 2832·2019-08-26 11:31
阅读 2049·2019-08-23 17:08