资讯专栏INFORMATION COLUMN

Laravel 基于redis队列的解析

banana_pi / 3083人阅读

摘要:年月日参考链接使用不得不明白的知识队列文档中文文档本文环境队列为什么使用队列使用队列的目的一般是异步执行出错重试解释一下异步执行部分代码执行很耗时为了提高响应速度及避免占用过多连接资源可以将这部分代码放到队列中异步执行网站新用户注册后需要

Last-Modified: 2019年5月10日15:04:22

参考链接

使用 Laravel Queue 不得不明白的知识

Laravel 队列文档

Redis 中文文档

本文环境

Laravel 5.5

队列 Redis

为什么使用队列

使用队列的目的一般是:

异步执行

出错重试

解释一下:

异步执行: 部分代码执行很耗时, 为了提高响应速度及避免占用过多连接资源, 可以将这部分代码放到队列中异步执行.

Eg. 网站新用户注册后, 需要发送欢迎的邮件, 涉及到网络IO无法控制耗时的这一类就很适合放到队列中来执行.

出错重试: 为了保证一些任务的正常执行, 可以将任务放到队列中执行, 若执行出错则可以延迟一段时间后重试, 直到任务处理成功或出错超过N次后取消执行.

Eg. 用户需要绑定手机号, 此时发送短信的接口是依赖第三方, 一个是不确定耗时, 一个是不确定调用的成功, 为了保证调用成功, 必然需要在出错后重试
Laravel 中的队列

以下分析默认使用的队列及其配置如下

默认队列引擎: redis

通过在 redis-cli 中使用 monitor 命令查看具体执行的命令语句

默认队列名: default

分发任务

此处以分发 异步通知(class XxxNotification implement ShouldQueue)为例.

在Laravel中发起异步通知时, Laravel 会往redis中的任务队列添加一条新任务

redis 执行语句

redis> RPUSH queues:default

{
    "displayName": "AppListenersRebateEventListener",
    "job": "IlluminateQueueCallQueuedHandler@call",
    "maxTries": null,
    "timeout": null,
    "timeoutAt": null,
    "data": {
        "commandName": "IlluminateEventsCallQueuedListener",
        "command": "O:36:"IlluminateEventsCallQueuedListener":7:{s:5:"class";s:33:"AppListenersRebateEventListener";s:6:"method";s:15:"onRebateCreated";s:4:"data";a:1:{i:0;O:29:"AppEventsRebateCreatedEvent":4:{s:11:"u0000*u0000tbkOrder";O:45:"IlluminateContractsDatabaseModelIdentifier":3:{s:5:"class";s:19:"AppModelsTbkOrder";s:2:"id";i:416;s:10:"connection";s:5:"mysql";}s:15:"u0000*u0000notifyAdmins";b:1;s:13:"u0000*u0000manualBind";b:0;s:6:"socket";N;}}s:5:"tries";N;s:9:"timeoutAt";N;s:7:"timeout";N;s:6:"u0000*u0000job";N;}"
    },
    "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",
    "attempts": 0
}

上面的redis语句是将任务信息(json格式) rpush 到 redis 队列 queues:default 的尾部.

任务队列 Worker

Laravel 处理任务队列的进程开启方式: php artisan queue:work, 为了更好的观察, 这里使用 --once 选项来指定队列中的单一任务进行处理, 具体的更多参数请自行参考文档

php artisan queue:work --once --delay=1 --tries=3

上述执行语句参数含义:

--once 仅执行一次任务, 默认是常驻进程一直执行

--tries=3 任务出错最多重试3次, 默认是无限制重试

--delay=1 任务出错后, 每次延迟1秒后再次执行, 默认是延迟0秒

当 Worker 启动时, 它依次执行如下步骤:

此处仍以默认队列 default 为例讲解, 且只讲解redis的相关操作

queues:default:delayed 有序集合中获取可以处理的 "延迟任务", 并 rpushqueue:default队列的尾部

具体的执行语句:

redis> eval "Lua脚本" 2 queues:default:delayed queues:default 当前时间戳

Lua 脚本内容如下:

