-
Notifications
You must be signed in to change notification settings - Fork 469
Description
dlt version
1.22.1
Describe the problem
When using the scd2 merge strategy with the SQLAlchemy destination against an MSSQL backend, the generated INSERT/SELECT statement has a column order mismatch that causes a type error.
In SqlalchemyMergeFollowupJob.gen_scd2_sql the INSERT column list is built from root_table_obj.columns (which reflects the destination table column order, with _dlt_valid_from and _dlt_valid_to at the end), but the SELECT prepends the _dlt_valid_from and _dlt_valid_to literals first:
sa.select(
sa.literal(boundary_literal.strip("'")).label(from_), # prepended first
sa.literal(...).label(to), # prepended second
*[c for c in staging_root_table_obj.columns if c.name not in [from_, to]],
)This means every value is shifted β the timestamp literal lands in the wrong column, and by the time SQL Server reaches the actual _dlt_valid_from column in the INSERT list it receives a bigint (_dlt_load_id), producing:
Operand type clash: bigint is incompatible with date
Expected behavior
The SELECT column order should match the INSERT column order so that values land in the correct columns regardless of where _dlt_valid_from and _dlt_valid_to appear in the destination table schema.
Steps to reproduce
- Use the SQLAlchemy destination with an MSSQL connection string
- Define a resource with
write_disposition={"disposition": "merge", "strategy": "scd2"} - Run the pipeline
- The load job will fail with
Operand type clash: bigint is incompatible with date
Operating system
Windows/Linux
Runtime environment
Local / Self-Hosted
Python version
3.12
dlt data source
REST API β https://api.delaget.com/swagger/index.html
dlt destination
SQLAlchemy (dlt.destinations.impl.sqlalchemy) with MSSQL/pyodbc backend
Other deployment details
Does not reproduce with DuckDB backend β the base gen_scd2_sql in SqlMergeFollowupJob is correct. The bug is specific to the SQLAlchemy destination's override.
Additional information
The fix is to build the SELECT by iterating root_table_obj.columns in order and substituting literals only for from_ and to in their correct positions:
select_cols = []
for col in root_table_obj.columns:
if col.name == from_:
select_cols.append(sa.literal(boundary_literal.strip("'")).label(from_))
elif col.name == to:
select_cols.append(sa.literal(
active_record_literal.strip("'") if active_record_literal is not None else None
).label(to))
else:
select_cols.append(staging_root_table_obj.c[col.name])Metadata
Metadata
Assignees
Labels
Type
Projects
Status