Skip to content

Commit

Permalink
Add handler for out of memory error
Browse files Browse the repository at this point in the history
- if pw_code_large_memory is not provided, increase the number of machines
- don't change mixing and cg for element, since the spin is always off
  • Loading branch information
unkcpz committed Oct 10, 2023
1 parent ebcc59b commit ce90977
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ The `options` dict has the format of:
{
"resources": {
"num_machines": 1,
"num_mpiprocs_per_machine": 32,
"num_mpiprocs_per_machine": 36,
},
"max_wallclock_seconds": 1800, # 30 min
"withmpi": True,
Expand Down
9 changes: 8 additions & 1 deletion aiida_sssp_workflow/cli/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,14 @@ def inspect(node, output):
"phonon_frequencies",
]:
# print summary of the convergence to a json file
convergence = wf_node.outputs.convergence[property]
try:
convergence = wf_node.outputs.convergence[property]
except KeyError:
click.secho(
f"Property {property} is not calculated for this workflow",
fg="red",
)
continue

cutoff_control_protocol = wf_node.inputs.convergence.cutoff_control.value
cutoff_control = get_protocol("control", name=cutoff_control_protocol)
Expand Down
15 changes: 15 additions & 0 deletions aiida_sssp_workflow/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
@options.OverridableOption(
"--pw-code", "pw_code", type=types.CodeParamType(entry_point="quantumespresso.pw")
)(required=True)
@options.OverridableOption(
"--pw-code-large-memory",
"pw_code_large_memory",
type=types.CodeParamType(entry_point="quantumespresso.pw"),
)(required=False)
@options.OverridableOption(
"--ph-code", "ph_code", type=types.CodeParamType(entry_point="quantumespresso.ph")
)(required=False)
Expand Down Expand Up @@ -97,6 +102,7 @@
def launch(
pw_code,
ph_code,
pw_code_large_memory,
property,
protocol,
ecutwfc,
Expand Down Expand Up @@ -157,6 +163,12 @@ def launch(
if is_measure and (cutoff_control or criteria):
echo.echo_warning("cutoff_control, criteria are not used for measure workflow.")

# warn if pw_code_large_memory is provided but the workflow does not include the cohesive energy convergence test
if pw_code_large_memory and (
not is_convergence or "convergence.cohesive_energy" not in properties_list
):
echo.echo_warning("pw_code_large_memory is not used for this workflow.")

if is_convergence and len(configuration) > 1:
echo.echo_critical(
"Only one configuration is allowed for convergence workflow."
Expand Down Expand Up @@ -225,6 +237,9 @@ def launch(
if is_ph:
inputs["ph_code"] = ph_code

if pw_code_large_memory:
inputs["pw_code_large_memory"] = pw_code_large_memory

if len(configuration) == 0:
pass
elif len(configuration) == 1:
Expand Down
48 changes: 32 additions & 16 deletions aiida_sssp_workflow/workflows/convergence/cohesive_energy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ class ConvergenceCohesiveEnergyWorkChain(_BaseConvergenceWorkChain):
_EVALUATE_WORKCHAIN = CohesiveEnergyWorkChain
_MEASURE_OUT_PROPERTY = "absolute_diff"

@classmethod
def define(cls, spec):
    """Define the process specification, extending the base spec with an
    optional port for a `pw.x` code bound to a large-memory resource."""
    super().define(spec)
    spec.input(
        "pw_code_large_memory",
        help="The `pw.x` code use for the `PwCalculation` require large memory.",
        required=False,
        valid_type=orm.AbstractCode,
    )

def init_setup(self):
super().init_setup()
self.ctx.extra_pw_parameters = {
Expand All @@ -67,15 +77,17 @@ def extra_setup_for_magnetic_element(self):
# 2023-08-02: we decide to use non-magnetic calculation for magnetic element
# Because it gives fault convergence result that not compatible with other convergence tests, lead to very large
# convergence cutoff from cohesive energy tests.
# XXX: (double check) Meanwhile, the pseudopotential is generated in terms of non-magnetic configuration (???).
# "SYSTEM": {
# "nspin": 2,
# "starting_magnetization": {
# self.ctx.element: 0.5,
# },
# },
"ELECTRONS": {
"diagonalization": "cg",
"mixing_beta": 0.3,
# 2023-10-10: using cg with mixing_beta 0.3 for magnetic element will lead to "Error in routine efermig (1):"
# "diagonalization": "cg",
# "mixing_beta": 0.3,
"electron_maxstep": 200,
},
}
Expand All @@ -90,18 +102,19 @@ def extra_setup_for_lanthanide_element(self):
super().extra_setup_for_lanthanide_element()
extra_pw_parameters_for_atom_lanthanide_element = {
self.ctx.element: {
"SYSTEM": {
"nspin": 2,
"starting_magnetization": {
self.ctx.element: 0.5,
},
# Need high number of bands to make atom calculation of lanthanoids
# converged.
"nbnd": int(self.inputs.pseudo.z_valence * 3),
},
# 2023-08-02: we decide to use non-magnetic calculation for magnetic element (see above), lanthanide element also use non-magnetic calculation
# "SYSTEM": {
# "nspin": 2,
# "starting_magnetization": {
# self.ctx.element: 0.5,
# },
# # Need high number of bands to make atom calculation of lanthanoids
# # converged.
# "nbnd": int(self.inputs.pseudo.z_valence * 3),
# },
"ELECTRONS": {
"diagonalization": "cg",
"mixing_beta": 0.3, # even small mixing_beta value
# "diagonalization": "cg",
# "mixing_beta": 0.3, # even smaller mixing_beta value
"electron_maxstep": 200,
},
},
Expand Down Expand Up @@ -181,13 +194,13 @@ def _get_inputs(self, ecutwfc, ecutrho):
atomic_parallelization = update_dict(atomic_parallelization, {"npool": 1})
atomic_parallelization = update_dict(atomic_parallelization, {"ndiag": 1})

# atomic option if mpiprocs too many confine it too no larger than 32 procs
# atomic option: if there are too many mpiprocs, confine it to no more than 36 procs
atomic_options = update_dict(self.ctx.options, {})
if atomic_options["resources"]["num_mpiprocs_per_machine"] > 32:
if atomic_options["resources"]["num_mpiprocs_per_machine"] > 36:
# copy is a shallow copy, so using update_dict.
# if simply assign the value will change also the original dict
atomic_options = update_dict(
atomic_options, {"resources": {"num_mpiprocs_per_machine": 32}}
atomic_options, {"resources": {"num_mpiprocs_per_machine": 36}}
)

# atomic calculation for lanthanides require more time to finish.
Expand Down Expand Up @@ -239,6 +252,9 @@ def _get_inputs(self, ecutwfc, ecutrho):
"clean_workdir": self.inputs.clean_workdir, # atomit clean is controlled above, this clean happened when the whole workchain is finished
}

if "pw_code_large_memory" in self.inputs:
inputs["atom"]["pw_code_large_memory"] = self.inputs.pw_code_large_memory

return inputs

def helper_compare_result_extract_fun(self, sample_node, reference_node, **kwargs):
Expand Down
93 changes: 84 additions & 9 deletions aiida_sssp_workflow/workflows/evaluate/_cohesive_energy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
Create the structure of isolate atom
"""
from aiida import orm
from aiida.engine import append_, calcfunction
from aiida.plugins import DataFactory, WorkflowFactory
from aiida.engine import (
CalcJob,
ProcessHandlerReport,
append_,
calcfunction,
process_handler,
)
from aiida.plugins import DataFactory
from aiida_quantumespresso.common.types import RestartType
from aiida_quantumespresso.workflows.pw.base import PwBaseWorkChain

from . import _BaseEvaluateWorkChain

PwBaseWorkflow = WorkflowFactory("quantumespresso.pw.base")
UpfData = DataFactory("pseudo.upf")


Expand Down Expand Up @@ -39,6 +46,70 @@ def create_isolate_atom(
return structure


class PwBaseWorkChainWithMemoryHandler(PwBaseWorkChain):
    """Add memory handler to PwBaseWorkChain to use large memory resource.

    When a calculation fails with the scheduler's out-of-memory error, the
    handler restarts it from scratch either with the optional
    ``pw_code_large_memory`` code (if provided), or by adding one machine to
    the resources, up to a hard limit of 4 machines.
    """

    @classmethod
    def define(cls, spec):
        """Define the process specification."""
        super().define(spec)
        spec.input(
            "pw_code_large_memory",
            # Use ``AbstractCode`` (not ``Code``) for consistency with the
            # ``pw_code_large_memory`` ports declared by the verification and
            # convergence work chains; it is backward compatible since
            # ``Code`` is a subclass of ``AbstractCode``.
            valid_type=orm.AbstractCode,
            required=False,
            help="The `pw.x` code use for the `PwCalculation` with large memory resource.",
        )

    @process_handler(
        priority=601,
        exit_codes=[
            CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY,
        ],
    )
    def handle_out_of_memory(self, calculation):
        """Handle out of memory error by using the code with large memory resource if provided.

        Fallback when no large-memory code is given: restart from scratch on
        one more machine, converting ``num_mpiprocs_per_machine`` into
        ``tot_num_mpiprocs`` so the total MPI process count does not grow with
        the machine count.
        """
        if "pw_code_large_memory" in self.inputs:
            # use code with large memory resource
            pw_code_large_memory = self.inputs.pw_code_large_memory
            self.ctx.inputs.code = pw_code_large_memory

            action = f"Use code {self.inputs.pw_code_large_memory} with large memory resource"

            self.set_restart_type(RestartType.FROM_SCRATCH)
            self.report_error_handled(calculation, action)
            return ProcessHandlerReport(True)
        else:
            self.ctx.current_num_machines = self.ctx.inputs.metadata.options.get(
                "resources", {}
            ).get("num_machines", 1)

            if self.ctx.current_num_machines > 4:
                self.report(
                    "The number of machines is larger than 4, the calculation will be terminated."
                )
                # Give up: re-raise the out-of-memory exit code as unhandled.
                return ProcessHandlerReport(
                    False, CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY
                )

            action = f"Increase the number of machines from {self.ctx.current_num_machines} to {self.ctx.current_num_machines + 1}"
            self.ctx.inputs.metadata.options["resources"]["num_machines"] = (
                self.ctx.current_num_machines + 1
            )
            # for atomic calculation, the num_mpiprocs_per_machine is set, but increasing
            # the number of machines would then multiply the MPI process count, so pop
            # num_mpiprocs_per_machine and use `tot_num_mpiprocs` instead.
            # BUGFIX: pop with a ``None`` default so a missing key (e.g. when the
            # resources were specified via ``tot_num_mpiprocs`` already) no longer
            # raises ``KeyError``.
            num_mpiprocs_per_machine = self.ctx.inputs.metadata.options[
                "resources"
            ].pop("num_mpiprocs_per_machine", None)
            if num_mpiprocs_per_machine:
                self.ctx.inputs.metadata.options["resources"][
                    "tot_num_mpiprocs"
                ] = num_mpiprocs_per_machine

            self.set_restart_type(RestartType.FROM_SCRATCH)
            self.report_error_handled(calculation, action)

            return ProcessHandlerReport(True)


class CohesiveEnergyWorkChain(_BaseEvaluateWorkChain):
"""WorkChain to calculate cohisive energy of input structure"""

Expand All @@ -55,8 +126,8 @@ def define(cls, spec):
help='parameters for pwscf of atom calculation for each element in structure.')
spec.input('vacuum_length', valid_type=orm.Float,
help='The length of cubic cell in isolate atom calculation.')
spec.expose_inputs(PwBaseWorkflow, namespace="bulk", exclude=["pw.structure", "pw.pseudos"])
spec.expose_inputs(PwBaseWorkflow, namespace="atom", exclude=["pw.structure", "pw.pseudos"])
spec.expose_inputs(PwBaseWorkChain, namespace="bulk", exclude=["pw.structure", "pw.pseudos"])
spec.expose_inputs(PwBaseWorkChainWithMemoryHandler, namespace="atom", exclude=["pw.structure", "pw.pseudos"])

spec.outline(
cls.validate_structure,
Expand Down Expand Up @@ -111,18 +182,20 @@ def _get_pseudo(element, pseudos):

def run_energy(self):
"""set the inputs and submit atom/bulk energy evaluation parallel"""
bulk_inputs = self.exposed_inputs(PwBaseWorkflow, namespace="bulk")
bulk_inputs = self.exposed_inputs(PwBaseWorkChain, namespace="bulk")
bulk_inputs["pw"]["structure"] = self.inputs.structure
bulk_inputs["pw"]["pseudos"] = self.inputs.pseudos

running_bulk_energy = self.submit(PwBaseWorkflow, **bulk_inputs)
running_bulk_energy = self.submit(PwBaseWorkChain, **bulk_inputs)
self.report(
f"Submit SCF calculation of bulk {self.inputs.structure.get_description()}"
)
self.to_context(workchain_bulk_energy=running_bulk_energy)

for element, structure in self.ctx.d_element_structure.items():
atom_inputs = self.exposed_inputs(PwBaseWorkflow, namespace="atom")
atom_inputs = self.exposed_inputs(
PwBaseWorkChainWithMemoryHandler, namespace="atom"
)
atom_inputs["pw"]["structure"] = structure
atom_inputs["pw"]["pseudos"] = {
element: self._get_pseudo(element, self.inputs.pseudos),
Expand All @@ -131,7 +204,9 @@ def run_energy(self):
dict=self.inputs.atom_parameters[element]
)

running_atom_energy = self.submit(PwBaseWorkflow, **atom_inputs)
running_atom_energy = self.submit(
PwBaseWorkChainWithMemoryHandler, **atom_inputs
)
self.logger.info(f"Submit atomic SCF of {element}.")
self.to_context(workchain_atom_children=append_(running_atom_energy))

Expand Down
15 changes: 14 additions & 1 deletion aiida_sssp_workflow/workflows/verifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def define(cls, spec):
help='The `pw.x` code use for the `PwCalculation`.')
spec.input('ph_code', valid_type=orm.AbstractCode, required=False,
help='The `ph.x` code use for the `PhCalculation`.')
spec.input('pw_code_large_memory', valid_type=orm.AbstractCode, required=False,
help='The `pw.x` code use for the `PwCalculation` require large memory.')
spec.input('pseudo', valid_type=UpfData, required=True,
help='Pseudopotential to be verified')
spec.input('wavefunction_cutoff', valid_type=orm.Float, required=False, default=lambda: orm.Float(100.0),
Expand Down Expand Up @@ -221,9 +223,20 @@ def init_setup(self):

convergence_inputs["clean_workdir"] = self.inputs.clean_workdir

for prop in ["cohesive_energy", "delta", "pressure"]:
for prop in ["delta", "pressure"]:
self.ctx.convergence_inputs[prop] = convergence_inputs.copy()

# The cohesive energy evaluation may hit the ran out of memory issue,
# so use the pw_code_large_memory if provided.
if "convergence.cohesive_energy" in self.ctx.properties_list:
inputs_cohesive_energy = convergence_inputs.copy()
if "pw_code_large_memory" in self.inputs:
inputs_cohesive_energy[
"pw_code_large_memory"
] = self.inputs.pw_code_large_memory

self.ctx.convergence_inputs["cohesive_energy"] = inputs_cohesive_energy

# Here, the shallow copy can be used since the type of convergence_inputs
# is AttributesDict.
# The deepcopy can't be used, since it will create new data node.
Expand Down

0 comments on commit ce90977

Please sign in to comment.