Initial implementation of generic Excel-to-DB import tool

Supports .xls and .xlsx, Oracle and PostgreSQL via SQLAlchemy.
Includes CLI (run/inspect/generate-config), YAML config, auto schema
detection, and append/replace/upsert modes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-13 11:31:47 +02:00
commit 8f7399de58
26 changed files with 663 additions and 0 deletions
+72
View File
@@ -0,0 +1,72 @@
from __future__ import annotations
import pandas as pd
from sqlalchemy import (
Column, Integer, Float, String, DateTime, Date, Boolean, Numeric, Text
)
from .config import ColumnMapping
def _pandas_dtype_to_sqla(series: pd.Series, varchar_length: int):
dtype = series.dtype
if pd.api.types.is_bool_dtype(dtype):
return Boolean()
if pd.api.types.is_integer_dtype(dtype):
return Integer()
if pd.api.types.is_float_dtype(dtype):
return Float()
if pd.api.types.is_datetime64_any_dtype(dtype):
return DateTime()
# object columns: check if they look like dates
if dtype == object:
sample = series.dropna().head(100)
if len(sample) > 0:
try:
pd.to_datetime(sample)
return DateTime()
except Exception:
pass
max_len = int(series.dropna().astype(str).str.len().max()) if len(series.dropna()) > 0 else 1
return String(max(max_len + 10, varchar_length))
return Text()
def _override_to_sqla(dtype_str: str):
"""Convert a user-supplied type string like 'VARCHAR(100)' to a SQLAlchemy type."""
s = dtype_str.upper().strip()
if s.startswith("VARCHAR"):
length = int(s.split("(")[1].rstrip(")")) if "(" in s else 255
return String(length)
if s in ("TEXT", "CLOB"):
return Text()
if s in ("INTEGER", "INT", "NUMBER"):
return Integer()
if s.startswith("NUMBER") or s.startswith("NUMERIC") or s.startswith("DECIMAL"):
if "(" in s:
parts = s.split("(")[1].rstrip(")").split(",")
p, sc = int(parts[0]), int(parts[1]) if len(parts) > 1 else 0
return Numeric(precision=p, scale=sc)
return Numeric()
if s in ("FLOAT", "REAL", "DOUBLE"):
return Float()
if s in ("DATETIME", "TIMESTAMP"):
return DateTime()
if s == "DATE":
return Date()
if s in ("BOOLEAN", "BOOL"):
return Boolean()
raise ValueError(f"Unknown dtype override: {dtype_str!r}")
def build_columns(df: pd.DataFrame, column_configs: list[ColumnMapping], varchar_length: int) -> list[Column]:
override_map = {c.target or c.source: c.dtype for c in column_configs if c.dtype and not c.skip}
columns = []
for col in df.columns:
col_name = str(col)
if col_name in override_map and override_map[col_name]:
sqla_type = _override_to_sqla(override_map[col_name])
else:
sqla_type = _pandas_dtype_to_sqla(df[col], varchar_length)
columns.append(Column(col_name, sqla_type))
return columns