七、通过主题进行消息分发
Topic Exchange
对于 Message 的routing_key
字符串格式是有限制的:以点号"."分割的字符表,如php.laravel
,并且长度不能超过 255 个字节。
对于 routing_key
而言,有两个特殊字符:
*
:代表任意单词#
:代表0个或多个单词
Topic Exchange 与其他 Exchange 的转化:
routing_key
是#
,会接收所有 Message,此时等同于 Fanout Exchange;routing_key
不包含#
或*
,则等同于 Direct Exchange
代码实现
emit_logs_topic.php
<?php
/**
* 发送消息
*/
$exchangeName = 'topic_logs';
$topic = empty($argv[1]) ? 'anonymous.info' : $argv[1]; // 主题
$message = empty($argv[2]) ? 'Hello World!' : $argv[2];
// 建立TCP连接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declareExchange();
$exchange->publish($message, $topic);
echo "Message is sent: " . $message . "\n";
$connection->disconnect();
receive_logs_topic.php
<?php
/**
* 接收消息
*/
$exchangeName = 'topic_logs';
$topic = $argv[1];
// 建立TCP连接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
$queue->bind($exchangeName, $topic);
echo "Waiting for logs...\n";
while (TRUE) {
$queue->consume('processLogs');
}
$connection->disconnect();
function processLogs($envelope, $queue) {
$logs = $envelope->getBody();
var_dump("Received: " . $logs);
$queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
}
运行代码:


1 Comment
$argv[1] 参数哪里来的