@ghukill ghukill commented May 23, 2025

Purpose and background context

This PR allows for filtering -- either during .load() or via any read method -- in conjunction with using .load(current_records=True). With these changes, only the most recent version of a record will be yielded, or no version at all if filtering removes the most recent one for whatever reason.

Jira ticket TIMX-497 has a fairly in-depth explanation of the behavior which will not be restated here.

The mechanics of how this works are outlined in the commit messages below; in short:

Ultimately, this should make usage of .load(current_records=True) less surprising. No matter what kind of filtering is applied -- action="index" or run_date="2025-02-15" or run_id="abc123" -- any records that are yielded will be the current version of the record in the dataset, or not yielded at all if filtering removes that version. Previously, it was possible to encounter non-current versions of a record even if .load(current_records=True) was set.

How can a reviewer manually see the effects of these changes?

It's difficult to pinpoint a tidy example in the real data, but a test fixture can demonstrate this pretty easily.

The test fixture dataset_with_runs_location provides a dataset of simulated alma and dspace rows. Focusing on alma, we see these "runs" (number of records, source, run_date, run_type, action (for all records in the run), run_id):

[
    (40, "alma", "2024-12-01", "full", "index", "run-1"),
    (20, "alma", "2024-12-15", "daily", "index", "run-2"),
    (100, "alma", "2025-01-01", "full", "index", "run-3"),
    (50, "alma", "2025-01-02", "daily", "index", "run-4"),
    (25, "alma", "2025-01-03", "daily", "index", "run-5"),
    (10, "alma", "2025-01-04", "daily", "delete", "run-6"),
    (9, "alma", "2025-01-05", "daily", "index", "run-7"),
]

Runs run-5 through run-7 are worth focusing on. In natural language, these runs did the following:

  • run-5 created/modified 25 records, alma:0 - alma:24 in the dataset
  • run-6 deleted 10 records, alma:0 - alma:9
  • run-7 recreated 9 records, alma:0 - alma:8

We can kind of ignore run-1 through run-4 for this example.

The net effect of these runs is that alma:9 no longer exists in OpenSearch. The most recent version of it in the dataset is from run-6, where it had action="delete".
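The "latest run wins" rule for run-5 through run-7 can be sketched in plain Python (the run tuples below mirror the fixture rows above; this loop is purely illustrative, not library code):

```python
# Simulate runs run-5 through run-7: each run touches a contiguous range of
# record ids with a single action, and later runs override earlier ones.
runs = [
    ("run-5", "2025-01-03", "index", 25),   # alma:0 - alma:24
    ("run-6", "2025-01-04", "delete", 10),  # alma:0 - alma:9
    ("run-7", "2025-01-05", "index", 9),    # alma:0 - alma:8
]

current: dict[str, str] = {}
for _run_id, _run_date, action, count in runs:  # chronological order
    for i in range(count):
        current[f"alma:{i}"] = action  # later runs overwrite earlier ones

print(current["alma:8"])   # index  (recreated by run-7)
print(current["alma:9"])   # delete (deleted by run-6, never recreated)
print(current["alma:10"])  # index  (untouched since run-5)
```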

So, what happens if we do the following?

timdex_dataset = TIMDEXDataset("path/to/fictional/dataset")
timdex_dataset.load(current_records=True, source="alma")

records = timdex_dataset.read_dataframe(action="index")

Before the changes in this PR, alma:9 from run-5 would have inaccurately still been yielded. That's because filtering fully removed run-6, and so we had not "seen" the deleted version by the time we reached run-5.

After the changes in this PR, alma:9 is no longer yielded based on this filtering. The "current" version of alma:9 is action="delete" so any kind of action="index" filtering must necessarily not include it.
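The intended semantics can also be seen in a small pandas sketch: resolve to the current version of each record first, then apply the read filter. This mirrors the ordering the PR enforces, but is not the library's actual (pyarrow, batch-based) implementation, and the rows below are illustrative:

```python
import pandas as pd

# Rows for alma:8 and alma:9 across runs 5-7 (columns mirror the dataset
# schema shown in this PR; values are illustrative)
rows = pd.DataFrame(
    [
        ("alma:8", "2025-01-03", "index", "run-5"),
        ("alma:9", "2025-01-03", "index", "run-5"),
        ("alma:8", "2025-01-04", "delete", "run-6"),
        ("alma:9", "2025-01-04", "delete", "run-6"),
        ("alma:8", "2025-01-05", "index", "run-7"),
    ],
    columns=["timdex_record_id", "run_date", "action", "run_id"],
)

# Current version first: keep the latest run_date per record...
current = rows.sort_values("run_date").groupby("timdex_record_id").tail(1)
# ...then apply the read filter
indexed = current[current["action"] == "index"]

print(sorted(indexed["timdex_record_id"]))  # ['alma:8'] -- alma:9 is gone
```

Filtering first and resolving currency second (the old behavior) would have kept alma:9's run-5 row, which is exactly the bug this PR fixes.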

Any kind of manual testing or runs should work, yield records, etc. But all said, it's this test scenario that best exemplifies the purpose and effect of this PR.

Includes new or updated dependencies?

NO

Changes expectations for external applications?

NO: nothing is using this new functionality yet

What are the relevant tickets?

Developer

  • All new ENV is documented in README
  • All new ENV has been added to staging and production environments
  • All related Jira tickets are linked in commit message(s)
  • Stakeholder approval has been confirmed (or is not needed)

Code Reviewer(s)

  • The commit message is clear and follows our guidelines (not just this PR message)
  • There are appropriate tests covering any new functionality
  • The provided documentation is sufficient for understanding any new functionality introduced
  • Any manual tests have been performed or provided examples verified
  • New dependencies are appropriate or there were no changes

ghukill added 2 commits May 23, 2025 15:39
Why these changes are being introduced:

Formerly, an instance of TIMDEXRunManager expected a TIMDEXDataset on init, where
it would utilize the pyarrow dataset at TIMDEXDataset.dataset.  This resulted in
unneeded tight coupling between these classes.

How this addresses that need:
* TIMDEXRunManager updated to only expect a pyarrow Dataset

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-496

Why these changes are being introduced:

Unexpected behavior was possible when using load(current_records=True)
and then applying additional filtering to the dataset before reading.
In short, a non-current record could be yielded if filtering removed
the truly current version of the record.  This happened because the
reverse chronological marking of "seen" records would not "see" this
record and happily yield an older version.

How this addresses that need:

When load(current_records=True) is used, a clone of the dataset is
saved to the TIMDEXDataset object before any additional filtering
is applied.  This dataset is just metadata, not expensive to store.

Then, during any read methods, this dataset is used to provide an
exhaustive and ordered list of timdex_record_ids.  Even if a record
has been filtered out by the read method (e.g. limiting records to only
action="index"), this secondary list of timdex_record_ids is used
as the authoritative list of "seen" timdex_record_ids.

There is a bit of network overhead to this parallel batch reading,
but it is fairly minimal as we are only retrieving the 'timdex_record_id'
column; perhaps 1-2 MB of IO per million records.
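The approach described above can be sketched as a generator: an authoritative id stream from the unfiltered dataset clone advances a "seen" set in lockstep with the possibly-filtered record batches. This is a simplified pure-Python illustration with hypothetical names, not the library's pyarrow implementation:

```python
from collections.abc import Iterator


def yield_deduped_batches(
    paired_batches: Iterator[tuple[list[str], list[dict]]],
) -> Iterator[list[dict]]:
    """Yield, from each possibly-filtered record batch, only records whose
    timdex_record_id has not yet appeared in the authoritative id stream.

    Each pair is (id_batch, record_batch): id_batch comes from the unfiltered
    "current records" dataset clone, so a record filtered out of record_batch
    is still marked as seen for all later batches.
    """
    seen: set[str] = set()
    for id_batch, record_batch in paired_batches:
        unseen = [r for r in record_batch if r["timdex_record_id"] not in seen]
        seen.update(id_batch)  # ids become "seen" even if never yielded
        if unseen:
            yield unseen


# Demo: filtering removed every record from the first batch, but its ids are
# still marked seen, so the older versions in the second batch are suppressed.
demo = [
    (["alma:0", "alma:1", "alma:2"], []),
    (["alma:0", "alma:1"], [{"timdex_record_id": "alma:0"},
                            {"timdex_record_id": "alma:1"}]),
]
for batch in yield_deduped_batches(iter(demo)):
    print([r["timdex_record_id"] for r in batch])  # prints nothing
```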

Side effects of this change:
* Applications like TIM that will likely use this new functionality
to yield only "current" records can do so confidently, and optionally
with additional filtering, knowing they will only encounter current
versions of a record from the dataset.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-497
coveralls commented May 23, 2025

Pull Request Test Coverage Report for Build 15309482286

Details

  • 29 of 29 (100.0%) changed or added relevant lines in 2 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.6%) to 95.484%

Totals Coverage Status

  • Change from base Build 15217885508: 0.6%
  • Covered Lines: 296
  • Relevant Lines: 310

💛 - Coveralls

assert df.action.value_counts().to_dict() == {"index": 99}


def test_dataset_current_records_index_filtering_accurate_records_yielded(
ghukill (Contributor, Author) commented:

I think this test is the most verbose and precise in pinpointing how these changes address the issue.

@ghukill ghukill marked this pull request as ready for review May 27, 2025 13:25

ghukill commented May 27, 2025

@jonavellecuerdo, @ehanson8 - I saved this logging output during the development work, thinking it might help illustrate how this code change works. At the risk of complicating things, opting to include it here.

NOTE: The records and counts here are not identical to the test discussed above, but were generated using generate_sample_records(), hence the similar-looking identifiers.

In [2]: td1.read_dataframe()
DEBUG:timdex_dataset_api.dataset:Seen: set(), Dedupe ids: ['alma:0', 'alma:1', 'alma:2'], batch ids: []
DEBUG:timdex_dataset_api.dataset:Seen: {'alma:2', 'alma:1', 'alma:0'}, Dedupe ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 'alma:4', 'alma:5', 'alma:6', 'alma:7', 'alma:8', 'alma:9'], batch ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 'alma:4', 'alma:5', 'alma:6', 'alma:7', 'alma:8', 'alma:9']
DEBUG:timdex_dataset_api.dataset:Unseen indices from batch: [3, 4, 5, 6, 7, 8, 9]
DEBUG:timdex_dataset_api.dataset:Seen: {'alma:8', 'alma:9', 'alma:2', 'alma:1', 'alma:6', 'alma:7', 'alma:0', 'alma:3', 'alma:4', 'alma:5'}, Dedupe ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 'alma:4'], batch ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 'alma:4']
Out[2]: 
  timdex_record_id                                    source_record             transformed_record source    run_date run_type action run_id  run_record_offset  year month day
0           alma:3  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  3  2025    01  02
1           alma:4  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  4  2025    01  02
2           alma:5  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  5  2025    01  02
3           alma:6  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  6  2025    01  02
4           alma:7  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  7  2025    01  02
5           alma:8  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  8  2025    01  02
6           alma:9  b'<record><title>Hello World.</title></record>'  b'{"title":["Hello World."]}'   alma  2025-01-02    daily  index  run-2                  9  2025    01  02

The most illuminating lines are the 2nd and 3rd, I think. Imagine this is moving through runs and batches as normal. Some line commentary:

DEBUG:timdex_dataset_api.dataset:Seen: set(), Dedupe ids: ['alma:0', 'alma:1', 'alma:2'], batch ids: []
  • first batch of records
  • we have not "seen" any records yet
  • when yielding identifiers only from self._current_records_dataset we "see" alma:0 through alma:2
  • but the batch_ids shows that, due to filtering, the batch we might actually yield from didn't include those records
DEBUG:timdex_dataset_api.dataset:Seen: {'alma:2', 'alma:1', 'alma:0'}, Dedupe ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 
  • the next batch, note that alma:0 - alma:2 are now "seen" even though we didn't yield them! This is the key.
DEBUG:timdex_dataset_api.dataset:Unseen indices from batch: [3, 4, 5, 6, 7, 8, 9]
  • kind of mechanical in nature, but it's demonstrating that we're only going to yield records 3-9 in this batch because we've already "seen" 0-2
DEBUG:timdex_dataset_api.dataset:Seen: {'alma:8', 'alma:9', 'alma:2', 'alma:1', 'alma:6', 'alma:7', 'alma:0', 'alma:3', 'alma:4', 'alma:5'}, Dedupe ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 'alma:4'], batch ids: ['alma:0', 'alma:1', 'alma:2', 'alma:3', 'alma:4']
  • by the next batch, we've effectively "seen" alma:0 through alma:9
  • unless we hit a new timdex_record_id in future batches, we won't yield those anymore

And finally, the dataframe at the end shows that we never yielded alma:0 through alma:2. Though those identifiers never showed up in the batch we would actually yield from, they did show up in the parallel batches from self._current_records_dataset and informed any future batches that we've "seen" them.
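The three debug lines can be replayed with a small pure-Python simulation (the batches are copied from the log above; this illustrates the seen-set bookkeeping only, not the library's pyarrow code):

```python
seen: set[str] = set()
yielded: list[str] = []

# (dedupe ids from the unfiltered clone, batch ids after filtering) per batch,
# copied from the debug log above
batches = [
    (["alma:0", "alma:1", "alma:2"], []),
    ([f"alma:{i}" for i in range(10)], [f"alma:{i}" for i in range(10)]),
    ([f"alma:{i}" for i in range(5)], [f"alma:{i}" for i in range(5)]),
]

for dedupe_ids, batch_ids in batches:
    # "Unseen indices from batch": positions whose id we have not seen yet
    unseen_indices = [i for i, rid in enumerate(batch_ids) if rid not in seen]
    yielded.extend(batch_ids[i] for i in unseen_indices)
    seen.update(dedupe_ids)  # dedupe ids become seen even when filtered out

print(yielded)  # alma:3 through alma:9; alma:0 - alma:2 are never yielded
```

The second iteration reproduces the log's "Unseen indices from batch: [3, 4, 5, 6, 7, 8, 9]", and the final `yielded` list matches the dataframe output above.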

@ehanson8 ehanson8 left a comment
Looks good, few optional suggestions!


def _yield_deduped_batches(
self, batches: Iterator[pa.RecordBatch]
def _yield_current_record_deduped_batches(


Optional: I think _yield_current_record_batches or even _yield_current_records would be clear enough

ghukill (Contributor, Author) replied:

My inclination is to leave as-is to try and communicate that a) it's still related to "batches" and b) the work is getting performed within each batch.

I worry that _yield_current_record_batches() suggests some batches are skipped entirely, and _yield_current_records() loses the idea that it's yielding batches still.

But open to additional feedback and discussion, it has so far defied a nice tidy name...

@jonavellecuerdo jonavellecuerdo left a comment

Looks good to me!

@jonavellecuerdo jonavellecuerdo self-requested a review May 28, 2025 19:58

ghukill commented May 28, 2025

@jonavellecuerdo - new commit with updated "maybe filtered record batches" and "definitely unfiltered id batches". And FYI @ehanson8 , one of your early comments about a method name is included in this change too.

Thanks for the feedback all!

@ghukill ghukill merged commit 8da7a7d into main May 28, 2025
2 checks passed