b2pweb/bdf-queue

Bdf queue component

v1.5.5 2024-12-18 14:49 UTC

README

This package provides two abstraction layers over a message broker:

  • A connection layer
  • A destination layer


Supports

Usage Instructions

Produce messages

First, create a new destination manager instance.

<?php

use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;
use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection;
use Bdf\Queue\Destination\ConfigurationDestinationFactory;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Destination\DestinationFactory;
use Bdf\Queue\Serializer\JsonSerializer;

// Declare connections
$driverFactory = new ResolverConnectionDriverFactory([
    'foo' => [
        'driver' => 'pheanstalk',
        'host' => 'localhost',
        'port' => '11300',
        'additionalOption' => 'value',
    ]
    // OR use DSN 'foo' => 'pheanstalk://localhost:11300?additionalOption=value'
]);

// Declare drivers
$driverFactory->addDriverResolver('pheanstalk', function($config) {
    //echo $config['connection'] displays "foo"
    return new PheanstalkConnection($config['connection'], new JsonSerializer());
});

// Declare destination
// You can also declare a custom destination that defines the type of transport (queue, multi queues, topic, ...),
// the connection to use, and the name of the queue(s) / topic(s) to use.
// This example uses the queue driver of the "foo" connection defined above, and sends / consumes messages on the queue named "default".
$destinationFactory = new DestinationFactory(
    $driverFactory,
    ['my_destination' => 'queue://foo/default']
);

// To send a message to multiple destinations, use the "aggregate" destination type.
// You can use a wildcard to target all destinations that match a pattern.
// In this example, a message sent to the 'user' destination goes to the "foo" and "bar" destinations, and to all destinations matching the pattern "*.user".
$destinationFactory = new DestinationFactory(
    $driverFactory,
    [
        'foo' => 'queue://test/foo',
        'bar' => 'queue://test/bar',
        'a.user' => 'topic://a/user',
        'b.user' => 'topic://b/user',
        'user' => 'aggregate://foo,bar,*.user',
    ]
);

// Create the manager
$manager = new DestinationManager($driverFactory, $destinationFactory);
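
To make the wildcard behaviour of the "aggregate" type more concrete, here is a minimal, self-contained sketch of how a target list such as `foo,bar,*.user` could be expanded against the declared destination names. The helper `expandAggregate()` is hypothetical and uses PHP's `fnmatch()`; it only illustrates the idea and is not the library's actual matching code, which may behave differently.

```php
<?php

/**
 * Expand an aggregate target list such as "foo,bar,*.user" into the
 * declared destination names it matches. Illustrative helper only,
 * not part of bdf-queue.
 *
 * @param string   $targets  Comma-separated names or wildcard patterns
 * @param string[] $declared Names of all declared destinations
 * @return string[]
 */
function expandAggregate(string $targets, array $declared): array
{
    $matched = [];

    foreach (explode(',', $targets) as $pattern) {
        foreach ($declared as $name) {
            // fnmatch() treats "*" as "any sequence of characters"
            if (fnmatch(trim($pattern), $name) && !in_array($name, $matched, true)) {
                $matched[] = $name;
            }
        }
    }

    return $matched;
}

$declared = ['foo', 'bar', 'a.user', 'b.user', 'user'];
$resolved = expandAggregate('foo,bar,*.user', $declared);
```

In this sketch the aggregate name `user` itself is not matched by `*.user`, so the expansion does not loop back onto the aggregate.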

Push a basic message into the queue. The consumer should define a handler to process the message.

<?php

use Bdf\Queue\Message\Message;

$message = Message::create('Hello world');
$message->setDestination('my_destination');
// Or, at a lower level, set the connection and queue directly.
$message = Message::create('Hello world', 'queue');
$message->setConnection('foo');

/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);

Push a job message into the queue. This is useful for a monolithic application that needs to defer a process. The consumer will evaluate the job string and run the processor. In this use case the producer and the receiver share the same model.

<?php
$message = \Bdf\Queue\Message\Message::createFromJob(Mailer::class.'@send', ['body' => 'my content']);
$message->setDestination('my_destination');

/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);
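
The job string above follows a "Class@method" form, and this README states elsewhere that the method defaults to "handle" when omitted. As a rough illustration of that convention, the sketch below splits such a string and invokes the result. `parseJob()` and `SketchMailer` are hypothetical names introduced for this example; the library resolves jobs through its own processor resolvers and instantiator, not through this code.

```php
<?php

/**
 * Split a job string into [class, method]; the method defaults to "handle".
 * Illustrative parser only, not the library's resolver.
 *
 * @return array{0: string, 1: string}
 */
function parseJob(string $job, string $defaultMethod = 'handle'): array
{
    $parts = explode('@', $job, 2);

    return [$parts[0], $parts[1] ?? $defaultMethod];
}

// Hypothetical stand-in for the Mailer class used in the example above
final class SketchMailer
{
    public function send(array $data): string
    {
        return 'sent: ' . $data['body'];
    }
}

[$class, $method] = parseJob(SketchMailer::class . '@send');

// Instantiate the class and call the resolved method with the message data
$result = (new $class())->$method(['body' => 'my content']);
```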

Available types for DSN destinations

The class Bdf\Queue\Destination\DsnDestinationFactory provides the default destination types.

You can declare your own type:

<?php

use Bdf\Dsn\DsnRequest;
use Bdf\Queue\Connection\ConnectionDriverInterface;
use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;

/** @var ResolverConnectionDriverFactory $driverFactory */

$destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory);
$destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) {
    // ...
});

// use dsn as "my_own_type://connection/queue_or_topic_name?option="

Consume messages

The consumer layer provides many tools for message handling. The default stack of objects that will receive the message is:

consumer (ConsumerInterface) -> receivers (ReceiverInterface) -> processor (ProcessorInterface) -> handler (callable)

  • consumer holds the strategy for reading messages from a queue / topic. It also manages graceful shutdown.
  • receivers are a stack of middlewares that interact with the envelope.
  • processor resolves the handler arguments. You can plug your business logic in here and remove the handler layer. By default, the processor injects 2 arguments into handlers: the message data and the envelope.
  • handler holds the business logic. A handler does not need to implement any interface.
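
The flow through this stack can be sketched in plain PHP without the library. Everything below is hypothetical (`SketchReceiver`, `SkipQueueReceiver`, `HandlerReceiver`, and the array-shaped envelope are stand-ins, not bdf-queue types); the aim is only to show how a middleware receiver either stops the chain or delegates to the next receiver until the handler is called with the data and the envelope.

```php
<?php

// Hypothetical stand-in; this is NOT the library's ReceiverInterface.
interface SketchReceiver
{
    public function receive(array $envelope): void;
}

// Middleware receiver: drops messages from one queue, delegates the rest.
final class SkipQueueReceiver implements SketchReceiver
{
    private SketchReceiver $next;
    private string $skipped;

    public function __construct(SketchReceiver $next, string $skipped)
    {
        $this->next = $next;
        $this->skipped = $skipped;
    }

    public function receive(array $envelope): void
    {
        if ($envelope['queue'] === $this->skipped) {
            return; // Stop the chain: the handler is never called
        }

        $this->next->receive($envelope);
    }
}

// Last receiver of the stack: plays the processor role and calls the handler.
final class HandlerReceiver implements SketchReceiver
{
    /** @var callable */
    private $handler;

    public function __construct(callable $handler)
    {
        $this->handler = $handler;
    }

    public function receive(array $envelope): void
    {
        // Resolve the handler arguments: the message data, then the envelope
        ($this->handler)($envelope['data'], $envelope);
    }
}

$handled = [];
$stack = new SkipQueueReceiver(
    new HandlerReceiver(function ($data, array $envelope) use (&$handled) {
        $handled[] = $data;
    }),
    'ignored'
);

// Consumer role: read messages (here from an in-memory array) and pass them on
$messages = [
    ['queue' => 'default', 'data' => 'Hello world'],
    ['queue' => 'ignored', 'data' => 'dropped'],
];

foreach ($messages as $envelope) {
    $stack->receive($envelope);
}
```

Only the first message reaches the handler here; the second is filtered out by the middleware before the processor step.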

An example to consume a simple message:

<?php

use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\CallbackProcessor;
use Bdf\Queue\Processor\MapProcessorResolver;

// Create your processor and declare in a map:
$myProcessor = new CallbackProcessor(function($data) {
    echo $data;
});
$processorResolver = new MapProcessorResolver(['foo' => $myProcessor]);

/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);

Consume a job message:

<?php

use Bdf\Instantiator\Instantiator;
use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\JobHintProcessorResolver;

/** @var Instantiator $instantiator */

// The message must carry its job so that the processor can be resolved
$processorResolver = new JobHintProcessorResolver($instantiator);

/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);

Create a handler

<?php

/** @var Bdf\Queue\Destination\DestinationManager $manager */

class MyHandler
{
    public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope)
    {
        echo $data; // Display 'foo'
        
        // Ack the message. Default behavior. The ack is sent before the call by the consumer.
        $envelope->acknowledge();
        
        // Reject the message. It will no longer be available. The message is also rejected if an exception is thrown.
        $envelope->reject();
        
        // Reject the message and send it back to the queue
        $envelope->reject(true);
    }
}

$message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue');
$manager->send($message);

Use the syntax "Class@method" to determine the callable (by default the method is "handle"), or register your handlers on a specific destination with the receiver builder:

<?php

use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface;
use Psr\Container\ContainerInterface;

/** @var ContainerInterface $container */
/** @var Bdf\Queue\Destination\DestinationManager $manager */

$container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) {
    return new ReceiverLoader(
        $container,
        [
            'destination_name or connection_name' => function(ReceiverBuilder $builder) {
                /** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */
                /** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */

                // Register your unique handler for the destination or connection.
                // All messages will be handled by this handler.
                $builder->handler(MyHandler::class);
                
                // Or register your unique processor
                $builder->processor($myProcessor);
                
                // Or register the job bearer resolver as processor. The processor will resolve the job
                // from the Message::$job attribute value.
                $builder->jobProcessor();
                
                // Or register your own processor or handler by queue in case you consume a connection.
                // By default the key of the map is the queue name. You can provide your own key provider 
                // with the second parameter.
                $builder->mapProcessor([
                    'queue1' => $myProcessor,
                    'queue2' => MyHandler::class,
                ]);
                
                // Or register your final own receiver
                $builder->outlet($myReceiver);
                
                // Or register your own receiver in the stack
                $builder->add($myReceiver);
                
                // You can add more defined middlewares here
                // $builder->retry(2);
            }
        ]
    );
});

$receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build();

$manager->create('queue://foo')->consumer($receiver)->consume(0);

Run the consumer in console

$ example/consumer.php "connection name OR destination name"

Create receiver extensions

The consumer uses a stack of receivers to extend the reception of messages. See the interface Bdf\Queue\Consumer\ReceiverInterface and the trait Bdf\Queue\Consumer\DelegateHelper.

<?php
class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface
{
    use \Bdf\Queue\Consumer\DelegateHelper;
    
    private $options;

    /**
     * MyExtension constructor.
     */
    public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options)
    {
        $this->delegate = $delegate;
        $this->options = $options;
    }
    
    /**
     * {@inheritdoc}
     */
    public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void
    {
        // Do something when receiving message
        if ($message->queue() === 'foo') {
            return;        
        }

        // Call the next receiver
        $this->delegate->receive($message, $consumer);
    }
}

You can use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder::add() to register your receiver in the stack:

<?php
$options = ['foo' => 'bar'];

/** @var \Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder $builder */
$builder->add(MyExtension::class, [$options]);

Customize the string payload

The interface Bdf\Queue\Serializer\SerializerInterface manages the payload content sent to the message broker. By default, metadata is added to the JSON, such as:

  • PHP Type: to help consumer to deserialize complex entities.
  • Message info: The attempt number for retry, The sending date, ...

A basic payload looks like:

{
  "name": "Foo",
  "data": "Hello World",
  "date": "2019-12-23T16:02:03+01:00"
}

You can customize the string with your own implementation of the serializer interface.
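
As a rough illustration of the payload shape shown above, the following self-contained sketch builds and reads back such a JSON string with PHP's native JSON functions. `encodePayload()` and `decodePayload()` are hypothetical helpers; they do not implement the serializer interface, and the library's serializers may add other fields (for example the PHP type or the attempt number).

```php
<?php

/**
 * Build a JSON payload shaped like the example above.
 * Illustrative only; not a bdf-queue serializer.
 *
 * @param mixed $data Any JSON-encodable value
 */
function encodePayload(string $name, $data, DateTimeInterface $date): string
{
    return json_encode([
        'name' => $name,
        'data' => $data,
        // DATE_ATOM produces e.g. 2019-12-23T16:02:03+01:00
        'date' => $date->format(DATE_ATOM),
    ], JSON_THROW_ON_ERROR);
}

/**
 * Decode the payload back into an associative array.
 */
function decodePayload(string $raw): array
{
    return json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
}

$raw = encodePayload('Foo', 'Hello World', new DateTimeImmutable('2019-12-23T16:02:03+01:00'));
$payload = decodePayload($raw);
```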

Try the hello world example (configure the message broker in example/config/connections.php):

$ example/producer.php foo '{"name":"Foo", "data":"Hello World"}' --raw
$ example/consumer.php foo

RPC client

<?php

use Bdf\Queue\Message\InteractEnvelopeInterface;
use Bdf\Queue\Message\Message;

class RpcReplyHandler
{
    public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope)
    {
        // Send back 1 x 2 to the client
        $envelope->reply($number * 2);

        // Or retry in 10sec
        $envelope->retry(10);
    }
}

$message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue');
$message->setConnection('foo');

/** @var Bdf\Queue\Destination\DestinationManager $manager */
$promise = $manager->send($message);

// Consume the foo connection

// Receive data from the reply queue. If the header "replyTo" is not set, 
// the response will be sent to "queue_reply"
echo $promise->await(500)->data(); // Display 2

Additional options for connection

Note:

Additional options for message

Serialization

Benchmarks

simple job / closure job

Analysis

  • For the best execution time, regardless of size, use the default Serializer
  • For the smallest size, regardless of time, use BdfSerializer with CompressedSerializer
  • For the best compromise, use Serializer with CompressedSerializer
    • Always smaller than pure BdfSerializer (JSON or Binary)
    • Faster on unserialize, slightly slower on serialize
    • About twice as fast as compressed bdf, while only ~40% larger on a simple job

License

Distributed under the terms of the MIT license.