jezzdk / lumen-pubsub
A Pub-Sub abstraction for Lumen
Requires
- php: >=5.6.0
- illuminate/console: ^5.3
- illuminate/support: ^5.3
- superbalist/php-pubsub: ^1.0
- superbalist/php-pubsub-google-cloud: ^2.0|^3.0
- superbalist/php-pubsub-redis: ^1.0
Requires (Dev)
- mockery/mockery: ^0.9.5
- phpunit/phpunit: ^5.5
README
A Pub-Sub abstraction for Laravel Lumen.
This package is a wrapper bridging php-pubsub into Laravel Lumen.
For Laravel 4 support, use the package https://github.com/Superbalist/laravel4-pubsub
The following adapters are supported:
- Local
- /dev/null
- Redis
- Kafka (see separate installation instructions below)
- Google Cloud
Installation
composer require jezzdk/lumen-pubsub
The package has a default configuration which uses the following environment variables.
PUBSUB_CONNECTION=redis
REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379
KAFKA_BROKERS=localhost
GOOGLE_CLOUD_PROJECT_ID=your-project-id-here
GOOGLE_CLOUD_KEY_FILE=path/to/your/gcloud-key.json
To customize the configuration file, copy the package configuration into your project.
cp vendor/jezzdk/lumen-pubsub/config/pubsub.php config/pubsub.php
You can then edit the copied configuration at config/pubsub.php.
Register the service provider in bootstrap/app.php
$app->register(Superbalist\LaravelPubSub\PubSubServiceProvider::class);
Kafka Adapter Installation
Please note that whilst the package is bundled with support for the php-pubsub-kafka adapter, the adapter is not included by default.
This is because the KafkaPubSubAdapter has an external dependency on the librdkafka C library and the php-rdkafka PECL extension.
If you plan on using this adapter, you will need to install these dependencies by following these installation instructions.
You can then include the adapter using:
composer require superbalist/php-pubsub-kafka
Usage
// get the pub-sub manager
$pubsub = app('pubsub');

// note: function calls on the manager are proxied through to the default connection
// eg: you can do this on the manager OR a connection
$pubsub->publish('channel_name', 'message');

// get the default connection
$pubsub = app('pubsub.connection');
// or
$pubsub = app(\Superbalist\PubSub\PubSubAdapterInterface::class);

// get a specific connection
$pubsub = app('pubsub')->connection('redis');

// publish a message
// the message can be a string, array, json string, bool, object - anything which can be serialized
$pubsub->publish('channel_name', 'this is where your message goes');
$pubsub->publish('channel_name', ['key' => 'value']);
$pubsub->publish('channel_name', '{"key": "value"}');
$pubsub->publish('channel_name', true);

// subscribe to a channel
$pubsub->subscribe('channel_name', function ($message) {
    var_dump($message);
});

// all the above commands can also be done using the facade
PubSub::connection('kafka')->publish('channel_name', 'Hello World!');

PubSub::connection('kafka')->subscribe('channel_name', function ($message) {
    var_dump($message);
});
Creating a Subscriber
The package includes a helper command, php artisan make:subscriber MyExampleSubscriber, to stub new subscriber command classes.
Many pub-sub adapters block inside their subscribe() calls, so these commands are best run as daemons managed by a process supervisor such as Supervisor.
This generator command will create the file app/Console/Commands/MyExampleSubscriber.php, which will contain:
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Superbalist\PubSub\PubSubAdapterInterface;

class MyExampleSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var PubSubAdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubAdapterInterface $pubsub
     */
    public function __construct(PubSubAdapterInterface $pubsub)
    {
        parent::__construct();

        $this->pubsub = $pubsub;
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {

        });
    }
}
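The generated handle() method leaves the subscribe callback empty. As a rough sketch of what could go inside it, the helper below turns an incoming message into an array, on the assumption that publishers may send either arrays or JSON strings as in the Usage examples. The function name normalizeMessage and the 'payload' wrapper key are hypothetical and are not part of this package or of php-pubsub.

```php
<?php

/**
 * Hypothetical helper (not part of lumen-pubsub or php-pubsub).
 * Normalizes a received message into an array so the handler body
 * can treat array payloads and JSON-string payloads the same way.
 *
 * @param mixed $message
 * @return array
 */
function normalizeMessage($message)
{
    // Arrays are passed through untouched.
    if (is_array($message)) {
        return $message;
    }

    // Strings are decoded only if they hold a JSON object or array.
    if (is_string($message)) {
        $decoded = json_decode($message, true);
        if (is_array($decoded)) {
            return $decoded;
        }
    }

    // Anything else (plain strings, bools, objects) is wrapped under
    // a 'payload' key, an arbitrary choice made for this sketch.
    return ['payload' => $message];
}

// Inside handle(), the callback could then look like this:
//
// $this->pubsub->subscribe('channel_name', function ($message) {
//     $data = normalizeMessage($message);
//     // ... act on $data ...
// });
```

Keeping this logic in a plain function makes it easy to test without a running broker, since the blocking subscribe() call is not involved.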
Kafka Subscribers
For subscribers which use the php-pubsub-kafka adapter, you'll likely want to change the consumer_group_id per subscriber.
To do this, you need to use the PubSubConnectionFactory to create a new connection per subscriber. This is because the consumer_group_id cannot be changed once a connection is created.
Here is an example of how you can do this:
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Superbalist\LaravelPubSub\PubSubConnectionFactory;
use Superbalist\PubSub\PubSubAdapterInterface;

class MyExampleKafkaSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var PubSubAdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubConnectionFactory $factory
     */
    public function __construct(PubSubConnectionFactory $factory)
    {
        parent::__construct();

        // build a dedicated connection so this subscriber gets its own consumer group
        $config = config('pubsub.connections.kafka');
        $config['consumer_group_id'] = self::class;
        $this->pubsub = $factory->make('kafka', $config);
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {

        });
    }
}
Adding a Custom Driver
Please see the "Writing an Adapter" section of the php-pubsub documentation.
To include your custom driver, call the extend() function on the pub-sub manager.
$manager = app('pubsub');

$manager->extend('custom_connection_name', function ($config) {
    // your callable must return an instance of the PubSubAdapterInterface
    return new MyCustomPubSubDriver($config);
});

// get an instance of your custom connection
$pubsub = $manager->connection('custom_connection_name');
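For illustration, a driver such as MyCustomPubSubDriver might look roughly like the in-memory sketch below. It assumes the adapter interface consists of subscribe($channel, callable $handler) and publish($channel, $message), as in php-pubsub 1.x; check the interface in your installed version, since later releases may require more methods. The `implements` clause is left out so the snippet runs without Composer dependencies, and the synchronous in-process delivery is an assumption of this sketch, not a description of any bundled adapter.

```php
<?php

/**
 * Illustrative in-memory driver (a sketch, not shipped with any package).
 * In a real project this class would declare
 * `implements \Superbalist\PubSub\PubSubAdapterInterface`.
 */
class MyCustomPubSubDriver
{
    /** @var array connection config passed in by the extend() callable */
    protected $config;

    /** @var array lists of handlers, keyed by channel name */
    protected $subscribers = [];

    public function __construct(array $config = [])
    {
        $this->config = $config;
    }

    /**
     * Register a handler for a channel. Unlike many real adapters,
     * this call does not block.
     */
    public function subscribe($channel, callable $handler)
    {
        $this->subscribers[$channel][] = $handler;
    }

    /**
     * Deliver the message synchronously to every handler on the channel.
     */
    public function publish($channel, $message)
    {
        $handlers = isset($this->subscribers[$channel])
            ? $this->subscribers[$channel]
            : [];

        foreach ($handlers as $handler) {
            call_user_func($handler, $message);
        }
    }
}

// Quick check outside the framework.
$driver = new MyCustomPubSubDriver(['name' => 'demo']);

$received = [];
$driver->subscribe('channel_name', function ($message) use (&$received) {
    $received[] = $message;
});

$driver->publish('channel_name', ['key' => 'value']);
$driver->publish('other_channel', 'no subscriber, so this is dropped');
```

A driver like this only reaches subscribers registered in the same PHP process, so it is mainly useful for tests and local development.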