diff --git a/collection_manager/collection_manager/resources/dataset_config_template.yml b/collection_manager/collection_manager/resources/dataset_config_template.yml
index d35a527..70a7643 100644
--- a/collection_manager/collection_manager/resources/dataset_config_template.yml
+++ b/collection_manager/collection_manager/resources/dataset_config_template.yml
@@ -1,5 +1,6 @@
 granule:
   resource: {{granule}}
+variable: {{variable}}
 slicer:
   name: sliceFileByStepSize
   dimension_step_sizes:
@@ -7,11 +8,6 @@ slicer:
     lat: 30
     lon: 30
 processors:
-  - name: GridReadingProcessor
-    latitude: lat
-    longitude: lon
-    time: time
-    variable_to_read: {{variable}}
   - name: emptyTileFilter
   - name: kelvinToCelsius
   - name: tileSummary
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index c648b99..fdd03e5 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -6,7 +6,11 @@ class PipelineRunningError(Exception):
     pass
 
 
-class TileProcessingError(Exception):
+class TileProcessingError(PipelineRunningError):
+    pass
+
+
+class GranuleLoadingError(PipelineRunningError):
     pass
 
 
@@ -21,6 +25,7 @@ class RabbitMQLostConnectionError(LostConnectionError):
 class CassandraLostConnectionError(LostConnectionError):
     pass
 
+
 class SolrLostConnectionError(LostConnectionError):
     pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index ea0969f..f2429b1 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,11 +1,8 @@
-from .Exceptions import CassandraFailedHealthCheckError
-from .Exceptions import CassandraLostConnectionError
-from .Exceptions import FailedHealthCheckError
-from .Exceptions import LostConnectionError
-from .Exceptions import PipelineBuildingError
-from .Exceptions import PipelineRunningError
-from .Exceptions import RabbitMQFailedHealthCheckError
-from .Exceptions import RabbitMQLostConnectionError
-from .Exceptions import SolrFailedHealthCheckError
-from .Exceptions import SolrLostConnectionError
-from .Exceptions import TileProcessingError
+from .Exceptions import (CassandraFailedHealthCheckError,
+                         CassandraLostConnectionError, FailedHealthCheckError,
+                         GranuleLoadingError, LostConnectionError,
+                         PipelineBuildingError, PipelineRunningError,
+                         RabbitMQFailedHealthCheckError,
+                         RabbitMQLostConnectionError,
+                         SolrFailedHealthCheckError, SolrLostConnectionError,
+                         TileProcessingError)
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index c28ffbb..6377de0 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -21,6 +21,8 @@ import aioboto3
 import xarray as xr
 
+from granule_ingester.exceptions import GranuleLoadingError
+
 logger = logging.getLogger(__name__)
 
 
@@ -52,7 +54,12 @@ async def open(self) -> (xr.Dataset, str):
             raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme))
 
         granule_name = os.path.basename(self._resource)
-        return xr.open_dataset(file_path, lock=False), granule_name
+        try:
+            return xr.open_dataset(file_path, lock=False), granule_name
+        except FileNotFoundError:
+            raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
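With the hunk above, GranuleLoader.open() converts both missing-file and unparsable-file failures into GranuleLoadingError, a PipelineRunningError subclass. A minimal sketch of what handling looks like from the caller's side; it assumes GranuleLoader accepts the resource path as a constructor keyword (as the `granule: resource:` config block suggests) and is used as an async context manager the way Pipeline.run() uses it, and the file path is a placeholder:

import asyncio

from granule_ingester.exceptions import GranuleLoadingError
from granule_ingester.granule_loaders import GranuleLoader


async def try_open(resource: str):
    try:
        async with GranuleLoader(resource=resource) as (dataset, granule_name):
            print(f"Opened {granule_name}: variables {list(dataset.data_vars)}")
    except GranuleLoadingError as error:
        # Missing files and invalid NetCDF files now surface as one
        # exception type, so callers can treat them uniformly.
        print(f"Skipping granule: {error}")


asyncio.run(try_open("/tmp/example_granule.nc"))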
+        except Exception:
+            raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.")
 
     @staticmethod
     async def _download_s3_file(url: str):
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
index 2cf2245..689b3b1 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -1,7 +1,10 @@
-from granule_ingester.processors import *
-from granule_ingester.processors.reading_processors import *
-from granule_ingester.slicers import *
-from granule_ingester.granule_loaders import *
+from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.processors import (EmptyTileFilter, GenerateTileId,
+                                         KelvinToCelsius,
+                                         TileSummarizingProcessor)
+from granule_ingester.processors.reading_processors import (
+    EccoReadingProcessor, GridReadingProcessor)
+from granule_ingester.slicers import SliceFileByStepSize
 
 modules = {
     "granule": GranuleLoader,
@@ -11,5 +14,5 @@
     "GridReadingProcessor": GridReadingProcessor,
     "tileSummary": TileSummarizingProcessor,
     "emptyTileFilter": EmptyTileFilter,
-    "kelvinToCelsius": KelvinToCelsius
+    "kelvinToCelsius": KelvinToCelsius,
 }
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index dabca81..86bc617 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -26,6 +26,7 @@ from aiomultiprocess.types import ProxyException
 from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.processors import ReadingProcessorSelector
 from granule_ingester.pipeline.Modules import \
     modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -90,6 +91,7 @@ class Pipeline:
     def __init__(self,
                  granule_loader: GranuleLoader,
                  slicer: TileSlicer,
+                 variable_name: str,
                  data_store_factory,
                  metadata_store_factory,
                  tile_processors: List[TileProcessor],
@@ -97,6 +99,7 @@
         self._granule_loader = granule_loader
         self._tile_processors = tile_processors
         self._slicer = slicer
+        self._variable_name = variable_name
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
         self._max_concurrency = max_concurrency
@@ -141,6 +144,7 @@ def _build_pipeline(cls,
         slicer_config = config['slicer']
         slicer = cls._parse_module(slicer_config, module_mappings)
 
+        variable_name = config['variable']
 
         tile_processors = []
         for processor_config in config['processors']:
@@ -149,6 +153,7 @@
 
         return cls(granule_loader,
                    slicer,
+                   variable_name,
                    data_store_factory,
                    metadata_store_factory,
                    tile_processors,
@@ -174,9 +179,14 @@ async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
 
+            reading_processor = ReadingProcessorSelector(dataset, self._variable_name).get_reading_processor()
+            tile_processors = [reading_processor, *self._tile_processors]
+            logger.info(f"Using {type(reading_processor)} to process granule {granule_name}.")
+
             shared_memory = self._manager.Namespace()
+
             async with Pool(initializer=_init_worker,
-                            initargs=(self._tile_processors,
+                            initargs=(tile_processors,
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
@@ -184,7 +194,7 @@
                             maxtasksperchild=self._max_concurrency,
                             childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
-                                    self._slicer.generate_tiles(dataset, granule_name)]
+                                    self._slicer.generate_tiles(dataset, self._variable_name, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
                 for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
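Pipeline.run() now chooses the reading processor per granule instead of taking it from the config's processors list. A condensed sketch of that assembly step, using only names introduced in the diff above:

from typing import List

import xarray as xr

from granule_ingester.processors import ReadingProcessorSelector


def assemble_tile_processors(dataset: xr.Dataset, variable_name: str, configured: List) -> List:
    # Mirrors the new selection step in Pipeline.run(): the reading processor
    # is derived from the granule's own structure and prepended to the
    # configured processor chain.
    reading_processor = ReadingProcessorSelector(dataset, variable_name).get_reading_processor()
    return [reading_processor, *configured]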
diff --git a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
new file mode 100644
index 0000000..a99f8be
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
@@ -0,0 +1,87 @@
+import xarray as xr
+from typing import List
+import re
+
+from granule_ingester.processors.reading_processors import (TileReadingProcessor,
+                                                            GridReadingProcessor,
+                                                            EccoReadingProcessor,
+                                                            SwathReadingProcessor,
+                                                            TimeSeriesReadingProcessor)
+
+
+GRID_PROCESSORS = [GridReadingProcessor, EccoReadingProcessor, SwathReadingProcessor, TimeSeriesReadingProcessor]
+
+
+class ReadingProcessorSelector:
+    def __init__(self, dataset: xr.Dataset, variable: str, *args, **kwargs):
+        self._dataset = dataset
+        self._variable = variable
+
+    def get_reading_processor(self):
+        lat, lon, time = self.detect_dimensions()
+        processor_class = self.detect_grid_type(lat=lat, lon=lon, time=time, processor_types=GRID_PROCESSORS)
+        return processor_class(variable_to_read=self._variable, latitude=lat, longitude=lon, time=time)
+
+    def detect_grid_type(self,
+                         lat: str,
+                         lon: str,
+                         time: str,
+                         processor_types: List[TileReadingProcessor]):
+        bids = []
+        for processor_type in processor_types:
+            bid = processor_type.bid(dataset=self._dataset,
+                                     variable=self._variable,
+                                     lat=lat,
+                                     lon=lon,
+                                     time=time)
+            bids.append((processor_type, bid))
+        highest_bidder = max(bids, key=lambda bidder: bidder[1])
+
+        return highest_bidder[0]
+
+    def detect_dimensions(self):
+        lat_regex = r'((.*\s+)?latitude(.*\s+)?)|((.*\s+)?lat(\s+.*)?)'
+        lon_regex = r'((.*\s+)?longitude(.*\s+)?)|((.*\s+)?lon(\s+.*)?)'
+        time_regex = r'(.*\s+)?time(.*\s+)?'
+
+        dims = self._dataset.data_vars
+        lat = self._find_dimension_in_list(lat_regex, dims)
+        lon = self._find_dimension_in_list(lon_regex, dims)
+        time = self._find_dimension_in_list(time_regex, dims)
+
+        return (lat, lon, time)
+
+    def _find_dimension_in_list(self, pattern: str, dims: List[str], use_long_name=True) -> str:
+        candidates = []
+        for dim_name in dims:
+            if use_long_name:
+                name = self._dataset[dim_name].attrs.get('long_name', dim_name)  # fall back to the variable name when long_name is missing
+            else:
+                name = dim_name
+            if re.match(pattern, name):
+                candidates.append(dim_name)
+        if len(candidates) > 1:
+            raise Exception(f"Found multiple possibilities for dimension with pattern {pattern}.")
+
+        if len(candidates) == 0:
+            return None
+        return candidates[0]
+
+    def _detect_step_sizes(self, dataset: xr.Dataset, variable_name, slice_time=True):
+        dimensions = dataset[variable_name].dims
+        time_dim = self._find_dimension_in_list(r'(.*)?time(.*)?', dimensions, use_long_name=False)
+
+        spatial_dims = set(dimensions[-2:]) - {time_dim}
+        other_dims = set(dimensions[:-2]) - {time_dim}
+
+        spatial_step_sizes = {dim_name: 30 for dim_name in spatial_dims}
+        other_step_sizes = {dim_name: 1 for dim_name in other_dims}
+        if time_dim:
+            if slice_time:
+                time_step_size = {time_dim: 1}
+            else:
+                time_step_size = {time_dim: dataset[variable_name].sizes[time_dim]}
+        else:
+            time_step_size = {}
+
+        return {**other_step_sizes, **spatial_step_sizes, **time_step_size}
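The selector matches coordinate variables by their long_name attributes, then lets each reading processor bid on the granule. A self-contained sketch on a synthetic gridded dataset; the variable and coordinate names here are invented for illustration, and the coordinate variables are named differently from their dims so xarray keeps them in data_vars, which is where detect_dimensions() looks:

import numpy as np
import xarray as xr

from granule_ingester.processors import ReadingProcessorSelector

lat = xr.DataArray(np.linspace(-90, 90, 10), dims='y', attrs={'long_name': 'latitude'})
lon = xr.DataArray(np.linspace(-180, 180, 20), dims='x', attrs={'long_name': 'longitude'})
time = xr.DataArray(np.arange(3), dims='t', attrs={'long_name': 'time'})
sst = xr.DataArray(np.zeros((3, 10, 20)), dims=('t', 'y', 'x'),
                   attrs={'long_name': 'sea surface temperature'})
dataset = xr.Dataset({'analysed_sst': sst, 'lat': lat, 'lon': lon, 'time': time})

selector = ReadingProcessorSelector(dataset, 'analysed_sst')
print(selector.detect_dimensions())            # ('lat', 'lon', 'time')
print(type(selector.get_reading_processor()))  # GridReadingProcessor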
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
index 592d8ea..a05673a 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -1,5 +1,6 @@
+from granule_ingester.processors.ReadingProcessorSelector import ReadingProcessorSelector
 from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
 from granule_ingester.processors.GenerateTileId import GenerateTileId
+from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor
-from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 1876013..0128d57 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -13,7 +13,7 @@ def __init__(self,
                  variable_to_read,
                  latitude,
                  longitude,
-                 tile,
+                 tile='tile',
                  depth=None,
                  time=None,
                  **kwargs):
@@ -23,6 +23,14 @@
         self.time = time
         self.tile = tile
 
+    @staticmethod
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [
+            lambda: lat == 'YC' and lon == 'XC',
+            lambda: lat not in dataset[variable].dims and lon not in dataset[variable].dims,
+            lambda: 'tile' in dataset[variable].dims
+        ]
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.EccoTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 4354f9e..27f0b46 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -14,6 +14,14 @@ def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None,
         self.depth = depth
         self.time = time
 
+    @staticmethod
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [
+            lambda: all(dimension_size > 2 for dimension_size in dataset[variable].sizes.values()),
+            lambda: len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1,
+            lambda: len(set(dataset[variable].dims) - {time}) >= 2
+        ]
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.GridTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index fec28ca..4c6fc6e 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -14,6 +14,10 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
         self.depth = depth
         self.time = time
 
+    @staticmethod
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [lambda: 2 in dataset[variable].sizes.values()]
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.SwathTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 8b69ad2..b5d5105 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -23,7 +23,9 @@
 from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
+import logging
 
+logger = logging.getLogger(__name__)
 
 class TileReadingProcessor(TileProcessor, ABC):
@@ -32,6 +34,17 @@ def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args,
         self.latitude = latitude
         self.longitude = longitude
 
+    @classmethod
+    def bid(cls, dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str) -> int:
+        criteria = cls.get_criteria(dataset, variable, lat, lon, time)
+        points = [1 if criterium() else 0 for criterium in criteria]
+        return sum(points)
+
+    @staticmethod
+    @abstractmethod
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        pass
+
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
         try:
             dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
@@ -41,7 +54,8 @@ def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
             output_tile.summary.data_var_name = self.variable_to_read
 
             return self._generate_tile(dataset, dimensions_to_slices, output_tile)
-        except Exception:
+        except Exception as e:
+            logger.exception(e)
             raise TileProcessingError("Could not generate tiles from the granule.")
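The base class turns each subclass's criteria into a numeric bid (one point per criterion satisfied), and the selector keeps the highest bidder, so processors with more matching criteria make a stronger claim. A hedged sketch of the scoring on an ECCO-style native grid built in memory; the dimension and coordinate names mimic the OBP test granule, but the data is synthetic:

import numpy as np
import xarray as xr

from granule_ingester.processors.reading_processors import (EccoReadingProcessor,
                                                            GridReadingProcessor)

# ECCO-style native-grid layout: a 'tile' dimension plus multi-dimensional
# YC/XC coordinate variables.
obp = xr.DataArray(np.zeros((3, 13, 90, 90)), dims=('time', 'tile', 'j', 'i'))
yc = xr.DataArray(np.zeros((13, 90, 90)), dims=('tile', 'j', 'i'))
xc = xr.DataArray(np.zeros((13, 90, 90)), dims=('tile', 'j', 'i'))
dataset = xr.Dataset({'OBP': obp, 'YC': yc, 'XC': xc})

for processor in (EccoReadingProcessor, GridReadingProcessor):
    score = processor.bid(dataset=dataset, variable='OBP',
                          lat='YC', lon='XC', time='time')
    print(processor.__name__, score)  # Ecco passes all 3 criteria; Grid only 2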
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index 2831c0c..b84c08b 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -15,6 +15,13 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
         self.depth = depth
         self.time = time
 
+    @staticmethod
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [
+            lambda: len(dataset[variable].dims) == 2,
+            lambda: time in dataset[variable].dims
+        ]
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.TimeSeriesTile()
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
index 6e03336..2827c29 100644
--- a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
@@ -15,8 +15,11 @@
 
 import itertools
 import logging
-from typing import List, Dict
+from typing import Dict, List
 
+import xarray as xr
+
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.slicers.TileSlicer import TileSlicer
 
 logger = logging.getLogger(__name__)
@@ -29,22 +32,22 @@ def __init__(self,
         super().__init__(*args, **kwargs)
         self._dimension_step_sizes = dimension_step_sizes
 
-    def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+    def _generate_slices(self, dimension_specs: Dict[str, int], step_sizes: Dict[str, int]) -> List[str]:
         # make sure all provided dimensions are in dataset
-        for dim_name in self._dimension_step_sizes.keys():
+        for dim_name in step_sizes.keys():
             if dim_name not in list(dimension_specs.keys()):
-                raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name))
+                raise TileProcessingError('Provided dimension "{}" not found in dataset'.format(dim_name))
 
-        slices = self._generate_chunk_boundary_slices(dimension_specs)
+        slices = self._generate_chunk_boundary_slices(dimension_specs, step_sizes)
         logger.info("Sliced granule into {} slices.".format(len(slices)))
 
         return slices
 
-    def _generate_chunk_boundary_slices(self, dimension_specs) -> list:
+    def _generate_chunk_boundary_slices(self, dimension_specs, step_sizes) -> list:
         dimension_bounds = []
-        dim_step_keys = self._dimension_step_sizes.keys()
+        dim_step_keys = step_sizes.keys()
         for dim_name, dim_len in dimension_specs.items():
-            step_size = self._dimension_step_sizes[dim_name] if dim_name in dim_step_keys else dim_len
+            step_size = step_sizes[dim_name] if dim_name in dim_step_keys else dim_len
             bounds = []
             for i in range(0, dim_len, step_size):
diff --git a/granule_ingester/granule_ingester/slicers/TileSlicer.py b/granule_ingester/granule_ingester/slicers/TileSlicer.py
index 06cf094..cdf67c4 100644
--- a/granule_ingester/granule_ingester/slicers/TileSlicer.py
+++ b/granule_ingester/granule_ingester/slicers/TileSlicer.py
@@ -18,6 +18,7 @@
 
 import xarray as xr
 from nexusproto.DataTile_pb2 import NexusTile
+import re
 
 
 class TileSlicer(ABC):
@@ -44,10 +45,11 @@ def __next__(self) -> NexusTile:
             tile.summary.granule = self._granule_name
         return tile
 
-    def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None):
+    def generate_tiles(self, dataset: xr.Dataset, variable_name: str, granule_name: str = None):
         self._granule_name = granule_name
         dimensions = dataset.dims
-        self._tile_spec_list = self._generate_slices(dimensions)
+        step_sizes = self._detect_step_sizes(dataset, variable_name)
+        self._tile_spec_list = self._generate_slices(dimensions, step_sizes)
 
         return self
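generate_tiles() now derives step sizes from the variable itself rather than relying solely on the configured dimension_step_sizes. A small sketch of the heuristic, exercised through ReadingProcessorSelector._detect_step_sizes (a private helper, called directly here only for illustration): the trailing two dimensions are treated as spatial and stepped by 30, any leading non-time dimensions by 1, and time is either sliced per step or kept whole.

import numpy as np
import xarray as xr

from granule_ingester.processors import ReadingProcessorSelector

variable = xr.DataArray(np.zeros((4, 100, 200)), dims=('time', 'lat', 'lon'))
dataset = xr.Dataset({'sst': variable})
selector = ReadingProcessorSelector(dataset, 'sst')

print(selector._detect_step_sizes(dataset, 'sst'))
# {'lat': 30, 'lon': 30, 'time': 1} (key order may vary)
print(selector._detect_step_sizes(dataset, 'sst', slice_time=False))
# {'lat': 30, 'lon': 30, 'time': 4} (key order may vary)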
diff --git a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
new file mode 100644
index 0000000..2d722e4
--- /dev/null
+++ b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
@@ -0,0 +1,78 @@
+import unittest
+from os import path
+
+import xarray as xr
+
+from granule_ingester.processors import ReadingProcessorSelector
+from granule_ingester.processors.ReadingProcessorSelector import GRID_PROCESSORS
+from granule_ingester.processors.reading_processors import (GridReadingProcessor,
+                                                            EccoReadingProcessor,
+                                                            TimeSeriesReadingProcessor,
+                                                            SwathReadingProcessor)
+
+
+class TestReadingProcessorSelector(unittest.TestCase):
+
+    def test_detect_dimensions(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            self.assertEqual(('lat', 'lon', 'row_time'), selector.detect_dimensions())
+
+    def test_detect_grid_type_smap(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
+            self.assertEqual(GridReadingProcessor, processor)
+
+    def test_detect_grid_type_ecco_native(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'OBP')
+            processor = selector.detect_grid_type('YC', 'XC', 'time', GRID_PROCESSORS)
+            self.assertEqual(EccoReadingProcessor, processor)
+
+    def test_detect_grid_type_ecco_interp(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_2017_01.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'OBP')
+            processor = selector.detect_grid_type('latitude', 'longitude', 'time', GRID_PROCESSORS)
+            self.assertEqual(GridReadingProcessor, processor)
+
+    def test_detect_grid_type_time_series(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'Qout')
+            processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
+            self.assertEqual(TimeSeriesReadingProcessor, processor)
+
+    def test_detect_grid_type_swath(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            processor = selector.detect_grid_type('lat', 'lon', 'row_time', GRID_PROCESSORS)
+            self.assertEqual(SwathReadingProcessor, processor)
+
+    def test_get_reading_processor(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            processor = selector.get_reading_processor()
+            self.assertEqual(GridReadingProcessor, type(processor))
+
+    def test_detect_step_sizes_smap(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            step_sizes = selector._detect_step_sizes(dataset, 'smap_sss')
+            self.assertEqual({'phony_dim_0': 30, 'phony_dim_1': 30}, step_sizes)
+    def test_detect_step_sizes_timeseries(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'Qout')
+            step_sizes = selector._detect_step_sizes(dataset, 'Qout', slice_time=False)
+            self.assertEqual({'phony_dim_0': 30, 'phony_dim_1': 30}, step_sizes)
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
index f2e9f29..ec3311f 100644
--- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -62,3 +62,14 @@ def test_generate_tile_with_dims_out_of_order(self):
         self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7])
         self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7])
         self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
+
+    def test_bid(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            bid = EccoReadingProcessor.bid(
+                dataset=dataset,
+                variable='OBP',
+                lat='YC',
+                lon='XC',
+                time='time')
+            self.assertEqual(3, bid)
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
index 55ac4fc..c9e76c3 100644
--- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -72,3 +72,14 @@ def test_read_not_empty_smap(self):
         self.assertEqual([38, 1], output_tile.tile.swath_tile.variable_data.shape)
         self.assertEqual([38, 1], output_tile.tile.swath_tile.latitude.shape)
         self.assertEqual([38, 1], output_tile.tile.swath_tile.longitude.shape)
+
+    def test_bid(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_ascatb.nc4')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            bid = SwathReadingProcessor.bid(
+                dataset=dataset,
+                variable='wind_speed',
+                lat='lat',
+                lon='lon',
+                time='time')
+            self.assertTrue(bid)