littlesqx / aint-queue
A async-queue library built on top of swoole, flexable multi-consumer, coroutine supported.
Installs: 284
Dependents: 0
Suggesters: 0
Security: 0
Stars: 173
Watchers: 5
Forks: 32
Open Issues: 7
Requires
- php: >=7.2
- ext-json: *
- ext-swoole: >=4.4
- illuminate/pipeline: ^6.16
- jeremeamia/superclosure: ^2.4
- monolog/monolog: ^2.0
- predis/predis: ^1.1
- psr/log: ^1.1
- symfony/console: ^4.4 | ^5.0
Requires (Dev)
- phpunit/phpunit: ^8.3
- swoole/ide-helper: @dev
Suggests
- php: >=7.3
- dev-master
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.4
- v1.1.3
- v1.1.2
- v1.1.1
- v1.1.0
- v1.0.2
- v1.0.1
- v1.0.0
- v0.9.2
- v0.9.1
- v0.9.0
- v0.8.2
- v0.8.1
- v0.8.0
- v0.7.2
- v0.7.1
- v0.7.0
- v0.6.0
- dev-optimize/failed-job-storage
- dev-pref/connection
- dev-fix/coroutine-exception-catching
- dev-feature/dashboard
- dev-feature/job-middleware
- dev-master-dev
- dev-pref/logger
- dev-pref/docs
This package is auto-updated.
Last update: 2023-06-04 08:41:35 UTC
README
An async-queue library built on top of swoole, flexible multi-consumer, coroutine supported. 中文说明
Feature
- Default Redis driver
- Delayed job
- The custom job retries and times
- Custom failed callback
- Job middleware
- Queue snapshot event
- Concurrent processing, flexible multi-worker
- Worker coroutine support
- Beautiful dashboard
Required
- PHP 7.2+
- Swoole 4.4+
- Redis 3.2+ (redis driver)
Install
composer require littlesqx/aint-queue -vvv
Usage
Config
By default, aint-queue will require config/aint-queue.php
as default config. If not exist, /vendor/littlesqx/aint-queue/src/Config/config.php
will be the final config file.
<?php use Littlesqx\AintQueue\Driver\Redis\Queue as RedisQueue; use Littlesqx\AintQueue\Logger\DefaultLogger; return [ // channel_name => [...config] 'default' => [ 'driver' => [ 'class' => RedisQueue::class, 'connection' => [ 'host' => '127.0.0.1', 'port' => 6379, 'database' => '0', // 'password' => 'password', ], ], 'logger' => [ 'class' => DefaultLogger::class, 'options' => [ 'level' => \Monolog\Logger::DEBUG, ], ], 'pid_path' => '/var/run/aint-queue', 'consumer' => [ 'sleep_seconds' => 1, 'memory_limit' => 96, 'dynamic_mode' => true, 'capacity' => 6, 'flex_interval' => 5 * 60, 'min_worker_number' => 5, 'max_worker_number' => 30, 'max_handle_number' => 0, ], 'job_snapshot' => [ 'interval' => 5 * 60, 'handler' => [], ], ], ];
All the options:
name | type | comment | default |
---|---|---|---|
channel | string | The queue unit, every queue pusher and queue listener work for. Multiple channel supported, use --channel option. |
default |
driver.class | string | Queue driver class, implements QueueInterface. | Redis |
driver.connection | map | Queue driver's config. | |
pid_path | string | The path of listener master pid file. Noted that permission required. | /var/run/aint-queue |
consumer.sleep_seconds | int | Sleep seconds after every empty pop from queue. | 1 |
consumer.memory_limit | int | Mb. Worker will reload when its memory usage exceeds the limit. | 96 |
consumer.dynamic_mode | bool | Determine whether worker's number flex dynamically. | true |
consumer.capacity | int | The capacity that every consumer can handle in health and in short time, it affects the worker number when dynamic-mode. | 5 |
consumer.flex_interval | int | every flex_interval seconds monitor process try to flex the worker number. Only work when consumer.dynamic_mode = true. |
5 |
consumer.min_worker_number | int | Min expansion. | 5 |
consumer.max_worker_number | int | Max expansion. | 30 |
consumer.max_handle_number | int | Current consumer's max job-handle time. 0 means no limit. |
0 |
job_snapshot | map | Every interval seconds, handles will be executed. Handle must implements JobSnapshotterInterface. |
Push job
You can use it in your project running via fpm/cli.
<?php use Littlesqx\AintQueue\Driver\DriverFactory; $queue = DriverFactory::make($channel, $options); // push a job $queue->push(function () { echo "Hello aint-queue\n"; }); // push a delay job $closureJob = function () { echo "Hello aint-queue delayed\n"; }; $queue->push($closureJob, 5); // And class job are allowed. // 1. Create a class which implements JobInterface, you can see the example in `/example`. // 2. Noted that job pushed should be un-serialize by queue-listener, // it means queue-pusher and queue-listener are required to in the same project. // 3. You can see more examples in `example` directory.
Manage Queue
We recommend that using Supervisor
to monitor and control the listener.
vendor/bin/aint-queue
AintQueue Console Tool Usage: command [options] [arguments] Options: -h, --help Display this help message -q, --quiet Do not output any message -V, --version Display this application version --ansi Force ANSI output --no-ansi Disable ANSI output -n, --no-interaction Do not ask any interactive question -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug Available commands: help Displays help for a command list Lists commands queue queue:clear Clear the queue. queue:dashboard Start http server for dashboard. queue:reload-failed Reload all the failed jobs onto the waiting queue. queue:status Get the execute status of specific queue. worker worker:listen Listen the queue. worker:reload Reload worker for the queue. worker:run Run the specific job. worker:stop Stop listening the queue.
Testing
composer test
Contributing
You can contribute in one of three ways:
- File bug reports using the issue tracker.
- Answer questions or fix bugs on the issue tracker.
- Contribute new features or update the wiki.
The code contribution process is not very formal. You just need to make sure that you follow the PSR-2, PSR-12 coding guidelines. Any new code contributions must be accompanied by unit tests where applicable.
License
MIT