Нужно спроектировать приложение на Python, которое считывает большой CSV-файл, валидирует и обрабатывает его строки. Так как весь файл не умещается в оперативную память целиком. И эффективно данные оттуда в PostgreSQL с помощью команды COPY.
Приложение должно обрабатывать потенциальные ошибки.
Подсказки:
- Рассмотрите возможность использования встроенного в Python пакета
csv
с генератором для выдачи партий валидных строк - Реализуйте функцию проверки, которая фильтрует некорректные строки и регистрирует ошибки
- Мы можем подождать некоторое время, когда вставка в пакет потерпит неудачу и попробовать снова через некоторое время
- Для интеграции с PostgreSQL изучите метод
copy_expert
изpsycopg2
с использованием StringIO - Подумайте, как структурировать код, чтобы разрешить настраиваемые правила проверки
Выше ожиданий:
- Знание методов оптимизации производительности PostgreSQL COPY
- Реализация транзакционного поведения для пакетной обработки
Обзор проектирования
Примерная архитектура похожа на паттерн конвейера:
- CSV-reader — Читает файл частями, не загружая его целиком в память.
- Validator — Фильтрует и регистрирует некорректные строки.
- Batch Manager — Собрать валидированные строки в управляемые пакеты.
- PostgreSQL Loader — Использует команду COPY для эффективной загрузки данных.
- Error Handler — Управляет повторами и транзакциями.
┌─────────────┐ ┌────────────┐ ┌───────────────┐ ┌────────────────┐
│ CSV Reader │───>│ Validator │───>│ Batch Manager │───>│ PostgreSQL │
│ (Generator) │ │ │ │ │ │ Loader (COPY) │
└─────────────┘ └────────────┘ └───────────────┘ └────────────────┘
│ │ │ │
│ │ │ │
└─────────────────┴───────────────────┴────────────────────┘
│
┌───────────────┐
│ Error Handler │
└───────────────┘
Детальное проектирование компонентов или функций
Компонент CSV-reader
Используйте встроенный модуль csv
Python с генераторами (yield
) для поэтапного чтения файла:
- Создайте функцию-генератор, которая открывает файл и возвращает строки по одной.
- Добавьте обработку файлов с помощью контекстных менеджеров (
with
-оператор) для обеспечения освобождения ресурсов. - Используйте
csv.reader
илиcsv.DictReader
, в зависимости от того, нужны ли имена столбцов.
Validator
- Принимает строку в качестве входных данных и возвращает булево значение, указывающее на валидность.
- Регистрирует подробную информацию об ошибках для некорректных строк.
Подходы к валидации:
- Простая валидация: Базовая проверка типов и валидация обязательных полей (OK).
- Валидация на основе схемы: Использование библиотек, таких как
schema
илиpydantic
(ХОРОШО). - Пользовательские бизнес-правила: Специфичная для предметной области логика валидации (ХОРОШО).
Batch Manager
- Накапливает и возвращает (
yield
) проверенные строки до достижения заданного размера пакета (ХОРОШО). - Обрабатывает частичный пакет без не прошедших проверку строк (ОК).
Компонент Загрузчик PostgreSQL
Используйте psycopg2
и команду COPY PostgreSQL:
- Используйте
copy_expert
соStringIO
для повышения эффективности использования памяти (ПОДСКАЗКА). - Используйте общую для всех батчей транзакцию для атомарности загрузки.
- Используйте пулинг подключений для длительных процессов.
Компонент Обработчик ошибок
- Exponential retry для временных ошибок базы данных (НЕОБЯЗАТЕЛЬНО).
- Записывайте не прошедшие валидацию записи в "dead letter queue"
- Ведите подробные журналы ошибок с контекстной информацией.
- Разрешите откат транзакций и повторную попытку на уровне батча.
Детали реализации
Оптимизация производительности
Максимизируйте пропускную способность с помощью:
- Подходящего размера пакетов для PostgreSQL COPY (обычно 5000-10000 строк).
- Пулинга соединений для повторного использования подключений к базе данных.
- Минимальных преобразований данных в конвейере обработки.
- Параллельной валидации при необходимости.
Транзакционные свойства
Используйте внешнюю транзакцию для всех команд COPY
, когда требуется загрузка "всё или ничего".
Примерная структура кода
def read_csv_in_chunks(file_path, batch_size=5000):
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
batch = []
for row in reader:
batch.append(row)
if len(batch) >= batch_size:
yield batch
batch = []
if batch: # Не забывайте о последнем частичном пакете
yield batch
def validate_row(row, validation_rules):
for rule in validation_rules:
if not rule(row):
return False
return True
def process_batches(file_path, db_conn, validation_rules, batch_size=5000):
for batch in read_csv_in_chunks(file_path, batch_size):
valid_rows = [row for row in batch if validate_row(row, validation_rules)]
if valid_rows:
load_to_postgres(db_conn, valid_rows)
Производительности PostgreSQL COPY (НЕОБЯЗАТЕЛЬНО)
Команда COPY превосходит INSERT при пакетных операциях, потому что:
- Она уменьшает издержки на разбор, обрабатывая данные большими пакетами.
- Она минимизирует издержки на транзакции, выполняя COMMIT один раз на пакет.
- Она обходит планировщик запросов для простых вставок.
- Она позволяет параллельную загрузку данных.
Дополнительная оптимизация COPY:
- Установка
work_mem
параметра в подходящее значение для сессии импорта. - Использование таблиц без журнала для начальной загрузки, а затем копирование в обычные таблицы.
- Удаление индексов перед массовой загрузкой и их последующее восстановление.
- Увеличение
maintenance_work_mem
во время восстановления индексов.
Обработка ошибок
Варианты для некорректных строк:
- Хранение в отдельной таблице "errors" с контекстом ошибки (ХОРОШО).
- Запись в отдельный файл сообщений об ошибках для ручного ревью (ОК).
- Реализация отдельного конвейера для исправленных записей (ХОРОШО).