Supporting Enqueue

Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you’d like to join them, please consider:


AMQP transport

Implements AMQP specifications and implements amqp interop interfaces. Build on top of php amqp extension.

Drawbacks:

Parts:

Installation

Warning: You need amqp extension of at least 1.9.3. Here’s how you can compile the extension from the source code.

$ composer require enqueue/amqp-ext 

Create context

<?php use Enqueue\AmqpExt\AmqpConnectionFactory; // connects to localhost $connectionFactory = new AmqpConnectionFactory(); // same as above $factory = new AmqpConnectionFactory('amqp:'); // same as above $factory = new AmqpConnectionFactory([]); // connect to AMQP broker at example.com $factory = new AmqpConnectionFactory([ 'host' => 'example.com', 'port' => 1000, 'vhost' => '/', 'user' => 'user', 'pass' => 'pass', 'persisted' => false, ]); // same as above but given as DSN string $factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f'); // SSL or secure connection $factory = new AmqpConnectionFactory([ 'dsn' => 'amqps:', 'ssl_cacert' => '/path/to/cacert.pem', 'ssl_cert' => '/path/to/cert.pem', 'ssl_key' => '/path/to/key.pem', ]); $context = $factory->createContext(); // if you have enqueue/enqueue library installed you can use a factory to build context from DSN $context = (new \Enqueue\ConnectionFactoryFactory())->create('amqp:')->createContext(); $context = (new \Enqueue\ConnectionFactoryFactory())->create('amqp+ext:')->createContext(); 

Declare topic.

Declare topic operation creates a topic on a broker side.

<?php use Interop\Amqp\AmqpTopic; /** @var \Enqueue\AmqpExt\AmqpContext $context */ $fooTopic = $context->createTopic('foo'); $fooTopic->setType(AmqpTopic::TYPE_FANOUT); $context->declareTopic($fooTopic); // to remove topic use delete topic method //$context->deleteTopic($fooTopic); 

Declare queue.

Declare queue operation creates a queue on a broker side.

<?php use Interop\Amqp\AmqpQueue; /** @var \Enqueue\AmqpExt\AmqpContext $context */ $fooQueue = $context->createQueue('foo'); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $context->declareQueue($fooQueue); // to remove queue use delete queue method //$context->deleteQueue($fooQueue); 

Bind queue to topic

Connects a queue to the topic. So messages from that topic comes to the queue and could be processed.

<?php use Interop\Amqp\Impl\AmqpBind; /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ /** @var \Interop\Amqp\Impl\AmqpTopic $fooTopic */ $context->bind(new AmqpBind($fooTopic, $fooQueue)); 

Send message to topic

<?php /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpTopic $fooTopic */ $message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooTopic, $message); 

Send message to queue

<?php /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ $message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message); 

Send priority message

<?php /** @var \Enqueue\AmqpExt\AmqpContext $context */ $fooQueue = $context->createQueue('foo'); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $fooQueue->setArguments(['x-max-priority' => 10]); $context->declareQueue($fooQueue); $message = $context->createMessage('Hello world!'); $context->createProducer() ->setPriority(5) // the higher priority the sooner a message gets to a consumer // ->send($fooQueue, $message) ; 

Send expiration message

<?php /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ $message = $context->createMessage('Hello world!'); $context->createProducer() ->setTimeToLive(60000) // 60 sec // ->send($fooQueue, $message) ; 

Send delayed message

AMQP specification says nothing about message delaying hence the producer throws DeliveryDelayNotSupportedException. Though the producer (and the context) accepts a delivery delay strategy and if it is set it uses it to send delayed message. The enqueue/amqp-tools package provides two RabbitMQ delay strategies, to use them you have to install that package

<?php use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ // make sure you run "composer require enqueue/amqp-tools". $message = $context->createMessage('Hello world!'); $context->createProducer() ->setDelayStrategy(new RabbitMqDlxDelayStrategy()) ->setDeliveryDelay(5000) // 5 sec ->send($fooQueue, $message) ; 

Consume message:

<?php /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ $consumer = $context->createConsumer($fooQueue); $message = $consumer->receive(); // process a message $consumer->acknowledge($message); // $consumer->reject($message); 

Subscription consumer

<?php use Interop\Queue\Message; use Interop\Queue\Consumer; /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ /** @var \Interop\Amqp\Impl\AmqpQueue $barQueue */ $fooConsumer = $context->createConsumer($fooQueue); $barConsumer = $context->createConsumer($barQueue); $subscriptionConsumer = $context->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message $consumer->acknowledge($message); return true; }); $subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) { // process message $consumer->acknowledge($message); return true; }); $subscriptionConsumer->consume(2000); // 2 sec 

Purge queue messages:

<?php /** @var \Enqueue\AmqpExt\AmqpContext $context */ /** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */ $queue = $context->createQueue('aQueue'); $context->purgeQueue($queue); 

back to index