-
Notifications
You must be signed in to change notification settings - Fork 45
feat: missing features for observations #344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
47f650b
eeeedcd
ecba1db
8ccd237
e2b54ad
fcd46a3
a02deb3
48df56e
4823099
89fe751
4e6d877
2bd2567
ab56402
111bcc8
5738660
3680c6b
227f815
bc70153
a195b03
9bd5ca5
1f9f9aa
ebfe5af
a3e72f7
2b01564
9bb6c66
8d6ccd6
3c38f5f
6b2adb8
dcc1802
826dff8
d4be463
552c797
0053aec
7c2d4fb
031e9f2
5bebd27
fbb4121
e6ecbc0
ad61f65
ea2f7d9
c10ea85
e565866
6faeb15
348d43a
98dae84
a9816ca
20fc185
558f1a6
aa8e16d
f2615d6
1d94d5a
6941fdf
9ef1fc3
4f5acbb
23e8a5f
8b6b765
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# (C) Copyright 2025 Anemoi contributors. | ||
# | ||
# This software is licensed under the terms of the Apache Licence Version 2.0 | ||
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. | ||
# | ||
# In applying this licence, ECMWF does not waive the privileges and immunities | ||
# granted to it by virtue of its status as an intergovernmental organisation | ||
# nor does it submit to any jurisdiction. | ||
|
||
|
||
import pandas as pd | ||
|
||
|
||
def check_dataframe(df):
    """Check the DataFrame for consistency.

    Empty frames are validated like any other: the former
    ``if df.empty: pass`` branch was a no-op and never skipped the checks
    below, so it has been removed without changing behaviour.

    Parameters
    ----------
    df : pandas.DataFrame
        The frame to validate.

    Raises
    ------
    ValueError
        If the 'times' column is missing, or if either of the 'latitudes'
        or 'longitudes' columns is missing.
    TypeError
        If the 'times' column does not have a datetime64 dtype.
    """
    columns = df.columns

    if "times" not in columns:
        raise ValueError("The DataFrame must contain a 'times' column.")

    if not pd.api.types.is_datetime64_any_dtype(df["times"]):
        raise TypeError("The 'times' column must be of datetime type.")

    if "latitudes" not in columns or "longitudes" not in columns:
        raise ValueError("The DataFrame must contain 'latitudes' and 'longitudes' columns.")
|
||
|
||
class ObservationsSource:
    """Callable whose ``__call__`` must be provided by subclasses.

    The base class only supplies ``_check``, a helper that validates a
    DataFrame with ``check_dataframe``.
    """

    # NOTE(review): a review remark attached to the class header is cut off
    # in the source ("Is the plan for ...").

    def _check(self, df):
        """Validate ``df`` with ``check_dataframe`` and return it unchanged."""
        check_dataframe(df)
        return df

    def __call__(self, window):
        """Not implemented on the base class.

        Parameters
        ----------
        window
            Passed by the caller; unused here.
            NOTE(review): presumably a time window -- confirm with callers.

        Raises
        ------
        NotImplementedError
            Always.
        """
        # NOTE(review): a review remark attached here is cut off in the
        # source ("Make an ...").
        raise NotImplementedError("This method should be implemented by subclasses")
