beeline/yii2-pgsql-advisory-mutex

PostgreSQL transaction-level advisory locks based mutex for Yii2

Installs: 0

Dependents: 0

Suggesters: 0

Security: 0

Stars: 0

Watchers: 0

Forks: 0

Open Issues: 0

Type:yii2-extension

pkg:composer/beeline/yii2-pgsql-advisory-mutex

1.0.2 2025-11-08 16:11 UTC

This package is auto-updated.

Last update: 2025-11-08 16:14:08 UTC


README

Tests Codecov Packagist Version Packagist License Packagist Downloads

Реализация распределённого mutex для Yii2 на основе транзакционных advisory locks PostgreSQL. Обеспечивает безопасную, быструю и надёжную распределённую блокировку без физического хранения в таблицах.

Возможности

  • Транзакционные блокировки: Автоматическое освобождение при COMMIT/ROLLBACK
  • Совместимость с PgBouncer: Работает с пулингом соединений в любом режиме (session/transaction/statement)
  • Поддержка таймаутов: Таймаут захвата блокировки с миллисекундной точностью
  • Разделяемые/эксклюзивные режимы: Поддержка паттернов читатель-писатель
  • Нулевые накладные расходы на хранение: Виртуальные блокировки без физических строк в таблицах
  • Генерация ключей через xxHash64: Быстрое, устойчивое к коллизиям хеширование ключей блокировок
  • PHP 8.4+: Современный PHP со строгой типизацией

Установка

composer require beeline/yii2-pgsql-advisory-mutex

Требования

  • PHP >= 8.4
  • PostgreSQL >= 9.1
  • Yii2 >= 2.0.45
  • ext-pgsql

Настройка

1. Применение миграции

Mutex требует наличия PostgreSQL функции try_advisory_xact_lock_timeout. Примените миграцию:

# Используя команду миграции Yii2
php yii migrate --migrationPath=@vendor/beeline/yii2-pgsql-advisory-mutex/src/migrations

Или вручную выполните SQL из файла src/migrations/m250202_000000_create_advisory_lock_timeout_function.php.

2. Конфигурация приложения

return [
    'components' => [
        'mutex' => [
            'class' => \beeline\PgsqlAdvisoryMutex\PgsqlAdvisoryMutex::class,
            'db' => 'db', // ID компонента базы данных
        ],
    ],
];

Использование

Базовая блокировка

use Yii;

$mutex = Yii::$app->mutex;

// Захват блокировки (ожидание бесконечно)
if ($mutex->acquire('my_lock')) {
    try {
        // Критическая секция - только один процесс выполняет это одновременно
        performCriticalOperation();
    } finally {
        $mutex->release('my_lock');
    }
} else {
    // Не удалось захватить блокировку
}

Поддержка таймаутов

// Попытка захватить блокировку с таймаутом 5 секунд
if ($mutex->acquire('my_lock', 5)) {
    try {
        performCriticalOperation();
    } finally {
        $mutex->release('my_lock');
    }
} else {
    // Истёк таймаут или блокировка удерживается другим процессом
    echo "Не удалось захватить блокировку в течение 5 секунд\n";
}

// Без ожидания (timeout = 0)
if ($mutex->acquire('my_lock', 0)) {
    // Получили блокировку немедленно
} else {
    // Блокировка занята
}

Разделяемые блокировки (паттерн читатель-писатель)

// Несколько читателей могут одновременно захватить разделяемые блокировки
$readerMutex = new \beeline\PgsqlAdvisoryMutex\PgsqlAdvisoryMutex([
    'db' => Yii::$app->db,
    'sharedMode' => true,
]);

if ($readerMutex->acquire('resource')) {
    // Несколько читателей могут находиться здесь одновременно
    $data = readResource();
    $readerMutex->release('resource');
}

// Писатель использует эксклюзивную блокировку (блокирует и читателей, и писателей)
$writerMutex = new \beeline\PgsqlAdvisoryMutex\PgsqlAdvisoryMutex([
    'db' => Yii::$app->db,
    'sharedMode' => false, // по умолчанию
]);

