资讯专栏INFORMATION COLUMN

rabbitmq延迟消息示例

RyanQ / 2506人阅读

摘要:官方插件仅支持版本中支持。使用过程声明消息交换机实现实现消息发送实现实现

官方插件仅支持>=3.6.x 版本中支持。

本文描述的消息延迟机制采用官方推荐的插件rabbitmq-delayed-message-exchange,如精通rabbitmq和编程,请自行查看官方文档,描述更加详尽:

github

Rabbitmq插件列表

安装
需要在集群每台机器中安装

由于rabbitmq并未内置该插件,需要手动下载安装。关于已安装的插件通过rabbitmq-plugins list可查看.

3.6.x下载地址

3.7.x下载地址

加载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/lib/rabbitmq/plugins中(windows和其他系统<安装目录> abbitmq_server-versionplugins).

启用插件
需要在集群每台机器中执行

通过rabbitmq-plugins list查看已安装列表,如下:

...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件,输出如下:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

...
[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
...
机制

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

Java使用过程 声明x-delayed-message消息交换机

rabbitmq java client实现

// ... elided code ...
Map args = new HashMap();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

spring rabbitmq template实现

// ... elided code ...
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");
Exchange exchange = new CustomExchange("test.exchange", "x-delayed-message", true, false, args);
//admin = RabbitmqAdmin
admin.declareExchange(exchange);

//more code...
消息发送

rabbitmq java client实现

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map headers = new HashMap();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("test.exchange", "test", props.build(), messageBodyBytes);
// ... more code ...

spring rabbitmq template实现

MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 1000);
//template : RabbitmqTemplate
template.convertAndSend("test.exchange", "test", new Message(body, properties));

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68500.html

相关文章

  • Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ

    摘要:然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如编写博文时候,设置小时之后发送。在消息监听类中,对通道定义了,这里会对延迟消息做具体的逻辑。由于消息的消费是延迟的,从而变相实现了从消息发送那一刻起开始的定时任务。 应用场景 我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者...

    honhon 评论0 收藏0
  • RabbitMQ延迟消息延迟极限是多少?

    摘要:问题定位因为不是所有的消息都出现了没有延迟消息效果的因素,通过有问题的消息特征,大致猜测可能是延迟时间过长导致了消息延迟失败。所以,我们在使用的延迟消息功能时候,必须注意它的延迟极限是毫秒。 之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用RabbitMQ的延迟消息来实现定时任务。最近正好因为开发碰到了使用过程中发现,延迟消息没有效果,消息直接就被消...

    stormzhang 评论0 收藏0
  • 一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列

    摘要:另一种就是用中的位于包下,本质是由和实现的阻塞优先级队列。表明了一条消息可在队列中存活的最大时间。当某条消息被设置了或者当某条消息进入了设置了的队列时,这条消息会在时间后死亡成为。 SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可...

    selfimpr 评论0 收藏0
  • 深入消息中间件选型分析

    摘要:是阿里开源的消息中间件,目前已经捐献个基金会,它是由语言开发的,具备高吞吐量高可用性适合大规模分布式系统应用等特点,经历过双的洗礼,实力不容小觑。 前言 消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数...

    shevy 评论0 收藏0
  • Rabbitmq基础组件架构设计

    摘要:基础组件架构设计基础组件封装设计迅速消息发送支持迅速消息发送模式,在一些日志收集统计分析等需求下可以保证高性能,高吞吐量。基础组件封装设计事务消息发送支持事务消息,且保障可靠性投递,在金融行业单笔大金额操作时会有此类需求。 Rabbitmq基础组件架构设计 基础组件封装设计 - 迅速消息发送支持迅速消息发送模式,在一些日志收集、统计分析等需求下可以保证高性能,高吞吐量。 基础组件封...

    Steve_Wang_ 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<