Skip to content

roiwk/rabbitmq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

13 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RabbitMQ

rabbitmq async(workerman) and sync PHP Client, Producers, Consumers

rabbitmq 是一个异步(workerman)和同步的PHP客户端,用于异步(workerman)和同步的生产者和消费者。

Dependencies 依赖

Version Tag Dependencies
^v1.0 php >=8.0 workerman >= 4.0
^v2.0 php >=8.1 workerman >= 5.0

Install 安装

composer require roiwk/rabbitmq

Usage 使用

All demo

Config Demo

// 配置格式 $config = [ 'host' => '127.0.0.1', 'port' => 5672, 'vhost' => '/', 'mechanism' => 'AMQPLAIN', 'user' => 'username', 'password' => 'password', 'timeout' => 10, 'heartbeat' => 60, 'heartbeat_callback' => function(){}, 'error_callback' => null, ];

Publisher Demo

// 同步发布者 sync Publisher Roiwk\Rabbitmq\Producer::connect($config)->publishSync('Hello World!', '', '', 'hello'); // 异步发布者 async Publisher(workerman) use Workerman\Worker; $worker = new Worker(); $worker->onWorkerStart = function() use($config) { Roiwk\Rabbitmq\Producer::connect($config)->publishAsync('Hello World!', '', '', 'hello'); }; Worker::runAll();

Consumer Demo

// 同步消费者 sync Consumer use Bunny\Channel; use Bunny\Message; use Bunny\AbstractClient; use Roiwk\Rabbitmq\AbstractConsumer; // style 1: $client = new Roiwk\Rabbitmq\Client($config, null, '', '', 'hello'); $client->syncProcess(function(Message $message, Channel $channel, AbstractClient $client){ echo " [x] Received ", $message->content, "\n"; $channel->ack(); }); // style 2: $consumer = new class ($config) extends AbstractConsumer { protected bool $async = false; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }; $consumer->onWorkerStart(null);
// 异步消费者 async Consumer(workerman) use Bunny\Channel; use Bunny\Message; use Workerman\Worker; use Bunny\AbstractClient; use Roiwk\Rabbitmq\AbstractConsumer; $worker = new Worker(); $consumer = new class ($config) extends AbstractConsumer { protected bool $async = true; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }; $worker->onWorkerStart = [$consumer, 'onWorkerStart']; Worker::runAll();

Advanced 高级用法

webman中自定义进程--消费者

1.process.php

'hello-rabbitmq' => [ 'handler' => app\queue\rabbitmq\Hello::class, 'count' => 1, 'constructor' => [ 'rabbitmqConfig' => $config, //'logger' => Log::channel('hello'), ], ]

2.app\queue\rabbitmq\Hello.php

namespace app\queue\rabbitmq; use Roiwk\Rabbitmq\AbstractConsumer; use Roiwk\Rabbitmq\Producer; use Bunny\Channel; use Bunny\Message; use Bunny\AbstractClient; class Hello extends AbstractConsumer { protected bool $async = true; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }

webman中自定义进程--分组消费者

类似webman-queue插件, 分组将消费者放同一个文件夹下, 使用同一个worker, 多个进程数处理
1.process.php

'hello-rabbitmq' => [ 'handler' => Roiwk\Rabbitmq\GroupConsumers::class, 'count' => 2, 'constructor' => [ 'consumer_dir' => app_path().'/queue/rabbimq', 'rabbitmqConfig' => $config, //'logger' => Log::channel('hello'), ], ]

2.在 app_path().'/queue/rabbimq' 目录下创建php文件, 继承Roiwk\Rabbitmq\AbstractConsumer即可, 同上app\queue\rabbitmq\Hello.php

Tips

  1. !!! 此库异步仅支持在workamn环境中, 同步环境都支持. 别的环境,如需异步, 请使用bunny的客户端

About

rabbitmq async(workerman) and sync PHP Client, Producers, Consumers rabbitmq 是一个异步(workerman)和同步的PHP客户端,用于异步(workerman)和同步的生产者和消费者。

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages