small/swoole-patterns

This project provides implementations of async design patterns for PHP OpenSwoole projects.

22.4.1 2024-02-08 21:05 UTC

README

small-swoole-patterns.svg

small-swoole-patterns

coverage-badge.png             tests-badge.png

About

This project provides implementations of async design patterns for PHP OpenSwoole projects.

Install

Install openswoole :

pecl install openswoole

Require the package with composer:

composer require small/swoole-patterns

Patterns

Array

Map

This is an evolution of swoole function \Coroutine\map

It is a class to map array on a callback. More verbose than \Coroutine\map but allow you waiting later in code.

<?php
use Small\SwoolePatterns\Array\Map;

// First map
$map1 = (new Map([1, 2, 3], function($value) {
    return $value - 1;
}))->run();

// Second map
$map2 = (new Map([4, 6, 8], function($value) {
    return $value / 2
}))->run();

// Wait two maps finished
$map1->wait();
$map2->wait();

// Merg results and dump result;
$result = array_merge($map1, $map2);
var_dump($result);

Stack

The stack design pattern is intend to access data in fill order. There is two types of stack : first in first out (FIFO) and last in first out (LIFO).

Here is an example of FIFO stack :

use Small\SwoolePatterns\Array\Stack;
use Small\SwoolePatterns\Array\Enum\StackType;

$stack = new Stack([], StackType::fifo);

$stack->push(1)
    ->push(2)
    ->push(3)
;
foreach (range(1, 3) as $i) {
    echo $stack->pull() . "\n";
}

Will output :

1
2
3

For LIFO, the order will be inverted :

use Small\SwoolePatterns\Array\Stack;
use Small\SwoolePatterns\Array\Enum\StackType;

$stack = new Stack([], StackType::lifo);

$stack->push(1)
->push(2)
->push(3)
;
foreach (range(1, 3) as $i) {
    echo $stack->pull() . "\n";
}

Will output :

3
2
1

State

Simple use

The simplest use of this implementation of state design pattern is a class that represent a set of states in a system with a registry to manage them :

use Small\SwoolePatterns\State\StateRegistry;
use Small\SwoolePatterns\State\State;

enum ButtonType {
    case pressed;
    case released;
}

$registry = (new StateRegistry());
    ->addState('buttonOk', State::class)
;

$registry->getState('buttonOk')
    ->set(ButtonType::pressed)
;

echo $registry->getState('buttonOk')->get()->name;

Will produce :

pressed

Encode state

You can add an encoder like json_encode or other by implementing StateEncoderInterface :

use Small\SwoolePatterns\State\StateRegistry;
use Small\SwoolePatterns\State\Contract\StateEncoderInterface;
use Small\SwoolePatterns\State\State;

class JsonState extends State implements StateEncoderInterface
{
    
    public function encodeState(mixed $state): mixed
    {

        return json_encode($state);

    }

    public function decodeState(mixed $encodedState): mixed
    {

        return json_decode($encodedState);

    }
    
    public function getRawState(): string
    {
    
        return $this->state;
    
    }

}

$registry = (new StateRegistry());
    ->addState('array', JsonState::class)
;

$registry->getState('array')->set(['pressed' => true, 'force' => 12.8]);

echo $registry->getState('array')->get()['force'];

Will produce :

12.8

But calling getRawState method will produce:

{"pressed": true, "force": 12.8}

Action on state change

Implementing OnChangeStateInterface allow you to do actions depending on state change.

For example, changing another state :

use Small\SwoolePatterns\State\StateRegistry;
use Small\SwoolePatterns\State\State;
use Small\SwoolePatterns\State\Contract\OnChangeStateInterface;

enum ActionStateType: int
{

    case first = 1;
    case second = 2;
    case third = 3;

}

class ActionOnState extends State implements OnChangeStateInterface
{

    public function __construct(protected State $triggeredState) {}

    public function set(mixed $state): self {

        if (!$state instanceof ActionStateType) {
            throw new \Exception('Wrong type');
        }

        parent::set($state);

        return $this;

    }

    public function onChange(mixed $stateBefore, mixed $stateAfter): void
    {

        if ($stateBefore < $stateAfter) {
            $this->triggeredState->set('upgrade');
        } else {
            $this->triggeredState->set('downgrade');
        }

    }

}

($registry = new StateRegistry())
->addState(
    'trigger',
    State::class)
->addState(
    'iterator',
    ActionOnState::class,
    $registry->getState('trigger')
);

$registry->getState('iterator')
    ->set(ActionStateType::first);
echo $registry->getState('trigger') . '\n';

$registry->getState('iterator')
    ->set(ActionStateType::third);
