hoangdev / stream-event-driven
Requires
- php: ^8.1
- enqueue/rdkafka: *
This package is auto-updated.
Last update: 2025-03-29 01:09:30 UTC
README
- PHP ^8.1
To install this package in your project
- Copy this and paste to repository array in composer.json
"repositories": [ { "type": "vcs", "url": "https://github.com/hoangnm97/stream-event" }, ... ]
- Import this provider to config/app.php
'providers' => ServiceProvider::defaultProviders()->merge([ \Hoangdev\StreamEventDriven\Providers\EventDrivenProvider::class ... ])->toArray(),
- run command
composer require hoangdev/stream-event-driven:dev-main
To the package (note: remember to push newest package) - Run
php artisan vendor:publish --provider="Hoangdev\StreamEventDriven\Providers\EventDrivenProvider"
- Add this env variable: example:
STREAM_PLATFORM=kafka KAFKA_GROUP_ID=orders KAFKA_SYNC_TOPIC=orders KAFKA_SYNC_USERNAME= KAFKA_SYNC_PASSWORD= KAFKA_BROKERS=kafka:9092
to publish migration and resource
- Execute to kafka container and run the commands below:
- To create new topic:
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create --partitions 3 --replication-factor 1
- To send a message to topic:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
- To get message from topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
- To create new topic:
Usage
-
Implement
Hoangdev\StreamEventDriven\Contracts\StreamableReceivedInterface
into the class you need to handle received with kafka and useHoangdev\StreamEventDriven\Traits\ReceivedStreamable
as a trait. -
Then you can use/overwrite the needed method in you class. After all, you can call method
receiveStream()
to handle your messages -
You need to have a column with value is timestamp in database in the updated/create object for the compare like the flow below
-
Implement
Hoangdev\StreamEventDriven\Contracts\StreamablePushInterface
into the class you need to handle received with kafka and useHoangdev\StreamEventDriven\Traits\PushStreamable
as a trait. -
Then you can use/overwrite the needed method in you class. After all, you can call method
pushStream()
to handle your messages -
Example:
Need to follow this flow to make sure that you are going the right way:
-
Custom object + action:
- For add custom action, need to over-write trait and add suitable boot and over-write followed interface if needed