byjg / rabbitmq-client
A minimal rabbitmq client using the components of byjg/message-queue-client
Requires
- php: >=8.1 <8.4
- ext-curl: *
- byjg/message-queue-client: ^5.0
- php-amqplib/php-amqplib: ^3.5
Requires (Dev)
- phpunit/phpunit: 5.7.*|7.4.*|^9.5
- vimeo/psalm: ^5.9
README
It creates a simple abstraction layer to publish and consume messages from the RabbitMQ Server using the component byjg/message-queue-client.
For details on how to use the Message Queue Client see the documentation
Usage
Publish
<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RabbitMQConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("amqp://$user:$pass@$host:$port/$vhost"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Create a message
$message = new Message("Hello World");

// Publish the message into the queue
$connector->publish(new Envelope($pipe, $message));
Consume
<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RabbitMQConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("amqp://$user:$pass@$host:$port/$vhost"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Connect to the queue and wait to consume the message
$connector->consume(
    $pipe, // Queue name
    function (Envelope $envelope) { // Callback function to process the message
        echo "Process the message";
        echo $envelope->getMessage()->getBody();
        return Message::ACK;
    },
    function (Envelope $envelope, $ex) { // Callback function to process the failed message
        echo "Process the failed message";
        echo $ex->getMessage();
        return Message::REQUEUE;
    }
);
The consume method will wait for a message and call the callback function to process the message. If there is no message in the queue, the method will wait until a message arrives.
To exit the consume method, return Message::ACK | Message::EXIT from the callback function.
Possible return values from the callback function:
- Message::ACK: Acknowledge the message and remove it from the queue.
- Message::NACK: Do not acknowledge the message and remove it from the queue. If the queue has a dead letter queue, the message will be sent to the dead letter queue.
- Message::REQUEUE: Requeue the message.
- Message::EXIT: Exit the consume method.
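As a rough illustration of combining return values, the sketch below builds a callback that acknowledges each message and asks the consumer to stop after a fixed number of messages. It is self-contained, so it uses a hypothetical `Status` class in place of the library's `Message` constants; the bit values are assumptions made so that `ACK | EXIT` can be combined, and `makeLimitedHandler` is an illustrative helper, not part of the package.

```php
<?php
// Stand-in for the library's Message constants. The bit values are an
// assumption for illustration only; real code should use Message::ACK etc.
final class Status
{
    public const ACK = 0b0001;
    public const NACK = 0b0010;
    public const REQUEUE = 0b0100;
    public const EXIT = 0b1000;
}

/**
 * Build a callback that acknowledges every message and additionally
 * requests an exit once $limit messages have been handled.
 */
function makeLimitedHandler(int $limit): callable
{
    $handled = 0;

    return function (string $body) use (&$handled, $limit): int {
        $handled++;
        // ... process $body here ...
        return ($handled >= $limit)
            ? (Status::ACK | Status::EXIT) // acknowledge and leave the loop
            : Status::ACK;                 // acknowledge and keep consuming
    };
}
```

In real use the callback would receive an Envelope, as in the Consume example, and return the `Message` constants instead of the stand-ins.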
RabbitMQ Client (AMQP Protocol)
The RabbitMQ connector uses the php-amqplib library.
The standard behavior of the connector is to create an Exchange and a Queue, and to bind the queue to the exchange with a routing key (by default, the same as the queue name). All messages are published to the exchange and consumed from the queue.
Because the queue and the exchange are created by the Connector, it is recommended that you do not use it to publish to or consume from existing queues. If you use an existing Queue you might get the error:
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: PRECONDITION_FAILED - Existing queue 'test' declared with other arguments in AMQPChannel.php:224
You can change the behavior of the connection by using the Pipe::withProperty() and Message::withProperty() methods.
The RabbitMQConnector uses some of these properties and sets default values for them:
- Pipe::withProperty(RabbitMQConnector::EXCHANGE): Set the exchange name. Default is the queue name.
- Pipe::withProperty(RabbitMQConnector::ROUTING_KEY): Set the routing key. Default is the queue name.
- Pipe::withProperty('x-message-ttl'): Only affects dead letter queues. Set the time to live of the message in milliseconds. Default is 3 days.
- Pipe::withProperty('x-expires'): Only affects dead letter queues. Set the time to live of the queue in milliseconds. Default is 3 days.
- Message::withProperty('content_type'): Set the content type of the message. Default is text/plain.
- Message::withProperty('delivery_mode'): Set the delivery mode of the message. Default is 2 (persistent).
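Because x-message-ttl and x-expires are expressed in milliseconds, a small conversion helper can make the values easier to read. The function below is a hypothetical helper, not part of the package; it only shows the arithmetic behind the 3-day default.

```php
<?php
/**
 * Convert a duration in days to milliseconds, the unit used by the
 * x-message-ttl and x-expires properties.
 * Hypothetical helper for illustration; not provided by the library.
 */
function daysToMilliseconds(int $days): int
{
    return $days * 24 * 60 * 60 * 1000;
}

// The documented default of 3 days, expressed in milliseconds.
$threeDays = daysToMilliseconds(3);
```

The result could then be supplied as the property value on the dead letter Pipe, assuming withProperty() takes the property name followed by its value.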
Protocols:
Protocol | URI Example | Notes |
---|---|---|
AMQP | amqp://user:pass@host:port/vhost?arg1=...&args=... | Default port: 5672. |
AMQPS | amqps://user:pass@host:port/vhost?arg1=...&args=... | Default port: 5671. Required: capath |
Connection Parameters
The following parameters are available for both AMQP and AMQPS connections:
- heartbeat: Interval in seconds to send heartbeat frames to keep the connection alive during periods of inactivity. Default is 30 seconds.
- connection_timeout: Timeout in seconds when establishing a new connection. Default is 10 seconds.
- max_attempts: Maximum number of reconnection attempts before failing. Default is 10 attempts.
- pre_fetch: Controls how many messages the server will deliver before requiring acknowledgements. Default is 0 (no limit).
- timeout: Specifies the timeout in seconds for waiting for messages when consuming. Default is 600 seconds.
- single_run: When set to 'true', the consumer will exit after one batch of messages instead of continuously waiting. Default is 'false'.
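The parameters above are passed as query-string arguments on the connection URI. The sketch below shows one way to assemble such a URI with PHP's built-in http_build_query(). The `buildAmqpUri` function is a hypothetical helper, and it assumes the credentials and vhost contain only URL-safe characters.

```php
<?php
/**
 * Assemble a connection URI from its parts plus optional query parameters.
 * Hypothetical helper; assumes user, password and vhost are URL-safe.
 */
function buildAmqpUri(
    string $scheme,
    string $user,
    string $pass,
    string $host,
    int $port,
    string $vhost,
    array $params = []
): string {
    $uri = sprintf('%s://%s:%s@%s:%d/%s', $scheme, $user, $pass, $host, $port, $vhost);

    if ($params !== []) {
        // Explicit '&' separator so the result does not depend on php.ini.
        $uri .= '?' . http_build_query($params, '', '&');
    }

    return $uri;
}

// Booleans are passed as the strings 'true'/'false', matching the README;
// http_build_query() would otherwise turn a PHP true into "1".
$uri = buildAmqpUri('amqp', 'guest', 'guest', 'localhost', 5672, 'myvhost', [
    'heartbeat'  => 30,
    'pre_fetch'  => 10,
    'single_run' => 'true',
]);
```

The resulting string can be wrapped in `new Uri(...)` and handed to ConnectorFactory::create(), as in the usage examples.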
AMQPS SSL Parameters
The following parameters are available for secure connections via AMQPS:
- capath: (Required) Path to the CA certificate directory. This parameter is mandatory for AMQPS connections.
- local_cert: Path to the client certificate file.
- local_pk: Path to the client private key file.
- verify_peer: Enable/disable peer verification (true/false).
- verify_peer_name: Enable/disable peer name verification (true/false).
- passphrase: The passphrase for the private key.
- ciphers: A list of ciphers to use for the encryption.
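Since capath is mandatory for AMQPS, it can be useful to check a URI before handing it to the connector. The sketch below is a hypothetical pre-flight check built only on PHP's parse_url() and parse_str(); it does not reflect how the connector itself validates the URI.

```php
<?php
/**
 * Return the names of required parameters missing from a connection URI.
 * Hypothetical pre-flight check: only the documented rule
 * "AMQPS requires capath" is enforced.
 */
function missingRequiredParams(string $uri): array
{
    $parts = parse_url($uri);
    if ($parts === false) {
        throw new InvalidArgumentException("Malformed URI: $uri");
    }

    $scheme = strtolower($parts['scheme'] ?? '');
    $query = [];
    parse_str($parts['query'] ?? '', $query);

    $missing = [];
    if ($scheme === 'amqps' && empty($query['capath'])) {
        $missing[] = 'capath';
    }

    return $missing;
}
```

For example, `amqps://user:pass@host:5671/vhost?verify_peer=true` would be reported as missing capath, while the same URI with `capath=/etc/ssl/certs` added would pass.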
Robust Connection Setup
For applications that need to maintain long-lived connections to RabbitMQ, especially in environments with network challenges or when the connection remains idle for extended periods, use the following robust connection settings:
<?php
// Configure the connection with robust settings
$connectionUri = "amqp://$user:$pass@$host:$port/$vhost?heartbeat=30&connection_timeout=10&max_attempts=5&timeout=60";
$connector = ConnectorFactory::create(new Uri($connectionUri));

// Test the connection before proceeding
$rabbitConnector = new RabbitMQConnector();
$rabbitConnector->setUp(new Uri($connectionUri));
if (!$rabbitConnector->testConnection()) {
    die("Failed to connect to RabbitMQ server.\n");
}
This configuration:
- Sends heartbeat every 30 seconds to keep the connection alive
- Sets a 10-second timeout when establishing the connection
- Will attempt to reconnect up to 5 times with exponential backoff
- Sets a 60-second timeout for message consumption operations
See the included example_robust_connection.php file for a complete implementation.
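To illustrate what retrying with exponential backoff means, here is a generic, self-contained sketch. It is not the connector's internal implementation: the `retryWithBackoff` function, the 100 ms base delay and the doubling factor are all assumptions chosen for illustration.

```php
<?php
/**
 * Call $operation until it succeeds or $maxAttempts is reached, doubling
 * the wait between attempts. $sleep is injectable so delays can be faked.
 * Generic illustration only; not the connector's actual reconnect logic.
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts,
    int $baseDelayMs = 100,
    ?callable $sleep = null
) {
    $sleep ??= static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // out of attempts: surface the last error
            }
            // Wait 100 ms, 200 ms, 400 ms, ... before the next attempt.
            $sleep($baseDelayMs * 2 ** ($attempt - 1));
        }
    }
}
```

A caller could wrap its own connection check in `$operation`, with `$maxAttempts` playing the same role as the max_attempts parameter.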
Dependencies
flowchart TD
    byjg/rabbitmq-client --> byjg/message-queue-client
    byjg/rabbitmq-client --> php-amqplib/php-amqplib