general / mq
Requires
- php: >=7.3
This package is auto-updated.
Last update: 2024-12-10 13:02:58 UTC
README
介绍
{以下是 Gitee 平台说明,您可以替换此简介 Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 https://gitee.com/enterprises}
软件架构
软件架构说明
安装教程
安装依赖包
composer require general/mq
创建配置文件
mq.php
, 放在 config 目录下 (对于Lumen), 配置示例
<?php
return [
'default' => env('MQ_CONNECTION', 'kafka'),
'connections' => [
'redis' => [
'driver' => 'redis',
'connection' => env('QUEUE_REDIS_CONNECTION', 'default'),
'queue' => env('QUEUE_NAME','default'),
'retry_after' => 300,
'block_for' => null,
],
'kafka' => [
'driver' => 'kafka',
'metadata.broker.list' => 'xxxxx', // 多个 broker 以逗号分隔
'producer' => [
// 'enable.idempotence' => true, // 保证推送 only-once
],
'consumer' => [
'group.id' => 'myConsumerGroup11',
'auto.offset.reset' => 'earliest',
'enable.partition.eof' => true,
// 'enable.auto.commit' => 0
]
]
],
'failed' => [
'database' => env('DB_CONNECTION', 'mysql'),
'table' => env('MQ_FAILED_TABLE', 'mq_failed_messages'),
],
];
- 注册服务, 对于Lumen
$app->register(\General\Mq\MqServiceProvider::class)
- 自定义对接消费命令, 以下是 Lumen Command 示例
<?php
namespace App\Console\Commands\Mq;
use App\Domain\Generic\MQ\Event\Event; use App\Domain\Generic\MQ\EventSet; use App\Domain\Generic\MQ\Notify\DingNotify; use App\Domain\Generic\MQ\Worker; use Illuminate\Console\Command; use App\Domain\Generic\MQ\Failed\DatabaseFailed;
class ConsumerCommand extends Command {
private $event;
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'mq:consumer_run {queueName}';
/**
* The console command description.
*
* @var string
*/
protected $description = '消费消息';
public function __construct()
{
parent::__construct();
$this->event = app(Event::class);
}
/**
* Execute the console command.
*
*/
public function handle()
{
$this->listenForEvents();
$queueName = $this->argument('queueName');
$handler = app(BusinessHandlerMap::class)->getBusinessHandler($queueName);
// 运行
(new Worker($queueName, $this->event))->setBusinessHandler($handler)->run();
}
protected function listenForEvents()
{
$this->event->attach(EventSet::FAILED,
[new DingNotify(), 'notify'],
[new DatabaseFailed(), 'log'],
);
}
}
5. 配置错误消息表接口: 支持错误消息记录, 目前强关联, 以下是是 SQL 语句
CREATE TABLE mq_failed_messages
(
id
bigint(20) unsigned NOT NULL AUTO_INCREMENT,
driver
varchar(512) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '驱动名称',
topic
varchar(512) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '队列主题名称',
message
text COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '消息体',
exception
longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '异常信息',
failed_at
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '错误发生时间',
PRIMARY KEY (id
)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='MQ错误记录表';
相应 migration 脚本
<?php
use Illuminate\Support\Facades\Schema; use Illuminate\Database\Schema\Blueprint; use Illuminate\Database\Migrations\Migration;
class CreateTableMqFailedMessages extends Migration {
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
Schema::create('mq_failed_messages', function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('driver', 512)->comment('驱动名称');
$table->string('topic', 512)->comment('队列主题名称');
$table->text('message')->comment('消息体');
$table->longText('exception')->comment('异常信息');
$table->timestamp('failed_at')->useCurrent()->comment('错误发生时间');
});
DB::statement("ALTER TABLE mq_failed_messages COMMENT='MQ错误记录表'");
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::dropIfExists('mq_failed_messages');
}
}
#### 使用说明
1. 推送消息
<?php
$messageData = [
'topic' => 'common-kafka-test', // topic(队列名)
'payload' => ['content' => '这是一次测试'], // 消息内容
'extend_data' => ['key' =>'这是一个扩展参数'], // 消息扩展
'max_tries' => 3 // 最大重试此时
];
$message = new Message($messageData);
// 推送消息
app('mq')->push($message);
echo '发送成功';
2. 消费消息, 以下是 Lumen 示例:
php artisan mq:consumer_run {queueName}
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)