
Conversation

@ghukill
Contributor

@ghukill ghukill commented May 29, 2025

Purpose and background context

1- TDA Library Code Changes

During work on yielding only "current" records from the dataset, where ordering of the ETL runs in the dataset is critical, it was determined that more time granularity was needed for each ETL run.

Currently we store the YYYY-MM-DD for each run in the run_date column, but if multiple runs occur on the same day, we are unable to order them more granularly from this alone.

How this addresses that need:

  • Adds a new run_timestamp column to the parquet dataset schema
  • The timestamp is minted once, before any records for the run are written, and then stamped on every row in the ETL run (see the sketch below)
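
A minimal sketch of this "mint once, stamp every row" idea (not the library's actual implementation; records are simplified to plain dicts here):

from collections.abc import Iterator
from datetime import UTC, datetime

# Mint a single timestamp up front, before any records for the run are written ...
run_timestamp = datetime.now(UTC)

def stamp_records(records: Iterator[dict]) -> Iterator[dict]:
    """Apply the same run_timestamp to every record in this ETL run."""
    for record in records:
        yield {**record, "run_timestamp": run_timestamp}

# ... so every row written for the run shares one orderable timestamp,
# even if another run starts later the same day.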

2- Bulk data work

In addition to this code change, we will need to retroactively add a run_timestamp to all past ETL runs in the parquet dataset. A Python script with one-time code will be used. This code won't be committed to this repository -- unless it makes sense to build some kind of bulk editing utility into this library? -- but will be shared with the team for input before running.

The flow can be summarized as the following:

  • invoke the script with a CSV of explicit run_timestamp values for a limited set of run_ids
  • for each ETL run (parquet file(s)) in the dataset:
    - if the run_id is not included in the input CSV, set a run_timestamp of YYYY-MM-DD 00:00:01.0000 (one second after midnight), based on the run's run_date
    - if the run_id is included in the input CSV, use the explicit run_timestamp provided for that run_id

An interesting quality of parquet datasets is that "atomic updates" (i.e. adding a new column + values to existing rows) are not well supported. So this script will effectively rewrite the dataset, parquet file-by-file, to introduce this new column. The upside to this work is establishing some experience and workflows for performing such a bulk update.
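
As a rough illustration of that per-file rewrite (a simplified sketch only -- the actual draft backfill script appears later in this conversation, and the timestamp type and file path here are assumptions rather than taken from the real schema):

from datetime import UTC, datetime

import pyarrow as pa
import pyarrow.parquet as pq

def add_run_timestamp(parquet_filepath: str, run_timestamp: datetime) -> None:
    """Rewrite a single parquet file with a constant run_timestamp column appended."""
    table = pq.read_table(parquet_filepath)
    if "run_timestamp" in table.schema.names:
        return  # already has the column, nothing to do
    column = pa.array([run_timestamp] * len(table), type=pa.timestamp("us", tz="UTC"))
    pq.write_table(table.append_column("run_timestamp", column), parquet_filepath)

# e.g. the default "one second after midnight" value for a 2025-01-01 run
# ("example.parquet" is a hypothetical path)
add_run_timestamp("example.parquet", datetime(2025, 1, 1, 0, 0, 1, tzinfo=UTC))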

The production parquet dataset has roughly 597 parquet files, around 10 GB of data in total. But an interesting dimension here is that most files in the dataset are static and not touched after an ETL run, so we could theoretically write these updated files to the dataset even while ETL runs (with the new code + column!) are occurring. Which is all to say, actually performing this work shouldn't require much coordination.

3- Order of work

  • PR code review + merge
  • Rebuild TIMDEX terraform in Dev, Stage, Prod to pick up TDA library changes
  • Any ETL runs after that will begin writing the run_timestamp column to new parquet files
  • Backup the production parquet dataset
  • Run bulk update script that updates only parquet files without run_timestamp

A strength of parquet datasets is schema evolution, meaning there is nothing inherently wrong with some parquet files missing a column. Any writes after the code goes live will include this new column, but no reads expect or use it yet, so it's okay if older files are missing it. This means we are not rushed to run our bulk update script.
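
For example, supplying an explicit dataset schema when reading should surface nulls for files that have not been backfilled yet (a sketch assuming pyarrow's standard schema-evolution behavior; the path and the columns other than run_timestamp are illustrative):

import pyarrow as pa
import pyarrow.dataset as ds

# Unified schema that includes the new column; older parquet files simply lack it on disk.
schema = pa.schema(
    [
        ("timdex_record_id", pa.string()),
        ("run_id", pa.string()),
        ("run_timestamp", pa.timestamp("us", tz="UTC")),
    ]
)

dataset = ds.dataset("path/to/timdex/dataset", format="parquet", schema=schema)
table = dataset.to_table(columns=["run_id", "run_timestamp"])

# Rows from pre-change parquet files read run_timestamp as null until backfilled.
print(table.column("run_timestamp").null_count)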

How can a reviewer manually see the effects of these changes?

0- Set env vars:

TDA_LOG_LEVEL=DEBUG
WARNING_ONLY_LOGGERS=asyncio,botocore,urllib3,s3transfer,boto3

1- Open IPython shell

pipenv run ipython

2- Perform two "full" writes that occur on the same day 2025-01-01:

import os
import time

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.config import configure_dev_logger
from tests.utils import generate_sample_records

configure_dev_logger()

LOCATION = "output/datasets/dataset_with_same_day_runs"

os.makedirs(LOCATION, exist_ok=True)

timdex_dataset = TIMDEXDataset(LOCATION)

# Simulate two "full" runs from the same day, where "run-2" is chronologically more recent.
run_params = [
    (100, "alma", "2025-01-01", "full", "index", "run-1"),
    (75, "alma", "2025-01-01", "full", "index", "run-2"),
]
for params in run_params:
    num_records, source, run_date, run_type, action, run_id = params
    records = generate_sample_records(
        num_records,
        timdex_record_id_prefix=source,
        source=source,
        run_date=run_date,
        run_type=run_type,
        action=action,
        run_id=run_id,
    )
    timdex_dataset.write(records)
    time.sleep(5)  # 5 second sleep for observable delay in run_timestamp

# reload after writes
timdex_dataset.load()

After these writes, we can see two parquet files under the same day partition:

output/datasets/dataset_with_same_day_runs
└── year=2025
    └── month=01
        └── day=01
            ├── a4556d0b-853c-47f9-a525-1241c3306f93-0.parquet
            └── eee646a1-3e98-4434-a2ee-2bc8e3568a1c-0.parquet

3- Load a dataframe with all rows and inspect new run_timestamp column values:

df = timdex_dataset.read_dataframe()

df[['run_date','run_id','run_timestamp']].value_counts()
"""
Out[3]: 
run_date    run_id  run_timestamp                   
2025-01-01  run-1   2025-05-29 14:39:20.016089+00:00    100
            run-2   2025-05-29 14:39:25.026558+00:00     75
Name: count, dtype: int64
"""
  • note that all rows for each run have a single run_timestamp value
  • note that run-2 is 5 seconds after run-1
  • note that run_timestamp has microsecond accuracy, which should be more than sufficient for even virtually simultaneous ETL runs

4- Lastly, use load(current_records=True) to see that run_timestamp is used to correctly return only run-2 records, given it is the most recent "full" run:

timdex_dataset.load(current_records=True)
df_current = timdex_dataset.read_dataframe()
df_current.run_id.value_counts()
"""
Out[4]: 
run_id
run-2    75   <-----------------
Name: count, dtype: int64
"""

Includes new or updated dependencies?

YES

Changes expectations for external applications?

YES: All TIMDEX components that use this library for reading and writing will need a terraform rebuild to pick up this change. Otherwise, they need no further modification.

What are the relevant tickets?

Developer

  • All new ENV is documented in README
  • All new ENV has been added to staging and production environments
  • All related Jira tickets are linked in commit message(s)
  • Stakeholder approval has been confirmed (or is not needed)

Code Reviewer(s)

  • The commit message is clear and follows our guidelines (not just this PR message)
  • There are appropriate tests covering any new functionality
  • The provided documentation is sufficient for understanding any new functionality introduced
  • Any manual tests have been performed or provided examples verified
  • New dependencies are appropriate or there were no changes

Why these changes are being introduced:

During work on yielding only "current" records from the dataset,
where ordering of the ETL runs in the dataset is critical, it
was determined that more time granularity was needed for each
ETL run.

Currently we store the YYYY-MM-DD for each run, but if multiple
runs occur on the same day, we are unable to order them more
granularly.

How this addresses that need:
* Adds new run_timestamp to parquet dataset schema
* Timestamp is minted before any runs are written, and then
used for each row in the ETL run

Side effects of this change:
* All TIMDEX components that use this library for reading
and writing will need a terraform rebuild to pick
up this change.  Otherwise, they need no further modification.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-496
@coveralls

coveralls commented May 29, 2025

Pull Request Test Coverage Report for Build 15349942489

Details

  • 7 of 7 (100.0%) changed or added relevant lines in 2 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.06%) to 95.541%

Totals Coverage Status
  • Change from base Build 15310331722: 0.06%
  • Covered Lines: 300
  • Relevant Lines: 314

💛 - Coveralls

@ghukill ghukill marked this pull request as ready for review May 29, 2025 15:26
@ghukill ghukill requested a review from a team May 29, 2025 15:27
@jonavellecuerdo jonavellecuerdo self-assigned this May 29, 2025
@ghukill
Contributor Author

ghukill commented May 29, 2025

@MITLibraries/dataeng - for the backfill script work, we will have an accurate run_timestamp for each run, not a "second past midnight" as sketched in the PR notes.

Using boto3 it was pretty trivial to get the StepFunction execution ID (which is the run_id) and then the execution start time (which is the run_timestamp value we want) and create a CSV from that.

@ghukill
Contributor Author

ghukill commented May 29, 2025

The following is a draft of the script that will be used to backfill the run_timestamp column for all pre-existing TIMDEX ETL runs:

"""
Example usage:
 PYTHONPATH=. pipenv run python output/timestamp_backfill/backfill_run_timestamp_column.py \
/Users/ghukill/dev/mit/data/timdex_dataset/prod_small_subset_1 \
output/timestamp_backfill/overrides.csv \
--dry-run
"""

import argparse
import json
import os
from datetime import UTC, datetime

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

from timdex_dataset_api.dataset import TIMDEXDataset, TIMDEX_DATASET_SCHEMA
from timdex_dataset_api.config import configure_dev_logger, configure_logger

configure_dev_logger()

logger = configure_logger(__name__)


def backfill_parquet_file(
    parquet_filepath: str,
    dataset: ds.Dataset,
    run_timestamp_overrides: dict,
    dry_run: bool = False,
) -> tuple[bool, dict | str]:

    parquet_file = pq.ParquetFile(parquet_filepath, filesystem=dataset.filesystem)

    if "run_timestamp" in parquet_file.schema.names:
        logger.info(
            f"Parquet file already has 'run_timestamp' column, skipping: {parquet_filepath}"
        )
        return (True, {"file_path": parquet_filepath})

    try:
        table = parquet_file.read()

        run_id = table.column("run_id")[0].as_py()

        # Check if run_id exists in overrides - error if not found
        if run_id not in run_timestamp_overrides:
            error_msg = f"run_id '{run_id}' not found in timestamp overrides CSV"
            logger.error(error_msg)
            raise ValueError(error_msg)

        run_timestamp_override = run_timestamp_overrides[run_id]
        run_timestamp = datetime.fromisoformat(
            run_timestamp_override.replace("Z", "+00:00")
        )

        # create run_timestamp column using the exact schema definition
        num_rows = len(table)
        run_timestamp_field = TIMDEX_DATASET_SCHEMA.field("run_timestamp")
        run_timestamp_array = pa.array(
            [run_timestamp] * num_rows, type=run_timestamp_field.type
        )

        # add the run_timestamp column to the table
        table_with_timestamp = table.append_column("run_timestamp", run_timestamp_array)

        # write the updated table back to the same file
        if not dry_run:
            pq.write_table(
                table_with_timestamp,
                parquet_filepath,
                filesystem=dataset.filesystem,
            )

        update_details = {
            "file_path": parquet_filepath,
            "run_id": run_id,
            "rows_updated": num_rows,
            "run_timestamp_added": run_timestamp.isoformat(),
        }

        return (True, update_details)

    except Exception as e:
        logger.error(f"Error processing parquet file {parquet_filepath}: {str(e)}")
        return (False, str(e))


def backfill_dataset(location: str, timestamp_csv: str, dry_run=False):
    td = TIMDEXDataset(location)
    td.load()

    parquet_files = td.dataset.files
    logger.info(f"Found {len(parquet_files)} parquet files in dataset.")

    # Always load timestamp overrides CSV (required)
    logger.info(f"Loading timestamp overrides CSV: {timestamp_csv}")
    overrides_df = pd.read_csv(timestamp_csv)
    overrides = dict(overrides_df.values)
    logger.info(f"Loaded {len(overrides)} timestamp overrides")

    for i, parquet_file in enumerate(parquet_files):
        logger.info(
            f"Working on parquet file {i + 1}/{len(parquet_files)}: {parquet_file}"
        )
        result = backfill_parquet_file(
            parquet_file, td.dataset, run_timestamp_overrides=overrides, dry_run=dry_run
        )
        logger.info(json.dumps(result))


def generate_timestamps_overrides_csv(filepath: str) -> str:
    state_machine_arn = os.getenv("STEPFUNCTION_ARN")
    if not state_machine_arn:
        raise ValueError("STEPFUNCTION_ARN environment variable is required")

    stepfunctions = boto3.client("stepfunctions")

    # get all executions for the state machine
    executions = []
    paginator = stepfunctions.get_paginator("list_executions")
    for page in paginator.paginate(stateMachineArn=state_machine_arn):
        executions.extend(page["executions"])

    # create dataframe
    data = []
    for execution in executions:
        execution_id = execution["executionArn"].split(":")[-1]

        # format timestamp to UTC with microseconds and append
        start_date = execution["startDate"]
        timestamp_utc = start_date.astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")
        data.append({"run_id": execution_id, "run_timestamp": timestamp_utc})

    df = pd.DataFrame(data)

    df.to_csv(filepath, index=False)
    logger.info(f"Generated timestamp overrides CSV with {len(data)} entries: {filepath}")
    return filepath


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Backfill run_timestamp column in TIMDEX parquet files"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Scan files and report what would be done without making changes",
    )
    parser.add_argument(
        "dataset_location", help="Path to the dataset (local path or s3://bucket/path)"
    )
    parser.add_argument(
        "timestamp_csv",
        help="Path where the timestamp CSV will be generated and used for overrides",
    )
    args = parser.parse_args()

    logger.info("Generating timestamp overrides CSV from StepFunction executions...")
    timestamp_csv = generate_timestamps_overrides_csv(args.timestamp_csv)

    backfill_dataset(
        args.dataset_location, timestamp_csv=timestamp_csv, dry_run=args.dry_run
    )


@ehanson8 ehanson8 left a comment

Looks good and the associated backfill work is well-considered!

# assert TIMDEXDataset.write() applies current time as run_timestamp
row_dict = next(dataset_with_same_day_runs.read_dicts_iter())
assert "run_timestamp" in row_dict
assert row_dict["run_timestamp"] == datetime(

Totally optional: you could use strftime() so it's immediately obvious that it matches @pytest.mark.freeze_time("2025-05-22 01:23:45.567890")

@ghukill
Contributor Author

ghukill commented May 30, 2025

@jonavellecuerdo , @ehanson8 - if and when this code change is merged, as a next step I am thinking that I would like to introduce a new root level migrations/ folder. The script that will perform the backfill will live here, and that will allow a code review, some historical memory, etc.

Example structure:

migrations
├── 001_2025_05_30_backfill_run_timestamp_column.py
└── README.md

No real bearing on this PR, but noting I'm planning a followup PR for this backfill work as a formalized dataset migration.

Contributor

@jonavellecuerdo jonavellecuerdo left a comment

@ghukill I think this is looking good! I have a change request for an update to one of the new tests that I think would be helpful in understanding the main reason for this update. 🤓

@ghukill ghukill requested a review from jonavellecuerdo May 30, 2025 15:18
@ghukill ghukill merged commit 926d630 into main May 30, 2025
2 checks passed