资讯专栏INFORMATION COLUMN

RabbitMQ基础

Airmusic / 451人阅读

摘要:一简介是一个有开发的的开源实现的官网是一款消息组件,其中一定包含生产者,消费者,消息组件。

一. RabbitMQ简介

1 . RabbitMQ是一个有Erlang开发的AMQP(Advanced Message Queue)的开源实现

2 . RabbitMQ的官网:http://www.rabbitmq.com

3 . RabbitMQ是一款消息组件,其中一定包含生产者,消费者,消息组件。RabbitMQ中有三个重要组成部分

a . Exchange:交换空间

b . Queue:数据队列

c . RoutingKey:队列路由(如果所有的队列的RoutingKey都一样,则属于广播小,如果不一样,则属于点对点消息)

4 . RabbitMQ中的几个核心概念

a . Broker:消息队列的服务主机

b . Exchange:消息交换机,用于分发消息到队列

c . Queue:消息队列的载体,每个消息都会被投入到一个或多个队列

e . Binding:将Exchange与Queue按照RoutingKey规则进行绑定

f . RoutingKey:路由Key,Exchange根据RoutingKey进行消息分发

g . Vhost:虚拟主机,一个Broker可以有多个Vhost,用于实现用户(权限)的分离

h . Producer:消息生产者

i . Consumer:消息消费者

j . Channel:消息通道,每个Channel代表一个会话任务

二. 环境搭建

1 . 安装Erlang开发环境

a . 在这里安装Erlang时遇到的坑较多,个人不推荐下载erlang源码进行解压缩编译安装,因为依赖的库较多(gcc,libncurses5-dev,.eg):


建立erlang目录mkdir -p /usr/local/erlang

进入源码目录 cd /user/local/src/otp_src_19.3

编译配置 ./configure --prefix=/usr/local/erlang

编译安装 make && make install

配置环境变量

vim /etc/profile
export ERLANG_HOME=/usr/local/erlang
export PATH=$PATH:$ERLANG_HOME/bin:
source /etc/profile

b . 本人使用apt-get安装erlang语言环境

apt-get install erlang 或者apt-get install erlang-nox

c . 测试erlang

输入erl 表示进入erlang环境

输入halt().退出

2 . 安装RabbitMQ

a . 根据官网介绍进行安装

相关命令

echo "deb http://www.rabbitmq.com/debian/ testing main" |
     sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc |
     sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

b . 后台启动RabbitMQrabbitmq-server start > /dev/null 2>&1 &

c . 开启管理页面插件rabbitmq-plugins enable rabbitmq_management

d . 添加新用户rabbitmqctl add_user evans 123123(创建一个用户名为evans,密码为123123的用户)

e . 将新用户设为管理员rabbitmqctl set_user_tags evans administrator

f . 打开浏览器输入访问地址http://192.168.1.1:15672访问RabbitMQ管理页面

g . 查看RabbitMQ状态rabbitmqctl status,关闭RabbitMQrabbitmqctl stop

h . 设置用户虚拟主机,否则程序无法连接Queue

二. Java基本操作

1 . 在管理界面中增加一个新的Queue

a . Name:队列名称

b . Durability:持久化选项:Durable(持久化保存),Transient(即时保存),持久化保存在RabbitMQ宕机或者重启后,未消费的消息仍然存在,即时保存在RabbitMQ宕机或者重启后不存在

c . Auto delete:自动删除

2 . 引入RabbitMQ的Repository


    com.rabbitmq
    amqp-client
    4.1.0

3 . 消息生产者MessageProducer.java

package com.evans.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by Evans 
 */
public class MessageProducer {
    //队列名称
    private static final String QUEUE_NAME = "first";
    //主机IP
    private static final String HOST="127.0.0.1";
    //端口
    private static final Integer PORT=5672;
    //用户名
    private static final String USERNAME="evans";
    //密码
    private static final String PASSWORD="evans";

    public static void main(String[] args) throws Exception {
        //创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //创建连接
        Connection connection =factory.newConnection();
        //创建Channel
        Channel channel=connection.createChannel();
        //声明Queue
        /*
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments)
         * 队列名称,是否持久保存,是否为专用的队列,是否允许自动删除,配置参数
         * 此处的配置与RabbitMQ管理界面的配置一致
         */
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        Long start = System.currentTimeMillis();
        for (int i=0;i<100;i++){
            //发布消息
            /*
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * exchange名称,RoutingKey,消息参数(消息头等)(持久化时需要设置),消息体
             * MessageProperties有4中针对不同场景可以进行选择
             */
            channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,("Message:"+i).getBytes());
        }
        Long end = System.currentTimeMillis();
        System.out.println("System cost :"+(end-start));
        channel.close();
        connection.close();
    }
}

4 . 运行MessageProduce的Main方法,在管理界面会出现详细的监控数据,此时消息已经成功发送至RabbitMQ的队列中

