axl-media/laravel-sqs-fifo-queue

Adds a Laravel queue driver for Amazon SQS FIFO queues. Based on shiftonelabs/laravel-sqs-fifo-queue

1.2.0 2019-11-05 07:00 UTC

This package is auto-updated.

Last update: 2024-04-05 16:46:45 UTC


README

Latest Version on Packagist Software License Build Status Coverage Status Quality Score Total Downloads

This Laravel/Lumen package provides a queue driver for Amazon's SQS FIFO queues. While Laravel works with Amazon's SQS standard queues out of the box, FIFO queues are slightly different and are not handled properly by Laravel. That is where this package comes in.

Versions

This package has been tested on Laravel 4.1 through Laravel 5.6, though it may continue to work on later versions as they are released. This section will be updated to reflect the versions on which the package has actually been tested.

Install

Via Composer

$ composer require axl-media/laravel-sqs-fifo-queue

Once composer has been updated and the package has been installed, the service provider will need to be loaded.

Laravel 5.5+ (5.5, 5.6)

This package uses auto package discovery. The service provider will automatically be registered.

Laravel 5.0 - 5.4

Open config/app.php and add following line to the providers array:

AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class,

Laravel 4 (4.1, 4.2)

Open app/config/app.php and add following line to the providers array:

'AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider',

Lumen 5 (5.0, 5.1, 5.2, 5.3, 5.4, 5.5, 5.6)

Open bootstrap/app.php and add following line under the "Register Service Providers" section:

$app->register(AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class);

Configuration

Laravel/Lumen 5.1+ (5.1, 5.2, 5.3, 5.4, 5.5, 5.6)

If using Lumen, create a config directory in your project root if you don't already have one. Next, copy vendor/laravel/lumen-framework/config/queue.php to config/queue.php.

Now, for both Laravel and Lumen, open config/queue.php and add the following entry to the connections array.

'sqs-fifo' => [
    'driver' => 'sqs-fifo',
    'key' => env('SQS_KEY'),
    'secret' => env('SQS_SECRET'),
    'prefix' => env('SQS_PREFIX'),
    'queue' => 'your-queue-name',    // ex: queuename.fifo
    'region' => 'your-queue-region', // ex: us-east-2
    'group' => 'default',
    'deduplicator' => 'unique',
],

Example .env file:

SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012

If you'd like this to be the default connection, also set QUEUE_DRIVER=sqs-fifo in the .env file.

Laravel/Lumen 5.0

If using Lumen, create a config directory in your project root if you don't already have one. Next, copy vendor/laravel/lumen-framework/config/queue.php to config/queue.php.

Now, for both Laravel and Lumen, open config/queue.php and add the following entry to the connections array:

'sqs-fifo' => [
    'driver' => 'sqs-fifo',
    'key'    => env('SQS_KEY'),
    'secret' => env('SQS_SECRET'),
    'queue'  => env('SQS_PREFIX').'/your-queue-name',
    'region' => 'your-queue-region',
    'group' => 'default',
    'deduplicator' => 'unique',
],

Example .env file:

SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012

If you'd like this to be the default connection, also set QUEUE_DRIVER=sqs-fifo in the .env file.

Laravel 4

Open app/config/queue.php and add the following entry to the connections array:

'sqs-fifo' => array(
    'driver' => 'sqs-fifo',
    'key'    => 'your-public-key',   // ex: ABCDEFGHIJKLMNOPQRST
    'secret' => 'your-secret-key',   // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
    'queue'  => 'your-queue-url',    // ex: https://sqs.us-east-2.amazonaws.com/123456789012/queuename.fifo
    'region' => 'your-queue-region', // ex: us-east-2
    'group' => 'default',
    'deduplicator' => 'unique',
),

If you'd like this to be the default connection, also update the 'default' key to 'sqs-fifo'.

Capsule

If using the illuminate\queue component Capsule outside of Lumen/Laravel:

use Illuminate\Queue\Capsule\Manager as Queue;
use AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider;

$queue = new Queue;

$queue->addConnection([
    'driver' => 'sqs-fifo',
    'key'    => 'your-public-key',   // ex: ABCDEFGHIJKLMNOPQRST
    'secret' => 'your-secret-key',   // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
    /**
     * Set "prefix" and/or "queue" depending on version, as described for Laravel versions above
     * 'prefix' => 'your-prefix',
     * 'queue' => 'your-queue-name',
     */
    'region' => 'your-queue-region', // ex: us-east-2
    'group' => 'default',
    'deduplicator' => 'unique',
], 'sqs-fifo');

// Make this Capsule instance available globally via static methods... (optional)
$queue->setAsGlobal();

// Register the 'queue' alias in the Container, then register the SQS FIFO provider.
$app = $queue->getContainer();
$app->instance('queue', $queue->getQueueManager());
(new LaravelSqsFifoQueueServiceProvider($app))->register();

Credentials

The key and secret config options may be omitted if using one of the alternative options for providing AWS credentials (e.g. using an AWS credentials file). More informataion about this is available in the AWS PHP SDK guide here.

Usage

For the most part, usage of this queue driver is the same as the built in queue drivers. There are, however, a few extra things to consider when working with Amazon's SQS FIFO queues.

Message Groups

In addition to being able to have multiple queue names for each connection, an SQS FIFO queue also allows one to have multiple "message groups" for each FIFO queue. These message groups are used to group related jobs together, and jobs are processed in FIFO order per group. This is important, as your queue performance may depend on being able to assign message groups properly. If you have 100 jobs in the queue, and they all belong to one message group, then only one queue worker will be able to process the jobs at a time. If, however, they can logically be split across 5 message groups, then you could have 5 queue workers processing the jobs from the queues (one per group). The FIFO ordering is per message group.

