六、消息路由


如果我们只想让 error 级别的 log 保存到磁盘,该怎么做?

emit_logs_direct.php

<?php
/**
 * 发送消息
 */

$exchangeName = 'direct_logs';
$level = empty($argv[1]) ? 'info' : $argv[1]; // 错误级别:info、warn、error
$message = empty($argv[2]) ? 'Hello World!' : $argv[2];

// 建立TCP连接
$connection = new AMQPConnection([
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

$exchange->publish($message, $level);
echo "Message is sent: " . $message . "\n";
$connection->disconnect();

receive_logs_direct.php

<?php
/**
 * 接收消息
 */

$exchangeName = 'direct_logs';
$level = !empty($argv[1]) ? $argv[1] : 'error';

// 建立TCP连接
$connection = new AMQPConnection([
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!\n");

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
$queue->bind($exchangeName, $level);

echo "Waiting for logs...\n";
while (TRUE) {
    $queue->consume('processLogs');
}

$connection->disconnect();

function processLogs($envelope, $queue) {
    $logs = $envelope->getBody();
    var_dump("Received: " . $logs);
    $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
}

测试示例

开三个终端,两个用于监听执行消费者队列,一个用于执行生产者脚本。

生产者:

消费者1:

消费者2:

表示只将 error 级别日志记录到日志文件。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 五、消息订阅(Publish/Subscribe)

>> 下一篇: 七、通过主题进行消息分发