
feat: adding split_fileset functionality and Result output#1534

Open
hooloobooroodkoo wants to merge 14 commits into scikit-hep:master from hooloobooroodkoo:processor_result_type

Conversation

Contributor

@hooloobooroodkoo hooloobooroodkoo commented Mar 19, 2026

Closes #1532

a. This PR adds two simple functions to the coffea.dataset_tools module: split_fileset and hash_fileset.
The idea is to give users more control over how they execute analysis on a fileset, and to get a partial result instead of nothing if something breaks (e.g., one file is broken). If this PR is accepted and merged (or the alternative one I'm going to open next), I'll write documentation with a kind of "Best practices" guide on how to use it, with the examples below.

  1. split_fileset() allows choosing a strategy for how to split the fileset into parts (~chunks, but higher-level chunks — not the usual coffea chunks; one chunk will be a unique subset of files from the fileset). It returns a list of partial filesets. This function accepts:
  • fileset: {dataset: {"files": {path: treename, ...}}}
  • strategy: "by_dataset" — one dataset per chunk; None — all datasets together
  • percentage: an integer that divides 100 evenly (20, 25, 50...). If strategy="by_dataset", each dataset is split into 100/percentage chunks; otherwise the whole fileset is split into 100/percentage chunks where each chunk gets that percentage of each dataset's files
  • datasets: list, callable or tuple of dataset names
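For readers skimming the PR, here is an illustrative sketch of how such splitting could work. This is a hypothetical re-implementation based only on the description above, not the PR's actual code; the real signature and edge-case handling (including the callable form of `datasets`) may differ.

```python
import math


def split_fileset(fileset, strategy=None, percentage=100, datasets=None):
    """Sketch: split a fileset into a list of smaller filesets.

    Hypothetical re-implementation for illustration; only the
    list/tuple form of ``datasets`` is handled here.
    """
    if 100 % percentage != 0:
        raise ValueError("percentage must divide 100 evenly")
    n_parts = 100 // percentage

    if datasets is not None:
        fileset = {n: s for n, s in fileset.items() if n in datasets}

    chunks = []
    if strategy == "by_dataset":
        # each dataset is split into its own n_parts chunks
        for name, spec in fileset.items():
            files = list(spec["files"].items())
            size = math.ceil(len(files) / n_parts)
            for i in range(0, len(files), size):
                chunks.append({name: {"files": dict(files[i:i + size])}})
    else:
        # each chunk takes `percentage` of every dataset's files
        for i in range(n_parts):
            chunk = {}
            for name, spec in fileset.items():
                files = list(spec["files"].items())
                size = math.ceil(len(files) / n_parts)
                part = dict(files[i * size:(i + 1) * size])
                if part:
                    chunk[name] = {"files": part}
            if chunk:
                chunks.append(chunk)
    return chunks
```

With `strategy="by_dataset"` and `percentage=50`, a fileset of two datasets with 4 and 2 files yields 4 chunks (2 per dataset); with `strategy=None` it yields 2 chunks, each containing half of every dataset's files.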

This gives users the ability to write analysis like this:

chunks = split_fileset(fileset, strategy="by_dataset", percentage=20)
result = None
for chunk in chunks:
    try:
        output, metrics = run(chunk, processor_instance=Processor())
        if result is None:
            result = output
        else:
            result += output
    except BaseException as e:
        print(f"Error: {e}")
        continue

If one or several chunks contain a broken file or similar, a partial result will still be returned.

  2. hash_fileset allows creating a unique filename for a processed chunk based on its file paths and dataset names. This is useful when you want to preserve partial results and only rerun analysis on missing chunks. The recommended approach in a Jupyter notebook, for example, would be:
# concept example
import os

from coffea.util import load, save

chunks = split_fileset(fileset, strategy="by_dataset", percentage=20)

result = None
cache_dir = "./chunk_cache"
os.makedirs(cache_dir, exist_ok=True)

After the first run, the partial result is saved. The user can then simply rerun the same cell: cached chunks are loaded from disk, and the processor only runs on the missing chunks.

for chunk in chunks:
    chunk_hash = hash_fileset(chunk)
    cache_path = os.path.join(cache_dir, f"{chunk_hash}.coffea")

    if os.path.exists(cache_path):
        print(f"Loading cached result for chunk {chunk_hash}")
        output = load(cache_path)
    else:
        try:
            output, metrics = run(chunk, processor_instance=Processor())
            save(output, cache_path)
            print(f"Saved result for chunk {chunk_hash}")
        except BaseException as e:
            print(f"Error processing chunk {chunk_hash}: {e}")
            continue

    if result is None:
        result = output
    else:
        result += output
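To make the caching idea above concrete, here is an illustrative sketch of what a deterministic fileset hash could look like. This is a hypothetical re-implementation based on the description (hash over file paths and dataset names); the PR's actual hashing scheme may differ.

```python
import hashlib
import json


def hash_fileset(fileset):
    """Sketch: deterministic short hash over dataset names and file paths.

    Keys are sorted so the hash does not depend on dict insertion order.
    """
    payload = json.dumps(
        {name: sorted(spec["files"]) for name, spec in fileset.items()},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()[:16]
```

The important property for caching is that two logically identical chunks (same datasets, same files, any ordering) hash to the same filename, while any change in content produces a different one.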

b. This PR also introduces an optional Rust-inspired Result return type for coffea.processor.Runner. Instead of returning an Accumulatable or raising an error on failure, it can now return a Result object that is either Ok(Accumulatable) or Err(Exception).

The user enables this behavior via a new Runner flag, use_result_type:

run = processor.Runner(
    executor=executor,
    schema=schemas.NanoAODSchema,
    savemetrics=True,
    use_result_type=True,
)

Then the user can decide what to do with errors; inside the per-chunk loop the code would look like this:

for chunk in chunks:
    result = run(chunk, processor_instance=Processor())
    if result.is_ok():
        output, metrics = result.unwrap()
    else:
        # user can implement additional logic here
        print(f"Error: {result.exception}")
        continue

New API

from coffea.processor import Ok, Err, Result

result = run(chunk, processor_instance=Processor())
result.is_ok()       # bool
result.is_err()      # bool
result.unwrap()      # returns Accumulatable, or raises if Err
result.exception     # the exception, if Result is Err
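For reviewers, an illustrative sketch of the intended semantics of this API. This is a hypothetical minimal re-implementation, not the classes from the PR; the actual implementation may differ in structure and error handling.

```python
class Result:
    """Sketch of a Rust-style Result carrying a value or an exception."""

    def is_ok(self):
        return isinstance(self, Ok)

    def is_err(self):
        return isinstance(self, Err)


class Ok(Result):
    def __init__(self, value):
        self.value = value

    def unwrap(self):
        # returns the wrapped value
        return self.value


class Err(Result):
    def __init__(self, exception):
        self.exception = exception

    def unwrap(self):
        # re-raises the captured exception, mirroring Rust's unwrap()
        raise self.exception
```

The key design point is that a failing chunk no longer aborts the whole run: the exception is captured into Err and the caller decides whether to log, retry, or re-raise.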

@ikrommyd
Collaborator

I'd say we should probably proceed with this one instead of #1533 but make this new result type opt-in via a keyword argument in Runner



Development

Successfully merging this pull request may close these issues.

Adding split fileset strategies to coffea functionality
