Fix schema-qualified table names (e.g. dl.tabelle1)
SQLAlchemy requires schema and table name as separate arguments. Splitting target_table on '.' and passing schema= to Table/reflect/has_table. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,14 @@ from .schema import build_columns
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _split_table(target_table: str) -> tuple[str | None, str]:
|
||||||
|
"""Split 'schema.table' into (schema, table). Returns (None, table) if no dot."""
|
||||||
|
if "." in target_table:
|
||||||
|
schema, table = target_table.split(".", 1)
|
||||||
|
return schema, table
|
||||||
|
return None, target_table
|
||||||
|
|
||||||
|
|
||||||
class Importer:
|
class Importer:
|
||||||
def __init__(self, config: ImportConfig):
|
def __init__(self, config: ImportConfig):
|
||||||
self.config = config
|
self.config = config
|
||||||
@@ -43,7 +51,7 @@ class Importer:
|
|||||||
truncate_sql = (
|
truncate_sql = (
|
||||||
f"DELETE FROM {cfg.target_table}"
|
f"DELETE FROM {cfg.target_table}"
|
||||||
if dialect == "sqlite"
|
if dialect == "sqlite"
|
||||||
else f"TRUNCATE TABLE {cfg.target_table}"
|
else f"TRUNCATE TABLE {cfg.target_table}" # schema.table is valid SQL here
|
||||||
)
|
)
|
||||||
conn.execute(text(truncate_sql))
|
conn.execute(text(truncate_sql))
|
||||||
rows = self._bulk_insert(conn, df, cfg.target_table)
|
rows = self._bulk_insert(conn, df, cfg.target_table)
|
||||||
@@ -56,11 +64,12 @@ class Importer:
|
|||||||
return rows
|
return rows
|
||||||
|
|
||||||
def _ensure_table(self, conn, df: pd.DataFrame, cfg: SheetConfig):
|
def _ensure_table(self, conn, df: pd.DataFrame, cfg: SheetConfig):
|
||||||
|
schema, table_name = _split_table(cfg.target_table)
|
||||||
insp = inspect(conn)
|
insp = inspect(conn)
|
||||||
if not insp.has_table(cfg.target_table):
|
if not insp.has_table(table_name, schema=schema):
|
||||||
meta = MetaData()
|
meta = MetaData()
|
||||||
cols = build_columns(df, cfg.columns, self.config.default_varchar_length)
|
cols = build_columns(df, cfg.columns, self.config.default_varchar_length)
|
||||||
table = Table(cfg.target_table, meta, *cols)
|
table = Table(table_name, meta, *cols, schema=schema)
|
||||||
meta.create_all(conn)
|
meta.create_all(conn)
|
||||||
logger.info("Created table %r", cfg.target_table)
|
logger.info("Created table %r", cfg.target_table)
|
||||||
|
|
||||||
@@ -68,9 +77,11 @@ class Importer:
|
|||||||
records = _df_to_records(df)
|
records = _df_to_records(df)
|
||||||
if not records:
|
if not records:
|
||||||
return 0
|
return 0
|
||||||
|
schema, tname = _split_table(table_name)
|
||||||
meta = MetaData()
|
meta = MetaData()
|
||||||
meta.reflect(bind=conn, only=[table_name])
|
meta.reflect(bind=conn, schema=schema, only=[tname])
|
||||||
table = meta.tables[table_name]
|
key = f"{schema}.{tname}" if schema else tname
|
||||||
|
table = meta.tables[key]
|
||||||
conn.execute(table.insert(), records)
|
conn.execute(table.insert(), records)
|
||||||
return len(records)
|
return len(records)
|
||||||
|
|
||||||
@@ -80,9 +91,11 @@ class Importer:
|
|||||||
if not records:
|
if not records:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
schema, tname = _split_table(cfg.target_table)
|
||||||
meta = MetaData()
|
meta = MetaData()
|
||||||
meta.reflect(bind=conn, only=[cfg.target_table])
|
meta.reflect(bind=conn, schema=schema, only=[tname])
|
||||||
table = meta.tables[cfg.target_table]
|
key = f"{schema}.{tname}" if schema else tname
|
||||||
|
table = meta.tables[key]
|
||||||
|
|
||||||
if dialect == "postgresql":
|
if dialect == "postgresql":
|
||||||
stmt = pg_insert(table).values(records)
|
stmt = pg_insert(table).values(records)
|
||||||
|
|||||||
Reference in New Issue
Block a user