micromus/kafka-bus-messages

Messages Builder for Kafka Bus

Maintainers

Package info

github.com/micromus/kafka-bus-messages

pkg:composer/micromus/kafka-bus-messages

Fund package maintenance!

Micromus

Statistics

Installs: 337

Dependents: 1

Suggesters: 0

Stars: 0

Open Issues: 0

v1.0.0 2026-05-26 17:26 UTC

README

Latest Version on Packagist GitHub Tests Action Status GitHub Code Style GitHub PHPStan

A PHP library for structuring, serializing, and deserializing Kafka messages. It provides typed message payloads with automatic casting, domain event messages, and factory helpers for both production and testing.

Installation

composer require micromus/kafka-bus-messages

Core Concepts

  • Payload — a flexible key-value container that supports typed attribute casting (dates, integers, floats, nested payloads, collections).
  • JsonMessage — a Payload that serializes itself directly to a JSON Kafka message.
  • DomainMessage — a structured message that wraps an attributes object with a domain event type (create, update, delete) and a list of dirty (changed) fields.
  • Casters — classes that convert raw values on read (cast) and convert them back for serialization (rollback).

Usage

1. Defining a Domain Message

Extend Payload and define casters in definitionCasters() to type your fields automatically.

use Micromus\KafkaBusMessages\Data\Payload;
use Micromus\KafkaBusMessages\Data\Casters\PayloadCaster;
use Micromus\KafkaBusMessages\Data\Casters\CollectionCaster;
use Micromus\KafkaBusMessages\Data\Casters\IntegerCaster;
 
/**
 * @property int            $id
 * @property string         $name
 * @property CategoryPayload    $category
 * @property AttributePayload[] $attributes
 */
class ProductMessage extends \Micromus\KafkaBusMessages\DomainMessage
{
    public function getKey(): ?string
    {
        return (string) $this->id;
    }
 
    protected function definitionCasters(): array
    {
        return [
            'id'         => new IntegerCaster(),
            'category'   => new PayloadCaster(CategoryPayload::class),
            'attributes' => new CollectionCaster(new PayloadCaster(AttributePayload::class)),
        ];
    }
}
 
// Create from a raw array (e.g. decoded JSON)
$product = ProductMessage::from([
    'id'   => '42',
    'name' => 'Laptop',
    'category' => ['id' => 1, 'name' => 'Electronics'],
    'attributes' => [
        ['id' => 10, 'name' => 'Color', 'value' => 'Silver'],
    ],
]);
 
echo $product->id;               // int(42)
echo $product->category->name;   // string("Electronics")
echo $product->attributes[0]->value; // string("Silver")

2. Available Casters

Caster Description
IntegerCaster Casts value to int
FloatCaster Casts value to float
DateTimeCaster Parses/formats DateTimeInterface with a configurable format
PayloadCaster Hydrates a nested Payload subclass from an array
CollectionCaster Applies another caster to each item in an array
NullableCaster Wraps any caster to allow null values
use Micromus\KafkaBusMessages\Data\Casters\DateTimeCaster;
use Micromus\KafkaBusMessages\Data\Casters\NullableCaster;
use Micromus\KafkaBusMessages\Data\Casters\FloatCaster;
 
protected function definitionCasters(): array
{
    return [
        'published_at' => new DateTimeCaster('Y-m-d\TH:i:s.uP'), // default format
        'deleted_at'   => new NullableCaster(new DateTimeCaster()),
        'price'        => new FloatCaster(),
    ];
}

3. Sending a JSON Message

JsonMessage extends Payload and implements ProducerMessageInterface, so it can be published directly to Kafka.

use Micromus\KafkaBusMessages\JsonMessage;
 
$message = new JsonMessage([
    'order_id' => 123,
    'status'   => 'shipped',
]);
 
// Produces: {"order_id":123,"status":"shipped"}
$message->toPayload();

4. Sending a Domain Message

DomainMessage wraps an attributes object with a domain event type and a list of changed fields.

use Micromus\KafkaBusMessages\DomainMessage;
use Micromus\KafkaBusMessages\DomainEventEnum;
 
$attributes = [
    'id'   => 42,
    'name' => 'Laptop Pro',
    'category'   => ['id' => 1, 'name' => 'Electronics'],
    'attributes' => [],
];
 
// create / update / delete
$message = new ProductMessage(
    attributes: $attributes,
    event: DomainEventEnum::Update,
    dirty: ['name'],
);
 
// Produces JSON:
// {
//   "event": "update",
//   "attributes": { "id": 42, "name": "Laptop Pro", ... },
//   "dirty": ["name"]
// }
$message->toPayload();
 
// The Kafka partition key comes from getKey() on the attributes object
$message->getKey(); // "42"


// Send to bus
$bus->publish($message);

5. Consuming a Domain Message

Use DomainMessageFactory to deserialize an incoming Kafka message into a typed DomainMessage.

use Micromus\KafkaBusMessages\Factories\DomainMessageFactory;

class ProductConsumer
{
    #[MessageFactory(new DomainMessageFactory(ProductMessage::class))]
    public function __invoke(ProductMessage $message)
    {
        echo $message->event->value;          // "update"
        echo $message->name;      // "Laptop Pro"
    }
}

6. Testing Helpers

The library ships with factory base classes to generate realistic test data via Faker.

Define a test factory:

use Micromus\KafkaBusMessages\Testing\DomainMessageTestFactory;
 
/**
 * @extends DomainMessageTestFactory<ProductPayload>
 */
final class ProductTestFactory extends DomainMessageTestFactory
{
    protected string $messageClass = ProductMessage::class;
 
    public function definition(): array
    {
        return [
            'id'         => $this->faker->numberBetween(1, 9999),
            'name'       => $this->faker->sentence(),
            'category'   => CategoryPayloadTestFactory::new()->makeArray(),
            'attributes' => [
                AttributePayloadTestFactory::new()->makeArray(),
            ],
        ];
    }
}

Use it in tests:

// Build a typed DomainMessage with default fake data
$message = ProductMessageTestFactory::new()->message();
 
// Override specific fields
$message = ProductMessageTestFactory::new()
    ->withEvent(DomainEventEnum::Delete)
    ->withDirty(['name', 'category'])
    ->message(['name' => 'Laptop Pro']);
 
// Build a raw RdKafka\Message for lower-level consumer tests
$rdKafkaMessage = ProductMessageTestFactory::new()->make();
 
// Build just the raw array
$array = ProductTestFactory::new()->makeArray();

For payload-only factories, extend PayloadTestFactory:

use Micromus\KafkaBusMessages\Testing\PayloadTestFactory;
 
/**
 * @extends PayloadTestFactory<CategoryPayload>
 */
final class CategoryPayloadTestFactory extends PayloadTestFactory
{
    protected string $payloadClass = CategoryPayload::class;
 
    public function definition(): array
    {
        return [
            'id'   => $this->faker->numberBetween(1, 9999),
            'name' => $this->faker->word(),
        ];
    }
}
 
$category = CategoryPayloadTestFactory::new()->payload();

Testing

composer test

Changelog

Please see CHANGELOG for more information on what has changed recently.

Contributing

Please see CONTRIBUTING for details.

Security Vulnerabilities

Please review our security policy on how to report security vulnerabilities.

Credits

License

The MIT License (MIT). Please see License File for more information.