rabbitmq async(workerman) and sync PHP Client, Producers, Consumers
rabbitmq 是一个异步(workerman)和同步的PHP客户端,用于异步(workerman)和同步的生产者和消费者。
| Version Tag | Dependencies |
|---|---|
| ^v1.0 | php >=8.0 workerman >= 4.0 |
| ^v2.0 | php >=8.1 workerman >= 5.0 |
composer require roiwk/rabbitmq// 配置格式 $config = [ 'host' => '127.0.0.1', 'port' => 5672, 'vhost' => '/', 'mechanism' => 'AMQPLAIN', 'user' => 'username', 'password' => 'password', 'timeout' => 10, 'heartbeat' => 60, 'heartbeat_callback' => function(){}, 'error_callback' => null, ];// 同步发布者 sync Publisher Roiwk\Rabbitmq\Producer::connect($config)->publishSync('Hello World!', '', '', 'hello'); // 异步发布者 async Publisher(workerman) use Workerman\Worker; $worker = new Worker(); $worker->onWorkerStart = function() use($config) { Roiwk\Rabbitmq\Producer::connect($config)->publishAsync('Hello World!', '', '', 'hello'); }; Worker::runAll();// 同步消费者 sync Consumer use Bunny\Channel; use Bunny\Message; use Bunny\AbstractClient; use Roiwk\Rabbitmq\AbstractConsumer; // style 1: $client = new Roiwk\Rabbitmq\Client($config, null, '', '', 'hello'); $client->syncProcess(function(Message $message, Channel $channel, AbstractClient $client){ echo " [x] Received ", $message->content, "\n"; $channel->ack(); }); // style 2: $consumer = new class ($config) extends AbstractConsumer { protected bool $async = false; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }; $consumer->onWorkerStart(null);// 异步消费者 async Consumer(workerman) use Bunny\Channel; use Bunny\Message; use Workerman\Worker; use Bunny\AbstractClient; use Roiwk\Rabbitmq\AbstractConsumer; $worker = new Worker(); $consumer = new class ($config) extends AbstractConsumer { protected bool $async = true; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }; $worker->onWorkerStart = [$consumer, 'onWorkerStart']; Worker::runAll();1.process.php
'hello-rabbitmq' => [ 'handler' => app\queue\rabbitmq\Hello::class, 'count' => 1, 'constructor' => [ 'rabbitmqConfig' => $config, //'logger' => Log::channel('hello'), ], ]2.app\queue\rabbitmq\Hello.php
namespace app\queue\rabbitmq; use Roiwk\Rabbitmq\AbstractConsumer; use Roiwk\Rabbitmq\Producer; use Bunny\Channel; use Bunny\Message; use Bunny\AbstractClient; class Hello extends AbstractConsumer { protected bool $async = true; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }类似webman-queue插件, 分组将消费者放同一个文件夹下, 使用同一个worker, 多个进程数处理
1.process.php
'hello-rabbitmq' => [ 'handler' => Roiwk\Rabbitmq\GroupConsumers::class, 'count' => 2, 'constructor' => [ 'consumer_dir' => app_path().'/queue/rabbimq', 'rabbitmqConfig' => $config, //'logger' => Log::channel('hello'), ], ]2.在 app_path().'/queue/rabbimq' 目录下创建php文件, 继承Roiwk\Rabbitmq\AbstractConsumer即可, 同上app\queue\rabbitmq\Hello.php
- !!! 此库异步仅支持在workamn环境中, 同步环境都支持. 别的环境,如需异步, 请使用bunny的客户端