2 changes: 2 additions & 0 deletions .gitignore
@@ -174,7 +174,9 @@ ush/imsfv3_scf2ioda.py
ush/atparse.bash
ush/run_bufr2ioda.py
ush/bufr2ioda_insitu*
ush/python/gsincdiag_to_ioda
ush/python/ufsda
ush/python/pyiodaconv
ush/python/soca
ush/python/gen_bufr2ioda_json.py
ush/python/gen_bufr2ioda_yaml.py
7 changes: 1 addition & 6 deletions dev/parm/config/gfs/config.base.j2
@@ -85,7 +85,7 @@ export DO_GENESIS_FSU="{{ DO_GENESIS_FSU }}" # Cyclone genesis verification (FSU
export DO_VERFOZN="YES" # Ozone data assimilation monitoring
export DO_VERFRAD="YES" # Radiance data assimilation monitoring
export DO_VMINMON="YES" # GSI minimization monitoring
export DO_ANLSTAT="NO" # JEDI-based analysis statistics
export DO_ANLSTAT="YES" # JEDI-based analysis statistics

# Experiment mode (cycled or forecast-only)
export MODE="{{ MODE }}" # cycled/forecast-only
@@ -497,11 +497,6 @@ if [[ "${DO_JEDIATMVAR}" = "YES" ]]; then
export DO_VERFOZN="NO" # Ozone data assimilation monitoring
export DO_VERFRAD="NO" # Radiance data assimilation monitoring
export DO_VMINMON="NO" # GSI minimization monitoring
export DO_ANLSTAT="YES" # JEDI-based analysis statistics
else
if [[ "${DO_AERO}" = "YES" || "${DO_JEDIOCNVAR}" = "YES" || "${DO_JEDISNOWDA}" = "YES" ]]; then
export DO_ANLSTAT="YES" # JEDI-based analysis statistics
fi
fi

# TODO: Enable METplus on Ursa when verif-global has been upgraded to spack-stack 1.9.x+
8 changes: 5 additions & 3 deletions dev/workflow/applications/gfs_cycled.py
@@ -318,12 +318,14 @@ def get_task_names(self):
if options['do_verfrad']:
task_names[run] += ['verfrad']

# Only do analysis statistics for gdas cycles
if run == "gdas":
if options['do_anlstat']:
task_names[run] += ['anlstat']

if options['do_vminmon']:
task_names[run] += ['vminmon']

if options['do_anlstat']:
task_names[run] += ['anlstat']

# gfs-only verification/tracking
if run == 'gfs':
if options['do_tracker']:
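A minimal sketch of the scheduling change in this hunk, using plain dicts as hypothetical stand-ins for the workflow's options and task-name objects: `anlstat` is now appended only for the gdas run, rather than for every run that has `do_anlstat` enabled.

```python
# Plain dicts standing in for the workflow's configuration objects (illustration only).
options = {'do_anlstat': True}
task_names = {'gdas': [], 'gfs': []}

for run in task_names:
    # Only do analysis statistics for gdas cycles
    if run == 'gdas' and options['do_anlstat']:
        task_names[run] += ['anlstat']

print(task_names)   # {'gdas': ['anlstat'], 'gfs': []}
```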
8 changes: 4 additions & 4 deletions dev/workflow/rocoto/gfs_tasks.py
@@ -1891,6 +1891,9 @@ def anlstat(self):
if self.options['do_jediatmvar']:
dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlfinal'}
deps.append(rocoto.add_dependency(dep_dict))
else:
dep_dict = {'type': 'task', 'name': f'{self.run}_analdiag'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_jediocnvar']:
dep_dict = {'type': 'task', 'name': f'{self.run}_marineanlfinal'}
deps.append(rocoto.add_dependency(dep_dict))
@@ -1909,7 +1912,7 @@ def anlstat(self):
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'cycledef': self.run,
'cycledef': self.run.replace('enkf', ''),
'command': f'{self.HOMEgfs}/dev/jobs/anlstat.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
@@ -2144,9 +2147,6 @@ def _arch_tars_deps(self):
if self.options['do_vminmon']:
dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_anlstat']:
dep_dict = {'type': 'task', 'name': f'{self.run}_anlstat'}
deps.append(rocoto.add_dependency(dep_dict))
elif self.run in ['gdas']:
dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'}
deps.append(rocoto.add_dependency(dep_dict))
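Two things change in the anlstat task definition above: when JEDI atmospheric DA is off, the task now waits on the GSI `{run}_analdiag` job instead of `{run}_atmanlfinal`, and the `cycledef` strips any `enkf` prefix from the run name, presumably so ensemble-prefixed runs land on the corresponding deterministic cycle definition. A quick plain-Python illustration of what that string replacement yields (run names are the usual global-workflow ones; this is not the Rocoto code itself):

```python
# Illustration of the cycledef change: strip the 'enkf' prefix so ensemble runs
# map onto the deterministic cycle definition of the same name.
for run in ('gdas', 'gfs', 'enkfgdas', 'enkfgfs'):
    print(f"{run:8s} -> cycledef {run.replace('enkf', '')}")
# gdas     -> cycledef gdas
# gfs      -> cycledef gfs
# enkfgdas -> cycledef gdas
# enkfgfs  -> cycledef gfs
```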
3 changes: 3 additions & 0 deletions scripts/exglobal_analysis_stats.py
@@ -31,6 +31,9 @@
AnlStats.task_config['STAT_ANALYSES'].append('snow')
if AnlStats.task_config.DO_JEDIATMVAR:
AnlStats.task_config['STAT_ANALYSES'].append('atmos')
else:
AnlStats.task_config['STAT_ANALYSES'].append('atmos_gsi')
AnlStats.convert_gsi_diags()

# Initialize JEDI variational analysis
AnlStats.initialize()
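A condensed sketch of the driver branching added above, using plain-Python stand-ins rather than the real AnlStats task object: JEDI atmospheric cycles keep registering `atmos`, while GSI-based cycles now register the new `atmos_gsi` analysis and convert the GSI diags to IODA before the task is initialized.

```python
# Stand-in values for illustration only; the real script reads these from task_config.
do_jediatmvar = False             # assume a GSI-based atmospheric cycle
stat_analyses = ['aero', 'snow']  # assume aerosol and snow DA were already appended

if do_jediatmvar:
    stat_analyses.append('atmos')
else:
    stat_analyses.append('atmos_gsi')
    # In the real script, AnlStats.convert_gsi_diags() runs here, before initialize()

print(stat_analyses)              # ['aero', 'snow', 'atmos_gsi']
```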
2 changes: 2 additions & 0 deletions sorc/link_workflow.sh
@@ -264,6 +264,8 @@ if [[ -d "${HOMEgfs}/sorc/gdas.cd/build" ]]; then
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ufsda" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/gen_bufr2ioda_json.py" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/gen_bufr2ioda_yaml.py" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/sorc/da-utils/ush/pyiodaconv" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/sorc/da-utils/ush/gsincdiag_to_ioda" .
cd "${HOMEgfs}/ush" || exit 1
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/run_bufr2ioda.py" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/build/bin/imsfv3_scf2ioda.py" .
117 changes: 115 additions & 2 deletions ush/python/pygfs/task/analysis_stats.py
@@ -2,6 +2,7 @@

import os
import glob
import gsincdiag_to_ioda.proc_gsi_ncdiag as gsid
import gzip
import tarfile
from logging import getLogger
@@ -73,7 +74,7 @@ def initialize(self) -> None:
# Create dictionary of Jedi objects
# Expected keys are what must be included from the JEDI config file. We can
# then loop through ob space list from scripts/exglobal_analysis_stats.py
expected_keys = ['aero', 'atmos', 'snow']
expected_keys = ['aero', 'atmos', 'atmos_gsi', 'snow']
jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config)
self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys)

@@ -173,7 +174,10 @@ def finalize(self, jedi_dict_key: str) -> None:

for analysis_dict in analysis_config_dict[jedi_dict_key]['obs spaces']:
statfile = os.path.join(self.task_config.DATA, analysis_dict['output file'])
outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON']
if jedi_dict_key == 'atmos_gsi':
outdir = self.task_config['COMOUT_ATMOS_ANLMON']
else:
outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON']

# Check if the directory exists; if not, create it
if not os.path.exists(outdir):
@@ -199,3 +203,112 @@ def finalize(self, jedi_dict_key: str) -> None:
with tarfile.open(iodastatzipfile, "w|gz") as archive:
for targetfile in iodastatfiles:
archive.add(targetfile, arcname=os.path.basename(targetfile))

@logit(logger)
def convert_gsi_diags(self) -> None:
"""Convert GSI diag files to ioda-stat files for analysis stats

This method will convert GSI diag files to ioda-stat files for analysis stats.
This includes:
- copying GSI diag files to DATA path
- untarring and gunzipping GSI diag files
- converting GSI diag files to IODA files using the gsincdiag_to_ioda converter scripts

Parameters
----------
None

Returns
----------
None
"""
logger.info("Converting GSI diag files to IODA files for analysis stats")
# copy GSI diag files to DATA path
diag_tars = ['cnvstat', 'radstat', 'oznstat']
diag_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ges')
diag_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi_anl')
diag_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi_diags')
FileHandler({'mkdir': [diag_dir_path, diag_dir_ges_path, diag_dir_anl_path]}).sync()
diag_ioda_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda_ges')
diag_ioda_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda_anl')
output_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda')
FileHandler({'mkdir': [diag_ioda_dir_ges_path, diag_ioda_dir_anl_path, output_dir_path]}).sync()
diag_tar_copy_list = []
for diag in diag_tars:
input_tar_basename = f"{self.task_config.APREFIX}{diag}"
input_tar = os.path.join(self.task_config.COMIN_ATMOS_ANALYSIS,
input_tar_basename)
dest = os.path.join(diag_dir_path, input_tar_basename)
if os.path.exists(input_tar):
diag_tar_copy_list.append([input_tar, dest])
FileHandler({'copy_opt': diag_tar_copy_list}).sync()

# Untar and gunzip diag files
gsi_diag_tars = glob.glob(os.path.join(diag_dir_path, f"{self.task_config.APREFIX}*stat"))
for diag_tar in gsi_diag_tars:
logger.info(f"Untarring {diag_tar}")
with tarfile.open(diag_tar, "r") as tar:
tar.extractall(path=diag_dir_path)
gsi_diags = glob.glob(os.path.join(diag_dir_path, "diag_*.nc4.gz"))
for diag in gsi_diags:
logger.info(f"Gunzipping {diag}")
output_file = diag.rstrip('.gz')
with gzip.open(diag, 'rb') as f_in:
with open(output_file, 'wb') as f_out:
f_out.write(f_in.read())
os.remove(diag)

# Copy diag files to ges or anl directory
anl_diags = glob.glob(os.path.join(diag_dir_path, "diag_*_anl*.nc4"))
ges_diags = glob.glob(os.path.join(diag_dir_path, "diag_*_ges*.nc4"))
copy_anl_diags = []
for diag in anl_diags:
copy_anl_diags.append([diag, os.path.join(diag_dir_anl_path, os.path.basename(diag))])
FileHandler({'copy_opt': copy_anl_diags}).sync()
copy_ges_diags = []
for diag in ges_diags:
copy_ges_diags.append([diag, os.path.join(diag_dir_ges_path, os.path.basename(diag))])
FileHandler({'copy_opt': copy_ges_diags}).sync()

# Convert GSI diag files to IODA files using the gsincdiag_to_ioda converter scripts
gsid.proc_gsi_ncdiag(ObsDir=diag_ioda_dir_ges_path, DiagDir=diag_dir_ges_path)
gsid.proc_gsi_ncdiag(ObsDir=diag_ioda_dir_anl_path, DiagDir=diag_dir_anl_path)

# now we need to combine the two sets of ioda files into one file
# by adding certain groups from the anl file to the ges file
ges_ioda_files = glob.glob(os.path.join(diag_ioda_dir_ges_path, '*nc4'))
for ges_ioda_file in ges_ioda_files:
anl_ioda_file = ges_ioda_file.replace('_ges_', '_anl_').replace(diag_ioda_dir_ges_path, diag_ioda_dir_anl_path)
if os.path.exists(anl_ioda_file):
logger.info(f"Combining {ges_ioda_file} and {anl_ioda_file}")
out_ioda_file = os.path.join(output_dir_path, os.path.basename(ges_ioda_file).replace('_ges_', '_gsi_'))
gsid.combine_ges_anl_ioda(ges_ioda_file, anl_ioda_file, out_ioda_file)
else:
logger.warning(f"WARNING: {anl_ioda_file} does not exist to combine with {ges_ioda_file}")
logger.warning("Skipping this file ...")

# Tar up the ioda files
iodastatzipfile = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda',
f"{self.task_config.APREFIX}atmos_gsi_ioda_diags.tgz")
logger.info(f"Compressing GSI IODA files to {iodastatzipfile}")
# get list of iodastat files to put in tarball
iodastatfiles = glob.glob(os.path.join(output_dir_path, '*nc4'))
logger.info(f"Gathering {len(iodastatfiles)} GSI IODA files to {iodastatzipfile}")
with tarfile.open(iodastatzipfile, "w|gz") as archive:
for targetfile in iodastatfiles:
# gzip the file before adding to tar
with open(targetfile, 'rb') as f_in:
with gzip.open(f"{targetfile}.gz", 'wb') as f_out:
f_out.writelines(f_in)
os.remove(targetfile)
targetfile = f"{targetfile}.gz"
archive.add(targetfile, arcname=os.path.basename(targetfile))
logger.info(f"Finished compressing GSI IODA files to {iodastatzipfile}")
# copy to COMOUT
outdir = self.task_config.COMOUT_ATMOS_ANLMON
if not os.path.exists(outdir):
FileHandler({'mkdir': [outdir]}).sync()
dest = os.path.join(outdir, os.path.basename(iodastatzipfile))
logger.info(f"Copying {iodastatzipfile} to {dest}")
FileHandler({'copy_opt': [[iodastatzipfile, dest]]}).sync()
logger.info("Finished copying GSI IODA tar file to COMOUT")