【workbunny】RabbitMQ客户端

3.0.0 版本
2026-03-16 版本更新时间
3543 安装
33 star

Latest Stable Version Total Downloads License PHP Version Require

版本计划

🐰 已发布3.0.0-RC.1版本

说明

简介

适配Workerman/webmanAMQP组件包

  • 支持基于AMQP协议工具实现AMQP-Server
  • 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
  • 支持延迟队列(rabbitMQ须安装插件);
  • 支持连接池,支持通道池,Builder支持影子模式(并发补偿);
  • 3.0与之前版本相比,更符合AMQO协议约定,更合理的架构设计和使用逻辑
    • 使用ConnectionManagement多连接管理器管理ConnectionClient),合理复用机制及并发使用能力
    • 使用Channel-Pool管理Channel,合理的复用和并发机制
    • 提供AMQP协议包,可供开发者自定义实现AMQP-ClientAMQP-Server,并提供AMQP-Frame协议帧工具

概念

 ┌───────────┐ | Builder A | ──┐ └───────────┘ | | ┌───────────┐ | | | Channel 1 | | | └───────────┘ ┌───────────┐ └─> ┌──────────────────┐ | ┌───────────┐ | Builder A | ────> | Connections Pool | ── connection ──> | | Channel 2 | └───────────┘ ┌─> └──────────────────┘ min ... MAX | └───────────┘ | <static> <context> | ┌───────────┐ | | | Channel 3 | ┌───────────┐ | | └───────────┘ | Builder C | ──┘ ... └───────────┘ channel-max 
  • Builder:队列消费者、生产者的抽象结构,类似ORMModel
    • BuilderConfig: 队列配置结构
    • Builder可以指定不同的connection配置进行连接,以区分业务/服务
    • Builderpublish/consume使用了影子模式(当前ConnectionChannel耗尽时,会自动从Connection Pool获取新的连接创建Channel
      • 影子模式下请尽量将Connection PoolChannels Pool的配置wait_timeout改小,避免过长时间的等待(等待中会出让控制权,不会阻塞)
  • Connection:基于AsyncTcpConnection封装的AMQP-client
    • ConnectionConnectionManagement管理,连接池为静态,不会因为Builder的释放而释放
    • Connection Pool中通过get拿取Connection后需要手动调用release归还,或者使用action通过传入回调函数来执行并自动归还
    • 配置信息:
    • min_connections: 最小连接数
    • max_connections: 最大连接数
    • idel_timeout: 空闲回收时间 [s]
    • wait_timeout: 等待连接超时时间 [s]
  • Channel:抽象的通道对象
    • 每一个Connection都具备一个Channel
    • 多协程时,自动创建新的Channel消费,并在协程结束后自动归还/释放
    • 单协程时,复用Channel消费
    • 配置信息:
    • idel_timeout: 空闲回收时间 [s]
    • wait_timeout: 等待连接超时时间 [s]
  • AMQP: workerman支持的协议封装

详细文档

使用

要求

  • php >= 8.1
  • webman-framework >= 2.0 或 workerman >= 5.1
  • rabbitmq-server >= 3.10

安装

composer require workbunny/webman-rabbitmq

配置

基础配置 app.php

<?php declare(strict_types=1); return [ 'enable' => true, // 日志 LoggerInterface | LoggerInterface::class 'logger' => null, ];

连接配置 connections.php

<?php declare(strict_types=1); use Workbunny\WebmanRabbitMQ\Clients\AbstractClient; use Workbunny\WebmanRabbitMQ\Connections\Connection; return [ 'default' => [ 'connection' => Connection::class, // 连接池 'connections_pool' => [ 'min_connections' => 1, 'max_connections' => 20, 'idle_timeout' => 60, 'wait_timeout' => 10 ], 'config' => [ 'host' => 'rabbitmq', 'vhost' => '/', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'mechanism' => 'AMQPLAIN', 'timeout' => 10, // 重启间隔 'restart_interval' => 5, // 通道池 'channels_pool' => [ 'idle_timeout' => 60, 'wait_timeout' => 10 ], 'client_properties' => [ 'name' => 'workbunny/webman-rabbitmq', 'version' => \Composer\InstalledVersions::getVersion('workbunny/webman-rabbitmq') ], // 心跳回调 callable 'heartbeat_callback' => null, ] ] ];

命令行

  • 构建:php webman workbunny:rabbitmq-builder -h
  • 移除/关闭:php webman workbunny:rabbitmq-remove -h
  • 列表:php webman workbunny:rabbitmq-list -h

延迟队列

延迟队列需要为 rabbitMQ 安装 rabbitmq_delayed_message_exchange 插件

  1. 进入 rabbitMQ 的 plugins 目录下执行命令下载插件(以rabbitMQ 3.10.2举例):
    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
  2. 执行安装命令
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  3. 生产
    publish(new TestBuilder(), 'abc', headers: [ 'x-delay' => 10000, # 延迟10秒 ]); # return bool

    注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

注意

  • 不少第三方厂商不支持安装延迟队列插件
  • 当不支持安装延迟队列时,可以通过优先级队列 + REQUEUE实现
    • Builder支持通过REQUEUE标记进行消息重入队尾
    • 通过自定义header中的时间标记,和逻辑判断,当满足时间条件时则执行,不满足条件则通过REQUEUE将数据自动推回队尾
    • 为了减少数据延迟问题,使用优先级标识将时间较近的消息优先级定义高一些,而时间较长的数据优先级定义低一些
    • 队列通常支持0-9的优先级,合理分配时间段和优先级的匹配关系

      生产

注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQPublishException 异常

注:首先使用命令行工具或者手动创建对应的Builder,以下以Workbunny\Tests\TestBuilders\TestPublishBuilder举例

  • 快捷发送

    use function Workbunny\WebmanRabbitMQ\publish; use Workbunny\Tests\TestBuilders\TestPublishBuilder; publish(new TestPublishBuilder(), 'abc'); # return bool
  • Builder发送

    use Workbunny\Tests\TestBuilders\TestPublishBuilder; use Workbunny\WebmanRabbitMQ\BuilderConfig; use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface; $builder = new TestPublishBuilder(); $body = 'abc'; return $builder->action(function (ConnectionInterface $connection) use ($builder, $body) { $config = new BuilderConfig($builder->getBuilderConfig()()); $config->setBody($body); $builder->publish($connection, $config); });
  • 原生发送,需要自行指定exchange等参数

    use Workbunny\WebmanRabbitMQ\BuilderConfig; use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface; use Workbunny\WebmanRabbitMQ\ConnectionsManagement; $config = new \Workbunny\WebmanRabbitMQ\BuilderConfig(); $config->setExchange('your_exchange'); $config->setRoutingKey('your_routing_key'); $config->setQueue('your_queue'); $config->setBody('abc'); $config->setMandatory(true); $config->setImmediate(false); // 使用 your_connection 配置连接发送 return ConnectionsManagement::connection(function (ConnectionInterface $connection) use ($config) { $connection->channel()->publish( $config->getBody(), $config->getHeaders(), $config->getExchange(), $config->getRoutingKey(), $config->getMandatory(), $config->getImmediate() ); }, 'your_connection');

消费

注:首先使用命令行工具或者手动创建对应的Builder,以下以Workbunny\Tests\TestBuilders\TestConsumeBuilder举例

  • 快捷消费

    • 修改生成的Builder文件,将handler()方法逻辑添加消费逻辑
    • 启动构建好的Builder自定义进程即是启动消费
  • Builder消费

    use Workbunny\Tests\TestBuilders\TestConsumeBuilder; use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface; $builder = new TestConsumeBuilder(); $builder->action(function (ConnectionInterface $connection) use ($builder) { $builder->consume($connection, $builder->getBuilderConfig()); });

    注:需要保持该进程常驻

赞助商