-- Get all of the jobs with an expired "score"...
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto thedestination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call("zremrangebyrank", KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val 

queue:default:reserved有序集合中获取已过期的 "reserved 任务", 并 rpushqueue:default队列的尾部

具体的执行语句:

redis> eval "Lua脚本" 2 queues:default:reserved queues:default 当前时间戳

使用的Lua脚本同步骤 1

queue:default 队列中获取(lpop)一个任务, 增加其 attempts 次数, 并将该任务保存到 queu:default:reserved 有序集合中, 该任务的 score 值为 当前时间 + 90(任务执行超时时间)

具体的执行语句:

redis> eval “Lua脚本” 2 queues:default queues:default:reserved 任务超时时间戳

Lua脚本

-- Pop the first job off of the queue...
local job = redis.call("lpop", KEYS[1])
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved["attempts"] = reserved["attempts"] + 1
    reserved = cjson.encode(reserved)
    redis.call("zadd", KEYS[2], ARGV[1], reserved)
end

return {job, reserved}
这里的 90 是根据配置而定:  config("queue.connections.redis.retry_after")

若预计任务耗时过久, 则应增加该数值, 防止任务还在执行时就被重置

在成功执行上面获取的任务后, 就将该任务从 queues:default:reserved 队列中移除掉

具体执行语句: ZREM queues:default:reserved "具体任务"

如果执行任务失败, 此时分为2种情况:

任务失败次数未达到指定的重试次数阀值

将该任务从 queues:default:reserved 中移除, 并将该任务添加到 queue:default:delayed 有序集合中, score 为该任务下一次执行的时间戳

执行语句:

redis> EVAL "Lua脚本" 2 queues:default:delayed queues:default:reserved "失败的任务" 任务延迟执行的时间戳

Lua脚本

-- Remove the job from the current queue...
redis.call("zrem", KEYS[2], ARGV[1])

-- Add the job onto the "delayed" queue...
redis.call("zadd", KEYS[1], ARGV[2], ARGV[1])

return true

如果任务失败次数超过指定的重试阀值

将该任务从 queue:default:reserved 中移除

执行语句:

redis> ZREM queue:default:reserved

注意, 上述使用 Lua 脚本的目的在于操作的原子性, Redis 是单进程单线程模式, 以Lua脚本形式执行命令时可以确保执行脚本的原子性, 而不会有并发问题.

关于Redis的原子操作

上面 Laravel 使用redis作为队列存储引擎时, 在操作redis时使用到了 exec 执行Lua脚本, 以确保原子性.

这里给不熟悉redis的同学简单讲一下.

以上面 Worker 启动时的步骤1为例:

queues:default:delayed 有序集合中获取可以处理的 "延迟任务", 并 rpushqueue:default队列的尾部

具体的执行语句:

redis> eval "Lua脚本" 2 queues:default:delayed queues:default 当前时间戳

Lua 脚本内容如下:

-- Get all of the jobs with an expired "score"...
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto thedestination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call("zremrangebyrank", KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val 

上述步骤首先从 queues:default:delayed 有序集合中获取可以处理的 "延迟任务" 并 rpushqueue:default队列的尾部.

那么如果不使用Lua脚本的话, 一般做法会是如下:

$jobs = $redis->zRangeByScore("queues:default:delayed", "-inf", time())
if (!empty($jobs)) {
    $redis->zRem("queues:default:delayed", ...$jobs);
    $redis->rPush("queues:default", ...$jobs);   
}

如果是单个Worker的话, 上述脚本不会有问题, 但是如果有多个Worker呢? 在php层面上执行上述操作是会有并发问题的.

Worker_1 和 Worker_2 从 queues:default:delayed 队列中获取多个任务后, 执行 rPush 语句会导致任务被执行2次, 如果有多个 Worker 甚至会执行更多次.

只要是有可能引起并发问题的情况, 那么就一定会发生.
以 分布式锁 为例

锁的两大基本操作:

Lock

Unlock

Lock 操作

// 生成唯一的锁id
$identifier = uniqid(php_uname("n") . "_", true);
// 仅在该key不存在时设置, 过期时间5秒
$result = $redis->set("lock_key", $identifier, ["NX", "EX" => 5]);

Unlock 操作

$script = <<evaluate($script, ["lock_key", $identifier], 1);

至于 Unlock 操作为什么要这么麻烦, 可以看一下以下两种有问题的方案, 再想一想.

有问题的方案一

$redis->del("lock_key");

有问题的方案二

if ($redis->get("lock_key") == $identifier) {
    $redis->del("lock_key");
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/31416.html

相关文章

  • 分析Laravel队列实现原理解决问题记录

    摘要:在使用中的队列时,产生冲突干扰。文件中的配置部分至此,两个项目的队列冲突原因就找到了。队列监听最后遇到问题,莫要病急乱投医。从代码入手,分析理解实现原理,找对点,解决方法也许很简单,。 问题 公司项目使用Laravel的开发的两个项目在同一个测试服务器部署,公用同一个redis。在使用laravel中的队列时,产生冲突干扰。 查找问题原因 在laravel 队列的操作类 Illumin...

    Corwien 评论0 收藏0
  • 基于 Redis驱动 Laravel 事件广播

    摘要:一前言之前在项目中需要使用的事件广播,而且项目打算使用作为驱动,但发现网上的资料大部分都是驱动的,只能自己摸索着搭建了一下服务。 一、前言 之前在项目中需要使用laravel的事件广播,而且项目打算使用redis作为驱动,但发现网上的资料大部分都是Pusher驱动的,只能自己摸索着搭建了一下服务。现在将这个过程记录一下,希望能帮到其他人。 二、项目的环境 事件广播需要用到redis,n...

    fantix 评论0 收藏0
  • laravel 队列

    摘要:如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。这两个进程操纵了三个队列,其中一个,负责即时任务,两个,负责延时任务与待处理任务。如果任务执行成功,就会删除中的任务,否则会被重新放入队列中。 在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafka、RabbitMQ 等重量级的消息队列,但是又的确需要异步、重试...

    BDEEFE 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<