webgriffe / esb
Simple, beanstalkd powered, ESB framework.
Installs: 3 133
Dependents: 0
Suggesters: 0
Security: 0
Stars: 32
Watchers: 7
Forks: 3
Open Issues: 7
Requires
- php: ~7.4.0|~8.0.0|~8.1.0|~8.2.0|~8.3.0
- ext-json: *
- ext-mbstring: *
- ext-pcntl: *
- amphp/amp: ^2.5
- amphp/beanstalk: ^0.3.2
- amphp/file: ^2.0
- amphp/http: ^1.1
- amphp/http-server: ^2.0
- amphp/http-server-form-parser: ^1.1
- amphp/socket: ^1.0
- doctrine/annotations: ^1.8
- dragonmantank/cron-expression: ^2.0
- monolog/monolog: ^1.23
- nikic/fast-route: ^1.3
- pagerfanta/pagerfanta: ^2.4
- psr/log: ^1.1
- ramsey/uuid: ^3.8
- symfony/config: ^4.3
- symfony/dependency-injection: ^4.3
- symfony/deprecation-contracts: ^2.1
- symfony/property-access: ^4.3
- symfony/property-info: ^4.3
- symfony/serializer: ^4.3
- symfony/yaml: ^4.3
- twig/twig: ^2.5
- webgriffe/amp-elasticsearch: ^2.1
- webmozart/assert: ^1.5
Requires (Dev)
- amphp/http-client: ^4.6
- amphp/phpunit-util: ^1.0
- mikey179/vfsstream: ^1.6
- pda/pheanstalk: ^3.1
- phpstan/phpstan: ^0.12
- phpunit/phpunit: ^9.5
- symplify/easy-coding-standard: ^11.1
- dev-master
- 2.6.2
- 2.6.1
- 2.6.0
- 2.5.1
- 2.5.0
- 2.4.0
- 2.3.0
- 2.2.1
- v2.2.0
- v2.1.0
- v2.0.0
- v1.1.2
- v1.1.1
- v1.1.0
- v1.0.x-dev
- v1.0.0
- v0.2.3
- v0.2.2
- v0.2.1
- v0.2.0
- v0.1.1
- v0.1.0
- dev-elasticsearch-indexes-configuration
- dev-job-not-found-retrials
- dev-allow-higher-max-body-size
- dev-call-reset-on-logger
- dev-php8-support
- dev-github-actions
- dev-es-batch-index
- dev-phpstan-upgrade
This package is auto-updated.
Last update: 2024-11-22 15:54:58 UTC
README
Simple, beanstalkd powered, ESB framework.
Introduction
Webgriffe ESB is a PHP framework that aims to speed up the development of Enterprise Service Buses.
It uses Beanstalkd as a queue engine and Elasticsearch as jobs persistence layer. It's built on top of popular open-sourced PHP libraries like:
Architecture & Core concepts
Integrating different systems together is a matter of data flows. With Webgriffe ESB every data flow goes one way, from a system to another through a Beanstalkd tube. Every tube must have a producer which creates jobs and a worker that processes them. So data goes from the producer to the worker through the tube.
With Webgriffe ESB you integrate different systems by only implementing producers and workers. The framework takes care of the rest.
Webgriffe ESB is designed to use a single binary which is used as the main entry point of the whole application; all the producers and workers are started and executed by that single PHP binary. This is possible thanks to the Amp concurrency framework.
Requirements
- PHP 7.4, 8.0, 8.1, 8.2, 8.3
- Beanstalk
- Elasticsearch 7.*
Installation
Require this package using Composer:
composer require webgriffe/esb
Configuration
Copy the sample configuration file into your ESB root directory:
cp vendor/webgriffe/esb/esb.yml.sample ./esb.yml
The esb.yml
file is the main configuration of your ESB application, where you have to define flows with their worker and producer services.
services: _defaults: autowire: true # This is optional (see https://symfony.com/doc/current/service_container/autowiring.html) My\Esb\Producer: # A producer service definition arguments: [] My\Esb\Worker: # A worker service definition arguments: [] flows: sample_flow: # The flow "code" and will be the Beanstalkd tube name description: Sample Flow # The flow description producer: service: My\Esb\Producer # A producer service ID defined above batch_size: 1200 # Jobs are produced in batches of 1200 jobs. Optional: default is 1000 worker: service: My\Esb\Worker # A worker service ID defined above instances: 1 # The number of worker instances to spawn for this flow error_retry_delay: 0 # The number of seconds to wait before an errored job can be retried. The default is 0 (errored jobs can be retried immediately). Useful when "retrying later" might solve the problem. release_delay: 0 # (deprecated) older name of the error_retry_delay parameter max_retry: 5 # The number of maximum work retries for a job in this tube/flow before being buried dependencies: # This whole section can be omitted if the current flow has no dependencies flows: ['other_flow_1', 'other_flow_2'] # Optional: dependencies of this flow toward other flow(s) delay_after_idle_time: 1000 # Optional: delay that a worker with dependencies waits before working the first job received after the tube was empty initial_polling_interval: 1000 # Optional: initial polling delay that a worker waits when it has to wait for a dependency that is not idle maximum_polling_interval: 60000 # Optional: maximum polling delay that a worker waits when it has to wait for a dependency that is not idle polling_interval_multiplier: 2 # Optional: polling delay increase factor whenever a worker is waiting for a dependency that is not idle es_index_settings: # Optional: the update ElasticSearch index API settings (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html#update-index-settings-api-request-body). This is useful if you want to control index settings. index: # For example you can set the total_fields limit to an higher (or lower) value mapping: total_fields: limit: 2000 es_index_mapping: # Optional: the update ElasticSearch index API mapping (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#put-mapping-api-request-body). This is useful if you want to control index mapping. properties: title: type: text es_index_aliases: # Optional: the update ElasticSearch index API aliases (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-add-alias.html#add-alias-api-request-body). This is useful if you want to control index aliases. my_alias: is_hidden: true other_flow_1: # ... other_flow_2: # ...
The services
section is where you have to define your worker and producer services using the syntax of the Symfony Dependency Injection component.
In the services
section you also find two services Webgriffe\Esb\Producer\CleanOldJobs and Webgriffe\Esb\Worker\CleanOldJobs that you should keep if you want to enable the flow that periodically deletes old jobs.
The flows
section is where you have to define your ESB flows. Every flow must refer to a producer and a worker service defined in the services
section.
In the flows
section you also find the definition of the clean_old_jobs_flow
: you should keep it if you want old jobs to be periodically deleted.
This is also the section where dependencies between flows are defined. See the Dependencies section for details.
You also have to define some parameters under the parameters
section. Please refer to the esb.yml.sample
file for more informations about required parameters. Usually it's better to isolate parameters in a parameters.yml
file which can be included in the esb.yml
as follows:
# esb.yml imports: - { resource: parameters.yml} services: # ... flows: # ...
# parameters.yml parameters: beanstalkd: tcp://127.0.0.1:11300 # Other parameters here ...
Please refer to the sample configuration file for the complete list of parameters and for more information about the configuration of your ESB.
Dependencies
It is possible to specify dependencies across flows, which ensure that a flow cannot process any job as long as one or more configured flows are working their jobs. This is done by using the dependencies
configuration section: if you want flow A to depend on flow B, you specify the dependencies
section in flow A's configuration to list flow B:
# esb.yml # ... flows: flow_B: #... flow_A: #... dependencies: flows: ['flow_B']
When one such dependency is specified so that flow A depends on flow B, whenever flow B is working some job and/or it has queued jobs, then flow A will still produce and queue new jobs, but it will not work them. When flow B finishes processing its last job and its Beanstalk tube is empty, then flow A begins to work through its jobs.
If a new job is created for flow B while flow A is working, flow A will complete the job (or jobs, if there are multiple workers) that was already being worked and then it will stop until all its dependencies are idle (empty tube and no jobs being worked).
Dependencies can also be multiple, meaning that flow A can depend on both flow B and flow C (and more, if needed). In this case flow A will wait until all its dependencies are idle. To declare multiple dependencies, simply list all dependencies in the dependencies.flows
field.
Indirect dependencies are not honored. This means that if flow A depends on flow B, which in turn depends on flow C, a job for flow C will block flow B, but it will not stop flow A. Flow A will only check flow B. If you want flow A to also check flow C, simply make the dependency between flow A and flow C explicit by saying that flow A depends on both flows B and C.
When a flow depends on another, such as flow A depending on flow B, whenever flow A's worker extracts a job from its queue it will check its dependencies to ensure that they are all idle. If one is found that is not idle, flow A will begin polling that dependency to see when it finishes. If desired, the timing of this polling action can be controlled with a few configuration parameters (these are all optional):
initial_polling_interval
(default 1000ms) is the number of milliseconds that flow A will wait the first time, before rechecking flow B's status.polling_interval_multiplier
(default: 2) after the first delay defined byinitial_polling_interval
, each subsequent delay is obtained by multiplying the previous one by this parameter. The default value of 2 means that each time the delay doubles, so if the first delay is 1000ms, the second will be 2000ms, then 4000ms etc. Setting this value to 1 forces the system to keep on using the initial delay value without change.maximum_polling_interval
(default: 60000ms) the exponential increase imposed bypolling_interval_multiplier
is limited by this value, which is the maximum possible polling interval.delay_after_idle_time
(default: 1000ms) sometimes it may happen that flow A's and flow B's producers begin producing new jobs at the same time. If flow A was idle and its producer manages to produce the first job before flow B's, flow A's worker may begin to work the new job before flow B's producer has had time to produce its job, and so the dependency would be violated. To solve this, whenever a flow with some dependencies has been waiting for some time for new jobs to arrive (flow A in our example), upon receiving a new job it will wait a time specified bydelay_after_idle_time
before resuming operation. This gives flow B's producer enough time to publish its job in its tube, which will ensure that flow A will wait for its dependency when the timeout expires. This parameter should not need changing unless one is dealing with a very slow producer, in which case it may be appropriate to increase it.
Notice that the exponential polling time increase is reset for each dependency: if a flow depends on multiple other flows, each time a dependency goes idle the timing parameters are reset before checking the next dependency.
Producers
A producer can be any service whose class implements the ProducerInterface
. However, implementing only the ProducerInterface
is not enough: every producer must implement also one of the supported producer type interfaces. This is because the framework must know when to invoke every producer. At the moment these are the supported producer types:
RepeatProducerInterface
: these producers are invoked repeatedly every fixed interval.CrontabProducerInterface
: these producers are invoked when their crontab expression matches.HttpRequestProducerInterface
: these producers are invoked when the ESB's HTTP server receives a corresponding HTTP request.
Refer to these interfaces in the source code for more information.
The produce
method of the ProducerInterface
must return an Amp's Iterator, this allows you to produce a collection of jobs with a single produce
invocation. Moreover iterators allows to have long running produce operations which are executed asyncronously.
Also, keep in mind that you should never use I/O blocking function calls inside your producers. Use Amp or ReactPHP libraries when you need to do I/O operations.
See the dummy producers in the tests/ directory for some examples.
Workers
A worker can be any service whose class implements the WorkerInterface
. Every worker is invoked immediately when a job is available on its flow's tube.
The work
method of a worker must return an Amp's Promise that must resolve when the job is worked succesfully. Otherwise the work
method must throw an exception.
When a worker successfully works a job, the ESB framwork deletes it from the tube. Conversely, when a worker fails to work a job the ESB framework keeps it in the tube for a maximum of a max_retry
times. If an error_retry_delay
other than 0 is specified, then the job will not be retried for at least the specified number of seconds. If the maximum number of retries is exceeded, the job is buried and a critical event is logged.
Like for producers, you should never use I/O blocking function calls inside your workers. Use Amp or ReactPHP libraries if you need to do I/O operations.
See the dummy workers in the tests/ directory for some examples.
Initialization
WorkerInterface
and ProducerInterface
both support an init
method which is called by the ESB framework at the boot phase.
The init
method must return an Amp's Promise. This allows you to perform initialization operations asyncronously (for example instantiating a SOAP client with a remote WSDL URL).
Unit testing
You can (and should) also unit test your workers and producers. Because workers and producers must return promises and iterators you have to use the Amp loop inside your tests. You should also use the amphp/phpunit-util to reset the loop state between tests.
Unit test example
Here follows an example of a producer test which verifies that the producer produces stock inventory update jobs based on an XML file in a given directory.
public function testShouldProduceMultipleJobsWithMultipleEntriesFile() { filesystem(new BlockingDriver()); vfsStream::setup(); $this->importFile = vfsStream::url('root/stock.xml'); $this->producer = new Stock($this->importFile); copy(__DIR__ . '/StockTestFixtures/multiple_entries.xml', $this->importFile); $this->jobs = []; Loop::run( function () use ($data) { $iterator = $this->producer->produce($data); while (yield $iterator->advance()) { $this->jobs[] = $iterator->getCurrent(); } } ); $this->assertCount(52, $this->jobs); $this->assertEquals(new Job(['sku' => 'SKU-1', 'qty' => 9519.000]), $this->jobs[0]); $this->assertEquals(new Job(['sku' => 'SKU-23', 'qty' => 299.000]), $this->jobs[12]); $this->assertEquals(new Job(['sku' => 'SKU-50', 'qty' => 2017.000]), $this->jobs[21]); }
Here follows the example of a unit test for the related worker which takes the SKU and quantity to update from the job and then performs an API call to update the quantity.
public function testWorksSimpleJob() { $this->sessionId = random_int(1, 1000); $this->client = $this->prophesize(Client::class); $this->clientFactory = $this->prophesize(Factory::class); $this->clientFactory->create()->willReturn(new Success($this->client->reveal())); $this->worker = new Stock($this->clientFactory->reveal()); $sku = 'SKU-1'; $qty = 10; $this->client ->login() ->shouldBeCalled() ->willReturn(new Success($this->sessionId)) ; $this->client ->call('cataloginventory_stock_item.update', [$sku, ['qty' => $qty, 'is_in_stock' => true]]) ->shouldBeCalled() ->willReturn(new Success(true)) ; $this->client->endSession()->shouldBeCalled()->willReturn(new Success()); $job = new QueuedJob(1, ['sku' => $sku, 'qty' => $qty]); Loop::run(function () use ($job) { yield $this->worker->init(); yield $this->worker->work($job); }); }
Web Console
A web console UI is available and allows to inspect tubes and jobs; is it also possible to search jobs and requeue them. The web console is currentyl only available under HTTP (not HTTPS) and must be configured using the following parameters:
# esb.yml parameters: # ... console_port: 8080 # Web console port console_username: admin # Web console username console_password: password # Web console password console_log_file: /tmp/esb_console.log # Web console server log file # ...
For example, given the configuration above, you can access to the web console at the URL http://<ip_or_hostname>:8080/
using admin
as username and password
as password.
The web console HTTP server must be set on a different port then the one used by the HttpRequestProducerInterface
producers (and identified by the http_server_port
parameter).
In the web console dashboard you can check the status of the configured flows:
By clicking on a flow, you can see a searchable list of jobs for that flow. The list is paginated and it allows one to manually requeue jobs, if needed:
Clicking on a single job or on the view button, one can see further details of a specific job, as well as forcing it back in the queue:
Deployment
As said all workers and producers are managed by a single PHP binary. This binary is located at vendor/bin/esb
. To deploy and run your ESB application all you have to do is deploy your application as any other PHP application (for example using Deployer) and make sure that vendor/bin/esb
is always running (we suggest to use Supervisord for this purpose).
Keep in mind that the vendor/bin/esb
binary logs its operations to stdout
and it reports errors using error_log()
function. With a standard PHP CLI configuration all the error_log()
entries are then redirected to stderr
. This is done through Monolog's StreamHandler and ErrorHandler handlers. Moreover all warning (or higher level) events are handled by the NativeMailHander (configured with logger_mail_to
and logger_mail_from
parameters).
You can also add your own handlers using the esb.yml
configuration file.
Contributing
To contribute simply fork this repository, do your changes and then propose a pull request.
We recommend to use Docker. Indeed a docker-compose.yml
file is provided.
Just copy the .env.dist
file to .env
and adjust the environment variables values according to your environment.
For example, to run the entire test suite (PHP Code Sniffer, PHPStan, PHPUnit, etc...) you can simply run:
docker-compose run php composer tests
Or to only run PHPUnit tests you can use:
docker-compose run php vendor/bin/phpunit
The test suite uses the ESB_BEANSTALKD_URL
environment variable to get the connection URL of the Beanstalkd instance. This environment variable is already set in the provided docker-compose.yml
file.
You can also run an instance of the ESB locally using Docker. You must create an esb.yml
configuration file in the root directory and then run:
docker-compose up
License
This library is under the MIT license. See the complete license in the LICENSE file.
Credits
Developed by Webgriffe®.