RabbitMQ - 发布-订阅

 提示:转载请注明原文链接

 本文链接:https://360us.net/article/61.html

一个消息分发给多个消费者,这就叫做发布-订阅模式。

作为示例,这里构建一个日志系统,由两部分组成,一个是发送日志消息,另一个是接收和打印。

exchange

exchange就是接收生产者的消息,然后推送给队列的一个东西,类似邮局。

exchange有几种类型,direct, topic, headersfanout

不同的exchange类型会有不同的行为,各种类型的使用在后续文章会有介绍,这里我们先只关注最后一种。

$channel->exchange_declare('logs', 'fanout', false, false, false);

fanout类型是广播消息到所有已知的队列。

创建一个名字是logsexchange

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

RabbitMQ默认是有好几个exchange的,$channel->basic_publish($msg, '', 'hello');第二个参数传空字符串,用的就是默认的exchange,使用命令sudo rabbitmqctl list_exchanges列出,有一些amq开头的exchange,没有名字类型是direct的exchange就是系统默认的。

临时队列

对于日志,我们只关心当前的日志消息流,而不关心老的,所以需要一个新的空队列。

临时队列在连上服务器时会自动创建,随机名称,消费者断开服务器后会自动销毁。

创建临时队列:list($queue_name, ,) = $channel->queue_declare("");,只需要给参数一个空名称即可。

绑定

创建好exchange和临时队列后,要告诉exchange发送消息到我们创建的队列,这个就是绑定。

$channel->queue_bind($queue_name, 'logs');

rabbitmqctl list_bindings列出已存在的绑定关系列表

最终生产者代码:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'admin', 'admin');
$channel = $connection->channel(); //创建channel

//声明一个exchange logs
$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}

//往队列里面推消息
$msg = new AMQPMessage($data);
$result = $channel->basic_publish($msg, 'logs');
var_dump($result);

echo " [x] Sent '$data'\n";

//关闭channel和连接
$channel->close();
$connection->close();

消费者代码:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'admin', 'admin');
$channel = $connection->channel();

//声明一个exchange logs
$channel->exchange_declare('logs', 'fanout', false, false, false);

//创建一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

//绑定exchange和临时队列
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

//声明一个匿名函数,用来处理接收到的消息
$callback = function ($msg) {
    sleep(3);
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//有回调函数的时候,代码会阻塞,当有消息到来的时候就会调用$callback来处理
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

这种模式下面需要先运行消费者,创建队列后,生产者才能推送消息。

这里可以运行多个消费者,可以看到他们都能收到全部的消息。

参考RabbitMQ官方教程:http://www.rabbitmq.com/getstarted.html


本文链接:https://360us.net/article/61.html