tourze/async-import-bundle

Symfony asynchronous import module

Type:symfony-bundle

Last update: 2025-06-22 18:21:59 UTC


README

A Symfony bundle for asynchronous file imports in CSV, Excel, and JSON formats. It provides progress tracking, error logging, retry mechanisms, and batch processing.

Features

  • Multiple File Formats: Support for CSV, Excel (XLS/XLSX), and JSON files
  • Asynchronous Processing: Uses Symfony Messenger for background processing
  • Progress Tracking: Real-time progress updates with speed calculation
  • Error Handling: Detailed error logging with line numbers and data
  • Retry Mechanism: Automatic retry with exponential backoff
  • Batch Processing: Configurable batch sizes for memory efficiency
  • Extensible Architecture: Easy to add custom import handlers
  • EasyAdmin Integration: Ready for use in EasyAdmin admin panels

Installation

composer require tourze/async-import-bundle

Configuration

1. Register the bundle

Register the bundle in your config/bundles.php:

return [
    // ...
    AsyncImportBundle\AsyncImportBundle::class => ['all' => true],
    // ...
];

2. Configure Messenger

Add to config/packages/messenger.yaml:

framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: import
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2

        routing:
            AsyncImportBundle\Message\ProcessImportTaskMessage: async
            AsyncImportBundle\Message\ProcessImportBatchMessage: async
            AsyncImportBundle\Message\CleanupImportTaskMessage: async
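
To make the retry_strategy values above concrete, here is a minimal sketch of the wait times they imply. It assumes the usual multiplier rule (delay × multiplier^retry index) and ignores any jitter or max_delay setting; retryDelays() is a hypothetical helper, not part of the bundle or of Symfony.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay before each retry for a multiplier-based
 * strategy, computed as delay * multiplier^(retry index), index from 0.
 *
 * @return int[] delays in milliseconds, one per retry
 */
function retryDelays(int $maxRetries, int $delayMs, float $multiplier): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        $delays[] = (int) round($delayMs * $multiplier ** $retry);
    }

    return $delays;
}

// Values taken from the retry_strategy block above.
$delays = retryDelays(3, 1000, 2.0);
echo implode(', ', $delays), " ms\n";
```

With max_retries: 3, delay: 1000 and multiplier: 2, a failing message would be retried after roughly 1 s, 2 s and 4 s before it is given up on.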

3. Create upload directory

mkdir -p var/import
chmod 755 var/import

Usage

1. Create an Import Handler

<?php

namespace App\Import;

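use App\Entity\User; // assumed location of the target entity
use AsyncImportBundle\Entity\AsyncImportTask;
use AsyncImportBundle\Service\ImportHandlerInterface;
use AsyncImportBundle\Service\ValidationResult;
use Doctrine\ORM\EntityManagerInterface;

class UserImportHandler implements ImportHandlerInterface
{
    // Injected so that import() can persist entities.
    public function __construct(
        private EntityManagerInterface $entityManager,
    ) {
    }
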
    public function supports(string $entityClass): bool
    {
        return $entityClass === User::class;
    }

    public function validate(array $row, int $lineNumber): ValidationResult
    {
        $result = ValidationResult::success();
        
        if (empty($row['email'])) {
            $result->addError('Email is required');
        }
        
        return $result;
    }

    public function import(array $row, AsyncImportTask $task): void
    {
        $user = new User();
        $user->setEmail($row['email']);
        $user->setName($row['name']);
        
        $this->entityManager->persist($user);
    }

    public function getFieldMapping(): array
    {
        return [
            'Email' => 'email',
            'Name' => 'name',
        ];
    }

    public function getBatchSize(): int
    {
        return 100;
    }

    public function getEntityClass(): string
    {
        return User::class;
    }

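    public function preprocess(array $row): array
    {
        // Trim string cells only; nulls and other types pass through unchanged.
        return array_map(
            static fn ($value) => is_string($value) ? trim($value) : $value,
            $row
        );
    }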
}

2. Register the Handler

services:
    App\Import\UserImportHandler:
        tags: ['async_import.handler']

3. Create Import Task

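use App\Entity\User; // assumed location of the target entity
use AsyncImportBundle\Message\ProcessImportTaskMessage;
use AsyncImportBundle\Service\AsyncImportService;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;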

// In your controller
public function import(
    Request $request,
    AsyncImportService $importService,
    MessageBusInterface $messageBus
): Response {
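    $file = $request->files->get('file');
    if ($file === null) {
        // No file was uploaded under the "file" field.
        return $this->json(['error' => 'No file uploaded'], Response::HTTP_BAD_REQUEST);
    }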
    
    // Create import task
    $task = $importService->createTask($file, User::class, [
        'priority' => 10,
        'maxRetries' => 3,
        'remark' => 'User batch import'
    ]);
    
    // Dispatch for async processing
    $messageBus->dispatch(new ProcessImportTaskMessage($task->getId()));
    
    return $this->json(['taskId' => $task->getId()]);
}

4. Track Progress

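use AsyncImportBundle\Entity\AsyncImportTask;
use AsyncImportBundle\Service\ImportProgressTracker;
use Doctrine\ORM\EntityManagerInterface;

public function progress(
    string $taskId,
    ImportProgressTracker $progressTracker,
    EntityManagerInterface $entityManager
): Response {
    // Load the task by ID. Fetching it through Doctrine is an assumption;
    // the bundle may offer its own repository or service for this.
    $task = $entityManager->find(AsyncImportTask::class, $taskId);
    if ($task === null) {
        throw $this->createNotFoundException('Import task not found');
    }

    $progress = $progressTracker->getProgress($task);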
    
    return $this->json([
        'percentage' => $progress['percentage'],
        'processed' => $progress['processed'],
        'total' => $progress['total'],
        'speed' => $progress['speed'],
        'eta' => $progress['eta']
    ]);
}
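
The keys returned above (percentage, processed, total, speed, eta) can be related to each other with simple arithmetic. The sketch below shows one plausible way to derive them from raw counters; computeProgress() and its inputs are hypothetical, and the bundle's ImportProgressTracker may calculate these values differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: derives progress figures from raw counters.
 *
 * @return array{percentage: float, processed: int, total: int, speed: float, eta: ?int}
 */
function computeProgress(int $processed, int $total, float $elapsedSeconds): array
{
    // Share of rows handled so far, as a percentage with two decimals.
    $percentage = $total > 0 ? round($processed / $total * 100, 2) : 0.0;

    // Average throughput in rows per second since the task started.
    $speed = $elapsedSeconds > 0 ? $processed / $elapsedSeconds : 0.0;

    // Estimated seconds left; unknown (null) until some throughput is measured.
    $remaining = max(0, $total - $processed);
    $eta = $speed > 0 ? (int) ceil($remaining / $speed) : null;

    return [
        'percentage' => $percentage,
        'processed' => $processed,
        'total' => $total,
        'speed' => $speed,
        'eta' => $eta,
    ];
}

// 250 of 1000 rows after 5 seconds.
$progress = computeProgress(250, 1000, 5.0);
printf("%.1f%% done, %.0f rows/s, about %d s left\n", $progress['percentage'], $progress['speed'], $progress['eta']);
```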

File Format Examples

CSV

Email,Name
john@example.com,John Doe
jane@example.com,Jane Smith
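
To illustrate how a header row like the one above could be matched against the array returned by getFieldMapping(), here is a small standalone sketch. mapCsvRows() is a hypothetical helper written for this example; it is not the bundle's parser, and it splits on line breaks, so quoted fields containing newlines are not handled.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turns CSV text into rows keyed by internal field names.
 *
 * @param array<string, string> $fieldMapping header label => field name
 * @return array<int, array<string, string>>
 */
function mapCsvRows(string $csv, array $fieldMapping): array
{
    $lines = preg_split('/\R/', trim($csv));
    $headers = str_getcsv(array_shift($lines), ',', '"', '\\');

    $rows = [];
    foreach ($lines as $line) {
        $values = str_getcsv($line, ',', '"', '\\');
        $row = [];
        foreach ($headers as $index => $header) {
            // Fall back to the raw header when no mapping entry exists.
            $field = $fieldMapping[$header] ?? $header;
            $row[$field] = trim($values[$index] ?? '');
        }
        $rows[] = $row;
    }

    return $rows;
}

$csv = "Email,Name\njohn@example.com,John Doe\njane@example.com,Jane Smith\n";
$rows = mapCsvRows($csv, ['Email' => 'email', 'Name' => 'name']);
echo json_encode($rows[0]), "\n";
```

Each resulting row has the same shape as the $row argument used in the handler's validate() and import() methods.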

Excel

Standard Excel files with headers in the first row.

JSON

[
    {"email": "john@example.com", "name": "John Doe"},
    {"email": "jane@example.com", "name": "Jane Smith"}
]

Console Commands

# Process pending import tasks
php bin/console import:process

# Retry failed tasks
php bin/console import:retry-failed

# Clean up old tasks (default: 30 days)
php bin/console import:cleanup

# Check import status
php bin/console import:status <taskId>

Error Handling

Errors are logged with detailed information:

$errors = $importService->getTaskErrors($task);

foreach ($errors as $error) {
    echo sprintf(
        "Line %d: %s\nData: %s\n",
        $error->getLine(),
        $error->getError(),
        json_encode($error->getRawRow())
    );
}

Events

The bundle dispatches several events:

  • ImportTaskCreatedEvent: When a new import task is created
  • ImportProgressEvent: During import processing, with progress updates
  • ImportCompletedEvent: When an import completes successfully
  • ImportFailedEvent: When an import fails

Testing

# Run all tests from project root
./vendor/bin/phpunit packages/async-import-bundle/tests

# Run specific test class
./vendor/bin/phpunit packages/async-import-bundle/tests/Entity/AsyncImportTaskTest.php

Advanced Features

Custom File Parsers

Implement FileParserInterface to support additional file formats:

class XmlParser implements FileParserInterface
{
    public function supports(ImportFileType $fileType): bool
    {
        return $fileType === ImportFileType::XML;
    }
    
    // ... implement other methods
}
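
The remaining methods of FileParserInterface are not shown here, so the sketch below covers only the part any XML parser would need: turning a document into row arrays. It assumes a layout with repeated <row> elements whose children are the fields; parseXmlRows() and that layout are hypothetical, and the SimpleXML extension is required.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: converts <root><row>...</row></root> into row arrays.
 *
 * @return array<int, array<string, string>>
 */
function parseXmlRows(string $xml, string $rowElement = 'row'): array
{
    // Collect libxml errors instead of emitting PHP warnings.
    $previous = libxml_use_internal_errors(true);
    $document = simplexml_load_string($xml);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    if ($document === false) {
        throw new InvalidArgumentException('Invalid XML input');
    }

    $rows = [];
    foreach ($document->{$rowElement} as $element) {
        $row = [];
        foreach ($element->children() as $child) {
            $row[$child->getName()] = trim((string) $child);
        }
        $rows[] = $row;
    }

    return $rows;
}

$xml = '<users><row><email>john@example.com</email><name>John Doe</name></row></users>';
echo json_encode(parseXmlRows($xml)), "\n";
```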

Import Options

$task = $importService->createTask($file, Entity::class, [
    'delimiter' => ';',          // CSV delimiter
    'encoding' => 'ISO-8859-1',  // File encoding
    'skipHeader' => true,        // Skip first row
    'sheetIndex' => 0,           // Excel sheet index
    'rootKey' => 'data',         // JSON root key
    'priority' => 10,            // Task priority
    'maxRetries' => 3,           // Max retry attempts
]);
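
To show what the delimiter, encoding, and rootKey options mean for the input data, here is a sketch under stated assumptions. parseCsvLine() and extractJsonRows() are hypothetical helpers, not the bundle's internals; the first requires the mbstring extension.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decodes one CSV line using a delimiter and source encoding.
 *
 * @return array<int, string|null>
 */
function parseCsvLine(string $line, string $delimiter = ',', string $encoding = 'UTF-8'): array
{
    // Normalize to UTF-8 first so downstream code sees one encoding.
    if (strtoupper($encoding) !== 'UTF-8') {
        $line = mb_convert_encoding($line, 'UTF-8', $encoding);
    }

    return str_getcsv($line, $delimiter, '"', '\\');
}

/**
 * Hypothetical helper: picks the list of rows out of a JSON document.
 * Without a root key the document itself must be the list.
 */
function extractJsonRows(string $json, ?string $rootKey = null): array
{
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $rows = $rootKey === null ? $data : ($data[$rootKey] ?? null);

    if (!is_array($rows)) {
        throw new InvalidArgumentException('No row list found in JSON input');
    }

    return $rows;
}

// "José" encoded as ISO-8859-1, fields separated by semicolons.
$fields = parseCsvLine("Jos\xE9;jose@example.com", ';', 'ISO-8859-1');
$rows = extractJsonRows('{"data": [{"email": "john@example.com"}]}', 'data');
echo $fields[0], ' / ', $rows[0]['email'], "\n";
```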

Performance Tips

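  1. Batch Size: Tune the value returned by getBatchSize() to your entity complexity; entities with many relations generally call for smaller batches
  2. Memory: Monitor memory usage when importing large files
  3. Workers: Run multiple Messenger workers (for example, several php bin/console messenger:consume async processes) for parallel processing
  4. Indexes: Ensure database indexes exist on frequently queried fields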
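
The batch size and memory tips above can be pictured with a generator that hands out rows in fixed-size chunks, so only one batch is held in memory at a time. This is a sketch of the general technique; inBatches() is a hypothetical helper and not how the bundle is known to split work.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: groups any iterable of rows into batches.
 *
 * @param iterable<mixed> $rows
 * @return Generator<int, array<int, mixed>>
 */
function inBatches(iterable $rows, int $batchSize): Generator
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }

    $batch = [];
    foreach ($rows as $row) {
        $batch[] = $row;
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = []; // release the previous batch
        }
    }

    // Emit the final, possibly smaller batch.
    if ($batch !== []) {
        yield $batch;
    }
}

// 250 rows with a batch size of 100.
foreach (inBatches(range(1, 250), 100) as $number => $batch) {
    printf("batch %d: %d rows\n", $number + 1, count($batch));
}
```

In a Doctrine-based handler, a natural point to call flush() and clear() on the entity manager would be after each batch, which keeps the unit of work from growing with the file size.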

Contributing

Contributions are welcome! Please ensure all tests pass and add tests for new features.

License

This package is available under the MIT license.