dada-amater / nette-rabbitmq
v2.1.0
2018-07-09 08:12 UTC
Requires
- php: >=7.1
- bunny/bunny: ^0.2.4 || ^0.3 || ^0.4
- symfony/console: ~3.3 || ^4.0
Requires (Dev)
- mockery/mockery: ^1.1
- nette/neon: ^2.4
- nette/tester: ^2.0
README
Nette RabbitMQ
Nette extension for RabbitMQ (using composer package jakubkulhan/bunny)
Example setup
Downloading composer package
composer require gamee/nette-rabbitmq
Extension registration
config.neon:
extensions:
rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension
Example configuration
services:
- TestConsumer
rabbitmq:
connections:
default:
user: guest
password: guest
host: localhost
port: 5672
queues:
testQueue:
connection: default
exchanges:
testExchange:
connection: default
type: fanout
queueBindings:
testQueue:
routingKey: testRoutingKey
producers:
testProducer:
exchange: testExchange
# queue: testQueue
contentType: application/json
deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT
consumers:
testConsumer:
queue: testQueue
callback: [@TestConsumer, consume]
qos:
prefetchSize: 0
prefetchCount: 5
Publishing messages
Note: Queue will be created automatically after publishing first message.
services.neon:
services:
- TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))
TestQueue.php:
<?php declare(strict_types=1); use Gamee\RabbitMQ\Producer\Producer; final class TestQueue { /** * @var Producer */ private $testProducer; public function __construct(Producer $testProducer) { $this->testProducer = $testProducer; } public function publish(string $message): void { $json = json_encode(['message' => $message]); $headers = []; $this->testProducer->publish($json, $headers); } }
Consuming messages
Your consumer callback has to return a confirmation that particular message has been acknowledges (or different states - unack, reject).
TestConsumer.php
<?php declare(strict_types=1); use Bunny\Message; use Gamee\RabbitMQ\Consumer\IConsumer; final class TestConsumer implements IConsumer { public function consume(Message $message): int { $messageData = json_decode($message->content); $headers = $message->headers; /** * @todo Some logic here... */ return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT } }
Running a consumer trough CLI
There are two consumer commands prepared. rabbitmq:consumer
wiil consume messages for specified amount of time (in seconds). Following command wiil be consuming messages for one hour:
php index.php rabbitmq:consumer testConsumer 3600
rabbitmq:staticConsumer
will consume particular amount of messages. Following example will consume just 20 messages:
php index.php rabbitmq:staticConsumer testConsumer 20