from __future__ import annotations

import pandas as pd
from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Float,
    Integer,
    Numeric,
    String,
    Text,
)

from .config import ColumnMapping


def _looks_like_datetime(sample: pd.Series) -> bool:
    """Return True if every value in *sample* can be parsed as a datetime.

    Args:
        sample: Non-empty, null-free values taken from an object column.
    """
    # pd.to_datetime accepts bare numbers and reads them as epoch offsets, so
    # an object column of ints/Decimals would otherwise "parse" successfully
    # and be mistyped as DATETIME.
    if any(pd.api.types.is_number(value) for value in sample):
        return False
    try:
        pd.to_datetime(sample)
    except Exception:
        # Deliberate best-effort probe: any parsing failure simply means
        # "not a date column", never an error for the caller.
        return False
    return True


def _pandas_dtype_to_sqla(series: pd.Series, varchar_length: int):
    """Infer a SQLAlchemy column type from a pandas Series.

    Bool, integer, float and datetime64 dtypes map to ``Boolean``,
    ``Integer``, ``Float`` and ``DateTime``. Object columns become
    ``DateTime`` when the first 100 non-null values parse as datetimes,
    otherwise ``String``. Any other dtype falls back to ``Text``.

    Args:
        series: Column data to inspect.
        varchar_length: Minimum length used for ``String`` columns.

    Returns:
        A SQLAlchemy type instance.
    """
    dtype = series.dtype
    if pd.api.types.is_bool_dtype(dtype):
        return Boolean()
    if pd.api.types.is_integer_dtype(dtype):
        return Integer()
    if pd.api.types.is_float_dtype(dtype):
        return Float()
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return DateTime()

    if dtype == object:
        # Drop nulls once and reuse the result for both the date probe and
        # the length measurement.
        non_null = series.dropna()
        if non_null.empty:
            max_len = 1
        else:
            if _looks_like_datetime(non_null.head(100)):
                return DateTime()
            max_len = int(non_null.astype(str).str.len().max())
        # Leave 10 characters of headroom over the longest observed value,
        # but never go below the configured minimum.
        return String(max(max_len + 10, varchar_length))

    return Text()


def _parse_int_arg(text: str, dtype_str: str) -> int:
    """Parse one integer type argument, naming the override on failure.

    Raises:
        ValueError: If *text* is not an integer literal.
    """
    try:
        return int(text)
    except ValueError as err:
        raise ValueError(
            f"Invalid dtype override: {dtype_str!r} "
            f"(expected an integer, got {text.strip()!r})"
        ) from err


def _override_to_sqla(dtype_str: str):
    """Convert a user-supplied type string like 'VARCHAR(100)' to a SQLAlchemy type.

    Matching is case-insensitive and ignores surrounding whitespace.

    Args:
        dtype_str: Type name, optionally with a parenthesised length
            (``VARCHAR(100)``) or precision/scale (``NUMBER(10,2)``).

    Returns:
        A SQLAlchemy type instance.

    Raises:
        ValueError: If the type name is not recognised, or a length,
            precision or scale is not an integer.
    """
    s = dtype_str.upper().strip()
    has_args = "(" in s
    # Text between the first "(" and the trailing ")" characters.
    args = s.split("(")[1].rstrip(")") if has_args else ""

    if s.startswith("VARCHAR"):
        length = _parse_int_arg(args, dtype_str) if has_args else 255
        return String(length)
    if s in ("TEXT", "CLOB"):
        return Text()
    # A bare NUMBER (no precision) is treated as an integer; NUMBER(p, s)
    # falls through to the Numeric branch below.
    if s in ("INTEGER", "INT", "NUMBER"):
        return Integer()
    if s.startswith(("NUMBER", "NUMERIC", "DECIMAL")):
        if not has_args:
            return Numeric()
        parts = args.split(",")
        precision = _parse_int_arg(parts[0], dtype_str)
        scale = _parse_int_arg(parts[1], dtype_str) if len(parts) > 1 else 0
        return Numeric(precision=precision, scale=scale)
    if s in ("FLOAT", "REAL", "DOUBLE"):
        return Float()
    if s in ("DATETIME", "TIMESTAMP"):
        return DateTime()
    if s == "DATE":
        return Date()
    if s in ("BOOLEAN", "BOOL"):
        return Boolean()
    raise ValueError(f"Unknown dtype override: {dtype_str!r}")


def build_columns(
    df: pd.DataFrame,
    column_configs: list[ColumnMapping],
    varchar_length: int,
) -> list[Column]:
    """Build one SQLAlchemy ``Column`` per DataFrame column.

    A column uses the dtype override from *column_configs* when one is
    registered under its name; otherwise its type is inferred from the data.

    Args:
        df: Data whose columns define the table layout.
        column_configs: Per-column settings. Entries marked ``skip`` or
            without a ``dtype`` contribute no override.
        varchar_length: Minimum length for inferred ``String`` columns.

    Returns:
        Columns in the same order as ``df.columns``.

    Raises:
        ValueError: If a dtype override is unknown or malformed.
    """
    # Overrides are keyed by target name when set, else by source name.
    # NOTE(review): this presumes df columns already carry target names -
    # confirm against the caller.
    overrides = {
        cfg.target or cfg.source: cfg.dtype
        for cfg in column_configs
        if cfg.dtype and not cfg.skip
    }

    columns = []
    for col in df.columns:
        col_name = str(col)
        override = overrides.get(col_name)
        if override:
            sqla_type = _override_to_sqla(override)
        else:
            sqla_type = _pandas_dtype_to_sqla(df[col], varchar_length)
        columns.append(Column(col_name, sqla_type))
    return columns