grifix/event-store

This package is abandoned and no longer maintained. No replacement package was suggested.

Grifix Event Store

dev-main 2023-06-06 17:45 UTC

This package is auto-updated.

Last update: 2023-07-26 07:04:33 UTC


README

Event store that guarantees at least one event sent and only one event consumption.

Description

Imagine that we need to establish reliable communication between two application contexts: order module and delivery module. We need to create a new delivery when order is completed. The simplest solution is:

class OrderModule{
    public function completeOrder(string $orderId):void {
        $order = $this->orderRepository->getById($orderId);
        $order->complete();
        $this->orderRepository->save($order);
        $this->deliveryModule->createDelivery($orderId);
    }
}

But this solution is not reliable:

class OrderModule{
    public function completeOrder(string $orderId):void {
        $order = $this->orderRepository->getById($orderId);
        $order->complete();
        $this->orderRepository->save($order);
        //server is going down
        $this->deliveryModule->createDelivery($orderId);
    }
}

We can solve the problem with reliability in this way:

class OrderModule{
    public function completeOrder(string $orderId):void {
        $this->entityManager->beginTransaction();
        $order = $this->orderRepository->getById($orderId);
        $order->complete();
        $this->orderRepository->save($order);
        $this->deliveryModule->createDelivery($orderId);
        $this->entityManager->commit();
    }
}

But our order module will be directly coupled with the delivery module, and it will be hard to transfer the delivery module to the separated application (microservice) We can use event bus to decouple these two modules:

class OrderModule{
    public function completeOrder(string $orderId):void {
        $order = $this->orderRepository->getById($orderId);
        $order->complete();
        $this->orderRepository->save($order);
        $this->eventBus->publish(new OrderCompletedEvent($orderId));
    }
}
class DeliveryModuleSubscriber{
    public function onOrderCompleted(OrderCompletedEvent $event){
         $this->deliveryModule->createDelivery($event->orderId);
    }
}

But the reliability problem came back:

class OrderModule{
    public function completeOrder(string $orderId):void {
        $order = $this->orderRepository->getById($orderId);
        $order->complete();
        // server is going down
        $this->eventBus->publish(new OrderCompletedEvent($orderId));
    }
}

Event store package resolves this problem and provides some additional features

How does it work?

Instead of sending events to the events bus immediately, the event store stores them in the database in the same transaction as the application state is changing. Then the background process fetches these events from the database and publishes them to the message broker and marks them as published.

When the event arrives to the subscriber it checks if this event was not already received, checks its sequence number and if the sequence number is greater than the last received event sequence number by one begins the transaction, changes the application state, saves information about the last received event and commits the transaction.

It guarantees at least one event sending and exactly one event receiving. It also guarantees us receiving events in proper order.

Installation

  • composer require grifix/event-store
  • execute migrations from src/Migrations

Integration with Symfony

Usage

First, we should create an EventStore instance.

$dbConnection =  \Doctrine\DBAL\DriverManager::getConnection(
    [
        'dbname' => 'dbname',
        'user' => 'user',
        'password' => 'password',
        'host' => 'host',
        'driver' => 'pdo_pgsql'
    ],
);

$amqpConnection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    'host',
    'port',
    'user',
    'password'
);

$normalizer = \Grifix\Normalizer\Normalizer::create();

$messageBroker = new \Grifix\EventStore\MessageBroker\RabbitMqMessageBroker(
    $amqpConnection,
    $normalizer
)

$logger = new \Monolog\Logger('logger');

$clock = new \Grifix\Clock\SystemClock();
    
$eventStore = \Grifix\EventStore\EventStore::create(
    $dbConnection, 
    $clock, 
    $messageBroker, 
    $logger,
    $normalizer
)

Then we should register a stream type:

/** @var $eventStore \Grifix\EventStore\EventStore **/
$eventStore->registerStreamType('order', Order::class);

The first argument is the stream type name, and this information will be stored in the database. The second is the stream producer class. We don't want to store the stream producer class name in the database because it can change during the application lifetime so we use it just for mapping the producer class name to the stream type name.

Then we should register events types with our stream produces:

use Grifix\Normalizer\SchemaValidator\Repository\Schema\Schema;

/** @var $eventStore \Grifix\EventStore\EventStore **/
$eventStore->registerEventType(
    'order', 
    'created',
    OrderCreatedEvent::class,
    [
       Schema::create()
          ->withStringProperty('orderId')
    ],   
);

$eventStore->registerEventType(
    'order', 
    'completed',
    OrderCompletedEvent::class,
    [
        Schema::create()
            ->withStringProperty('orderId')
    ],   
);

$eventStore->registerEventType(
    'order', 
    'canceled',
    OrderCanceledEvent::class,
    [
        Schema::create()
            ->withStringProperty('orderId')
    ],   
);

As well as the stream producer name we also don't want to store in the database the class name of the event because it can change after refactoring. So we use the event name to map the event class to the database value. We also describe the event JSON schema to be able to upcast an event if its data structure changes. This feature is described here in detail.

Then we should register subscription types:

/** @var $eventStore \Grifix\EventStore\EventStore **/
$eventStore->registerSubscriptionType(
            'delivery_subscription',
            DeliveryModuleSubscriber::class,
            'order',
            [
                'order.created',
            ],
            [
                'order.finished',
                'order.cancelled',
            ]
        );

We should pass subscription type name, subscriber class, event types that start the subscription background process, and event types that stop subscription background process.

And finally, we should register the subscriber instance:

/** @var $eventStore \Grifix\EventStore\EventStore **/
$eventStore->registerSubscriber(new DeliveryModuleSubscriber());

Now we can send the event:

class OrderModule{

    /** @var  \Doctrine\DBAL\Connection */
    private readonly $connection;
    
    /** @var \Grifix\EventStore\EventStoreInterface */
    private readonly $eventStore;

    public function completeOrder(string $orderId):void {
        $this->connection->beginTransaction()
        $order = $this->orderRepository->getById($orderId);
        $order->complete();
        $this->orderRepository->save($order);
        $this->eventStore->storeEvent(
            new OrderCompletedEvent($orderId),
            Order::class,
            $orderId
        );
        $this->eventStore->flush();
        $this->connection->commit()
    }
}