rezilio / collector-bundle
Symfony bundle for collecting user behaviour events into RabbitMQ → S3 → Athena
Package info
bitbucket.org/productionv3/rezilio-collector-bundle
Type: symfony-bundle
pkg:composer/rezilio/collector-bundle
Requires
- php: ^7.4|^8.0
- symfony/framework-bundle: ^4.4|^5.0|^6.0|^7.0
- symfony/http-client: ^4.4|^5.0|^6.0|^7.0
- symfony/http-foundation: ^4.4|^5.0|^6.0|^7.0
- symfony/messenger: ^4.4|^5.0|^6.0|^7.0
README
Symfony bundle that collects user behaviour events from any API service and dispatches them to RabbitMQ → S3 → Athena.
Installation
composer require rezilio/collector-bundle
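Register the bundle in config/bundles.php if the entry is not already there. Symfony Flex normally adds bundles of type symfony-bundle on install; in projects without Flex, or if the entry is missing, add it by hand: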
// config/bundles.php
return [
    // ... your existing bundles ...
    Rezilio\CollectorBundle\CollectorBundle::class => ['all' => true],
];
1. Add the collection_queue transport to your messenger.yaml
Do NOT import the bundle's messenger.yaml. Instead add the transport and routing
directly to your project's config/packages/messenger.yaml (and any environment
overrides such as config/packages/dev/messenger.yaml):
# config/packages/messenger.yaml (and each environment override)
framework:
    messenger:
        transports:
            # ... your existing transports ...
            collection_queue: '%env(MESSENGER_TRANSPORT_DSN)%%rabbitmq_prefix%-CollectionQueue'
        routing:
            # ... your existing routing ...
            'Rezilio\CollectorBundle\Message\CollectionResponse': collection_queue
Important (environment overrides): If your project has per-environment messenger files (e.g. config/packages/dev/messenger.yaml), they are loaded after the main file and can override it. Make sure the collection_queue transport and routing are present in every environment file that exists, not just the main one.
Important (RabbitMQ bindings): If you previously had a messages default queue, make sure it is not bound to the CollectionQueue exchange. Stale bindings will cause messages to land in both queues. Delete the binding via the RabbitMQ management UI or API if needed.
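To make the transport string concrete, the sketch below joins its three parts the same way parameter substitution does. The DSN and prefix values are made-up examples, and buildCollectionQueueDsn is a hypothetical helper for illustration, not part of the bundle.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper mirroring the config string
 * '%env(MESSENGER_TRANSPORT_DSN)%%rabbitmq_prefix%-CollectionQueue'.
 */
function buildCollectionQueueDsn(string $transportDsn, string $rabbitmqPrefix): string
{
    // Plain concatenation: no separator is inserted between the DSN and the prefix.
    return $transportDsn . $rabbitmqPrefix . '-CollectionQueue';
}

// Example values only (assumptions, not taken from the bundle).
$dsn = buildCollectionQueueDsn('amqp://guest:guest@localhost:5672/%2f/', 'myapp');

echo $dsn, PHP_EOL;
```

Because nothing is inserted between the parts, MESSENGER_TRANSPORT_DSN should end with a slash if the prefixed name is meant to be its own path segment. Symfony's AMQP transport reads the DSN path as /vhost/exchange, so in this example the exchange would be myapp-CollectionQueue.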
2. Create a CollectableUser adapter
Do NOT modify your existing User entity. Create a thin adapter class instead:
// src/Service/CollectableUser.php
namespace App\Service;

use App\Entity\User;
use Rezilio\CollectorBundle\Contract\CollectableUserInterface;

// Thin adapter: exposes the existing User entity through the bundle's contract.
class CollectableUser implements CollectableUserInterface
{
    private User $user;

    public function __construct(User $user)
    {
        $this->user = $user;
    }

    public function getUsername(): string { return $this->user->getUsername(); }
    public function getEmail(): string { return $this->user->getEmail(); }
    public function getLocale(): string { return $this->user->getLocale(); }
    public function getZone(): string { return $this->user->getZone(); }
    public function getHashId(): string { return $this->user->getHashId(); }
    public function getId(): ?int { return $this->user->getId(); }
    public function getRoles(): array { return $this->user->getRoles(); }
}
3. Implement UserResolverInterface
The bundle extracts the username from the bearer token and calls your resolver to load the matching user. The resolver returns null when no user is found.
// src/Service/CollectorUserResolver.php
namespace App\Service;

use App\Repository\UserRepository;
use Rezilio\CollectorBundle\Contract\CollectableUserInterface;
use Rezilio\CollectorBundle\Contract\UserResolverInterface;

class CollectorUserResolver implements UserResolverInterface
{
    private UserRepository $repo;

    public function __construct(UserRepository $repo)
    {
        $this->repo = $repo;
    }

    public function resolve(string $username): ?CollectableUserInterface
    {
        $user = $this->repo->findOneBy(['username' => $username]);
        if (!$user) {
            return null;
        }

        // Wrap the entity in the adapter from step 2.
        return new CollectableUser($user);
    }
}
Register it in config/services.yaml:
# config/services.yaml
services:
    # ... your existing services ...
    App\Service\CollectorUserResolver:
        tags: ['collector.user_resolver']
    Rezilio\CollectorBundle\Contract\UserResolverInterface: '@App\Service\CollectorUserResolver'
4. Register AppCollectorListener
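Create a listener that extends the bundle's CollectorListener, and exclude it
from Symfony's auto-discovery by adding EventListener to the exclude list. Note that this excludes the whole src/EventListener directory, so any other listeners in it need explicit service definitions as well: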
# config/services.yaml
services:
    App\:
        resource: '../src/*'
        exclude: '../src/{DependencyInjection,DataProvider,DataMapping,Entity,Migrations,Tests,Kernel.php,EventListener}'

    App\EventListener\AppCollectorListener:
        arguments:
            $factory: '@Rezilio\CollectorBundle\Service\CollectorServiceFactory'
            $userResolver: '@App\Service\CollectorUserResolver'
            $logger: '@logger'
            $routes: []
        tags:
            - { name: kernel.event_subscriber }
Create the listener class:
// src/EventListener/AppCollectorListener.php
namespace App\EventListener;

use Rezilio\CollectorBundle\Contract\CollectableUserInterface;
use Rezilio\CollectorBundle\EventListener\CollectorListener;

class AppCollectorListener extends CollectorListener
{
    protected function buildExtra(CollectableUserInterface $user): array
    {
        return [
            'user_id' => $user->getId(),
            'org_id' => null, // populate from your data source
            'mission_id' => null, // populate from your data source
            'partner_id' => null, // populate from your data source
        ];
    }
}
5. Add a CollectionResponseHandler
Symfony Messenger requires a handler for every routed message class. Add a minimal
stub that satisfies this requirement — the actual processing is done by the
rezilio_collector worker project.
The implementation differs depending on your Symfony version:
Symfony 4.4 / 5.x / 6.x — implement MessageHandlerInterface:
// src/MessageHandler/CollectionResponseHandler.php
namespace App\MessageHandler;

use Rezilio\CollectorBundle\Message\CollectionResponse;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CollectionResponseHandler implements MessageHandlerInterface
{
    public function __invoke(CollectionResponse $message): void
    {
        // Intentionally empty: handled by the rezilio_collector worker
    }
}
Symfony 7.x: MessageHandlerInterface was removed (it had been deprecated since Symfony 6.2). Use the #[AsMessageHandler]
attribute instead:
// src/MessageHandler/CollectionResponseHandler.php
namespace App\MessageHandler;

use Rezilio\CollectorBundle\Message\CollectionResponse;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class CollectionResponseHandler
{
    public function __invoke(CollectionResponse $message): void
    {
        // Intentionally empty: handled by the rezilio_collector worker
    }
}
6. Define which routes to collect
# config/packages/collector.yaml
collector:
    routes:
        - { path: '^/api/v1/events' }
        - { path: '^/api/v2/events' }
        - { path: '^/api/v2/content', controller: 'App\DataProvider\ContentCollectionDataProvider' }
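The path values above look like regular expressions anchored at the start of the request path. The bundle's matching code is not shown here, so the following is only a sketch of how such a list could be evaluated, assuming each path is a regex and the first matching entry wins. findCollectedRoute is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical matcher: returns the first route entry whose `path`
 * regex matches the request path, or null when nothing matches.
 */
function findCollectedRoute(array $routes, string $requestPath): ?array
{
    foreach ($routes as $route) {
        // '#' is the delimiter so slashes need no escaping;
        // a pattern containing '#' itself would need escaping first.
        if (preg_match('#' . $route['path'] . '#', $requestPath) === 1) {
            return $route;
        }
    }

    return null;
}

// Same entries as the YAML above, written as a PHP array.
$routes = [
    ['path' => '^/api/v1/events'],
    ['path' => '^/api/v2/events'],
    ['path' => '^/api/v2/content', 'controller' => 'App\DataProvider\ContentCollectionDataProvider'],
];

var_dump(findCollectedRoute($routes, '/api/v2/events/42'));
var_dump(findCollectedRoute($routes, '/health'));
```

Under these assumptions, a pattern without a trailing anchor such as '^/api/v1/events' also covers sub-paths like /api/v1/events/42; append '$' to a pattern if only the exact path should be collected.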
7. Collecting from security events (login)
For login collection where no bearer token exists yet, inject CollectorServiceFactory
directly into your security subscriber:
// src/EventSubscriber/SecuritySubscriber.php
namespace App\EventSubscriber;

use App\Service\CollectableUser;
use Rezilio\CollectorBundle\Service\CollectorServiceFactory;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class SecuritySubscriber implements EventSubscriberInterface
{
    private CollectorServiceFactory $collectorFactory;

    public function __construct(/* ... */ CollectorServiceFactory $collectorFactory)
    {
        $this->collectorFactory = $collectorFactory;
    }

    // getSubscribedEvents() and your other existing methods are omitted here.

    public function logSuccess($event): void
    {
        // The token's user is your own User entity; wrap it in the adapter from step 2.
        $user = $event->getAuthenticationToken()->getUser();

        $this->collectorFactory
            ->create(new CollectableUser($user))
            ->collect(static::class, ['login' => true]);
    }
}
8. Dispatching directly (optional)
To dispatch a collection event without going through the listener, for example from a controller, dispatch the message on the Messenger bus directly:
use Rezilio\CollectorBundle\Message\CollectionResponse;

// $this->bus is an injected Symfony\Component\Messenger\MessageBusInterface.
// First argument: the user's zone. Second argument: the data payload.
$this->bus->dispatch(new CollectionResponse(
    $user->getZone(),
    ['hash' => $user->getHashId(), 'id' => $productId, 'ts' => time()]
));
9. Sanitize rules (optional)
Redact, hash, or measure fields per route:
collector:
    routes:
        - path: '^/api/v1/profile'
          sanitize:
              - { field: password, action: redact }
        - path: '^/api/v1/content'
          sanitize:
              - { field: content, action: hash }
              - { field: content, action: length }
              - { field: content, action: redact }
Actions: redact replaces the value with -, hash adds a {field}_md5 entry, and length adds a {field}_length entry.
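The three actions can be illustrated with a small standalone function. This is a sketch under assumptions, not the bundle's code: it assumes a flat payload, that rules run in the order they are listed, and that length means the byte length of the string value. applySanitizeRules is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sanitizer: applies the rules in listed order to a flat payload.
 */
function applySanitizeRules(array $data, array $rules): array
{
    foreach ($rules as $rule) {
        $field = $rule['field'];
        if (!array_key_exists($field, $data)) {
            continue; // field absent from this payload, nothing to do
        }

        $value = (string) $data[$field];

        switch ($rule['action']) {
            case 'hash':
                $data[$field . '_md5'] = md5($value);
                break;
            case 'length':
                $data[$field . '_length'] = strlen($value); // bytes, not characters
                break;
            case 'redact':
                $data[$field] = '-';
                break;
        }
    }

    return $data;
}

$result = applySanitizeRules(
    ['content' => 'hello', 'title' => 'Greeting'],
    [
        ['field' => 'content', 'action' => 'hash'],
        ['field' => 'content', 'action' => 'length'],
        ['field' => 'content', 'action' => 'redact'],
    ]
);

print_r($result);
```

If rules are applied in order as assumed here, redact has to come last for a field, as in the configuration example; otherwise hash and length would be computed on the placeholder - rather than on the original value.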
What is included
| Class | Purpose |
|---|---|
| CollectorBundle | Bundle entry point |
| CollectableUserInterface | Contract for the user adapter |
| UserResolverInterface | Contract for loading a user by username |
| NullUserResolver | Default no-op resolver until you register your own |
| CollectionResponse | Messenger message (zone + data) |
| CollectorService | Builds and dispatches the payload |
| CollectorServiceFactory | Creates a CollectorService for a given user |
| CollectorListener | Kernel event subscriber that auto-collects matching routes |
| DataProviderAbstract | Optional base class for API Platform DataProviders |
| Configuration | Validates collector.yaml config |