tourze / async-import-bundle
Symfony asynchronous import module
Type: symfony-bundle
Requires
- php: ^8.1
- doctrine/dbal: ^4.0
- doctrine/doctrine-bundle: ^2.13
- doctrine/orm: ^3.0
- doctrine/persistence: ^3.1 || ^4
- symfony/config: ^6.4
- symfony/dependency-injection: ^6.4
- symfony/doctrine-bridge: ^6.4
- symfony/framework-bundle: ^6.4
- symfony/http-kernel: ^6.4
- symfony/security-core: ^6.4
- symfony/yaml: ^6.4 || ^7.1
- tourze/bundle-dependency: 0.0.*
- tourze/doctrine-indexed-bundle: 0.0.*
- tourze/doctrine-snowflake-bundle: 0.1.*
- tourze/doctrine-timestamp-bundle: 0.0.*
- tourze/doctrine-user-bundle: 0.0.*
- tourze/easy-admin-attribute: 0.1.*
- tourze/symfony-integration-test-kernel: 0.0.*
Requires (Dev)
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^10.0
- symfony/phpunit-bridge: ^6.4
README
A Symfony bundle for asynchronous file imports, with support for CSV, Excel, and JSON formats. Features include progress tracking, error logging, retry mechanisms, and batch processing.
Features
- Multiple File Formats: Support for CSV, Excel (XLS/XLSX), and JSON files
- Asynchronous Processing: Uses Symfony Messenger for background processing
- Progress Tracking: Real-time progress updates with speed calculation
- Error Handling: Detailed error logging with line numbers and data
- Retry Mechanism: Automatic retry with exponential backoff
- Batch Processing: Configurable batch sizes for memory efficiency
- Extensible Architecture: Easy to add custom import handlers
- EasyAdmin Integration: Ready for admin panel integration
Installation
composer require tourze/async-import-bundle
Configuration
1. Register the bundle
Register the bundle in your config/bundles.php:

    return [
        // ...
        AsyncImportBundle\AsyncImportBundle::class => ['all' => true],
        // ...
    ];
2. Configure Messenger
Add to config/packages/messenger.yaml:

    framework:
        messenger:
            transports:
                async:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        queue_name: import
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
            routing:
                AsyncImportBundle\Message\ProcessImportTaskMessage: async
                AsyncImportBundle\Message\ProcessImportBatchMessage: async
                AsyncImportBundle\Message\CleanupImportTaskMessage: async
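To make the retry settings above concrete, here is a minimal sketch of how a multiplier-based strategy turns `delay` and `multiplier` into wait times. The `retryDelays` helper is hypothetical and not part of the bundle or of Symfony; it only reproduces the arithmetic (delay × multiplier^(retry − 1)) and ignores any maximum delay or jitter that may also apply.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay in milliseconds before each retry attempt,
 * computed as delay * multiplier^(retry - 1).
 *
 * @return int[]
 */
function retryDelays(int $maxRetries, int $delayMs, float $multiplier): array
{
    $delays = [];
    for ($retry = 1; $retry <= $maxRetries; $retry++) {
        $delays[] = (int) round($delayMs * $multiplier ** ($retry - 1));
    }

    return $delays;
}

// Values taken from the messenger.yaml example: max_retries 3, delay 1000, multiplier 2.
echo implode(', ', retryDelays(3, 1000, 2.0)), PHP_EOL; // prints "1000, 2000, 4000"
```

With these values a failing message would be retried after roughly 1, 2, and 4 seconds before it is given up on.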
3. Create upload directory
    mkdir -p var/import
    chmod 755 var/import
Usage
1. Create an Import Handler
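    <?php

    namespace App\Import;

    use App\Entity\User; // assumed location of your application's User entity
    use AsyncImportBundle\Entity\AsyncImportTask;
    use AsyncImportBundle\Service\ImportHandlerInterface;
    use AsyncImportBundle\Service\ValidationResult;
    use Doctrine\ORM\EntityManagerInterface;

    class UserImportHandler implements ImportHandlerInterface
    {
        // The entity manager is injected so that import() can persist entities.
        public function __construct(
            private readonly EntityManagerInterface $entityManager,
        ) {
        }

        // Tell the bundle which entity class this handler is responsible for.
        public function supports(string $entityClass): bool
        {
            return $entityClass === User::class;
        }

        // Validate a single row; errors are collected on the result object.
        public function validate(array $row, int $lineNumber): ValidationResult
        {
            $result = ValidationResult::success();

            if (empty($row['email'])) {
                $result->addError('Email is required');
            }

            return $result;
        }

        // Turn one validated row into an entity.
        public function import(array $row, AsyncImportTask $task): void
        {
            $user = new User();
            $user->setEmail($row['email']);
            $user->setName($row['name']);

            $this->entityManager->persist($user);
        }

        // Map file column headers to row keys.
        public function getFieldMapping(): array
        {
            return [
                'Email' => 'email',
                'Name' => 'name',
            ];
        }

        public function getBatchSize(): int
        {
            return 100;
        }

        public function getEntityClass(): string
        {
            return User::class;
        }

        // Normalize raw values before validation and import.
        public function preprocess(array $row): array
        {
            return array_map('trim', $row);
        }
    }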
2. Register the Handler
    services:
        App\Import\UserImportHandler:
            tags: ['async_import.handler']
3. Create Import Task
    use App\Entity\User; // assumed location of your application's User entity
    use AsyncImportBundle\Message\ProcessImportTaskMessage;
    use AsyncImportBundle\Service\AsyncImportService;
    use Symfony\Component\HttpFoundation\Request;
    use Symfony\Component\HttpFoundation\Response;
    use Symfony\Component\Messenger\MessageBusInterface;

    // In your controller
    public function import(
        Request $request,
        AsyncImportService $importService,
        MessageBusInterface $messageBus
    ): Response {
        $file = $request->files->get('file');

        // Create import task
        $task = $importService->createTask($file, User::class, [
            'priority' => 10,
            'maxRetries' => 3,
            'remark' => 'User batch import',
        ]);

        // Dispatch for async processing
        $messageBus->dispatch(new ProcessImportTaskMessage($task->getId()));

        return $this->json(['taskId' => $task->getId()]);
    }
4. Track Progress
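    use AsyncImportBundle\Entity\AsyncImportTask;
    use AsyncImportBundle\Service\ImportProgressTracker;
    use Doctrine\ORM\EntityManagerInterface;
    use Symfony\Component\HttpFoundation\Response;

    // In your controller
    public function progress(
        string $taskId,
        ImportProgressTracker $progressTracker,
        EntityManagerInterface $entityManager
    ): Response {
        // Load the task by ID first; a plain Doctrine lookup is one way to do it.
        $task = $entityManager->find(AsyncImportTask::class, $taskId);
        if ($task === null) {
            throw $this->createNotFoundException('Import task not found');
        }

        $progress = $progressTracker->getProgress($task);

        return $this->json([
            'percentage' => $progress['percentage'],
            'processed' => $progress['processed'],
            'total' => $progress['total'],
            'speed' => $progress['speed'],
            'eta' => $progress['eta'],
        ]);
    }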
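The fields in the progress response can be derived from three numbers: rows processed, total rows, and elapsed time. The sketch below shows one plausible way to compute them. `computeProgress` is a hypothetical helper, and the units (rows per second for `speed`, seconds for `eta`) are assumptions; the bundle's tracker may calculate or round these values differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper deriving progress figures from raw counters.
 *
 * @return array{percentage: float, processed: int, total: int, speed: float, eta: ?int}
 */
function computeProgress(int $processed, int $total, float $elapsedSeconds): array
{
    // Share of rows handled so far, rounded to two decimals.
    $percentage = $total > 0 ? round((float) $processed / $total * 100, 2) : 0.0;

    // Average throughput in rows per second (assumed unit).
    $speed = $elapsedSeconds > 0 ? $processed / $elapsedSeconds : 0.0;

    // Remaining time in seconds; unknown (null) until some rows have been processed.
    $remaining = max(0, $total - $processed);
    $eta = $speed > 0 ? (int) ceil($remaining / $speed) : null;

    return [
        'percentage' => $percentage,
        'processed' => $processed,
        'total' => $total,
        'speed' => $speed,
        'eta' => $eta,
    ];
}

// 250 of 1000 rows after 5 seconds: 25%, 50 rows/s, 15 s remaining.
$progress = computeProgress(250, 1000, 5.0);
printf("%.2f%% at %.1f rows/s, ETA %d s\n", $progress['percentage'], $progress['speed'], $progress['eta']);
```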
File Format Examples
CSV
Email,Name
john@example.com,John Doe
jane@example.com,Jane Smith
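As an illustration of how the header row relates to a handler's field mapping, the following sketch parses the CSV above and renames each column using a mapping like the one returned by `getFieldMapping()`. `mapCsvRows` is a hypothetical helper, not the bundle's parser, and it splits on line breaks, so it does not handle quoted fields that span multiple lines.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: parse CSV text and rename columns via a header => key mapping.
 *
 * @param array<string, string> $fieldMapping e.g. ['Email' => 'email']
 * @return array<int, array<string, ?string>>
 */
function mapCsvRows(string $csv, array $fieldMapping): array
{
    $lines = preg_split('/\R/', trim($csv));
    // The first line holds the column headers.
    $headers = str_getcsv((string) array_shift($lines), ',', '"', '\\');

    $rows = [];
    foreach ($lines as $line) {
        if ($line === '') {
            continue; // skip blank lines
        }
        $values = str_getcsv($line, ',', '"', '\\');
        $row = [];
        foreach ($headers as $index => $header) {
            // Unmapped headers keep their original name.
            $key = $fieldMapping[$header] ?? $header;
            $row[$key] = $values[$index] ?? null;
        }
        $rows[] = $row;
    }

    return $rows;
}

$csv = "Email,Name\njohn@example.com,John Doe\njane@example.com,Jane Smith\n";
$rows = mapCsvRows($csv, ['Email' => 'email', 'Name' => 'name']);
echo $rows[0]['email'], PHP_EOL; // prints "john@example.com"
```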
Excel
Standard Excel files with headers in the first row.
JSON
    [
        {"email": "john@example.com", "name": "John Doe"},
        {"email": "jane@example.com", "name": "Jane Smith"}
    ]
Console Commands
    # Process pending import tasks
    php bin/console import:process

    # Retry failed tasks
    php bin/console import:retry-failed

    # Cleanup old tasks (default: 30 days)
    php bin/console import:cleanup

    # Check import status
    php bin/console import:status <taskId>
Error Handling
Errors are logged with detailed information:
    $errors = $importService->getTaskErrors($task);

    foreach ($errors as $error) {
        echo sprintf(
            "Line %d: %s\nData: %s\n",
            $error->getLine(),
            $error->getError(),
            json_encode($error->getRawRow())
        );
    }
Events
The bundle dispatches several events:
- ImportTaskCreatedEvent: When a new import task is created
- ImportProgressEvent: During import processing, with progress updates
- ImportCompletedEvent: When an import completes successfully
- ImportFailedEvent: When an import fails
Testing
    # Run all tests from project root
    ./vendor/bin/phpunit packages/async-import-bundle/tests

    # Run specific test class
    ./vendor/bin/phpunit packages/async-import-bundle/tests/Entity/AsyncImportTaskTest.php
Advanced Features
Custom File Parsers
Implement FileParserInterface to support additional file formats:

    class XmlParser implements FileParserInterface
    {
        public function supports(ImportFileType $fileType): bool
        {
            return $fileType === ImportFileType::XML;
        }

        // ... implement other methods
    }
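The remaining methods of FileParserInterface are not listed here, so the sketch below only covers the part any XML parser would need: turning a document into the same row arrays the CSV and JSON examples produce. `parseXmlRows`, the `row` element name, and the sample document layout are all assumptions chosen for illustration; adapt them to the interface's actual method signatures and to your file structure.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: convert repeated child elements of the root into row arrays.
 *
 * @return array<int, array<string, string>>
 */
function parseXmlRows(string $xml, string $rowElement = 'row'): array
{
    // Collect libxml errors instead of emitting PHP warnings.
    $previous = libxml_use_internal_errors(true);
    $document = simplexml_load_string($xml);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    if ($document === false) {
        throw new InvalidArgumentException('Invalid XML document');
    }

    $rows = [];
    foreach ($document->{$rowElement} as $element) {
        $row = [];
        // Each child element becomes one field, keyed by its tag name.
        foreach ($element->children() as $child) {
            $row[$child->getName()] = (string) $child;
        }
        $rows[] = $row;
    }

    return $rows;
}

$xml = '<users><row><email>john@example.com</email><name>John Doe</name></row></users>';
echo parseXmlRows($xml)[0]['name'], PHP_EOL; // prints "John Doe"
```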
Import Options
    $task = $importService->createTask($file, Entity::class, [
        'delimiter' => ';',           // CSV delimiter
        'encoding' => 'ISO-8859-1',   // File encoding
        'skipHeader' => true,         // Skip first row
        'sheetIndex' => 0,            // Excel sheet index
        'rootKey' => 'data',          // JSON root key
        'priority' => 10,             // Task priority
        'maxRetries' => 3,            // Max retry attempts
    ]);
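To show what the `delimiter`, `encoding`, and `rootKey` options mean in practice, here is a minimal sketch using plain PHP functions. Both helpers are hypothetical and exist only for illustration; how the bundle applies these options internally is not documented here and may differ. The encoding conversion assumes the mbstring extension is available.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: split one delimited line, converting it to UTF-8 first.
 *
 * @return array<int, ?string>
 */
function parseDelimitedLine(string $line, string $delimiter = ',', string $encoding = 'UTF-8'): array
{
    if (strcasecmp($encoding, 'UTF-8') !== 0) {
        $line = mb_convert_encoding($line, 'UTF-8', $encoding);
    }

    return str_getcsv($line, $delimiter, '"', '\\');
}

/**
 * Hypothetical helper: read rows from JSON, optionally nested under a root key.
 *
 * @return array<int, array<string, mixed>>
 */
function extractJsonRows(string $json, ?string $rootKey = null): array
{
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    return $rootKey === null ? $data : ($data[$rootKey] ?? []);
}

// A semicolon-delimited ISO-8859-1 line ("\xE9" is "é" in that encoding).
$fields = parseDelimitedLine("Andr\xE9;andre@example.com", ';', 'ISO-8859-1');
echo $fields[0], PHP_EOL; // prints "André"

// Rows wrapped in a top-level "data" key.
$rows = extractJsonRows('{"data": [{"email": "john@example.com"}]}', 'data');
echo $rows[0]['email'], PHP_EOL; // prints "john@example.com"
```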
Performance Tips
- Batch Size: Adjust batch size based on your entity complexity
- Memory: Monitor memory usage for large files
- Workers: Run multiple messenger workers for parallel processing
- Indexes: Ensure database indexes on frequently queried fields
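The batch size tip can be illustrated with a small generator that groups rows into fixed-size chunks, so that only one batch is held in memory at a time. `batches` is a hypothetical helper, not the bundle's implementation. In a Doctrine-based handler, a natural place to call the entity manager's `flush()` and `clear()` would be after each yielded batch.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: lazily group rows into batches of at most $batchSize.
 *
 * @param iterable<mixed> $rows
 * @return Generator<int, array<int, mixed>>
 */
function batches(iterable $rows, int $batchSize): Generator
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }

    $batch = [];
    foreach ($rows as $row) {
        $batch[] = $row;
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = []; // release the processed rows
        }
    }

    // Emit the final, possibly smaller, batch.
    if ($batch !== []) {
        yield $batch;
    }
}

// 250 rows with a batch size of 100 yield batches of 100, 100, and 50 rows.
foreach (batches(range(1, 250), 100) as $index => $batch) {
    printf("Batch %d: %d rows\n", $index + 1, count($batch));
}
```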
Contributing
Contributions are welcome! Please ensure all tests pass and add tests for new features.
License
This package is available under the MIT license.