Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move environment configuration to its own class #784

Merged
merged 13 commits into from
Dec 15, 2023
Merged
208 changes: 171 additions & 37 deletions flow/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
This enables the user to adjust their workflow based on the present
environment, e.g. for the adjustment of scheduler submission scripts.
"""
import enum
import logging
import os
import re
Expand All @@ -27,7 +28,7 @@
_WALLTIME,
_Directives,
)
from .errors import NoSchedulerError
from .errors import NoSchedulerError, SubmitError
from .scheduling.base import JobStatus
from .scheduling.fake_scheduler import FakeScheduler
from .scheduling.lsf import LSFScheduler
Expand Down Expand Up @@ -90,6 +91,169 @@ def template_filter(func):
return classmethod(func)


class _NodeTypes(enum.Enum):
"""Defines for a partition the acceptable node requests.

- SHARED: Only support partial nodes requests to full node. Note that you
can restrict the cores provided in `_PartitionConfig` to restrict below
even full nodes.
- MIXED: No restriction on partial to full node requests.
- WHOLENODE: Only support submissions in units of whole nodes.
"""

SHARED = 1
MIXED = 2
WHOLENODE = 3


class _PartitionConfig:
"""The configuration of partition for a given environment.

Currently supports the ideas of
- CPUs for a partition
- GPUs for a partition
- Node type of a partition

When querying a value for a specific partition the logic first searches the
provided mapping if any for the partition, if it is not found then the
mapping is searched for "default" if it exists, if not the class default is
used.

1. Partition specific -> 2. Provided default -> 3. _PartitionConfig default
b-butler marked this conversation as resolved.
Show resolved Hide resolved

The class defaults are

- CPUs: ``None`` which represents any number. This will be interpretted as
an unlimited supply of CPUs practically.
- GPUs: 0.
- Node type: `_NodeTypes.MIXED`.

Parameters
----------
cpus_per_node: dict[str, int], optional
Mapping between partitions and CPUs per node. Defaults to an empyt `dict`.
gpus_per_node: dict[str, int], optional
Mapping between partitions and GPUs per node. Defaults to an empyt `dict`.
node_types: dict[str, _NodeTypes], optional
Mapping between partitions and node types. Defaults to an empyt `dict`.
"""

_default_cpus_per_node = None
_default_gpus_per_node = 0
_default_node_type = _NodeTypes.MIXED

def __init__(self, cpus_per_node=None, gpus_per_node=None, node_types=None):
self.cpus_per_node = {} if cpus_per_node is None else cpus_per_node
self.gpus_per_node = {} if gpus_per_node is None else gpus_per_node
self.node_types = {} if node_types is None else node_types

def __getitem__(self, partition):
"""Get the `_Partition` object for the provided partition."""
return _Partition(
partition,
self._get(partition, self.cpus_per_node, self._default_cpus_per_node),
self._get(partition, self.gpus_per_node, self._default_gpus_per_node),
self._get(partition, self.node_types, self._default_node_type),
)

@staticmethod
def _get(key, mapping, default):
"""Get the value of key following the class priority chain."""
return mapping.get(key, mapping.get("default", default))


class _Partition:
"""Represents a partition and associated data.

Parameters
----------
name: str
The name of the partition.
cpus: int
The CPUs per node.
gpus: int
The GPUs per node.
node_type: _NodeTypes
The node type for the partition.

Attributes
----------
name: str
The name of the partition.
cpus: int
The CPUs per node.
gpus: int
The GPUs per node.
node_type: _NodeTypes
The node type for the partition.
"""

def __init__(self, name, cpus, gpus, node_type):
# Use empty string for error messages.
self.name = name if name is not None else ""
self.gpus = gpus
self.cpus = cpus
self.node_type = node_type

def calculate_num_nodes(self, cpu_tasks, gpu_tasks, force):
"""Compute the number of nodes for the given workload.

Parameters
----------
cpu_tasks: int
Total CPU tasks/cores.
gpu_tasks: int
Total GPUs requested.
force: bool
Whether to allow seemingly nonsensical/erronous resource requests.

