ryan1068/yii2-kafka

There is no license information available for the latest version (dev-master) of this package.

Yii2 kafka extension

Installs: 7

Dependents: 0

Suggesters: 0

Security: 0

Stars: 1

Watchers: 2

Forks: 0

Open Issues: 1

Type:yii2-extension

dev-master 2021-04-23 07:10 UTC

This package is auto-updated.

Last update: 2024-09-23 14:39:06 UTC


README

通过docker安装kafka,zookeeper服务

docker-compose配置:

### ZooKeeper #########################################
    zookeeper:
      build: ./zookeeper
      volumes:
        - ${DATA_PATH_HOST}/zookeeper/data:/data
        - ${DATA_PATH_HOST}/zookeeper/datalog:/datalog
      ports:
        - "${ZOOKEEPER_PORT}:2181"
      networks:
        - backend

### kafka ####################################################
    kafka:
      image: wurstmeister/kafka
      ports:
        - "9092:9092"
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ADVERTISED_HOST_NAME: 192.168.0.1
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.1:9092
        KAFKA_MESSAGE_MAX_BYTES: 2000000
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      volumes:
        - ./kafka:/kafka
        - /var/run/docker.sock:/var/run/docker.sock
      networks:
        - backend
        
### kafka-manager ####################################################
    kafka-manager:
      image: sheepkiller/kafka-manager
      ports:
        - 9020:9000
      environment:
        ZK_HOSTS: zookeeper:2181
      networks:
        - backend

Yii2配置:

Config:

'components' => [
    'kafka' => [
        'class' => yii\kafka\Producer::class,
        'as log' => yii\kafka\KafkaBehavior::class,
    ],
]

Params:

'params' => [
    'kafka' => [
        'broker_list' => '192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092',
        'topic' => [
            'auto.commit.interval.ms' => 100,
            'offset.store.method' => 'broker',
            'auto.offset.reset' => 'earliest',
        ],
    ],
]

使用示例:

// 消费者使用示例
<?php
namespace console\controllers;

use common\services\NotificationService;

/**
 * 系统消息消费队列
 * Class NotificationController
 * @package console\controllers
 */
class NotificationController extends ConsumerController
{
    public $notificationService;

    /**
     * DemoController constructor.
     * @param $id
     * @param $module
     * @param NotificationService $notificationService
     * @param array $config
     */
    public function __construct($id, $module, NotificationService $notificationService, $config = [])
    {
        $this->notificationService = $notificationService;
        parent::__construct($id, $module, $config);
    }

    /**
     * @throws \yii\base\InvalidConfigException
     */
    public function init()
    {
        parent::init();
        $this->consumer->attachBehavior('kafka', [
            'class' => KafkaBehavior::class,
            'tableName' => 'kafka_queue_log'
        ]);
    }

    /**
     * @return string 主题名称
     */
    public function getTopicName()
    {
        return 'notification';
    }

    /**
     * @return string 分组id
     */
    public function getGroupId()
    {
        return 'notificationGroup';
    }

    /**
     * @param $payload
     * @throws \yii\base\UserException
     */
    public function consume($payload)
    {
        $this->notificationService->sendMessage($payload['id'], $payload['scene']);
    }
}

// 生产者使用示例
\Yii::$app->kafka->produce('notification', ['id' => 1, 'scene' => 0]);