资讯专栏INFORMATION COLUMN

白话RabbitMQ(五): 主题路由器(Topic Exchange)

Gilbertat / 901人阅读

摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。主题交换机也可以当成其它交换机来使用,假如队列绑定到了那么它会接收所有的消息,就像广播路由器一样而如果未使用,那么就跟直达路由器一样了。

推广
RabbitMQ专题讲座

https://segmentfault.com/l/15...

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

在之前的建立路由中我们改进了日志系统。我们摒弃无脑发送消息的广播路由器,而使用能够根据绑定键(binding key)来发送消息的,从而能有有选择的后去logs.

尽管使用直达路由器大大的改进了我们系统,但也存在局限性 - 无法加入更多条件。比如我们希望能够加入更多的维度,我们希望不仅是基于严重程度,而且是基于来源,如果你对linux tool工具有了解的话,它不仅仅是基于严重程度(info/warn/crit...) 而且有来源(auth/cron/kern...),这个给到我们更大的灵活性-我们需要监听所有来自"cron"的errors消息,以及来自"kern"的所有log。所以我们需要的是一个更复杂的主题交换机

主题交换机

发送到主题交换机的消息并不会有一个确定的路由键-而是一长串字符列表,以"."来分割,而这个字符串列表表明了路由信息,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit",字符串的最大长度限制在255bytes。

同时,在队列绑定交换机时也需要指定模式,而符合模式的消息将会被发送至该队列,模式可以由通配符组成:
"*" 可以表示一个词
"#" 表示0个或多个词
可以通过如下的例子来说明

请看例子,以发送动物的消息为例,我们会发送包含三个词的路由键(两个".")。第一个是速度,第二个是颜色,而第三个是种族

同时,我们建立了三个绑定,Q1绑定了键".orange.",Q2绑定了键"..rabbit"以及"lazy.#"。可以做如下的解释,Q1用来接受所有orange的动物,Q2用来接受所有rabbits,以及lazy的动物

一个路由为"quick.orange.rabbit"的消息将会被同时发送给这两个队列,消息"lazy.orange.elephant"也会被同时发给它们;"quick.orange.fox"只会发给第一个队列;"lazy.brown.fox"会发到第二个;"lazy.pink.rabbit"将只会发送给第二个;"quick.brown.fox"会被丢弃因为匹配不上任何一个。

如果我们发送四个词的呢?比如"oragne"或者"quick.orange.male.rabbit"?这些没有任何匹配的队列将会丢失。但比如"quick.orange.male.rabbit"会匹配到第二个队列。

主题交换机也可以当成其它交换机来使用,假如队列绑定到了 "#",那么它会接收所有的消息,就像广播路由器一样;而如果未使用"*","#",那么就跟直达路由器一样了。

整合所有的代码

我们用主题交换机替换掉之前的直达交换机,用如同"."的形式, EmitLogTopic.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent "" + routingKey + "":"" + message + """);

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代码片段

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
      System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      System.exit(1);
    }

    for (String bindingKey : argv) {
      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """);
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

编译这段代码

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接受所有的logs

java -cp $CP ReceiveLogsTopic "#"

接受来自"kern"的消息

java -cp $CP ReceiveLogsTopic "kern.*"

接受来自"critical"的消息

java -cp $CP ReceiveLogsTopic "*.critical"

创建多个绑定

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

发送消息

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

你可以尝试更多的参数,以此来熟悉这个知识

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68117.html

相关文章

  • 白话RabbitMQ(三):发布/订阅

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。因此一旦有有消息,消息会广播到所有的消费者。如此一来路由器就能够把消息发送给相应的队列了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https...

    Ververica 评论0 收藏0
  • RabbitMQ+PHP 教程Topics)

    摘要:前提必读本教程假设是安装在标准端口上运行。这些词可以是任何东西,但通常它们指定连接到消息的某些特性。如果我们违背合同,用一个或四个词,如或那么,这些消息将不匹配任何绑定并将丢失。代码与前面的教程几乎相同。 (using php-amqplib) 前提必读 本教程假设RabbitMQ是安装在标准端口上运行(5672)。如果您使用不同的主机、端口或凭据,则连接设置需要调整。 在哪里得到帮助...

    nemo 评论0 收藏0
  • 【译】RabbitMQ系列() - 主题模式

    摘要:主题模式在上一章我们改进了我们的日志系统,如果使用我们只能简单进行广播,而使用则允许消费者可以进行一定程度的选择。为的会同时发布到这两个。当为时,会接收所有的。当中没有使用通配符和时,的行为和一致。 主题模式 在上一章我们改进了我们的日志系统,如果使用fanout我们只能简单进行广播,而使用direct则允许消费者可以进行一定程度的选择。但是direct还是有其局限性,其路由不支持多个...

    pingan8787 评论0 收藏0
  • 简述消息队列在电商系统使用场景以及工作模式

    摘要:概述概述消息队列,是分布式系统中重要的组件,是一种进程间通信或者是同一进程的不同线程的通信方式。消息队列的使用场景消息队列的使用场景异步处理流量控制应用解耦应用解耦应用解耦消息队列的一个作用就是实现系统应用之间的解耦。概述消息队列(Message Queue),是分布式系统中重要的组件,是一种进程间通信或者是同一进程的不同线程的通信方式。和 http 同步协议不同的是,消息队列是一种异步的通...

    Honwhy 评论0 收藏0
  • PHP RabbitMQ

    摘要:消息队列,用于存储还未被消费者消费的消息。由在与时指定,而由发送时指定,两者的匹配方式由决定。需要为每一个创建,协议规定只有通过才能执行的命令。建议客户端线程之间不要共用,至少要保证共用的线程发送消息必须是串行的,但是建议尽量共用。 安装 rabbitmq 在 mac 下可以直接用 brew 安装默认安装在 /usr/local/Cellar/下命令被软连接加入到了/usr/local...

    anRui 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<