diff --git a/changes/3207.bugfix.rst b/changes/3207.bugfix.rst
new file mode 100644
index 0000000000..e134835ce8
--- /dev/null
+++ b/changes/3207.bugfix.rst
@@ -0,0 +1,15 @@
+- Fixed deadlocks and indefinite hangs when opening Zarr v3 arrays on
+  synchronous fsspec filesystems by falling back to sequential reads for
+  filesystems that are not concurrency-safe, preserving robust metadata
+  retrieval without sacrificing performance on concurrency-safe filesystems.
+  ``Store._get_many`` now retrieves objects from storage concurrently; the
+  previous implementation was sequential, awaiting each ``self.get(*req)``
+  before proceeding, contrary to its docstring.
+- Introduced ``StorePath.get_many``, mimicking the behaviour of
+  ``StorePath.get``.
+- ``get_array_metadata`` now uses ``Store._get_many`` and
+  ``StorePath.get_many``.
+- Implemented ``FsspecStore._get_many`` to fetch concurrently only when the
+  underlying file system is asynchronous, avoiding deadlocks on synchronous
+  file systems. Added a ``LockableFileSystem`` test fixture to exercise both
+  async and sync behavior.
diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py
index 1fbdb3146c..aa20466455 100644
--- a/src/zarr/abc/store.py
+++ b/src/zarr/abc/store.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from asyncio import gather
+from asyncio import as_completed, gather
 from dataclasses import dataclass
 from itertools import starmap
 from typing import TYPE_CHECKING, Protocol, runtime_checkable
@@ -414,8 +414,17 @@ async def _get_many(
         that objects will be retrieved in the order in which they were requested, so this method
         yields tuple[str, Buffer | None] instead of just Buffer | None
         """
-        for req in requests:
-            yield (req[0], await self.get(*req))
+
+        async def _get_with_name(
+            key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
+        ) -> tuple[str, Buffer | None]:
+            value = await self.get(key, prototype, byte_range)
+            return key, value
+
+        tasks = [_get_with_name(*req) for req in requests]
+        for completed in as_completed(tasks):
+            result = await completed
+            yield result
 
     async def getsize(self, key: str) -> int:
         """
diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py
index 78dddf3669..308bf2679f 100644
--- a/src/zarr/core/array.py
+++ b/src/zarr/core/array.py
@@ -211,10 +211,10 @@ async def get_array_metadata(
     store_path: StorePath, zarr_format: ZarrFormat | None = 3
 ) -> dict[str, JSON]:
     if zarr_format == 2:
-        zarray_bytes, zattrs_bytes = await gather(
-            (store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
-            (store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
-        )
+        requests = [(key, default_buffer_prototype(), None) for key in [ZARRAY_JSON, ZATTRS_JSON]]
+        retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
+        zarray_bytes, zattrs_bytes = tuple(retrieved_buffers.get(req[0]) for req in requests)
+
         if zarray_bytes is None:
             raise FileNotFoundError(store_path)
     elif zarr_format == 3:
@@ -222,11 +222,14 @@ async def get_array_metadata(
         if zarr_json_bytes is None:
             raise FileNotFoundError(store_path)
     elif zarr_format is None:
-        zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
-            (store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
-            (store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
-            (store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
+        requests = [
+            (key, default_buffer_prototype(), None) for key in [ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON]
+        ]
+        retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
+        zarr_json_bytes, zarray_bytes, zattrs_bytes = tuple(
+            retrieved_buffers.get(req[0]) for req in requests
         )
+
         if zarr_json_bytes is not None and zarray_bytes is not None:
             # warn and favor v3
             msg = f"Both zarr.json (Zarr format 3) and .zarray (Zarr format 2) metadata objects exist at {store_path}. Zarr v3 will be used."
diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py
index a398aa01aa..11861213a0 100644
--- a/src/zarr/core/group.py
+++ b/src/zarr/core/group.py
@@ -513,19 +513,23 @@ async def open(
         consolidated_key = use_consolidated
 
         if zarr_format == 2:
-            paths = [store_path / ZGROUP_JSON, store_path / ZATTRS_JSON]
+            requests = [
+                (key, default_buffer_prototype(), None) for key in [ZGROUP_JSON, ZATTRS_JSON]
+            ]
             if use_consolidated or use_consolidated is None:
-                paths.append(store_path / consolidated_key)
+                requests.append((consolidated_key, default_buffer_prototype(), None))
 
-            zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather(
-                *[path.get() for path in paths]
+            retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
+            zgroup_bytes, zattrs_bytes = (
+                retrieved_buffers[ZGROUP_JSON],
+                retrieved_buffers[ZATTRS_JSON],
             )
+
             if zgroup_bytes is None:
                 raise FileNotFoundError(store_path)
 
             if use_consolidated or use_consolidated is None:
-                maybe_consolidated_metadata_bytes = rest[0]
-
+                maybe_consolidated_metadata_bytes = retrieved_buffers[consolidated_key]
             else:
                 maybe_consolidated_metadata_bytes = None
@@ -534,17 +538,18 @@
         if zarr_json_bytes is None:
             raise FileNotFoundError(store_path)
     elif zarr_format is None:
+        requests = [
+            (key, default_buffer_prototype(), None)
+            for key in [ZARR_JSON, ZGROUP_JSON, ZATTRS_JSON, consolidated_key]
+        ]
+        retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
         (
             zarr_json_bytes,
             zgroup_bytes,
             zattrs_bytes,
             maybe_consolidated_metadata_bytes,
-        ) = await asyncio.gather(
-            (store_path / ZARR_JSON).get(),
-            (store_path / ZGROUP_JSON).get(),
-            (store_path / ZATTRS_JSON).get(),
-            (store_path / str(consolidated_key)).get(),
-        )
+        ) = tuple(retrieved_buffers.get(req[0]) for req in requests)
+
         if zarr_json_bytes is not None and zgroup_bytes is not None:
             # warn and favor v3
             msg = f"Both zarr.json (Zarr format 3) and .zgroup (Zarr format 2) metadata objects exist at {store_path}. Zarr format 3 will be used."
@@ -3476,10 +3481,14 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
     """
     # TODO: consider first fetching array metadata, and only fetching group metadata when we don't
    # find an array
-    zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
-        store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
-        store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
-        store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
+    requests = [
+        (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
+        (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
+        (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
+    ]
+    retrieved_buffers = {key: value async for key, value in store._get_many(requests)}
+    zarray_bytes, zgroup_bytes, zattrs_bytes = tuple(
+        retrieved_buffers.get(req[0]) for req in requests
     )
 
     if zattrs_bytes is None:
diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py
index e25fa28424..b90078c197 100644
--- a/src/zarr/storage/_common.py
+++ b/src/zarr/storage/_common.py
@@ -27,6 +27,8 @@
 FSMap = None
 
 if TYPE_CHECKING:
+    from collections.abc import AsyncGenerator, Iterable
+
     from zarr.core.buffer import BufferPrototype
@@ -163,6 +165,29 @@ async def get(
             prototype = default_buffer_prototype()
         return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
 
+    async def get_many(
+        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
+        """
+        Read multiple keys from the store concurrently.
+
+        Parameters
+        ----------
+        requests : Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+            Path components relative to this ``StorePath``, each paired with a
+            buffer prototype and an optional byte range.
+
+        Yields
+        ------
+        tuple[str, Buffer | None]
+            Results are yielded as they complete, so their order may differ
+            from the order of ``requests``.
+        """
+        path_component_dict = {(self / req[0]).path: req[0] for req in requests}
+        complete_requests = [((self / req[0]).path, *req[1:]) for req in requests]
+        async for result in self.store._get_many(complete_requests):
+            yield (path_component_dict[result[0]], *result[1:])
+
     async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
         """
         Write bytes to the store.
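For orientation, a minimal, hypothetical sketch of how a caller consumes the new ``StorePath.get_many`` (the ``fetch_v2_array_metadata`` helper name and key literals are illustrative, mirroring the ``get_array_metadata`` pattern above; results are collected into a dict because they may complete out of request order):

    # Hypothetical usage sketch; not part of this diff.
    from zarr.core.buffer import default_buffer_prototype

    async def fetch_v2_array_metadata(store_path):
        # One request tuple per key: (path component, buffer prototype, byte range).
        requests = [(key, default_buffer_prototype(), None) for key in (".zarray", ".zattrs")]
        # get_many yields results as they complete, so collect them into a dict first.
        retrieved = {key: value async for key, value in store_path.get_many(requests)}
        # Reassemble in request order.
        zarray_bytes, zattrs_bytes = (retrieved.get(req[0]) for req in requests)
        if zarray_bytes is None:
            raise FileNotFoundError(store_path)
        return zarray_bytes, zattrs_bytes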
diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py
index e169eededc..1993e51105 100644
--- a/src/zarr/storage/_fsspec.py
+++ b/src/zarr/storage/_fsspec.py
@@ -18,7 +18,7 @@
 from zarr.storage._common import _dereference_path
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncIterator, Iterable
+    from collections.abc import AsyncGenerator, AsyncIterator, Iterable
 
     from fsspec import AbstractFileSystem
     from fsspec.asyn import AsyncFileSystem
@@ -326,6 +326,19 @@ async def get(
         else:
             return value
 
+    async def _get_many(
+        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
+        if getattr(self.fs, "asynchronous", True):
+            # Concurrency-safe filesystems keep the concurrent default implementation.
+            async for result in super()._get_many(requests):
+                yield result
+        else:
+            # Synchronous filesystems are not safe to drive concurrently: fall back
+            # to sequential reads to avoid deadlocks and indefinite hangs.
+            for key, prototype, byte_range in requests:
+                value = await self.get(key, prototype, byte_range)
+                yield (key, value)
+
     async def set(
         self,
         key: str,
diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py
index 026b25f8fc..f5baa4eb87 100644
--- a/tests/test_store/test_fsspec.py
+++ b/tests/test_store/test_fsspec.py
@@ -1,8 +1,11 @@
 from __future__ import annotations
 
+import asyncio
 import json
 import os
 import re
+import warnings
+from itertools import cycle
 from typing import TYPE_CHECKING, Any
 
 import numpy as np
@@ -42,6 +45,7 @@
 ]
 
 fsspec = pytest.importorskip("fsspec")
+AsyncFileSystem = pytest.importorskip("fsspec.asyn").AsyncFileSystem
 s3fs = pytest.importorskip("s3fs")
 requests = pytest.importorskip("requests")
 moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server")
@@ -440,3 +444,102 @@ async def test_with_read_only_auto_mkdir(tmp_path: Path) -> None:
 
     store_w = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False})
     _ = store_w.with_read_only()
+
+
+class LockableFileSystem(AsyncFileSystem):
+    """
+    A mock file system that simulates asynchronous and synchronous behaviors with artificial delays.
+    """
+
+    def __init__(
+        self,
+        asynchronous: bool,
+        lock: bool | None = None,
+        delays: tuple[float, ...] | None = None,
+    ) -> None:
+        if delays is None:
+            delays = (
+                0.03,
+                0.01,
+            )
+        lock = lock if lock is not None else not asynchronous
+
+        # Locking is enabled by default for the synchronous case to detect concurrent requests.
+        self.lock = asyncio.Lock() if lock else None
+        self.delays = cycle(delays)
+        self.async_impl = True
+
+        super().__init__(asynchronous=asynchronous)
+
+    async def _check_active(self) -> None:
+        if self.lock and self.lock.locked():
+            raise RuntimeError("Concurrent requests!")
+
+    async def _cat_file(self, path, start=None, end=None) -> bytes:
+        await self._simulate_io_operation(path)
+        return self.get_data(path)
+
+    async def _await_io(self) -> None:
+        await asyncio.sleep(next(self.delays))
+
+    async def _simulate_io_operation(self, path) -> None:
+        if self.lock:
+            await self._check_active()
+            async with self.lock:
+                await self._await_io()
+        else:
+            await self._await_io()
+
+    def get_store(self, path: str) -> FsspecStore:
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", category=UserWarning)
+            return FsspecStore(fs=self, path=path)
+
+    @staticmethod
+    def get_data(key: str) -> bytes:
+        return f"{key}_data".encode()
+
+
+@pytest.mark.asyncio
+class TestLockableFSSPECFileSystem:
+    @pytest.fixture(autouse=True)
+    async def setup(self):
+        self.path = "root"
+        self.store_async = LockableFileSystem(asynchronous=True).get_store(path=self.path)
+        self.store_sync = LockableFileSystem(asynchronous=False).get_store(path=self.path)
+
+    def get_requests_and_true_results(self, path_components=("a", "b")):
+        true_results = [
+            (component, LockableFileSystem.get_data(f"{self.path}/{component}"))
+            for component in path_components
+        ]
+        requests = [(component, default_buffer_prototype(), None) for component in path_components]
+        return requests, true_results
+
+    async def test_get_many_asynchronous_fs(self):
+        requests, true_results = self.get_requests_and_true_results(("a", "b", "c"))
+
+        results = []
+        async for k, v in self.store_async._get_many(requests):
+            results.append((k, v.to_bytes() if v else None))
+
+        results_ordered = sorted(results, key=lambda x: x[0])
+        assert results_ordered == true_results
+
+    async def test_get_many_synchronous_fs(self):
+        requests, true_results = self.get_requests_and_true_results()
+
+        results = []
+        async for k, v in self.store_sync._get_many(requests):
+            results.append((k, v.to_bytes() if v else None))
+
+        # In the synchronous case, results should arrive in the same order as the requests.
+        assert results == true_results
+
+    async def test_asynchronous_locked_fs_raises(self):
+        store = LockableFileSystem(asynchronous=True, lock=True).get_store(path="root")
+        requests, _ = self.get_requests_and_true_results()
+
+        with pytest.raises(RuntimeError, match="Concurrent requests!"):
+            async for _, _ in store._get_many(requests):
+                pass
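To see the fallback end to end, a hypothetical driver for the ``LockableFileSystem`` fixture above (assuming the test module's imports; ``main`` is illustrative). With ``asynchronous=False`` the store takes the sequential branch of ``FsspecStore._get_many``, so the simulated lock is never contended; with ``asynchronous=True`` reads fan out concurrently:

    # Hypothetical driver; not part of this diff.
    import asyncio

    from zarr.core.buffer import default_buffer_prototype

    async def main() -> None:
        store = LockableFileSystem(asynchronous=False).get_store(path="root")
        requests = [(key, default_buffer_prototype(), None) for key in ("a", "b")]
        # Sequential fallback: results arrive in request order and no deadlock occurs.
        async for key, buf in store._get_many(requests):
            print(key, buf.to_bytes() if buf is not None else None)

    asyncio.run(main())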