giudicelli / distributed-architecture-queue
PHP distributed architecture queue
This package's canonical repository appears to be gone and the package has been frozen as a result.
Requires
- php: >=7.2.5
- giudicelli/distributed-architecture: >=0.8.0
- psr/log: ~1.0
Requires (Dev)
- phpunit/phpunit: ^9
README
PHP Distributed Architecture Queue is a library that extends distributed-architecture. It implements a feeder/consumers system: a feeder process hands out jobs, and consumer processes, local or remote, handle them.
Installation
$ composer require giudicelli/distributed-architecture-queue
Using
To run your distributed architecture queue, you will mainly need two classes: Master\LauncherQueue and Slave\HandlerQueue.
Master process
Here is a simple example to start the master process.
    use giudicelli\DistributedArchitecture\Master\Handlers\GroupConfig;
    use giudicelli\DistributedArchitectureQueue\Master\Handlers\Consumer\Local\Config as LocalConsumerConfig;
    use giudicelli\DistributedArchitectureQueue\Master\Handlers\Consumer\Remote\Config as RemoteConsumerConfig;
    use giudicelli\DistributedArchitectureQueue\Master\Handlers\Feeder\Local\Config as LocalFeederConfig;
    use giudicelli\DistributedArchitectureQueue\Master\Handlers\Feeder\Remote\Config as RemoteFeederConfig;
    use giudicelli\DistributedArchitectureQueue\Master\LauncherQueue;
    use Psr\Log\AbstractLogger;

    class Logger extends AbstractLogger
    {
        public function log($level, $message, array $context = [])
        {
            foreach ($context as $key => $value) {
                $message = str_replace('{'.$key.'}', $value, $message);
            }
            echo "{$level} - {$message}\n";
            flush();
        }
    }

    $logger = new Logger();

    $groupConfigs = [
        (new GroupConfig())
            ->setName('First Group')
            ->setCommand('script.php')
            ->setProcessConfigs([
                (new LocalFeederConfig())
                    ->setBindTo('192.168.0.1')
                    ->setPort(9999),
                (new RemoteConsumerConfig())
                    ->setHost('192.168.0.1')
                    ->setPort(9999)
                    ->setHosts(['remote-server1', 'remote-server2'])
                    ->setInstancesCount(3),
            ]),
    ];

    (new LauncherQueue($logger))
        ->setMaxRunningTime(3600)
        ->run($groupConfigs);
The above code creates one group called "First Group", which runs "script.php" as:
- 1 feeder instance on the local machine, listening on 192.168.0.1:9999,
- 3 consumer instances on the "remote-server1" machine,
- 3 consumer instances on the "remote-server2" machine.
All 6 consumer instances will connect to the feeder instance listening on 192.168.0.1:9999.
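The count of 6 follows from the configuration: each entry passed to setHosts gets the number of instances passed to setInstancesCount. As a minimal sketch of that arithmetic, the snippet below lists the implied instances. The `plannedConsumers` helper and the `host#n` labels are hypothetical and not part of the library.

```php
<?php

// Hypothetical helper, not part of the library: lists the consumer
// instances implied by a host list and a per-host instance count.
function plannedConsumers(array $hosts, int $instancesPerHost): array
{
    $instances = [];
    foreach ($hosts as $host) {
        for ($i = 1; $i <= $instancesPerHost; ++$i) {
            // Label each instance as "host#n" for display purposes only.
            $instances[] = $host.'#'.$i;
        }
    }

    return $instances;
}

// Same values as in the master example above.
$consumers = plannedConsumers(['remote-server1', 'remote-server2'], 3);

echo count($consumers)." consumers\n"; // prints "6 consumers"
```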
The "Master\LauncherQueue" instance will run for 1 hour (the 3600 seconds passed to setMaxRunningTime) before it stops all instances and returns. It is usually a good idea to restart the master after a certain time, to start from a clean environment.
Keep in mind that, without a maximum running time, a "Master\LauncherQueue" instance will run forever, unless you kill it using a SIGTERM.
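The Logger class in the master example substitutes every context value into the message with str_replace, which can misbehave when a value is an array or an object without __toString. A more defensive variant, modeled on the interpolation pattern shown in the PSR-3 specification, might look like the sketch below. The `interpolate` function is a stand-alone illustration, not part of this library, and could be called from the Logger's log() method.

```php
<?php

// Stand-alone sketch: replaces {placeholders} in a message with context
// values, skipping values that cannot be safely converted to a string.
function interpolate(string $message, array $context = []): string
{
    $replace = [];
    foreach ($context as $key => $value) {
        $stringable = !is_array($value)
            && (!is_object($value) || method_exists($value, '__toString'));
        if ($stringable) {
            $replace['{'.$key.'}'] = (string) $value;
        }
    }

    // strtr() performs all substitutions in a single pass.
    return strtr($message, $replace);
}

echo interpolate(
    'Started {count} consumers on {host}',
    ['count' => 3, 'host' => 'remote-server1', 'ignored' => ['not', 'stringable']]
)."\n"; // prints "Started 3 consumers on remote-server1"
```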
Slave process
A slave process must use the "Slave\HandlerQueue" class, as the master will be sending commands that need to be handled. It also allows your script to do a clean exit upon the master's request. A single script handles both roles: depending on how the master launched it, it acts as either a feeder or a consumer.
Using the above example, here is a possible implementation for "script.php".
    use giudicelli\DistributedArchitecture\Slave\HandlerInterface;
    use giudicelli\DistributedArchitectureQueue\Slave\HandlerQueue;
    use giudicelli\DistributedArchitectureQueue\Slave\Queue\Feeder\FeederInterface;
    use Psr\Log\LoggerInterface;

    if (empty($_SERVER['argv'][1])) {
        echo "Empty params\n";
        die();
    }

    /**
     * This is an example of a serializable job implementation.
     */
    class Job implements \JsonSerializable
    {
        public $id = 0;
        public $type = '';

        public function jsonSerialize()
        {
            return [
                'id' => $this->id,
                'type' => $this->type,
            ];
        }
    }

    /**
     * This is an example of a feeder queue implementation.
     * It returns the jobs that will be sent to the consumers.
     */
    class Feeder implements FeederInterface
    {
        private $items = [];
        private $successes = [];
        private $errors = [];

        public function __construct()
        {
            $item = new Job();
            $item->id = 1;
            $item->type = 'MyType';
            $this->items[] = $item;

            $item = new Job();
            $item->id = 2;
            $item->type = 'MyType';
            $this->items[] = $item;

            $item = new Job();
            $item->id = 3;
            $item->type = 'MyType';
            $this->items[] = $item;
        }

        public function empty(): bool
        {
            return empty($this->items);
        }

        public function get(): ?\JsonSerializable
        {
            if ($this->empty()) {
                return null;
            }
            $item = $this->items[0];
            array_splice($this->items, 0, 1);

            return $item;
        }

        public function success(\JsonSerializable $item): void
        {
            $this->successes[] = $item;
        }

        public function error(\JsonSerializable $item): void
        {
            $this->errors[] = $item;
        }
    }

    $handler = new HandlerQueue($_SERVER['argv'][1]);
    $handler->runQueue(
        // The consumer callback
        function (HandlerInterface $handler, array $item) {
            // Anything echoed here will be considered log level "info" by the master process.
            // If you want another level for certain messages, use $handler->getLogger().
            // echo "Hello world!\n" is the same as $handler->getLogger()->info('Hello world!')

            // I received a job to handle, the job is an array form of the Job class.
            echo $item['type'].':'.$item['id']."\n";
        },
        // The feeder accesses the jobs queue
        new Feeder()
    );
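The consumer callback receives each job as an array rather than as a Job object. The sketch below illustrates why that shape is natural, assuming the job is JSON-encoded on the feeder side and decoded on the consumer side. The README does not describe the wire format, so the encode/decode step is an assumption, and the `roundTrip` helper is hypothetical. The snippet uses only PHP built-ins and a copy of the Job class, so it runs without the library.

```php
<?php

// Copy of the Job class from the example above, with an explicit
// return type so it runs cleanly on recent PHP versions.
class Job implements \JsonSerializable
{
    public $id = 0;
    public $type = '';

    public function jsonSerialize(): array
    {
        return [
            'id' => $this->id,
            'type' => $this->type,
        ];
    }
}

// Hypothetical helper: simulates a job leaving the feeder as JSON and
// arriving at the consumer as an associative array. The real transport
// used by the library is not documented here.
function roundTrip(\JsonSerializable $job): array
{
    return json_decode(json_encode($job), true);
}

$job = new Job();
$job->id = 1;
$job->type = 'MyType';

$item = roundTrip($job);

// Same access pattern as in the consumer callback.
echo $item['type'].':'.$item['id']."\n"; // prints "MyType:1"
```

Under this assumption, only the data returned by jsonSerialize() reaches the consumer, so any state a consumer needs should be included there.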