资讯专栏INFORMATION COLUMN

PHP RabbitMQ

anRui / 3251人阅读

摘要:消息队列,用于存储还未被消费者消费的消息。由在与时指定,而由发送时指定,两者的匹配方式由决定。需要为每一个创建,协议规定只有通过才能执行的命令。建议客户端线程之间不要共用,至少要保证共用的线程发送消息必须是串行的,但是建议尽量共用。

安装

rabbitmq 在 mac 下可以直接用 brew 安装
默认安装在 /usr/local/Cellar/下
命令被软连接加入到了/usr/local/sbin 下,因此可以把此目录放到环境变量中,建议加入到~/.bash_profile 中
rabbitmq-server start 开启服务
端口5672 默认
端口15672 web 端登录管理端口127.0.0.1:15672
rabbitmq 默认提供的用户 guest 密码 guest

管理

停止服务
rabbitmqctl stop
开启应用 [服务依旧运行]
rabbitmqctl start_app
停止应用 [服务依旧运行]
rabbitmqctl stop_app

用户管理

添加用户
sudo rabbitmqctl add_user username password
删除用户
sudo rabbitmqctl delete_user username
修改密码
sudo rabbitmqctl change_password username newpassword
清除用户密码,禁止用户登录
sudo rabbitmqctl clear_password
列出所有用户
sudo rabbitmqctl list_users
设置用户角色
rabbitmqctl set_user_tags username tag

vhost虚拟主机管理

virtual host只是起到一个命名空间的作用,所以可以多个user共同使用一个virtual host,"/"这个是系统默认的vhost,就是说当我们创建一个到rabbitmq的connection时候,它的命名空间是"/",需要注意的是不同的命名空间之间的资源是不能访问的,比如 exchang,queue ,bingding等
创建虚拟主机
sudo rabbitmqctl add_vhost vhostpath
删除虚拟主机
sudo rabbitmqctl delete_vhost vhostpath
列出所有虚拟主机
sudo rabbitmqctl list_vhosts
列出某个 vhost 的所有用户和权限
list_permissions [-p vhostpath]
列出某个用户的所有权限。
list_user_permissions {username}
清除用户对某个 vhost 的权限。
clear_permissions [-p vhostpath] {username}
设置用户对某个 virtual host 的权限,如果不指定 vhost,则默认为“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"

添加一个管理员代替 guest
rabbitmqctl add_user admin 123456
指定用户的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配给用户指定虚拟主机的权限,虽然是administrator角色,但不对所有虚拟主机都有权限,一样需要对每个虚拟主机都授权

显示信息
rabbitmqctl list_queues [-p ] [ ...]
列出某个 vhost 的所有 queue。
rabbitmqctl list_exchanges [-p ] [ ...]
列出某个 vhost 的所有 exchange。
rabbitmqctl list_bindings [-p ] [ ...]
列出某个 vhost 的所有 binding。
rabbitmqctl list_connections [ ...]
列出 RabbitMQ broker 的所有 connection。
rabbitmqctl list_channels [ ...]
列出 RabbitMQ broker 的所有 channel
rabbitmqcrl list_consumers [-p ]
列出某个 vhost 的所有 consumer。

基本概念点

1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。

2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host

3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。

4.Message Queue:消息队列,用于存储还未被消费者消费的消息。

5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。

6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。

7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

客户端管理

php端 rabbitmq 客户端可以使用 composer 下的库

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

使用时,用到了这两个东西

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

开始发送消息

public function handle()
{
    //连接到 test_host 虚拟主机,每个虚拟主机有自己的队列,交换机...
    $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host");
    //创建一个 channel
    $channel = $connection->channel();
    //声明 hello 队列
    $channel->queue_declare("hello", false, false, false, false);
    //创建一个消息
    $msg = new AMQPMessage(time());
    //把消息推送到默认的交换机中,并且告诉交换机要把消息交给 hello 队列
    $channel->basic_publish($msg, "", "hello");
    echo " [x] Sent ".time()."
";
}

重要概念,消息是保存在交换机中的,当消息存放时指定的队列存在,交换机会把消息推送到该队列

消息队列发送消息给消费者,一个消息发给一个消费者

