cv65kr / messenger
CQRS + Event Sourcing for Symfony.
Installs: 5
Dependents: 0
Suggesters: 0
Security: 0
Stars: 7
Watchers: 4
Forks: 1
Open Issues: 1
Type:symfony-bundle
Requires
- php: ^7.2
- beberlei/assert: ^3.2
- doctrine/doctrine-bundle: ^1.8|^2.0
- ramsey/uuid: ^3.9
- ramsey/uuid-doctrine: ^1.6
- symfony/event-dispatcher: ^4.4|^5.0
- symfony/messenger: ^4.4|^5.0
Requires (Dev)
- phpunit/phpunit: ^8.5
- sylius-labs/coding-standard: ^3.1
This package is auto-updated.
Last update: 2025-03-19 05:24:41 UTC
README
This is simple implementation CQRS + Event Sourcing for Symfony.
Installation
composer require cv65kr/messenger
Activate in config/bundles.php
:
Messenger\MessengerBundle::class => ['all' => true],
Update database (migration in future):
bin/console d:s:u --force --dump-sql
To activate async bus, open config/packages/messenger.yaml
and paste:
framework: messenger: transports: events: "%env(MESSENGER_TRANSPORT_DSN)%" routing: 'Messenger\Event\EventInterface': events
How to use
The current example: https://github.com/cv65kr/symfony-4-es-cqrs-boilerplate/tree/feature/es
Aggregate root
class User extends AggregateRoot { /** @var UuidInterface */ private $uuid; /** @var Email */ private $email; /** @var HashedPassword */ private $hashedPassword; /** @var DateTime */ private $createdAt; /** @var DateTime|null */ private $updatedAt; public static function create( UuidInterface $uuid, Credentials $credentials, UniqueEmailSpecificationInterface $uniqueEmailSpecification ): self { $uniqueEmailSpecification->isUnique($credentials->email); $user = new self(); $user->apply(new UserWasCreated($uuid, $credentials, DateTime::now())); return $user; } private function setEmail(Email $email): void { $this->email = $email; } private function setHashedPassword(HashedPassword $hashedPassword): void { $this->hashedPassword = $hashedPassword; } private function setCreatedAt(DateTime $createdAt): void { $this->createdAt = $createdAt; } private function setUpdatedAt(DateTime $updatedAt): void { $this->updatedAt = $updatedAt; } public function createdAt(): string { return $this->createdAt->toString(); } public function updatedAt(): ?string { return isset($this->updatedAt) ? $this->updatedAt->toString() : null; } public function email(): string { return $this->email->toString(); } public function uuid(): string { return $this->uuid->toString(); } public function getAggregateRootId(): AggregateRootId { return AggregateRootId::fromUUID($this->uuid); } protected function applyUserWasCreated(UserWasCreated $event): void { $this->uuid = $event->uuid; $this->setEmail($event->credentials->email); $this->setHashedPassword($event->credentials->password); $this->setCreatedAt($event->createdAt); } }
Event Sourcing Repository
final class UserStore extends EventSourcingRepository implements UserRepositoryInterface { public function store(User $user): void { $this->save($user); } public function get(UuidInterface $uuid): User { /** @var User $user */ $user = $this->load(AggregateRootId::fromUUID($uuid)); return $user; } public function getAggregateRoot(): string { return User::class; } }
Projection
Read model
class UserView implements ReadModelInterface { /** @var UuidInterface */ private $uuid; /** @var Credentials */ private $credentials; /** @var DateTime */ private $createdAt; /** @var DateTime */ private $updatedAt; public static function fromSerializable(EventInterface $event): self { return self::deserialize($event->serialize()); } public static function deserialize(array $data): self { $instance = new self(); $instance->uuid = Uuid::fromString($data['uuid']); $instance->credentials = new Credentials( Email::fromString($data['credentials']['email']), HashedPassword::fromHash($data['credentials']['password'] ?? '') ); $instance->createdAt = DateTime::fromString($data['created_at']); $instance->updatedAt = isset($data['updated_at']) ? DateTime::fromString($data['updated_at']) : null; return $instance; } public function serialize(): array { return [ 'uuid' => $this->getId(), 'credentials' => [ 'email' => (string) $this->credentials->email, ], ]; } public function uuid(): UuidInterface { return $this->uuid; } public function email(): string { return (string) $this->credentials->email; } public function changeEmail(Email $email): void { $this->credentials->email = $email; } public function changeUpdatedAt(DateTime $updatedAt): void { $this->updatedAt = $updatedAt; } public function hashedPassword(): string { return (string) $this->credentials->password; } public function getId(): string { return $this->uuid->toString(); } }
Projector
class UserProjectionFactory extends Projector { /** @var MysqlUserReadModelRepository */ private $repository; public function __construct(MysqlUserReadModelRepository $repository) { $this->repository = $repository; } protected function applyUserWasCreated(UserWasCreated $userWasCreated): void { $userReadModel = UserView::fromSerializable($userWasCreated); $this->repository->add($userReadModel); } protected function applyUserEmailChanged(UserEmailChanged $emailChanged): void { /** @var UserView $userReadModel */ $userReadModel = $this->repository->oneByUuid($emailChanged->uuid); $userReadModel->changeEmail($emailChanged->email); $userReadModel->changeUpdatedAt($emailChanged->updatedAt); $this->repository->apply(); } }
Consume events from queue
class SendEventsToElasticConsumer implements EventHandlerInterface { /** @var EventElasticRepository */ private $eventElasticRepository; public function __construct(EventElasticRepository $eventElasticRepository) { $this->eventElasticRepository = $eventElasticRepository; } public function __invoke(EventInterface $event): void { $this->eventElasticRepository->store($event); } }