Fetch queue messages from external queue

v0.3 2019-04-01 06:09 UTC

Laravel queues work great internally when jobs are both pushed (SomeJob::dispatch()) and then fetched via Laravel queue workers.

But what if jobs are pushed to a queue from an external service or message broker other than Laravel? For example, AWS S3 bucket > SNS > SQS? In those cases, Laravel queue workers do not recognise the payload, cannot parse it and break because the job payload is missing the expected job and data attributes etc.

CustomQueue aims to solve this issue. It will fetch a job, re-packages the payload to a format which Laravel recognises and then process via a user specified job handler.

Payload without custom-queue:

  "Records": [
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "ap-southeast-2",
      "eventTime": "2019-03-22T06:31:40.395Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:blahblah:blah"
      "requestParameters": {
        "sourceIPAddress": ""
      "responseElements": {
        "x-amz-request-id": "C53F65ECD63F53F8",
        "x-amz-id-2": "blah="
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "folder-name",
        "bucket": {
          "name": "bucket-name",
          "ownerIdentity": {
            "principalId": "A2XHNNJ3IERBDC"
          "arn": "arn:aws:s3:::bucket-name"
        "object": {
          "key": "file-drop/droptest.csv",
          "size": 12,
          "eTag": "tagid",
          "sequencer": "005C94814C54E35D75"

Payload with custom-queue:

  "type": "job",
  "job": "custom-sqs",
  "data": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-2\",\"eventTime\":\"2019-03-22T06:31:40.395Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:blahblah:blah\"},\"requestParameters\":{\"sourceIPAddress\":\"\"},\"responseElements\":{\"x-amz-request-id\":\"C53F65ECD63F53F8\",\"x-amz-id-2\":\"blah=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"folder-name\",\"bucket\":{\"name\":\"bucket-name\",\"ownerIdentity\":{\"principalId\":\"A2XHNNJ3IERBDC\"},\"arn\":\"arn:aws:s3:::bucket-name\"},\"object\":{\"key\":\"file-drop/droptest.csv\",\"size\":12,\"eTag\":\"tagid\",\"sequencer\":\"005C94814C54E35D75\"}}}]}"

**Note: ** job: custom-sqs is designating a job handler to be used to process the job. data is simply the original payload.


Install package via composer

composer require wired00/custom-queue

Publish the CustomQueue config file into your project


Setup a custom external SQS connection

        'custom-sqs' => [
            'driver' => 'custom-sqs',
            'key' => env('AWS_ACCESS_KEY_ID', 'your-public-key'),
            'secret' => env('AWS_SECRET_ACCESS_KEY', 'your-secret-key'),
            'queue' => env('SQS_QUEUE', 'your-queue-url'),
            'region' => env('AWS_REGION', 'us-east-1'),

Set all these values from your Laravel .env. I.e:



customqueue.php config file simply contains a mapping between handler class path and an identifier.

The example contains identifier custom-sqs and class path App\Jobs\ProcessSQS::class. This specifies that the job payload when fetched from a queue will be appended with a key-value of job: App\Jobs\ProcessSQS::class whenever a connection type of custom-sqs is processed. The job will then process via ProcessSQS->handle().

Currently CustomQueue only supports custom SQS fetching but in future it might support RabbitMQ and Redis. In those cases customqueue.php would include identifiers such as custom-redis and custom-rabbitmq.

Handler files

Handler files are those referenced via the class path within customqueue.php. They must implement Wired00\CustomQueue\Contracts\CustomQueueJobHandler.

A common namespace for these are App\Jobs.

For example:


namespace App\Jobs;

use Wired00\CustomQueue\Contracts\CustomQueueJobHandler as HandlerContract;
use Illuminate\Queue\Jobs\Job;

class ProcessSQS implements HandlerContract
     * Execute the job.
     * @param Job $job
     * @param null $data
     * @return void
    public function handle(Job $job, $data = null)
        // process code (you can access job payload via $data)
		    // $job->delete();


  • handle() accepts the current job and importantly $data which contains the job payload popped from the SQS queue for example.

  • $job->delete(); might be needed depending on how the queue handles things. In my case, SQS did not seem to auto remove jobs after they were fetched. So delete() was required to remove the job after handling.


This is built upon the unmaintained, and non-functional, Laravel External Queue package (kristianedlund/laravel-external-queue)

It is of MIT license do what you want with it. All care no responsibility. MIT license.