ryan1068 / yii2-kafka
Yii2 kafka extension
Installs: 7
Dependents: 0
Suggesters: 0
Security: 0
Stars: 1
Watchers: 2
Forks: 0
Open Issues: 1
Type:yii2-extension
Requires
- php: >=7.0.0
- yiisoft/yii2: ~2.0.14
This package is auto-updated.
Last update: 2024-09-23 14:39:06 UTC
README
通过docker安装kafka,zookeeper服务
docker-compose配置:
### ZooKeeper ######################################### zookeeper: build: ./zookeeper volumes: - ${DATA_PATH_HOST}/zookeeper/data:/data - ${DATA_PATH_HOST}/zookeeper/datalog:/datalog ports: - "${ZOOKEEPER_PORT}:2181" networks: - backend ### kafka #################################################### kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_HOST_NAME: 192.168.0.1 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.1:9092 KAFKA_MESSAGE_MAX_BYTES: 2000000 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - ./kafka:/kafka - /var/run/docker.sock:/var/run/docker.sock networks: - backend ### kafka-manager #################################################### kafka-manager: image: sheepkiller/kafka-manager ports: - 9020:9000 environment: ZK_HOSTS: zookeeper:2181 networks: - backend
Yii2配置:
Config:
'components' => [ 'kafka' => [ 'class' => yii\kafka\Producer::class, 'as log' => yii\kafka\KafkaBehavior::class, ], ]
Params:
'params' => [ 'kafka' => [ 'broker_list' => '192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092', 'topic' => [ 'auto.commit.interval.ms' => 100, 'offset.store.method' => 'broker', 'auto.offset.reset' => 'earliest', ], ], ]
使用示例:
// 消费者使用示例 <?php namespace console\controllers; use common\services\NotificationService; /** * 系统消息消费队列 * Class NotificationController * @package console\controllers */ class NotificationController extends ConsumerController { public $notificationService; /** * DemoController constructor. * @param $id * @param $module * @param NotificationService $notificationService * @param array $config */ public function __construct($id, $module, NotificationService $notificationService, $config = []) { $this->notificationService = $notificationService; parent::__construct($id, $module, $config); } /** * @throws \yii\base\InvalidConfigException */ public function init() { parent::init(); $this->consumer->attachBehavior('kafka', [ 'class' => KafkaBehavior::class, 'tableName' => 'kafka_queue_log' ]); } /** * @return string 主题名称 */ public function getTopicName() { return 'notification'; } /** * @return string 分组id */ public function getGroupId() { return 'notificationGroup'; } /** * @param $payload * @throws \yii\base\UserException */ public function consume($payload) { $this->notificationService->sendMessage($payload['id'], $payload['scene']); } } // 生产者使用示例 \Yii::$app->kafka->produce('notification', ['id' => 1, 'scene' => 0]);