Сканирование + атомарный захват как альтернатива очередям

Начну с задачи: Необходимо реализовать сервис который будет обрабатывать сложные задания (обработка видео, парсинг и др).

Хотел найти решение, которое не требует очередей и при этом дает возможность масштабировать и контролировать нагрузку.

В процесс поиска лучшего, для меня, решения пришел к схеме ниже.

┌─────────────────────────────────────────────────────────────────┐
│                      SCANER (Supervisor)                        │
│                                                                 │
│  1. Сканирует БД, находит задания, которые нужно выполнить      │
│  2. Для каждой задачи:                                          │
│     HSETNX match:data:{task_id} field1 value1 ...               │
│     EXPIRE match:data:{task_id} 7200                            │
│  3. Удаляет задания, которые больше не актуальны                │
└─────────────────────────────────────────────────────────────────┘
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                            REDIS                                │
│                                                                 │
│  task:data:123    → сериализованные данные задачи               │
│  task:data:456    → сериализованные данные задачи               │
│  task:lock:123    → worker_id (если есть)                       │
│  task:lock:456    → worker_id (если есть)                       │
└─────────────────────────────────────────────────────────────────┘
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                      WORKERS (Supervisor)                       │
│                                                                 │
│  Worker 1 ──┐                                                   │
│  Worker 2 ──┼── Каждый в бесконечном цикле:                     │
│  Worker 3 ──┤                                                   │
│  ...        │  1. SCAN task:data:*                              │
│             │  2. Для каждого ID:                               │
│             │     SETNX task:lock:{id} {worker_id} EX 60        │
│             │     Если получилось → взят в работу               │
│             │  3. Если нет свободных задач → sleep(5)           │
└─────────────────────────────────────────────────────────────────┘

🧠 Как работает схема

Разделим зоны ответственности и сделаем REDIS шиной данных.

SCANNER (1 процесс)                 WORKER (N процессов)
─────────────────                   ───────────────────
• Следит за БД                      • Ждет заданий в Redis
• Создает (задания)                 • Захватывает SETNX
• Удаляет неактуальные              • Выполняет работу
• Никогда не парсит                 • Фиксирует результат в БД


REDIS КАК ШИНА ДАННЫХ + АТОМАРНЫЕ ОПЕРАЦИИ              
───────────────────────────────────────────────                
• Redis - единое место встречи SCANNER и WORKER'ов             
• Данные заданий хранятся в Redis (task:data:*)               
• Блокировки через SETNX гарантируют отсутствие гонок          
• SCANNER не знает о WORKER'ах, WORKER'ы не знают о SCANNER'е 

1. SCANNER (php-процесс)

Чтобы отказаться от очередей, я придумал SCANNER (php-процесс), управляемый Supervisor. Вместо использования очередей, он выполняет роль синхронизатора: периодически проверяет наличие новых заданий и поддерживает в Redis актуальный набор ключей для воркеров. В Redis хранятся только те задания, чей статус требует обработки, что исключает появление неактуальных задач.

2. WORKER (php-процесс)

Далее, настраиваем Supervisor таким образом, чтобы он всегда поднимал нужное нам количество воркеров.

WORKER это отдельный php скрипт, процесс - задача которого реагировать на появление заданий и начинать обработку (в моем случае парсить по сложной логике).

После завершения, воркер должен зафиксировать в базе, что задание выполнено (нужно для SCANNER)

Как гарантировать отсутствие гонки?

Тут нам помогут атомарные операции REDIS SETNX task:lock:{id} {worker_id} EX 60

Сразу после получения задания воркер пытается зафиксировать - создать lock ключ для задания. В моем случае у каждой задачи есть свой уникальный идентификатор в базе, его и будет использовать для lock.

Если у worker получается выполнить SETNX task:lock:{id} {worker_id} EX 60, то он начинает работу и отчитывается что задание выполняется (в базу).

На этом этапе можно сохранить pid, время и другую мета информацию для мониторинга.


🎯 Ключевые моменты:

Механизм избежания гонок

  • Атомарность захвата обеспечивает Redis команда SETNX
  • TTL блокировки (60 сек) защищает от "мертвых" воркеров
  • Каждый воркер уникально идентифицируется (ID задания)

Требования к окружению

  • Redis (любая версия с поддержкой SETNX)
  • Supervisor (для управления процессами)

Итоги

Теперь остается мониторить и подобрать оптимальное для вас количество воркеров - все зависит от ваших задач и ресурсов, выделенных сервису. Мне хватает 10, и это с запасом.

В данной статье нет кода: цель была обсудить сам подход и нюансы реализации. Если нужна помощь с кодом - пишите.

Спасибо за внимание! Жду ваши мысли в комментариях или в соцсетях.


Обсудить пост:

🔥 И не забудь подписаться :)