huangbin2018 / dbmq
DB实现简单的消息队列
v1.0.1
2019-12-20 09:57 UTC
Requires
- php: >=7.0
- ext-pdo: *
This package is auto-updated.
Last update: 2024-12-23 11:49:27 UTC
README
利用 MySQL 实现简单的 DB 消息队列。
安装
Package is available on Packagist, 使用 Composer Composer.
composer require huangbin2018/dbmq
依赖
- PHP 7.0+
- PDO Extension
使用
消息发布
首先创建tag,消费者key,再绑定两者的关系; 然后就可以发布消息了。
// Mysql 连接配置 $dbConfig = [ 'user' => 'root', 'password' => '', 'host' => '127.0.0.1', 'port' => '3306', 'database' => 'test', ]; $channel = 'test_channel'; $tag = 'test_tag'; $consumerArr = [ 'consumer_1', 'consumer_2', 'consumer_3', 'consumer_4', 'consumer_5', ] ; // 实例化消息生产者 $publisherObj = new Publisher($channel, $dbConfig); // 定义tag $publisherObj->declareTag($tag, TagType::TOPIC, '测试tag'); // 定义消费者 foreach ($consumerArr as $consumerKey) { // 定义消费者 $publisherObj->declareConsumer($consumerKey, '', '测试consumerKey'); // 绑定tag $publisherObj->bindTag($consumerKey, $tag); } // 发布消息 $key = 'uid_1'; $data = [ 'user_id' => 1, 'user_name' => 'huangbin', ]; $message = new Message($data); $body = $message->serialize(); $rs = $publisherObj->send($tag, $key, $body);
消息消费
use DBMQ\Message\Message; use DBMQ\Consumer\Consumer; use DBMQ\Message\Response; // 数据库连接参数 $dbConfig = [ 'user' => 'root', 'password' => '', 'host' => '127.0.0.1', 'port' => '3306', 'database' => 'test', ]; $channel = 'test_channel'; $tag = 'test_tag'; $consumerKey = 'consumer_1'; $processSize = $argv[1] ?? 0; // 消费者进程数 $processIndex = $argv[2] ?? 0; // 进程索引 $consumerObj = new Consumer($consumerKey, $dbConfig, '', $processSize, $processIndex); // 定义消息消费处理函数 $consumerObj->run(function (Message $message) { $timestamp = $message->getTimestamp(); // 消息体 $body = $message->getMessage(); print_r($body); // 这里是逻辑处理... try { $ack = true; if ($ack == false) { $msg = '测试失败啦'; return Response::isFail($msg); } else { $msg = '测试成功啦'; return Response::isSuccess($msg); } } catch (\Exception $e) { // 异常 return Response::isException($e->getMessage()); } });
只有有绑定了消费者的 tag 才能发布消息,否则消息不会被保存!!!