Raises
------
SubmitError:
Raises a SubmitError for
1. non-zero GPUs on non-GPU partitions
2. zero GPUs on GPU partitions.
3. Requests larger than a node on `_NodeTypes.SHARED` partitions (through
`~._nodes_for_task`).
4. Requests less than 0.9 of the last node for `_NodeTypes.WHOLENODE`
partitions (through `calc_num_nodes`)
if ``not force``.
"""
threshold = 0.9 if self.node_type == _NodeTypes.WHOLENODE and not force else 0.0
if gpu_tasks > 0:
if self.gpus == 0:
# Required for current tests. Also skips a divide by zero error
# if user actually wants to submit CPU only jobs to GPU partitions.
if force:
num_nodes_gpu = 1
else:
raise SubmitError(
f"Cannot request GPU's on nonGPU partition, {self.name}."
)
else:
num_nodes_gpu = self._nodes_for_task(gpu_tasks, self.gpus, threshold)
num_nodes_cpu = self._nodes_for_task(cpu_tasks, self.cpus, 0)
else:
if self.gpus > 0 and not force:
raise SubmitError(
f"Cannot submit to GPU partition, {self.name}, without GPUs."
)
num_nodes_gpu = 0
num_nodes_cpu = self._nodes_for_task(cpu_tasks, self.cpus, threshold)
return max(num_nodes_cpu, num_nodes_gpu, 1)

def _nodes_for_task(self, tasks, processors, threshold):
"""Call calc_num_nodes but handles the None sentinal value."""
if processors is None:
return 1
nodes = calc_num_nodes(tasks, processors, threshold)
if self.node_type == _NodeTypes.SHARED and nodes > 1:
raise SubmitError(
f"Cannot submit {tasks} tasks to shared partition {self.name}"
)
return nodes


class ComputeEnvironment(metaclass=_ComputeEnvironmentType):
"""Define computational environments.

Expand All @@ -109,9 +273,7 @@ class ComputeEnvironment(metaclass=_ComputeEnvironmentType):
template = "base_script.sh"
mpi_cmd = "mpiexec"

_cpus_per_node = {"default": -1}
_gpus_per_node = {"default": -1}
_shared_partitions = set()
_partition_config = _PartitionConfig()

@classmethod
def is_present(cls):
Expand Down Expand Up @@ -296,12 +458,8 @@ def _get_scheduler_values(cls, context):
-------
Must be called after the rest of the template context has been gathered.
"""
partition = context.get("partition", None)
partition = cls._partition_config[context.get("partition", None)]
force = context.get("force", False)
if force or partition in cls._shared_partitions:
threshold = 0.0
else:
threshold = 0.9
cpu_tasks_total = calc_tasks(
context["operations"],
"np",
Expand All @@ -315,40 +473,16 @@ def _get_scheduler_values(cls, context):
context.get("force", False),
)

if gpu_tasks_total > 0:
num_nodes_gpu = cls._calc_num_nodes(
gpu_tasks_total, cls._get_gpus_per_node(partition), threshold
)
num_nodes_cpu = cls._calc_num_nodes(
cpu_tasks_total, cls._get_cpus_per_node(partition), 0
)
else:
num_nodes_gpu = 0
num_nodes_cpu = cls._calc_num_nodes(
cpu_tasks_total, cls._get_cpus_per_node(partition), threshold
)
num_nodes = max(num_nodes_cpu, num_nodes_gpu, 1)
num_nodes = partition.calculate_num_nodes(
cpu_tasks_total, gpu_tasks_total, force
)

return {
"ncpu_tasks": cpu_tasks_total,
"ngpu_tasks": gpu_tasks_total,
"num_nodes": num_nodes,
}

@classmethod
def _get_cpus_per_node(cls, partition):
return cls._cpus_per_node.get(partition, cls._cpus_per_node["default"])

@classmethod
def _get_gpus_per_node(cls, partition):
return cls._gpus_per_node.get(partition, cls._gpus_per_node["default"])

@classmethod
def _calc_num_nodes(cls, tasks, processors, threshold):
"""Call calc_num_nodes but handles the -1 sentinal value."""
if processors == -1:
return 1
return calc_num_nodes(tasks, processors, threshold)


class StandardEnvironment(ComputeEnvironment):
"""Default environment which is always present."""
Expand Down
30 changes: 22 additions & 8 deletions flow/environments/incite.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from ..environment import (
DefaultLSFEnvironment,
DefaultSlurmEnvironment,
_NodeTypes,
_PartitionConfig,
template_filter,
)
from ..util.template_filters import check_utilization
Expand All @@ -36,8 +38,11 @@ def my_operation(job):
hostname_pattern = r".*\.summit\.olcf\.ornl\.gov"
template = "summit.sh"
mpi_cmd = "jsrun"
_cpus_per_node = {"default": 42}
_gpus_per_node = {"default": 6}
_partition_config = _PartitionConfig(
cpus_per_node={"default": 42},
gpus_per_node={"default": 6},
node_types={"default": _NodeTypes.WHOLENODE},
)

@template_filter
def calc_num_nodes(cls, resource_sets, parallel=False):
Expand Down Expand Up @@ -187,8 +192,11 @@ class AndesEnvironment(DefaultSlurmEnvironment):
hostname_pattern = r"andes-.*\.olcf\.ornl\.gov"
template = "andes.sh"
mpi_cmd = "srun"
_cpus_per_node = {"default": 32, "gpu": 28}
_gpus_per_node = {"default": 0, "gpu": 2}
_partition_config = _PartitionConfig(
cpus_per_node={"default": 32, "gpu": 28},
gpus_per_node={"gpu": 2},
node_types={"default": _NodeTypes.WHOLENODE},
)

@classmethod
def add_args(cls, parser):
Expand Down Expand Up @@ -217,8 +225,11 @@ class CrusherEnvironment(DefaultSlurmEnvironment):

hostname_pattern = r".*\.crusher\.olcf\.ornl\.gov"
template = "crusher.sh"
_cpus_per_node = {"default": 56}
_gpus_per_node = {"default": 8}
_partition_config = _PartitionConfig(
cpus_per_node={"default": 56},
gpus_per_node={"default": 8},
node_types={"default": _NodeTypes.WHOLENODE},
)

mpi_cmd = "srun"

Expand Down Expand Up @@ -269,8 +280,11 @@ class FrontierEnvironment(DefaultSlurmEnvironment):

hostname_pattern = r".*\.frontier\.olcf\.ornl\.gov"
template = "frontier.sh"
_cpus_per_node = {"default": 56}
_gpus_per_node = {"default": 8}
_partition_config = _PartitionConfig(
cpus_per_node={"default": 56},
gpus_per_node={"default": 8},
node_types={"default": _NodeTypes.WHOLENODE},
)
mpi_cmd = "srun"

@classmethod
Expand Down
16 changes: 12 additions & 4 deletions flow/environments/purdue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""Environments for Purdue supercomputers."""
import logging

