etouches / phplumber
Pipelining library which allows a mix of asynchronous and synchronous processes.
Requires
- php: >=5.3
Requires (Dev)
- phpunit/phpunit: 4.8.*
This package is not auto-updated.
Last update: 2025-07-20 07:10:29 UTC
README
Phplumber is a very simple pipelining library for PHP which allows a mix of asynchronous and synchronous processes. Use it to break a large process into multiple steps using a queue and multi-processing. Phplumber contains no hard-coded dependencies and can be backed by any queue setup and any storage mechanism.
Requirements
- PHP 5.3+
- A queue, such as RabbitMQ or Redis
- Storage, such as a relational database table or Redis
Example
Let's take creating and filling a database as an example. It takes multiple steps and some can be done concurrently.
- Create database (one process)
- Create and populate tables (one process per table)
- Create views dependent on multiple tables (one process, dependent on all tables existing)
Create table 1
/ \
Create database -> Create table 2 -> Create views
\ /
Create table 3
First we would define our processes. Sequential steps extend Process
. A step that can run multiple times with
different data extends MultiProcess
.
class CreateDatabase extends Process { public function invoke($payload) { $database_name = $payload['database_name']; echo "Drop database $database_name if it exists...\n"; echo "Creating database $database_name...\n"; } }
class CreateTable extends MultiProcess { // Determine the data we need to queue for async processes public function getAsyncPayloads($payload) { $database_name = $payload['database_name']; $table_names = array('first_table', 'second_table', 'third_table'); $payloads = array(); foreach ($table_names as $table) { $payloads[] = array('database_name' => $database_name, 'table_name' => $table); } return $payloads; } public function invoke($payload) { $database_name = $payload['database_name']; $table_name = $payload['table']; echo "Connecting to database $database_name...\n"; echo "Creating and populating $table_name...\n"; switch ($payload['table']) { case 'first_table': // Create table and insert rows... break; // ... } } }
Then we define the sequence of processes.
class CreateAndFillDatabase extends ProcessList { protected function setup() { $this ->add('CreateDatabase') ->add('CreateTable') ->add('CreateViews'); } }
We can now kick off the process sequence.
$equation = new CreateAndFillDatabase(); $equation->process(array('database_name' => 'test_db'));
See the examples
directory for complete, working demos.
Getting Started
- Implement
StorageInterface
. This will hold semaphore data. Appropriate storage engines include any relational database, nosql, or key-value stores such as Redis. - Extend the
Queue
class to integrate with a queue system. Appropriate queue engines include Redis and RabbitMQ. - Write each process as a class that extends
Process
(for synchronous) orMultiProcess
(for asynchronous). - Implement
ProcessFactoryInterface
. This will create each instance ofProcessInterface
, allowing you to set up your processes with any prerequisites, such as a database connection or configuration options. - Put the processes together by extending
ProcessList
. - Implement a worker daemon which instantiates your
Queue
implementation and callsconsume()
to listen for incoming messages. Each message is to invoke a single part of a multi-process. Run multiple workers to execute processes concurrently. - Choose a place in your system to start the entire workflow, instantiate your
ProcessList
, and callprocess()
, passing it the initial payload.