@@ -1,17 +1,13 @@
granule:
resource: {{granule}}
variable: {{variable}}
slicer:
name: sliceFileByStepSize
dimension_step_sizes:
time: 1
lat: 30
lon: 30
processors:
- name: GridReadingProcessor
latitude: lat
longitude: lon
time: time
variable_to_read: {{variable}}
- name: emptyTileFilter
- name: kelvinToCelsius
- name: tileSummary
7 changes: 6 additions & 1 deletion granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -6,7 +6,11 @@ class PipelineRunningError(Exception):
pass


class TileProcessingError(Exception):
class TileProcessingError(PipelineRunningError):
pass


class GranuleLoadingError(PipelineRunningError):
pass


@@ -21,6 +25,7 @@ class RabbitMQLostConnectionError(LostConnectionError):
class CassandraLostConnectionError(LostConnectionError):
pass


class SolrLostConnectionError(LostConnectionError):
pass

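Since TileProcessingError and GranuleLoadingError both derive from PipelineRunningError now, consumers can catch any pipeline-stage failure with a single handler; a minimal sketch (run_pipeline is a hypothetical entry point, not part of this PR):

from granule_ingester.exceptions import PipelineRunningError

try:
    run_pipeline()  # hypothetical entry point
except PipelineRunningError as error:
    # Catches TileProcessingError and GranuleLoadingError alike,
    # since both now subclass PipelineRunningError.
    print(f"Pipeline failed: {error}")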
19 changes: 8 additions & 11 deletions granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,11 +1,8 @@
from .Exceptions import CassandraFailedHealthCheckError
from .Exceptions import CassandraLostConnectionError
from .Exceptions import FailedHealthCheckError
from .Exceptions import LostConnectionError
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
from .Exceptions import RabbitMQFailedHealthCheckError
from .Exceptions import RabbitMQLostConnectionError
from .Exceptions import SolrFailedHealthCheckError
from .Exceptions import SolrLostConnectionError
from .Exceptions import TileProcessingError
from .Exceptions import (CassandraFailedHealthCheckError,
CassandraLostConnectionError, FailedHealthCheckError,
GranuleLoadingError, LostConnectionError,
PipelineBuildingError, PipelineRunningError,
RabbitMQFailedHealthCheckError,
RabbitMQLostConnectionError,
SolrFailedHealthCheckError, SolrLostConnectionError,
TileProcessingError)
@@ -21,6 +21,8 @@
import aioboto3
import xarray as xr

from granule_ingester.exceptions import GranuleLoadingError

logger = logging.getLogger(__name__)


@@ -52,7 +54,12 @@ async def open(self) -> (xr.Dataset, str):
raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme))

granule_name = os.path.basename(self._resource)
return xr.open_dataset(file_path, lock=False), granule_name
try:
return xr.open_dataset(file_path, lock=False), granule_name
except FileNotFoundError:
raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
except Exception:
raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.")

@staticmethod
async def _download_s3_file(url: str):
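Callers can now distinguish loading failures from other errors. A minimal sketch of consuming the loader, based on how Pipeline.run() uses it as an async context manager (the resource keyword is an assumption mirroring the 'granule' section of the pipeline config):

import logging

from granule_ingester.exceptions import GranuleLoadingError
from granule_ingester.granule_loaders import GranuleLoader

logger = logging.getLogger(__name__)

async def log_granule_info(resource: str):
    # Assumes GranuleLoader takes the granule URL as a constructor argument.
    try:
        async with GranuleLoader(resource=resource) as (dataset, granule_name):
            logger.info(f"Opened {granule_name} with variables {list(dataset.data_vars)}")
    except GranuleLoadingError as error:
        logger.error(error)  # missing file or not a valid NetCDF granule
        raise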
13 changes: 8 additions & 5 deletions granule_ingester/granule_ingester/pipeline/Modules.py
@@ -1,7 +1,10 @@
from granule_ingester.processors import *
from granule_ingester.processors.reading_processors import *
from granule_ingester.slicers import *
from granule_ingester.granule_loaders import *
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.processors import (EmptyTileFilter, GenerateTileId,
KelvinToCelsius,
TileSummarizingProcessor)
from granule_ingester.processors.reading_processors import (
EccoReadingProcessor, GridReadingProcessor)
from granule_ingester.slicers import SliceFileByStepSize

modules = {
"granule": GranuleLoader,
@@ -11,5 +14,5 @@
"GridReadingProcessor": GridReadingProcessor,
"tileSummary": TileSummarizingProcessor,
"emptyTileFilter": EmptyTileFilter,
"kelvinToCelsius": KelvinToCelsius
"kelvinToCelsius": KelvinToCelsius,
}
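This dict maps the names used in pipeline YAML configs (see the config diff above) to the classes that implement them. _parse_module's body is outside this diff, but a minimal sketch of such a lookup, assuming the remaining config keys become constructor kwargs (the example variable name is hypothetical):

def parse_module(config: dict, module_mappings: dict):
    # Hypothetical helper: look up the class registered under the config's
    # 'name' key and pass the remaining keys as constructor arguments.
    kwargs = dict(config)
    module_name = kwargs.pop('name')  # e.g. 'sliceFileByStepSize'
    return module_mappings[module_name](**kwargs)

# e.g. parse_module({'name': 'GridReadingProcessor', 'latitude': 'lat',
#                    'longitude': 'lon', 'time': 'time',
#                    'variable_to_read': 'analysed_sst'}, modules)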
14 changes: 12 additions & 2 deletions granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -26,6 +26,7 @@
from aiomultiprocess.types import ProxyException
from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.processors import ReadingProcessorSelector
from granule_ingester.pipeline.Modules import \
modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -90,13 +91,15 @@ class Pipeline:
def __init__(self,
granule_loader: GranuleLoader,
slicer: TileSlicer,
variable_name: str,
data_store_factory,
metadata_store_factory,
tile_processors: List[TileProcessor],
max_concurrency: int):
self._granule_loader = granule_loader
self._tile_processors = tile_processors
self._slicer = slicer
self._variable_name = variable_name
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
self._max_concurrency = max_concurrency
@@ -141,6 +144,7 @@ def _build_pipeline(cls,

slicer_config = config['slicer']
slicer = cls._parse_module(slicer_config, module_mappings)
variable_name = config['variable']

tile_processors = []
for processor_config in config['processors']:
@@ -149,6 +153,7 @@

return cls(granule_loader,
slicer,
variable_name,
data_store_factory,
metadata_store_factory,
tile_processors,
@@ -174,17 +179,22 @@ async def run(self):
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()

reading_processor = ReadingProcessorSelector(dataset, self._variable_name).get_reading_processor()
tile_processors = [reading_processor, *self._tile_processors]
logger.info(f"Using {type(reading_processor)} to process granule {granule_name}.")

shared_memory = self._manager.Namespace()

async with Pool(initializer=_init_worker,
initargs=(self._tile_processors,
initargs=(tile_processors,
dataset,
self._data_store_factory,
self._metadata_store_factory,
shared_memory),
maxtasksperchild=self._max_concurrency,
childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
self._slicer.generate_tiles(dataset, self._variable_name, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
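_chunk_list's body is outside this diff; a minimal sketch of such a batching helper, assuming it simply slices the task list into fixed-size runs:

from typing import List

def chunk_list(items: List, chunk_size: int) -> List[List]:
    # Split items into consecutive batches of at most chunk_size elements,
    # keeping each pool submission under the 2**15 - 1 queued-task limit.
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]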
@@ -0,0 +1,87 @@
import re
from typing import List, Type

import xarray as xr

from granule_ingester.processors.reading_processors import (TileReadingProcessor,
GridReadingProcessor,
EccoReadingProcessor,
SwathReadingProcessor,
TimeSeriesReadingProcessor)


GRID_PROCESSORS = [GridReadingProcessor, EccoReadingProcessor, SwathReadingProcessor, TimeSeriesReadingProcessor]


class ReadingProcessorSelector:
def __init__(self, dataset: xr.Dataset, variable: str, *args, **kwargs):
self._dataset = dataset
self._variable = variable

def get_reading_processor(self):
lat, lon, time = self.detect_dimensions()
processor_class = self.detect_grid_type(lat=lat, lon=lon, time=time, processor_types=GRID_PROCESSORS)
return processor_class(variable_to_read=self._variable, latitude=lat, longitude=lon, time=time)

def detect_grid_type(self,
lat: str,
lon: str,
time: str,
processor_types: List[Type[TileReadingProcessor]]):
bids = []
for processor_type in processor_types:
bid = processor_type.bid(dataset=self._dataset,
variable=self._variable,
lat=lat,
lon=lon,
time=time)
bids.append((processor_type, bid))
highest_bidder = max(bids, key=lambda bidder: bidder[1])

return highest_bidder[0]

def detect_dimensions(self):
lat_regex = r'((.*\s+)?latitude(\s+.*)?)|((.*\s+)?lat(\s+.*)?)'
lon_regex = r'((.*\s+)?longitude(\s+.*)?)|((.*\s+)?lon(\s+.*)?)'
time_regex = r'(.*\s+)?time(\s+.*)?'

dims = self._dataset.data_vars
lat = self._find_dimension_in_list(lat_regex, dims)
lon = self._find_dimension_in_list(lon_regex, dims)
time = self._find_dimension_in_list(time_regex, dims)

return (lat, lon, time)

def _find_dimension_in_list(self, pattern: str, dims: List[str], use_long_name=True) -> str:
candidates = []
for dim_name in dims:
if use_long_name:
name = self._dataset[dim_name].long_name
else:
name = dim_name
if re.match(pattern, name):
candidates.append(dim_name)
if len(candidates) > 1:
raise Exception(f"Found multiple possibilities for dimension with pattern {pattern}.")

if len(candidates) == 0:
return None
return candidates[0]

def _detect_step_sizes(self, dataset: xr.Dataset, variable_name, slice_time=True):
dimensions = dataset[variable_name].dims
time_dim = self._find_dimension_in_list(r'(.*)?time(.*)?', dimensions, use_long_name=False)

spatial_dims = set(dimensions[-2:]) - {time_dim}
other_dims = set(dimensions[:-2]) - {time_dim}

spatial_step_sizes = {dim_name: 30 for dim_name in spatial_dims}
other_step_sizes = {dim_name: 1 for dim_name in other_dims}
if time_dim:
if slice_time:
time_step_size = {time_dim: 1}
else:
time_step_size = {time_dim: dataset[variable_name].sizes[time_dim]}
else:
time_step_size = {}

return {**other_step_sizes, **spatial_step_sizes, **time_step_size}
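Putting it together, the selector regex-matches dimension names and then awards the granule to the highest-bidding processor; a minimal usage sketch (the file name and variable are hypothetical):

import xarray as xr

from granule_ingester.processors import ReadingProcessorSelector

# Hypothetical granule and variable, for illustration only.
dataset = xr.open_dataset('20200101-sst.nc', lock=False)
selector = ReadingProcessorSelector(dataset, 'analysed_sst')

# detect_dimensions() matches lat/lon/time against each data variable's
# long_name; detect_grid_type() then instantiates the highest bidder.
processor = selector.get_reading_processor()
print(type(processor).__name__)  # e.g. 'GridReadingProcessor'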
3 changes: 2 additions & 1 deletion granule_ingester/granule_ingester/processors/__init__.py
@@ -1,5 +1,6 @@
from granule_ingester.processors.ReadingProcessorSelector import ReadingProcessorSelector
from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
from granule_ingester.processors.GenerateTileId import GenerateTileId
from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
from granule_ingester.processors.TileProcessor import TileProcessor
from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor
from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
@@ -13,7 +13,7 @@ def __init__(self,
variable_to_read,
latitude,
longitude,
tile,
tile='tile',
depth=None,
time=None,
**kwargs):
@@ -23,6 +23,14 @@
self.time = time
self.tile = tile

@staticmethod
def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
return [
lambda: lat == 'YC' and lon == 'XC',
lambda: lat not in dataset[variable].dims and lon not in dataset[variable].dims,
lambda: 'tile' in dataset[variable].dims
]

def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.EccoTile()

@@ -14,6 +14,14 @@ def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None,
self.depth = depth
self.time = time

@staticmethod
def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
return [
lambda: all(dimension_size > 2 for dimension_size in dataset[variable].sizes.values()),
lambda: len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1,
lambda: len(set(dataset[variable].dims) - {time}) >= 2
]

def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.GridTile()

@@ -14,6 +14,10 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
self.depth = depth
self.time = time

@staticmethod
def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
return [lambda: 2 in dataset[variable].sizes.values()]

def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.SwathTile()

@@ -23,7 +23,9 @@

from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
import logging

logger = logging.getLogger(__name__)

class TileReadingProcessor(TileProcessor, ABC):

@@ -32,6 +34,17 @@ def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args,
self.latitude = latitude
self.longitude = longitude

@classmethod
def bid(cls, dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str) -> float:
criteria = cls.get_criteria(dataset, variable, lat, lon, time)
points = [1 if criterion() else 0 for criterion in criteria]
return sum(points) / len(criteria)

@staticmethod
@abstractmethod
def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
pass

def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
try:
dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
@@ -41,7 +54,8 @@ def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
output_tile.summary.data_var_name = self.variable_to_read

return self._generate_tile(dataset, dimensions_to_slices, output_tile)
except Exception:
except Exception as e:
logger.exception(e)
raise TileProcessingError("Could not generate tiles from the granule.")

@abstractmethod
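bid() scores a processor as the fraction of its criteria that hold, so scores stay comparable across processors with different criteria counts. A worked sketch against a toy gridded dataset (illustrative values, not from this PR):

import numpy as np
import xarray as xr

from granule_ingester.processors.reading_processors import (GridReadingProcessor,
                                                            SwathReadingProcessor)

# 1D lat/lon coordinates and every dimension larger than 2, so all three
# of GridReadingProcessor's criteria hold; SwathReadingProcessor's one fails.
ds = xr.Dataset({'sst': (('time', 'lat', 'lon'), np.zeros((3, 4, 5)))},
                coords={'time': np.arange(3), 'lat': np.arange(4.0), 'lon': np.arange(5.0)})

print(GridReadingProcessor.bid(dataset=ds, variable='sst', lat='lat', lon='lon', time='time'))   # 1.0
print(SwathReadingProcessor.bid(dataset=ds, variable='sst', lat='lat', lon='lon', time='time'))  # 0.0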
@@ -15,6 +15,13 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
self.depth = depth
self.time = time

@staticmethod
def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
return [
lambda: len(dataset[variable].dims) == 2,
lambda: time in dataset[variable].dims
]

def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.TimeSeriesTile()
