Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ notebooks/advanced_numba
notebooks/mltools
notebooks/dataset_discovery
notebooks/filespec
notebooks/splitting
```
1,427 changes: 1,427 additions & 0 deletions docs/source/notebooks/splitting.ipynb

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/coffea/dataset_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
slice_files,
)
from coffea.dataset_tools.preprocess import preprocess
from coffea.dataset_tools.splitting import hash_fileset, split_fileset

__all__ = [
"preprocess",
"split_fileset",
"hash_fileset",
"apply_to_dataset",
"apply_to_fileset",
"max_chunks",
Expand Down
101 changes: 101 additions & 0 deletions src/coffea/dataset_tools/splitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import hashlib
import json
import math

__all__ = [
"split_fileset",
"hash_fileset",
]


def hash_fileset(chunk):
    """
    Return a stable SHA-256 hash for a fileset chunk.

    The hash covers dataset names and each dataset's "files" mapping —
    both the file paths and their treename values.  Keys are serialized
    in sorted order at every level, so the result is independent of dict
    insertion order.  Any other per-dataset metadata outside "files" is
    ignored.

    Input
        chunk: fileset dict {dataset: {"files": {path: treename, ...}, ...}, ...}

    Output
        hex string uniquely identifying this chunk's file content
    """
    # Keep only the "files" mapping of each dataset; json.dumps with
    # sort_keys=True already canonicalizes key order at every level,
    # so no explicit pre-sorting is needed.
    canonical = {
        dataset: data.get("files", {}) for dataset, data in chunk.items()
    }
    serialized = json.dumps(canonical, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.sha256(serialized).hexdigest()


def _select_datasets(fileset, datasets):
    """Restrict *fileset* to the requested datasets.

    ``datasets`` may be None (keep everything), a callable predicate on the
    dataset name, or an iterable of names (names absent from the fileset are
    silently ignored).
    """
    if datasets is None:
        return fileset
    if callable(datasets):
        return {name: data for name, data in fileset.items() if datasets(name)}
    return {name: fileset[name] for name in datasets if name in fileset}


def _take_percentage_slice(group, bin_idx, n_chunks):
    """Return the ``bin_idx``-th of ``n_chunks`` contiguous file slices of
    each dataset in *group*.

    Datasets with no files remaining for this bin are dropped; all other
    per-dataset keys are copied through unchanged.
    """
    chunk = {}
    for dataset, data in group.items():
        files = data.get("files", {})
        if not files:
            continue
        file_items = list(files.items())
        n = len(file_items)
        # Ceil so every file lands in some bin; late bins may come up empty
        # when n is not a multiple of n_chunks.
        chunk_size = max(1, math.ceil(n / n_chunks))
        start = bin_idx * chunk_size
        if start >= n:
            continue
        end = min(start + chunk_size, n)
        chunk[dataset] = {**data, "files": dict(file_items[start:end])}
    return chunk


def split_fileset(fileset, strategy=None, datasets=None, percentage=None):
    """
    Split a fileset into chunks so a partial result can still be obtained if
    one or several chunks fail during processing.

    One chunk is one partial fileset (a unique combination of files); these
    are not the usual coffea event chunks.

    Input
        fileset: {dataset: {"files": {path: treename, ...}}}
        strategy: "by_dataset" — one dataset is one chunk; None — all datasets together
        datasets: list, callable or tuple of dataset names; splitting is
            applied only to the chosen datasets
        percentage: integer that divides 100 evenly (20, 25, 50...).
            Each chunk gets this percentage of each dataset's files.

    Output
        List of fileset dicts.
        strategy only:
            split_fileset(fileset, "by_dataset") — one chunk per dataset
        percentage only:
            split_fileset(fileset, percentage=50) — 2 mixed chunks, each
            holding ~50% of every dataset's files
        strategy and percentage:
            split_fileset(fileset, "by_dataset", percentage=50) — up to
            N_datasets * 2 chunks, not mixed

    Raises
        ValueError: for an unknown strategy or an invalid percentage
    """
    if strategy is not None and strategy != "by_dataset":
        raise ValueError(f"Unknown strategy '{strategy}'. Use 'by_dataset' or None.")
    if percentage is not None:
        if (
            not isinstance(percentage, int)
            or not (1 <= percentage <= 100)
            or 100 % percentage != 0
        ):
            raise ValueError(
                "'percentage' must be an int that divides 100 evenly (e.g. 10, 20, 25, 50)."
            )

    fileset = _select_datasets(fileset, datasets)

    if strategy == "by_dataset":
        groups = [{name: data} for name, data in fileset.items()]
    else:
        groups = [fileset]

    if percentage is None:
        return groups

    n_chunks = 100 // percentage
    result = []
    for group in groups:
        for bin_idx in range(n_chunks):
            chunk = _take_percentage_slice(group, bin_idx, n_chunks)
            if chunk:
                result.append(chunk)
    return result
4 changes: 4 additions & 0 deletions src/coffea/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Runner,
)
from .processor import ProcessorABC
from .result import Err, Ok, Result
from .taskvine_executor import TaskVineExecutor

__all__ = [
Expand All @@ -41,4 +42,7 @@
"dict_accumulator",
"defaultdict_accumulator",
"column_accumulator",
"Result",
"Ok",
"Err",
]
57 changes: 44 additions & 13 deletions src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .accumulator import Accumulatable, accumulate, set_accumulator
from .checkpointer import CheckpointerABC
from .processor import ProcessorABC
from .result import Err, Ok, Result

_PICKLE_PROTOCOL = pickle.HIGHEST_PROTOCOL
DEFAULT_METADATA_CACHE: MutableMapping = LRUCache(100000)
Expand Down Expand Up @@ -1060,6 +1061,9 @@ class Runner:
(please don't) during a session, the session can be restarted to clear the cache.
checkpointer : CheckpointerABC, optional
A CheckpointerABC instance to manage checkpointing of each chunk output
use_result_type : bool, optional
If True, ``__call__`` returns ``Ok(output)`` or ``Err(exception)``.
If False (default), returns the output directly and raises on error.
"""

executor: ExecutorBase
Expand All @@ -1071,6 +1075,7 @@ class Runner:
xrootdtimeout: int | None = 60
align_clusters: bool = False
savemetrics: bool = False
use_result_type: bool = False
schema: schemas.BaseSchema | None = schemas.NanoAODSchema
processor_compression: int = 1
use_skyhook: bool | None = False
Expand Down Expand Up @@ -1565,8 +1570,17 @@ def __call__(
treename: str | None = None,
uproot_options: dict | None = {},
iteritems_options: dict | None = {},
) -> Accumulatable:
"""Run the processor_instance on a given fileset
) -> "Result | Accumulatable":
"""
Run the processor_instance on a given fileset

When use_result_type=True, returns an object of class Result — either Ok(Accumulatable) or Err(Exception).
result.is_ok() - check whether the Result is Ok or Err
result.unwrap() - get the value for Ok (re-raises the stored exception for Err)
result.exception - to inspect the error if Result is Err

When use_result_type=False (default), returns output directly and raises on error.
When savemetrics=True, the output value is (output, metrics).

Parameters
----------
Expand All @@ -1584,18 +1598,35 @@ def __call__(
iteritems_options : dict, optional
Any options to pass to ``tree.iteritems``
"""
wrapped_out = self.run(
fileset=fileset,
processor_instance=processor_instance,
treename=treename,
uproot_options=uproot_options,
iteritems_options=iteritems_options,
)
try:
wrapped_out = self.run(
fileset=fileset,
processor_instance=processor_instance,
treename=treename,
uproot_options=uproot_options,
iteritems_options=iteritems_options,
)
except BaseException as e:
if self.use_result_type:
return Err(e)
raise

exception = wrapped_out.get("exception", 0)
if exception != 0:
if self.use_result_type:
return Err(exception)
raise exception

if self.use_dataframes:
return wrapped_out # not wrapped anymore
if self.savemetrics:
return wrapped_out["out"], wrapped_out["metrics"]
return wrapped_out["out"]
out = wrapped_out
elif self.savemetrics:
out = (wrapped_out["out"], wrapped_out["metrics"])
else:
out = wrapped_out["out"]

if self.use_result_type:
return Ok(out)
return out

def preprocess(
self,
Expand Down
63 changes: 63 additions & 0 deletions src/coffea/processor/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
Rust-style Result type for coffea Runner return values.
"""

from __future__ import annotations

from typing import Generic, TypeVar

T = TypeVar("T")


class Result(Generic[T]):
    """Rust-style result type — either ``Ok`` (success) or ``Err`` (failure)."""

    def is_ok(self) -> bool:
        """Return True for an Ok result, False for an Err result."""
        raise NotImplementedError

    def is_err(self) -> bool:
        """Return True for an Err result, False for an Ok result."""
        return not self.is_ok()

    def unwrap(self) -> T:
        """Return the contained value for Ok; for Err, re-raise the stored exception."""
        raise NotImplementedError


class Ok(Result[T]):
    """A successful result containing a value."""

    def __init__(self, value: T) -> None:
        self._value = value

    @property
    def value(self) -> T:
        """The wrapped success value."""
        return self._value

    def is_ok(self) -> bool:
        return True

    def unwrap(self) -> T:
        return self._value

    def __repr__(self) -> str:
        return f"Ok({self._value!r})"


class Err(Result):
    """A failed result containing an exception."""

    def __init__(self, exception: BaseException) -> None:
        self._exception = exception

    @property
    def exception(self) -> BaseException:
        """The wrapped exception, accessible without raising it."""
        return self._exception

    def is_ok(self) -> bool:
        return False

    def unwrap(self):
        # Unwrapping a failed result re-raises the stored exception,
        # mirroring Rust's Result::unwrap on Err.
        raise self._exception

    def __repr__(self) -> str:
        return f"Err({self._exception!r})"
1 change: 0 additions & 1 deletion src/coffea/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
np = numpy
nb = numba


__all__ = [
"load",
"save",
Expand Down
Loading