
Commit 7337f31

Merge pull request #168 from MITLibraries/TIMX-543-cr-optimize-v2

TIMX 543 - Optimize current records metadata queries and data retrieval

2 parents: b7c3350 + c6ccd58

File tree: 6 files changed, +148 −52 lines

tests/conftest.py
Lines changed: 3 additions & 3 deletions

@@ -113,7 +113,7 @@ def timdex_dataset_multi_source(tmp_path_factory) -> TIMDEXDataset:
     )

     # ensure static metadata database exists for read methods
-    dataset.metadata.recreate_static_database_file()
+    dataset.metadata.rebuild_dataset_metadata()
     dataset.metadata.refresh()

     return dataset
@@ -223,7 +223,7 @@ def timdex_dataset_same_day_runs(tmp_path) -> TIMDEXDataset:
 def timdex_metadata(timdex_dataset_with_runs) -> TIMDEXDatasetMetadata:
     """TIMDEXDatasetMetadata with static database file created."""
     metadata = TIMDEXDatasetMetadata(timdex_dataset_with_runs.location)
-    metadata.recreate_static_database_file()
+    metadata.rebuild_dataset_metadata()
     metadata.refresh()
     return metadata
@@ -233,7 +233,7 @@ def timdex_dataset_with_runs_with_metadata(
     timdex_dataset_with_runs,
 ) -> TIMDEXDataset:
     """TIMDEXDataset with runs and static metadata created for read tests."""
-    timdex_dataset_with_runs.metadata.recreate_static_database_file()
+    timdex_dataset_with_runs.metadata.rebuild_dataset_metadata()
     timdex_dataset_with_runs.metadata.refresh()
     return timdex_dataset_with_runs
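All three fixtures use the same rebuild-then-refresh pattern: rebuild_dataset_metadata() (the renamed recreate_static_database_file()) writes the static metadata database, and refresh() reloads it so subsequent reads see the new file. A minimal usage sketch, with the dataset path illustrative rather than taken from the diff:

    from timdex_dataset_api import TIMDEXDataset

    dataset = TIMDEXDataset("/path/to/timdex-dataset")  # hypothetical location
    # rebuild the static metadata database, then reload it for reads
    dataset.metadata.rebuild_dataset_metadata()
    dataset.metadata.refresh()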

tests/test_metadata.py
Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ def test_tdm_s3_dataset_structure_properties(s3_bucket_mocked):

 def test_tdm_create_metadata_database_file_success(caplog, timdex_metadata_empty):
     caplog.set_level("DEBUG")
-    timdex_metadata_empty.recreate_static_database_file()
+    timdex_metadata_empty.rebuild_dataset_metadata()


 def test_tdm_init_metadata_file_found_success(timdex_metadata):

tests/test_read.py
Lines changed: 10 additions & 4 deletions

@@ -125,8 +125,8 @@ def test_read_batches_where_and_dataset_filters_are_combined(timdex_dataset_mult
     [
         "SELECT * FROM current_records WHERE source = 'libguides'",
         "FROM records WHERE source = 'libguides'",
-        "source = 'libguides';",
-        " run_date = '2024-12-01'; ",
+        "ORDER BY timdex_record_id",
+        "LIMIT 3",
     ],
 )
 def test_read_batches_where_rejects_non_predicate_sql(
@@ -254,7 +254,7 @@ def test_dataset_load_current_records_gets_correct_same_day_full_run(
     timdex_dataset_same_day_runs,
 ):
     # ensure metadata exists for this dataset
-    timdex_dataset_same_day_runs.metadata.recreate_static_database_file()
+    timdex_dataset_same_day_runs.metadata.rebuild_dataset_metadata()
     timdex_dataset_same_day_runs.metadata.refresh()
     df = timdex_dataset_same_day_runs.read_dataframe(
         table="current_records", run_type="full"
@@ -265,7 +265,7 @@ def test_dataset_load_current_records_gets_correct_same_day_daily_runs_ordering(
     timdex_dataset_same_day_runs,
 ):
-    timdex_dataset_same_day_runs.metadata.recreate_static_database_file()
+    timdex_dataset_same_day_runs.metadata.rebuild_dataset_metadata()
     timdex_dataset_same_day_runs.metadata.refresh()
     first_record = next(
         timdex_dataset_same_day_runs.read_dicts_iter(
@@ -276,3 +276,9 @@ def test_dataset_load_current_records_gets_correct_same_day_daily_runs_ordering(
     # just assert it's one of the daily runs
     assert first_record["run_id"] in {"run-4", "run-5"}
     assert first_record["action"] in {"index", "delete"}
+
+
+def test_read_batches_iter_limit_returns_n_rows(timdex_dataset_multi_source):
+    batches = timdex_dataset_multi_source.read_batches_iter(limit=10)
+    table = pa.Table.from_batches(batches)
+    assert len(table) == 10
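The updated parametrized cases ("ORDER BY timdex_record_id", "LIMIT 3") confirm that ordering and limiting clauses are still rejected when smuggled through the raw where argument; limiting is now a first-class limit parameter instead. A usage sketch mirroring the new test, assuming a dataset instance like the fixtures above:

    import pyarrow as pa

    # cap the read at 10 records and assemble the batches into one table
    batches = dataset.read_batches_iter(limit=10)
    table = pa.Table.from_batches(batches)
    assert len(table) == 10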

timdex_dataset_api/__init__.py
Lines changed: 1 addition & 1 deletion

@@ -4,7 +4,7 @@
 from timdex_dataset_api.metadata import TIMDEXDatasetMetadata
 from timdex_dataset_api.record import DatasetRecord

-__version__ = "3.0.0"
+__version__ = "3.1.0"

 __all__ = [
     "DatasetRecord",

timdex_dataset_api/dataset.py
Lines changed: 37 additions & 5 deletions

@@ -143,6 +143,10 @@ def location_scheme(self) -> Literal["file", "s3"]:
     def data_records_root(self) -> str:
         return f"{self.location.removesuffix('/')}/data/records"  # type: ignore[union-attr]

+    def refresh(self) -> None:
+        """Fully reload TIMDEXDataset instance."""
+        self.__init__(self.location)  # type: ignore[misc]
+
     def create_data_structure(self) -> None:
         """Ensure ETL records data structure exists in TIMDEX dataset."""
         if self.location_scheme == "file":
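The new TIMDEXDataset.refresh() reloads the instance in place by re-running __init__ with the stored location, so a long-lived dataset object can pick up newly written runs without being reconstructed. A usage sketch, with the path again hypothetical:

    dataset = TIMDEXDataset("/path/to/timdex-dataset")  # hypothetical location
    # ... new runs are written to the dataset by another process ...
    dataset.refresh()  # re-runs __init__(self.location) to reload the dataset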
@@ -354,6 +358,7 @@ def read_batches_iter(
         self,
         table: str = "records",
         columns: list[str] | None = None,
+        limit: int | None = None,
         where: str | None = None,
         **filters: Unpack[DatasetFilters],
     ) -> Iterator[pa.RecordBatch]:
@@ -371,13 +376,16 @@ def read_batches_iter(
         Args:
             - table: an available DuckDB view or table
             - columns: list of columns to return
+            - limit: limit number of records yielded
             - where: raw SQL WHERE clause that can be used alone, or in combination with
                 key/value DatasetFilters
             - filters: simple filtering based on key/value pairs from DatasetFilters
         """
+        start_time = time.perf_counter()
+
         # build and execute metadata query
         metadata_time = time.perf_counter()
-        meta_query = self.metadata.build_meta_query(table, where, **filters)
+        meta_query = self.metadata.build_meta_query(table, limit, where, **filters)
         meta_df = self.metadata.conn.query(meta_query).to_df()
         logger.debug(
             f"Metadata query identified {len(meta_df)} rows, "
@@ -410,6 +418,10 @@ def read_batches_iter(
                 f"@ {batch_rps} records/second, total yielded: {total_yield_count}"
             )

+        logger.debug(
+            f"read_batches_iter() elapsed: {round(time.perf_counter()-start_time, 2)}s"
+        )
+
     def _iter_meta_chunks(self, meta_df: pd.DataFrame) -> Iterator[pd.DataFrame]:
         """Utility method to yield chunks of metadata query results."""
         for start in range(0, len(meta_df), self.config.duckdb_join_batch_size):
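Two things happen in these hunks: read_batches_iter() now times its full run with time.perf_counter() and logs the elapsed seconds at DEBUG, and the new limit is passed into build_meta_query() so the cap is applied while selecting row metadata, before any Parquet data is joined or fetched. The metadata.py side of this change is not part of the visible diff; the call site only suggests a query shaped roughly like the following hypothetical sketch (filter handling omitted, names assumed):

    def build_meta_query(table: str, limit: int | None, where: str | None, **filters) -> str:
        # illustrative only; the real implementation lives in
        # timdex_dataset_api/metadata.py, which is not shown in this diff
        query = f"SELECT * FROM {table}"
        if where:
            query += f" WHERE {where}"
        # key/value DatasetFilters handling omitted for brevity
        if limit is not None:
            query += f" LIMIT {limit}"
        return query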
@@ -461,25 +473,35 @@ def read_dataframes_iter(
         self,
         table: str = "records",
         columns: list[str] | None = None,
+        limit: int | None = None,
         where: str | None = None,
         **filters: Unpack[DatasetFilters],
     ) -> Iterator[pd.DataFrame]:
         for record_batch in self.read_batches_iter(
-            table=table, columns=columns, where=where, **filters
+            table=table,
+            columns=columns,
+            limit=limit,
+            where=where,
+            **filters,
         ):
             yield record_batch.to_pandas()

     def read_dataframe(
         self,
         table: str = "records",
         columns: list[str] | None = None,
+        limit: int | None = None,
         where: str | None = None,
         **filters: Unpack[DatasetFilters],
     ) -> pd.DataFrame | None:
         df_batches = [
             record_batch.to_pandas()
             for record_batch in self.read_batches_iter(
-                table=table, columns=columns, where=where, **filters
+                table=table,
+                columns=columns,
+                limit=limit,
+                where=where,
+                **filters,
             )
         ]
         if not df_batches:
@@ -490,22 +512,32 @@ def read_dicts_iter(
         self,
         table: str = "records",
         columns: list[str] | None = None,
+        limit: int | None = None,
         where: str | None = None,
         **filters: Unpack[DatasetFilters],
     ) -> Iterator[dict]:
         for record_batch in self.read_batches_iter(
-            table=table, columns=columns, where=where, **filters
+            table=table,
+            columns=columns,
+            limit=limit,
+            where=where,
+            **filters,
         ):
             yield from record_batch.to_pylist()

     def read_transformed_records_iter(
         self,
         table: str = "records",
+        limit: int | None = None,
         where: str | None = None,
         **filters: Unpack[DatasetFilters],
     ) -> Iterator[dict]:
         for record_dict in self.read_dicts_iter(
-            table=table, columns=["transformed_record"], where=where, **filters
+            table=table,
+            columns=["transformed_record"],
+            limit=limit,
+            where=where,
+            **filters,
         ):
             if transformed_record := record_dict["transformed_record"]:
                 yield json.loads(transformed_record)
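Because every derived reader funnels through read_batches_iter(), the limit parameter is now available across the whole read API. A usage sketch against a dataset instance like the fixtures above:

    # peek at five rows as a single DataFrame
    df = dataset.read_dataframe(table="records", limit=5)

    # stream a few parsed transformed records; each item is the
    # json.loads() of a non-null transformed_record column value
    for record in dataset.read_transformed_records_iter(limit=5):
        print(record)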