|
||
|
||
class ObservationsFilter:
    """Pass-through filter: validates a DataFrame and returns it unchanged."""

    def __call__(self, df):
        """Validate ``df`` and return it unchanged.

        Validation goes through ``_check`` so that this method and the
        helper cannot drift apart.

        Parameters
        ----------
        df : pandas.DataFrame
            The frame to validate.

        Returns
        -------
        pandas.DataFrame
            The same ``df`` object that was passed in.

        Raises
        ------
        ValueError, TypeError
            Propagated from ``check_dataframe``.
        """
        # NOTE(review): a review remark attached here is cut off in the
        # source ("call ...").
        return self._check(df)

    def _check(self, df):
        """Validate ``df`` with ``check_dataframe`` and return it unchanged."""
        check_dataframe(df)
        return df
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here and above, should we |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -136,7 +136,8 @@ def _subset(self, **kwargs: Any) -> "Dataset": | |
if not kwargs: | ||
return self.mutate() | ||
|
||
name = kwargs.pop("name", None) | ||
name = kwargs.pop("set_group", None) # TODO(Florian) | ||
name = kwargs.pop("name", name) | ||
result = self.__subset(**kwargs) | ||
result._name = name | ||
|
||
|
@@ -177,13 +178,18 @@ def __subset(self, **kwargs: Any) -> "Dataset": | |
padding = kwargs.pop("padding", None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is a padding? (type |
||
|
||
if padding: | ||
if padding != "empty": | ||
raise ValueError(f"Only 'empty' padding is supported, got {padding=}") | ||
from .padded import Padded | ||
|
||
frequency = kwargs.pop("frequency", self.frequency) | ||
return ( | ||
Padded(self, start, end, frequency, dict(start=start, end=end, frequency=frequency)) | ||
Padded( | ||
self, | ||
start=start, | ||
end=end, | ||
frequency=frequency, | ||
padding=padding, | ||
reason=dict(start=start, end=end, frequency=frequency, padding=padding), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the need for the |
||
) | ||
._subset(**kwargs) | ||
.mutate() | ||
) | ||
|
@@ -404,6 +410,9 @@ def _select_to_columns(self, vars: str | list[str] | tuple[str] | set) -> list[i | |
if not isinstance(vars, (list, tuple)): | ||
vars = [vars] | ||
|
||
for v in vars: | ||
if v not in self.name_to_index: | ||
raise ValueError(f"select: unknown variable: {v}, available: {list(self.name_to_index)}") | ||
return [self.name_to_index[v] for v in vars] | ||
|
||
def _drop_to_columns(self, vars: str | Sequence[str]) -> list[int]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ def __init__(self, dataset: "Dataset", kids: list[Any], **kwargs: Any) -> None: | |
Additional keyword arguments. | ||
""" | ||
self.dataset = dataset | ||
assert isinstance(kids, list), "Kids must be a list" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are other iterables acceptable? |
||
self.kids = kids | ||
self.kwargs = kwargs | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -349,19 +349,7 @@ def _open(a: str | PurePath | dict[str, Any] | list[Any] | tuple[Any, ...]) -> " | |
""" | ||
from .dataset import Dataset | ||
from .stores import Zarr | ||
from .stores import zarr_lookup | ||
|
||
if isinstance(a, str) and len(a.split(".")) in [2, 3]: | ||
|
||
metadata_path = os.path.join(a, "metadata.json") | ||
if os.path.exists(metadata_path): | ||
metadata = load_any_dict_format(metadata_path) | ||
if "backend" not in metadata: | ||
raise ValueError(f"Metadata for {a} does not contain 'backend' key") | ||
|
||
from anemoi.datasets.data.records import open_records_dataset | ||
|
||
return open_records_dataset(a, backend=metadata["backend"]) | ||
from .stores import dataset_lookup | ||
|
||
if isinstance(a, Dataset): | ||
return a.mutate() | ||
|
@@ -370,7 +358,22 @@ def _open(a: str | PurePath | dict[str, Any] | list[Any] | tuple[Any, ...]) -> " | |
return Zarr(a).mutate() | ||
|
||
if isinstance(a, str): | ||
return Zarr(zarr_lookup(a)).mutate() | ||
path = dataset_lookup(a) | ||
|
||
if path and path.endswith(".zarr") or path.endswith(".zip"): | ||
return Zarr(path).mutate() | ||
|
||
if path and path.endswith(".vz"): | ||
metadata_path = os.path.join(path, "metadata.json") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I feel like all the checking here is the responsibility of […]
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Some of this code is actually in that function - we should remove the duplication. |
||
if os.path.exists(metadata_path): | ||
if "backend" not in load_any_dict_format(metadata_path): | ||
raise ValueError(f"Metadata for {path} does not contain 'backend' key") | ||
|
||
from anemoi.datasets.data.records import open_records_dataset | ||
|
||
return open_records_dataset(path) | ||
|
||
raise ValueError(f"Unsupported dataset path: {path}. ") | ||
|
||
if isinstance(a, PurePath): | ||
return _open(str(a)).mutate() | ||
|
@@ -587,6 +590,18 @@ def _open_dataset(*args: Any, **kwargs: Any) -> "Dataset": | |
|
||
assert len(sets) > 0, (args, kwargs) | ||
|
||
if "set_group" in kwargs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is a "set_group"? Why does this result in returning |
||
from anemoi.datasets.data.records import FieldsRecords | ||
|
||
set_group = kwargs.pop("set_group") | ||
assert len(sets) == 1, "set_group can only be used with a single dataset" | ||
dataset = sets[0] | ||
|
||
from anemoi.datasets.data.dataset import Dataset | ||
|
||
if isinstance(dataset, Dataset): # Fields dataset | ||
return FieldsRecords(dataset, **kwargs, name=set_group).mutate() | ||
|
||
if len(sets) > 1: | ||
dataset, kwargs = _concat_or_join(sets, kwargs) | ||
return dataset._subset(**kwargs) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
from anemoi.utils.dates import frequency_to_timedelta | ||
from numpy.typing import NDArray | ||
|
||
from anemoi.datasets.data import MissingDateError | ||
from anemoi.datasets.data.dataset import Dataset | ||
from anemoi.datasets.data.dataset import FullIndex | ||
from anemoi.datasets.data.dataset import Shape | ||
|
@@ -36,7 +37,15 @@ class Padded(Forwards): | |
_after: int = 0 | ||
_inside: int = 0 | ||
|
||
def __init__(self, dataset: Dataset, start: str, end: str, frequency: str, reason: dict[str, Any]) -> None: | ||
def __init__( | ||
self, | ||
dataset: Dataset, | ||
start: str, | ||
end: str, | ||
frequency: str, | ||
reason: Dict[str, Any], | ||
padding: str, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NB: Padding added but not in docstring (could contain literal values if they're what we are expecting) |
||
) -> None: | ||
"""Create a padded subset of a dataset. | ||
|
||
Attributes: | ||
|
@@ -46,6 +55,7 @@ def __init__(self, dataset: Dataset, start: str, end: str, frequency: str, reaso | |
frequency (str): The frequency of the subset. | ||
reason (Dict[str, Any]): The reason for the padding. | ||
""" | ||
self.padding = padding | ||
|
||
self.reason = {k: v for k, v in reason.items() if v is not None} | ||
|
||
|
@@ -164,12 +174,20 @@ def _get_tuple(self, n: TupleIndex) -> NDArray[Any]: | |
return [self[i] for i in n] | ||
|
||
def empty_item(self): | ||
return self.dataset.empty_item() | ||
if self.padding == "empty": | ||
return self.dataset.empty_item() | ||
elif self.padding == "raise": | ||
raise ValueError("Padding is set to 'raise', cannot return an empty item.") | ||
elif self.padding == "missing": | ||
raise MissingDateError("Padding is set to 'missing'") | ||
assert False, self.padding | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be better to have a proper error message |
||
|
||
def get_aux(self, i: FullIndex) -> NDArray[np.timedelta64]: | ||
if self._i_out_of_range(i): | ||
arr = np.array([], dtype=np.float32) | ||
aux = arr, arr, arr | ||
lats = np.array([], dtype=np.float32) | ||
lons = lats | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be a copy? |
||
timedeltas = np.ones_like(lons, dtype="timedelta64[s]") * 0 | ||
aux = lats, lons, timedeltas | ||
else: | ||
aux = self.dataset.get_aux(i - self._before) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`if df.empty`:
I guess you don't want to do the checks below? If so, should this be `return`?