robier/laravel-pubsub

A Pub-Sub abstraction for Laravel

5.0.0 2020-06-16 10:26 UTC

README

A Pub-Sub abstraction for Laravel. forked from Superbalist/laravel-pubsub

Author Build Status StyleCI Software License Packagist Version Total Downloads

This package is a wrapper bridging php-pubsub into Laravel.

For Laravel 4 support, use the package https://github.com/Superbalist/laravel4-pubsub

Please note that Laravel 5.3 is only supported up until version 2.0.2.

2.0.3+ supports Laravel 5.4 and up moving forward.

The following adapters are supported:

  • Local
  • /dev/null
  • Redis
  • Kafka (see separate installation instructions below)
  • Google Cloud
  • HTTP

Installation

composer require entanet/laravel-pubsub

Register the service provider in app.php

'providers' => [
    // ...
    Superbalist\LaravelPubSub\PubSubServiceProvider::class,
]

Register the facade in app.php

'aliases' => [
    // ...
    'PubSub' => Superbalist\LaravelPubSub\PubSubFacade::class,
]

The package has a default configuration which uses the following environment variables.

PUBSUB_CONNECTION=redis

REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379

KAFKA_BROKERS=localhost
KAFKA_SECURITY_PROTOCOL=kafka-security-protocol
KAFKA_SASL_USERNAME=kafka-sasl-username
KAFKA_SASL_PASSWORD=kafks-sasl-password

GOOGLE_CLOUD_PROJECT_ID=your-project-id-here
GOOGLE_CLOUD_KEY_FILE=path/to/your/gcloud-key.json

HTTP_PUBSUB_URI=null
HTTP_PUBSUB_SUBSCRIBE_CONNECTION=redis

If the KAFKA_SECURITY_PROTOCOL is set to either "SASL_SSL" or "SASL_PLAINTEXT", the credentials stored in KAFKA_SASL_USERNAME & KAFKA_SASL_PASSWORD will be used to authenticate with the kafka server / cluster.

To customize the configuration file, publish the package configuration using Artisan.

php artisan vendor:publish --provider="Superbalist\LaravelPubSub\PubSubServiceProvider"

You can then edit the generated config at app/config/pubsub.php.

Kafka Adapter Installation

Please note that whilst the package is bundled with support for the php-pubsub-kafka adapter, the adapter is not included by default.

This is because the KafkaPubSubAdapter has an external dependency on the librdkafka c library and the php-rdkafka PECL extension.

If you plan on using this adapter, you will need to install these dependencies by following these installation instructions.

You can then include the adapter using:

composer require superbalist/php-pubsub-kafka

Usage

// get the pub-sub manager
$pubsub = app('pubsub');

// note: function calls on the manager are proxied through to the default connection
// eg: you can do this on the manager OR a connection
$pubsub->publish('channel_name', 'message');

// get the default connection
$pubsub = app('pubsub.connection');
// or
$pubsub = app(\Superbalist\PubSub\PubSubAdapterInterface::class);

// get a specific connection
$pubsub = app('pubsub')->connection('redis');

// publish a message
// the message can be a string, array, bool, object - anything which can be json encoded
$pubsub->publish('channel_name', 'this is where your message goes');
$pubsub->publish('channel_name', ['key' => 'value']);
$pubsub->publish('channel_name', true);

// publish multiple messages
$messages = [
    'message 1',
    'message 2',
];
$pubsub->publishBatch('channel_name', $messages);

// subscribe to a channel
$pubsub->subscribe('channel_name', function ($message) {
    var_dump($message);
});

// all the above commands can also be done using the facade
PubSub::connection('kafka')->publish('channel_name', 'Hello World!');

PubSub::connection('kafka')->subscribe('channel_name', function ($message) {
    var_dump($message);
});

Creating a Subscriber

The package includes a helper command php artisan make:subscriber MyExampleSubscriber to stub new subscriber command classes.

A lot of pub-sub adapters will contain blocking subscribe() calls, so these commands are best run as daemons running as a supervisor process.

This generator command will create the file app/Console/Commands/MyExampleSubscriber.php which will contain:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Superbalist\PubSub\PubSubAdapterInterface;

class MyExampleSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var PubSubAdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubAdapterInterface $pubsub
     */
    public function __construct(PubSubAdapterInterface $pubsub)
    {
        parent::__construct();

        $this->pubsub = $pubsub;
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {

        });
    }
}

Kafka Subscribers

For subscribers which use the php-pubsub-kafka adapter, you'll likely want to change the consumer_group_id per subscriber.

To do this, you need to use the PubSubConnectionFactory to create a new connection per subscriber. This is because the consumer_group_id cannot be changed once a connection is created.

Here is an example of how you can do this:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Superbalist\LaravelPubSub\PubSubConnectionFactory;
use Superbalist\PubSub\PubSubAdapterInterface;

class MyExampleKafkaSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var PubSubAdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubConnectionFactory $factory
     */
    public function __construct(PubSubConnectionFactory $factory)
    {
        parent::__construct();

        $config = config('pubsub.connections.kafka');
        $config['consumer_group_id'] = self::class;
        $this->pubsub = $factory->make('kafka', $config);
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {

        });
    }
}

Adding a Custom Driver

Please see the php-pubsub documentation Writing an Adapter.

To include your custom driver, you can call the extend() function.

$manager = app('pubsub');
$manager->extend('custom_connection_name', function ($config) {
    // your callable must return an instance of the PubSubAdapterInterface
    return new MyCustomPubSubDriver($config);
});

// get an instance of your custom connection
$pubsub = $manager->connection('custom_connection_name');