b2pweb / bdf-queue
Bdf queue component
Installs: 5 675
Dependents: 3
Suggesters: 0
Security: 0
Stars: 3
Watchers: 2
Forks: 0
Open Issues: 1
Requires
- php: ~7.4 || ~8.0.0 || ~8.1.0 || ~8.2.0 || ~8.3.0
- ext-pcntl: *
- b2pweb/bdf-dsn: ~1.0
- b2pweb/bdf-instantiator: ~1.0
- symfony/console: ~5.4|~6.0|~7.0
- symfony/messenger: ~5.4|~6.0|~7.0
Requires (Dev)
- b2pweb/bdf-serializer: ~1.0
- doctrine/dbal: ~2.5|~3.0
- enqueue/enqueue: ~0.9
- enqueue/fs: ~0.9
- friendsofphp/php-cs-fixer: ^3.0
- jeremeamia/superclosure: ~2.1
- league/container: ~3.0
- monolog/monolog: ~2.0
- pda/pheanstalk: ^3.1@dev
- php-amqplib/php-amqplib: ~3.0
- phpbench/phpbench: ~0.0|~1.0
- phpunit/phpunit: ~9.6
- predis/predis: ~1.1.0
- ramsey/uuid: ~3.0|~4.0
- symfony/error-handler: ~5.4|~6.0|~7.0
- symfony/phpunit-bridge: ~5.4|~6.0|~7.0
- symfony/var-dumper: ~5.4|~6.0|~7.0
- vimeo/psalm: ~5.22
Suggests
- ext-redis: Required to use the Redis queue driver
- aws/aws-sdk-php: Required to use the SQS queue driver (~3.0).
- enqueue/enqueue: Required to use the Enqueue driver (~0.9)
- ext/rdkafka: Required to use the Kafka queue driver (~4.0|~5.0|~6.0)
- iron-io/iron_mq: Required to use the iron queue driver (~2.0).
- kwn/php-rdkafka-stubs: Required to use the Kafka queue driver (~2.0)
- pda/pheanstalk: Required to use the beanstalk queue driver (~3.0).
- php-amqplib/php-amqplib: Required to use the RabbitMQ queue driver (~2.6|~3.0)
- symfony/var-dumper: VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)
README
This package provides 2 layers for abstraction of message broker.
- A connection layer
- A destination layer
Supports
Usage Instructions
Produce messages
First, create a new destination manager instance.
<?php use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory; use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection; use Bdf\Queue\Destination\ConfigurationDestinationFactory; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Destination\DestinationFactory; use Bdf\Queue\Serializer\JsonSerializer; // Declare connections $driverFactory = new ResolverConnectionDriverFactory([ 'foo' => [ 'driver' => 'pheanstalk', 'host' => 'localhost', 'port' => '11300', 'additionalOption' => 'value', ] // OR use DSN 'foo' => 'pheanstalk://localhost:11300?additionalOption=value' ]); // Declare drivers $driverFactory->addDriverResolver('pheanstalk', function($config) { //echo $config['connection'] displays "foo" return new PheanstalkConnection($config['connection'], new JsonSerializer()); }); // Declare destination // You can also declare your custom destination that defined type of transport (queue, multi queues, topic, ...), // the connection to use, and the name of the queue(s) / topic(s) to use. // This example will use the queue driver of the "foo" connection defined above. And send / consume message on the queue named "default". $destinationFactory = new DestinationFactory( $driverFactory, ['my_destination' => 'queue://foo/default'] ); // To send a message to multiple destinations, you can use "aggregate" destination type. // You can use a wildcard to send to all destinations that match the pattern. // In this example, 'user' destination will be sent to the "foo" and "bar" queues, and to all topics that match the pattern "*.user" $destinationFactory = new DestinationFactory( $driverFactory, [ 'foo' => 'queue://test/foo', 'bar' => 'queue://test/bar', 'a.user' => 'topic://a/user', 'b.user' => 'topic://b/user', 'user' => 'aggregate://foo,bar,*.user', ] ); // Create the manager $manager = new DestinationManager($driverFactory, $destinationFactory);
Push a basic message into the queue. The consume should defined handler to process the message.
<?php use Bdf\Queue\Message\Message; $message = Message::create('Hello world'); $message->setDestination('my_destination'); // or use a lower level setting the connection and queue. $message = Message::create('Hello world', 'queue'); $message->setConnection('foo'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $manager->send($message);
Useful for monolithic application that needs to differ a process. Push a message job into the queue. The consumer will evaluate the job string and run the processor. In this use case the producer and the receiver share the same model.
<?php $message = \Bdf\Queue\Message\Message::createFromJob(Mailer::class.'@send', ['body' => 'my content']); $message->setDestination('my_destination'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $manager->send($message);
Available type for dsn destination
The class Bdf\Queue\Destination\DsnDestinationFactory
provides default type of destination:
You can declare your own type:
<?php use Bdf\Dsn\DsnRequest; use Bdf\Queue\Connection\ConnectionDriverInterface; use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory; /** @var ResolverConnectionDriverFactory $driverFactory */ $destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory); $destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) { // ... }); // use dsn as "my_own_type://connection/queue_or_topic_name?option="
Consume messages
The consumer layer provides many tools for message handling. The default stack of objects that will receive the message is:
consumer (ConsumerInterface) -> receivers (ReceiverInterface) -> processor (ProcessorInterface) -> handler (callable)
consumer
has the strategy for reading the message from queue / topic. It also manage a graceful shutdown.receivers
is the stack of middlewares interacts with the envelope.processor
resolves the handler arguments. You can plug here your business logic and remove the handler layer. By default processor injects 2 arguments in handlers: the message data and the envelope.handler
manages the business logic. Handler allows an interface less mode.
An example to consume a simple message:
<?php use Bdf\Queue\Consumer\Receiver\ProcessorReceiver; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Processor\CallbackProcessor; use Bdf\Queue\Processor\MapProcessorResolver; // Create your processor and declare in a map: $myProcessor = new CallbackProcessor(function($data) { echo $data; }); $processorResolver = new MapProcessorResolver(['foo' => $myProcessor]); /** @var DestinationManager $manager */ $manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
Consume a job message:
<?php use Bdf\Instantiator\Instantiator; use Bdf\Queue\Consumer\Receiver\ProcessorReceiver; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Processor\JobHintProcessorResolver; /** @var Instantiator $instantiator */ // The job should be provided from message to get the processor $processorResolver = new JobHintProcessorResolver($instantiator); /** @var DestinationManager $manager */ $manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
Create a handler
<?php /** @var Bdf\Queue\Destination\DestinationManager $manager */ class MyHandler { public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope) { echo $data; // Display 'foo' // Ack the message. Default behavior. The ack is sent before the call by the consumer. $envelope->acknowledge(); // Reject the message. It will be no more available. The message is rejected if and exception is thrown. $envelope->reject(); // Reject the message and send it back to the queue $envelope->reject(true); } } $message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue'); $manager->send($message);
Use the synthax "Class@method"
to determine the callable (By default the method is "handle")
or register your handlers on a specific destination with the receiver builder:
<?php use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder; use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader; use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface; use Psr\Container\ContainerInterface; /** @var ContainerInterface $container */ /** @var Bdf\Queue\Destination\DestinationManager $manager */ $container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) { return new ReceiverLoader( $container, [ 'destination_name or connection_name' => function(ReceiverBuilder $builder) { /** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */ /** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */ // Register your unique handler for the destination or connection. // all message will be handled by this handler. $builder->handler(MyHandler::class); // Or register your unique processor $builder->processor($myProcessor); // Or register the job bearer resolver as processor. The procesor will resolve the job // from the Message::$job attribute value. $builder->jobProcessor(); // Or register your own processor or handler by queue in case you consume a connection. // By default the key of the map is the queue name. You can provide your own key provider // with the second parameter. $builder->mapProcessor([ 'queue1' => $myProcessor, 'queue2' => MyHandler::class, ]); // Or register your final own receiver $builder->outlet($myReceiver); // Or register your own receiver in the stack $builder->add($myReceiver); // You can add more defined middlewares here // $builder->retry(2); } ] ); }); $receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build(); $manager->create('queue://foo')->consumer($receiver)->consume(0);
Run the consumer in console
$ example/consumer.php "connection name OR destination name"
Create receiver extensions
The consumer use a stack of receivers to extend the reception of messages.
See the interface Bdf\Queue\Consumer\ReceiverInterface
and the trait Bdf\Queue\Consumer\DelegateHelper
.
<?php class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface { use \Bdf\Queue\Consumer\DelegateHelper; private $options; /** * MyExtension constructor. */ public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options) { $this->delegate = $delegate; $this->options = $options; } /** * {@inheritdoc} */ public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void { // Do something when receiving message if ($message->queue() === 'foo') { return; } // Call the next receiver $this->delegate->receive($message, $consumer); } }
You can use the Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader::add()
to register your receiver in the stack
<?php $options = ['foo' => 'bar']; /** @var \Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder $builder */ $builder->add(MyExtension::class, [$options]);
Customize the string payload
The class Bdf\Queue\Serializer\SerializerInterface
manage the payload content sent to the message broker.
By default metadata are added to the json as:
- PHP Type: to help consumer to deserialize complex entities.
- Message info: The attempt number for retry, The sending date, ...
A basic payload looks like:
{ "name": "Foo", "data": "Hello World", "date": "2019-12-23T16:02:03+01:00" }
You can customize the string with your own implementation of the serializer interface.
Try the hello world example (configure the message broker in example/config/connections.php
):
$ example/producer.php foo '{"name":"Foo", "data":"Hello World"}' --raw
$ example/consumer.php foo
RPC client
<?php use Bdf\Queue\Message\InteractEnvelopeInterface; use Bdf\Queue\Message\Message; class RpcReplyHandler { public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope) { // Send bask: 1 x 2 to client $envelope->reply($number * 2); // Or retry in 10sec $envelope->retry(10); } } $message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue'); $message->setConnection('foo'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $promise = $manager->send($message); // Consume the foo connection // Receive data from the reply queue. If the header "replyTo" is not set, // the response will be sent to "queue_reply" echo $promise->await(500)->data(); // Display 2
Additionnal options for connection
Note:
- Format of a valid DSN: {driver}+{vendor}://{user}:{password}@{host}:{port}/{queue}?{option}=value
- See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for more kafka options.
Additionnal options for message
Serialization
Benchmarks
simple job / closure job
Analysis
- For the best execution time, regardless of size, use the default
Serializer
- For the smaller size, regardless of time, use
BdfSerializer
withCompressedSerializer
- For the best compromise, use
Serializer
withCompressedSerializer
- Always smaller than pure
BdfSerializer
(JSON or Binary) - Faster on unserialize, slightly slower on serialize
- Around twice faster than compressed bdf, but only ~40% larger on simple job
- Always smaller than pure
License
Distributed under the terms of the MIT license.