Distributed task processing with exactly-once guarantee an... | Вопросы для собеседования | Skilio
Distributed task processing with exactly-once guarantee and S3
Вопрос:

Необходимо спроектировать распределенную систему обработки задач (данных) на Python, где задачи хранятся в S3.

Можно использовать только S3, без развертывания или использования внешних сервисов. Вы можете свободно выбирать сторонние библиотеки. Мы можем упростить задачу — имена задач монотонно возрастают.

Решение должно поддерживать обработку задач ровно один раз (exactly once), учитывая отсутствие поддержки транзакций в S3? Стоимость запросов к S3 не имеет значения.

Подсказки:

  • Каждый worker может быть избран лидером и реализует REST API для других рабочих.
  • Протокол выбора лидера может помочь в координации worker-ов в распределении задач как их координатор.
  • Не использовать S3 Etag для выбора лидера и блокировки.
  • Можно оставновить выбор алгоритма консенсуса и выбора лидера, на таких как Raft или Paxos.
  • Для гарантии обработки exactly once, рассмотрите реализацию механизма блокировки на координаторе.
  • Возможно, использовать метаданные объектов или отдельные маркерные объекты для отслеживания состояния задачи.
  • Подумайте, как обрабатывать сбои и гарантировать завершение задачи, даже если worker упадет.

Выше ожиданий:

  • Реализация идемпотентных операций как резервной стратегии.
Ответ:

Обзор Архитектуры

  1. Общая архитектура
  2. Структура хранения в S3 для задач и управления состоянием
  3. Дизайн worker-а с REST API
  4. Механизм выбора лидера
  5. Рабочий процесс обработки задач
  6. Гарантия обработки задач ровно один раз (exactly-once)
  7. Устойчивость к сбоям и восстановление
                       ┌─────────────────────┐
                       │                     │
         ┌────────────▶│       S3 Store      │◀───────────┐
         |             │                     │            |
         |             └─────────────────────┘            |
         |                       ▲                        |
         |                       │                        |
         |                       │                        |
         |                       |                        |
┌─────────────────┐      ┌─────────────────┐     ┌─────────────────┐
│                 │      │                 │     │                 │
│  Worker Node    │─────▶│  Leader Node    │◀────│  Worker Node    │
│                 │      │                 │     │                 │
└─────────────────┘      └─────────────────┘     └─────────────────┘

Структура Хранения в S3

Организуйте хранение в S3 следующим образом:

tasks-bucket/
  tasks/
    0000001/  # Каталог задачи с монотонно возрастающими именами
      input.json  # Входные данные задачи
      metadata.json  # Метаданные задачи, включая состояние
    0000002/
      ...
  locks/
    task-0000001.lock  # Файлы блокировок для задач
    ...
  leader/
    current.json  # Информация о текущем лидере
    heartbeats/
      worker-1-timestamp.json  # Жизненные циклы (heartbeat) worker-ов

Дополнительный бакет для хранения состояния узлов:

system-bucket/
  worker-registry/  # Информация об активных worker-ах
  election-data/    # Данные для выбора лидера
  logs/             # Системные логи

Дизайн workerа

Каждый узел-worker состоит из:

  1. Сервиса c REST API (используя Flask/FastAPI/Django)
  2. Обработчика задач
  3. Код для выбора лидера среди workerов
  4. Механизма heartbeat

Эндпоинты REST API (пример):

  • GET /status - Статус узла-workerа
  • POST /tasks/{task_id}/assign - Назначение задачи узлу-workerу
  • GET /tasks/{task_id}/status - Получение статуса обработки задачи
  • POST /leader/heartbeat - Отправка жизненного цикла лидера

Выбор Лидера

