Эффективная по памяти загрузка CSV в PostgreSQL с обработк... | Вопросы для собеседования | Skilio
Эффективная по памяти загрузка CSV в PostgreSQL с обработкой ошибок
Вопрос:

Нужно спроектировать приложение на Python, которое считывает большой CSV-файл, валидирует и обрабатывает его строки. Так как весь файл не умещается в оперативную память целиком. И эффективно данные оттуда в PostgreSQL с помощью команды COPY.

Приложение должно обрабатывать потенциальные ошибки.

Подсказки:

  • Рассмотрите возможность использования встроенного в Python пакета csv с генератором для выдачи партий валидных строк
  • Реализуйте функцию проверки, которая фильтрует некорректные строки и регистрирует ошибки
  • Мы можем подождать некоторое время, когда вставка в пакет потерпит неудачу и попробовать снова через некоторое время
  • Для интеграции с PostgreSQL изучите метод copy_expert из psycopg2 с использованием StringIO
  • Подумайте, как структурировать код, чтобы разрешить настраиваемые правила проверки

Выше ожиданий:

  • Знание методов оптимизации производительности PostgreSQL COPY
  • Реализация транзакционного поведения для пакетной обработки
Ответ:

Обзор проектирования

Примерная архитектура похожа на паттерн конвейера:

  1. CSV-reader — Читает файл частями, не загружая его целиком в память.
  2. Validator — Фильтрует и регистрирует некорректные строки.
  3. Batch Manager — Собрать валидированные строки в управляемые пакеты.
  4. PostgreSQL Loader — Использует команду COPY для эффективной загрузки данных.
  5. Error Handler — Управляет повторами и транзакциями.
┌─────────────┐    ┌────────────┐    ┌───────────────┐    ┌────────────────┐
│  CSV Reader │───>│  Validator │───>│ Batch Manager │───>│ PostgreSQL     │
│ (Generator) │    │            │    │               │    │ Loader (COPY)  │
└─────────────┘    └────────────┘    └───────────────┘    └────────────────┘
       │                 │                   │                    │
       │                 │                   │                    │
       └─────────────────┴───────────────────┴────────────────────┘
                                    │
                            ┌───────────────┐
                            │ Error Handler │
                            └───────────────┘

Детальное проектирование компонентов или функций

Компонент CSV-reader

Используйте встроенный модуль csv Python с генераторами (yield) для поэтапного чтения файла:

  • Создайте функцию-генератор, которая открывает файл и возвращает строки по одной.
  • Добавьте обработку файлов с помощью контекстных менеджеров (with-оператор) для обеспечения освобождения ресурсов.
  • Используйте csv.reader или csv.DictReader, в зависимости от того, нужны ли имена столбцов.

Validator

  • Принимает строку в качестве входных данных и возвращает булево значение, указывающее на валидность.
  • Регистрирует подробную информацию об ошибках для некорректных строк.

Подходы к валидации:

  1. Простая валидация: Базовая проверка типов и валидация обязательных полей (OK).
  2. Валидация на основе схемы: Использование библиотек, таких как schema или pydantic (ХОРОШО).
  3. Пользовательские бизнес-правила: Специфичная для предметной области логика валидации (ХОРОШО).

Batch Manager

  • Накапливает и возвращает (yield) проверенные строки до достижения заданного размера пакета (ХОРОШО).
  • Обрабатывает частичный пакет без не прошедших проверку строк (ОК).

Компонент Загрузчик PostgreSQL

Используйте psycopg2 и команду COPY PostgreSQL:

  • Используйте copy_expert со StringIO для повышения эффективности использования памяти (ПОДСКАЗКА).
  • Используйте общую для всех батчей транзакцию для атомарности загрузки.
  • Используйте пулинг подключений для длительных процессов.

Компонент Обработчик ошибок

  • Exponential retry для временных ошибок базы данных (НЕОБЯЗАТЕЛЬНО).
  • Записывайте не прошедшие валидацию записи в "dead letter queue"
  • Ведите подробные журналы ошибок с контекстной информацией.
  • Разрешите откат транзакций и повторную попытку на уровне батча.

Детали реализации

Оптимизация производительности

Максимизируйте пропускную способность с помощью:

  • Подходящего размера пакетов для PostgreSQL COPY (обычно 5000-10000 строк).
  • Пулинга соединений для повторного использования подключений к базе данных.
  • Минимальных преобразований данных в конвейере обработки.
  • Параллельной валидации при необходимости.

Транзакционные свойства

Используйте внешнюю транзакцию для всех команд COPY, когда требуется загрузка "всё или ничего".

Примерная структура кода

def read_csv_in_chunks(file_path, batch_size=5000):
    with open(file_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:  # Не забывайте о последнем частичном пакете
            yield batch

def validate_row(row, validation_rules):
    for rule in validation_rules:
        if not rule(row):
            return False
    return True

def process_batches(file_path, db_conn, validation_rules, batch_size=5000):
    for batch in read_csv_in_chunks(file_path, batch_size):
        valid_rows = [row for row in batch if validate_row(row, validation_rules)]
        if valid_rows:
            load_to_postgres(db_conn, valid_rows)

Производительности PostgreSQL COPY (НЕОБЯЗАТЕЛЬНО)

Команда COPY превосходит INSERT при пакетных операциях, потому что:

  1. Она уменьшает издержки на разбор, обрабатывая данные большими пакетами.
  2. Она минимизирует издержки на транзакции, выполняя COMMIT один раз на пакет.
  3. Она обходит планировщик запросов для простых вставок.
  4. Она позволяет параллельную загрузку данных.

Дополнительная оптимизация COPY:

  • Установка work_mem параметра в подходящее значение для сессии импорта.
  • Использование таблиц без журнала для начальной загрузки, а затем копирование в обычные таблицы.
  • Удаление индексов перед массовой загрузкой и их последующее восстановление.
  • Увеличение maintenance_work_mem во время восстановления индексов.

Обработка ошибок

Варианты для некорректных строк:

  1. Хранение в отдельной таблице "errors" с контекстом ошибки (ХОРОШО).
  2. Запись в отдельный файл сообщений об ошибках для ручного ревью (ОК).
  3. Реализация отдельного конвейера для исправленных записей (ХОРОШО).
0
Python Средний Опубликовано
© Skilio, 2025
Условия использования
Политика конфиденциальности
Мы используем файлы cookie, для персонализации сервисов и повышения удобства пользования сайтом. Если вы не согласны на их использование, поменяйте настройки браузера.