from ..environment import DefaultSlurmEnvironment
from ..environment import DefaultSlurmEnvironment, _NodeTypes, _PartitionConfig

logger = logging.getLogger(__name__)

Expand All @@ -18,9 +18,17 @@ class AnvilEnvironment(DefaultSlurmEnvironment):
hostname_pattern = r".*\.anvil\.rcac\.purdue\.edu$"
template = "anvil.sh"
mpi_cmd = "mpirun"
_cpus_per_node = {"default": 128}
_gpus_per_node = {"default": 4}
_shared_partitions = {"debug", "gpu-debug", "shared", "highmem", "gpu"}
_partition_config = _PartitionConfig(
cpus_per_node={"default": 128},
gpus_per_node={"gpu": 4, "gpu-debug": 4},
node_types={
"gpu-debug": _NodeTypes.SHARED,
"shared": _NodeTypes.SHARED,
"highmem": _NodeTypes.SHARED,
"wholenode": _NodeTypes.WHOLENODE,
"wide": _NodeTypes.WHOLENODE,
},
)

@classmethod
def add_args(cls, parser):
Expand Down
10 changes: 5 additions & 5 deletions flow/environments/umich.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Environments for the University of Michigan HPC environment."""
from ..environment import DefaultSlurmEnvironment
from ..environment import DefaultSlurmEnvironment, _PartitionConfig


class GreatLakesEnvironment(DefaultSlurmEnvironment):
Expand All @@ -13,10 +13,10 @@ class GreatLakesEnvironment(DefaultSlurmEnvironment):

hostname_pattern = r"gl(-login)?[0-9]+\.arc-ts\.umich\.edu"
template = "umich-greatlakes.sh"
_cpus_per_node = {"default": 36, "gpu": 40}
_gpus_per_node = {"default": 2}
_shared_partitions = {"standard", "gpu"}
b-butler marked this conversation as resolved.
Show resolved Hide resolved

_partition_config = _PartitionConfig(
cpus_per_node={"default": 36, "gpu": 40},
gpus_per_node={"gpu": 2},
)
# For unknown reasons, srun fails to export environment variables such as
# PATH on Great Lakes unless explicitly requested to with --export=ALL.
# On Great Lakes, srun also fails to flush the buffer until the end of
Expand Down
Loading