5 . 消息消费者MessageConsumer.java

package com.evans.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by Evans on 2017/7/15.
 */
public class MessageConsumer {

    //队列名称
    private static final String QUEUE_NAME = "first";
    //主机IP
    private static final String HOST="10.0.0.37";
    //端口
    private static final Integer PORT=5672;
    //用户名
    private static final String USERNAME="evans";
    //密码
    private static final String PASSWORD="evans";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //创建连接
        Connection connection =factory.newConnection();
        //创建Channel
        Channel channel=connection.createChannel();
        //声明Queue
        /*
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments)
         * 队列名称,是否持久保存,是否为专用的队列,是否允许自动删除,配置参数
         * 此处的配置与RabbitMQ管理界面的配置一致
         */
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        //这里需要复写handleDelivery方法进行消息自定义处理
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("Consume Get Message : "+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

6 . 运行MessageConsumer的Main方法,会进行消息消费处理,此时控制台会输出消费的消息,此时完成了消息的生产与消费的基本操作,当存在多个消费者的处理同一个队列时,RabbitMQ会自动进行均衡负载处理,多个消费者共同来处理消息

Consume Get Message : Message:0
Consume Get Message : Message:1
Consume Get Message : Message:2
...
Consume Get Message : Message:99

7 . RabbitMQ虚拟主机

a . 可以在管理界面的admin-vhost下设置多个虚拟主机

b . 在程序中通过设置factory参数进行虚拟主机的指定factory.setVirtualHost("firstHost")

8 . Exchange工作模式:topic、direct、fanout

a . 广播模式(fanout):一条消息被所有的消费者进行处理

① .将消费者与生产者中的`channel.queueDeclare()`方法替换为`channel.exchangeDeclare(EXCHANGE_NAME, "fanout")`方法进行Exchange的指定,channel.basicPublish()方法需要指定exchange
② .此时再次运行生产者和多个消费者,则一个消息会被多个消费者进行消费处理

b . 直连模式(direct):一跳消息根据RoutingKey进行生产者与消费者的匹配,从而达到指定生产者的消息被指定消费者进行处理

① .将生产者中的`channel.queueDeclare()`方法替换为`channel.exchangeDeclare(EXCHANGE_NAME, "direct")`方法进行Exchange的指定,channel.basicPublish()方法需要指定exchange和RoutingKey("mykey")
② .将消费者中的`channel.queueDeclare()`方法替换为
// 定义EXCHANGE的声明String
channel.exchangeDeclare(EXCHANGE_NAME, "direct") ;
// 通过通道获取一个队列名称                         
String queueName= channel.queueDeclare().getQueue() ;
// 进行绑定处理
channel.queueBind(queueName, EXCHANGE_NAME, "mykey") ;
③ .此时RoutingKey作为唯一标记,这样就可以将消息推送到指定的消费者进行处理

c . 主题模式(topic):一条消息被所有的消费者进行处理

① .将生产者中的`channel.queueDeclare()`方法替换为`channel.exchangeDeclare(EXCHANGE_NAME, "topic") `方法进行Exchange的指定,channel.basicPublish()方法需要指定exchange和RoutingKey("mykey-01")
② .将消费者中的`channel.queueDeclare()`方法替换为
// 定义EXCHANGE的声明String
channel.exchangeDeclare(EXCHANGE_NAME, "topic") ;
// 通过通道获取一个队列名称                         
String queueName= channel.queueDeclare().getQueue() ;
// 进行绑定处理
channel.queueBind(queueName, EXCHANGE_NAME, "mykey-01");
③ .此时主题模式即为广播模式与直连模式的混合使用。

三. RabbitMQ整合Spring

1 . 引入srping-rabbit的Repository


    org.springframework.amqp
    spring-rabbit
    1.7.3.RELEASE

2 . 建立rabbitmq.properties,对RabbitMQ的属性参数进行设置

# RabbitMQ的主机IP
mq.rabbit.host=192.168.68.211
# RabbitMQ的端口
mq.rabbit.port=5672
# RabbitMQ的VHost
mq.rabbit.vhost=hello
# RabbitMQ的exchange名称
mq.rabbit.exchange=spring.rabbit
# 用户名
mq.rabbit.username=evans
# 密码
mq.rabbit.password=evans

3 . 生产者XML(需增加xmlns:rabbit命名空间)



  
  
  
  
  
  
  
  
  
  
  
    
      
      
    
  
  
  

4 . 消费者XML(需增加xmlns:rabbit命名空间)



  
  
  
  
  
  
  
  
  
  
    
      
      
    
  
  
  
  
  
    
  

5 . 生产者

a . 定义消息Service

package com.evans.rabbitmq;

/**
 * Created by Evans 
 */
public interface MessageService {
    /**
     * 发送消息
     * @param message
     */
    public void sendMessage(String message);
}

b . 定义MessageService的实现类

package com.evans.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;

import javax.annotation.Resource;

/**
 * Created by Evans
 */
public class MessageServiceImpl implements MessageService {
    
    @Resource
    private AmqpTemplate template;
    
    @Override
    public void sendMessage(String message) {
        template.convertAndSend("key01",message);
    }
}

5 . 消费者

a .消费者需要实现MessageListener接口

b .消息处理类

package com.evans.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Created by Evans 
 */
public class MessageConsumer implements MessageListener {
    
    @Override
    public void onMessage(Message message) {
        System.out.println("Consumer Message: "+ message);    
    }
}

四. RabbitMQ整合SpringBoot

1 . 引入SpringBoot的RabbitMQ脚手架


    org.springframework.boot
    spring-boot-starter-amqp

2 . 配置Application.yml

spring:
  rabbitmq:
    host: 10.0.0.37
    port: 5672
    username: evans
    password: evans

3 . 配置类

package com.evans.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**
 * Created by Evans 
 */
@Configuration
public class RabbitConfigure {
    @Bean
    public Queue firstQueue(){
        return new Queue("firstQueue");
    }
}

4 . 生产者

package com.evans.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;

/**
 * Created by Evans
 */
@Component
public class MessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    public void send(){
        LocalDateTime current =LocalDateTime.now();
        System.out.println("Send Message : "+current);
        rabbitTemplate.convertAndSend("firstQueue","Send Message"+ current);
    }
}

5 . 消费者

package com.evans.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by Evans 
 */
@Component
@RabbitListener(queues = "firstQueue")
public class MessageConsumer {

