资讯专栏INFORMATION COLUMN

SpringBoot+RabbitMq实现延时消息队列

alighters / 1292人阅读

背景:
   在一些应用场景中,程序并不需要同步执行,例如用户注册之后的邮件或者短信通知提醒。这种场景的实现则是在当前线程,开启一个新线
 程,当前线程在开启新线程之后会继续往下执行,无需等待新线程执行完成。
   但例如一些需要延时的场景则不只是开启新线程执行如此简单了。譬如提交订单后在15分钟内没有完成支付,订单需要关闭,这种情
 况,是否只开启一个异步线程就不适用了呢。

那么就单单实现在提交订单后的15分钟内,如果没有完成支付,系统关闭订单。有哪些可行的方案呢。

方案:

使用定时任务轮询订单表,查询出订单创建了15分钟以上并且未支付的订单,如果有查询出此类订单则执行关闭。

缺点:假设每1分钟轮询一次,则会存在秒级误差,如果秒级轮询,则会极其消耗性能,影响程序的健壮性。

提交订单时开启一个新线程,而新线程直接休眠15分钟,休眠结束后开始执行订单关闭

缺点:如果在线程休眠时,重启了整个服务,那么会怎样呢?

使用延时消息队列

缺点:需要额外部署消息中间件

综上考虑:使用延时消息队列则成为最佳选择,消息延时发布之后,保存在消息中间件中,在15分钟后才会正式发布至队列,延时队列监听器在15分钟后监听到消息时,才开始执行,而这期间,即使项目重启也没有关系。

以springboot为基础框架,集成rabbitmq实现延时队列
注意:这里不采用网上流传的死信队列转发,而是采用rabbitmq3.7+版本的延时队列插件,所以务必安装3.7+版本并启用延时队列插件。
增加amqp依赖
      
             org.springframework.boot
             spring-boot-starter-parent
             1.5.4.RELEASE
     
     
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    
修改application.yml文件,配置rabbitmq,并且开启消息的手动应答
spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: admin
        listener:
            direct:
                acknowledge-mode: MANUAL
            simple:
                acknowledge-mode: MANUAL
配置队列,路由,交换机
package cn.rongyuan.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 * @title rabbitmq配置类
 * @author zengzp
 * @time 2018年8月20日 上午10:46:43
 * @Description 
 */
@Configuration
public class RabbitConfig {
    
    // 支付超时延时交换机
    public static final String Delay_Exchange_Name = "delay.exchange";

    // 超时订单关闭队列
    public static final String Timeout_Trade_Queue_Name = "close_trade";

    
    @Bean
    public Queue delayPayQueue() {
        return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true);
    }
    
    
    // 定义广播模式的延时交换机 无需绑定路由
    @Bean
    FanoutExchange delayExchange(){
        Map args = new HashMap();
        args.put("x-delayed-type", "direct");
        FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args);
        topicExchange.setDelayed(true);
        return topicExchange;
    }
    
    // 绑定延时队列与交换机
    @Bean  
    public Binding delayPayBind() {  
        return BindingBuilder.bind(delayPayQueue()).to(delayExchange());  
    }
    
    // 定义消息转换器
    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    // 定义消息模板用于发布消息,并且设置其消息转换器
    @Bean
    RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
    @Bean
    RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

}
在提交订单时发布消息至延时队列并且指定延时时长
    @Autowired
    RabbitTemplate rabbitTemplate;
    // 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列
    rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        message.getMessageProperties().setDelay(30 * (60*1000));   // 毫秒为单位,指定此消息的延时时长
        return message;
    });
消费端监听延时队列
package cn.rongyuan.mq.consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import cn.rongyuan.config.RabbitConfig;
import cn.rongyuan.service.TradeService;
import cn.rongyuan.util.ExceptionUtil;


/**
 * @title 消息消费端
 * @author zengzp
 * @time 2018年8月20日 上午11:00:26
 * @Description 
 */
@Component
public class PayTimeOutConsumer {
    
    @Autowired
    TradeService tradeService;
    
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name)
    public void process(String tradeCode, Message message, Channel channel) throws IOException{
        try {
            logger.info("开始执行订单[{}]的支付超时订单关闭......", tradeCode);
            tradeService.cancelTrade(tradeCode);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            logger.info("超时订单处理完毕");
        } catch (Exception e) {
            logger.error("超时订单处理失败:{}", ExceptionUtil.getMessage(e));
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } 
    }

}
参考资料:
  1、spring amqp 官方文档:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange
  2、rabbitmq 官方文档:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76788.html

相关文章

  • 一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列

    摘要:另一种就是用中的位于包下,本质是由和实现的阻塞优先级队列。表明了一条消息可在队列中存活的最大时间。当某条消息被设置了或者当某条消息进入了设置了的队列时,这条消息会在时间后死亡成为。 SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可...

    selfimpr 评论0 收藏0
  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用讲解下上面命令行表示控制台端口号,可以在浏览器中通过控制台来执行的相关操作。同时从控制台可以看到发送的速率多线程测试性能开了个线程,每个线程发送条消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次写了篇文章,《SpringBoot Kafka 整合...

    yuanxin 评论0 收藏0
  • springboot 集成rabbitmq 实例

    摘要:集成实例个人在学习时发现网上很少有系统性介绍和如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。 springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。 本文章共分为以下部分: rabbitmq简介 sprin...

    springDevBird 评论0 收藏0
  • RabbitMQ发布订阅实战-实现延时重试队列

    摘要:本文将会讲解如何使用实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。 RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门...

    Heier 评论0 收藏0
  • RabbitMQ发布订阅实战-实现延时重试队列

    摘要:本文将会讲解如何使用实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。 RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门...

    vslam 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<