uginroot / async-symfony-process
Execute a pool of Symfony processes
Requires
- php: >=7.4
- symfony/process: ^3.4|^4.4|^5.0
Requires (Dev)
- phpunit/phpunit: 8.0.0
README
Existing libraries work with a pool of processes that is generated in advance. This is poorly suited to cases where there are many processes, and it is not suitable at all if an additional process may need to be started after another process has finished. This library solves that problem.
This library is built on top of symfony/process.
Install
composer require uginroot/async-symfony-process
Use
Basic
In the simplest case, you only need to specify a function that generates new processes. If the function returns null instead of a process, execution ends after all currently running processes have finished.
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

// Values that still have to be turned into processes.
$queue = range(1, 10);

// Factory: hands out one process per queued value and null once the queue is empty.
$processFactory = static function () use (&$queue): ?Process {
    if ($queue === []) {
        return null;
    }

    $number = array_shift($queue);
    $command = sprintf('echo %d', $number);

    return Process::fromShellCommandline($command);
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->execute();
Callback
If you need the result of a process, then you need to set a callback function that will be called after the completion of the process.
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;
use Uginroot\AsyncSymfonyProcess\ProcessWrapper;

// Values that still have to be turned into processes.
$queue = range(1, 10);

// Filled by the callback with the integer output of each process.
$results = [];

// Factory: hands out one process per queued value and null once the queue is empty.
$processFactory = static function () use (&$queue): ?Process {
    if ($queue === []) {
        return null;
    }

    $number = array_shift($queue);
    $command = sprintf('echo %d', $number);

    return Process::fromShellCommandline($command);
};

// Called after a process has completed; stores its output cast to int.
$callback = static function (ProcessWrapper $processWrapper) use (&$results): void {
    $output = $processWrapper->getOutput();
    $results[] = (int) $output;
};

$pool = new Pool();
$pool
    ->setProcessFactory($processFactory)
    ->setCallback($callback)
    ->execute();
Output listener
If the process is interactive, or you need to react to the errors that the process generates, set an output listener. It receives the instance of the current process, the output type (Process::ERR or Process::OUT) and the data generated by the process.
use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

// Values that still have to be turned into processes.
$queue = range(1, 10);

// Factory: builds one process per queued value, each with its own InputStream
// so the output listener can write to it; returns null once the queue is empty.
$processFactory = static function () use (&$queue): ?Process {
    if ($queue === []) {
        return null;
    }

    $number = array_shift($queue);
    $command = sprintf('echo %d', $number);

    $process = Process::fromShellCommandline($command);
    $stdin = new InputStream();
    $process->setInput($stdin);

    return $process;
};

// Receives the process, the output type (Process::ERR or Process::OUT) and the data.
$outputListener = static function (Process $process, string $type, string $data): void {
    /** @var InputStream $input */
    $input = $process->getInput();

    // Any error output: ask the process to exit.
    if ($type === Process::ERR) {
        $input->write('exit;');

        return;
    }

    // NOTE(review): the exact match assumes the prompt arrives as a single chunk
    // with no surrounding whitespace — confirm against the real process output.
    if ($type === Process::OUT && $data === 'Say yes:') {
        $input->write('yes');
    }
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setOutputListener($outputListener);
$pool->execute();
While listener
To perform any actions at each iteration of the process execution loop, you must set a loop listener.
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

// Values that still have to be turned into processes.
$queue = range(1, 10);

// Factory: hands out one process per queued value and null once the queue is empty.
$processFactory = static function () use (&$queue): ?Process {
    if (count($queue) === 0) {
        return null;
    }

    $value = array_shift($queue);

    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

// Invoked on each iteration of the process execution loop.
$whileListener = static function (): void {
    // pass
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setWhileListener($whileListener);
$pool->execute();
Endless execution loop
To keep the process execution loop running when the process factory returns null, mark the pool as eternal when configuring it. You can then react and replenish the list of processes in any of the called functions, for example, in the whileListener. To stop the endless execution, tell the pool that it is no longer eternal.
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

// Starts empty; the while listener refills it during execution.
$queue = [];

// Number of loop iterations seen so far.
$iteration = 0;

// Created up front so the listener can capture it by value instead of
// referencing a variable that does not exist yet.
$pool = new Pool();

// Factory: hands out one process per queued value and null while the queue is empty.
$processFactory = static function () use (&$queue): ?Process {
    if (count($queue) === 0) {
        return null;
    }

    $value = array_shift($queue);

    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

// Invoked on each iteration of the process execution loop.
$whileListener = static function () use (&$queue, &$iteration, $pool): void {
    $iteration++;

    if ($iteration === 20) {
        // Execution will end after the running and newly generated
        // processes have finished.
        $pool->setIsEternal(false);
    }

    // Every fifth iteration, queue one more value for the factory to pick up.
    if ($iteration % 5 === 0) {
        $queue[] = $iteration;
    }
};

$pool->setProcessFactory($processFactory);
$pool->setWhileListener($whileListener);
$pool->setIsEternal(true);
$pool->execute();
Use in class
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;
use Uginroot\AsyncSymfonyProcess\ProcessWrapper;

/**
 * Example of driving a Pool from inside a class: the factory and the
 * callback are ordinary public methods passed to the pool as callables.
 */
class AsyncProcess
{
    /** @var int[] Indexes that have not been turned into processes yet. */
    private array $indexes;

    /** @var int[] Integer output collected from the processes passed to the callback. */
    private array $results = [];

    public function __construct()
    {
        $this->indexes = range(1, 10);
    }

    /**
     * Builds the process for the next pending index.
     *
     * @return Process|null Null once every index has been consumed.
     */
    public function processFactory(): ?Process
    {
        if ($this->indexes === []) {
            return null;
        }

        $next = array_shift($this->indexes);
        $command = sprintf('echo %d', $next);

        return Process::fromShellCommandline($command);
    }

    /**
     * Stores the output of the given process, cast to int.
     */
    public function processCallback(ProcessWrapper $processWrapper): void
    {
        $this->results[] = (int) $processWrapper->getOutput();
    }

    /**
     * Configures a pool with this object's factory and callback and executes it.
     */
    public function run(): void
    {
        $pool = new Pool();
        $pool->setProcessFactory([$this, 'processFactory']);
        $pool->setCallback([$this, 'processCallback']);
        $pool->execute();
    }
}