Необходимо спроектировать распределенную систему обработки задач (данных) на Python, где задачи хранятся в S3.
Можно использовать только S3, без развертывания или использования внешних сервисов. Вы можете свободно выбирать сторонние библиотеки. Мы можем упростить задачу — имена задач монотонно возрастают.
Решение должно поддерживать обработку задач ровно один раз (exactly once), учитывая отсутствие поддержки транзакций в S3? Стоимость запросов к S3 не имеет значения.
Подсказки:
- Каждый worker может быть избран лидером и реализует REST API для других рабочих.
- Протокол выбора лидера может помочь в координации worker-ов в распределении задач как их координатор.
- Не использовать S3 Etag для выбора лидера и блокировки.
- Можно оставновить выбор алгоритма консенсуса и выбора лидера, на таких как Raft или Paxos.
- Для гарантии обработки exactly once, рассмотрите реализацию механизма блокировки на координаторе.
- Возможно, использовать метаданные объектов или отдельные маркерные объекты для отслеживания состояния задачи.
- Подумайте, как обрабатывать сбои и гарантировать завершение задачи, даже если worker упадет.
Выше ожиданий:
- Реализация идемпотентных операций как резервной стратегии.
Обзор Архитектуры
- Общая архитектура
- Структура хранения в S3 для задач и управления состоянием
- Дизайн worker-а с REST API
- Механизм выбора лидера
- Рабочий процесс обработки задач
- Гарантия обработки задач ровно один раз (exactly-once)
- Устойчивость к сбоям и восстановление
┌─────────────────────┐
│ │
┌────────────▶│ S3 Store │◀───────────┐
| │ │ |
| └─────────────────────┘ |
| ▲ |
| │ |
| │ |
| | |
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ │ │ │ │ │
│ Worker Node │─────▶│ Leader Node │◀────│ Worker Node │
│ │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Структура Хранения в S3
Организуйте хранение в S3 следующим образом:
tasks-bucket/
tasks/
0000001/ # Каталог задачи с монотонно возрастающими именами
input.json # Входные данные задачи
metadata.json # Метаданные задачи, включая состояние
0000002/
...
locks/
task-0000001.lock # Файлы блокировок для задач
...
leader/
current.json # Информация о текущем лидере
heartbeats/
worker-1-timestamp.json # Жизненные циклы (heartbeat) worker-ов
Дополнительный бакет для хранения состояния узлов:
system-bucket/
worker-registry/ # Информация об активных worker-ах
election-data/ # Данные для выбора лидера
logs/ # Системные логи
Дизайн workerа
Каждый узел-worker состоит из:
- Сервиса c REST API (используя Flask/FastAPI/Django)
- Обработчика задач
- Код для выбора лидера среди workerов
- Механизма heartbeat
Эндпоинты REST API (пример):
GET /status
- Статус узла-workerаPOST /tasks/{task_id}/assign
- Назначение задачи узлу-workerуGET /tasks/{task_id}/status
- Получение статуса обработки задачиPOST /leader/heartbeat
- Отправка жизненного цикла лидера
Выбор Лидера
Реализуйте механизм выбора лидера, используя один из этих подходов:
-
Простой алгоритм выбора на основе отметки времени (ИЗБЕГАТЬ)
- Каждый worker записывает файл со своим ID и отметкой времени в папку
leader-election
- Worker с самой ранней отметкой времени становится лидером
- Простой, но уязвим к разделениям сети
- Каждый worker записывает файл со своим ID и отметкой времени в папку
-
Алгоритм Bully (OK)
- Worker-ы имеют уникальные ID
- Выбор инициируется, когда лидер считается умершим
- Worker-ы с более высоким ID могут вытеснить выбор у worker-ов с более низким ID
- Для контроля консистентности можно использовать метки S3 ETags
- Победитель объявляет себя лидером
-
Алгоритмы консенсуса Raft/Paxos (ХОРОШО)
- Реализуйте распределённый консенсус для выбора лидера
- Используйте библиотеки, такие как
python-raft
илиpysyncobj
- Выбор лидера в срок (term)
- Репликация логов для контроля состояния процесса
- Гарантия безопасности выбора
Рабочий процесс Обработки Задач
-
Создание задачи
- Задачи создаются в S3 с монотонно возрастающими именами каталогов
- Начальное состояние — "pending"
-
Обнаружение задач
- Лидер периодически сканирует новые задачи в состоянии "pending"
- Задачи упорядочиваются в очереди с приоритетом по ID задачи (монотонно возрастающий)
-
Назначение задачи
- Worker-ы обращаются к лидеру за назначением задачи через REST API
- Лидер назначает задачи доступным узлам-workerам
- Создаёт файл блокировки в S3, чтобы отметить задачу как "назначенная"
-
Выполнение задачи
- Worker обрабатывает задачу
- Обновляет состояние задачи на "processing", затем "completed"
- Записывает выходные данные в каталог задачи
-
Завершение задачи
- Worker уведомляет лидера о завершении
- Лидер удаляет блокировку и обновляет состояние задачи
Гарантия обработки задач ровно один раз
Реализуйте механизм блокировки со следующими функциями:
-
Блокировка задачи
- Перед обработкой создайте файл блокировки в S3, например:
locks/task-{task_id}.lock
- Включите ID узла worker-а и отметку времени в содержимое блокировки
- Попытайтесь прочитать блокировку перед обработкой — если она существует, задача уже взята
- Перед обработкой создайте файл блокировки в S3, например:
-
Проверка блокировки
- Worker-ы периодически проверяют свои блокировки
- Если блокировки нет, а задача обрабатывается, прервать обработку
-
Истечение срока действия блокировки
- Включите отметку времени истечения срока действия в блокировку
- Лидер может переназначить задачи с истекшими блокировками
Примерная структура файла блокировки:
{
"worker_id": "worker-123",
"timestamp": 1632150101,
"expiration": 1632153701, // Через 1 час позже
"task_id": "0000042"
}
Обработка падений и сбоев workerа
-
Механизм heartbeat
- Workerы отправляют периодические запросы heartbeat лидеру
- Лидер поддерживает реестр активных узлов worker-ов
- Если от worker-а heartbeat не поступил в течение установленного периода, узел worker считается вышедшим из строя
-
Переназначение задачи
- Лидер определяет задачи, назначенные вышедшим из строя узлам-workerам
- Проверяет истечение срока действия блокировки
- Переназначает задачи здоровым workerам
-
Идемпотентность обработки задач
- Проектируйте исполнение задач с поддержкой идемпотентности где это возможно
- Храните промежуточное состояние в каталоге задачи, если это необходимо
Обработка сбоев лидера
-
Жизненный цикл лидера (heartbeat)
- Лидер периодически записывает heartbeat в S3
- Worker-ы отслеживают heartbeat лидера через определенные промежутки времени
-
Триггер выбора нового лидера
- Если heartbeat лидера отсутствует в течение установленного периода
- Worker-ы инициируют выборы нового лидера
- Новый лидер принимает на себя назначение задач
-
Восстановление состояния задачи
- Новый лидер сканирует все задачи и их состояния
- Переназначает задачи в состоянии "assigned" или "processing" с истекшими блокировками
Реализация распределённого консенсуса (НЕОБЯЗАТЕЛЬНО)
Для обработки частичной недоступности workerов и сделать однозначные правила выбора лидера, то необходимо реализовать алгоритм нахождения консенсуса: