资讯专栏INFORMATION COLUMN

springboot 集成rabbitmq 实例

springDevBird / 1317人阅读

摘要:集成实例个人在学习时发现网上很少有系统性介绍和如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

springboot 集成rabbitmq 实例

个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

本文章共分为以下部分:

rabbitmq简介

springboot配置

rabbitmq生产者配置

rabbitmq消费者配置

问题补充

一、rabbitmq简介

目前流程的消息队列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的应用场景,关于各个框架的介绍,大家可自行百度,网上很多文章介绍~其中rabbit因为其ack特性以及还算不错的性能被大多数公司采用。

概念:

生产者 消息的产生方,负责将消息推送到消息队列

消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息

队列 消息的寄存器,负责存放生产者发送的消息

交换机 负责根据一定规则分发生产者产生的消息

绑定 完成交换机和队列之间的绑定

模式:

direct
直连模式,用于实例间的任务分发

topic
话题模式,通过可配置的规则分发给绑定在该exchange上的队列

headers
适用规则复杂的分发,用headers里的参数表达规则

fanout
分发给所有绑定到该exchange上的队列,忽略routing key

安装

单机版安装很简单,大概步骤如下:

# 安装erlang包
    yum install erlang
# 安装socat
    yum install socat
# 安装rabbit    
    rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 启动服务
  rabbitmq-server start
# 增加管理控制功能
  rabbitmq-plugins enable rabbitmq_management
# 增加用户:
    sudo rabbitmqctl add_user root password
    rabbitmqctl set_user_tags root administrator 
    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

集群安装,可参考以下博客:
     
rabbitmq集群安装

二、springboot配置

废话少说直接上代码:
配置参数
application.yml:

spring:
   rabbitmq:
    addresses: 192.168.1.1:5672
    username: username
    password: password
    publisher-confirms: true
    virtual-host: /

java config读取参数

/**
 * RabbitMq配置文件读取类
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    // 构建mq实例工厂
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
}
三、rabbitmq生产者配置

主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。

/**
 * 用于配置交换机和队列对应关系
 * 新增消息队列应该按照如下步骤
 * 1、增加queue bean,参见queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

    /**
     * @Author:chenhf
     * @Description: 主题型交换机
     * @Date:下午5:49 2017/10/23
     * @param
     * @return
     */
    @Bean
    TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
        TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
        rabbitAdmin.declareExchange(contractTopicExchange);
        logger.debug("完成主题型交换机bean实例化");
        return contractTopicExchange;
    }
    /**
     * 直连型交换机
     */
    @Bean
    DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
        DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
        rabbitAdmin.declareExchange(contractDirectExchange);
        logger.debug("完成直连型交换机bean实例化");
        return contractDirectExchange;
    }

    //在此可以定义队列

    @Bean
    Queue queueTest(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("测试队列实例化完成");
        return queue;
    }

    //topic 1
    @Bean
    Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("话题测试队列1实例化完成");
        return queue;
    }
    //topic 2
    @Bean
    Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("话题测试队列2实例化完成");
        return queue;
    }


    //在此处完成队列和交换机绑定
    @Bean
    Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("测试队列与直连型交换机绑定完成");
        return binding;
    }
    //topic binding1
    @Bean
    Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("测试队列与话题交换机1绑定完成");
        return binding;
    }

    //topic binding2
    @Bean
    Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("测试队列与话题交换机2绑定完成");
        return binding;
    }

}

在这里用到枚举类:RabbitMqEnum

/**
 * 定义rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

    /**
     * @param
     * @Author:chenhf
     * @Description:定义数据交换方式
     * @Date:下午4:08 2017/10/23
     * @return
     */
    public enum Exchange {
        CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"),
        CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"),
        CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点");

        private String code;
        private String name;

        Exchange(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }
    }

    /**
     * describe: 定义队列名称
     * creat_user: chenhf
     * creat_date: 2017/10/31
     **/
    public enum QueueName {
        TESTQUEUE("TESTQUEUE", "测试队列"),
        TOPICTEST1("TOPICTEST1", "topic测试队列"),
        TOPICTEST2("TOPICTEST2", "topic测试队列");

        private String code;
        private String name;

        QueueName(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }

    }

    /**
     * describe: 定义routing_key
     * creat_user: chenhf
     * creat_date: 2017/10/31
     **/
    public enum QueueEnum {
        TESTQUEUE("TESTQUEUE1", "测试队列key"),
        TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"),
        TESTTOPICQUEUE2("lazy.#", "topic测试队列key");


        private String code;
        private String name;

        QueueEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }
    }

}

