Skip to content

Commit 674e978

Browse files
committed
Add an alternative xarray approach to computing convservation time series
This alternative approach makes sure the constents are unique (and sorted) in the xtime variable. To reduce redundancy between the new approach and the old ncrcat approach (retained for comparison and in case the xarray approach is not performant), a new helper function for determining if we want to append to the existing datasets added.
1 parent 05c946d commit 674e978

File tree

1 file changed

+114
-37
lines changed

1 file changed

+114
-37
lines changed

mpas_analysis/ocean/conservation.py

Lines changed: 114 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ def run_task(self):
252252
for plot_type in self.plotTypes:
253253
for varname in self.variableList[plot_type]:
254254
all_plots_variable_list.append(varname)
255-
self._compute_time_series_with_ncrcat(all_plots_variable_list)
255+
self._compute_time_series_with_xarray(all_plots_variable_list)
256256
for plot_type in self.plotTypes:
257257
self._make_plot(plot_type)
258258

@@ -604,28 +604,27 @@ def _get_variable(self, ds, varname, mks=False):
604604

605605
return variable
606606

607-
def _compute_time_series_with_ncrcat(self, variable_list):
608-
607+
def _check_output_safe_to_append(self, variable_list):
609608
"""
610-
Uses ncrcat to extact time series from conservationCheckOutput files
609+
Check if the output file is safe to append to by verifying the presence
610+
of necessary variables and determining if new input files are needed.
611611
612-
Raises
613-
------
614-
OSError
615-
If ``ncrcat`` is not in the system path.
612+
Parameters
613+
----------
614+
variable_list : list of str
615+
List of variables to include in the time series.
616+
617+
Returns
618+
-------
619+
append : bool
620+
True if the output file can be safely appended to, False otherwise.
621+
inputFiles : list of str
622+
Updated list of input files to process.
616623
"""
617-
618-
if shutil.which('ncrcat') is None:
619-
raise OSError('ncrcat not found. Make sure the latest nco '
620-
'package is installed: \n'
621-
'conda install nco\n'
622-
'Note: this presumes use of the conda-forge '
623-
'channel.')
624-
625-
inputFiles = self.inputFiles
626624
append = False
625+
inputFiles = self.inputFiles
626+
627627
if os.path.exists(self.outputFile):
628-
# make sure all the necessary variables are also present
629628
with xr.open_dataset(self.outputFile) as ds:
630629
if ds.sizes['Time'] == 0:
631630
updateSubset = False
@@ -637,11 +636,7 @@ def _compute_time_series_with_ncrcat(self, variable_list):
637636
break
638637

639638
if updateSubset:
640-
# add only input files with times that aren't already in
641-
# the output file
642-
643639
append = True
644-
645640
fileNames = sorted(self.inputFiles)
646641
inYears, inMonths = get_files_year_month(
647642
fileNames, self.historyStreams,
@@ -652,33 +647,51 @@ def _compute_time_series_with_ncrcat(self, variable_list):
652647
totalMonths = 12 * inYears + inMonths
653648

654649
dates = decode_strings(ds.xtime)
655-
656650
lastDate = dates[-1]
657-
658651
lastYear = int(lastDate[0:4])
659652
lastMonth = int(lastDate[5:7])
660653
lastTotalMonths = 12 * lastYear + lastMonth
661654

662-
inputFiles = []
663-
for index, inputFile in enumerate(fileNames):
664-
if totalMonths[index] > lastTotalMonths:
665-
inputFiles.append(inputFile)
655+
inputFiles = [
656+
inputFile for index, inputFile in enumerate(fileNames)
657+
if totalMonths[index] > lastTotalMonths
658+
]
666659

667660
if len(inputFiles) == 0:
668-
# nothing to do
669-
return
661+
return append, inputFiles
670662
else:
671-
# there is an output file but it has the wrong variables
672-
# so we need ot delete it.
673-
self.logger.warning('Warning: deleting file {self.outputFile}'
674-
' because it is empty or some variables'
675-
' were missing')
663+
self.logger.warning(
664+
f'Warning: deleting file {self.outputFile} because it '
665+
'is empty or some variables were missing')
676666
os.remove(self.outputFile)
677667

678-
variableList = variable_list + ['xtime']
668+
return append, inputFiles
669+
670+
def _compute_time_series_with_ncrcat(self, variable_list):
671+
"""
672+
Uses ncrcat to extract time series from conservationCheckOutput files.
673+
674+
Raises
675+
------
676+
OSError
677+
If ``ncrcat`` is not in the system path.
678+
"""
679+
if shutil.which('ncrcat') is None:
680+
raise OSError('ncrcat not found. Make sure the latest nco '
681+
'package is installed: \n'
682+
'conda install nco\n'
683+
'Note: this presumes use of the conda-forge '
684+
'channel.')
685+
686+
variable_list = variable_list + ['xtime']
687+
append, inputFiles = self._check_output_safe_to_append(variable_list)
688+
689+
if len(inputFiles) == 0:
690+
# nothing to do
691+
return
679692

680693
args = ['ncrcat', '-4', '--no_tmp_fl',
681-
'-v', ','.join(variableList)]
694+
'-v', ','.join(variable_list)]
682695

