michel / pqueue
PQueue is a minimalist PHP library for processing background messages using a single persistent CLI (managed via systemd) or periodic execution (via cron), covering 90% of use cases without external dependencies or complex worker management.
Requires
- php: >=7.4
- ext-json: *
- ext-sqlite3: *
- michel/console: ^1.0
- michel/michel-package-starter: ^1.0
- michel/options-resolver: ^1.0
- psr/container: ^1.1|^2.0
- psr/log: ^1.1|^2.0|^3.0
Requires (Dev)
- michel/unitester: ^1.0.0
This package is not auto-updated.
Last update: 2025-12-16 11:54:29 UTC
README
PQueue is a lightweight, framework-agnostic library for handling background jobs and messages with persistent queues.
Features
- Multiple Transports: Comes with
SQLiteandFilesystemtransports. - DI-Friendly: Designed to integrate cleanly with any PSR-11 dependency injection container.
- Configurable Worker: The queue worker can be configured with memory limits, time limits, retry strategies, and more.
- Automatic Handler Discovery: Scans specified directories to find your message handlers automatically.
Installation
composer require michel/pqueue
Basic Usage (Without a Framework)
This example shows how to use the library in a simple PHP script.
1. Create a Message and a Handler
// src/Messages/MyMessage.php
namespace App\Messages;
class MyMessage {
public string $text;
public function __construct(string $text) { $this->text = $text; }
}
// src/Handlers/MyMessageHandler.php
namespace App\Handlers;
use App\Messages\MyMessage;
class MyMessageHandler {
public function __invoke(MyMessage $message) {
echo "Processing message: " . $message->text . "\n";
}
}
2. Dispatch a Message
// send_message.php
require 'vendor/autoload.php';
use Michel\PQueue\Transport\SQLiteTransport;
use Michel\PQueue\PQueueDispatcher;
use App\Messages\MyMessage;
// 1. Create a transport
$transport = SQLiteTransport::create(['db_path' => __DIR__ . '/pqueue.sqlite']);
// 2. Create a dispatcher
$dispatcher = new PQueueDispatcher($transport);
// 3. Dispatch your message
$dispatcher->dispatch(new MyMessage('Hello, World!'));
echo "Message dispatched!\n";
3. Run the Worker
The worker needs a HandlerResolver to get handler instances. For this simple example, we'll create a basic one.
// worker.php
require 'vendor/autoload.php';
use Michel\PQueue\PQueueConsumerFactory;
use Michel\PQueue\PQueueWorker;
use Michel\PQueue\HandlerResolver\HandlerResolverInterface;
use Michel\PQueue\Transport\SQLiteTransport;
use App\Handlers\MyMessageHandler; // Import the handler class
// 1. Create a simple handler resolver for the example
$handlerResolver = new class implements HandlerResolverInterface {
private array $handlers = [];
public function getHandler(string $className): object {
if (!isset($this->handlers[$className])) {
$this->handlers[$className] = new $className();
}
return $this->handlers[$className];
}
public function hasHandler(string $className): bool {
return class_exists($className);
}
};
// 2. Create the transport
$transport = SQLiteTransport::create(['db_path' => __DIR__ . '/pqueue.sqlite']);
// 3. Use the factory to build the consumer
$factory = new PQueueConsumerFactory(
$handlerResolver,
[
MyMessageHandler::class, // You can add handler classes directly
__DIR__ . '/src/Handlers' // And also scan directories
],
__DIR__ . '/cache' // Cache directory for handler discovery
);
$consumer = $factory->createConsumer();
// 4. Create and run the worker
$worker = new PQueueWorker($transport, $consumer, [
'stopWhenEmpty' => true, // Stop after processing all messages
]);
$worker->run();
echo "Worker finished.\n";
Worker Callbacks
You can hook into the worker lifecycle using the following methods:
onConsume(callable $callback): Executed after a message is successfully consumed.onFailure(callable $callback): Executed when a message fails processing.onStop(callable $callback): Executed when the worker stops (due to memory limit, time limit, or empty queue).
$worker->onConsume(function ($message) {
echo "Message processed!\n";
});
$worker->onFailure(function ($message, $exception) {
echo "Message failed: " . $exception->getMessage() . "\n";
});
$worker->onStop(function () {
echo "Worker stopped.\n";
});