adt / background-queue
Background queue for Nette Framework
Installs: 31 448
Dependents: 5
Suggesters: 0
Security: 0
Stars: 3
Watchers: 15
Forks: 0
Open Issues: 1
Requires
- php: ^7.4|^8.0
- ext-json: *
- adt/command-lock: ^1.1
- adt/utils: ^2.10
- doctrine/dbal: ^2.0|^3.0
- psr/log: ^1.0|^2.0|^3.0
- symfony/console: ^4.0|^5.0|^6.0
Requires (Dev)
- ext-bcmath: *
- ext-sockets: *
- adt/docker-binaries: ^3.6
- codeception/assert-throws: ^1.3
- codeception/codeception: ^5.0
- codeception/module-asserts: ^2.0
- php-amqplib/php-amqplib: ^2.8
- dev-master
- v4.20
- v4.19.1
- v4.19
- v4.18.2
- v4.18.1
- v4.18
- v4.17.11
- v4.17.10
- v4.17.9
- v4.17.8
- v4.17.7
- v4.17.6
- v4.17.5
- v4.17.4
- v4.17.3
- v4.17.2
- v4.17.1
- v4.17.0
- v4.16.18
- v4.16.17
- v4.16.16
- v4.16.15
- v4.16.14
- v4.16.13
- v4.16.12
- v4.16.11
- v4.16.10
- v4.16.9
- v4.16.8
- v4.16.7
- v4.16.6
- v4.16.5
- v4.16.4
- v4.16.3
- v4.16.2
- v4.16.1
- v4.16
- v4.15.2
- v4.15.1
- v4.15
- v4.14
- v4.13.4
- v4.13.3
- v4.13.2
- v4.13.1
- v4.13
- v4.12.7
- v4.12.6
- v4.12.5
- v4.12.4
- v4.12.3
- v4.12.2
- v4.12.1
- v4.12.0
- v4.11.9
- v4.11.8
- v4.11.7
- v4.11.6
- v4.11.5
- v4.11.4
- v4.11.3
- v4.11.2
- v4.11.1
- v4.11.0
- v4.10.4
- v4.10.3
- v4.10.2
- v4.10.1
- v4.10
- v4.9.2
- v4.9.1
- v4.9
- v4.8
- v4.7.3
- v4.7.2
- v4.7.1
- v4.7
- v4.6.1
- v4.6
- v4.5.4
- v4.5.3
- v4.5.2
- v4.5.1
- v4.5
- v4.4.3
- v4.4.2
- v4.4.1
- v4.4
- v4.3
- v4.2.3
- v4.2.2
- v4.2.1
- v4.2
- v4.1.1
- v4.1
- v4.0.1
- v4.0
- v3.20.7
- v3.20.6
- v3.20.5
- v3.20.4
- v3.20.3
- v3.20.2
- v3.20.1
- v3.20
- v3.19.3
- v3.19.2
- v3.19.1
- v3.19
- v3.18
- v3.17
- v3.16
- v3.15
- v3.14.1
- v3.14
- v3.13
- v3.12.0
- v3.11.1
- v3.11.0
- v3.10.1
- v3.10.0
- v3.9.1
- v3.9
- v3.8.4
- v3.8.3
- v3.8.1
- v3.8
- v3.7.3
- v3.7.2
- v3.7.1
- v3.7
- v3.6.2
- v3.6.1
- v3.6
- v3.5
- v3.4
- v3.3
- v3.2
- v3.1
- v3.0
- v2.2
- v2.1.1
- v2.1
- v2.0
- v1.5
- v1.4
- v1.3
- v1.2
- v1.1
- v1.0
- dev-priorities
- dev-manually-back-to-broker
- dev-extend-monitoring
- dev-allow-descendants-of-exceptions
- dev-fix-type-hinting
This package is auto-updated.
Last update: 2024-05-01 12:03:13 UTC
README
Komponenta umožňuje zpracovávat úkoly na pozadí pomocí cronu nebo AMQP brokera (např. RabbitMQ). Vhodné pro dlouhotrvající requesty, komunikaci s API nebo odesílání webhooků či e-mailů.
Komponenta využívá vlastní doctrine entity manager pro ukládání záznamů do fronty. Tím pádem fungování komponenty není ovlivněno aplikačním entity managerem a naopak.
1. Instalace a konfigurace
1.1 Instalace
composer require adt/background-queue
1.2 Registrace a konfigurace
BackgroundQueue přebírá pole následujících parametrů:
$connection = [ 'serverVersion' => '8.0', 'driver' => 'pdo_mysql', 'host' => $_ENV['DB_HOST'], 'port' => $_ENV['DB_PORT'], 'user' => $_ENV['DB_USER'], 'password' => $_ENV['DB_PASSWORD'], 'dbname' => $_ENV['DB_DBNAME'], ]; $backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([ 'callbacks' => [ 'processEmail' => [$mailer, 'process'], 'processEmail2' => [ // možnost specifikace jiné fronty pro tento callback 'callback' => [$mailer, 'process'], 'queue' => 'general', ], ] 'notifyOnNumberOfAttempts' => 5, // počet pokusů o zpracování záznamu před zalogováním 'tempDir' => $tempDir, // cesta pro uložení zámku proti vícenásobnému spuštění commandu 'connection' => $connection, // pole parametru predavane do Doctrine\Dbal\Connection nebo DSN 'queue' => $_ENV['PROJECT_NAME'], // název fronty, do které se ukládají a ze které se vybírají záznamy 'tableName' => 'background_job', // nepovinné, název tabulky, do které se budou ukládat jednotlivé joby 'logger' => $logger, // nepovinné, musí implementovat psr/log LoggerInterface 'onBeforeProcess' => function(array $parameters) {...}, // nepovinné 'onError' => function(Throwable $e, array $parameters) {...}, // nepovinné 'onAfterProcess' => function(array $parameters) {...}, // nepovinné 'onProcessingGetMetadata' => function(array $parameters): ?array {...}, // nepovinné ]);
Potřebné databázové schéma se vytvoři při prvním použití fronty automaticky a také se automaticky aktualizuje, je-li třeba.
1.3 Broker (optional)
You can use this package with any message broker. Your producer or consumer just need to implement ADT\BackgroundQueue\Broker\Producer
or ADT\BackgroundQueue\Broker\Consumer
.
Or you can use php-amqplib/php-amqplib
, for which this library has an ready to use implementation.
1.3.1 php-amqplib installation
Because using of php-amqplib/php-amqplib
is optional, it doesn't check your installed version against the version with which this package was tested. That's why it's recommended to add to your composer:
{ "conflict": { "php-amqplib/php-amqplib": "<3.0.0 || >=4.0.0" } }
This version of php-amqplib/php-amqplib
also need ext-sockets
. You can add it to your Dockerfile like this:
docker-php-ext-install sockets
and then run:
composer require php-amqplib/php-amqplib
This make sures you avoid BC break when upgrading php-amqplib/php-amqplib
in the future.
1.3.1 php-amqplib configuration
$connectionParams = [ 'host' => $_ENV['RABBITMQ_HOST'], 'user' => $_ENV['RABBITMQ_USER'], 'password' => $_ENV['RABBITMQ_PASSWORD'] ]; $queueParams = [ 'arguments' => ['x-queue-type' => ['S', 'quorum']] ]; $manager = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Manager($connectionParams, $queueParams); $producer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Producer(); $consumer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Consumer(); $backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([ ... 'producer' => $producer, 'waitingJobExpiration' => 1000, // nepovinné, délka v ms, po které se job pokusí znovu provést, když čeká na dokončení předchozího ]);
2 Použití
2.1 Přidání záznamu do fronty a jeho zpracování
namespace App\Presenters; use ADT\BackgroundQueue\BackgroundQueue; class Mailer { private BackgroundQueue $backgroundQueue public function __construct(BackgroundQueue $backgroundQueue) { $this->backgroundQueue = $backgroundQueue; } public function send(Invoice $invoice) { $callbackName = 'processEmail'; $parameters = [ 'to' => 'hello@appsdevteam.com', 'subject' => 'Background queue test' 'text' => 'Anything you want.' ]; $serialGroup = 'invoice-' . $invoice->getId(); $identifier = 'sendEmail-' . $invoice->getId(); $isUnique = true; // always set to true if a callback on an entity should be performed only once, regardless of how it can happen that it is added to your queue twice $availableAt = new \DateTimeImmutable('+1 hour'); // earliest time when the record should be processed $this->backgroundQueue->publish($callbackName, $parameters, $serialGroup, $identifier, $isUnique, $availableAt); } public function process(string $to, string $subject, string $text) { // own implementation } }
Záznam se uloží ve stavu READY
.
Parametr $parameters
může přijímat jakýkoliv běžný typ (pole, objekt, string, ...) či jejich kombinace (pole objektů), a to dokonce i binární data.
Parametr $serialGroup
je nepovinný - jeho zadáním zajistítě, že všechny joby se stejným serialGroup budou provedeny sériově.
Parametr $identifier
je nepovinný - pomocí něj si můžete označit joby vlastním identifikátorem a následně pomocí metody getUnfinishedJobIdentifiers(array $identifiers = [])
zjistit, které z nich ještě nebyly provedeny.
Pokud callback vyhodí ADT\BackgroundQueue\Exception\PermanentErrorException
, záznam se uloží ve stavu PERMANENTLY_FAILED
a je potřeba jej zpracovat ručně.
Pokud callback vyhodí ADT\BackgroundQueue\Exception\WaitingException
, záznam se uloží ve stavu WAITING
a zkusí se zpracovat při přištím spuštění background-queue:process
commandu (viz níže). Počítadlo pokusů se nezvyšuje.
Pokud callback vyhodí jakýkoliv jiný error nebo exception implementující Throwable
, záznam se uloží ve stavu TEMPORARILY_FAILED
a zkusí se zpracovat při přištím spuštění background-queue:process
commandu (viz níže). Po notifyOnNumberOfAttempts
je zaslána notifikace. Prodleva mezi každým dalším opakováním je prodloužena o dvojnásobek času, maximálně však na dobu 16 minut.
Ve všech ostatních případech se záznam uloží jako úspěšně dokončený ve stavu STATE_FINISHED
.
2.2 Commandy
background-queue:process
Zpracuje všechny záznamy ve stavu READY
, TEMPORARILY_FAILED
, WAITING
a BROKER_FAILED
, v případě využití brokera pouze záznamy ve stavu TEMPORARILY_FAILED
a WAITING
. Command je ideální spouštět cronem každou minutu. V případě použití brokeru je záznam ve stavu TEMPORARILY_FAILED
a WAITING
zařazen znovu do brokera a stav je změněn na READY
.
background-queue:clear-finished
Smaže všechny úspěšně zpracované záznamy.
background-queue:clear-finished 14
Smaže všechny úspěšně zpracované záznamy starší 14 dní.
background-queue:reload-consumers QUEUE NUMBER
Reloadne NUMBER consumerů pro danou QUEUE.
background-queue:update-schema
Aktualizuje databázové schéma, pokud je potřeba.
Všechny commandy jsou chráněny proti vícenásobnému spuštění.
2.3 Callbacky
Využivát můžete také 2 callbacky onBeforeProcess
a onAfterProcess
, v nichž například můžete provést přepinání databází.
3 Monitoring
Při spuštění consumera se do tabulky background_job
do sloupce pid
uloží aktuální PID procesu. Nejedná se o PID z pohledu systému, ale o PID uvnitř docker kontejneru.