683696
if append:
684697
args.append('--record_append')
@@ -710,3 +723,67 @@ def _compute_time_series_with_ncrcat(self, variable_list):
710723
if process.returncode != 0:
711724
raise subprocess.CalledProcessError(process.returncode,
712725
' '.join(args))
726+
727+
def _compute_time_series_with_xarray(self, variable_list):
728+
"""
729+
Uses xarray to extract time series from conservationCheckOutput files,
730+
handling redundant `xtime` entries and sorting by `xtime`.
731+
732+
Parameters
733+
----------
734+
variable_list : list of str
735+
List of variables to include in the time series.
736+
"""
737+
# Authors
738+
# -------
739+
# Xylar Asay-Davis
740+
741+
inputFiles = self.inputFiles
742+
variable_list = variable_list + ['xtime']
743+
744+
append, inputFiles = self._check_output_safe_to_append(variable_list)
745+
746+
# Open all input files as a single dataset
747+
self.logger.info(
748+
f'Opening input files with xarray: {inputFiles[0]} ... '
749+
f'{inputFiles[-1]}')
750+
ds = xr.open_mfdataset(
751+
inputFiles,
752+
combine='nested',
753+
concat_dim='Time',
754+
data_vars='minimal',
755+
coords='minimal',
756+
compat='override'
757+
)
758+
759+
# Select only the requested variables
760+
ds = ds[variable_list]
761+
762+
# Handle redundant `xtime` entries by keeping the last occurrence
763+
self.logger.info('Removing redundant xtime entries...')
764+
_, unique_indices = np.unique(ds['xtime'].values, return_index=True)
765+
unique_indices = sorted(unique_indices) # Ensure ascending order
766+
ds = ds.isel(Time=unique_indices)
767+
768+
if append:
769+
# Load the existing dataset and combine it with the new dataset
770+
self.logger.info(
771+
f'Appending to existing dataset in {self.outputFile}...')
772+
with xr.open_dataset(self.outputFile) as existing_ds:
773+
ds = xr.concat([existing_ds, ds], dim='Time')
774+
# Remove redundant `xtime` entries again after concatenation
775+
_, unique_indices = np.unique(
776+
ds['xtime'].values, return_index=True)
777+
unique_indices = sorted(unique_indices)
778+
ds = ds.isel(Time=unique_indices)
779+
780+
# Sort by `xtime` to ensure the time series is in ascending order
781+
self.logger.info('Sorting by xtime...')
782+
ds = ds.sortby('xtime')
783+
784+
# Save the resulting dataset to the output file
785+
self.logger.info(
786+
f'Saving concatenated dataset to {self.outputFile}...')
787+
ds.to_netcdf(self.outputFile, format='NETCDF4')
788+
789+
self.logger.info('Time series successfully created with xarray.')

0 commit comments

Comments
 (0)