TIMX 529 - read methods utilize SQL + metadata context #161
Conversation
Why these changes are being introduced:

TIMDEXDatasetMetadata (TDM) has a DuckDB context for metadata attachments and views. TIMDEXDataset (TD) will need one for DuckDB queries that return actual ETL data, not just metadata.

How this addresses that need:

* TD reuses the TDM.conn DuckDB connection and builds upon it
* The TDM DuckDB connection builds metadata-related views in a "metadata" schema
* TD will build views in a "data" schema

Side effects of this change:

* All TDM metadata views are under a "metadata" schema

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-529
Why these changes are being introduced:

This commit is the culmination of work to elevate metadata about ETL records to the point where it can be used to improve the speed and efficiency of data queries. While the signature of the read methods remains mostly the same, a 'where' clause is now exposed that accepts raw SQL to filter the results, allowing for more advanced querying beyond the simple key/value DatasetFilters. Additionally, and equally important, data retrieval now comes directly from DuckDB instead of lower-level pyarrow dataset reads. Overall complexity remains about the same, but the focus has shifted to DuckDB table and view preparation and SQL construction, which also pays dividends in other contexts. It is anticipated this will set us up well for other data we may add to the TIMDEX dataset, e.g. vector embeddings or fulltext, which we may want to query and retrieve.

How this addresses that need:

As before, all read methods eventually call TIMDEXDataset.read_batches_iter(), which now performs a two-part process of first quickly querying metadata records, then using that information to prune the heavier data retrieval. SQLAlchemy is used to model DuckDB tables and views so that we can preserve the simpler key/value DatasetFilters, e.g. source='libguides' or run_type='daily', which will likely represent the majority of public API needs, by converting those key/value pairs into a SQL WHERE clause programmatically. This is done without the need for complex string interpolation and escaping. The overall input and output signatures are largely the same, but the underlying approach to querying the ETL parquet records now utilizes DuckDB much more heavily, while also providing a SQL 'escape hatch' if the keyword filters don't suffice.

Side effects of this change:

* None! Transmog and TIM can call TDA in the same way as before. The underlying approach is different, but the signatures are mostly the same.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-529
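A hypothetical sketch of how the reworked read methods might be called, with the simple key/value filters alongside the new raw SQL 'where' escape hatch. The import path, `read_batches_iter` keyword arguments, and column names here are assumptions for illustration, not a confirmed public API:

```python
from timdex_dataset_api import TIMDEXDataset

td = TIMDEXDataset("s3://example-timdex-bucket/dataset")  # location is illustrative
td.load()

# simple key/value DatasetFilters, converted internally into a SQL WHERE clause
daily_libguides = td.read_batches_iter(source="libguides", run_type="daily")

# the raw SQL 'where' escape hatch for filtering beyond key/value equality
recent_indexed = td.read_batches_iter(
    where="run_date >= DATE '2025-01-01' AND action = 'index'"
)
```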
Why these changes are being introduced:

During the refactor to use dataset metadata for querying, we had to temporarily skip tests that covered dataset filtering and current-records limiting. With the SQL-backed querying now in place, these tests can be reinstated. Note that a future commit will likely *add* a couple more tests for the new, optional 'WHERE' clause functionality.

How this addresses that need:

* No tests are skipped.
* Dataset filtering tests still remain, but the key/value filters are just handled differently under the hood.
* Tests for current records no longer use .load(current_records=True) but instead utilize the DuckDB table via table='current_records' within a read method.

Side effects of this change:

* None

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-529
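A hedged sketch of what a reinstated current-records test might look like under this approach; the fixture, `read_dicts_iter` method, `table=` parameter, and `timdex_record_id` field are assumptions drawn from this PR:

```python
def test_current_records_via_table_argument(timdex_dataset_multi_source):
    td = timdex_dataset_multi_source
    # previously: td.load(current_records=True); now the DuckDB-backed
    # 'current_records' table/view is targeted directly by the read method
    records = list(td.read_dicts_iter(table="current_records"))
    record_ids = [record["timdex_record_id"] for record in records]
    # "current" should mean at most one row per TIMDEX record id
    assert len(record_ids) == len(set(record_ids))
```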
@pytest.fixture(scope="module")
def timdex_dataset_multi_source(tmp_path_factory) -> TIMDEXDataset:
Focusing the comment here, but it applies elsewhere too.
Two things are happening here:
- By setting `scope="module"`, this fixture is only created once per testing file, e.g. all tests in `test_read.py` will get the same object. This greatly reduces testing time.
- We can only do this if the fixture does not extend any function-level scoped fixture (the default scope). Apparently `tmp_path` is a function-level convenience fixture; using `tmp_path_factory` is a workaround for that (see the sketch below).
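A minimal sketch of that fixture pattern, assuming a TIMDEXDataset that can be constructed from a local path and populated with sample records (construction and write details are illustrative):

```python
import pytest


@pytest.fixture(scope="module")
def timdex_dataset_multi_source(tmp_path_factory) -> TIMDEXDataset:
    # tmp_path is function-scoped, so a module-scoped fixture must build its own
    # temporary directory via the session-scoped tmp_path_factory fixture
    dataset_path = tmp_path_factory.mktemp("dataset")
    td = TIMDEXDataset(str(dataset_path))
    # ... write sample records from multiple sources here ...
    return td
```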
conn = self.metadata.conn

# create data schema
conn.execute("""create schema data;""")
Currently, TIMDEXDataset does not create any tables or views in the data schema, but I think it provides some nice symmetry with TIMDEXDatasetMetadata creating a metadata schema. It establishes a pattern that anything utilizing this shared DuckDB context should create schemas to encapsulate their tables/views. I think this will be useful when we get into storing vector embeddings and fulltext in the TIMDEX dataset.
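A standalone sketch of that schema-per-component pattern, using a throwaway DuckDB connection and a placeholder view (schema, view, and column names are illustrative):

```python
import duckdb

# in the library this would be the shared TDM connection
conn = duckdb.connect()

# each component encapsulates its objects in its own schema
conn.execute("CREATE SCHEMA IF NOT EXISTS metadata;")
conn.execute("CREATE SCHEMA IF NOT EXISTS data;")

# metadata views live under the 'metadata' schema (view body is a placeholder)...
conn.execute(
    "CREATE VIEW metadata.current_records AS SELECT 'abc:001' AS timdex_record_id;"
)

# ...and future data-side objects (e.g. embeddings, fulltext) would live under 'data'
print(conn.execute("SELECT * FROM metadata.current_records").fetchall())
```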
for i, meta_chunk_df in enumerate(self._iter_meta_chunks(meta_df)):
    batch_time = time.perf_counter()
    batch_yield_count = len(meta_chunk_df)
    total_yield_count += batch_yield_count

    if batch_yield_count == 0:
        continue

    self.conn.register("meta_chunk", meta_chunk_df)
    data_query = self._build_data_query_for_chunk(
        columns,
        meta_chunk_df,
        registered_metadata_chunk="meta_chunk",
    )
    yield from self._stream_data_query_batches(data_query)
    self.conn.unregister("meta_chunk")
This is perhaps the most important part of these changes.
We go into this code block with a `meta_df` dataframe that contains metadata about all the rows we want to return from this read method. Instead of a single DuckDB query, we batch the metadata into chunks and perform smaller queries. Very often a batch may only involve a single parquet file, so the reading + filtering will be very quick.
To get that metadata "inside" the DuckDB context to join on and use, we register it as a temporary view/table. Then we perform the data query, joining on the metadata. After yielding the data, we de-register that metadata chunk for the next iteration.
The larger the metadata chunk we register --> join --> yield, the faster the reads, but the more memory is required. In some fairly substantial testing, it feels like the current approach yields records about as fast as we could ever do anything with them.
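A simplified, self-contained sketch of that register --> query --> yield --> unregister cycle, using a throwaway DataFrame in place of a real metadata chunk; the actual data query and parquet joins are only hinted at in a comment:

```python
import duckdb
import pandas as pd

conn = duckdb.connect()

# stand-in for one chunk of meta_df: the record ids (and source files) to fetch
meta_chunk_df = pd.DataFrame(
    {"timdex_record_id": ["abc:001", "abc:002"], "filename": ["part-0001.parquet"] * 2}
)

# expose the dataframe inside the DuckDB context as a temporary relation
conn.register("meta_chunk", meta_chunk_df)

# the real data query reads only the parquet files named in the chunk and joins on it,
# e.g. ... FROM read_parquet([...]) data JOIN meta_chunk USING (timdex_record_id)
reader = conn.execute("SELECT * FROM meta_chunk").fetch_record_batch()
for batch in reader:  # pyarrow RecordBatches, streamed rather than fully materialized
    print(batch.num_rows)

# de-register so the next iteration can register its own chunk under the same name
conn.unregister("meta_chunk")
```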
def build_meta_query(
    self, table: str, where: str | None, **filters: Unpack["DatasetFilters"]
) -> str:
    """Build SQL query using SQLAlchemy against metadata schema tables and views."""
This is admittedly pretty dense, but it's an operation that is common to many applications where you're translating key/value style arguments into a SQL-like dialect.
The price of those key/value filters -- which will likely be the most common way the dataset is filtered -- is the complexity here of converting them into a SQL WHERE clause.
While SQLAlchemy reflection + query building may seem at a glance more complicated than building a WHERE query string, it quickly becomes apparent that's not the case as you start getting into escaping, logic for every possible SQL operator, etc.
This is a good method to spend some time reading and thinking about, as it's also pretty key to this new approach.
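A rough, standalone sketch of the reflection + query-building idea, run against a throwaway in-memory DuckDB table rather than the real metadata views (requires the duckdb_engine dialect; table and column names are illustrative, not the library's actual models):

```python
from sqlalchemy import MetaData, Table, create_engine, select, text

# illustrative in-memory setup; the library reflects real metadata views instead
engine = create_engine("duckdb:///:memory:")
with engine.begin() as db:
    db.execute(text("CREATE SCHEMA metadata"))
    db.execute(text(
        "CREATE TABLE metadata.records "
        "(timdex_record_id VARCHAR, source VARCHAR, run_type VARCHAR)"
    ))

# reflect the table so columns can be referenced without hand-written SQL
records = Table("records", MetaData(schema="metadata"), autoload_with=engine)

# convert simple key/value DatasetFilters into WHERE predicates; SQLAlchemy
# handles quoting and escaping, so no manual string interpolation is needed
filters = {"source": "libguides", "run_type": "daily"}
stmt = select(records.c.timdex_record_id)
for column_name, value in filters.items():
    stmt = stmt.where(records.c[column_name] == value)

# render a literal SQL string that DuckDB can execute directly
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
```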
engine = create_engine(
    "duckdb://",
    creator=lambda: ConnectionWrapper(conn),
)
This is a bit of a hack, but I'm not sure of another way to do it. We need a SQLAlchemy engine object that we can use for reflection/introspection... but we already have a DuckDB connection, and we don't want the engine to create a new one for us.
This was inspired by this comment from the duckdb_engine SQLAlchemy library creator.
What's important to remember is that we don't use this engine for anything except introspection, so it's not terribly critical.
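A sketch of the introspection-only engine idea, based on the snippet above; the `ConnectionWrapper` import path is an assumption (it appears to come from duckdb_engine), and the schema name is illustrative:

```python
import duckdb
from duckdb_engine import ConnectionWrapper  # import path assumed from the snippet above
from sqlalchemy import create_engine, inspect

# an existing DuckDB connection that already holds the metadata schema/views
conn = duckdb.connect()
conn.execute("CREATE SCHEMA metadata;")

# hand SQLAlchemy that same connection rather than letting it open a new one;
# this engine is used only for reflection/introspection, never for data reads
engine = create_engine("duckdb://", creator=lambda: ConnectionWrapper(conn))

inspector = inspect(engine)
print(inspector.get_schema_names())
```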
ehanson8 left a comment:
Works as expected, 2 very minor comments!
timdex_dataset_api/dataset.py
| "month": run_date_obj.strftime("%m"), | ||
| "day": run_date_obj.strftime("%d"), | ||
| } | ||
| def _stream_data_query_batches(self, data_query: str) -> Iterator[pa.RecordBatch]: |
Could this go below read_batches_iter? I think placing private methods below what calls them is more readable.
Yeah, I was on the fence here. If we move this method, I think we'd want to move all the methods that are called by read_batches_iter():
The reasoning behind the current arrangement was that read_batches_iter() is followed immediately by all the other read methods:
Honestly, I think I like having the required sub-methods immediately following it as well. I'll make the change, and we can always tweak it if it's not better.
ehanson8 left a comment:
Approved!
Purpose and background context
At a high level, this PR updates all TIMDEXDataset read methods to use DuckDB for record querying and retrieval. This completes the primary work of epic TIMX-515, with remaining tickets focusing on metadata management (e.g. creating the first static database file and merging deltas) and deployment.

The read method signatures remain quite similar, but under the hood the querying and retrieval follows a new two-step process:

1. Query the dataset metadata to quickly identify the records and parquet files needed.
2. Use that metadata to prune the heavier data retrieval, performed directly in DuckDB.
This is sketched in the new "Reading Data from TIMDEXDataset" document, under the section "How reading works (two-step process)".

The changes in this PR build on updates from previous PRs, including reinstating some temporarily skipped tests. It is recommended to review by looking at individual commits. The most meaningful changes come from the commit "Rework read methods to utilize metadata", which updated the read methods to use dataset metadata + DuckDB for querying and retrieval. The commits that follow are mostly updating tests, reinstating skipped tests, some work on fixtures to dramatically reduce testing time via fixture scoping, and documentation.
Changes for Transmogrifier?
The TIMDEXDataset.write() method signature remains the same, but append deltas now get added to the dataset as a side effect of all writes; no code changes required.

Changes for Pipeline Lambdas?
Changes for TIM?
TIMDEXDataset loading and reading: https://mitlibraries.atlassian.net/browse/TIMX-536

How can a reviewer manually see the effects of these changes?
1- Set AWS Dev `TimdexManagers` credentials

2- Set env vars:

3- Start an Ipython shell with `pipenv run ipython` and do some setup:

4- Perform a metadata query to get a sense of some ETL runs:

5- Simulate how TIM will provide a `run_id` as a "simple" key/value filter to get records to index:

This is an interesting one! We no longer need to provide `source` or even `run_date` to efficiently get this data. With just the `run_id`, the metadata query pinpoints the records + parquet files to use.

The discerning eye may notice the metadata query above had 250 rows, but the debug statement says "Metadata query identified 248 rows, across 2 parquet files". This is because we added `action="index"` to this query, which apparently removed a couple of `action="delete"` rows.

6- We can use "simple" key/value filters or the new `where=` SQL filtering and demonstrate the results are the same:

7- Next, we can show the memory-safe reading of a large number of records. This yields every `timdex_record_id` from `alma` records:

8- Lastly, we can demonstrate complex filtering that was not formerly possible with just key/value equality filters (a hedged sketch follows below):
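For reference, a hedged sketch of what step 8's complex filtering might look like; the read method name, `where=` parameter, and column names are assumptions drawn from this PR, and the authoritative examples live in the new "Reading Data from TIMDEXDataset" document:

```python
from timdex_dataset_api import TIMDEXDataset

td = TIMDEXDataset("s3://example-timdex-bucket/dataset")  # location is illustrative
td.load()

# IN lists, OR logic, and date comparisons were not expressible with the
# key/value equality filters alone
complex_where = """
    source IN ('alma', 'dspace')
    AND run_date >= DATE '2025-01-01'
    AND (run_type = 'daily' OR action = 'index')
"""
for record in td.read_dicts_iter(where=complex_where):
    print(record["timdex_record_id"])
```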
Includes new or updated dependencies?
YES: these changes introduce `sqlalchemy` for programmatic query building.

Changes expectations for external applications?
YES: as noted above, tickets have been made for small syntactical changes that pipeline lambdas and TIM will require
What are the relevant tickets?

https://mitlibraries.atlassian.net/browse/TIMX-529