rezilio/collector-bundle

Symfony bundle for collecting user behaviour events into RabbitMQ → S3 → Athena

Maintainers

Package info

bitbucket.org/productionv3/rezilio-collector-bundle

Type:symfony-bundle

pkg:composer/rezilio/collector-bundle

Statistics

Installs: 22

Dependents: 0

Suggesters: 0

1.0.9 2026-05-01 14:39 UTC

This package is auto-updated.

Last update: 2026-05-01 17:14:44 UTC


README

Symfony bundle that collects user behaviour events from any API service and dispatches them to RabbitMQ → S3 → Athena.

Installation

composer require rezilio/collector-bundle

Register the bundle manually in config/bundles.php — Symfony Flex does not auto-register third-party bundles:

// config/bundles.php
return [
    Rezilio\CollectorBundle\CollectorBundle::class => ['all' => true],
];

1. Add the collection_queue transport to your messenger.yaml

Do NOT import the bundle's messenger.yaml. Instead add the transport and routing directly to your project's config/packages/messenger.yaml (and any environment overrides such as config/packages/dev/messenger.yaml):

# config/packages/messenger.yaml  (and each environment override)
framework:
  messenger:
    transports:
      # ... your existing transports ...
      collection_queue: '%env(MESSENGER_TRANSPORT_DSN)%%rabbitmq_prefix%-CollectionQueue'

    routing:
      # ... your existing routing ...
      'Rezilio\CollectorBundle\Message\CollectionResponse': collection_queue

Important — environment overrides: If your project has per-environment messenger files (e.g. config/packages/dev/messenger.yaml) they override the main file entirely. Make sure collection_queue transport and routing are present in every environment file that exists, not just the main one.

Important — RabbitMQ bindings: If you previously had a messages default queue, make sure it is not bound to the CollectionQueue exchange. Stale bindings will cause messages to land in both queues. Delete the binding via the RabbitMQ management UI or API if needed.

2. Create a CollectableUser adapter

Do NOT modify your existing User entity. Create a thin adapter class instead:

// src/Service/CollectableUser.php
namespace App\Service;

use App\Entity\User;
use Rezilio\CollectorBundle\Contract\CollectableUserInterface;

class CollectableUser implements CollectableUserInterface
{
    private User $user;

    public function __construct(User $user)
    {
        $this->user = $user;
    }

    public function getUsername(): string { return $this->user->getUsername(); }
    public function getEmail(): string    { return $this->user->getEmail(); }
    public function getLocale(): string   { return $this->user->getLocale(); }
    public function getZone(): string     { return $this->user->getZone(); }
    public function getHashId(): string   { return $this->user->getHashId(); }
    public function getId(): ?int         { return $this->user->getId(); }
    public function getRoles(): array     { return $this->user->getRoles(); }
}

3. Implement UserResolverInterface

The bundle extracts the username from the bearer token and calls this to load the user.

// src/Service/CollectorUserResolver.php
namespace App\Service;

use App\Repository\UserRepository;
use Rezilio\CollectorBundle\Contract\CollectableUserInterface;
use Rezilio\CollectorBundle\Contract\UserResolverInterface;

class CollectorUserResolver implements UserResolverInterface
{
    private UserRepository $repo;

    public function __construct(UserRepository $repo)
    {
        $this->repo = $repo;
    }

    public function resolve(string $username): ?CollectableUserInterface
    {
        $user = $this->repo->findOneBy(['username' => $username]);
        if (!$user) {
            return null;
        }
        return new CollectableUser($user);
    }
}

Register it in config/services.yaml:

App\Service\CollectorUserResolver:
    tags: ['collector.user_resolver']

Rezilio\CollectorBundle\Contract\UserResolverInterface: '@App\Service\CollectorUserResolver'

4. Register AppCollectorListener

Create a listener that extends the bundle's CollectorListener and excludes it from Symfony's auto-discovery by adding EventListener to the exclude list:

# config/services.yaml
App\:
    resource: '../src/*'
    exclude: '../src/{DependencyInjection,DataProvider,DataMapping,Entity,Migrations,Tests,Kernel.php,EventListener}'

