dev-master 2023-02-10 09:08 UTC

This package is auto-updated.

Last update: 2024-12-10 13:02:58 UTC


README

介绍

{以下是 Gitee 平台说明,您可以替换此简介 Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 https://gitee.com/enterprises}

软件架构

软件架构说明

安装教程

  1. 安装依赖包

    composer require general/mq
    
  2. 创建配置文件 mq.php, 放在 config 目录下 (对于Lumen), 配置示例

<?php

return [
    'default' => env('MQ_CONNECTION', 'kafka'),

    'connections' => [
        'redis' => [
            'driver' => 'redis',
            'connection' => env('QUEUE_REDIS_CONNECTION', 'default'),
            'queue' => env('QUEUE_NAME','default'),
            'retry_after' => 300,
            'block_for' => null,
        ],

        'kafka' => [
            'driver' => 'kafka',
            'metadata.broker.list' => 'xxxxx', // 多个 broker 以逗号分隔
            'producer' => [
                // 'enable.idempotence' => true, // 保证推送 only-once
            ],
            'consumer' => [
                'group.id' => 'myConsumerGroup11',
                'auto.offset.reset' => 'earliest',
                'enable.partition.eof' => true,
//                'enable.auto.commit' => 0
            ]
        ]
    ],

    'failed' => [
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => env('MQ_FAILED_TABLE', 'mq_failed_messages'),
    ],
];
  1. 注册服务, 对于Lumen
$app->register(\General\Mq\MqServiceProvider::class)
  1. 自定义对接消费命令, 以下是 Lumen Command 示例
    <?php
    

namespace App\Console\Commands\Mq;

use App\Domain\Generic\MQ\Event\Event; use App\Domain\Generic\MQ\EventSet; use App\Domain\Generic\MQ\Notify\DingNotify; use App\Domain\Generic\MQ\Worker; use Illuminate\Console\Command; use App\Domain\Generic\MQ\Failed\DatabaseFailed;

class ConsumerCommand extends Command {

private $event;

/**
 * The name and signature of the console command.
 *
 * @var string
 */
protected $signature = 'mq:consumer_run {queueName}';

/**
 * The console command description.
 *
 * @var string
 */
protected $description = '消费消息';

public function __construct()
{
    parent::__construct();
    $this->event = app(Event::class);
}

/**
 * Execute the console command.
 *
 */
public function handle()
{
    $this->listenForEvents();
    $queueName = $this->argument('queueName');
    $handler = app(BusinessHandlerMap::class)->getBusinessHandler($queueName);

    // 运行
    (new Worker($queueName, $this->event))->setBusinessHandler($handler)->run();
}

protected function listenForEvents()
{
    $this->event->attach(EventSet::FAILED,
        [new DingNotify(), 'notify'],
        [new DatabaseFailed(), 'log'],
    );
}

}


5. 配置错误消息表接口: 支持错误消息记录, 目前强关联, 以下是是 SQL 语句

CREATE TABLE mq_failed_messages ( id bigint(20) unsigned NOT NULL AUTO_INCREMENT, driver varchar(512) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '驱动名称', topic varchar(512) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '队列主题名称', message text COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '消息体', exception longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '异常信息', failed_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '错误发生时间', PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='MQ错误记录表';


相应 migration 脚本

<?php

use Illuminate\Support\Facades\Schema; use Illuminate\Database\Schema\Blueprint; use Illuminate\Database\Migrations\Migration;

class CreateTableMqFailedMessages extends Migration {

/**
 * Run the migrations.
 *
 * @return void
 */
public function up()
{
    Schema::create('mq_failed_messages', function (Blueprint $table) {
        $table->bigIncrements('id');
        $table->string('driver', 512)->comment('驱动名称');
        $table->string('topic', 512)->comment('队列主题名称');
        $table->text('message')->comment('消息体');
        $table->longText('exception')->comment('异常信息');
        $table->timestamp('failed_at')->useCurrent()->comment('错误发生时间');
    });

    DB::statement("ALTER TABLE mq_failed_messages COMMENT='MQ错误记录表'");
}

/**
 * Reverse the migrations.
 *
 * @return void
 */
public function down()
{
    Schema::dropIfExists('mq_failed_messages');
}

}


#### 使用说明

1.  推送消息

<?php

$messageData = [
    'topic' => 'common-kafka-test', // topic(队列名)
    'payload' => ['content' => '这是一次测试'], // 消息内容
    'extend_data' => ['key' =>'这是一个扩展参数'], // 消息扩展
    'max_tries' => 3 // 最大重试此时
];
$message = new Message($messageData);

// 推送消息
app('mq')->push($message);
echo '发送成功';

2.  消费消息, 以下是 Lumen 示例:

php artisan mq:consumer_run {queueName}


#### 参与贡献

1.  Fork 本仓库
2.  新建 Feat_xxx 分支
3.  提交代码
4.  新建 Pull Request


#### 特技

1.  使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2.  Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3.  你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4.  [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5.  Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6.  Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)