diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst
index 35359c4482..776e1a0217 100644
--- a/docs/src/whatsnew/latest.rst
+++ b/docs/src/whatsnew/latest.rst
@@ -96,6 +96,10 @@ This document explains the changes made to Iris for this release
    constraints are given. This was previously only implemented where one such
    constraint was given. (:issue:`6228`, :pull:`6754`)
 
+#. `@stephenworsley`_ reduced the memory load of regridding and other operations
+   using :func:`~iris._lazy_data.map_complete_blocks` when the output chunks would
+   exceed the optimum chunksize set in dask. (:pull:`6730`)
+
 
 🔥 Deprecations
 ===============
diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py
index 992789ed90..c74da78d6b 100644
--- a/lib/iris/_lazy_data.py
+++ b/lib/iris/_lazy_data.py
@@ -626,6 +626,14 @@ def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs):
     --------
     :func:`dask.array.map_blocks` : The function used for the mapping.
 
+    Notes
+    -----
+    .. note::
+
+        If the output chunks would be larger than the maximum chunksize set
+        in the dask config, the input is rechunked, where possible, to
+        optimise the output chunksize.
+
     """
     data = None
     result = None
@@ -640,17 +648,40 @@ def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs):
     else:
         data = src.lazy_data()
 
+    shape = list(src.shape)
+
     if result is None and data is not None:
         # Ensure dims are not chunked
         in_chunks = list(data.chunks)
         for dim in dims:
-            in_chunks[dim] = src.shape[dim]
-        data = data.rechunk(in_chunks)
+            in_chunks[dim] = (src.shape[dim],)
 
         # Determine output chunks
-        out_chunks = list(data.chunks)
+        out_chunks = in_chunks.copy()
         for dim, size in zip(dims, out_sizes):
-            out_chunks[dim] = size
+            out_chunks[dim] = (size,)
+            shape[dim] = size
+
+        # Ensure the chunksize of the output is a reasonable size.
+        max_outchunks = [max(chunk) for chunk in out_chunks]
+        df = tuple(i in dims for i in range(len(shape)))
+        dtype = np.dtype(dtype)
+        opt_outchunks = _optimum_chunksize(
+            max_outchunks, shape, dtype=dtype, dims_fixed=df
+        )
+        for i, (chunk, max_out, opt_out) in enumerate(
+            zip(out_chunks, max_outchunks, opt_outchunks)
+        ):
+            if opt_out < max_out:
+                new_chunks = []
+                for c in chunk:
+                    new_chunks.extend((c // opt_out) * [opt_out])
+                    if chunk_end := c % opt_out:
+                        new_chunks.append(chunk_end)
+                in_chunks[i] = tuple(new_chunks)
+                out_chunks[i] = tuple(new_chunks)
+
+        data = data.rechunk(in_chunks)
 
         # Assume operation preserves mask.
         meta = da.utils.meta_from_array(data).astype(dtype)
diff --git a/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py b/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py
index be25ab6c09..6dc8286996 100644
--- a/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py
+++ b/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py
@@ -7,6 +7,7 @@
 from unittest.mock import Mock, PropertyMock
 
 import dask.array as da
+import dask.config
 import numpy as np
 
 from iris._lazy_data import is_lazy_data, map_complete_blocks
@@ -134,3 +135,21 @@ def test_multidimensional_input(self):
         )
         assert is_lazy_data(result)
         assert_array_equal(result.compute(), array + 1)
+
+    def test_rechunking(self):
+        # Choose a dask array with an irregularly chunked dimension to be rechunked.
+        lazy_array = da.ones((5, 10, 9, 10), chunks=(2, 10, 5, 5))
+        cube, _ = create_mock_cube(lazy_array)
+
+        # Reduce the dask chunk-size limit so that rechunking is triggered.
+        with dask.config.set({"array.chunk-size": "32KiB"}):
+            result = map_complete_blocks(
+                cube, self.func, dims=(1, 3), out_sizes=(30, 40), dtype=lazy_array.dtype
+            )
+        assert is_lazy_data(result)
+        expected_chunksize = (1, 30, 2, 40)
+        assert result.chunksize == expected_chunksize
+        # Note that one dimension is irregularly rechunked and the other isn't.
+        assert result.chunks[0] == (1, 1, 1, 1, 1)
+        # Split from the original chunks of (5, 4).
+        assert result.chunks[2] == (2, 2, 1, 2, 2)
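Reviewer note: the splitting rule in the new loop is easiest to check in isolation. Each original chunk length ``c`` is divided into ``c // opt_out`` pieces of the optimum size, plus any remainder as a final, smaller piece. A minimal sketch of that rule (``split_chunks`` is an illustrative name, not part of the patch):

```python
def split_chunks(chunks: tuple[int, ...], opt_out: int) -> tuple[int, ...]:
    """Split each chunk length into opt_out-sized pieces plus a remainder."""
    new_chunks = []
    for c in chunks:
        # Whole pieces of the optimum size ...
        new_chunks.extend((c // opt_out) * [opt_out])
        # ... plus any remainder as a final, smaller piece.
        if chunk_end := c % opt_out:
            new_chunks.append(chunk_end)
    return tuple(new_chunks)


# Matches the expectations asserted in test_rechunking:
assert split_chunks((2, 2, 1), 1) == (1, 1, 1, 1, 1)
assert split_chunks((5, 4), 2) == (2, 2, 1, 2, 2)
```

Because the remainder is appended per original chunk rather than globally, irregular input chunking stays aligned with the original chunk boundaries, which is why ``result.chunks[2]`` above is ``(2, 2, 1, 2, 2)`` rather than ``(2, 2, 2, 2, 1)``.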
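For context on why the test input must be rechunked under the reduced limit, here is a back-of-envelope check. This is my reasoning about the test rather than anything stated in the patch, and it assumes float64 data (the default for ``da.ones``):

```python
import numpy as np

itemsize = np.dtype("float64").itemsize  # 8 bytes per element

# Without the fix, mapping dims (1, 3) of the (2, 10, 5, 5)-chunked input
# onto out_sizes (30, 40) would give output blocks of shape (2, 30, 5, 40):
naive = 2 * 30 * 5 * 40 * itemsize       # 96000 B, ~93.8 KiB > 32 KiB

# After splitting the non-mapped dims, the largest block is (1, 30, 2, 40):
optimised = 1 * 30 * 2 * 40 * itemsize   # 19200 B, ~18.8 KiB <= 32 KiB

assert naive > 32 * 1024 >= optimised
```

Note that ``dims_fixed`` marks the mapped dims (1 and 3) as untouchable, so ``_optimum_chunksize`` can only shrink the non-mapped dims (0 and 2) to bring the output blocks under the configured ``array.chunk-size`` limit.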