Conversation
Force-pushed from 440ae37 to faf2b84
Suggestion: convert to DRAFT until you're done pushing to the branch
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from 59cb10f to 5c9734c
Force-pushed from 5c9734c to 037e92c
To make review easier, I'm going to split this into two PRs. I'll keep this one open, and chop it off after commit 7, "Clean up SQL writer memory optimization spec".
Force-pushed from 037e92c to bf574fb
dannyroberts left a comment
I had one substantive comment that I think will help a lot with the readability of one section, but otherwise looks good!
```
   type incompatibility).
2. If so, call `make_table_compatible()` for the failing row, then retry the
   entire batch. Only do this **once per batch** -- if the retry also fails,
   raise the original exception.
```
If there are multiple rows in the same batch that have different schema mismatches, will the schema be updated for all rows in the batch that fail before retrying, or will it keep retrying until there are no more rows failing? The way it's written here it almost sounds like it'll try once, but if it fails on another row it'll just give up.
If the error we get back can only tell us about one row at a time, it almost seems better to just call make_table_compatible() for all 1000 rows and then retry once than to fix one, retry, and fail.
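A minimal sketch of that repair-everything-then-retry approach, reusing the `bulk_upsert` / `make_table_compatible` names from this thread (the signatures here are illustrative, not the PR's actual code):

```python
import sqlalchemy.exc

def _flush_batch(self, table, batch, data_type_dict):
    """Insert one batch; on a schema mismatch, widen the schema for every
    row in the batch, then retry the whole batch exactly once."""
    try:
        self.bulk_upsert(table, batch)
    except (
        sqlalchemy.exc.CompileError,
        sqlalchemy.exc.OperationalError,
        sqlalchemy.exc.ProgrammingError,
    ):
        # The error only identifies one bad row/column at a time, so repair
        # the schema for all rows up front rather than fix-one-retry-fail.
        for row_dict in batch:
            table = self.make_table_compatible(table, row_dict, data_type_dict)
        self.bulk_upsert(table, batch)  # a second failure propagates
```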
```python
    table,
    row_dict,
    data_type_dict,
)
```
Okay this answers my earlier question on the spec.
```python
assert table is not None  # So that mypy knows it's a Table
if not schema_check_complete:
    schema_check_complete = True
    self._commit()
```
Will this work if there are exactly SCHEMA_CHECK_ROWS rows? Looking at the rows after the loop, I think so? But it's just a tad hard to follow.
| "Schema check complete for table '%s'. Final columns: %s", | ||
| table_name, | ||
| [c.name for c in table.columns], | ||
| ) |
I wonder if we can do something nice with itertools that makes this code flow more naturally. Right now there's a lot of iteration accounting with each step doing conditional cleanup for the last step and/or lookahead for the next in a way that's a bit hard to follow.
I'm thinking something like this:
```python
if table_spec.rows:
    # ... create_table(...) ...
    row_stream = iter(table_spec.rows)
    for row in itertools.islice(row_stream, SCHEMA_CHECK_ROWS):
        table = self.make_table_compatible(...)
        self.upsert(...)
    logger.debug("Schema check complete...")
    for batch in itertools.batched(row_stream, BATCH_SIZE):
        self._flush_batch(table, batch, ...)
```

I think that's it; I think there'd then be no cleanup/commit/flush necessary afterwards.
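For reference, a self-contained toy showing why this pattern needs no post-loop cleanup: `islice` consumes exactly the first `SCHEMA_CHECK_ROWS` items from the shared iterator, and `itertools.batched` (Python 3.12+) then drains the remainder in fixed-size chunks, emitting a short final batch instead of leaving a partial buffer behind. All names here are illustrative, not the PR's actual code:

```python
import itertools

SCHEMA_CHECK_ROWS = 10
BATCH_SIZE = 4

rows = iter(range(23))  # stand-in for table_spec.rows

# Phase 1: per-row handling for the first SCHEMA_CHECK_ROWS items.
for row in itertools.islice(rows, SCHEMA_CHECK_ROWS):
    print("schema-check row:", row)

# Phase 2: the same iterator resumes where islice stopped; batched()
# yields tuples of up to BATCH_SIZE, including a short final batch.
for batch in itertools.batched(rows, BATCH_SIZE):
    print("flush batch:", batch)
# With exactly SCHEMA_CHECK_ROWS rows total, phase 2 simply yields nothing.
```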
Force-pushed from e352658 to f7fba6e
jingcheng16 left a comment
No blocking comments.
```python
    sqlalchemy.exc.CompileError,
    sqlalchemy.exc.OperationalError,
    sqlalchemy.exc.ProgrammingError,
):
```
Are these all the possible exceptions when there is an incompatibility?
| "Schema check complete for table '%s'. Final columns: %s", | ||
| table_name, |
Can we be explicit that this is just a schema check for the first SCHEMA_CHECK_ROWS rows? This might be helpful for people who didn't read the code and just read the log.
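For example, a sketch of the requested wording (not the final log line):

```python
logger.debug(
    "Schema check complete for table '%s' (first %d rows checked). "
    "Final columns: %s",
    table_name,
    SCHEMA_CHECK_ROWS,
    [c.name for c in table.columns],
)
```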
```python
batch_keys = {
    k for row_dict in batch
    for k, v in row_dict.items()
    if v is not None
}
```
I find this set comprehension hard to read...maybe it is just me...
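One equivalent spelling that may read more naturally (a sketch; same behavior, just an explicit loop):

```python
batch_keys: set[str] = set()
for row_dict in batch:
    # Union in every column that has a non-None value in this row.
    batch_keys |= {k for k, v in row_dict.items() if v is not None}
```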
AI comment:
The old `upsert()` (writers.py:585) does:

```python
row_dict = {col: val for col, val in row_dict.items() if val is not None}
# then UPDATE ... SET (only the non-None cols)
```

So an existing row's columns that aren't in `row_dict` are preserved.

The new `bulk_upsert()` builds `batch_keys` as the union of non-None keys across the whole batch, then in PG iterates `stmt.excluded` and in MySQL iterates `table.columns` to construct the SET clause. Worth verifying whether columns that exist in the table but were dropped from `batch_keys` (because every row in that batch had None for them) end up getting clobbered to NULL on update. Old code wouldn't have touched them; new code might.
Thoughts? Sounds like a real concern
dannyroberts left a comment
Very nice! The itertools refactor is so very much easier to follow! Left one small thought on the same topic, but totally up to you—looks good as is.
```python
rows = (dict(zip(headings, row)) for row in table_spec.rows)
first_row = next(rows, None)
if first_row is None:
    return
row_stream = itertools.chain([first_row], rows)

table = self.get_table(table_name)
if table is None:
    table = self.create_table(table_name, first_row, data_type_dict)
```
Nit: Would the following work? I think the only difference would be that self.make_table_compatible would not be called on the first row, but that might be okay (just based on the names of the functions) if create_table is already taking into account first_row's schema? I guess technically we'd also need SCHEMA_CHECK_ROWS - 1 below. What I like about this is that then we're literally doing one full clean sweep through the data, no lookahead or backtracking, so there's no data flow to keep in your head. Process first row. Process next N rows. Process rest.
Not a big deal at all; the way you have it is not much harder to understand.
Suggested change:

```diff
-rows = (dict(zip(headings, row)) for row in table_spec.rows)
-first_row = next(rows, None)
-if first_row is None:
-    return
-row_stream = itertools.chain([first_row], rows)
-table = self.get_table(table_name)
-if table is None:
-    table = self.create_table(table_name, first_row, data_type_dict)
+row_stream = (dict(zip(headings, row)) for row in table_spec.rows)
+first_row = next(row_stream, None)
+if first_row is None:
+    return
+table = self.get_table(table_name)
+if table is None:
+    table = self.create_table(table_name, first_row, data_type_dict)
+self.upsert(table, first_row)
```
> I think the only difference would be that self.make_table_compatible would not be called on the first row.
Yes, and neither would self.upsert (on the next line), so the first row would not end up in the database.
(They might not be the first rows in the table. They are the first rows in the export.)
- `sqlalchemy.exc.CompileError`: raised at statement-compile time, before the DB sees anything. Triggered by things like `stmt.excluded`/`stmt.inserted` referencing a column that isn't in the table metadata, or a type that can't be rendered for the dialect.
- `sqlalchemy.exc.OperationalError`: DB-side. For schema mismatches this is mostly the MySQL path: "Unknown column 'x' in 'field list'", and also things like "Data too long for column" on some MySQL versions/drivers. (It also covers non-schema issues like deadlocks/timeouts, which the retry won't actually fix.)
- `sqlalchemy.exc.ProgrammingError`: DB-side. Postgres/MSSQL "column … does not exist", "relation does not exist", and similar DDL-shape errors come through here.
- `sqlalchemy.exc.DataError`: DB-side. It's the sibling DBAPI subclass for "value doesn't fit the column": string longer than VARCHAR(n), number out of range for the integer type, invalid date, numeric overflow, etc. On Postgres this is exactly how "value too long for type character varying(N)" and "numeric field overflow" arrive; psycopg2 raises StringDataRightTruncation / NumericValueOutOfRange, which SQLAlchemy maps to DataError.
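A sketch of the corresponding widened except tuple (whether `DataError` should also trigger the schema-repair retry is the open question here; the constant name is hypothetical):

```python
import sqlalchemy.exc

# Exceptions treated as potential schema mismatches; catching this tuple
# lets _flush_batch attempt one schema repair + retry per batch.
SCHEMA_MISMATCH_ERRORS = (
    sqlalchemy.exc.CompileError,      # compile time: column missing from metadata
    sqlalchemy.exc.OperationalError,  # MySQL: "Unknown column 'x' in 'field list'"
    sqlalchemy.exc.ProgrammingError,  # Postgres/MSSQL: "column ... does not exist"
    sqlalchemy.exc.DataError,         # value too wide/large for the column type
)
```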
Wrap the conflict-update SET clause in COALESCE so that a None in the inserted row leaves the existing column value untouched, matching the per-row upsert() which strips Nones before building the UPDATE. Restrict update_cols to batch_keys so columns where every row in the batch is None are not touched at all.

Iterate over table.columns and filter by batch_keys instead of using batch_keys directly, since a new column in batch_keys is not yet on the table and table.c[name] raises KeyError. Letting the INSERT fail on the missing column lets _flush_batch retry after schema repair.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
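A sketch of what that commit describes on the Postgres path, using the `bulk_upsert` name from this thread (signatures and the `self.connection` attribute are illustrative assumptions, not the PR's actual code):

```python
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert

def bulk_upsert(self, table, batch):
    # Only consider columns that carry a real value somewhere in the batch.
    batch_keys = {k for row in batch for k, v in row.items() if v is not None}
    stmt = pg_insert(table).values(batch)
    update_cols = {
        # COALESCE: a None in the incoming row keeps the existing value.
        col.name: func.coalesce(stmt.excluded[col.name], col)
        for col in table.columns       # iterate table.columns, not batch_keys:
        if col.name in batch_keys      # a brand-new key isn't on the table yet,
        and not col.primary_key        # and table.c[name] would raise KeyError
    }
    stmt = stmt.on_conflict_do_update(
        index_elements=[c.name for c in table.primary_key],
        set_=update_cols,
    )
    self.connection.execute(stmt)
```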
Newer mypy with disallow_any_generics flags untyped dict/list literals assigned at module scope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
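An illustrative example of the kind of fix this implies (the variable names here are hypothetical, not from the PR):

```python
from typing import Any

# Before: `CACHE = {}` at module scope is inferred as dict[Any, Any],
# which --disallow-any-generics rejects. Parameterize explicitly:
CACHE: dict[str, Any] = {}
PENDING_ROWS: list[dict[str, Any]] = []
```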
Optimizes `SqlTableWriter`: instead of running `make_table_compatible()` on every row, it now runs only on the first 10 rows (the `SCHEMA_CHECK_ROWS` const), which also bounds `MetaData` object growth.