    @RabbitHandler
    public void consumer(String message){
        System.out.println("Consumer Message : "+message);
    }
}

6 . FanoutExchange配置

@Configuration
public class FanoutConfiguration {

    @Bean
    public Queue fanoutFirstQueue() {
        return new Queue("fanout.first");
    }

    @Bean
    public Queue fanoutSecondQueue() {
        return new Queue("fanout.second");
    }

    @Bean
    public Queue fanoutThirdQueue() {
        return new Queue("fanout.third");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding bindingExchangeFanoutFirst(Queue fanoutFirstQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutFirstQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeFanoutSecond(Queue fanoutSecondQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutSecondQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeFanoutThird(Queue fanoutThirdQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutThirdQueue).to(fanoutExchange);
    }

}

7 . TopicExchange配置

@Configuration
public class TopicConfiguration {

    @Bean
    public Queue topicFirstQueue() {
        return new Queue("topic.first");
    }

    @Bean
    public Queue topicAnyQueue() {
        return new Queue("topic.any");
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding bindingExchangeTopicFirst(Queue topicFirstQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicFirstQueue).to(topicExchange).with("topic.first");
    }

    @Bean
    public Binding bindingExchangeTopicAny(Queue topicAnyQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicAnyQueue).to(topicExchange).with("topic.#");
    }

}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67408.html

相关文章

  • RabbitMQ 基础教程(1) - Hello World

    摘要:基础教程注本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。安装库这里我们首先将消息推入队列,然后消费者从队列中去除消息进行消费。 RabbitMQ 基础教程(1) - Hello World 注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。 如果你喜欢我的文章,可以关注我的私人博客:http:...

    wushuiyong 评论0 收藏0
  • Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】

    摘要:它通过使用来连接消息代理中间件以实现消息事件驱动的微服务应用。该示例主要目标是构建一个基于的微服务应用,这个微服务应用将通过使用消息中间件来接收消息并将消息打印到日志中。下面我们通过编写生产消息的单元测试用例来完善我们的入门内容。 之前在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,我们通过简单的配置和注解就能实现向Rabbi...

    smallStone 评论0 收藏0
  • Rabbitmq基础组件架构设计

    摘要:基础组件架构设计基础组件封装设计迅速消息发送支持迅速消息发送模式,在一些日志收集统计分析等需求下可以保证高性能,高吞吐量。基础组件封装设计事务消息发送支持事务消息,且保障可靠性投递,在金融行业单笔大金额操作时会有此类需求。 Rabbitmq基础组件架构设计 基础组件封装设计 - 迅速消息发送支持迅速消息发送模式,在一些日志收集、统计分析等需求下可以保证高性能,高吞吐量。 基础组件封...

    Steve_Wang_ 评论0 收藏0
  • PHP-RabbitMQ学习日记(一)

    摘要:通道,建立一个访问通道。队列,每个消息都会被投入到一个或多个队列。路由,根据这个关键字进行消息投递。消息消费者,就是接受消息的程序。 给自己做一个记录 本文主要介绍有一下 1.RabbitMQ是概念 2.RabbitMQ在windows上安装,启动,关闭 3.RabbitMQ其他小介绍 下面一步一步走起来 1.RabbitMQ是概念 RabbitMQ是一个建立在AMQP(高级消息队列协...

    SolomonXie 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<