资讯专栏INFORMATION COLUMN

白话RabbitMQ(四): 建立路由

CoderStudy / 2192人阅读

摘要:可以参考源码,项目支持网站,最新文章或实现会更新在上面前言在订阅发布中我们建立了一个简单的日志系统,从而将消息广播给一些消费者。因此,发送到路由键的消息会发送给队列,发送到路由键或者的消息会发送给,其它的消息将被丢弃。

推广
RabbitMQ专题讲座

https://segmentfault.com/l/15...

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

在订阅/发布中我们建立了一个简单的日志系统,从而将log消息广播给一些消费者。这章我们会在此基础上加入一些新的特性-我们将有针对性的进行消息分发,比如,只把错误(error)消息保存到磁盘,与此同时,打印出所有的消息。

绑定

我们在前面的例子中,绑定是这么来做的

</>复制代码

  1. channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是建立交换机和队列之间的一种联系:队列会接受交换机中的消息。绑定可以用一个路由键来指明,为了与basic_publish区分开,我们称之为绑定键(binding key):

</>复制代码

  1. channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键跟路由器类型也有关系,我们之前用的广播路由器,会忽略掉这个值

直达交换机(Direct Exchange)

之前我们用的是广播交换机,会将消息发送给所有的消费者。这里我们希望通过log的严重程度进行过滤,例如只有严重的错误才会写入到磁盘,而warn和info消息就不用了,以此来节省磁盘空间

而广播交换机没法满足这个需求-它只是无脑的发送消息。所以我们会使用直达交换机(Direct Exchange)- 消息会通过所绑定的键来发送给对应的队列,可以看如下这幅图

如上图所示,直达交换机X绑定了两个队列,C1是通过orange来绑定,而C2是通过black和green绑定。因此,发送到路由键orange的消息会发送给队列Q1,发送到路由键black或者green的消息会发送给Q2,其它的消息将被丢弃。

多项绑定


当然,多个队列绑定到一个键上也是合法的,在这种情况下,直达交换机将会将消息发送给所有的队列,就像广播交换机一样,如上图所示,一个键为black的消息将会同时被发送给C1和C2.

我们首先需要创建一个直达路由器

</>复制代码

  1. channel.exchangeDeclare(EXCHANGE_NAME, "direct");

并发送消息到这个路由器

</>复制代码

  1. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

上面我们是发送给"severity",简单起见,假设有下列几种日志类型"severity" ,"info", "warning", "error".

订阅消息(Subscribing)

接受消息跟之前一样,但有一点不同,我们提供了一个binding key,

</>复制代码

  1. String queueName = channel.queueDeclare().getQueue();
  2. for(String severity : argv){
  3. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  4. }
整合

将上面的所有代码整合到一起

EmitLogDirect.java

</>复制代码

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. public class EmitLogDirect {
  4. private static final String EXCHANGE_NAME = "direct_logs";
  5. public static void main(String[] argv)
  6. throws java.io.IOException {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  12. String severity = getSeverity(argv);
  13. String message = getMessage(argv);
  14. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
  15. System.out.println(" [x] Sent "" + severity + "":"" + message + """);
  16. channel.close();
  17. connection.close();
  18. }
  19. //..
  20. }

ReceiveLogsDirect.java

</>复制代码

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. public class ReceiveLogsDirect {
  4. private static final String EXCHANGE_NAME = "direct_logs";
  5. public static void main(String[] argv) throws Exception {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("localhost");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  11. String queueName = channel.queueDeclare().getQueue();
  12. if (argv.length < 1){
  13. System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
  14. System.exit(1);
  15. }
  16. for(String severity : argv){
  17. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  18. }
  19. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  20. Consumer consumer = new DefaultConsumer(channel) {
  21. @Override
  22. public void handleDelivery(String consumerTag, Envelope envelope,
  23. AMQP.BasicProperties properties, byte[] body) throws IOException {
  24. String message = new String(body, "UTF-8");
  25. System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """);
  26. }
  27. };
  28. channel.basicConsume(queueName, true, consumer);
  29. }
  30. }

编译

</>复制代码

  1. javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

只保存warning和error的消息到磁盘上

</>复制代码

  1. java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

将所有的消息打印到频幕上

</>复制代码

  1. java -cp $CP ReceiveLogsDirect info warning error
  2. # => [*] Waiting for logs. To exit press CTRL+C

最后,发送error消息

</>复制代码

  1. java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
  2. # => [x] Sent "error":"Run. Run. Or it will explode."

好了,这一章就到这儿,下一章我们将讲述如何基于特定模式进行监听

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68120.html

相关文章

  • 白话RabbitMQ(五): 主题路由器(Topic Exchange)

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。主题交换机也可以当成其它交换机来使用,假如队列绑定到了那么它会接收所有的消息,就像广播路由器一样而如果未使用,那么就跟直达路由器一样了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性...

    Gilbertat 评论0 收藏0
  • 白话RabbitMQ(三):发布/订阅

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。因此一旦有有消息,消息会广播到所有的消费者。如此一来路由器就能够把消息发送给相应的队列了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https...

    Ververica 评论0 收藏0
  • 白话rabbitmq(一): HelloWorld

    摘要:作为消息队列的一个典型实践,完全实现了标准,与的快快快不同,它追求的稳定可靠。同一个队列不仅可以绑定多个生产者,而且能够发送消息到多个消费者。消费者接受并消费消息。几乎于完全类似是一个继承了接口的类,方便我们来存储消息队列来的消息。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的...

    garfileo 评论0 收藏0
  • 白话RabbitMQ(六): RPC

    摘要:因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列是一种非常好的方式,这里我们使用了长度为的,的功能是检查消息的的是不是我们之前所发送的,如果是,将返回值返回到。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考...

    KevinYan 评论0 收藏0
  • 【译】RabbitMQ系列() - 路由模式

    摘要:路由模式在之前的文章中我们建立了一个简单的日志系统。更形象的表示,如对中的感兴趣。为了进行说明,像下图这么来设置如图,可以看到有两个绑到了类型为的上。如图的设置中,一个为的就会同时发送到和。接收程序可以选择要接收日志的严重性级别。 路由模式 在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志message广播给很多接收者。 在本篇文章中,我们在这之上,添加一个新的功...

    liuchengxu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<