solcloud/consumer

RabbitMQ base worker for queue consuming

v3.1 2022-11-22 12:42 UTC

Last update: 2024-04-30 00:42:38 UTC


README

The consumer needs a \PhpAmqpLib\Channel\AMQPChannel instance as its only dependency.

Channel setup

If your project already has a channel instance, you can skip this step. Otherwise, set up a RabbitMQ connection first; we recommend using a container for this.

$config = new \Solcloud\Consumer\QueueConfig();
$config
    ->setHost('solcloud_rabbitmq')
    ->setVhost('/')
    #->setHeartbeatSec(5)
    ->setUsername('dev')
    ->setPassword('dev')
;
$connectionFactory = new \Solcloud\Consumer\QueueConnectionFactory($config);
$connection = $connectionFactory->createSocketConnection();
#(new \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender($connection))->register(); // if heartbeat and pcntl_async_signals() is available
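The commented-out heartbeat line above only works when the pcntl extension is loaded. A minimal sketch of a guard you could place before it, assuming a heartbeat has been enabled through setHeartbeatSec(); the variable name $canUseSignalHeartbeat is purely illustrative and not part of the package:

```php
<?php
// Hypothetical guard variable; not provided by solcloud/consumer.
// pcntl_async_signals() is only defined when the pcntl extension is loaded.
$canUseSignalHeartbeat = extension_loaded('pcntl')
    && function_exists('pcntl_async_signals');

if ($canUseSignalHeartbeat) {
    // Here you could register the heartbeat sender shown in the snippet above.
    echo "pcntl available: the signal-based heartbeat sender can be registered" . PHP_EOL;
} else {
    // One option in this case is to keep the heartbeat setting commented out.
    echo "pcntl not available: skip registering the heartbeat sender" . PHP_EOL;
}
```

The check is evaluated once at startup, so the rest of the setup code stays the same on hosts with and without pcntl.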

Create a channel from the connection, or use your own channel:

/** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
$channel = $connection->channel();

Worker

Create a worker (consumer) class for your business logic and inject the $channel dependency. You can extend AbstractConsumer for a lightweight abstraction, or use the "solcloud standard" BaseConsumer. This example uses BaseConsumer:

$worker = new class($channel) extends \Solcloud\Consumer\BaseConsumer {

    protected function run(): void
    {
        // Your hard work here
        echo "Processing message: " . $this->data->id . PHP_EOL;
    }

};

Start consuming messages from the queue using the blocking wait method:

$worker->consume($consumeQueueName);
while ($worker->hasCallback()) {
    try {
        // While a callback is registered, enter the event loop with a timeout
        $worker->wait(rand(8, 11));
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
        echo $ex->getMessage() . PHP_EOL;
    }
}
$worker->closeChannel();

Message publishing

To publish messages, you can use $worker directly, the RabbitMQ management plugin, or your own scripts:

$worker->publishMessage(
    $worker->createMessageHelper([], ["id" => 1]),
    '',
    $consumeQueueName
); // OR open rabbitmq management and publish: {"meta":[],"data":{"id":1}}
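The payload mentioned in the comment above is plain JSON with a meta key and a data key. As a sketch, assuming the worker decodes the message body into objects (the $this->data->id access in the worker suggests this, but it is not confirmed here), the same payload can be built and inspected with plain PHP before pasting it into the management UI:

```php
<?php
// Build the body that the comment above suggests publishing by hand.
// An empty PHP array encodes as [], an associative array as a JSON object.
$payload = json_encode(['meta' => [], 'data' => ['id' => 1]]);
echo $payload . PHP_EOL; // prints {"meta":[],"data":{"id":1}}

// Assumption: the consumer decodes the body roughly like this,
// which would make the id reachable as an object property.
$decoded = json_decode($payload);
echo $decoded->data->id . PHP_EOL; // prints 1
```

Building the payload with json_encode avoids hand-written JSON mistakes when the data section grows beyond a single field.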

Logging

The worker can log to any Psr\Log\LoggerInterface-compatible logger.

$worker->setLogger(new YourPsrLogger());
$worker->getLogger()->info('Something');

Examples

For a complete example covering this README, see example.php.