Реализуйте механизм выбора лидера, используя один из этих подходов:

  1. Простой алгоритм выбора на основе отметки времени (ИЗБЕГАТЬ)

    • Каждый worker записывает файл со своим ID и отметкой времени в папку leader-election
    • Worker с самой ранней отметкой времени становится лидером
    • Простой, но уязвим к разделениям сети
  2. Алгоритм Bully (OK)

    • Worker-ы имеют уникальные ID
    • Выбор инициируется, когда лидер считается умершим
    • Worker-ы с более высоким ID могут вытеснить выбор у worker-ов с более низким ID
    • Для контроля консистентности можно использовать метки S3 ETags
    • Победитель объявляет себя лидером
  3. Алгоритмы консенсуса Raft/Paxos (ХОРОШО)

    • Реализуйте распределённый консенсус для выбора лидера
    • Используйте библиотеки, такие как python-raft или pysyncobj
    • Выбор лидера в срок (term)
    • Репликация логов для контроля состояния процесса
    • Гарантия безопасности выбора

Рабочий процесс Обработки Задач

  1. Создание задачи

    • Задачи создаются в S3 с монотонно возрастающими именами каталогов
    • Начальное состояние — "pending"
  2. Обнаружение задач

    • Лидер периодически сканирует новые задачи в состоянии "pending"
    • Задачи упорядочиваются в очереди с приоритетом по ID задачи (монотонно возрастающий)
  3. Назначение задачи

    • Worker-ы обращаются к лидеру за назначением задачи через REST API
    • Лидер назначает задачи доступным узлам-workerам
    • Создаёт файл блокировки в S3, чтобы отметить задачу как "назначенная"
  4. Выполнение задачи

    • Worker обрабатывает задачу
    • Обновляет состояние задачи на "processing", затем "completed"
    • Записывает выходные данные в каталог задачи
  5. Завершение задачи

    • Worker уведомляет лидера о завершении
    • Лидер удаляет блокировку и обновляет состояние задачи

Гарантия обработки задач ровно один раз

Реализуйте механизм блокировки со следующими функциями:

  1. Блокировка задачи

    • Перед обработкой создайте файл блокировки в S3, например: locks/task-{task_id}.lock
    • Включите ID узла worker-а и отметку времени в содержимое блокировки
    • Попытайтесь прочитать блокировку перед обработкой — если она существует, задача уже взята
  2. Проверка блокировки

    • Worker-ы периодически проверяют свои блокировки
    • Если блокировки нет, а задача обрабатывается, прервать обработку
  3. Истечение срока действия блокировки

    • Включите отметку времени истечения срока действия в блокировку
    • Лидер может переназначить задачи с истекшими блокировками

Примерная структура файла блокировки:

{
  "worker_id": "worker-123",
  "timestamp": 1632150101,
  "expiration": 1632153701,  // Через 1 час позже
  "task_id": "0000042"
}

Обработка падений и сбоев workerа

  1. Механизм heartbeat

    • Workerы отправляют периодические запросы heartbeat лидеру
    • Лидер поддерживает реестр активных узлов worker-ов
    • Если от worker-а heartbeat не поступил в течение установленного периода, узел worker считается вышедшим из строя
  2. Переназначение задачи

    • Лидер определяет задачи, назначенные вышедшим из строя узлам-workerам
    • Проверяет истечение срока действия блокировки
    • Переназначает задачи здоровым workerам
  3. Идемпотентность обработки задач

    • Проектируйте исполнение задач с поддержкой идемпотентности где это возможно
    • Храните промежуточное состояние в каталоге задачи, если это необходимо

Обработка сбоев лидера

  1. Жизненный цикл лидера (heartbeat)

    • Лидер периодически записывает heartbeat в S3
    • Worker-ы отслеживают heartbeat лидера через определенные промежутки времени
  2. Триггер выбора нового лидера

    • Если heartbeat лидера отсутствует в течение установленного периода
    • Worker-ы инициируют выборы нового лидера
    • Новый лидер принимает на себя назначение задач
  3. Восстановление состояния задачи

    • Новый лидер сканирует все задачи и их состояния
    • Переназначает задачи в состоянии "assigned" или "processing" с истекшими блокировками

Реализация распределённого консенсуса (НЕОБЯЗАТЕЛЬНО)

Для обработки частичной недоступности workerов и сделать однозначные правила выбора лидера, то необходимо реализовать алгоритм нахождения консенсуса:

0
Python Старший Опубликовано
© Skilio, 2025
Условия использования
Политика конфиденциальности
Мы используем файлы cookie, для персонализации сервисов и повышения удобства пользования сайтом. Если вы не согласны на их использование, поменяйте настройки браузера.