ryaremenko / amqp
AMQP wrapper to publish and consume messages
Installs: 3 350
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 2
Open Issues: 0
Requires
- php: >=7.3
- ext-json: *
- php-amqplib/php-amqplib: ^2.9|^3.1
Requires (Dev)
- mockery/mockery: ^1.4@dev
- phpunit/phpunit: 9.5.x-dev
README
AMQP wrapper to publish and consume messages especially from RabbitMQ
Installation
Composer
Add the following to your require part within the composer.json:
$ php composer require ryaremenko/amqp
Integration
Lumen
Create a connection class
Adjust the properties to your needs.
class BaseConnection extends AmqpConnection { protected function setConnectionOptions(): AmqpConnectionOptions { return (new AmqpConnectionOptions()) ->setHost('127.0.0.1') ->setPort(5672) ->setLogin('guest') ->setPassword('guest'); } }
Register connection class as singleton:
/* |-------------------------------------------------------------------------- | Laravel example |-------------------------------------------------------------------------- */ //... $this->app->singleton(BaseConnection::class); $this->app->bind(AMQPConnectionInterface::class, BaseConnection::class); //...
Publishing a message
(new AmqpProducer)->publish(['data'], 'queue_name');
Consuming messages
class AMQPHandlersService { private const HANDLERS = [ 'queue_name' => TestHandler::class ]; private const PRIORITY_HANDLERS = [ 'queue_name' ]; private $amqpConsumer; public function __construct(AmqpConsumer $amqpConsumer) { $this->amqpConsumer = $amqpConsumer; } public function handle(string $queueName) { $properties = []; if (in_array($queueName, self::PRIORITY_HANDLERS, true)){ $properties['priority'] = true; } $handler = app(self::HANDLERS[$queueName]); $this->amqpConsumer->consume($queueName, function ($message) use ($handler) { try { $handler->handle($message->body); } catch (\Exception $exception) { // exception handler } }, $properties); } }