Currently, by default, all queued jobs will be lumped into one group, as defined in the configuration file. In the configuration provided above, all queued jobs would be sent as part of the default group. The group can be changed per job using the onMessageGroup() method, which will be explained more below.

The group id must not be empty, must not be more than 128 characters, and can contain alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).

In a future release, the message group will be able to be assigned to a function, like the deduplicator below.

Deduplication

When sending jobs to the SQS FIFO queue, Amazon requires a way to determine if the job is a duplicate of a job already in the queue. SQS FIFO queues have a 5 minute deduplication interval, meaning that if a duplicate message is sent within the interval, it will be accepted successfully (no errors), but it will not be delivered or processed.

Determining duplicate messages is generally handled in one of two ways: either all messages are considered unique regardless of content, or messages are considered duplicates if they have the same content.

This package handles deduplication via the deduplicator configuration option.

To have all messages considered unique, set the deduplicator to unique.

To have messages with the same content considered duplicates, there are two options, depending on how the FIFO queue has been configured. If the FIFO queue has been setup in Amazon with the Content-Based Deduplication feature enabled, then the deduplicator should be set to sqs. This tells the connection to rely on Amazon to determine content uniqueness. However, if the Content-Based Deduplication feature is disabled, the deduplicator should be set to content. Note, if Content-Based Deduplication is disabled, and the deduplicator is set to sqs, this will generate an error when attempting to send a job to the queue.

To summarize:

  • sqs - This is used when messages with the same content should be considered duplicates and Content-Based Deduplication is enabled on the SQS FIFO queue.
  • content - This is used when messages with the same content should be considered duplicates but Content-Based Deduplication is disabled on the SQS FIFO queue.
  • unique - This is used when all messages should be considered unique, regardless of content.

If there is a need for a different deduplication algorithm, custom deduplication methods can be registered in the container.

Finally, by default, all queued jobs will use the deduplicator defined in the configuration file. This can be changed per job using the withDeduplicator() method.

Delayed Jobs

SQS FIFO queues do not support per-message delays, only per-queue delays. The desired delay is defined on the queue itself when the queue is setup in the Amazon Console. Attempting to set a delay on a job sent to a FIFO queue will have no affect. To this end, using the later() method to push a job to an SQS FIFO queue will generate a BadMethodCallException.

To delay a job, you must push() the job to an SQS FIFO queue that has been defined with a delivery delay.

Advanced Usage

Per-Job Group and Deduplicator

If you need to change the group or the deduplicator for a specific job, you will need access to the onMessageGroup() and withDeduplicator() methods. These methods are provided through the AXLMedia\LaravelSqsFifoQueue\Bus\SqsFifoQueueable trait. Once you add this trait to your job class, you can change the group and/or the deduplicator for that specific job without affecting any other jobs on the queue.

Code Example

Job:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use AXLMedia\LaravelSqsFifoQueue\Bus\SqsFifoQueueable;

class ProcessCoin implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SqsFifoQueueable, SerializesModels;

    //
}

Usage:

dispatch(
    (new \App\Jobs\ProcessCoin)
        ->onMessageGroup('quarter')
        ->withDeduplicator('unique')
);

Custom Deduplicator

The deduplicators work by generating a deduplication id that is sent to the queue. If two messages generate the same deduplication id, the second message is considered a duplicate, and the message will not be delivered if it is within the 5 minute deduplication interval.

If you have some custom logic that needs to be used to generate the deduplication id, you can register your own custom deduplicator. The deduplicators are stored in the IoC container with the prefix queue.sqs-fifo.deduplicator. So, for example, the unique deduplicator is aliased to queue.sqs-fifo.deduplicator.unique.

Custom deduplicators are created by registering a new prefixed alias in the IoC. This alias should resolve to a new object instance that implements the AXLMedia\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator contract. You can either define a new class that implements this contract, or you can create a new AXLMedia\LaravelSqsFifoQueue\Queue\Deduplicators\Callback instance, which takes a Closure that performs the deduplication logic. The defined Closure should take two parameters: $payload and $queue, where $payload is the json_encoded() message to send to the queue, and $queue is the name of the queue to which the message is being sent. The generated id must not be more than 128 characters, and can contain alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).

So, for example, if you wanted to create a random deduplicator that would randomly select some jobs to be duplicates, you could add the following line in the register() method of your AppServiceProvider:

$this->app->bind('queue.sqs-fifo.deduplicator.random', function ($app) {
    return new \AXLMedia\LaravelSqsFifoQueue\Queue\Deduplicators\Callback(function ($payload, $queue) {
        // Return the deduplication id generated for messages. Randomly 0 or 1.
        return mt_rand(0,1);
    });
}

Or, if you prefer to create a new class, your class would look like this:

namespace App\Deduplicators;

use AXLMedia\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;

class Random implements Deduplicator
{
    public function generate($payload, $queue)
    {
        // Return the deduplication id generated for messages. Randomly 0 or 1.
        return mt_rand(0,1);
    }
}

And you could register that class in your AppServiceProvider like this:

$this->app->bind('queue.sqs-fifo.deduplicator.random', App\Deduplicators\Random::class);

With this alias registered, you could update the deduplicator key in your configuration to use the value random, or you could set the deduplicator on individual jobs by calling withDeduplicator('random') on the job.

Contributing

Contributions are welcome. Please see CONTRIBUTING for details.

Security

If you discover any security related issues, please email alex.g@axl.media instead of using the issue tracker.

Credits

License

The MIT License (MIT). Please see License File for more information.