Commits (25)
5fdeab9: update PydapArrayWrapper to support backend batching (Mikejmnez, Aug 12, 2025)
baa23e3: update PydapDataStore to use backend logic in dap4 to batch variables… (Mikejmnez, Aug 12, 2025)
52fd3f9: pydap-server it not necessary (Mikejmnez, Aug 12, 2025)
f99f400: set `batch=False` as default (Mikejmnez, Aug 12, 2025)
7a0e2c5: set `batch=False` as default in datatree (Mikejmnez, Aug 12, 2025)
827101a: set `batch=False` as default in open groups as dict (Mikejmnez, Aug 12, 2025)
7515616: for flaky, install pydap from repo for now (Mikejmnez, Aug 12, 2025)
ee03ed6: initial tests - quantify cached url (Mikejmnez, Aug 13, 2025)
b3b77ab: adds tests to datatree backend to assert multiple dimensions download… (Mikejmnez, Aug 13, 2025)
3ff1a9a: update testing to show number of download urls (Mikejmnez, Aug 13, 2025)
ccd7954: simplified logic (Mikejmnez, Aug 13, 2025)
c244b3a: specify cached session debug name to actually cache urls (Mikejmnez, Aug 13, 2025)
25b7092: fix for mypy (Mikejmnez, Aug 13, 2025)
bac007b: user visible changes on `whats-new.rst` (Mikejmnez, Aug 13, 2025)
844e580: impose sorted to `get_dimensions` method (Mikejmnez, Aug 13, 2025)
7461489: reformat `whats-new.rst` (Mikejmnez, Aug 13, 2025)
f0237a3: revert to install pydap from conda and not from repo (Mikejmnez, Aug 13, 2025)
037ee09: expose checksum as user kwarg (Mikejmnez, Aug 13, 2025)
4d6e33f: include `checksums` optional argument in `whats-new` (Mikejmnez, Aug 13, 2025)
5f2adfb: update to newest release of pydap via pip until conda install is avai… (Mikejmnez, Aug 13, 2025)
84305e4: Merge branch 'main' into pydap4_scale (Mikejmnez, Aug 13, 2025)
6efb311: use requests_cache session with retry-params when 500 errors occur (Mikejmnez, Aug 13, 2025)
511da84: update env yml file to use new pydap release via conda (Mikejmnez, Aug 14, 2025)
5ed9d4a: Merge branch 'main' into pydap4_scale (Mikejmnez, Aug 14, 2025)
b3c77a0: Merge branch 'main' into pydap4_scale (Mikejmnez, Aug 16, 2025)
1 change: 0 additions & 1 deletion ci/requirements/environment.yml
@@ -37,7 +37,6 @@ dependencies:
- pre-commit
- pyarrow # pandas raises a deprecation warning without this, breaking doctests
- pydap
- pydap-server
- pytest
- pytest-asyncio
- pytest-cov
3 changes: 3 additions & 0 deletions doc/whats-new.rst
@@ -56,6 +56,9 @@ New Features
By `Matthew Willson <https://github.com/mjwillson>`_.
- Added exception handling for invalid files in :py:func:`open_mfdataset`. (:issue:`6736`)
By `Pratiman Patel <https://github.com/pratiman-91>`_.
- Improved ``pydap`` backend behavior and performance when using :py:func:`open_dataset` or :py:func:`open_datatree` to download DAP4 (OPeNDAP) data (:issue:`10628`, :pull:`10629`).
  ``batch=True|False`` is a new ``backend_kwarg`` that enables downloading multiple arrays in a single response. In addition, ``checksums`` is added as an optional argument passed to the ``pydap`` backend.
By `Miguel Jimenez-Urias <https://github.com/Mikejmnez>`_.
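A minimal usage sketch of the new keywords, assuming a DAP4-capable OPeNDAP server and ``pydap>3.5.5`` (the URL is the test server used in the tests below):

```python
import xarray as xr

# DAP4 endpoint used in this PR's test suite; any DAP4-capable OPeNDAP URL works
url = "dap4://test.opendap.org/opendap/hyrax/data/nc/coads_climatology.nc"

# batch=True downloads all non-dimension arrays in a single response;
# checksums=True (the default) keeps DAP4 checksum verification enabled
ds = xr.open_dataset(
    url, engine="pydap", batch=True, checksums=True, decode_times=False
)
```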

Breaking changes
~~~~~~~~~~~~~~~~
Expand Down
128 changes: 113 additions & 15 deletions xarray/backends/pydap_.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any

@@ -35,8 +36,11 @@


class PydapArrayWrapper(BackendArray):
def __init__(self, array):
def __init__(self, array, batch=False, cache=None, checksums=True):
self.array = array
self._batch = batch
self._cache = cache
self._checksums = checksums

@property
def shape(self) -> tuple[int, ...]:
@@ -52,13 +56,27 @@ def __getitem__(self, key):
)

def _getitem(self, key):
result = robust_getitem(self.array, key, catch=ValueError)
# in some cases, pydap doesn't squeeze axes automatically like numpy
result = np.asarray(result)
if self.array.id in self._cache.keys():
# safely avoid re-downloading some coordinates
result = self._cache[self.array.id]
Comment on lines +59 to +61 (Member): Xarray should already avoid downloading 1D coordinates multiple times, because coordinates are saved in memory as NumPy arrays and pandas indexes. If this is not the case, please file a bug report to discuss :)

Comment (Member): Ah, I see this was discussed earlier, and seems to be the same issue as #10560. I would prefer a more general solution rather than something specifically for pydap, which will be harder to maintain.

elif self._batch and hasattr(self.array, "dataset"):
# these are both True only for pydap>3.5.5
from pydap.lib import resolve_batch_for_all_variables

dataset = self.array.dataset
resolve_batch_for_all_variables(self.array, key, checksums=self._checksums)
result = np.asarray(
dataset._current_batch_promise.wait_for_result(self.array.id)
Comment (Member): Is it possible to avoid private APIs here?

)
else:
result = robust_getitem(self.array, key, catch=ValueError)
try:
result = np.asarray(result.data)
except AttributeError:
result = np.asarray(result)
Comment on lines +73 to +76 (Member): This is worth some explanation. Did a change in pydap break np.asarray(result) or is there some reason why it is not preferred?
axis = tuple(n for n, k in enumerate(key) if isinstance(k, integer_types))
if result.ndim + len(axis) != self.array.ndim and axis:
result = np.squeeze(result, axis)

return result


@@ -81,7 +99,15 @@ class PydapDataStore(AbstractDataStore):
be useful if the netCDF4 library is not available.
"""

def __init__(self, dataset, group=None):
def __init__(
self,
dataset,
group=None,
session=None,
batch=False,
protocol=None,
checksums=True,
):
"""
Parameters
----------
@@ -91,6 +117,11 @@ def __init__(self, dataset, group=None):
"""
self.dataset = dataset
self.group = group
self._batch = batch
self._batch_done = False
self._array_cache = {} # holds 1D dimension data
self._protocol = protocol
self._checksums = checksums # true by default

@classmethod
def open(
@@ -103,6 +134,8 @@ def open(
timeout=None,
verify=None,
user_charset=None,
batch=False,
Comment (Member): Would it make sense to have the default be batch=None, which means "use batching if possible"? This would expose these benefits to more users.
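A sketch of that suggestion (hypothetical; ``batch=None`` is not part of this PR, and feature-detecting batching support via ``enable_batch_mode`` is an assumption):

```python
# Hypothetical resolution of batch=None, meaning "use batching if possible":
if batch is None:
    # batching requires DAP4 and pydap>3.5.5 (which adds enable_batch_mode)
    batch = args["protocol"] == "dap4" and hasattr(dataset, "enable_batch_mode")
```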

checksums=True,
):
from pydap.client import open_url
from pydap.net import DEFAULT_TIMEOUT
@@ -117,6 +150,7 @@
DeprecationWarning,
)
output_grid = False # new default behavior

kwargs = {
"url": url,
"application": application,
@@ -133,13 +167,28 @@
# pydap dataset
dataset = url.ds
args = {"dataset": dataset}
args["checksums"] = checksums
Comment on lines 169 to +170 (Member): Please specify dataset and checksums with the same syntax.
if group:
# only then, change the default
args["group"] = group
if url.startswith(("http", "dap2")):
args["protocol"] = "dap2"
elif url.startswith("dap4"):
args["protocol"] = "dap4"
if batch:
if args["protocol"] == "dap2":
warnings.warn(
f"`batch={batch}` is currently only compatible with the `DAP4` "
"protocol. Make sue the OPeNDAP server implements the `DAP4` "
"protocol and then replace the scheme of the url with `dap4` "
"to make use of it. Setting `batch=False`.",
stacklevel=2,
)
Comment on lines +179 to +185 (Member): Generally, if a user explicitly specifies an invalid argument, the preferred pattern is to raise an exception, rather than warning and ignoring what the user asked for.

Comment (Member): Likely pydap already does this? Generally, we re-raise errors from Xarray only when we can add Xarray-specific details that are helpful to users.
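A sketch of the raise-based pattern the reviewers describe (illustrative only; the PR as written warns and falls back to ``batch=False``):

```python
# Hypothetical alternative: fail fast instead of ignoring the user's request
if batch and args["protocol"] == "dap2":
    raise ValueError(
        "`batch=True` is only compatible with the `DAP4` protocol. "
        "Replace the scheme of the url with `dap4` to enable batching."
    )
```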

else:
# only update if dap4
args["batch"] = batch
return cls(**args)

def open_store_variable(self, var):
data = indexing.LazilyIndexedArray(PydapArrayWrapper(var))
try:
dimensions = [
dim.split("/")[-1] if dim.startswith("/") else dim for dim in var.dims
@@ -148,6 +197,25 @@
# GridType does not have a dims attribute - instead get `dimensions`
# see https://github.com/pydap/pydap/issues/485
dimensions = var.dimensions
if (
self._protocol == "dap4"
and var.name in dimensions
and hasattr(var, "dataset") # only True for pydap>3.5.5
):
if not var.dataset._batch_mode:
# for dap4, always batch all dimensions at once
var.dataset.enable_batch_mode()
data_array = self._get_data_array(var)
data = indexing.LazilyIndexedArray(data_array)
if not self._batch and var.dataset._batch_mode:
# if `batch=False`, restore it for all other variables
var.dataset.disable_batch_mode()
else:
# all non-dimension variables
data = indexing.LazilyIndexedArray(
PydapArrayWrapper(var, self._batch, self._array_cache, self._checksums)
)

return Variable(dimensions, data, var.attributes)

def get_variables(self):
@@ -165,6 +233,7 @@ def get_variables(self):
# check the key is not a BaseType or GridType
if not isinstance(self.ds[var], GroupType)
]

return FrozenDict((k, self.open_store_variable(self.ds[k])) for k in _vars)

def get_attrs(self):
@@ -176,18 +245,35 @@ def get_attrs(self):
"libdap",
"invocation",
"dimensions",
"path",
"Maps",
)
attrs = self.ds.attributes
list(map(attrs.pop, opendap_attrs, [None] * 6))
attrs = dict(self.ds.attributes)
list(map(attrs.pop, opendap_attrs, [None] * 8))
return Frozen(attrs)

def get_dimensions(self):
return Frozen(self.ds.dimensions)
return Frozen(sorted(self.ds.dimensions))
Comment (Contributor, Author): To potentially address the issues with dimensions in Datatree, and the lat/lon dimensions being inconsistently ordered, I added this sorted to the dimensions list that the backend gets from the Pydap dataset directly. Hopefully this little fix will make it go away, but I will continue checking this issue locally and after merging main into this PR (it has not failed once yet! knocks on wood)

Comment (Member): This is only dataset-level dimensions, not variable-level dimensions. At the dataset level, dimension order doesn't really matter, so I doubt this is going to fix the issue, unfortunately.


@property
def ds(self):
return get_group(self.dataset, self.group)

def _get_data_array(self, var):
"""gets dimension data all at once, storing the numpy
arrays within a cached dictionary
"""
from pydap.lib import get_batch_data

if not self._batch_done or var.id not in self._array_cache:
# store all dim data into a dict for reuse
self._array_cache = get_batch_data(
var.parent, self._array_cache, self._checksums
)
self._batch_done = True

return self._array_cache[var.id]


class PydapBackendEntrypoint(BackendEntrypoint):
"""
@@ -231,6 +317,8 @@ def open_dataset(
timeout=None,
verify=None,
user_charset=None,
batch=False,
checksums=True,
) -> Dataset:
store = PydapDataStore.open(
url=filename_or_obj,
@@ -241,6 +329,8 @@
timeout=timeout,
verify=verify,
user_charset=user_charset,
batch=batch,
checksums=checksums,
)
store_entrypoint = StoreBackendEntrypoint()
with close_on_error(store):
@@ -273,6 +363,8 @@ def open_datatree(
timeout=None,
verify=None,
user_charset=None,
batch=False,
checksums=True,
) -> DataTree:
groups_dict = self.open_groups_as_dict(
filename_or_obj,
@@ -285,10 +377,12 @@
decode_timedelta=decode_timedelta,
group=group,
application=None,
session=None,
timeout=None,
verify=None,
user_charset=None,
session=session,
timeout=timeout,
verify=verify,
user_charset=user_charset,
batch=batch,
checksums=checksums,
)

return datatree_from_dict_with_io_cleanup(groups_dict)
@@ -310,6 +404,8 @@ def open_groups_as_dict(
timeout=None,
verify=None,
user_charset=None,
batch=False,
checksums=True,
) -> dict[str, Dataset]:
from xarray.core.treenode import NodePath

@@ -321,6 +417,8 @@
timeout=timeout,
verify=verify,
user_charset=user_charset,
batch=batch,
checksums=checksums,
)

# Check for a group and make it a parent if it exists
70 changes: 70 additions & 0 deletions xarray/tests/test_backends.py
@@ -6275,6 +6275,76 @@ def test_session(self) -> None:
)


@requires_pydap
@network
@pytest.mark.parametrize("protocol", ["dap2", "dap4"])
@pytest.mark.parametrize("batch", [False, True])
def test_batchdap4_downloads(protocol, batch) -> None:
"""Test that in dap4, all dimensions are downloaded at once"""
import pydap
from pydap.net import create_session

_version_ = Version(pydap.__version__)
# Create a session with pre-set params in pydap backend, to cache urls
session = create_session(use_cache=True, cache_kwargs={"cache_name": "debug"})
session.cache.clear()
url = "https://test.opendap.org/opendap/hyrax/data/nc/coads_climatology.nc"

if protocol == "dap4":
ds = open_dataset(
url.replace("https", protocol),
engine="pydap",
session=session,
decode_times=False,
batch=batch,
)
if _version_ > Version("3.5.5"):
# total downloads are:
# 1 dmr + 1 dap (dimensions)
assert len(session.cache.urls()) == 2
# now load the rest of the variables
ds.load()
if batch:
# all non-dimensions are downloaded in a single https request
assert len(session.cache.urls()) == 2 + 1
if not batch:
# each non-dimension array is downloaded with an individual
# https request
assert len(session.cache.urls()) == 2 + 4
else:
assert len(session.cache.urls()) == 4
ds.load()
assert len(session.cache.urls()) == 4 + 4
elif protocol == "dap2":
ds = open_dataset(
url.replace("https", protocol),
engine="pydap",
session=session,
decode_times=False,
)
# das + dds + 3 dods urls
assert len(session.cache.urls()) == 5


@requires_pydap
@network
def test_batch_warnswithdap2() -> None:
from pydap.net import create_session

# Create a session with pre-set retry params in pydap backend, to cache urls
session = create_session(use_cache=True, cache_kwargs={"cache_name": "debug"})
session.cache.clear()

url = "dap2://test.opendap.org/opendap/hyrax/data/nc/coads_climatology.nc"
with pytest.warns(UserWarning):
open_dataset(
url, engine="pydap", session=session, batch=True, decode_times=False
)

# no batching is supported here
assert len(session.cache.urls()) == 5


class TestEncodingInvalid:
def test_extract_nc4_variable_encoding(self) -> None:
var = xr.Variable(("x",), [1, 2, 3], {}, {"foo": "bar"})
19 changes: 18 additions & 1 deletion xarray/tests/test_backends_datatree.py
@@ -8,6 +8,7 @@

import numpy as np
import pytest
from packaging.version import Version

import xarray as xr
from xarray.backends.api import open_datatree, open_groups
@@ -511,7 +512,16 @@ def test_inherited_coords(self, url=simplegroup_datatree_url) -> None:
│ Temperature (time, Z, Y, X) float32 ...
| Salinity (time, Z, Y, X) float32 ...
"""
tree = open_datatree(url, engine=self.engine)
import pydap
from pydap.net import create_session

# Create a session with pre-set retry params in pydap backend, to cache urls
session = create_session(use_cache=True, cache_kwargs={"cache_name": "debug"})
session.cache.clear()

_version_ = Version(pydap.__version__)

tree = open_datatree(url, engine=self.engine, session=session)
assert set(tree.dims) == {"time", "Z", "nv"}
assert tree["/SimpleGroup"].coords["time"].dims == ("time",)
assert tree["/SimpleGroup"].coords["Z"].dims == ("Z",)
@@ -522,6 +532,13 @@
list(expected.dims) + ["Z", "nv"]
)

if _version_ > Version("3.5.5"):
# Total downloads are: 1 dmr + 1 dap url for all dimensions across groups
assert len(session.cache.urls()) == 2
else:
# 1 dmr + 1 dap url per dimension (total there are 4 dimension arrays)
assert len(session.cache.urls()) == 5

def test_open_groups_to_dict(self, url=all_aligned_child_nodes_url) -> None:
aligned_dict_of_datasets = open_groups(url, engine=self.engine)
aligned_dt = DataTree.from_dict(aligned_dict_of_datasets)