RabbitMQ常用模式

1、普通模式 "Hello World!"

一名生产者一名消费者,一对一关系

普通模式关系图

  • 生产者

    <?php
    namespace app\index\controller;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    class Send
    {
        protected $connection;
        protected $channel;
    
        public function __construct()
        {
            /* @param string $host 主机
             * @param string $port 端口
             * @param string $user 用户名
             * @param string $password 密码
             * @param string $vhost 虚拟主机
             * 创建一个连接*/$this->connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN');
            
            //建立连接通道
            $this->channel = $this->connection->channel();
    
        }
    
        public function hello()
        {
            //声明队列
            $this->channel->queue_declare('hello', false, false, false, false, false);
    
            //发送信息的参数设置
            $option = [];
    
            //实例化发送信息类
            $msg = new AMQPMessage('Hello World ' . time(), $option);
    
            //写入队列
            $this->channel->basic_publish($msg, '', 'hello');
    
            //关闭通道
            $this->channel->close();
    
            //关闭连接
            $this->connection->close();
    
        }
    
    }
  • 效果图,数据写入队列成功

    队列信息截图

GUI获取数据

  • 消费者
  <?php
  include(__DIR__ . '/../../../vendor/autoload.php');
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  
  //创建一个连接
  $connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', 'qone');
  
  //建立连接通道
  $channel = $connection->channel();
  
  // 回调函数 队列内容处理逻辑
  $callback = function ($msg) {
      echo 'Received ', $msg->body, "\n";
      //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失
      //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
      $msg->ack();
  };
  
  //消费信息
  $channel->basic_consume('hello', '', false, false, false, false, $callback);
  
  //脚本执行完成或者 exit() 后关闭连接和通道
  function shutdown($channel, $connection)
  {
      $channel->close();
      $connection->close();
  }
  
  register_shutdown_function('shutdown', $channel, $connection);
  
  //通过callbacks数组判断是否还在活动
  while ($channel->is_consuming()) {
      $channel->wait();
  }
  • 效果图,读取成功,队列数据消失

    消费者结果图

队列信息截图

2、工作模式 Work queues

一个生产者对应多个消费者,工作队列又称任务队列

工作模式关系图

工作模式代码同普通模式一致,只是同时存在多个消费者

  • 结果图

    消费者结果图图

3、发布/订阅模式 Publish/Subscribe

消息可以被多个消费者同时获取,生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机,当生产者发送消息后所有注册的队列的消费者都可以收到消息。场景:多个单位需要同时调整时,共同注册交换机。

发布/订阅模式关系图

  • 生产者
  <?php
  namespace app\index\controller;
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Message\AMQPMessage;
  use PhpAmqpLib\Exchange\AMQPExchangeType;
  
  class Send
  {
      protected $connection;
      protected $channel;
  
      public function __construct()
      {
          /* @param string $host 主机
           * @param string $port 端口
           * @param string $user 用户名
           * @param string $password 密码
           * @param string $vhost 虚拟主机
           * 创建一个连接
           */
          $this->connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN');
          
          //建立连接通道
          $this->channel = $this->connection->channel();
  
      }
  
      public function fanout()
      {
          //发送信息的参数设置
          $option = [];
  
          //实例化发送信息类
          $msg = new AMQPMessage('Fanout ' . time(), $option);
  
          //声明一个交换机
          $this->channel->exchange_declare('fanout', AMQPExchangeType::FANOUT, false, false, true);
  
          //写入交换机
          $this->channel->basic_publish($msg, 'fanout');
  
          //关闭通道
          $this->channel->close();
  
          //关闭连接
          $this->connection->close();
      }
  
  }
  • 消费者 ,因为队列名称是随机字符串,消费者代码一致,一个脚本启动两次就可以
  <?php
  include(__DIR__ . '/../../../vendor/autoload.php');
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Exchange\AMQPExchangeType;
  
  
  $exchange = 'fanout';
  
  //创建一个连接
  $connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', 'qone');
  
  //建立连接通道
  $channel = $connection->channel();
  
  // 回调函数 队列内容处理逻辑
  $callback = function ($msg) {
      echo 'Received ', $msg->body, "\n";
      //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失
      //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
      $msg->ack();
  };
  
  //声明队列 队列名称为空 会随机字符串
  list($queue_name, ,) = $channel->queue_declare('', false, false, false, true);
  
  //声明交换机
  $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, true);
  
  //绑定
  $channel->queue_bind($queue_name, $exchange);
  
  //消费信息
  $channel->basic_consume($queue_name, '', false, false, false, false, $callback);
  
  //脚本执行完成或者 exit() 后关闭连接和通道
  function shutdown($channel, $connection)
  {
      $channel->close();
      $connection->close();
  }
  
  register_shutdown_function('shutdown', $channel, $connection);
  
  //通过callbacks数组判断是否还在活动
  while ($channel->is_consuming()) {
      $channel->wait();
  }
  • 结果图

    队列信息截图

