freyo / laravel-queue-rocketmq
Queue Adapter for AlibabaMQ(Apache RocketMQ) SDK
Installs: 143
Dependents: 0
Suggesters: 0
Security: 0
Stars: 17
Watchers: 2
Forks: 11
Open Issues: 2
Requires
- php: >=5.6.4
- ext-curl: *
- ext-json: *
- aliyunmq/mq-http-sdk: ^1.0
- guzzlehttp/guzzle: ^6.0.0
- illuminate/queue: 5.2.*|5.3.*|5.4.*|5.5.*|5.6.*|5.7.*|5.8.*
Requires (Dev)
- php: ^7.0
- mockery/mockery: ^1.2.3
- phpunit/phpunit: >=6.0.0
- vlucas/phpdotenv: ^3.3
This package is auto-updated.
Last update: 2025-01-20 17:56:47 UTC
README
AlibabaMQ(Apache RocketMQ) Driver for Laravel Queue
Installation
composer require freyo/laravel-queue-rocketmq
Configure
Laravel 5.5+ uses Package Auto-Discovery, so doesn't require you to manually add the ServiceProvider.
config/app.php
:
'providers' => [ // ... Freyo\LaravelQueueRocketMQ\LaravelQueueRocketMQServiceProvider::class, ]
.env
:
QUEUE_DRIVER=rocketmq
ROCKETMQ_ACCESS_KEY=your-access-key
ROCKETMQ_ACCESS_ID=your-access-id
ROCKETMQ_ENDPOINT=http://***.mqrest.***.aliyuncs.com
ROCKETMQ_INSTANCE_ID=MQ_INST_***_***
ROCKETMQ_GROUP_ID=GID_***
ROCKETMQ_QUEUE=topic_name # default topic name
ROCKETMQ_USE_MESSAGE_TAG=false # set to true to use message tag
ROCKETMQ_WAIT_SECONDS=0
Usage
Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues
Example
Dispatch Jobs
The default connection name is rocketmq
// Without message tag (ROCKETMQ_USE_MESSAGE_TAG=false) Job::dispatch()->onConnection('connection-name')->onQueue('TopicTestMQ'); // or dispatch((new Job())->onConnection('connection-name')->onQueue('TopicTestMQ')) // With message tag (ROCKETMQ_USE_MESSAGE_TAG=true) Job::dispatch()->onConnection('connection-name')->onQueue('TagA'); // or dispatch((new Job())->onConnection('connection-name')->onQueue('TagA'))
Multiple Queues
Configure config/queue.php
'connections' => [ //... 'new-connection-name' => [ 'driver' => 'rocketmq', 'access_key' => 'your-access-key', 'access_id' => 'your-access-id', 'endpoint' => 'http://***.mqrest.***.aliyuncs.com', 'instance_id' => 'MQ_INST_***_***', 'group_id' => 'GID_***', 'queue' => 'your-default-topic-name', 'use_message_tag' => false, 'wait_seconds' => 0, 'plain' => [ 'enable' => false, 'job' => 'App\Jobs\RocketMQPlainJobHandler@handle', ], ]; //... ];
Process Jobs
php artisan queue:work {connection-name} --queue={queue-name}
Plain Mode
Configure .env
ROCKETMQ_PLAIN_ENABLE=true
ROCKETMQ_PLAIN_JOB=App\Jobs\RocketMQPlainJob@handle
Create a job implements PlainPayload
interface. The method getPayload
must return a sting value.
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Freyo\LaravelQueueRocketMQ\Queue\Contracts\PlainPayload; class RocketMQPlainJob implements ShouldQueue, PlainPayload { use Dispatchable, InteractsWithQueue, Queueable; protected $payload; /** * Create a new job instance. * * @return void */ public function __construct($payload) { $this->payload = $payload; } /** * Get the plain payload of the job. * * @return string */ public function getPayload() { return $this->payload; } }
Create a plain job handler
<?php namespace App\Jobs; use Illuminate\Queue\Jobs\Job; class RocketMQPlainJobHandler { /** * Execute the job. * * @param \Illuminate\Queue\Jobs\Job $job * @param string $payload * * @return void */ public function handle(Job $job, $payload) { // processing your payload... var_dump($payload); // release back to the queue manually when failed. // $job->release(); // delete message when processed. if (! $job->isDeletedOrReleased()) { $job->delete(); } } }
References
License
The MIT License (MIT). Please see License File for more information.