RabbitMQ - topic (话题)

 提示:转载请注明原文链接

 本文链接:https://360us.net/article/63.html

Routing那一文是日志系统还是有不足的地方,如果我们想既基于选择和发送消息的来源来接收日志呢?

比如像Linux的syslog一样,可以基于消息的级别(info/warn..),产生消息的来源(auth/cron)来记录日志。

要实行这样的功能,就需要topic类型的exchange了。

topic类型的exchange,routing_key参数值不能随便设置,需要是一个点(.)分隔的词列表,词可以随意设置,一般是使用有意义的内容。比如stock.usd.nyse,数据长度是最大是255个字符。

binding key也是同样的格式,可以有两个特殊的字符:

  • *:匹配单个词。
  • #:匹配0个或多个词。

比如*.orange.*lazy.#,这里的词指的是点分隔的完整字符。

生产者:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'admin', 'admin');
$channel = $connection->channel(); //创建channel

//声明一个topic类型exchange logs
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

//routing key从参数里面手动给出
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "info: Hello World!";
}

//往队列里面推消息
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ' , $routing_key , ':' , $data'\n";

//关闭channel和连接
$channel->close();
$connection->close();

消费者:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'admin', 'admin');
$channel = $connection->channel();

//声明一个exchange topic_logs
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

//创建一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

//绑定exchange和临时队列
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

//声明一个匿名函数,用来处理接收到的消息
$callback = function ($msg) {
    sleep(3);
    echo ' [x] ', $msg->delivery_info['routing_key'] , ':' , $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//有回调函数的时候,代码会阻塞,当有消息到来的时候就会调用$callback来处理
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

运行两个消费者

#设置了auth.info和nginx.*两个binding key
php consumer.php auth.info nginx.*

#设置了auth.#和nginx.error两个binding key
php consumer.php auth.# nginx.error

运行消费者:

php protucer.php nginx.error logs1
php protucer.php nginx.info logs2
php protucer.php auth.info logs2
php protucer.php auth.error logs2
php protucer.php auth.info.test logs2
php protucer.php auth.test logs2

消费者输出如下:

vagrant@ubuntu-bionic:$ php consumer.php auth.info nginx.*
 [*] Waiting for logs. To exit press CTRL+C
 [x] nginx.error:logs1
 [x] nginx.info:logs2
 [x] auth.info:logs2

vagrant@ubuntu-bionic:$ php consumer.php auth.# nginx.error
 [*] Waiting for logs. To exit press CTRL+C
 [x] nginx.error:logs1
 [x] auth.info:logs2
 [x] auth.error:logs2
 [x] auth.info.test:logs2
 [x] auth.test:logs2

参考 https://www.rabbitmq.com/tutorials/tutorial-five-php.html


本文链接:https://360us.net/article/63.html