echo $registry->getState('trigger') . '\n';

$registry->getState('iterator')
    ->set(ActionStateType::second);
echo $registry->getState('trigger') . '\n';

Will produce :

upgrade
upgrade
downgrade

Observable

This is an implementation of observable pattern on a callback.

use Small\SwoolePatterns\Observable\Observable;

// Create callback
$get = function ($host): string {
    return (new Coroutine\Http\Client($host, 80, true))
        ->get('/');
};

// Create observer
$getObserver = (new Observable($get))
    ->subscribe(function(string $html) {
        // left method handle result
        echo $html;
    }, function(\Exception $e) {
        // Right method handle exceptions
        echo 'Can\'t get url : ' . $e->getMessage();
    })
;

$getObserver
    ->run('www.google.com')
    ->run('www.yahoo.com')
    ->run('www.qwant.com')
;

$getObserver->wait();

In this example, we print homepage of google, yahoo and qwant on async calls.

Async tasks

Async task are functions or methods that run async.

In this implementation, a TaskCollection class will run a set of tasks and a join method will wait execution for all tasks ends.

A task can enclose :

  • A callback
  • A static method
  • A task class that extends basic Task

For example :

use Small\SwoolePatterns\AsyncTask\Task;
use Small\SwoolePatterns\AsyncTask\TaskCollection;

class MaTask extends Task
{

    public function __construct()
    {
        parent::__construct([$this, 'tache']);
    }

    public static function tache(): string
    {

        return 'tester';

    }

}

(new TaskCollection([
    $impots = new Task(fn($url) => $url . '/test', $urlImpots = 'impots.gouv.fr'),
    $yahoo = new Task([static::class, 'getNull']),
    $google = new Task([static::class, 'getUrl'], $urlGoogle = 'google.com'),
    $maTask = new MaTask(),
]))->join();

echo $impots->getResult() . '\n';
echo $maTask->getResult() . '\n';

will output :

impots.gouv.fr/test
tester

Pool

This is an implementation of pool pattern.

Pools are useful to manage async processes in order to manage connections to server resources.

Create the Pool

$pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
);

Here we have created

  • A PRedis client pool (first parameter)
  • With a maximum of 10 clients at the same time (second parameter).
  • If no more clients available, the pool try to lock a new client every 100µs (third parameter)

Using client process

To get a client, use get method :

$client = $pool->get();

You have now locked the client and can use it :

$client->get('my-app:key')

Now we have finished the use, we must release the client :

$pool->put($client);

Putting together in async process will be :

$pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
);

(new \Small\SwoolePatterns\Array\Map(range(1, 100), ($i) use($pool) => {
    $client = $pool->get();
    $client->put('my-app:sum:' . $i, $i +$i);
    $pool->put($client);
}));

In this use case, the number of concurrent connections can't be more than 10 connections at the same time even the process is async.

Using one client destroy the async advantages while using client will wait for previous end.

Using a new client at each time can overload your memory and server.

Rate control

You can control the server limitations using rate control.

Activating rate control

($pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
))->activateRateController(100, 10);

In this code, the pool is waiting you are getting clients no more than 100µs. If less, it will wait 10µs before retry.

Using rate control for server limitation

For this example, we will consider that you want to connect to a provider http api. You have a limitation of 3000 requests by minutes.

You can activate a unit control to observe the provider limitations even your code is faster :

($pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\HttpClientManager('api.my-provider.net'),
    10,
    100
))->activateRateController(100, 10)
    ->addUnitToControl('minutes', 60, \Small\SwoolePatterns\Pool\Enum\RateBehaviour::waiting, 3000);

(new \Small\SwoolePatterns\Array\Map(range(1, 100), ($productId) use($pool) => {
    $client = $pool->get();
    $uri = 'getProduct/{productId}'
    $product = json_decode($client->get(str_replace('{productId}', $productId, $uri)));
    $pool->put($client);
    
    return $product;
}))->run()->wait();

Resource

You can manage resource access with resource pattern :

$factory = new \Small\SwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Small\SwoolePatterns\Resource\Enum\GetResourceBehaviour::exceptionIfNotFree);
$resource->releaseResource($ticket);

In async processes you can wait the others processes to unlock resource :

$factory = new \Small\SwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Small\SwoolePatterns\Resource\Enum\waitingForFree);
$resource->releaseResource($ticket);

Or manage yourself the waiting process :

$factory = new \Small\SwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Small\SwoolePatterns\Resource\Enum\GetResourceBehaviour::getTicket);
while ($ticket->isWaiting()) {
    doStuff();
    usleep(100);
}
doResourceStuff();
$resource->releaseResource($ticket);