From ce90977cc603ddc4e7f0cc5ace8b5c5f19e38869 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Tue, 10 Oct 2023 12:09:01 +0200 Subject: [PATCH] Add handler for out of memory error - if pw_code_large_memory not provide, increase number of machine - don't change mixing and cg for element, since the spin is always off --- README.md | 2 +- aiida_sssp_workflow/cli/inspect.py | 9 +- aiida_sssp_workflow/cli/run.py | 15 +++ .../workflows/convergence/cohesive_energy.py | 48 ++++++---- .../workflows/evaluate/_cohesive_energy.py | 93 +++++++++++++++++-- .../workflows/verifications.py | 15 ++- 6 files changed, 154 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 2d08406c..4008fa20 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ The `options` dict has the format of: { "resources": { "num_machines": 1, - "num_mpiprocs_per_machine": 32, + "num_mpiprocs_per_machine": 36, }, "max_wallclock_seconds": 1800, # 30 min "withmpi": True, diff --git a/aiida_sssp_workflow/cli/inspect.py b/aiida_sssp_workflow/cli/inspect.py index 4cb166e1..24672318 100644 --- a/aiida_sssp_workflow/cli/inspect.py +++ b/aiida_sssp_workflow/cli/inspect.py @@ -152,7 +152,14 @@ def inspect(node, output): "phonon_frequencies", ]: # print summary of the convergence to a json file - convergence = wf_node.outputs.convergence[property] + try: + convergence = wf_node.outputs.convergence[property] + except KeyError: + click.secho( + f"Property {property} is not calculated for this workflow", + fg="red", + ) + continue cutoff_control_protocol = wf_node.inputs.convergence.cutoff_control.value cutoff_control = get_protocol("control", name=cutoff_control_protocol) diff --git a/aiida_sssp_workflow/cli/run.py b/aiida_sssp_workflow/cli/run.py index c4749122..4b436261 100644 --- a/aiida_sssp_workflow/cli/run.py +++ b/aiida_sssp_workflow/cli/run.py @@ -32,6 +32,11 @@ @options.OverridableOption( "--pw-code", "pw_code", type=types.CodeParamType(entry_point="quantumespresso.pw") )(required=True) +@options.OverridableOption( + "--pw-code-large-memory", + "pw_code_large_memory", + type=types.CodeParamType(entry_point="quantumespresso.pw"), +)(required=False) @options.OverridableOption( "--ph-code", "ph_code", type=types.CodeParamType(entry_point="quantumespresso.ph") )(required=False) @@ -97,6 +102,7 @@ def launch( pw_code, ph_code, + pw_code_large_memory, property, protocol, ecutwfc, @@ -157,6 +163,12 @@ def launch( if is_measure and (cutoff_control or criteria): echo.echo_warning("cutoff_control, criteria are not used for measure workflow.") + # raise warning if pw_code_large_memory is provided for not include cohesive energy convergence workflow + if pw_code_large_memory and ( + not is_convergence or "convergence.cohesive_energy" not in properties_list + ): + echo.echo_warning("pw_code_large_memory is not used for this workflow.") + if is_convergence and len(configuration) > 1: echo.echo_critical( "Only one configuration is allowed for convergence workflow." @@ -225,6 +237,9 @@ def launch( if is_ph: inputs["ph_code"] = ph_code + if pw_code_large_memory: + inputs["pw_code_large_memory"] = pw_code_large_memory + if len(configuration) == 0: pass elif len(configuration) == 1: diff --git a/aiida_sssp_workflow/workflows/convergence/cohesive_energy.py b/aiida_sssp_workflow/workflows/convergence/cohesive_energy.py index a50d895a..0d334c12 100644 --- a/aiida_sssp_workflow/workflows/convergence/cohesive_energy.py +++ b/aiida_sssp_workflow/workflows/convergence/cohesive_energy.py @@ -46,6 +46,16 @@ class ConvergenceCohesiveEnergyWorkChain(_BaseConvergenceWorkChain): _EVALUATE_WORKCHAIN = CohesiveEnergyWorkChain _MEASURE_OUT_PROPERTY = "absolute_diff" + @classmethod + def define(cls, spec): + super().define(spec) + spec.input( + "pw_code_large_memory", + valid_type=orm.AbstractCode, + required=False, + help="The `pw.x` code use for the `PwCalculation` require large memory.", + ) + def init_setup(self): super().init_setup() self.ctx.extra_pw_parameters = { @@ -67,6 +77,7 @@ def extra_setup_for_magnetic_element(self): # 2023-08-02: we decide to use non-magnetic calculation for magnetic element # Because it gives fault convergence result that not compatible with other convergence tests, lead to very large # convergence cutoff from cohesive energy tests. + # XXX: (double check) Meanwhile, the pseudopotential is generated in terms of non-magnetic configuration (???). # "SYSTEM": { # "nspin": 2, # "starting_magnetization": { @@ -74,8 +85,9 @@ def extra_setup_for_magnetic_element(self): # }, # }, "ELECTRONS": { - "diagonalization": "cg", - "mixing_beta": 0.3, + # 2023-10-10: using cg with mixing_beta 0.3 for magnetic element will lead to "Error in routine efermig (1):" + # "diagonalization": "cg", + # "mixing_beta": 0.3, "electron_maxstep": 200, }, } @@ -90,18 +102,19 @@ def extra_setup_for_lanthanide_element(self): super().extra_setup_for_lanthanide_element() extra_pw_parameters_for_atom_lanthanide_element = { self.ctx.element: { - "SYSTEM": { - "nspin": 2, - "starting_magnetization": { - self.ctx.element: 0.5, - }, - # Need high number of bands to make atom calculation of lanthanoids - # converged. - "nbnd": int(self.inputs.pseudo.z_valence * 3), - }, + # 2023-08-02: we decide to use non-magnetic calculation for magnetic element (see above), lanthanide element also use non-magnetic calculation + # "SYSTEM": { + # "nspin": 2, + # "starting_magnetization": { + # self.ctx.element: 0.5, + # }, + # # Need high number of bands to make atom calculation of lanthanoids + # # converged. + # "nbnd": int(self.inputs.pseudo.z_valence * 3), + # }, "ELECTRONS": { - "diagonalization": "cg", - "mixing_beta": 0.3, # even small mixing_beta value + # "diagonalization": "cg", + # "mixing_beta": 0.3, # even smaller mixing_beta value "electron_maxstep": 200, }, }, @@ -181,13 +194,13 @@ def _get_inputs(self, ecutwfc, ecutrho): atomic_parallelization = update_dict(atomic_parallelization, {"npool": 1}) atomic_parallelization = update_dict(atomic_parallelization, {"ndiag": 1}) - # atomic option if mpiprocs too many confine it too no larger than 32 procs + # atomic option if mpiprocs too many confine it too no larger than 36 procs atomic_options = update_dict(self.ctx.options, {}) - if atomic_options["resources"]["num_mpiprocs_per_machine"] > 32: + if atomic_options["resources"]["num_mpiprocs_per_machine"] > 36: # copy is a shallow copy, so using update_dict. # if simply assign the value will change also the original dict atomic_options = update_dict( - atomic_options, {"resources": {"num_mpiprocs_per_machine": 32}} + atomic_options, {"resources": {"num_mpiprocs_per_machine": 36}} ) # atomic calculation for lanthanides require more time to finish. @@ -239,6 +252,9 @@ def _get_inputs(self, ecutwfc, ecutrho): "clean_workdir": self.inputs.clean_workdir, # atomit clean is controlled above, this clean happened when the whole workchain is finished } + if "pw_code_large_memory" in self.inputs: + inputs["atom"]["pw_code_large_memory"] = self.inputs.pw_code_large_memory + return inputs def helper_compare_result_extract_fun(self, sample_node, reference_node, **kwargs): diff --git a/aiida_sssp_workflow/workflows/evaluate/_cohesive_energy.py b/aiida_sssp_workflow/workflows/evaluate/_cohesive_energy.py index 70254c9c..c3453522 100644 --- a/aiida_sssp_workflow/workflows/evaluate/_cohesive_energy.py +++ b/aiida_sssp_workflow/workflows/evaluate/_cohesive_energy.py @@ -4,12 +4,19 @@ Create the structure of isolate atom """ from aiida import orm -from aiida.engine import append_, calcfunction -from aiida.plugins import DataFactory, WorkflowFactory +from aiida.engine import ( + CalcJob, + ProcessHandlerReport, + append_, + calcfunction, + process_handler, +) +from aiida.plugins import DataFactory +from aiida_quantumespresso.common.types import RestartType +from aiida_quantumespresso.workflows.pw.base import PwBaseWorkChain from . import _BaseEvaluateWorkChain -PwBaseWorkflow = WorkflowFactory("quantumespresso.pw.base") UpfData = DataFactory("pseudo.upf") @@ -39,6 +46,70 @@ def create_isolate_atom( return structure +class PwBaseWorkChainWithMemoryHandler(PwBaseWorkChain): + """Add memory handler to PwBaseWorkChain to use large memory resource""" + + @classmethod + def define(cls, spec): + super().define(spec) + spec.input( + "pw_code_large_memory", + valid_type=orm.Code, + required=False, + help="The `pw.x` code use for the `PwCalculation` with large memory resource.", + ) + + @process_handler( + priority=601, + exit_codes=[ + CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY, + ], + ) + def handle_out_of_memory(self, calculation): + """Handle out of memory error by using the code with large memory resource if provided""" + if "pw_code_large_memory" in self.inputs: + # use code with large memory resource + pw_code_large_memory = self.inputs.pw_code_large_memory + self.ctx.inputs.code = pw_code_large_memory + + action = f"Use code {self.inputs.pw_code_large_memory} with large memory resource" + + self.set_restart_type(RestartType.FROM_SCRATCH) + self.report_error_handled(calculation, action) + return ProcessHandlerReport(True) + else: + self.ctx.current_num_machines = self.ctx.inputs.metadata.options.get( + "resources", {} + ).get("num_machines", 1) + + if self.ctx.current_num_machines > 4: + self.report( + "The number of machines is larger than 4, the calculation will be terminated." + ) + return ProcessHandlerReport( + False, CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY + ) + + action = f"Increase the number of machines from {self.ctx.current_num_machines} to {self.ctx.current_num_machines + 1}" + self.ctx.inputs.metadata.options["resources"]["num_machines"] = ( + self.ctx.current_num_machines + 1 + ) + # for atomic calculation, the num_mpiprocs_per_machine is set, but increase the number of machines + # will cause too many mpi processes, so pop the num_mpiprocs_per_machine and use the `tot_num_mpiprocs`. + num_mpiprocs_per_machine = self.ctx.inputs.metadata.options[ + "resources" + ].pop("num_mpiprocs_per_machine") + if num_mpiprocs_per_machine: + self.ctx.inputs.metadata.options["resources"][ + "tot_num_mpiprocs" + ] = num_mpiprocs_per_machine + + self.set_restart_type(RestartType.FROM_SCRATCH) + self.report_error_handled(calculation, action) + + return ProcessHandlerReport(True) + + class CohesiveEnergyWorkChain(_BaseEvaluateWorkChain): """WorkChain to calculate cohisive energy of input structure""" @@ -55,8 +126,8 @@ def define(cls, spec): help='parameters for pwscf of atom calculation for each element in structure.') spec.input('vacuum_length', valid_type=orm.Float, help='The length of cubic cell in isolate atom calculation.') - spec.expose_inputs(PwBaseWorkflow, namespace="bulk", exclude=["pw.structure", "pw.pseudos"]) - spec.expose_inputs(PwBaseWorkflow, namespace="atom", exclude=["pw.structure", "pw.pseudos"]) + spec.expose_inputs(PwBaseWorkChain, namespace="bulk", exclude=["pw.structure", "pw.pseudos"]) + spec.expose_inputs(PwBaseWorkChainWithMemoryHandler, namespace="atom", exclude=["pw.structure", "pw.pseudos"]) spec.outline( cls.validate_structure, @@ -111,18 +182,20 @@ def _get_pseudo(element, pseudos): def run_energy(self): """set the inputs and submit atom/bulk energy evaluation parallel""" - bulk_inputs = self.exposed_inputs(PwBaseWorkflow, namespace="bulk") + bulk_inputs = self.exposed_inputs(PwBaseWorkChain, namespace="bulk") bulk_inputs["pw"]["structure"] = self.inputs.structure bulk_inputs["pw"]["pseudos"] = self.inputs.pseudos - running_bulk_energy = self.submit(PwBaseWorkflow, **bulk_inputs) + running_bulk_energy = self.submit(PwBaseWorkChain, **bulk_inputs) self.report( f"Submit SCF calculation of bulk {self.inputs.structure.get_description()}" ) self.to_context(workchain_bulk_energy=running_bulk_energy) for element, structure in self.ctx.d_element_structure.items(): - atom_inputs = self.exposed_inputs(PwBaseWorkflow, namespace="atom") + atom_inputs = self.exposed_inputs( + PwBaseWorkChainWithMemoryHandler, namespace="atom" + ) atom_inputs["pw"]["structure"] = structure atom_inputs["pw"]["pseudos"] = { element: self._get_pseudo(element, self.inputs.pseudos), @@ -131,7 +204,9 @@ def run_energy(self): dict=self.inputs.atom_parameters[element] ) - running_atom_energy = self.submit(PwBaseWorkflow, **atom_inputs) + running_atom_energy = self.submit( + PwBaseWorkChainWithMemoryHandler, **atom_inputs + ) self.logger.info(f"Submit atomic SCF of {element}.") self.to_context(workchain_atom_children=append_(running_atom_energy)) diff --git a/aiida_sssp_workflow/workflows/verifications.py b/aiida_sssp_workflow/workflows/verifications.py index cd5a4ec5..6bbd4f64 100644 --- a/aiida_sssp_workflow/workflows/verifications.py +++ b/aiida_sssp_workflow/workflows/verifications.py @@ -87,6 +87,8 @@ def define(cls, spec): help='The `pw.x` code use for the `PwCalculation`.') spec.input('ph_code', valid_type=orm.AbstractCode, required=False, help='The `ph.x` code use for the `PhCalculation`.') + spec.input('pw_code_large_memory', valid_type=orm.AbstractCode, required=False, + help='The `pw.x` code use for the `PwCalculation` require large memory.') spec.input('pseudo', valid_type=UpfData, required=True, help='Pseudopotential to be verified') spec.input('wavefunction_cutoff', valid_type=orm.Float, required=False, default=lambda: orm.Float(100.0), @@ -221,9 +223,20 @@ def init_setup(self): convergence_inputs["clean_workdir"] = self.inputs.clean_workdir - for prop in ["cohesive_energy", "delta", "pressure"]: + for prop in ["delta", "pressure"]: self.ctx.convergence_inputs[prop] = convergence_inputs.copy() + # The cohesive energy evaluation may hit the ran out of memory issue, + # so use the pw_code_large_memory if provided. + if "convergence.cohesive_energy" in self.ctx.properties_list: + inputs_cohesive_energy = convergence_inputs.copy() + if "pw_code_large_memory" in self.inputs: + inputs_cohesive_energy[ + "pw_code_large_memory" + ] = self.inputs.pw_code_large_memory + + self.ctx.convergence_inputs["cohesive_energy"] = inputs_cohesive_energy + # Here, the shallow copy can be used since the type of convergence_inputs # is AttributesDict. # The deepcopy can't be used, since it will create new data node.