From e2a1c1750191c3dc81ae2b32bab82ef3a9768679 Mon Sep 17 00:00:00 2001
From: LucR31 <94859181+LucR31@users.noreply.github.com>
Date: Wed, 17 Jul 2024 16:18:57 +0200
Subject: [PATCH] Multi workflow (#35)

* new workflow

* new workflow entry point

---------

Co-authored-by: Fernandez Vilanova, Lucas
---
 .../workflows/child_meteo_workflow.py         | 110 +++++++
 .../workflows/child_sim_workflow.py           | 273 ++++++++++++++++++
 aiida_flexpart/workflows/parent_workflow.py   |  75 +++++
 pyproject.toml                                |   1 +
 4 files changed, 459 insertions(+)
 create mode 100644 aiida_flexpart/workflows/child_meteo_workflow.py
 create mode 100644 aiida_flexpart/workflows/child_sim_workflow.py
 create mode 100644 aiida_flexpart/workflows/parent_workflow.py

diff --git a/aiida_flexpart/workflows/child_meteo_workflow.py b/aiida_flexpart/workflows/child_meteo_workflow.py
new file mode 100644
index 0000000..10bd939
--- /dev/null
+++ b/aiida_flexpart/workflows/child_meteo_workflow.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+"""WorkChain that stages the meteorological input needed by Flexpart."""
+from aiida import engine, orm
+from aiida_shell import launch_shell_job
+from aiida_flexpart.utils import get_simulation_period
+
+#possible models
+cosmo_models = ['cosmo7', 'cosmo1', 'kenda1']
+ECMWF_models = ['IFS_GL_05', 'IFS_GL_1', 'IFS_EU_02', 'IFS_EU_01']
+
+
+class TransferMeteoWorkflow(engine.WorkChain):
+    """Multi-dates workflow for transferring the necessary
+    meteorological data for the subsequent Flexpart
+    simulations."""
+    @classmethod
+    def define(cls, spec):
+        """Specify inputs and outputs."""
+        super().define(spec)
+
+        spec.input('check_meteo_cosmo_code', valid_type=orm.AbstractCode)
+        spec.input('check_meteo_ifs_code', valid_type=orm.AbstractCode)
+
+        spec.input('simulation_dates', valid_type=orm.List, required=False)
+        spec.input('gribdir', valid_type=orm.Str, required=True)
+
+        spec.input('model', valid_type=orm.List, required=True)
+        spec.input('model_offline', valid_type=orm.List, required=True)
+        spec.input('offline_integration_time', valid_type=orm.Int)
+        spec.input('integration_time', valid_type=orm.Int)
+        spec.input('command', valid_type=orm.Dict)
+
+        # For every date the meteo is prepared once for the main run and,
+        # when an offline integration time is set, once more for the
+        # offline run (``offline`` flips ``ctx.offline`` in between).
+        spec.outline(
+            cls.setup,
+            engine.while_(cls.date)(cls.prepare_meteo_folder,
+                                    engine.if_(cls.offline)(
+                                        cls.prepare_meteo_folder),
+                                    cls.add_index))
+
+    def setup(self):
+        """Start from the first simulation date."""
+        self.ctx.index = 0
+
+    def date(self):
+        """Return True while there are simulation dates left to process."""
+        self.ctx.offline = False
+        # ``simulation_dates`` is optional: without it there is nothing to do.
+        if 'simulation_dates' not in self.inputs:
+            return False
+        return self.ctx.index < len(self.inputs.simulation_dates)
+
+    def offline(self):
+        """Return True (and flag the context) when an offline run is requested."""
+        if self.inputs.offline_integration_time > 0:
+            self.ctx.offline = True
+            return True
+        return False
+
+    def add_index(self):
+        """Move on to the next simulation date."""
+        self.ctx.index += 1
+
+    def prepare_meteo_folder(self):
+        """Launch the meteo shell job for every model of the current date."""
+        model_list = self.inputs.model
+        code_ = self.inputs.check_meteo_cosmo_code
+        age_class_ = self.inputs.integration_time.value * 3600
+
+        if all(mod in ECMWF_models
+               for mod in self.inputs.model) and self.inputs.model:
+            code_ = self.inputs.check_meteo_ifs_code
+
+        # ``ctx.offline`` is a boolean flag set by ``offline``.
+        if self.ctx.offline:
+            age_class_ = self.inputs.offline_integration_time.value * 3600
+            model_list = self.inputs.model_offline
+            code_ = self.inputs.check_meteo_ifs_code
+
+        command = self.inputs.command.get_dict()
+        e_date, s_date = get_simulation_period(
+            self.inputs.simulation_dates[self.ctx.index],
+            age_class_,
+            command['release_duration'],
+            command['simulation_direction'],
+        )
+        self.report(f'preparing meteo from {s_date} to {e_date}')
+
+        node_list = []
+        for mod in model_list:
+            self.report(f'transfering {mod} meteo')
+            _, node = launch_shell_job(
+                code_,
+                arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a',
+                nodes={
+                    'sdate': orm.Str(s_date),
+                    'edate': orm.Str(e_date),
+                    'gribdir': self.inputs.gribdir,
+                    'model': orm.Str(mod),
+                },
+            )
+            node_list.append(node)
+
+        if all(node.is_finished_ok for node in node_list):
+            self.report('ALL meteo OK')
+        else:
+            # Do not stay silent when a transfer job failed.
+            self.report('WARNING: at least one meteo transfer did not finish ok')
diff --git a/aiida_flexpart/workflows/child_sim_workflow.py b/aiida_flexpart/workflows/child_sim_workflow.py
new file mode 100644
index 0000000..091b1b9
--- /dev/null
+++ b/aiida_flexpart/workflows/child_sim_workflow.py
@@ -0,0 +1,273 @@
+# -*- coding: utf-8 -*-
+"""Flexpart multi-dates WorkChain."""
+from aiida import engine, plugins, orm
+from aiida_flexpart.workflows.child_meteo_workflow import TransferMeteoWorkflow
+
+# plugins
+FlexpartCosmoCalculation = plugins.CalculationFactory("flexpart.cosmo")
+FlexpartIfsCalculation = plugins.CalculationFactory("flexpart.ifs")
+FlexpartPostCalculation = plugins.CalculationFactory("flexpart.post")
+
+# possible models
+cosmo_models = ["cosmo7", "cosmo1", "kenda1"]
+ECMWF_models = ["IFS_GL_05", "IFS_GL_1", "IFS_EU_02", "IFS_EU_01"]
+
+
+class FlexpartSimWorkflow(engine.WorkChain):
+    """Run the Flexpart simulation(s) and the post-processing for one date."""
+
+    @classmethod
+    def define(cls, spec):
+        """Specify inputs and outputs."""
+        super().define(spec)
+
+        # Codes
+        spec.input("fcosmo_code", valid_type=orm.AbstractCode)
+        spec.input("fifs_code", valid_type=orm.AbstractCode)
+        spec.input("post_processing_code", valid_type=orm.AbstractCode)
+
+        # Basic Inputs
+        spec.expose_inputs(TransferMeteoWorkflow)
+        spec.input("date", valid_type=orm.Str, required=False)
+
+        # Model settings
+        # Command is exposed from TransferMeteoWorkflow
+        spec.input("input_phy", valid_type=orm.Dict)
+        spec.input("release_settings", valid_type=orm.Dict)
+        spec.input("locations", valid_type=orm.Dict,
+            help="Dictionary of locations properties."
+        )
+
+        # Meteo related inputs
+        spec.input(
+            "meteo_inputs",
+            valid_type=orm.Dict,
+            required=False,
+            help="Meteo models input params.",
+        )
+        spec.input(
+            "meteo_inputs_offline",
+            valid_type=orm.Dict,
+            required=False,
+            help="Meteo models input params.",
+        )
+        spec.input(
+            "meteo_path",
+            valid_type=orm.List,
+            required=False,
+            help="Path to the folder containing the meteorological input data.",
+        )
+        spec.input(
+            "meteo_path_offline",
+            valid_type=orm.List,
+            required=False,
+            help="Path to the folder containing the meteorological input data.",
+        )
+
+        # Others
+        spec.input("outgrid", valid_type=orm.Dict)
+        spec.input("outgrid_nest", valid_type=orm.Dict, required=False)
+        spec.input("species", valid_type=orm.RemoteData, required=True)
+        spec.input_namespace(
+            "land_use",
+            valid_type=orm.RemoteData,
+            required=False,
+            dynamic=True,
+            help="#TODO",
+        )
+        spec.input_namespace(
+            "land_use_ifs", valid_type=orm.RemoteData, required=False, dynamic=True
+        )
+        spec.expose_inputs(
+            FlexpartCosmoCalculation,
+            include=["metadata.options"],
+            namespace="flexpartcosmo",
+        )
+        spec.expose_inputs(
+            FlexpartIfsCalculation,
+            include=["metadata.options"],
+            namespace="flexpartifs",
+        )
+        spec.expose_inputs(
+            FlexpartPostCalculation,
+            include=["metadata.options"],
+            namespace="flexpartpost",
+        )
+        spec.outputs.dynamic = True
+
+        #exit codes
+        spec.exit_code(400, 'ERROR_CALCULATION_FAILED',
+                       'the previous calculation did not finish successfully')
+
+        spec.outline(
+            cls.setup,
+
+            engine.if_(cls.run_cosmo)(
+                cls.run_cosmo_simulation,
+                cls.inspect_calculation
+            ),
+            engine.if_(cls.run_ifs)(
+                cls.run_ifs_simulation,
+                cls.inspect_calculation
+            ),
+
+            cls.post_processing,
+            cls.results,
+        )
+
+    def run_cosmo(self):
+        """Return True when ``model`` is non-empty and only lists COSMO models."""
+        models = self.inputs.model
+        return bool(models) and all(mod in cosmo_models for mod in models)
+
+    def run_ifs(self):
+        """Return True when an IFS run is needed.
+
+        That is the case when ``model`` or ``model_offline`` is non-empty
+        and only lists ECMWF models. The two checks are grouped explicitly:
+        ``and`` binds tighter than ``or`` and ``all`` of an empty list is
+        True, so the ungrouped form accepted an empty ``model``.
+        """
+
+        def only_ecmwf(models):
+            return bool(models) and all(mod in ECMWF_models for mod in models)
+
+        return only_ecmwf(self.inputs.model) or only_ecmwf(
+            self.inputs.model_offline)
+
+    def inspect_calculation(self):
+        """Abort with ERROR_CALCULATION_FAILED if the last calculation failed."""
+        if not self.ctx.calculations[-1].is_finished_ok:
+            self.report('ERROR calculation did not finish ok')
+            return self.exit_codes.ERROR_CALCULATION_FAILED
+        self.report('calculation successfull')
+
+    def setup(self):
+        """Copy the inputs used by the later steps into the context."""
+        self.ctx.simulation_date = self.inputs.date.value
+        self.ctx.integration_time = self.inputs.integration_time
+        self.ctx.offline_integration_time = self.inputs.offline_integration_time
+
+        # model settings
+        self.ctx.release_settings = self.inputs.release_settings
+        self.ctx.command = self.inputs.command
+        self.ctx.input_phy = self.inputs.input_phy
+        self.ctx.locations = self.inputs.locations
+
+        # others
+        self.ctx.outgrid = self.inputs.outgrid
+        self.ctx.species = self.inputs.species
+        self.ctx.land_use = self.inputs.land_use
+
+    def post_processing(self):
+        """Submit the post-processing of the last (and offline) calculation."""
+        self.report("starting post-processsing")
+        builder = FlexpartPostCalculation.get_builder()
+        builder.code = self.inputs.post_processing_code
+        builder.input_dir = self.ctx.calculations[-1].outputs.remote_folder
+
+        if self.ctx.offline_integration_time > 0:
+
+            builder.input_dir = self.ctx.calculations[-2].outputs.remote_folder
+            builder.input_offline_dir = self.ctx.calculations[-1].outputs.remote_folder
+
+        builder.metadata.options = self.inputs.flexpartpost.metadata.options
+
+        running = self.submit(builder)
+        self.to_context(calculations=engine.append_(running))
+
+    def run_cosmo_simulation(self):
+        """Submit the Flexpart COSMO calculation for the current date."""
+
+        self.report(f"starting flexpart cosmo {self.ctx.simulation_date}")
+
+        builder = FlexpartCosmoCalculation.get_builder()
+        builder.code = self.inputs.fcosmo_code
+
+        # update command file
+        new_dict = self.ctx.command.get_dict()
+        new_dict["simulation_date"] = self.ctx.simulation_date
+        new_dict["age_class"] = self.inputs.integration_time.value * 3600
+        new_dict.update(self.inputs.meteo_inputs)
+
+        # model settings
+        builder.model_settings = {
+            "release_settings": self.ctx.release_settings,
+            "locations": self.ctx.locations,
+            "command": orm.Dict(new_dict),
+            "input_phy": self.ctx.input_phy,
+        }
+
+        builder.outgrid = orm.Dict(
+            list(self.ctx.outgrid.get_dict().values())[0])
+        if 'outgrid_nest' in self.inputs:
+            builder.outgrid_nest = orm.Dict(
+                list(self.inputs.outgrid_nest.get_dict().values())[0])
+        builder.species = self.ctx.species
+        builder.land_use = self.ctx.land_use
+        builder.meteo_path = self.inputs.meteo_path
+
+        # Walltime, memory, and resources.
+        builder.metadata.description = "Test workflow to submit a flexpart calculation"
+        builder.metadata.options = self.inputs.flexpartcosmo.metadata.options
+
+        # Ask the workflow to continue when the results are ready and store them in the context
+        running = self.submit(builder)
+        self.to_context(calculations=engine.append_(running))
+
+    def run_ifs_simulation(self):
+        """Submit the Flexpart IFS calculation (main or offline run)."""
+        # Set up calculation.
+        self.report(f"running flexpart ifs for {self.ctx.simulation_date}")
+        builder = FlexpartIfsCalculation.get_builder()
+        builder.code = self.inputs.fifs_code
+
+        # changes in the command file
+        new_dict = self.ctx.command.get_dict()
+        new_dict["simulation_date"] = self.ctx.simulation_date
+
+        if self.ctx.offline_integration_time > 0:
+            new_dict["age_class"] = self.ctx.offline_integration_time.value * 3600
+            new_dict["dumped_particle_data"] = True
+
+            self.ctx.parent_calc_folder = self.ctx.calculations[
+                -1
+            ].outputs.remote_folder
+            builder.parent_calc_folder = self.ctx.parent_calc_folder
+            self.report(f"starting from: {self.ctx.parent_calc_folder}")
+
+            builder.meteo_path = self.inputs.meteo_path_offline
+            new_dict.update(self.inputs.meteo_inputs_offline)
+
+        else:
+            new_dict["age_class"] = self.inputs.integration_time.value * 3600
+            builder.meteo_path = self.inputs.meteo_path
+            new_dict.update(self.inputs.meteo_inputs)
+
+        # model settings
+        builder.model_settings = {
+            "release_settings": self.ctx.release_settings,
+            "locations": self.ctx.locations,
+            "command": orm.Dict(new_dict),
+        }
+
+        builder.outgrid = orm.Dict(
+            list(self.ctx.outgrid.get_dict().values())[0])
+        if 'outgrid_nest' in self.inputs:
+            builder.outgrid_nest = orm.Dict(
+                list(self.inputs.outgrid_nest.get_dict().values())[0])
+        builder.species = self.ctx.species
+        builder.land_use = self.inputs.land_use_ifs
+
+        # Walltime, memory, and resources.
+        builder.metadata.description = "Test workflow to submit a flexpart calculation"
+        builder.metadata.options = self.inputs.flexpartifs.metadata.options
+
+        # Ask the workflow to continue when the results are ready and store them in the context
+        running = self.submit(builder)
+        self.to_context(calculations=engine.append_(running))
+
+    def results(self):
+        """Expose the output file of every calculation as a workflow output."""
+        for indx, calculation in enumerate(self.ctx.calculations):
+            self.out(f"calculation_{indx}_output_file", calculation.outputs.output_file)
diff --git a/aiida_flexpart/workflows/parent_workflow.py b/aiida_flexpart/workflows/parent_workflow.py
new file mode 100644
index 0000000..b10f6fa
--- /dev/null
+++ b/aiida_flexpart/workflows/parent_workflow.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+"""Parent WorkChain chaining the meteo transfer and the Flexpart runs."""
+from aiida import engine, orm
+from aiida_flexpart.workflows.child_meteo_workflow import TransferMeteoWorkflow
+from aiida_flexpart.workflows.child_sim_workflow import FlexpartSimWorkflow
+
+
+class ParentWorkflow(engine.WorkChain):
+    """Transfer the meteo data, then submit one Flexpart workflow per date."""
+    @classmethod
+    def define(cls, spec):
+        """Expose the inputs and outputs of both child workflows."""
+        super().define(spec)
+
+        #extras
+        spec.input('name', valid_type=str, non_db=True, required=False)
+
+        spec.expose_inputs(TransferMeteoWorkflow)
+        spec.expose_outputs(TransferMeteoWorkflow)
+        spec.expose_inputs(FlexpartSimWorkflow)
+        spec.expose_outputs(FlexpartSimWorkflow)
+        spec.outputs.dynamic = True
+
+        spec.outline(
+            cls.setup,
+            cls.transfer_meteo,
+            cls.run_sim,
+            cls.finalize,
+        )
+
+    def setup(self):
+        """Store a summary of the inputs in the node extras when named."""
+        #self.inputs.simulation_dates is orm.List
+        self.ctx.month_chunks = 1
+        if 'name' in self.inputs:
+            out_n = 'None'
+            if 'outgrid_nest' in self.inputs:
+                out_n = self.inputs.outgrid_nest.get_dict()
+            self.node.base.extras.set(
+                self.inputs.name, {
+                    'command': self.inputs.command.get_dict(),
+                    'input_phy': self.inputs.input_phy.get_dict(),
+                    'release': self.inputs.release_settings.get_dict(),
+                    'locations': self.inputs.locations.get_dict(),
+                    'integration_time': self.inputs.integration_time.value,
+                    'offline_integration_time':
+                    self.inputs.offline_integration_time.value,
+                    'model': self.inputs.model.get_list(),
+                    'model_offline': self.inputs.model_offline.get_list(),
+                    'outgrid': self.inputs.outgrid.get_dict(),
+                    'outgrid_nest': out_n
+                })
+
+    def transfer_meteo(self):
+        """Submit the meteo transfer and wait for it before simulating."""
+        child_1 = self.submit(TransferMeteoWorkflow,
+                              **self.exposed_inputs(TransferMeteoWorkflow))
+        return engine.ToContext(child_1=child_1)
+
+    def run_sim(self):
+        """Submit one FlexpartSimWorkflow per simulation date."""
+        for sim_date in self.inputs.simulation_dates:
+            child = self.submit(
+                FlexpartSimWorkflow,
+                **self.exposed_inputs(FlexpartSimWorkflow),
+                date=orm.Str(sim_date),
+            )
+            self.to_context(workchains=engine.append_(child))
+
+    def finalize(self):
+        """Forward the exposed outputs of the meteo and simulation children."""
+        self.out_many(
+            self.exposed_outputs(self.ctx.child_1, TransferMeteoWorkflow))
+        for w in self.ctx.workchains:
+            self.out_many(self.exposed_outputs(w, FlexpartSimWorkflow))
diff --git a/pyproject.toml b/pyproject.toml
index e0dc7fc..37080bf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -64,6 +64,7 @@ docs = [
 
 [project.entry-points."aiida.workflows"]
 "flexpart.multi_dates" = "aiida_flexpart.workflows.multi_dates_workflow:FlexpartMultipleDatesWorkflow"
+"flexpart.multi_workflow" = "aiida_flexpart.workflows.parent_workflow:ParentWorkflow"
 "inspect.workflow" = "aiida_flexpart.workflows.inspect:InspectWorkflow"
 
 [project.entry-points."aiida.data"]