消费者结果图

4、路由模式 Routing

​ 生产者将消息发送到了type为direct模式的交换机,消费者的队列在将自己绑定到路由的时候会给自己绑定一个key,只有消费者发送对应key格式的消息时候队列才会收到消息,类似于带筛选条件的订阅模式。

路由模式关系图

  • 生产者
  <?php
  namespace app\index\controller;
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Message\AMQPMessage;
  use PhpAmqpLib\Exchange\AMQPExchangeType;
  
  class Send
  {
      protected $connection;
      protected $channel;
  
      public function __construct()
      {
          /* @param string $host 主机
           * @param string $port 端口
           * @param string $user 用户名
           * @param string $password 密码
           * @param string $vhost 虚拟主机
           * 创建一个连接
           */
          $this->connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN');
          
          //建立连接通道
          $this->channel = $this->connection->channel();
  
      }
  
      public function direct()
      {
  
          $array = ['k1', 'k2', 'k3'];
          $type = $array[array_rand($array)];
  
          //发送信息的参数设置
          $option = [];
  
          //实例化发送信息类
          $msg = new AMQPMessage('Direct ' . $type . time(), $option);
  
          //声明一个交换机
          $this->channel->exchange_declare('direct', AMQPExchangeType::DIRECT, false, false, true);
  
          //写入交换机
          $this->channel->basic_publish($msg, 'direct', $type);
  
          //关闭通道
          $this->channel->close();
  
          //关闭连接
          $this->connection->close();
      }
  
  }
  • 消费者
  <?php
  include(__DIR__ . '/../../../vendor/autoload.php');
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Exchange\AMQPExchangeType;
  
  
  if (empty($argv['1'])) {
      exit('参数异常');
  }
  
  $exchange = 'direct';
  
  //创建一个连接
  $connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', 'qone');
  
  //建立连接通道
  $channel = $connection->channel();
  
  // 回调函数 队列内容处理逻辑
  $callback = function ($msg) {
      echo 'Received ', $msg->body, "\n";
      //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失
      //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
      $msg->ack();
  };
  
  //声明队列 队列名称为空 会随机字符串
  list($queue_name, ,) = $channel->queue_declare('', false, false, false, true);
  
  //声明交换机
  $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, false, true);
  
  //绑定
  $channel->queue_bind($queue_name, $exchange, $argv['1']);
  
  //消费信息
  $channel->basic_consume($queue_name, '', false, false, false, false, $callback);
  
  //脚本执行完成或者 exit() 后关闭连接和通道
  function shutdown($channel, $connection)
  {
      $channel->close();
      $connection->close();
  }
  
  register_shutdown_function('shutdown', $channel, $connection);
  
  //通过callbacks数组判断是否还在活动
  while ($channel->is_consuming()) {
      $channel->wait();
  }
  • 结果图

    消费者结果图

5、主题模式 Topics

类似于路由模式,key变得更宽泛,可以模糊匹配。
\#:可以替代零个或多个单词。
*:可以代替一个单词。

