iq2i / messenger-import-bundle
A Symfony bundle that tracks the completion of asynchronous imports dispatched via symfony/messenger
Package info
github.com/IQ2i/messenger-import-bundle
Type:symfony-bundle
pkg:composer/iq2i/messenger-import-bundle
Requires
- php: >=8.1
- symfony/clock: ^6.4|^7.4|^8.0
- symfony/dependency-injection: ^6.4|^7.4|^8.0
- symfony/http-kernel: ^6.4|^7.4|^8.0
- symfony/messenger: ^6.4|^7.4|^8.0
Requires (Dev)
- doctrine/orm: ^2.20 | ^3.0
- friendsofphp/php-cs-fixer: ^3.52
- phpstan/phpstan: ^2.1
- phpstan/phpstan-symfony: ^2.0
- phpunit/phpunit: ^10.5
- symfony/framework-bundle: ^6.4|^7.4|^8.0
This package is auto-updated.
Last update: 2026-03-31 19:13:23 UTC
README
A Symfony bundle that tracks the completion of asynchronous imports dispatched via symfony/messenger.
When importing large files, dispatching messages asynchronously speeds up processing but makes it impossible to know when all messages have been handled. This bundle solves that problem by listening to Messenger worker events and detecting when every message in a batch has been processed — whether successfully or not.
Requirements
- PHP 8.1+
- Symfony 6.4 / 7.x / 8.x
symfony/messenger- Doctrine ORM (for the provided traits)
Installation
composer require iq2i/messenger-import-bundle
Register the bundle in config/bundles.php:
return [ // ... IQ2i\MessengerImportBundle\MessengerImportBundle::class => ['all' => true], ];
How it works
- Before dispatching messages, you create an
ImportBatchentity that stores the total number of messages to process. - Each message carries the batch ID via
BatchAwareMessageInterface. - The bundle's subscriber listens to
WorkerMessageHandledEventandWorkerMessageFailedEvent. After each message, it decrements the batch counter. - When the counter reaches zero, an
ImportBatchCompletedEventis dispatched. You listen to this event to send a notification, trigger a follow-up action, etc.
Setup
1. Create the batch entity
Create an entity that implements ImportBatchInterface and uses ImportBatchTrait:
// src/Entity/ImportBatch.php use Doctrine\ORM\Mapping as ORM; use IQ2i\MessengerImportBundle\Model\ImportBatchInterface; use IQ2i\MessengerImportBundle\Model\ImportBatchTrait; #[ORM\Entity(repositoryClass: ImportBatchRepository::class)] class ImportBatch implements ImportBatchInterface { use ImportBatchTrait; #[ORM\Id] #[ORM\GeneratedValue(strategy: 'CUSTOM')] #[ORM\CustomIdGenerator(class: UuidGenerator::class)] #[ORM\Column(type: 'uuid', unique: true)] private string $id; public function getId(): string { return $this->id; } }
2. Create the batch repository
Create a repository that implements ImportBatchRepositoryInterface and uses ImportBatchRepositoryTrait:
// src/Repository/ImportBatchRepository.php use Doctrine\Bundle\DoctrineBundle\Repository\ServiceEntityRepository; use Doctrine\Persistence\ManagerRegistry; use IQ2i\MessengerImportBundle\Model\ImportBatchRepositoryInterface; use IQ2i\MessengerImportBundle\Model\ImportBatchRepositoryTrait; class ImportBatchRepository extends ServiceEntityRepository implements ImportBatchRepositoryInterface { use ImportBatchRepositoryTrait; public function __construct(ManagerRegistry $registry) { parent::__construct($registry, ImportBatch::class); } }
3. Implement BatchAwareMessageInterface on your messages
// src/Message/ImportProductMessage.php use IQ2i\MessengerImportBundle\Message\BatchAwareMessageInterface; class ImportProductMessage implements BatchAwareMessageInterface { public function __construct( private readonly array $row, private readonly ?string $batchId = null, ) {} public function getRow(): array { return $this->row; } public function getBatchId(): ?string { return $this->batchId; } }
4. Dispatch your messages
Initialize the batch with the total number of messages, then attach the batch ID to each message:
// src/Service/ProductImporter.php class ProductImporter { public function __construct( private readonly ImportBatchRepository $batchRepository, private readonly MessageBusInterface $bus, private readonly EntityManagerInterface $em, ) {} public function import(array $rows): void { $batch = new ImportBatch(); $batch->initialize(count($rows)); $this->em->persist($batch); $this->em->flush(); foreach ($rows as $row) { $this->bus->dispatch(new ImportProductMessage($row, $batch->getId())); } } }
5. Listen to the completion event
// src/EventSubscriber/ImportCompletedSubscriber.php use IQ2i\MessengerImportBundle\Event\ImportBatchCompletedEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; class ImportCompletedSubscriber implements EventSubscriberInterface { public static function getSubscribedEvents(): array { return [ ImportBatchCompletedEvent::class => 'onImportCompleted', ]; } public function onImportCompleted(ImportBatchCompletedEvent $event): void { // Send a notification, trigger a report, clean up temporary files... } }
API reference
BatchAwareMessageInterface
Implement this interface on any message that belongs to an import batch.
| Method | Description |
|---|---|
getBatchId(): ?string |
Returns the batch ID, or null if the message is not part of a batch |
ImportBatchInterface
| Method | Description |
|---|---|
initialize(int $total): void |
Sets the total message count and marks the batch as started |
getTotal(): int |
Total number of messages in the batch |
getRemaining(): int |
Number of messages not yet processed |
getCreatedAt(): \DateTimeImmutable |
When the batch was initialized |
getCompletedAt(): ?\DateTimeImmutable |
When the batch completed, null if still in progress |
isComplete(): bool |
Returns true when all messages have been processed |
markComplete(): void |
Sets completedAt (idempotent — safe to call multiple times) |
ImportBatchCompletedEvent
Dispatched when the last message in a batch has been handled (or permanently failed).
| Method | Description |
|---|---|
getBatchId(): string |
ID of the completed batch |
getTotal(): int |
Total number of messages that were processed |
Notes
- Failed messages are counted as processed only when all retry attempts are exhausted (
willRetry() === false). A message that will be retried does not decrement the counter. completedAtis set atomically insideImportBatchRepositoryTrait::decrement()the first timeremainingreaches zero. It is safe in concurrent worker environments.- The
decrement()operation uses a singleUPDATE ... WHERE remaining > 0query to prevent the counter from going below zero under concurrent load.
License
MIT — see LICENSE.