freyo/laravel-queue-rocketmq

Queue Adapter for AlibabaMQ(Apache RocketMQ) SDK

1.0.1 2020-10-20 08:29 UTC

This package is auto-updated.

Last update: 2024-12-20 17:45:40 UTC


AlibabaMQ(Apache RocketMQ) Driver for Laravel Queue

Installation

composer require freyo/laravel-queue-rocketmq

Configure

Laravel 5.5+ uses Package Auto-Discovery, so you do not need to register the ServiceProvider manually. If auto-discovery is unavailable or disabled, add it to the providers array in config/app.php.

  1. config/app.php:
'providers' => [
  // ...
  Freyo\LaravelQueueRocketMQ\LaravelQueueRocketMQServiceProvider::class,
]
  2. .env:
# Laravel 5.7+ reads QUEUE_CONNECTION instead of QUEUE_DRIVER
QUEUE_DRIVER=rocketmq

ROCKETMQ_ACCESS_KEY=your-access-key
ROCKETMQ_ACCESS_ID=your-access-id

ROCKETMQ_ENDPOINT=http://***.mqrest.***.aliyuncs.com
ROCKETMQ_INSTANCE_ID=MQ_INST_***_***
ROCKETMQ_GROUP_ID=GID_***

ROCKETMQ_QUEUE=topic_name # default topic name

ROCKETMQ_USE_MESSAGE_TAG=false # set to true to use message tag
ROCKETMQ_WAIT_SECONDS=0
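
For orientation, the variables above presumably map onto a rocketmq entry under the queue connections. The fragment below is only a sketch under that assumption, not the configuration shipped with the package: the array keys are taken from the Multiple Queues example in this README, while the env() calls and their default values are illustrative.

```php
<?php

// Hypothetical fragment of config/queue.php (assumption: the package reads
// these keys; the env() defaults are illustrative, not the package's own).
return [
    'connections' => [
        'rocketmq' => [
            'driver'          => 'rocketmq',
            'access_key'      => env('ROCKETMQ_ACCESS_KEY'),
            'access_id'       => env('ROCKETMQ_ACCESS_ID'),
            'endpoint'        => env('ROCKETMQ_ENDPOINT'),
            'instance_id'     => env('ROCKETMQ_INSTANCE_ID'),
            'group_id'        => env('ROCKETMQ_GROUP_ID'),
            'queue'           => env('ROCKETMQ_QUEUE'),
            'use_message_tag' => env('ROCKETMQ_USE_MESSAGE_TAG', false),
            'wait_seconds'    => env('ROCKETMQ_WAIT_SECONDS', 0),
        ],
    ],
];
```

Defining the connection explicitly like this is only needed if you want to override what the package registers by default.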

Usage

Once you have completed the configuration, you can use the Laravel Queue API. If you previously used another queue driver, you do not need to change anything else. If you are not familiar with the Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues

Example

Dispatch Jobs

The default connection name is rocketmq. Replace 'connection-name' in the examples below with the name of your connection.

// Without message tag (ROCKETMQ_USE_MESSAGE_TAG=false)
Job::dispatch()->onConnection('connection-name')->onQueue('TopicTestMQ');
// or dispatch((new Job())->onConnection('connection-name')->onQueue('TopicTestMQ'))

// With message tag (ROCKETMQ_USE_MESSAGE_TAG=true)
Job::dispatch()->onConnection('connection-name')->onQueue('TagA');
// or dispatch((new Job())->onConnection('connection-name')->onQueue('TagA'))

Multiple Queues

Configure config/queue.php

'connections' => [
    //...
    'new-connection-name' => [
        'driver' => 'rocketmq',
        'access_key' => 'your-access-key',
        'access_id'  => 'your-access-id',
        'endpoint' => 'http://***.mqrest.***.aliyuncs.com',
        'instance_id' => 'MQ_INST_***_***',
        'group_id' => 'GID_***',
        'queue' => 'your-default-topic-name',
        'use_message_tag' => false,
        'wait_seconds' => 0,
        'plain' => [
            'enable' => false,
            'job' => 'App\Jobs\RocketMQPlainJobHandler@handle',
        ],
    ],
    //...
],

Process Jobs

php artisan queue:work {connection-name} --queue={queue-name}

Plain Mode

Configure .env

ROCKETMQ_PLAIN_ENABLE=true
ROCKETMQ_PLAIN_JOB=App\Jobs\RocketMQPlainJobHandler@handle

Create a job that implements the PlainPayload interface. The getPayload method must return a string value.

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Freyo\LaravelQueueRocketMQ\Queue\Contracts\PlainPayload;

class RocketMQPlainJob implements ShouldQueue, PlainPayload
{
    use Dispatchable, InteractsWithQueue, Queueable;
    
    protected $payload;

    /**
     * Create a new job instance.
     *
     * @param string $payload
     *
     * @return void
     */
    public function __construct($payload)
    {
        $this->payload = $payload;
    }
    
    /**
     * Get the plain payload of the job.
     *
     * @return string
     */
    public function getPayload()
    {
        return $this->payload;
    }
}

Create a plain job handler. Its class and method are the ones named in the plain job setting (Class@method).

<?php

namespace App\Jobs;

use Illuminate\Queue\Jobs\Job;

class RocketMQPlainJobHandler
{
    /**
     * Execute the job.
     * 
     * @param \Illuminate\Queue\Jobs\Job $job
     * @param string $payload
     * 
     * @return void
     */
    public function handle(Job $job, $payload)
    {
        // processing your payload...
        var_dump($payload);
        
        // release back to the queue manually when failed.
        // $job->release();
        
        // delete message when processed.
        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }        
    }
}
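
Because getPayload must return a string, structured data has to be serialized before dispatch and decoded again in the handler. The sketch below shows one way to do that with JSON. The helper names encodePlainPayload and decodePlainPayload are hypothetical and not part of this package, and JSON is just one possible encoding; the code assumes PHP 7.3+ for JSON_THROW_ON_ERROR.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): turn structured data
 * into the string that getPayload() has to return.
 */
function encodePlainPayload(array $data): string
{
    // JSON_THROW_ON_ERROR raises a JsonException instead of returning false.
    return json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

/**
 * Hypothetical helper (not part of the package): decode the string that
 * the plain job handler receives as $payload.
 */
function decodePlainPayload(string $payload): array
{
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        throw new InvalidArgumentException('Plain payload must decode to an array.');
    }

    return $data;
}

// Producer side: build the string payload.
$payload = encodePlainPayload(['order_id' => 42, 'action' => 'ship']);

// Inside a Laravel application the string could then be dispatched, e.g.:
// RocketMQPlainJob::dispatch($payload)->onConnection('rocketmq')->onQueue('TopicTestMQ');

// Consumer side: what a handler might do with the $payload argument.
$decoded = decodePlainPayload($payload);
echo $decoded['order_id'], PHP_EOL; // prints 42
```

Throwing on malformed input keeps a bad message from being silently treated as processed; whether the handler should then release or delete the job is left to the application.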

References

License

The MIT License (MIT). Please see License File for more information.