主题模式关系图

  • 生产者
  <?php
  namespace app\index\controller;
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Message\AMQPMessage;
  use PhpAmqpLib\Exchange\AMQPExchangeType;
  
  class Send
  {
      protected $connection;
      protected $channel;
  
      public function __construct()
      {
          /* @param string $host 主机
           * @param string $port 端口
           * @param string $user 用户名
           * @param string $password 密码
           * @param string $vhost 虚拟主机
           * 创建一个连接
           */
          $this->connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN');
          
          //建立连接通道
          $this->channel = $this->connection->channel();
  
      }
  
      //主题模式
      public function topic()
      {
  
          $arr = ['mysql.select.k1', 'mysql.select.k2', 'mysql.update.k1'];
          $type = $arr[array_rand($arr)];
  
          //发送信息的参数设置
          $option = [];
  
          //实例化发送信息类
          $msg = new AMQPMessage('Topic ' . $type, $option);
  
          //声明一个交换机
          $this->channel->exchange_declare('topic', AMQPExchangeType::TOPIC, false, false, true);
  
          //写入交换机
          $this->channel->basic_publish($msg, 'topic', $type);
  
          //关闭通道
          $this->channel->close();
  
          //关闭连接
          $this->connection->close();
      }
  
  }
  • 消费者
  <?php
  include(__DIR__ . '/../../../vendor/autoload.php');
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Exchange\AMQPExchangeType;
  
  
  if (empty($argv['1'])) {
      exit('参数异常');
  }
  
  $exchange = 'topic';
  
  //创建一个连接
  $connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', 'qone');
  
  //建立连接通道
  $channel = $connection->channel();
  
  // 回调函数 队列内容处理逻辑
  $callback = function ($msg) {
      echo 'Received ', $msg->body, "\n";
      //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失
      //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
      $msg->ack();
  };
  
  //声明队列 队列名称为空 会随机字符串
  list($queue_name, ,) = $channel->queue_declare('', false, false, false, true);
  
  //声明交换机
  $channel->exchange_declare($exchange, AMQPExchangeType::TOPIC, false, false, true);
  
  //绑定
  $channel->queue_bind($queue_name, $exchange, $argv['1']);
  
  //消费信息
  $channel->basic_consume($queue_name, '', false, false, false, false, $callback);
  
  //脚本执行完成或者 exit() 后关闭连接和通道
  function shutdown($channel, $connection)
  {
      $channel->close();
      $connection->close();
  }
  
  register_shutdown_function('shutdown', $channel, $connection);
  
  //通过callbacks数组判断是否还在活动
  while ($channel->is_consuming()) {
      $channel->wait();
  }
  • 结果图

    消费者结果图

6、RPC模式

RPC是远程过程调用(Remote Procedure Call)的缩写形式。

进程间通信(IPC)是在多任务操作系统或联网的计算机之间运行的程序和进程所用的通信技术。有两种类型的进程间通信。

LPC用在多任务操作系统中,使得同时运行的任务能互相会话。这些任务共享内存空间使任务同步和互相发送信息。

远程过程调用(RPC)RPC类似于LPC,只是在网上工作。

RPC流程图

RPC模式关系图

  • 客户端
  <?php
  include(__DIR__ . '/../../../vendor/autoload.php');
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Message\AMQPMessage;
  
  class RpcClient
  {
      private $connection;
      private $channel;
      private $callback_queue;
      private $response;
      private $corr_id;
  
      public function __construct()
      {
          $this->connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', '*****', false, 'AMQPLAIN', '登录成功', 'zh-CN');
          $this->channel = $this->connection->channel();
          list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
          $this->channel->basic_consume($this->callback_queue, '', false, true, false, false,
              array(
                  $this,
                  'onResponse'
              )
          );
  
      }
  
      public function onResponse($rep)
      {
          if ($rep->get('correlation_id') == $this->corr_id) {
              $this->response = $rep->body;
          }
      }
  
      public function call($n)
      {
          $this->response = null;
          $this->corr_id = uniqid();
  
          $msg = new AMQPMessage(
              (string)$n,
              array(
                  'correlation_id' => $this->corr_id,
                  'php' => $n,
                  'reply_to' => $this->callback_queue
              )
          );
          $this->channel->basic_publish($msg, '', 'rpc_queue');
          while (!$this->response) {
              $this->channel->wait();
          }
          return intval($this->response);
      }
  }
  if (empty($argv['1'])) {
      exit('参数异常');
  }
  $rpc = new RpcClient();
  $response = $rpc->call($argv['1']);
  echo 'Got ', $response, "\n";
  • 服务端,这里业务逻辑只是做了+1处理
  <?php
  include(__DIR__ . '/../../../vendor/autoload.php');
  
  use PhpAmqpLib\Connection\AMQPStreamConnection;
  use PhpAmqpLib\Message\AMQPMessage;
  
  $connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', '*****', false, 'AMQPLAIN', '登录成功', 'zh-CN');
  $channel = $connection->channel();
  
  $channel->queue_declare('rpc_queue', false, false, false, false);
  
  
  echo "Awaiting RPC requests\r\n";
  $callback = function ($req) {
      $n = intval($req->body);
      echo $n."\r\n";
  
      $msg = new AMQPMessage(
          (string) ($n+1),
          array('correlation_id' => $req->get('correlation_id'))
      );
  
      $req->delivery_info['channel']->basic_publish(
          $msg,
          '',
          $req->get('reply_to')
      );
      $req->delivery_info['channel']->basic_ack(
          $req->delivery_info['delivery_tag']
      );
  };
  
  $channel->basic_qos(null, 1, null);
  $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
  
  while ($channel->is_consuming()) {
      $channel->wait();
  }
  
  $channel->close();
  $connection->close();
  
  • 结果图

    双端结果图

Last modification:August 20, 2021
如果觉得我的文章对你有用,请随意赞赏