以上完成消息生产者的定义,下面封装调用接口
测试时直接调用此工具类,testUser类需自己实现

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq发送消息工具类
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMqSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        logger.info("confirm: " + correlationData.getId());
    }

    /**
     * 发送到 指定routekey的指定queue
     * @param routeKey
     * @param obj
     */
    public void sendRabbitmqDirect(String routeKey,Object obj) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
    }

    /**
     * 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
     * @param routeKey
     * @param obj
     */
    public void sendRabbitmqTopic(String routeKey,Object obj) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
    }
}
四、rabbitmq消费者配置

springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读

直连消费者
通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
    @Bean("testQueueContainer")
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("TESTQUEUE");
        container.setMessageListener(exampleListener());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }


    @Bean("testQueueListener")
    public ChannelAwareMessageListener exampleListener() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
                //通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费
                if ("2".equals(testUser.getUserName())){
                    System.out.println(testUser.toString());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                }

                if ("1".equals(testUser.getUserName())){
                    System.out.println(testUser.toString());
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                }

            }
        };
    }

}

topic消费者1

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
    @Bean("topicTest1Container")
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("TOPICTEST1");
        container.setMessageListener(exampleListener1());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }


    @Bean("topicTest1Listener")
    public ChannelAwareMessageListener exampleListener1(){
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
                System.out.println("TOPICTEST1:"+testUser.toString());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            }
        };
    }




}

topic消费者2

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
    @Bean("topicTest2Container")
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("TOPICTEST2");
        container.setMessageListener(exampleListener());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }


    @Bean("topicTest2Listener")
    public ChannelAwareMessageListener exampleListener() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
                System.out.println("TOPICTEST2:"+testUser.toString());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            }
        };
    }

}
问题补充

使用过程中可能出现的坑参考此篇文章
https://segmentfault.com/a/11...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67921.html

相关文章

  • Java 类文章 - 收藏集 - 掘金

    摘要:而调用后端服务就应用了的高级特分布式配置管理平台后端掘金轻量的分布式配置管理平台。关于网络深度解读后端掘金什么是网络呢总的来说,网络中的容器们可以相互通信,网络外的又访问不了这些容器。 在 Java 路上,我看过的一些书、源码和框架(持续更新) - 后端 - 掘金简书 占小狼转载请注明原创出处,谢谢!如果读完觉得有收获的话,欢迎点赞加关注 物有本末,事有终始,知所先后,则近道矣 ......

    RayKr 评论0 收藏0
  • SpringBoot集成RabbitMQ(死信队列)

    摘要:介绍死信队列没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因有消息被拒绝并且队列达到最大长度消息过期场景小时进入初始队列,等待分钟后进入分钟队列消息等待分钟后进入执行队列执行失败后重新回到分钟队列失败次后,消息进入小时队列消 介绍 死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:1.有消息被拒绝(basic.reject/ basic.nac...

    honhon 评论0 收藏0
  • spring boot - 收藏集 - 掘金

    摘要:引入了新的环境和概要信息,是一种更揭秘与实战六消息队列篇掘金本文,讲解如何集成,实现消息队列。博客地址揭秘与实战二数据缓存篇掘金本文,讲解如何集成,实现缓存。 Spring Boot 揭秘与实战(九) 应用监控篇 - HTTP 健康监控 - 掘金Health 信息是从 ApplicationContext 中所有的 HealthIndicator 的 Bean 中收集的, Spring...

    rollback 评论0 收藏0
  • SpringBoot+RabbitMq实现延时消息队列

    背景: 在一些应用场景中,程序并不需要同步执行,例如用户注册之后的邮件或者短信通知提醒。这种场景的实现则是在当前线程,开启一个新线 程,当前线程在开启新线程之后会继续往下执行,无需等待新线程执行完成。 但例如一些需要延时的场景则不只是开启新线程执行如此简单了。譬如提交订单后在15分钟内没有完成支付,订单需要关闭,这种情 况,是否只开启一个异步线程就不适用了呢。 那么就单单实现...

    alighters 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<