koco / messenger-kafka
Symfony Messenger Kafka Transport
Type: symfony-bundle
Requires
- php: ^7.4|8.*
- ext-json: *
- psr/http-client: ^1.0
- psr/http-factory: ^1.0
- psr/http-message: ^1.0
- psr/log: ^1.0.1||^2.0||^3.0
- symfony/config: ^3.0||^4.0||^5.0||^6.0||7.*
- symfony/dependency-injection: ^3.4.26||^4.1.12|^5.0||^6.0||7.*
- symfony/http-kernel: ^3.0||^4.0||^5.0||^6.0||7.*
- symfony/messenger: ^4.4||^5.0||^6.0||7.*
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.6
- kwn/php-rdkafka-stubs: ^2.1
- nyholm/psr7: ^1.4
- phpstan/phpstan: ^1.4.6
- symfony/framework-bundle: ^5.0||^6.0
- symfony/phpunit-bridge: ^5.0||^6.0
- symfony/property-access: ^5.0||^6.0
- symfony/serializer: ^5.0||^6.0
Suggests
- ext-rdkafka: ^4.0; Needed to support Kafka connectivity
- koco/avro-regy: Confluent Schema Registry integration
This package is auto-updated.
Last update: 2024-12-30 15:48:11 UTC
README
This bundle aims to provide a simple Kafka transport for Symfony Messenger. Kafka REST Proxy support coming soon.
Installation
Applications that use Symfony Flex
Open a command console, enter your project directory and execute:
$ composer require koco/messenger-kafka
Applications that don't use Symfony Flex
After adding the composer requirement, enable the bundle by adding it to the list of registered bundles in the config/bundles.php file of your project:
return [
    // ...
    Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];
Configuration
DSN
Specify a DSN starting with either kafka:// or kafka+ssl://. Multiple brokers are separated by a comma (,).
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093
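To make the DSN format concrete, here is a minimal sketch of how such a string could be split into a broker list. The function parseKafkaDsn() is a hypothetical helper written for illustration; it is not part of this bundle and does not claim to mirror how the bundle parses DSNs internally.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of koco/messenger-kafka: splits a
 * comma-separated Kafka DSN into host:port entries and records whether
 * any entry uses the kafka+ssl:// scheme.
 *
 * @return array{ssl: bool, brokers: list<string>}
 */
function parseKafkaDsn(string $dsn): array
{
    $brokers = [];
    $ssl = false;

    foreach (explode(',', $dsn) as $part) {
        $part = trim($part);

        if (strpos($part, 'kafka+ssl://') === 0) {
            $ssl = true;
            $brokers[] = substr($part, strlen('kafka+ssl://'));
        } elseif (strpos($part, 'kafka://') === 0) {
            $brokers[] = substr($part, strlen('kafka://'));
        } else {
            // Anything without one of the two schemes is rejected.
            throw new InvalidArgumentException(sprintf('Invalid Kafka DSN part "%s".', $part));
        }
    }

    return ['ssl' => $ssl, 'brokers' => $brokers];
}

$parsed = parseKafkaDsn('kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093');
echo implode(',', $parsed['brokers']), PHP_EOL; // prints "prod-kafka-01:9093,prod-kafka-02:9093"
```

Note that every broker in the list carries its own scheme prefix, as in the third example above.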
Example
The configuration options for kafka_conf and topic_conf can be found in the librdkafka configuration reference (CONFIGURATION.md).
It is highly recommended to set enable.auto.offset.store to false for consumers. Otherwise, every message will be acknowledged, regardless of any error thrown by the message handlers.
framework:
    messenger:
        transports:
            producer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    flushTimeout: 10000
                    flushRetries: 5
                    topic:
                        name: 'events'
                    kafka_conf:
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'
Serializer
You will most likely want to implement your own Serializer. Please see: https://symfony.com/doc/current/messenger.html#serializing-messages
The fields key, headers, and body are available in the decode() and encode() methods.
<?php

namespace App\Infrastructure\Messenger;

use App\Catalogue\Domain\Model\Event\ProductCreated;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class MySerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $record = json_decode($encodedEnvelope['body'], true);

        return new Envelope(new ProductCreated(
            $record['id'],
            $record['name'],
            $record['description'],
        ));
    }

    public function encode(Envelope $envelope): array
    {
        /** @var ProductCreated $event */
        $event = $envelope->getMessage();

        return [
            'key' => $event->getId(),
            'headers' => [],
            'body' => json_encode([
                'id' => $event->getId(),
                'name' => $event->getName(),
                'description' => $event->getDescription(),
            ]),
        ];
    }
}
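The decode() method above passes the body straight to json_decode() and reads the fields without checking them, so a malformed body would only surface as an error further down. As a sketch of a stricter variant, the hypothetical helper decodeProductRecord() below (not part of the bundle; the field names are taken from the example above) validates the body before a message object is built from it.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): decodes a JSON message body
 * and checks that the fields read by MySerializer::decode() are present.
 *
 * @return array<string, mixed>
 *
 * @throws JsonException            if the body is not valid JSON
 * @throws UnexpectedValueException if the decoded value is not an array
 *                                  or a required field is missing
 */
function decodeProductRecord(string $body): array
{
    // JSON_THROW_ON_ERROR (PHP >= 7.3) turns a silent null into an exception.
    $record = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($record)) {
        throw new UnexpectedValueException('Expected the body to decode to an array.');
    }

    foreach (['id', 'name', 'description'] as $field) {
        if (!array_key_exists($field, $record)) {
            throw new UnexpectedValueException(sprintf('Missing field "%s".', $field));
        }
    }

    return $record;
}

$record = decodeProductRecord('{"id":"42","name":"Lamp","description":"Desk lamp"}');
echo $record['name'], PHP_EOL; // prints "Lamp"
```

Inside a real serializer, one option would be to catch these exceptions in decode() and rethrow them as Symfony's MessageDecodingFailedException, so that Messenger can treat the message as undecodable.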
How do I work with Avro?
Same as with the basic example above, you need to build your own serializer.
Within the decode() and encode() methods, you can make use of flix-tech/avro-serde-php.
What about the Confluent Schema Registry?
To connect with Schema Registry and control various settings, you can use this bundle:
$ composer require koco/avro-regy
And configure it to match your setup:
avro_regy:
    base_uri: '%env(SCHEMA_REGISTRY_URL)%'
    file_naming_strategy: subject
    options:
        register_missing_schemas: true
        register_missing_subjects: true
    serializers:
        catalogue:
            schema_dir: '%kernel.project_dir%/src/Catalogue/Domain/Model/Event/Avro/'
        orders:
            schema_dir: '%kernel.project_dir%/src/Orders/Domain/Model/Event/Avro/'
            file_naming_strategy: qualified_name
            options:
                register_missing_schemas: false
                register_missing_subjects: false
Please see https://github.com/KonstantinCodes/avro-regy for the full documentation.