
Commit e006b1c

Add run_timestamp column to dataset

Why these changes are being introduced:

During work on yielding only "current" records from the dataset, where the ordering of ETL runs in the dataset is critical, it was determined that more time granularity was needed for each ETL run. Currently we store the YYYY-MM-DD date for each run, but if multiple runs occur on the same day, we are unable to order them more granularly.

How this addresses that need:

* Adds a new run_timestamp column to the parquet dataset schema
* The timestamp is minted before any rows are written, and then used for every row in the ETL run

Side effects of this change:

* All TIMDEX components that use this library for reading and writing will need a terraform rebuild to pick up this change. Otherwise, they need no further modification.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-496

1 parent 8da7a7d commit e006b1c
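
To make the ordering problem concrete, here is a small illustrative sketch (stand-in values only, not code from this library) of why a date-valued run_date cannot order same-day runs, while a minted timestamp can:

from datetime import UTC, datetime

# two ETL runs on the same day: run_date alone cannot order them
runs = [
    {"run_id": "run-1", "run_date": "2025-01-01"},
    {"run_id": "run-2", "run_date": "2025-01-01"},
]
assert runs[0]["run_date"] == runs[1]["run_date"]  # tie; order is ambiguous

# with a per-run timestamp minted at write time, ordering is unambiguous
runs[0]["run_timestamp"] = datetime(2025, 1, 1, 9, 0, tzinfo=UTC)
runs[1]["run_timestamp"] = datetime(2025, 1, 1, 17, 30, tzinfo=UTC)
latest = max(runs, key=lambda run: run["run_timestamp"])
assert latest["run_id"] == "run-2"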

File tree

6 files changed: +146 −14 lines changed


Pipfile

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ pytest = "*"
 ruff = "*"
 setuptools = "*"
 pip-audit = "*"
+pytest-freezegun = "*"
 
 [requires]
 python_version = "3.12"
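
pytest-freezegun supplies the freeze_time marker used by the new tests below; a minimal sketch of the plugin's behavior (hypothetical test, assuming the plugin is installed):

from datetime import UTC, datetime

import pytest


@pytest.mark.freeze_time("2025-05-22 01:23:45.567890")
def test_now_is_pinned():
    # while frozen, datetime.now() returns the pinned instant
    assert datetime.now(UTC) == datetime(2025, 5, 22, 1, 23, 45, 567890, tzinfo=UTC)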

Pipfile.lock

Lines changed: 17 additions & 1 deletion
Some generated files are not rendered by default.

tests/conftest.py

Lines changed: 54 additions & 0 deletions
@@ -153,3 +153,57 @@ def dataset_with_runs_location(tmp_path) -> str:
 @pytest.fixture
 def local_dataset_with_runs(dataset_with_runs_location) -> TIMDEXDataset:
     return TIMDEXDataset(dataset_with_runs_location)
+
+
+@pytest.fixture
+def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
+    """Dataset fixture where a single source had multiple runs on the same day.
+
+    After these runs, we'd expect 70 records in OpenSearch:
+        - most recent full run "run-2" established a 75 record baseline
+        - runs "run-3" and "run-4" only modified records; no record count change
+        - run "run-5" deleted 5 records
+
+    If the order of full runs 1 & 2 is not handled correctly, we'd see an incorrect
+    baseline of 100 records.
+
+    If the order of daily runs 4 & 5 is not handled correctly, we'd see 75 records
+    because the deletes would be applied first and the subsequent index run would
+    recreate the records.
+    """
+    location = str(tmp_path / "dataset_with_same_day_runs")
+    os.mkdir(location)
+
+    timdex_dataset = TIMDEXDataset(location)
+
+    run_params = []
+
+    # Simulate two "full" runs where "run-2" should establish the baseline.
+    # Simulate daily runs, sometimes multiple per day, where deletes from
+    # "run-5" should be represented.
+    run_params.extend(
+        [
+            (100, "alma", "2025-01-01", "full", "index", "run-1"),
+            (75, "alma", "2025-01-01", "full", "index", "run-2"),
+            (10, "alma", "2025-01-01", "daily", "index", "run-3"),
+            (20, "alma", "2025-01-02", "daily", "index", "run-4"),
+            (5, "alma", "2025-01-02", "daily", "delete", "run-5"),
+        ]
+    )
+
+    for params in run_params:
+        num_records, source, run_date, run_type, action, run_id = params
+        records = generate_sample_records(
+            num_records,
+            timdex_record_id_prefix=source,
+            source=source,
+            run_date=run_date,
+            run_type=run_type,
+            action=action,
+            run_id=run_id,
+        )
+        timdex_dataset.write(records)
+
+    # reload after writes
+    timdex_dataset.load()
+
+    return timdex_dataset
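
For reference, the record arithmetic this fixture encodes (a quick check of the docstring's expected counts, not code from the diff):

# run-2 (most recent full run) establishes the baseline of 75 records;
# run-3 and run-4 only re-index existing records (no count change);
# run-5 deletes 5 records.
baseline, deleted = 75, 5
assert baseline - deleted == 70  # expected records in OpenSearch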

tests/test_dataset.py

Lines changed: 49 additions & 2 deletions
@@ -1,7 +1,7 @@
-# ruff: noqa: D205, S105, S106, SLF001, PD901, PLR2004
+# ruff: noqa: D205, D209, S105, S106, SLF001, PD901, PLR2004
 
 import os
-from datetime import date
+from datetime import UTC, date, datetime
 from unittest.mock import MagicMock, patch
 
 import pyarrow as pa
@@ -463,3 +463,50 @@ def test_dataset_current_records_index_filtering_accurate_records_yielded(
         "alma:23",
         "alma:24",
     ]
+
+
+@pytest.mark.freeze_time("2025-05-22 01:23:45.567890")
+def test_dataset_write_includes_minted_run_timestamp(
+    dataset_with_same_day_runs,
+):
+    # assert TIMDEXDataset.write() applies the current time as run_timestamp
+    row_dict = next(dataset_with_same_day_runs.read_dicts_iter())
+    assert "run_timestamp" in row_dict
+    assert row_dict["run_timestamp"] == datetime(
+        2025,
+        5,
+        22,
+        1,
+        23,
+        45,
+        567890,
+        tzinfo=UTC,
+    )
+
+    # assert the same timestamp is used for the entire batch
+    df = dataset_with_same_day_runs.read_dataframe()
+    assert len(list(df.run_timestamp.unique())) == 1
+
+
+def test_dataset_load_current_records_gets_correct_same_day_full_run(
+    dataset_with_same_day_runs,
+):
+    """Two full runs were performed on the same day, but 'run-2' was performed most
+    recently. current_records=True should discover the more recent of the two,
+    'run-2', not 'run-1'."""
+    dataset_with_same_day_runs.load(current_records=True, run_type="full")
+    df = dataset_with_same_day_runs.read_dataframe()
+
+    assert list(df.run_id.unique()) == ["run-2"]
+
+
+def test_dataset_load_current_records_gets_correct_same_day_daily_runs_ordering(
+    dataset_with_same_day_runs,
+):
+    """Two runs were performed on 2025-01-02, but the most recent records should be
+    from run 'run-5', which are action='delete', not from 'run-4' with
+    action='index'."""
+    dataset_with_same_day_runs.load(current_records=True, run_type="daily")
+    first_record = next(dataset_with_same_day_runs.read_dicts_iter())
+
+    assert first_record["run_id"] == "run-5"
+    assert first_record["action"] == "delete"

timdex_dataset_api/dataset.py

Lines changed: 20 additions & 9 deletions
@@ -41,6 +41,7 @@
         pa.field("year", pa.string()),
         pa.field("month", pa.string()),
         pa.field("day", pa.string()),
+        pa.field("run_timestamp", pa.timestamp("us", tz="UTC")),
     )
 )
@@ -62,6 +63,7 @@ class DatasetFilters(TypedDict, total=False):
     year: str | None
     month: str | None
     day: str | None
+    run_timestamp: str | datetime | None
 
 
 @dataclass
@@ -112,15 +114,19 @@ def __init__(
             location (str | list[str]): Local filesystem path or an S3 URI to
                 a parquet dataset. For partitioned datasets, set to the base directory.
         """
-        self.location = location
         self.config = config or TIMDEXDatasetConfig()
+        self.location = location
 
+        # pyarrow dataset
         self.filesystem, self.paths = self.parse_location(self.location)
        self.dataset: ds.Dataset = None  # type: ignore[assignment]
         self.schema = TIMDEX_DATASET_SCHEMA
         self.partition_columns = TIMDEX_DATASET_PARTITION_COLUMNS
+
+        # writing
         self._written_files: list[ds.WrittenFile] = None  # type: ignore[assignment]
 
+        # reading
         self._current_records: bool = False
         self._current_records_dataset: ds.Dataset = None  # type: ignore[assignment]
@@ -405,26 +411,31 @@ def write(
         return self._written_files  # type: ignore[return-value]
 
     def create_record_batches(
-        self,
-        records_iter: Iterator["DatasetRecord"],
+        self, records_iter: Iterator["DatasetRecord"]
     ) -> Iterator[pa.RecordBatch]:
         """Yield pyarrow.RecordBatches for writing.
 
         This method expects an iterator of DatasetRecord instances.
 
-        Each DatasetRecord is validated and serialized to a dictionary before added to a
-        pyarrow.RecordBatch for writing.
+        Each DatasetRecord is serialized to a dictionary, any column data shared by all
+        rows is added to the record, and then added to a pyarrow.RecordBatch for writing.
 
         Args:
             - records_iter: Iterator of DatasetRecord instances
         """
+        run_timestamp = datetime.now(UTC)
         for i, record_batch in enumerate(
             itertools.batched(records_iter, self.config.write_batch_size)
         ):
-            batch = pa.RecordBatch.from_pylist(
-                [record.to_dict() for record in record_batch]
-            )
-            logger.debug(f"Yielding batch {i+1} for dataset writing.")
+            record_dicts = [
+                {
+                    **record.to_dict(),
+                    "run_timestamp": run_timestamp,
+                }
+                for record in record_batch
+            ]
+            batch = pa.RecordBatch.from_pylist(record_dicts)
+            logger.debug(f"Yielding batch {i + 1} for dataset writing.")
             yield batch
 
     def log_write_statistics(self, start_time: float) -> None:
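
The important detail above is that run_timestamp is minted once, before the batch loop, so every row in a single ETL run carries an identical value even across many batches. A standalone sketch of that pattern (illustrative only, using stand-in row dicts and Python 3.12's itertools.batched):

import itertools
from datetime import UTC, datetime


def stamp_batches(rows, batch_size=2):
    # minted once per call: every row in this "run" shares the same timestamp
    run_timestamp = datetime.now(UTC)
    for batch in itertools.batched(rows, batch_size):
        yield [{**row, "run_timestamp": run_timestamp} for row in batch]


rows = [{"id": i} for i in range(5)]
stamped = [row for batch in stamp_batches(rows) for row in batch]
assert len({row["run_timestamp"] for row in stamped}) == 1  # one shared timestamp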

timdex_dataset_api/run.py

Lines changed: 5 additions & 2 deletions
@@ -52,6 +52,7 @@ def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
                 "source": "first",
                 "run_date": "first",
                 "run_type": "first",
+                "run_timestamp": "first",
                 "num_rows": "sum",
                 "filename": list,
             }
@@ -65,9 +66,9 @@ def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
             lambda x: len(x)
         )
 
-        # sort by run date and source
+        # sort by run_timestamp (more granularity than run_date) and source
         grouped_runs_df = grouped_runs_df.sort_values(
-            ["run_date", "source"], ascending=False
+            ["run_timestamp", "source"], ascending=False
         )
 
         # cache the result
@@ -185,12 +186,14 @@ def _parse_run_metadata_from_parquet_file(self, parquet_filepath: str) -> dict:
         run_date = columns_meta[4]["statistics"]["max"]
         run_type = columns_meta[5]["statistics"]["max"]
         run_id = columns_meta[7]["statistics"]["max"]
+        run_timestamp = columns_meta[9]["statistics"]["max"]
 
         return {
             "source": source,
             "run_date": run_date,
             "run_type": run_type,
             "run_id": run_id,
+            "run_timestamp": run_timestamp,
             "num_rows": num_rows,
             "filename": parquet_filepath,
         }
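
The new run_timestamp metadata is read from parquet footer statistics at column index 9, mirroring the existing index-based lookups. A rough sketch of inspecting such statistics with pyarrow (hypothetical file path; the index depends on the schema's column order):

import pyarrow.parquet as pq

parquet_file = pq.ParquetFile("dataset/example-run.parquet")  # hypothetical path
row_group = parquet_file.metadata.row_group(0)

# column 9 corresponds to run_timestamp in this schema's column order
statistics = row_group.column(9).statistics
print(statistics.max)  # max run_timestamp within this row group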
