thomasvargiu / rabbitmq-module
Integrates php-amqplib with Zend Framework 2 and RabbitMq
Installs: 114 746
Dependents: 1
Suggesters: 0
Security: 0
Stars: 15
Watchers: 3
Forks: 9
Open Issues: 2
Requires
- php: ^7.4 || ^8.0
- laminas/laminas-serializer: ^2.10.0
- laminas/laminas-servicemanager: ^3.11
- laminas/laminas-stdlib: ^3.3.0
- php-amqplib/php-amqplib: ^3.0.0
- psr/container: ^1.0 || ^2.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.11
- laminas/laminas-cli: ^1.0
- phpspec/prophecy: ^1.15.0
- phpspec/prophecy-phpunit: ^2.0.1
- phpunit/phpunit: ^9.5.24
- vimeo/psalm: ^4.27
This package is auto-updated.
Last update: 2024-11-13 14:48:12 UTC
README
Integrates php-amqplib with Laminas/Mezzio Framework and RabbitMq.
Inspired from RabbitMqBundle for Symfony 2
Usage
Connections
You can configure multiple connections in configuration:
return [ 'rabbitmq' => [ 'connection' => [ // connection name 'default' => [ // default values 'type' => 'stream', // Available: stream, socket, ssl, lazy 'host' => 'localhost', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'insist' => false, 'read_write_timeout' => 2, 'keep_alive' => false, 'connection_timeout' => 3, 'heartbeat' => 0 ] ] ] ]
Option classes
You can find all available options here:
Retrieve the service
You can retrieve the connection from service locator:
// Getting the 'default' connection /** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/ $connection = $serviceLocator->get('rabbitmq.connection.default');
Producers
You can configure multiple producers in configuration:
return [ 'rabbitmq' => [ 'producer' => [ 'producer_name' => [ 'connection' => 'default', // the connection name 'exchange' => [ 'type' => 'direct', 'name' => 'exchange-name', 'durable' => true, // (default) 'auto_delete' => false, // (default) 'internal' => false, // (default) 'declare' => true, // (default) 'arguments' => [], // (default) 'ticket' => 0, // (default) 'exchange_binds' => [] // (default) ], 'queue' => [ // optional queue 'name' => 'queue-name', // can be an empty string, 'passive' => false, // (default) 'durable' => true, // (default) 'auto_delete' => false, // (default) 'exclusive' => false, // (default) 'arguments' => [], // (default) 'ticket' => 0, // (default) 'routing_keys' => [] // (default) ], 'auto_setup_fabric_enabled' => true // auto-setup exchanges and queues ] ] ] ]
Option classes
You can find all available options here:
Retrieve the service
You can retrieve the producer from service locator:
// Getting a producer /** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/ /** @var \RabbitMqModule\ProducerInterface $producer **/ $producer = $serviceLocator->get('rabbitmq.producer.producer_name'); // Sending a message $producer->publish(json_encode(['foo' => 'bar']));
Consumers
You can configure multiple consumers in configuration:
return [ 'rabbitmq' => [ 'consumer' => [ 'consumer_name' => [ 'description' => 'Consumer description', 'connection' => 'default', // the connection name 'exchange' => [ 'type' => 'direct', 'name' => 'exchange-name' ], 'queue' => [ 'name' => 'queue-name', // can be an empty string, 'routing_keys' => [ // optional routing keys ] ], 'auto_setup_fabric_enabled' => true, // auto-setup exchanges and queues 'qos' => [ // optional QOS options for RabbitMQ 'prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false ], 'callback' => 'my-service-name', ] ] ] ]
Option classes
You can find all available options here:
Callback
The callback
key must contain one of the following:
- A
callable
: a closure or an invokable object that receive anPhpAmqpLib\Message\AMQPMessage
object. - An instance of
RabbitMqModule\\ConsumerInterface
. - A string service name in service locator (can be anything
callable
or an instance ofRabbitMqModule\\ConsumerInterface
.
Take a look on RabbitMqModule\\ConsumerInterface
class constants for available return values.
If your callback return false
than the message will be rejected and requeued.
If your callback return anything else different from false
and one of ConsumerInterface
constants, the default response is like MSG_ACK
constant.
Retrieve the service
You can retrieve the consumer from service locator:
// Getting a consumer /** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/ /** @var \RabbitMqModule\Consumer $consumer **/ $consumer = $serviceLocator->get('rabbitmq.consumer.consumer_name'); // Start consumer $consumer->consume();
There is a console command available to list and start consumers. See below.
Consumer Example
use PhpAmqpLib\Message\AMQPMessage; use RabbitMqModule\ConsumerInterface; class FetchProposalsConsumer implements ConsumerInterface { /** * @param AMQPMessage $message * * @return int */ public function execute(AMQPMessage $message) { $data = json_decode($message->body, true); try { // do something... } catch (\PDOException $e) { return ConsumerInterface::MSG_REJECT_REQUEUE; } catch (\Exception $e) { return ConsumerInterface::MSG_REJECT; } return ConsumerInterface::MSG_ACK; } }
Exchange2exchange binding
You can configure exchange2exchange binding in producers or consumers. Example:
return [ 'rabbitmq' => [ 'consumer' => [ 'consumer_name' => [ // ... 'exchange' => [ 'type' => 'fanout', 'name' => 'exchange_to_bind_to', 'exchange_binds' => [ [ 'exchange' => [ 'type' => 'fanout', 'name' => 'main_exchange' ], 'routing_keys' => [ '#' ] ] ] ], ] ] ] ]
Console usage
There are some console commands available:
rabbitmq:fabric:setup
: Setup fabric for each service, declaring exchanges and queuesrabbitmq:consumers:list
: List available consumersrabbitmq:consumers:start <name> [--without-signals|-w]
: Start a consumer by namerabbitmq:rpc-server:start <name> [--without-signals|-w]
: Start a rpc server by namerabbitmq:producer:publish <name> [--route=] <msg>
: Send a message with a producer
Example :
vendor/bin/laminas rabbitmq:producer:publish my_producer "Hello world!"