chocofamilyme/laravel-pubsub

AMQP wrapper for Laravel to publish and consume messages


README

Laravel pub/sub library allows you to publish, consume and process rabbit events.

Installation

composer require chocofamilyme/laravel-pubsub

Upgrade v6 -> v7

Read upgrade guide here

Upgrade v5 -> v6

Read upgrade guide here

Upgrade v3 -> v4

Read upgrade guide here

Publishing the configuration and migration

php artisan vendor:publish --provider="Chocofamilyme\LaravelPubSub\Providers\PubSubServiceProvider"

Configurations

AMQP (RabbitMQ) configuration

  • Set environment BROADCAST_DRIVER = rabbitmq
  • Add to config/broadcasting.php rabbitmq driver
...
'connections' => [
        'rabbitmq' => [
            'driver' => 'rabbitmq',
        ],
...
]
  • AMQP configuration should be inserted into config/queue.php
'sync' => [  
...
],  
  
'database' => [  
...
],  
  
'beanstalkd' => [  
...
],

// Insert into your config/queue.php
'rabbitmq' => [  
    'driver' => 'rabbitmq',  
    'queue' => env('RABBITMQ_QUEUE', 'default'),  
    'connection' => PhpAmqpLib\Connection\AMQPSocketConnection::class,  
    'worker' => env('RABBITMQ_WORKER', Chocofamilyme\LaravelPubSub\Queue\RabbitMQQueue::class),  
  
    'hosts' => [  
        [
        'host' => env('SERVICE_RABBITMQ_HOST', '127.0.0.1'),  
        'port' => env('SERVICE_RABBITMQ_PORT', 5672),  
        'user' => env('SERVICE_RABBITMQ_USER', 'guest'),  
        'password' => env('SERVICE_RABBITMQ_PASSWORD', 'guest'),  
        'vhost' => env('SERVICE_RABBITMQ_VHOST', '/'),  
        ],
     ],  
  
    'options' => [  
        'ssl_options' => [  
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),  
            'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),  
            'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),  
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),  
            'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),  
        ],
    
        'heartbeat' => 60,  
        'message-ttl' => 60000000,  
  
        'publisher' => [  
            'queue' => [  
                'declare' => false,  
                'bind' => false,
            ],  
            'exchange' => [  
                'declare' => false,
                'name' => 'exchange-name',  
            ],  
        ],
    ],
]

Params

Event routing configuration

Event routing configuration file is located under config/pubsub.php and contains configuration for EventRouting and storing events in database.

<?php

return [
    /*
    |--------------------------------------------------------------------------
    | Listen for events
    |--------------------------------------------------------------------------
    |
    | Define event name and it's listeners. Please notice that one event name may have multiple listeners
    |
    | Example:
    |
    | listen => [
    |     'UserNotified' => [
    |         'durable' => true,
    |         'listeners' => [
    |             NotifyAboutDeviceChangeListener::class,
    |         ],
    |     ]
    | ],
    |
    */
    'listen' => [

    ],
    
    /**
     * Define database tables for storing data (publishing events, incoming events, etc.)
     */
    'tables' => [
        'events' => 'pubsub_events'
    ]
];  

Usage

You can listen for RabbitMQ events and publish them.

--job=laravel flag should be used if you want to listen to inter project events but from rabbitmq queue.

php artisan event:listen --job=laravel

--job=laravel flag should be used for intermicroservice communictaion.

php artisan event:listen --job=external

Examples

Single event

php artisan event:listen gateway.user.authenticated --job=external

Will listen to single event "gateway.user.authenticated" in default exchange and queue name. Configure the internal event routing in config/pubsub.php Event is taken from payload, when you publish the event it appends the event name automatically there.

Wildcard event

php artisan event:listen gateway.user.# --exchange=gateway --queue=guardqueue --job=external

Will listen to all "gateway.user.*" events in exchange gateway and with queue name "guardqueue"

Interproject communictaion

php artisan event:listen

Will listen for default laravel event, in the default case --job=laravel is set by default

php artisan event:listen flags and parameters

connection=rabbitmq                        : The name of the queue connection to work
--queue=                                   : The names of the queues to work
--exchange=                                : Optional, specifies exchange which should be listened [for default value see app/config/queue.php]
--exchange_type=topic                      : Optional, specifies exchange which should be listened [for default value see app/config/queue.php] [RabbitMQ Doc](https://www.rabbitmq.com/tutorials/amqp-concepts.html)
--once                                     : Only process the next job on the queue
--job=laravel                              : Handler for internal or external message
--stop-when-empty                          : Stop when the queue is empty
--delay=0                                  : The number of seconds to delay failed jobs
--force                                    : Force the worker to run even in maintenance mode
--memory=128                               : The memory limit in megabytes
--sleep=3                                  : Number of seconds to sleep when no job is available
--timeout=0                                : The number of seconds a child process can run
--tries=1                                  : Number of times to attempt a job before logging it failed
--exclusive=0                              : used by only one connection and the queue will be deleted when that connection close
--consumer_exclusive=0                     : request exclusive consumer access, meaning only this consumer can access the queue
--wait_non_blocking=0                      : non-blocking actions
--exchange_passive=0                       : If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not [RabbitMQ Doc](https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.passive)
--exchange_durable=1                       : If set when creating a new exchange, the exchange will be marked as durable [RabbitMQ Doc](https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.durable)
--exchange_auto_delete=0                   : If set, the exchange is deleted when all queues have finished using it [RabbitMQ Doc](https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.auto-delete)
--consumer-tag                             :
--prefetch-size=0                          :
--prefetch-count=1                         : [RabbitMQ Doc](https://www.rabbitmq.com/consumer-prefetch.html)  |

How to publish events

For laravel default way see the laravel documentation

https://laravel.com/

If you want to publish event to RabbitMQ

We've tried to make it easy as possible for you, see how it works:

  1. Create event with php artisan make:event, please aware that the name of your event class will be the event name in the message payload. It is then used for internal router config/pubsub.php
  2. Open fresh created event and extends from Chocofamilyme\LaravelPubSub\Events\PublishEvent
  3. You will have to override values of constants couple of methods like EXCHANGE_NAME, ROUTING_KEY. These constants tell the dispatcher which exchange should be used for this event and which routing key. See? It's pretty self-descriptive.
  4. Since you extended from PublishEvent class you could override more methods which could make the event more precise, for that please see inside this class.
  5. After our event is ready, we now can publish it in laravel way:
event(new UserUpdatedEvent(1, 'Josh'));

Since this event extends from PublishEvent class implementing SendToRabbitMQInterface, it will be sent into rabbitmq automatically.

PS: Please note that you need to override toPayload() method returning array that would be used as message payload.

Example event class

<?php

namespace App\Events;

use Chocofamilyme\LaravelPubSub\Events\PublishEvent;

class UserUpdatedEvent extends PublishEvent
{
    public int $id;
    public string $name;

    public const EXCHANGE_NAME = 'exchangeName';
    public const ROUTING_KEY   = 'user.updated';

    /**
     * Create a new event instance.
     *
     * @param int $id
     * @param string $name
     */
    public function __construct(int $id, string $name)
    {
        $this->id = $id;
        $this->name = $name;
    }
}

Example setting listen in config/pubsub.php

...
  'listen' => [
      'App\Events\UserUpdatedEvent' => [
          'durable' => true,
          'listeners' => [
               UserChangeListener::class,
          ],
      ],
  ],
...

To save failed jobs, you need to make the following changes config in queue.php and set uuid column nullable in failed_jobs table

...
      'failed' => [
        'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],
...
...
      'failed' => [
//        'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],
...