RabbitMQ官网教程翻译(PHP版本)_3

本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。

This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.

发布/订阅

(使用php-amqplib

在上一个教程,我们创建了一个工作队列。Work Queue(工作队列)是在每一个任务都分发给一个确切的处理程序的假设上建立的。在这一部分,我们将会做一些完全不同的事情——我们将会发送一条消息去多个Consumers(消费者)上。这种模式被称为“发布/订阅”

为了说明这种模式,我们将会建立一个简单的日志系统。它将会包含两个程序——第一个会发出日志消息,而另一个则会接收并打印这些消息。

在我们的日志系统里,每一个正在运行的接收程序副本都会获得(所有)消息。那样我们就可以运行一个接收程序把日志引导到磁盘;同时我们可以运行另外一个接收程序把日志打印到屏幕上来看。

基本上,被发布的消息将会在所有接收程序间进行广播。

交换

在本教程的上一部分,我们通过一个Queue(队列)发送和接收消息。现在是时候介绍一下 RabbitMQ 的全消息模型(Full Messaging Model)了。

让我们快速地复习前面的教程涵盖的内容:
1.一个Producer(生产者)就是一个发送消息的程序。
2.一个Queue(队列)就是一个保存消息的缓冲(buffer)
3.一个Consumer(消费者)就是一个接收消息的用户程序。

RabbitMQ的消息模型的主要思想就是:Producer(生产者)从不会直接将消息发送到Queue(队列)中。实际上,很多时候Producer(生产者)甚至完全不知道一条消息将会被发送到Queue(队列)

相反地,Producer(生产者)只能发送消息到一个交换机(exchange)。一个交换机就是一个非常简单的东西。一方面(或者是叫一端更形象?)从Producer(生产者)处接收消息,另一方面(端?)它把这些消息推进Queue(队列)中。交换机必须准确地知道对它接收的每一条消息做什么。应该将这条消息追加到一个特别的Queue(队列)吗?应该将这条消息追加到多个Queue(队列)吗?或者应该把这条消息丢弃掉吗?做这些事情的规则是通过定义交换类型来确定的。

《RabbitMQ官网教程翻译(PHP版本)_3》 Produce->Exchange->Queues

这里只有少数有效的交换类型:directtopicheadersfanout。我们将会集中(介绍)最后一个fanout。让我们创建一个这种类型的交换机并称他为logs

$channel->exchange_declare('logs', 'fanout', false, false, false);

fanout交换机是非常简单的。从它的名称就能猜到(反正我是不知道怎么翻译好-_-),它只是将它获取到的所有消息广播到它所知道的所有Queue(队列)中。这与我们的日志系统需求十分吻合。

列出交换机

你可以运行十分有用的 rabbitmqctl 来列出服务器上的所有交换机:

sudo rabbitmqctl list_exchanges

在列出的队列中会由一些 amq.* 交换机以及默认的(未被命名)的交换机。这是默认创建的,但此时你似乎并不需要使用他们。

默认的交换机

在教程的上一部分我们对交换机一无所知,却仍然可以发送消息到Queues(队列)中。这很可能是因为我们使用了一个通过空字符串(””)识别的默认的交换机。

重新回顾我们之前发送一个消息的时候:

$channel->basic_publish($msg, '', 'hello');

在此处,我们使用了默认的,或者说是无名的交换机:消息会被路由到routing_key指定的Queue(队列),如果这个队列存在的化。routing_key就是basic_publish 的第三个参数。

现在,让我们发送消息到一个被命名的交换机上:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

临时队列

你可能记得之前我们使用的Queue(队列)都是拥有了一个指定的名称(还记得hellotask_queue吗?)。在我们需要将工作程序指向对应的Queue(队列)时可以命名一个Queue(队列)对于我们来说是十分重要的。当你想要在Producer(生产者)Consumer(消费者)之间共享Queue(队列)时候为Queue(队列)命名是十分重要的。

但是这对我们的日志记录器来说这并不重要。我们打算监听所有的日志消息吗,而不是仅仅是其中一部分。同样我们只对当前流动的消息感兴趣,而不是旧的消息。为了解决这一情况,我们需要两样东西。

首先,无论何时,我们连接到RabbitMQ时候都需要一个新的,并且是空的Queue(队列)。为了达到这一目的,我们可以使用随机的名称来创建一个Queue(队列),或者,更好的选择是——让RabbitMQ服务器为我们选择一个随机的Queue(队列)

其次,一旦我们的Consumer(消费者)断开了链接,对应的Queue(队列)应该被自动删除。

php-amqplib客户端中,当我们传给Queue(队列)名称参数一个空字符串时候,我们能创建一个非持久化队列(non-durable Queue),并反回了一个(自动)生成的队列名称:

list($queue_name,,) = $channel->queue_declare('');

当这个方法反回,$queue_name变量包含了一个由RabbitMQ生成的随机名称。例如,它看起来会像是这样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg

当这个连接宣布关闭了,对应的Queue(队列)将会被删除,因为它被声明为独有的(exclusive)

绑定

《RabbitMQ官网教程翻译(PHP版本)_3》 Producer->Change->(bingding)->Queue

我们已经创建了一个fanout交换机以及一个Queue(队列)。现在我们需要告诉交换机去发送消息到我们的Queue(队列)。交换机与队列之间的关系成为绑定(binding)

$channel->queue_bind($queue_name, 'logs');

从现在起,logs交换机将会追加消息去我们的队列。

列出绑定

如你所想,通过以下方式你可以把正在使用的绑定(bindings)

rabbitmqctl list_bindings

将他们放在一起

《RabbitMQ官网教程翻译(PHP版本)_3》 通过交换机与队列绑定实现”发布/订阅“”模式

发出日志的这个Producer(生产者)程序与我们之前教程的不会相差太多。最重要的改变就是我们现在希望发送消息到我们的logs交换机而不是无名的那个。下面就是emit_log.php脚本的代码:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);

// 发送消息到交换机上(而不是指定队列)
$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();
?>

(emit_log.php源码)

这如你所看到的,在建立连接之后我们声明了一个交换机。这一步十分重要因为发送消息到不存在的交换机是被禁止的,

如果已经没有Queue(队列)绑定到交换机,交换机的消息将会被丢失,但对于我们来说可以接受;如果已经没有Consumer(消费者)监听该交换机的消息了,我们可以安全地删除这些消息。

reveive_logs.php的代码如下:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明对应的交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);

// 创建一个非持久化的队列并获取自动生成的队列名称
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 绑定队列到交换机
$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

(reveive_logs.php源码)

如果你想保存这些日志到文件,只需要打开一个控制台,并输入:

php reveive_logs.php > logs_from_rabbit.log

如果你想在屏幕上看到这些日志,新建一个新的终端并运行:

php reveive_logs.php

当然,你还需要发送日志:

php emit_log.php

使用rabbitmqctl list_bindings你可以验证这些代码已经如我们所想地创建了绑定与Queue(队列)。如果是运行着两个receive_logs.php程序,你将会看到类似下面的情况:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

这结果的解析很直接了当:数据从logs交换机发送到了两个以服务器分配的名称命名的Queue(队列)上。这正式我们所期望的。

    原文作者:JobinLi
    原文地址: https://www.jianshu.com/p/d5e515901cdc
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