public function handle()
{
    //连接
    $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host");
    //创建一个 channel
    $channel = $connection->channel();
    //可以运行这个命令很多次,但是只有一个队列会被创建, 在程序中重复将队列重复声明一下是种值得推荐的做法,保证队列存在
    $channel->queue_declare("hello", false, false, false, false);

    echo " [*] Waiting for messages. To exit press CTRL+C", "
";

    $callback = function($msg) {
        echo " [x] Received ", $msg->body, "
";
        sleep($msg->body);
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
    };

    //默认情况下,队列会把消息公平的分配给各个消费者
    //如果某个消费者脚本处理完成分配给他的消息任务后,会一直空闲
    //另外一个消费者脚本处理的消息都非常耗时,这就容易导致消费者脚本得不到合理利用,
    //加入此句话,是告诉队列,取消把消息公平分配到各个脚本,而是那个脚本空闲,就交给它一个消息任务
    //这样,合理利用到每一个空闲的消费者脚本
    $channel->basic_qos(null, 1, null);

    /**
     * basic_consume 方法    从队列中读取数据
     * @param string $queue 指定队列
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack    消费者处理完消息后,是否不需要告诉队列已经处理完成,true 不需要 false 需要,
     * true
        默认情况下,队列会把消息公平分配到各个消费者中,然后一次性把消息交给消费者,如果消费者处理了一半挂了,那么消息就丢失了
     * false
        默认情况下,队列会把消息公平的分配给各个消费者,然后一个一个的把消息分配到消费者脚本中,脚本处理完成后,告诉队列,队列会删除这个消息,并且接着给下一个消息,
        当脚本挂掉,不会丢失消息,队列会把未完成的消息分配给其他消费者
        在 callback 函数中需要加入这句话,处理完后通知队列可以删除消息了
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
        未加入这句话,队列不会删除已处理完的消息,当脚本挂掉时,会把分配给当前队列的所有消息再次重新分配给其他队列,会导致消息会重复处理
     */
    $channel->basic_consume("hello", "", false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
发布/订阅

一个消息发送给多个消费者

扇形交换机 fanout
发布订阅模式,科院实现一个消息发送到多个队列中
在发布消息脚本中,创建一个扇形交换机,把消息推送到交换机,不需要推动到指定的队列中,队列在消费者脚本中创建
消费脚本定义个临时队列,并绑定这个临时队列到交换机中,扇形交换机会把接收到的消息推动到每一个绑定的队列中

生产者脚本

public function handle()
{
    //连接到 test_host 虚拟主机,每个虚拟主机有自己的队列,交换机...
    $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host");
    //创建一个 channel
    $channel = $connection->channel();
    //声明 log 队列
    //$channel->queue_declare("log", false, false, false, false);
    //创建一个fanout类型交换机
    $channel->exchange_declare("logs","fanout",false,false,false);

    //创建一个消息
    $msg = new AMQPMessage( time() );
    $channel->basic_publish ( $msg , "logs" );

    echo " [x] Sent ".time()."
";

    $channel->close();
    $connection->close();
}

消费者脚本

public function handle()
{
    //连接
    $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host");
    //创建一个 channel
    $channel = $connection->channel();
    //可以运行这个命令很多次,但是只有一个队列会被创建, 在程序中重复将队列重复声明一下是种值得推荐的做法,保证队列存在
    //$channel->queue_declare("hello", false, false, false, false);
    //创建一个fanout类型交换机
    $channel->exchange_declare("logs", "fanout", false, false, false);

    //系统创建一个临时队列
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //绑定临时队列到交换机上
    $channel->queue_bind($queue_name, "logs");

    $callback = function($msg){
        echo " [x] ", $msg->body, "
";
    };
    $channel->basic_consume($queue_name, "", false, true, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

直连交换机 direct
交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列

生产者,创建基本上和扇形交换机一样,不同的是

$channel->exchange_declare("direct_logs","direct",false,false,false);
//创建一个消息
$msg = new AMQPMessage( time() );
//把消息推动到direct_logs交换机,并给消息加上路由 key,让消费者队列来根据 key 接收消息
$channel->basic_publish ( $msg , "direct_logs", "warning" );

消费者

$channel->exchange_declare("direct_logs","direct",false,false,false);
//系统创建一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//绑定临时队列到交换机上,并指定消息的路由 key
$channel->queue_bind($queue_name, "direct_logs", "error");
$channel->queue_bind($queue_name, "direct_logs", "warning");

主题交换机 topic

直连交换机中路由 key 匹配模式
(星号*) 用来表示一个单词.
(井号#) 用来表示任意数量(零个或多个)单词。

Q1会接收到 a.orange.b 等key 值中间为 orange 的消息
Q2会接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

生产者,指定路由 key

$channel->exchange_declare("topic_logs","topic",false,false,false);
//创建一个消息
$msg = new AMQPMessage( time() );
//把消息推动到direct_logs交换机,并给消息加上路由 key,让消费者队列来根据 key 接收消息
$channel->basic_publish ( $msg , "topic_logs", "baidu.warning" );

消费者1

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, "topic_logs", "baidu.#");

消费者2

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, "topic_logs", "ali.#");
参考

https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/21527.html

相关文章

  • RabbitMQ】——centos7安装rabbitmq教程 以及 PHP开启rabbitmq扩展

    摘要:第一步安装因为是语言编写的,所以我们首先需要安装第二步安装官网提供的安装方式本人安装成功的方式第三步查看是否已经安装好了,能查到说明已经安装完成了。 第一步:安装Erlang 因为rabbitMQ是Erlang语言编写的,所以我们首先需要安装Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    lscho 评论0 收藏0
  • RabbitMQ】——centos7安装rabbitmq教程 以及 PHP开启rabbitmq扩展

    摘要:第一步安装因为是语言编写的,所以我们首先需要安装第二步安装官网提供的安装方式本人安装成功的方式第三步查看是否已经安装好了,能查到说明已经安装完成了。 第一步:安装Erlang 因为rabbitMQ是Erlang语言编写的,所以我们首先需要安装Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    printempw 评论0 收藏0
  • RabbitMQ+PHP 消息队列环境配置

    摘要:参考文档依赖包安装环境配置环境变量增加内容保存退出,并刷新变量测试是否安装成功安装完成以后,执行看是否能打开,用退出,注意后面的点号,那是的结束符。 参考文档:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依赖包安装 yum install ncurses-d...

    geekidentity 评论0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中间的框是一个队列的消息缓冲区,保持代表的消费。本教程介绍,这是一个开放的通用的协议消息。我们将在本教程中使用,解决依赖管理。发送者将连接到,发送一条消息,然后退出。注意,这与发送发布的队列匹配。 介绍 RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就...

    silencezwm 评论0 收藏0

发表评论

0条评论

anRui

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<