Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ ush/imsfv3_scf2ioda.py
ush/atparse.bash
ush/run_bufr2ioda.py
ush/bufr2ioda_insitu*
ush/python/gsincdiag_to_ioda
ush/python/ufsda
ush/python/pyiodaconv
ush/python/soca
ush/python/gen_bufr2ioda_json.py
ush/python/gen_bufr2ioda_yaml.py
Expand Down
7 changes: 1 addition & 6 deletions dev/parm/config/gfs/config.base.j2
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export DO_GENESIS_FSU="{{ DO_GENESIS_FSU }}" # Cyclone genesis verification (FSU
export DO_VERFOZN="YES" # Ozone data assimilation monitoring
export DO_VERFRAD="YES" # Radiance data assimilation monitoring
export DO_VMINMON="YES" # GSI minimization monitoring
export DO_ANLSTAT="NO" # JEDI-based analysis statistics
export DO_ANLSTAT="YES" # JEDI-based analysis statistics

# Experiment mode (cycled or forecast-only)
export MODE="{{ MODE }}" # cycled/forecast-only
Expand Down Expand Up @@ -497,11 +497,6 @@ if [[ "${DO_JEDIATMVAR}" = "YES" ]]; then
export DO_VERFOZN="NO" # Ozone data assimilation monitoring
export DO_VERFRAD="NO" # Radiance data assimilation monitoring
export DO_VMINMON="NO" # GSI minimization monitoring
export DO_ANLSTAT="YES" # JEDI-based analysis statistics
else
if [[ "${DO_AERO}" = "YES" || "${DO_JEDIOCNVAR}" = "YES" || "${DO_JEDISNOWDA}" = "YES" ]]; then
export DO_ANLSTAT="YES" # JEDI-based analysis statistics
fi
fi

# TODO: Enable METplus on Ursa when verif-global has been upgraded to spack-stack 1.9.x+
Expand Down
8 changes: 5 additions & 3 deletions dev/workflow/applications/gfs_cycled.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,14 @@ def get_task_names(self):
if options['do_verfrad']:
task_names[run] += ['verfrad']

# Only do analysis statistics for gdas cycles
if run == "gdas":
if options['do_anlstat']:
task_names[run] += ['anlstat']

if options['do_vminmon']:
task_names[run] += ['vminmon']

if options['do_anlstat']:
task_names[run] += ['anlstat']

# gfs-only verification/tracking
if run == 'gfs':
if options['do_tracker']:
Expand Down
8 changes: 4 additions & 4 deletions dev/workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1891,6 +1891,9 @@ def anlstat(self):
if self.options['do_jediatmvar']:
dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlfinal'}
deps.append(rocoto.add_dependency(dep_dict))
else:
dep_dict = {'type': 'task', 'name': f'{self.run}_analdiag'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_jediocnvar']:
dep_dict = {'type': 'task', 'name': f'{self.run}_marineanlfinal'}
deps.append(rocoto.add_dependency(dep_dict))
Expand All @@ -1909,7 +1912,7 @@ def anlstat(self):
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'cycledef': self.run,
'cycledef': self.run.replace('enkf', ''),
'command': f'{self.HOMEgfs}/dev/jobs/anlstat.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
Expand Down Expand Up @@ -2144,9 +2147,6 @@ def _arch_tars_deps(self):
if self.options['do_vminmon']:
dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_anlstat']:
dep_dict = {'type': 'task', 'name': f'{self.run}_anlstat'}
deps.append(rocoto.add_dependency(dep_dict))
elif self.run in ['gdas']:
dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'}
deps.append(rocoto.add_dependency(dep_dict))
Expand Down
3 changes: 3 additions & 0 deletions scripts/exglobal_analysis_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
AnlStats.task_config['STAT_ANALYSES'].append('snow')
if AnlStats.task_config.DO_JEDIATMVAR:
AnlStats.task_config['STAT_ANALYSES'].append('atmos')
else:
AnlStats.task_config['STAT_ANALYSES'].append('atmos_gsi')
AnlStats.convert_gsi_diags()

# Initialize JEDI variational analysis
AnlStats.initialize()
Expand Down
2 changes: 2 additions & 0 deletions sorc/link_workflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ if [[ -d "${HOMEgfs}/sorc/gdas.cd/build" ]]; then
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ufsda" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/gen_bufr2ioda_json.py" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/gen_bufr2ioda_yaml.py" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/sorc/da-utils/ush/pyiodaconv" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/sorc/da-utils/ush/gsincdiag_to_ioda" .
cd "${HOMEgfs}/ush" || exit 1
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/run_bufr2ioda.py" .
${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/build/bin/imsfv3_scf2ioda.py" .
Expand Down
87 changes: 85 additions & 2 deletions ush/python/pygfs/task/analysis_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import glob
import gsincdiag_to_ioda.proc_gsi_ncdiag as gsid
import gzip
import tarfile
from logging import getLogger
Expand Down Expand Up @@ -73,7 +74,7 @@ def initialize(self) -> None:
# Create dictionary of Jedi objects
# Expected keys are what must be included from the JEDI config file. We can
# then loop through ob space list from scripts/exglobal_analysis_stats.py
expected_keys = ['aero', 'atmos', 'snow']
expected_keys = ['aero', 'atmos', 'atmos_gsi', 'snow']
jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config)
self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys)

Expand Down Expand Up @@ -173,7 +174,10 @@ def finalize(self, jedi_dict_key: str) -> None:

for analysis_dict in analysis_config_dict[jedi_dict_key]['obs spaces']:
statfile = os.path.join(self.task_config.DATA, analysis_dict['output file'])
outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON']
if jedi_dict_key == 'atmos_gsi':
outdir = self.task_config['COMOUT_ATMOS_ANLMON']
else:
outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON']

# Check if the directory exists; if not, create it
if not os.path.exists(outdir):
Expand All @@ -199,3 +203,82 @@ def finalize(self, jedi_dict_key: str) -> None:
with tarfile.open(iodastatzipfile, "w|gz") as archive:
for targetfile in iodastatfiles:
archive.add(targetfile, arcname=os.path.basename(targetfile))

@logit(logger)
def convert_gsi_diags(self) -> None:
    """Convert GSI diag files to IODA files for analysis stats.

    This includes:
    - copying the GSI stat tarballs (cnvstat, radstat, oznstat) from
      COMIN into the DATA run directory
    - untarring and gunzipping the GSI diag files they contain
    - converting the GSI diag files to IODA files with the
      gsincdiag2ioda converter scripts
    - gzipping the resulting IODA files, bundling them into a tarball,
      and copying that tarball to COMOUT

    Parameters
    ----------
    None

    Returns
    ----------
    None
    """
    logger.info("Converting GSI diag files to IODA files for analysis stats")

    # Stage the GSI stat tarballs from COMIN into the run directory.
    diag_tars = ['cnvstat', 'radstat', 'oznstat']
    diag_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi')
    FileHandler({'mkdir': [diag_dir_path]}).sync()
    diag_ioda_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda')
    FileHandler({'mkdir': [diag_ioda_dir_path]}).sync()
    diag_tar_copy_list = []
    for diag in diag_tars:
        input_tar_basename = f"{self.task_config.APREFIX}{diag}"
        input_tar = os.path.join(self.task_config.COMIN_ATMOS_ANALYSIS,
                                 input_tar_basename)
        dest = os.path.join(diag_dir_path, input_tar_basename)
        # Best effort: a missing tarball (e.g. oznstat switched off) is
        # silently skipped rather than treated as a fatal error.
        if os.path.exists(input_tar):
            diag_tar_copy_list.append([input_tar, dest])
    FileHandler({'copy_opt': diag_tar_copy_list}).sync()

    # Untar and gunzip diag files
    gsi_diag_tars = glob.glob(os.path.join(diag_dir_path, f"{self.task_config.APREFIX}*stat"))
    for diag_tar in gsi_diag_tars:
        logger.info(f"Untarring {diag_tar}")
        with tarfile.open(diag_tar, "r") as tar:
            # NOTE(review): these tarballs are produced upstream by this same
            # workflow, but consider extractall(..., filter='data') once the
            # minimum Python is 3.12 to guard against path traversal.
            tar.extractall(path=diag_dir_path)
    gsi_diags = glob.glob(os.path.join(diag_dir_path, "diag_*.nc4.gz"))
    for diag in gsi_diags:
        logger.info(f"Gunzipping {diag}")
        # BUGFIX: str.rstrip('.gz') strips any run of trailing '.', 'g', 'z'
        # characters, not the literal suffix; removesuffix drops exactly '.gz'.
        output_file = diag.removesuffix('.gz')
        with gzip.open(diag, 'rb') as f_in, open(output_file, 'wb') as f_out:
            # Stream in chunks instead of reading the whole file into memory.
            shutil.copyfileobj(f_in, f_out)
        os.remove(diag)

    # TODO(review note from PR): consider moving _ges diags to one directory
    # and _anl diags to another, running proc_gsi_ncdiag once per directory,
    # saving two tarballs of the raw IODA files, and possibly running the
    # stats code on only one of them (ges only?).

    # Convert GSI diag files to ioda files using gsincdiag2ioda converter scripts
    gsid.proc_gsi_ncdiag(ObsDir=diag_ioda_dir_path, DiagDir=diag_dir_path)

    # Tar up the ioda files; each member is additionally gzipped before
    # being added, matching the layout downstream consumers expect.
    iodastatzipfile = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda',
                                   f"{self.task_config.APREFIX}atmos_gsi_ioda_diags.tgz")
    logger.info(f"Compressing GSI IODA files to {iodastatzipfile}")
    # get list of iodastat files to put in tarball
    iodastatfiles = glob.glob(os.path.join(diag_ioda_dir_path, '*nc4'))
    logger.info(f"Gathering {len(iodastatfiles)} GSI IODA files to {iodastatzipfile}")
    with tarfile.open(iodastatzipfile, "w|gz") as archive:
        for targetfile in iodastatfiles:
            # gzip the file before adding to tar
            with open(targetfile, 'rb') as f_in, gzip.open(f"{targetfile}.gz", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.remove(targetfile)
            targetfile = f"{targetfile}.gz"
            archive.add(targetfile, arcname=os.path.basename(targetfile))
    logger.info(f"Finished compressing GSI IODA files to {iodastatzipfile}")

    # copy to COMOUT
    outdir = self.task_config.COMOUT_ATMOS_ANLMON
    if not os.path.exists(outdir):
        FileHandler({'mkdir': [outdir]}).sync()
    dest = os.path.join(outdir, os.path.basename(iodastatzipfile))
    logger.info(f"Copying {iodastatzipfile} to {dest}")
    FileHandler({'copy_opt': [[iodastatzipfile, dest]]}).sync()
    logger.info("Finished copying GSI IODA tar file to COMOUT")