App\EventListener\AppCollectorListener:
    arguments:
        $factory:      '@Rezilio\CollectorBundle\Service\CollectorServiceFactory'
        $userResolver: '@App\Service\CollectorUserResolver'
        $logger:       '@logger'
        $routes:       []
    tags:
        - { name: kernel.event_subscriber }

Create the listener class:

// src/EventListener/AppCollectorListener.php
namespace App\EventListener;

use Rezilio\CollectorBundle\Contract\CollectableUserInterface;
use Rezilio\CollectorBundle\EventListener\CollectorListener;

class AppCollectorListener extends CollectorListener
{
    protected function buildExtra(CollectableUserInterface $user): array
    {
        return [
            'user_id'    => $user->getId(),
            'org_id'     => null, // populate from your data source
            'mission_id' => null, // populate from your data source
            'partner_id' => null, // populate from your data source
        ];
    }
}

5. Add a CollectionResponseHandler

Symfony Messenger requires a handler for every routed message class. Add a minimal stub that satisfies this requirement — the actual processing is done by the rezilio_collector worker project.

The implementation differs depending on your Symfony version:

Symfony 4.4 / 5.x / 6.x — implement MessageHandlerInterface:

// src/MessageHandler/CollectionResponseHandler.php
namespace App\MessageHandler;

use Rezilio\CollectorBundle\Message\CollectionResponse;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CollectionResponseHandler implements MessageHandlerInterface
{
    public function __invoke(CollectionResponse $message)
    {
        // Handled by rezilio_collector worker
    }
}

Symfony 7.xMessageHandlerInterface was removed. Use the #[AsMessageHandler] attribute instead:

// src/MessageHandler/CollectionResponseHandler.php
namespace App\MessageHandler;

use Rezilio\CollectorBundle\Message\CollectionResponse;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class CollectionResponseHandler
{
    public function __invoke(CollectionResponse $message)
    {
        // Handled by rezilio_collector worker
    }
}

6. Define which routes to collect

# config/packages/collector.yaml
collector:
    routes:
        - { path: '^/api/v1/events' }
        - { path: '^/api/v2/events' }
        - { path: '^/api/v2/content', controller: 'App\DataProvider\ContentCollectionDataProvider' }

7. Collecting from security events (login)

For login collection where no bearer token exists yet, inject CollectorServiceFactory directly into your security subscriber:

// src/EventSubscriber/SecuritySubscriber.php
use App\Service\CollectableUser;
use Rezilio\CollectorBundle\Service\CollectorServiceFactory;

class SecuritySubscriber implements EventSubscriberInterface
{
    private CollectorServiceFactory $collectorFactory;

    public function __construct(/* ... */ CollectorServiceFactory $collectorFactory)
    {
        $this->collectorFactory = $collectorFactory;
    }

    public function logSuccess($event): void
    {
        $user = $event->getAuthenticationToken()->getUser();
        $this->collectorFactory
            ->create(new CollectableUser($user))
            ->collect(static::class, ['login' => true]);
    }
}

8. Dispatching directly (optional)

For cases where you need to dispatch a collection event without going through the listener — for example from a controller — dispatch the message directly:

use Rezilio\CollectorBundle\Message\CollectionResponse;

$this->bus->dispatch(new CollectionResponse(
    $user->getZone(),
    ['hash' => $user->getHashId(), 'id' => $productId, 'ts' => time()]
));

9. Sanitize rules (optional)

Redact, hash, or measure fields per route:

collector:
    routes:
        - path: '^/api/v1/profile'
          sanitize:
              - { field: password, action: redact }

        - path: '^/api/v1/content'
          sanitize:
              - { field: content, action: hash }
              - { field: content, action: length }
              - { field: content, action: redact }

Actions: redact replaces value with -, hash adds {field}_md5, length adds {field}_length.

What is included

ClassPurpose
CollectorBundleBundle entry point
CollectableUserInterfaceContract for the user adapter
UserResolverInterfaceContract for loading a user by username
NullUserResolverDefault no-op resolver until you register your own
CollectionResponseMessenger message (zone + data)
CollectorServiceBuilds and dispatches the payload
CollectorServiceFactoryCreates a CollectorService for a given user
CollectorListenerKernel event subscriber — auto-collects matching routes
DataProviderAbstractOptional base class for API Platform DataProviders
ConfigurationValidates collector.yaml config