diff --git a/ci/scripts/utils/publish_logs.py b/ci/scripts/utils/publish_logs.py
index 283c84a8d1..fcd8103438 100755
--- a/ci/scripts/utils/publish_logs.py
+++ b/ci/scripts/utils/publish_logs.py
@@ -91,7 +91,7 @@ def upload_logs_to_repo(args, emcbot_gh, emcbot_ci_url):
         file_path_in_repo = f"{repo_path}/{path_header}/" + str(os.path.basename(file.name))
         emcbot_gh.repo.create_file(file_path_in_repo, "Adding error log file", file_content, branch="error_logs")
 
-    file_url = f"{emcbot_ci_url.rsplit('.',1)[0]}/tree/{repo_branch}/{repo_path}/{path_header}"
+    file_url = f"{emcbot_ci_url.rsplit('.', 1)[0]}/tree/{repo_branch}/{repo_path}/{path_header}"
     print(file_url)
 
diff --git a/jobs/JGLOBAL_PREP_EMISSIONS b/jobs/JGLOBAL_PREP_EMISSIONS
index 84edac8e50..0d7ce78ac5 100755
--- a/jobs/JGLOBAL_PREP_EMISSIONS
+++ b/jobs/JGLOBAL_PREP_EMISSIONS
@@ -13,6 +13,8 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "prep_emissions" -c "base prep_emissio
 ##############################################
 # Generate COM variables from templates
 # TODO: Add necessary COMIN, COMOUT variables for this job
+YMD="${PDY}" HH="${cyc}" declare_from_tmpl -rx \
+    COMOUT_CHEM_HISTORY:COM_CHEM_HISTORY_TMPL
 
 ###############################################################
 # Run relevant script
@@ -32,4 +34,10 @@
 if [[ -e "${pgmout}" ]] ; then
   cat "${pgmout}"
 fi
 
+##########################################
+# Remove the Temporary working directory
+##########################################
+cd "${DATAROOT}" || exit 1
+[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}"
+
 exit 0
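The two added lines above ask `declare_from_tmpl` to render the `COM_CHEM_HISTORY_TMPL` template with the supplied `YMD`/`HH` (plus the usual `ROTDIR`/`RUN` environment) and export the result as `COMOUT_CHEM_HISTORY`. A minimal sketch of what that expansion amounts to; the template value below is made up for illustration, the real one lives in the workflow's config.com:

```python
import os
from string import Template

# Hypothetical template value; the real COM_CHEM_HISTORY_TMPL is defined in config.com.
COM_CHEM_HISTORY_TMPL = "${ROTDIR}/${RUN}.${YMD}/${HH}/model/chem/history"

def declare_from_tmpl(name: str, tmpl: str, **env: str) -> str:
    """Stand-in for the shell helper: expand a COM template and export it."""
    path = Template(tmpl).safe_substitute(env)
    os.environ[name] = path
    return path

print(declare_from_tmpl("COMOUT_CHEM_HISTORY", COM_CHEM_HISTORY_TMPL,
                        ROTDIR="/lfs/comroot", RUN="gefs", YMD="20240501", HH="00"))
# -> /lfs/comroot/gefs.20240501/00/model/chem/history
```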
diff --git a/parm/config/gefs/config.prep_emissions b/parm/config/gefs/config.prep_emissions
index fa411c27ad..c7ba584b79 100644
--- a/parm/config/gefs/config.prep_emissions
+++ b/parm/config/gefs/config.prep_emissions
@@ -2,10 +2,37 @@
 ########## config.prep_emissions ##########
 # aerosol emissions preprocessing specific
+# TODO: this is duplicated in the config.aero file. Should use a common function
+case ${machine} in
+  "HERA")
+    AERO_INPUTS_DIR="/scratch1/NCEPDEV/global/glopara/data/gocart_emissions"
+    ;;
+  "ORION" | "HERCULES")
+    AERO_INPUTS_DIR="/work2/noaa/global/wkolczyn/noscrub/global-workflow/gocart_emissions"
+    ;;
+  "S4")
+    AERO_INPUTS_DIR="/data/prod/glopara/gocart_emissions"
+    ;;
+  "WCOSS2")
+    AERO_INPUTS_DIR="/lfs/h2/emc/global/noscrub/emc.global/data/gocart_emissions"
+    ;;
+  "GAEA")
+    AERO_INPUTS_DIR="/gpfs/f5/epic/proj-shared/global/glopara/data/gocart_emissions"
+    ;;
+  "JET")
+    AERO_INPUTS_DIR="/lfs4/HFIP/hfv3gfs/glopara/data/gocart_emissions"
+    ;;
+  *)
+    echo "FATAL ERROR: Machine ${machine} unsupported for aerosols"
+    exit 2
+    ;;
+esac
+export AERO_INPUTS_DIR
 
 echo "BEGIN: config.prep_emissions"
 
 # Get task specific resources
+export PREP_EMISSION_CONFIG="${PARMgfs}/prep/aero_emissions.yaml"
 source "${EXPDIR}/config.resources" prep_emissions
 
 echo "END: config.prep_emissions"
diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources
index 68f81c1039..f8f871e96f 100644
--- a/parm/config/gefs/config.resources
+++ b/parm/config/gefs/config.resources
@@ -82,7 +82,7 @@ case ${step} in
     export ntasks=1
     export threads_per_task=1
     export tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
-    export memory="1GB"
+    export memory="20GB"
     ;;
 
   "fcst" | "efcs")
diff --git a/parm/config/gfs/config.aero b/parm/config/gfs/config.aero
index bbfb782636..4221630984 100644
--- a/parm/config/gfs/config.aero
+++ b/parm/config/gfs/config.aero
@@ -7,7 +7,7 @@ export AERO_INPUTS_DIR=@AERO_INPUTS_DIR@
 export AERO_DIAG_TABLE="${PARMgfs}/ufs/fv3/diag_table.aero"
 export AERO_FIELD_TABLE="${PARMgfs}/ufs/fv3/field_table.aero"
 
-# Biomass burning emission dataset. Choose from: gbbepx, qfed, none
+# Biomass burning emission dataset. Choose from: gbbepx, qfed, hfed, blended, none
 export AERO_EMIS_FIRE="qfed"
 # Directory containing GOCART configuration files
 export AERO_CONFIG_DIR="${PARMgfs}/ufs/gocart"
diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources
index eeb33716c0..74eb671184 100644
--- a/parm/config/gfs/config.resources
+++ b/parm/config/gfs/config.resources
@@ -11,7 +11,7 @@
 if (( $# != 1 )); then
     echo "Must specify an input task argument to set resource variables!"
echo "argument can be any one of the following:" - echo "stage_ic aerosol_init" + echo "stage_ic aerosol_init prep_emissions" echo "prep prepatmiodaobs" echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal" echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal" diff --git a/parm/prep/aero_emissions.yaml b/parm/prep/aero_emissions.yaml new file mode 100644 index 0000000000..98585c58bd --- /dev/null +++ b/parm/prep/aero_emissions.yaml @@ -0,0 +1,48 @@ +aero_emissions: + config: + debug: False + ratio: 0.95 # weighting ratio for obs + emistype: 'QFED' # emission type {'QFED', 'GBBEPx', 'HFED'} + climfile_str: 'GBBEPx-all01GRID_v4r0_climMean' # climate file base string used for glob later + gbbepx_version: 'v4r0' # GBBEPx version + qfed_version: '006' # QFED version + species: ['so2', 'oc', 'bc'] # species to be used + historical: False # set to true to just use true data for the given day + coarsen_scale: 150 # scale for coarsen function to generate weights + output_var_map: # for GBBEPx + 'bc': 'BC' + 'ch4': 'CH4' + 'co': 'CO' + 'co2': 'CO2' + 'nh3': 'NH3' + 'no': 'NOx' + 'oc': 'OC' + 'pm25': 'PM2.5' + 'so2': 'SO2' + n_persist: 5 + data_in: + mkdir: + - "{{ DATA }}" + - "{{ COMOUT_CHEM_HISTORY}}" + qfed: # copy qfed + copy: + - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_oc.006.{{ current_cycle | to_YMD }}.nc4", "{{ DATA }}/"] + - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_so2.006.{{ current_cycle | to_YMD }}.nc4", "{{ DATA }}/"] + - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_bc.006.{{ current_cycle | to_YMD }}.nc4", "{{ DATA }}/"] + hfed: # copy hfed + copy: + - ["{{ AERO_INPUTS_DIR }}/nexus/HFED/{{ current_cycle | strftime('%Y/M%m') }}/hfed.emis_oc.x576_y361.{{ current_cycle | strftime('%Y%j') }}.nc4", "{{ DATA }}/"] + - ["{{ AERO_INPUTS_DIR }}/nexus/HFED/{{ current_cycle | strftime('%Y/M%m') }}/hfed.emis_bc.x576_y361.{{ current_cycle | strftime('%Y%j') }}.nc4", "{{ DATA }}/"] + - ["{{ AERO_INPUTS_DIR }}/nexus/HFED/{{ current_cycle | strftime('%Y/M%m') }}/hfed.emis_so2.x576_y361.{{ current_cycle | strftime('%Y%j') }}.nc4", "{{ DATA }}/"] + gbbepx: + copy: + - ["{{ AERO_INPUTS_DIR }}/nexus/GBBEPx/v4/GBBEPx_all01GRID_v4r0_climMean_{{ current_cycle | strftime('%m%d') }}.nc", "{{ DATA }}/"] # copy climo files + climo: # copy climatology + copy: + - ["{{ AERO_INPUTS_DIR }}/nexus/GBBEPx/v4/climMean/GBBEPx-all01GRID_v4r0_climMean_{{ current_cycle | strftime('%m%d') }}.nc", "{{ DATA }}/"] # copy climo files + {% for fdate in forecast_dates %} + - ["{{ AERO_INPUTS_DIR }}/nexus/GBBEPx/v4/climMean/GBBEPx-all01GRID_v4r0_climMean_{{ fdate | strftime('%m%d') }}.nc", "{{ DATA }}/"] # copy climo files + {% endfor %} + data_out: + copy: + - ["{{ DATA }}/{{ RUN }}_blended_emissions.{{ current_cycle | to_YMD }}.nc", "{{ COMOUT_CHEM_HISTORY }}/{{ RUN }}.{{ current_cycle | to_YMD }}.{{ RUN }}_blended_emissions.nc"] \ No newline at end of file diff --git a/parm/ufs/gocart/ExtData.blended b/parm/ufs/gocart/ExtData.blended new file mode 100644 index 0000000000..89561cbd29 --- /dev/null +++ b/parm/ufs/gocart/ExtData.blended @@ -0,0 +1,8 @@ +#====== BIOMASS BURNING EMISSIONS ======================================= + +# Blended Emissions +#-------------------------------------------------------------------------------------------------------------------------------- +SU_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 SO2 
diff --git a/parm/ufs/gocart/ExtData.blended b/parm/ufs/gocart/ExtData.blended
new file mode 100644
index 0000000000..89561cbd29
--- /dev/null
+++ b/parm/ufs/gocart/ExtData.blended
@@ -0,0 +1,8 @@
+#====== BIOMASS BURNING EMISSIONS =======================================
+
+# Blended Emissions
+#--------------------------------------------------------------------------------------------------------------------------------
+SU_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 SO2 gefs.%y4%m2%d2.gefs_blended_emissions.nc
+OC_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 OC  gefs.%y4%m2%d2.gefs_blended_emissions.nc
+BC_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 BC  gefs.%y4%m2%d2.gefs_blended_emissions.nc
+# EMI_NH3_BB NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 NH3 gefs.%y4%m2%d2.gefs_blended_emissions.nc
\ No newline at end of file
diff --git a/parm/ufs/gocart/ExtData.hfed b/parm/ufs/gocart/ExtData.hfed
new file mode 100644
index 0000000000..4be4f1bd01
--- /dev/null
+++ b/parm/ufs/gocart/ExtData.hfed
@@ -0,0 +1,8 @@
+#====== BIOMASS BURNING EMISSIONS =======================================
+
+# HFED
+#--------------------------------------------------------------------------------------------------------------------------------
+SU_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_so2.x576_y361.%y4%m2.nc4
+OC_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_oc.x576_y361.%y4%m2.nc4
+BC_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_bc.x576_y361.%y4%m2.nc4
+# EMI_NH3_BB NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_nh3.x576_y361.%y4%m2.nc4
\ No newline at end of file
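Each row above maps a GOCART import name (SU_BIOMASS, OC_BIOMASS, BC_BIOMASS) to a variable (SO2, OC, BC) in the blended-emissions file named by the `%y4%m2%d2` date template, with a 0.7778 scaling factor. A hedged sanity check that a produced file actually carries the variables the table references; the file name is the template filled in by hand for one date:

```python
import xarray as xr

# Hypothetical file name: the ExtData template gefs.%y4%m2%d2.gefs_blended_emissions.nc
# filled in for 2024-05-01.
blended = "gefs.20240501.gefs_blended_emissions.nc"

# Variables the ExtData.blended table reads (NH3 is commented out there).
expected = {"SO2", "OC", "BC"}

with xr.open_dataset(blended, decode_cf=False) as ds:
    missing = expected - set(ds.data_vars)
    if missing:
        raise SystemExit(f"FATAL ERROR: blended emissions file is missing {sorted(missing)}")
    print(f"{blended} provides all of {sorted(expected)}")
```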
+""" import os -from wxflow import Logger, cast_strdict_as_dtypedict +from wxflow import Logger, AttrDict, cast_strdict_as_dtypedict from pygfs import AerosolEmissions @@ -19,7 +29,7 @@ # Instantiate the emissions pre-processing task emissions = AerosolEmissions(config) + emissions.initialize() - emissions.configure() - emissions.execute(emissions.task_config.DATA, emissions.task_config.APRUN) + emissions.run() emissions.finalize() diff --git a/scripts/exglobal_stage_ic.py b/scripts/exglobal_stage_ic.py index bf4217f45f..9e4307a5e1 100755 --- a/scripts/exglobal_stage_ic.py +++ b/scripts/exglobal_stage_ic.py @@ -30,7 +30,7 @@ def main(): for key in keys: # Make sure OCNRES is three digits if key == "OCNRES": - stage.task_config.OCNRES = f"{stage.task_config.OCNRES :03d}" + stage.task_config.OCNRES = f"{stage.task_config.OCNRES:03d}" stage_dict[key] = stage.task_config[key] # Also import all COM* directory and template variables diff --git a/ush/compare_f90nml.py b/ush/compare_f90nml.py index f3c5573a92..8ecb06b6de 100755 --- a/ush/compare_f90nml.py +++ b/ush/compare_f90nml.py @@ -78,7 +78,7 @@ def _print_diffs(diff_dict: Dict) -> None: for kk in diff_dict[path].keys(): items = diff_dict[path][kk] print( - f"{kk:>{max_len+2}} : {' | '.join(map(str, diff_dict[path][kk]))}") + f"{kk:>{max_len + 2}} : {' | '.join(map(str, diff_dict[path][kk]))}") _print_diffs(result) diff --git a/ush/forecast_postdet.sh b/ush/forecast_postdet.sh index 25cd4d36f0..1c91974e18 100755 --- a/ush/forecast_postdet.sh +++ b/ush/forecast_postdet.sh @@ -682,6 +682,11 @@ GOCART_rc() { [[ ${status} -ne 0 ]] && exit "${status}" fi + # Link blended emissions if AERO_EMIS_FIRE is 'blended' + if [[ "${AERO_EMIS_FIRE}" == "blended" && "${RUN}" == "gefs" ]]; then + ${NCP} "${COMOUT_CHEM_HISTORY}/${RUN}.${current_cycle:0:8}.${RUN}_blended_emissions.nc" "${DATA}" + fi + source "${USHgfs}/parsing_namelists_GOCART.sh" GOCART_namelists } diff --git a/ush/python/pygfs/task/aero_emissions.py b/ush/python/pygfs/task/aero_emissions.py index 5f2d4c6840..6856030180 100644 --- a/ush/python/pygfs/task/aero_emissions.py +++ b/ush/python/pygfs/task/aero_emissions.py @@ -2,29 +2,30 @@ import os from logging import getLogger -from typing import Dict, Any, Union from pprint import pformat +from typing import Any, Dict, List -from wxflow import (AttrDict, - parse_j2yaml, - FileHandler, - Jinja, - logit, - Task, - add_to_datetime, to_timedelta, - WorkflowException, - Executable, which) +import xarray as xr +from dateutil.rrule import DAILY, rrule +from wxflow import ( + AttrDict, + FileHandler, + Task, + logit, + parse_j2yaml, + to_timedelta, +) -logger = getLogger(__name__.split('.')[-1]) +logger = getLogger(__name__.split(".")[-1]) class AerosolEmissions(Task): - """Aerosol Emissions pre-processing Task - """ + """Aerosol Emissions pre-processing Task""" @logit(logger, name="AerosolEmissions") def __init__(self, config: Dict[str, Any]) -> None: - """Constructor for the Aerosol Emissions task + """ + Constructor for the Aerosol Emissions task Parameters ---------- @@ -37,48 +38,363 @@ def __init__(self, config: Dict[str, Any]) -> None: """ super().__init__(config) - local_variable = "something" + nforecast_hours = self.task_config["FHMAX_GFS"] + blend_start_date = self.task_config["PDY"] + blend_end_date = blend_start_date + to_timedelta(f'{nforecast_hours + 24}H') + forecast_dates = list(rrule(freq=DAILY, dtstart=blend_start_date, until=blend_end_date)) + + # add forecast_dates to the task_config for parsing yaml file + localdict = 
AttrDict({"forecast_dates": forecast_dates}) + self.task_config = AttrDict(**self.task_config, **localdict) + + # populate yaml file and add to task_config + logger.info(f"Read the prep_emission configuration yaml file {self.task_config.PREP_EMISSION_CONFIG}") + self.task_config.aero_emission_yaml = parse_j2yaml(self.task_config.PREP_EMISSION_CONFIG, self.task_config) + logger.debug(f"aero_emission_yaml:\n{pformat(self.task_config.aero_emission_yaml)}") + + config = self.task_config.aero_emission_yaml['aero_emissions']['config'] + emission_files = {} + for emission_type in ['qfed', 'hfed', 'gbbepx', 'climo']: + emission_files[emission_type] = [ + os.path.basename(src) + for src, _ in config['data_in'][emission_type]['copy'] + ] + n_persist = config['n_persist'] localdict = AttrDict( - {'variable_used_repeatedly': local_variable} + { + "cdate": blend_start_date, + "nforecast_days": nforecast_hours // 24, + "workdir": self.task_config.DATA, + "current_date": self.task_config.PDY, + 'config': config, + 'emistype': config['emistype'], + 'emisfiles': emission_files, + 'n_persist': n_persist + } ) + # TODO: if AERO_EMIS_FIRE is not 'blended' we don't want to do anything! + # Extend task_config with localdict self.task_config = AttrDict(**self.task_config, **localdict) + @logit(logger) + def initialize(self) -> None: + """ + Initialize the work directory by copying all the common fix data + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("Copy Static Data to run directory") + + data_in = self.task_config.config.data_in + emistype = self.task_config.emistype + + # Copy climatology files to run directory except for HFED + # HFED is already smoothed, monthly data + # QFED or GBBBEPx will be blended with climatology + if emistype.lower() != 'hfed': + logger.info("Copy climatology data to run directory") + FileHandler(data_in.climo).sync() + logger.info(f"Copy {emistype} data to run directory") + FileHandler(data_in[emistype.lower()]).sync() + + @logit(logger) + def run(self) -> None: + """ + Run the AerosolEmissions task with the given parameters. + + Parameters + ---------- + None + + Returns + ------- + None + """ + config_dict = self.task_config['config'] + emistype = self.task_config['emistype'] + ratio = config_dict['ratio'] + climfiles = self.task_config['emisfiles']['climo'] + coarsen_scale = config_dict['coarsen_scale'] + out_var_dict = config_dict['output_var_map'] + n_persist = config_dict['n_persist'] + + try: + basefile = self.task_config['emisfiles'][emistype.lower()] + except KeyError as e: + logger.exception(f"FATAL ERROR: {emistype.lower()} is not a supported emission type") + raise Exception( + f"FATAL ERROR: {emistype.lower()} is not a supported emission type, ABORT!" + ) from e + + if emistype.lower() != 'hfed': + dset = AerosolEmissions.make_fire_emission( + climos=climfiles, + ratio=ratio, + scale_climo=True, + coarsen_scale=coarsen_scale, + obsfile=basefile, + out_var_dict=out_var_dict, + n_persist=n_persist) + + AerosolEmissions.write_ncf(dset, config_dict.data_out['copy'][0][0]) + @staticmethod @logit(logger) - def initialize() -> None: - """Initialize the work directory + def open_qfed(files: List[str], out_var_dict: Dict[str, str] = None) -> xr.Dataset: + """ + Open QFED2 fire emissions data combining files into one Dataset + and renaming variables to standard (GBBEPx) names. 
     @staticmethod
     @logit(logger)
-    def initialize() -> None:
-        """Initialize the work directory
+    def open_qfed(files: List[str], out_var_dict: Dict[str, str] = None) -> xr.Dataset:
+        """
+        Open QFED2 fire emissions data, combining the files into one Dataset
+        and renaming variables to standard (GBBEPx) names.
+
+        Parameters
+        ----------
+        files : list
+            Paths to the QFED2 fire emissions files
+        out_var_dict : dict
+            Mapping of input variable name to desired (output) variable name.
+
+        Returns
+        -------
+        xr.Dataset
+            Dataset containing the fire emissions data
         """
+        if out_var_dict is None:
+            logger.info("No output variable mapping provided")
+            raise Exception("FATAL ERROR: No output variable mapping provided")
+
+        if len(files) == 0:
+            logger.info("No files provided")
+            raise Exception("FATAL ERROR: Received empty list of QFED files")
+
+        found_species = []
+        dset_dict = {}
+        for filepath in sorted(files):
+            logger.info(f"Opening QFED file: {filepath}")
+            _, input_var = os.path.basename(filepath).split(".")[1].split("_")
+            found_species.append(input_var)
+            try:
+                with xr.open_dataset(filepath, decode_cf=False).biomass as da:
+                    da.name = out_var_dict[input_var]
+                    dset_dict[da.name] = da
+            except Exception as e:
+                logger.exception(f"FATAL ERROR: Unable to read dataset: {filepath}")
+                raise Exception("FATAL ERROR: Unable to read dataset, ABORT!") from e
+
+        dset = xr.Dataset(dset_dict)
+
+        return dset
 
     @staticmethod
     @logit(logger)
-    def configure() -> None:
-        """Configure the artifacts in the work directory.
-        Copy run specific data to run directory
+    def open_climatology(files: List[str]) -> xr.Dataset:
+        """
+        Open climatology files and concatenate them along the time dimension.
+
+        Parameters
+        ----------
+        files : list
+            Paths to the climatology files.
+
+        Returns
+        -------
+        xr.Dataset
+            Concatenated dataset containing the climatology data.
         """
+        das = []
+
+        logger.info("Process Climatology Files")
+        for filepath in sorted(files):
+            logger.info(f"Opening Climatology File: {filepath}")
+            try:
+                with xr.open_dataset(filepath, engine="netcdf4") as da:
+                    das.append(da)
+            except Exception as e:
+                logger.exception(f"FATAL ERROR: Encountered an error reading climatology file: {filepath}")
+                raise Exception("FATAL ERROR: Unable to read file, ABORT!") from e
+
+        return xr.concat(das, dim="time")
 
     @staticmethod
     @logit(logger)
-    def execute(workdir: Union[str, os.PathLike], aprun_cmd: str) -> None:
-        """Run the executable (if any)
+    def write_ncf(dset: xr.Dataset, outfile: str) -> None:
+        """
+        Write the given dataset to a NetCDF file with specified encoding.
 
         Parameters
         ----------
-        workdir : str | os.PathLike
-            work directory with the staged data, parm files, namelists, etc.
-        aprun_cmd : str
-            launcher command for executable.x
+        dset : xarray.Dataset
+            The dataset to be written to the NetCDF file.
+        outfile : str
+            The path and filename of the output NetCDF file.
 
         Returns
         -------
         None
         """
+        encoding = {}
+        for v in dset.data_vars:
+            encoding[v] = dict(zlib=True, complevel=4)
+        if "latitude" in dset:
+            encoding["latitude"] = dict(zlib=True, complevel=4)
+            encoding["longitude"] = dict(zlib=True, complevel=4)
+        if "lat_b" in dset:
+            encoding["lat_b"] = dict(zlib=True, complevel=4)
+            encoding["lon_b"] = dict(zlib=True, complevel=4)
+        if "time" in dset:
+            encoding["time"] = dict(dtype="i4")
+        try:
+            dset.load().to_netcdf(outfile, encoding=encoding)
+        except Exception as e:
+            logger.exception("FATAL ERROR: Encountered an error writing dataset")
+            raise Exception("FATAL ERROR: Unable to write dataset, ABORT!") from e
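`write_ncf` compresses every data variable (plus latitude/longitude and their bounds when present) and stores time as 32-bit integers. A toy demonstration of the same encoding pattern on made-up data; it needs the netCDF4 backend, and the variable names are illustrative:

```python
import numpy as np
import xarray as xr

dset = xr.Dataset(
    {"SO2": (("time", "lat", "lon"), np.random.rand(2, 4, 8).astype("f4"))},
    coords={"time": [0.0, 24.0],
            "lat": np.linspace(-90, 90, 4),
            "lon": np.linspace(0, 359, 8)},
)

# Same shape of encoding that write_ncf builds: compressed data variables
# and a 32-bit integer time coordinate.
encoding = {v: dict(zlib=True, complevel=4) for v in dset.data_vars}
encoding["time"] = dict(dtype="i4")

dset.load().to_netcdf("toy_emissions.nc", encoding=encoding)
```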
 
+    @staticmethod
+    @logit(logger)
+    def create_climatology(
+        emissions: xr.DataArray, climatology: xr.DataArray, lat_coarse: int = 50, lon_coarse: int = 50
+    ) -> xr.DataArray:
+        """
+        Create scaled daily climatology data based on observed emission data.
+
+        Parameters
+        ----------
+        emissions : xarray.DataArray
+            Emission data. Just one time step. Same grid as the climatology.
+        climatology : xarray.DataArray
+            Input climatology data. Multiple days of daily data.
+        lat_coarse : int, optional
+            Coarsening factor for latitude. Defaults to 50 (0.1 deg -> 5 deg).
+        lon_coarse : int, optional
+            Coarsening factor for longitude. Defaults to 50 (0.1 deg -> 5 deg).
+
+        Returns
+        -------
+        xarray.DataArray
+            Scaled climatology data.
+        """
+        # Create a copy of the climatology
+        clim = climatology.copy()
+
+        # We coarsen for regional scaling, to avoid small differences in fire locations
+        coarsen_kws = dict(lat=lat_coarse, lon=lon_coarse, boundary="trim")
+        clim_coarse = climatology.coarsen(**coarsen_kws).sum()
+        obs_coarse = emissions.squeeze().coarsen(**coarsen_kws).sum()
+
+        # Calculate the coarse ratio of emissions to climatology.
+        # Where climatology is not positive, the ratio will be 0.
+        ratio_coarse = (obs_coarse.data / clim_coarse.where(clim_coarse > 0)).fillna(0)
+
+        # Interpolate (uncoarsen) the ratio to match the coordinates of the climatology
+        # (this should be the same grid as the QFED/GBBEPx emissions)
+        ratio = ratio_coarse.sel(lat=clim.lat, lon=clim.lon, method="nearest")
+
+        # Scale data
+        clim.data = clim.data * ratio.data
+
+        return clim.compute()
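The coarsen/ratio/uncoarsen step in `create_climatology` is easiest to see on a toy grid: sum both fields over coarse boxes, ratio them where the climatology is nonzero, then paint the regional ratio back onto the fine grid with nearest-neighbour selection. A self-contained illustration on made-up data, not the operational grids:

```python
import numpy as np
import xarray as xr

lat = np.arange(0.0, 4.0)
lon = np.arange(0.0, 4.0)
clim = xr.DataArray(np.ones((4, 4)), coords={"lat": lat, "lon": lon}, dims=("lat", "lon"))
obs = clim * 3.0  # observations are 3x the climatology everywhere

coarsen_kws = dict(lat=2, lon=2, boundary="trim")
clim_coarse = clim.coarsen(**coarsen_kws).sum()
obs_coarse = obs.coarsen(**coarsen_kws).sum()

# Ratio of observed to climatological totals per coarse box; 0 where climo has no fire
ratio_coarse = (obs_coarse.data / clim_coarse.where(clim_coarse > 0)).fillna(0)

# Nearest-neighbour "uncoarsen" back to the fine grid, then scale
ratio = ratio_coarse.sel(lat=clim.lat, lon=clim.lon, method="nearest")
scaled = clim * ratio.data

print(float(scaled.mean()))  # 3.0: the climatology now matches the observed regional totals
```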
+ """ + # Open fire emissions + if isinstance(obsfile, (str, bytes)): + obsfile = [obsfile] + if "qfed" in obsfile[0].lower(): + obs = AerosolEmissions.open_qfed(obsfile, out_var_dict=out_var_dict) + else: + # GBBEPx, already combined and with correct names + obs = xr.open_mfdataset(obsfile, decode_cf=False) + + # Open climatology + climo = AerosolEmissions.open_climatology(climos) + climo = climo.sel(lat=obs["lat"], lon=obs["lon"], method="nearest") + + # Blend + dsets = [] + climo_scaled = {} + for tslice in range(len(climos)): + if tslice == 0: + dset = obs.copy() + else: + dset = dsets[tslice - 1].copy() + dset.update({"time": [float(tslice * 24)]}) + dset.time.attrs = obs.time.attrs + + for v in obs.data_vars: + if not scale_climo: + if tslice > n_persist: + dset[v].data = ( + ratio * dset[v] + (1 - ratio) * climo[v].data[tslice, :, :] + ) + else: + if tslice == 0: + climo_scaled[v] = AerosolEmissions.create_climatology( + obs[v], climo[v], lon_coarse=coarsen_scale, lat_coarse=coarsen_scale + ) + else: + if tslice > n_persist: + dset[v].data = ( + ratio * dset[v] + (1 - ratio) * climo_scaled[v].data[tslice, :, :] + ) + + dsets.append(dset) + + return xr.concat(dsets, dim="time") + + @logit(logger) + def finalize(self) -> None: + """ + Perform closing actions of the task. Copy data back from the DATA/ directory to COM/ + + Parameters + ---------- + None + + Returns + ------- + None """ + logger.info(f"Copy '{self.task_config.config.data_out}' processed data to COM/ directory") + FileHandler(self.task_config.config.data_out).sync() diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py index 1d8b38483b..48cd8eaac5 100644 --- a/ush/python/pygfs/task/analysis.py +++ b/ush/python/pygfs/task/analysis.py @@ -29,7 +29,7 @@ def __init__(self, config: Dict[str, Any]) -> None: # Store location of GDASApp jinja2 templates self.gdasapp_j2tmpl_dir = os.path.join(self.task_config.PARMgfs, 'gdas') # fix ocnres - self.task_config.OCNRES = f"{self.task_config.OCNRES :03d}" + self.task_config.OCNRES = f"{self.task_config.OCNRES:03d}" def initialize(self) -> None: super().initialize() diff --git a/ush/python/pygfs/task/oceanice_products.py b/ush/python/pygfs/task/oceanice_products.py index 39ec53b100..18c4438df9 100644 --- a/ush/python/pygfs/task/oceanice_products.py +++ b/ush/python/pygfs/task/oceanice_products.py @@ -66,7 +66,7 @@ def __init__(self, config: Dict[str, Any]) -> None: # TODO: This is a bit of a hack, but it works for now # FIXME: find a better way to provide the averaging period - avg_period = f"{forecast_hour-interval:03d}-{forecast_hour:03d}" + avg_period = f"{forecast_hour - interval:03d}-{forecast_hour:03d}" # Extend task_config with localdict localdict = AttrDict( diff --git a/workflow/ecFlow/ecflow_definitions.py b/workflow/ecFlow/ecflow_definitions.py index 0aea65710c..d3b34c530c 100644 --- a/workflow/ecFlow/ecflow_definitions.py +++ b/workflow/ecFlow/ecflow_definitions.py @@ -1896,7 +1896,7 @@ def generate_folders(self, ecfhome, suite, parents): None """ if parents: - folder_path = f"{ecfhome}/{suite}/{parents.replace('>','/')}/{self.name()}" + folder_path = f"{ecfhome}/{suite}/{parents.replace('>', '/')}/{self.name()}" else: folder_path = f"{ecfhome}/{suite}/{self.name()}" if not os.path.exists(folder_path): @@ -1974,7 +1974,7 @@ def generate_ecflow_task(self, ecfhome, suite, parents): search_script = f"{self.template}.ecf" if self.template is not \ None else script_name if parents: - script_path = 
f"{ecfhome}/{suite}/{parents.replace('>','/')}/{script_name}" + script_path = f"{ecfhome}/{suite}/{parents.replace('>', '/')}/{script_name}" else: script_path = f"{ecfhome}/{suite}/{script_name}" for root, dirs, files in os.walk(self.scriptrepo): diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index d2a3e43719..bc83d18c38 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1856,11 +1856,11 @@ def metp(self): dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch', 'condition': 'not'} deps2.append(rocoto.add_dependency(dep_dict)) for lookback2 in range(1, lookback): - offset = timedelta_to_HMS(-to_timedelta(f'{6*lookback2}H')) + offset = timedelta_to_HMS(-to_timedelta(f'{6 * lookback2}H')) dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': offset} deps2.append(rocoto.add_dependency(dep_dict)) - offset = timedelta_to_HMS(-to_timedelta(f'{6*lookback}H')) + offset = timedelta_to_HMS(-to_timedelta(f'{6 * lookback}H')) dep_dict = {'type': 'task', 'name': f'{self.run}_arch', 'offset': offset} deps2.append(rocoto.add_dependency(dep_dict)) deps.append(rocoto.create_dependency(dep_condition='and', dep=deps2)) @@ -2388,7 +2388,7 @@ def cleanup(self): # Start of ensemble tasks def eobs(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_prep'} + dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf", "")}_prep'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'enkfgdas_epmn', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} deps.append(rocoto.add_dependency(dep_dict)) @@ -2498,7 +2498,7 @@ def eupd(self): def atmensanlinit(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_prepatmiodaobs'} + dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf", "")}_prepatmiodaobs'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'enkfgdas_epmn', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} deps.append(rocoto.add_dependency(dep_dict)) @@ -2679,7 +2679,7 @@ def _get_ecengroups(): return grp, dep, lst deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_analcalc'} + dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf", "")}_analcalc'} deps.append(rocoto.add_dependency(dep_dict)) if self.options['do_jediatmens']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmensanlfinal'} @@ -2723,7 +2723,7 @@ def _get_ecengroups(): def esfc(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_analcalc'} + dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf", "")}_analcalc'} deps.append(rocoto.add_dependency(dep_dict)) if self.options['do_jediatmens']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmensanlfinal'} @@ -2804,7 +2804,7 @@ def echgres(self): self._is_this_a_gdas_task(self.run, 'echgres') deps = [] - dep_dict = {'type': 'metatask', 'name': f'{self.run.replace("enkf","")}_fcst'} + dep_dict = {'type': 'metatask', 'name': f'{self.run.replace("enkf", "")}_fcst'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_mem001'} deps.append(rocoto.add_dependency(dep_dict)) diff --git a/workflow/setup_expt.py b/workflow/setup_expt.py index 09bc1c90ac..2a426f6a77 100755 --- a/workflow/setup_expt.py +++ b/workflow/setup_expt.py @@ -122,7 +122,7 @@ def edit_baseconfig(host, inputs, yaml_dict): "@SDATE@": datetime_to_YMDH(inputs.idate), 
"@EDATE@": datetime_to_YMDH(inputs.edate), "@CASECTL@": f'C{inputs.resdetatmos}', - "@OCNRES@": f"{int(100.*inputs.resdetocean):03d}", + "@OCNRES@": f"{int(100. * inputs.resdetocean):03d}", "@EXPDIR@": inputs.expdir, "@COMROOT@": inputs.comroot, "@EXP_WARM_START@": is_warm_start,