diff --git a/.gitignore b/.gitignore index b5f61746e94..4e69c2fb365 100644 --- a/.gitignore +++ b/.gitignore @@ -174,7 +174,9 @@ ush/imsfv3_scf2ioda.py ush/atparse.bash ush/run_bufr2ioda.py ush/bufr2ioda_insitu* +ush/python/gsincdiag_to_ioda ush/python/ufsda +ush/python/pyiodaconv ush/python/soca ush/python/gen_bufr2ioda_json.py ush/python/gen_bufr2ioda_yaml.py diff --git a/dev/parm/config/gfs/config.base.j2 b/dev/parm/config/gfs/config.base.j2 index 21acc0ce02d..9e9230b8b93 100644 --- a/dev/parm/config/gfs/config.base.j2 +++ b/dev/parm/config/gfs/config.base.j2 @@ -85,7 +85,7 @@ export DO_GENESIS_FSU="{{ DO_GENESIS_FSU }}" # Cyclone genesis verification (FSU export DO_VERFOZN="YES" # Ozone data assimilation monitoring export DO_VERFRAD="YES" # Radiance data assimilation monitoring export DO_VMINMON="YES" # GSI minimization monitoring -export DO_ANLSTAT="NO" # JEDI-based analysis statistics +export DO_ANLSTAT="YES" # JEDI-based analysis statistics # Experiment mode (cycled or forecast-only) export MODE="{{ MODE }}" # cycled/forecast-only @@ -497,11 +497,6 @@ if [[ "${DO_JEDIATMVAR}" = "YES" ]]; then export DO_VERFOZN="NO" # Ozone data assimilation monitoring export DO_VERFRAD="NO" # Radiance data assimilation monitoring export DO_VMINMON="NO" # GSI minimization monitoring - export DO_ANLSTAT="YES" # JEDI-based analysis statistics -else - if [[ "${DO_AERO}" = "YES" || "${DO_JEDIOCNVAR}" = "YES" || "${DO_JEDISNOWDA}" = "YES" ]]; then - export DO_ANLSTAT="YES" # JEDI-based analysis statistics - fi fi # TODO: Enable METplus on Ursa when verif-global has been upgraded to spack-stack 1.9.x+ diff --git a/dev/workflow/applications/gfs_cycled.py b/dev/workflow/applications/gfs_cycled.py index 4af35004017..d964686ede5 100644 --- a/dev/workflow/applications/gfs_cycled.py +++ b/dev/workflow/applications/gfs_cycled.py @@ -318,12 +318,14 @@ def get_task_names(self): if options['do_verfrad']: task_names[run] += ['verfrad'] + # Only do analysis statistics for gdas cycles + if run == "gdas": + if options['do_anlstat']: + task_names[run] += ['anlstat'] + if options['do_vminmon']: task_names[run] += ['vminmon'] - if options['do_anlstat']: - task_names[run] += ['anlstat'] - # gfs-only verification/tracking if run == 'gfs': if options['do_tracker']: diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index d188eeaf342..9242a7bfa33 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -1891,6 +1891,9 @@ def anlstat(self): if self.options['do_jediatmvar']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlfinal'} deps.append(rocoto.add_dependency(dep_dict)) + else: + dep_dict = {'type': 'task', 'name': f'{self.run}_analdiag'} + deps.append(rocoto.add_dependency(dep_dict)) if self.options['do_jediocnvar']: dep_dict = {'type': 'task', 'name': f'{self.run}_marineanlfinal'} deps.append(rocoto.add_dependency(dep_dict)) @@ -1909,7 +1912,7 @@ def anlstat(self): 'resources': resources, 'dependency': dependencies, 'envars': self.envars, - 'cycledef': self.run, + 'cycledef': self.run.replace('enkf', ''), 'command': f'{self.HOMEgfs}/dev/jobs/anlstat.sh', 'job_name': f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', @@ -2144,9 +2147,6 @@ def _arch_tars_deps(self): if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'} deps.append(rocoto.add_dependency(dep_dict)) - if self.options['do_anlstat']: - dep_dict = {'type': 'task', 'name': f'{self.run}_anlstat'} - deps.append(rocoto.add_dependency(dep_dict)) elif self.run in ['gdas']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'} deps.append(rocoto.add_dependency(dep_dict)) diff --git a/scripts/exglobal_analysis_stats.py b/scripts/exglobal_analysis_stats.py index 745f33efecc..8500bf98a00 100755 --- a/scripts/exglobal_analysis_stats.py +++ b/scripts/exglobal_analysis_stats.py @@ -31,6 +31,9 @@ AnlStats.task_config['STAT_ANALYSES'].append('snow') if AnlStats.task_config.DO_JEDIATMVAR: AnlStats.task_config['STAT_ANALYSES'].append('atmos') + else: + AnlStats.task_config['STAT_ANALYSES'].append('atmos_gsi') + AnlStats.convert_gsi_diags() # Initialize JEDI variational analysis AnlStats.initialize() diff --git a/sorc/gdas.cd b/sorc/gdas.cd index 6b00f289cf8..84143b13cfe 160000 --- a/sorc/gdas.cd +++ b/sorc/gdas.cd @@ -1 +1 @@ -Subproject commit 6b00f289cf8333133a55bb5ac534a0ddfed74980 +Subproject commit 84143b13cfe395aa96476b6622cc0f38756b1a95 diff --git a/sorc/link_workflow.sh b/sorc/link_workflow.sh index 109fc346810..1529cfedabd 100755 --- a/sorc/link_workflow.sh +++ b/sorc/link_workflow.sh @@ -264,6 +264,8 @@ if [[ -d "${HOMEgfs}/sorc/gdas.cd/build" ]]; then ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ufsda" . ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/gen_bufr2ioda_json.py" . ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/gen_bufr2ioda_yaml.py" . + ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/sorc/da-utils/ush/pyiodaconv" . + ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/sorc/da-utils/ush/gsincdiag_to_ioda" . cd "${HOMEgfs}/ush" || exit 1 ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/ush/ioda/bufr2ioda/run_bufr2ioda.py" . ${LINK_OR_COPY} "${HOMEgfs}/sorc/gdas.cd/build/bin/imsfv3_scf2ioda.py" . diff --git a/ush/python/pygfs/task/analysis_stats.py b/ush/python/pygfs/task/analysis_stats.py index 086f289d3a1..36b3e188999 100644 --- a/ush/python/pygfs/task/analysis_stats.py +++ b/ush/python/pygfs/task/analysis_stats.py @@ -2,6 +2,7 @@ import os import glob +import gsincdiag_to_ioda.proc_gsi_ncdiag as gsid import gzip import tarfile from logging import getLogger @@ -73,7 +74,7 @@ def initialize(self) -> None: # Create dictionary of Jedi objects # Expected keys are what must be included from the JEDI config file. We can # then loop through ob space list from scripts/exglobal_analysis_stats.py - expected_keys = ['aero', 'atmos', 'snow'] + expected_keys = ['aero', 'atmos', 'atmos_gsi', 'snow'] jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config) self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys) @@ -173,7 +174,10 @@ def finalize(self, jedi_dict_key: str) -> None: for analysis_dict in analysis_config_dict[jedi_dict_key]['obs spaces']: statfile = os.path.join(self.task_config.DATA, analysis_dict['output file']) - outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON'] + if jedi_dict_key == 'atmos_gsi': + outdir = self.task_config['COMOUT_ATMOS_ANLMON'] + else: + outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON'] # Check if the directory exists; if not, create it if not os.path.exists(outdir): @@ -199,3 +203,99 @@ def finalize(self, jedi_dict_key: str) -> None: with tarfile.open(iodastatzipfile, "w|gz") as archive: for targetfile in iodastatfiles: archive.add(targetfile, arcname=os.path.basename(targetfile)) + + @logit(logger) + def convert_gsi_diags(self) -> None: + """Convert GSI diag files to ioda-stat files for analysis stats + + This method will convert GSI diag files to ioda-stat files for analysis stats. + This includes: + - copying GSI diag files to DATA path + - untarring and gunzipping GSI diag files + - converting GSI diag files to ioda files using gsincdiag2ioda converter scripts + + Parameters + ---------- + None + + Returns + ---------- + None + """ + logger.info("Converting GSI diag files to IODA files for analysis stats") + # copy GSI diag files to DATA path + diag_tars = ['cnvstat', 'radstat', 'oznstat'] + diag_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ges') + diag_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi_anl') + diag_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi_diags') + FileHandler({'mkdir': [diag_dir_ges_path, diag_dir_anl_path]}).sync() + diag_ioda_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda_ges') + diag_ioda_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda_anl') + FileHandler({'mkdir': [diag_ioda_dir_ges_path, diag_ioda_dir_anl_path]}).sync() + diag_tar_copy_list = [] + for diag in diag_tars: + input_tar_basename = f"{self.task_config.APREFIX}{diag}" + input_tar = os.path.join(self.task_config.COMIN_ATMOS_ANALYSIS, + input_tar_basename) + dest = os.path.join(diag_dir_path, input_tar_basename) + if os.path.exists(input_tar): + diag_tar_copy_list.append([input_tar, dest]) + FileHandler({'copy_opt': diag_tar_copy_list}).sync() + + # Untar and gunzip diag files + gsi_diag_tars = glob.glob(os.path.join(diag_dir_path, f"{self.task_config.APREFIX}*stat")) + for diag_tar in gsi_diag_tars: + logger.info(f"Untarring {diag_tar}") + with tarfile.open(diag_tar, "r") as tar: + tar.extractall(path=diag_dir_path) + gsi_diags = glob.glob(os.path.join(diag_dir_path, "diag_*.nc4.gz")) + for diag in gsi_diags: + logger.info(f"Gunzipping {diag}") + output_file = diag.rstrip('.gz') + with gzip.open(diag, 'rb') as f_in: + with open(output_file, 'wb') as f_out: + f_out.write(f_in.read()) + os.remove(diag) + + # Copy diag files to ges or anl directory + anl_diags = glob.glob(os.path.join(diag_dir_path, "diag_*_anl*.nc4")) + ges_diags = glob.glob(os.path.join(diag_dir_path, "diag_*_ges*.nc4")) + copy_anl_diags = [] + for diag in anl_diags: + copy_anl_diags.append([diag, os.path.join(diag_dir_anl_path, os.path.basename(diag))]) + FileHandler({'copy_opt': copy_anl_diags}).sync() + copy_ges_diags = [] + for diag in ges_diags: + copy_ges_diags.append([diag, os.path.join(diag_dir_ges_path, os.path.basename(diag))]) + FileHandler({'copy_opt': copy_ges_diags}).sync() + + # Convert GSI diag files to ioda files using gsincdiag2ioda converter scripts + gsid.proc_gsi_ncdiag(ObsDir=diag_ioda_dir_ges_path, DiagDir=diag_dir_ges_path) + gsid.proc_gsi_ncdiag(ObsDir=diag_ioda_dir_anl_path, DiagDir=diag_dir_anl_path) + + # Tar up the ioda files + iodastatzipfile = os.path.join(self.task_config.DATA, 'atmos_gsi_diags', + f"{self.task_config.APREFIX}atmos_gsi_ioda_diags.tgz") + logger.info(f"Compressing GSI IODA files to {iodastatzipfile}") + # get list of iodastat files to put in tarball + iodastatfiles = glob.glob(os.path.join(diag_ioda_dir_ges_path, '*nc4')) + \ + glob.glob(os.path.join(diag_ioda_dir_anl_path, '*nc4')) + logger.info(f"Gathering {len(iodastatfiles)} GSI IODA files to {iodastatzipfile}") + with tarfile.open(iodastatzipfile, "w|gz") as archive: + for targetfile in iodastatfiles: + # gzip the file before adding to tar + with open(targetfile, 'rb') as f_in: + with gzip.open(f"{targetfile}.gz", 'wb') as f_out: + f_out.writelines(f_in) + os.remove(targetfile) + targetfile = f"{targetfile}.gz" + archive.add(targetfile, arcname=os.path.basename(targetfile)) + logger.info(f"Finished compressing GSI IODA files to {iodastatzipfile}") + # copy to COMOUT + outdir = self.task_config.COMOUT_ATMOS_ANLMON + if not os.path.exists(outdir): + FileHandler({'mkdir': [outdir]}).sync() + dest = os.path.join(outdir, os.path.basename(iodastatzipfile)) + logger.info(f"Copying {iodastatzipfile} to {dest}") + FileHandler({'copy_opt': [[iodastatzipfile, dest]]}).sync() + logger.info("Finished copying GSI IODA tar file to COMOUT")