if ($writerMutex->acquire('resource')) {
    // Эксклюзивный доступ
    writeResource($data);
    $writerMutex->release('resource');
}

Расширенное использование

$mutex = Yii::$app->mutex;

// Получение информации о текущих advisory locks в базе данных
$activeLocks = $mutex->getActiveLocks();
// Возвращает: [['pid' => 12345, 'locktype' => 'advisory', 'mode' => 'ExclusiveLock', ...], ...]

// Получение блокировок, захваченных этим экземпляром mutex
$acquired = $mutex->getAcquiredLocks();
// Возвращает: ['my_lock' => ['lockKey' => -1234567890, 'sharedMode' => false], ...]

// Освобождение всех блокировок, захваченных этим экземпляром
$count = $mutex->releaseAll();
echo "Освобождено {$count} блокировок\n";

Принцип работы

Транзакционные Advisory Locks

В отличие от session-level advisory locks, транзакционные блокировки автоматически освобождаются при коммите или откате транзакции. Это делает их безопасными для использования с пулингом соединений:

// Блокировка захватывается внутри транзакции
$mutex->acquire('my_lock');

// Если приложение упадёт или соединение потеряется,
// PostgreSQL автоматически освободит блокировку при откате транзакции

Генерация ключей блокировок

Имена блокировок хешируются в int64 с использованием xxHash64:

$mutex->acquire('user_123_profile');
// Внутренне: xxHash64('user_123_profile') -> -8234567890123456789

Вероятность коллизии с xxHash64 чрезвычайно мала (~10⁻¹⁹ для 1 миллиарда блокировок).

Реализация таймаута

Mutex использует настройку PostgreSQL lock_timeout для поддержки таймаутов:

-- Внутренне для timeout=2000ms
SET LOCAL lock_timeout = '2000ms';
SELECT pg_advisory_xact_lock(key);
-- lock_timeout восстанавливается после возврата функции

Важные ограничения

Не реентерабельные

Транзакционные advisory locks НЕ являются реентерабельными. Попытка захватить одну и ту же блокировку дважды в одной транзакции приведёт к блокировке:

$mutex->acquire('lock1'); // OK
$mutex->acquire('lock1'); // DEADLOCK - зависнет!

Вложенные транзакции

В окружениях с вложенными транзакциями (например, Codeception с TransactionForcer) блокировки НЕ будут освобождены до коммита самой внешней транзакции:

// В тесте Codeception с TransactionForcer
$mutex->acquire('lock1'); // Создаётся SAVEPOINT, не новая транзакция
$mutex->release('lock1'); // SAVEPOINT закоммичен, но xact lock НЕ освобождён
// Блокировка освобождается только при коммите/откате внешней тестовой транзакции

Совместимость с PgBouncer

Работает во всех режимах PgBouncer:

  • Session mode: ✅ Полная поддержка
  • Transaction mode: ✅ Полная поддержка (транзакционные блокировки)
  • Statement mode: ✅ Полная поддержка (каждый запрос в своей транзакции)

Параметры конфигурации

Свойство Тип По умолчанию Описание
db Connection|string 'db' Компонент базы данных или его ID
sharedMode bool false Использовать разделяемые блокировки (читатели) вместо эксклюзивных (писатель)
functionName string 'try_advisory_xact_lock_timeout' Имя функции PostgreSQL

Тестирование

Локальное тестирование с Docker

# Запуск PostgreSQL
docker-compose up -d

# Установка зависимостей
composer install

# Запуск тестов
vendor/bin/phpunit

# Запуск с покрытием
vendor/bin/phpunit --coverage-html coverage/

# Остановка PostgreSQL
docker-compose down

Переменные окружения

Настройте подключение к базе данных через переменные окружения:

export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=test_mutex
export DB_USER=postgres
export DB_PASSWORD=postgres

vendor/bin/phpunit

Производительность

Advisory locks обладают высокой производительностью:

  • Без дискового I/O: Блокировки хранятся только в памяти
  • Быстрый захват: Время захвата блокировки менее миллисекунды
  • Низкие накладные расходы: Минимальное использование CPU и памяти
  • Масштабируемость: Тысячи одновременных блокировок

Бенчмарк (PostgreSQL 16, одно ядро):

  • Захват/освобождение блокировки: ~0.1мс
  • Пропускная способность: ~10,000 операций/сек на соединение

Примеры использования

Распределённая обработка задач

// Гарантируем, что только один воркер обрабатывает каждую задачу
if ($mutex->acquire("task:{$taskId}", 0)) {
    try {
        processTask($taskId);
    } finally {
        $mutex->release("task:{$taskId}");
    }
}

Предотвращение cache stampede

$cacheKey = 'expensive_data';
$data = Cache::get($cacheKey);

if ($data === null) {
    if ($mutex->acquire($cacheKey, 5)) {
        try {
            // Двойная проверка после захвата блокировки
            $data = Cache::get($cacheKey);
            if ($data === null) {
                $data = computeExpensiveData();
                Cache::set($cacheKey, $data);
            }
        } finally {
            $mutex->release($cacheKey);
        }
    } else {
        // Запасной вариант, если не удалось захватить блокировку
        $data = computeExpensiveData();
    }
}

Координация миграций базы данных

// Гарантируем, что только один экземпляр запускает миграции
if ($mutex->acquire('schema_migration', 0)) {
    try {
        runMigrations();
    } finally {
        $mutex->release('schema_migration');
    }
}

Атомарные операции с внешними ресурсами

use beeline\PgsqlAdvisoryMutex\PgsqlAdvisoryMutex;

class FileProcessor
{
    private PgsqlAdvisoryMutex $mutex;

    public function __construct()
    {
        $this->mutex = new PgsqlAdvisoryMutex(['db' => Yii::$app->db]);
    }

    public function processFile(string $filename): void
    {
        // Используем имя файла как ключ блокировки
        if (!$this->mutex->acquire("file:{$filename}", 10)) {
            throw new \RuntimeException("Файл {$filename} уже обрабатывается");
        }

        try {
            // Только один процесс обрабатывает этот файл
            $content = file_get_contents($filename);
            $processed = $this->process($content);
            file_put_contents($filename, $processed);
        } finally {
            $this->mutex->release("file:{$filename}");
        }
    }
}

Распределённый счётчик с блокировкой

use beeline\PgsqlAdvisoryMutex\PgsqlAdvisoryMutex;

class DistributedCounter
{
    private PgsqlAdvisoryMutex $mutex;

    public function __construct()
    {
        $this->mutex = new PgsqlAdvisoryMutex(['db' => Yii::$app->db]);
    }

    public function increment(string $counterName): int
    {
        if (!$this->mutex->acquire("counter:{$counterName}", 5)) {
            throw new \RuntimeException('Не удалось захватить блокировку счётчика');
        }

        try {
            $current = (int)Cache::get($counterName, 0);
            $new = $current + 1;
            Cache::set($counterName, $new);
            return $new;
        } finally {
            $this->mutex->release("counter:{$counterName}");
        }
    }
}

Отладка

Просмотр активных блокировок

$mutex = Yii::$app->mutex;
$locks = $mutex->getActiveLocks();

foreach ($locks as $lock) {
    echo "PID: {$lock['pid']}, ";
    echo "Lock Key: {$lock['lock_key']}, ";
    echo "Mode: {$lock['mode']}, ";
    echo "Granted: " . ($lock['granted'] ? 'Yes' : 'No') . "\n";
}

Мониторинг блокировок через SQL

-- Просмотр всех advisory locks
SELECT
    pid,
    locktype,
    mode,
    granted,
    objid as lock_key
FROM pg_locks
WHERE locktype = 'advisory'
ORDER BY pid, objid;

-- Поиск заблокированных процессов
SELECT
    blocked_locks.pid AS blocked_pid,
    blocking_locks.pid AS blocking_pid,
    blocked_activity.usename AS blocked_user,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.objid = blocked_locks.objid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted
AND blocked_locks.locktype = 'advisory';

Лицензия

GNU Lesser General Public License 3.0