Merge pull request #72 from i-VRESSE/dirac-test-fail-71
Dirac test fail 71
sverhoeven authored Jun 26, 2023
2 parents 60ed012 + 7104901 commit fa472c2
Showing 11 changed files with 277 additions and 69 deletions.
3 changes: 1 addition & 2 deletions .pre-commit-config.yaml
@@ -62,9 +62,8 @@ repos:
name: Check with Flake8
entry: poetry run flake8
language: system
pass_filenames: false
types: [python]
args: [--count, .]
args: [--count]

- id: mypy
name: Validate types with MyPy
10 changes: 3 additions & 7 deletions docs/configuration.md
@@ -254,16 +254,12 @@ If you need [DIRAC](http://diracgrid.org/) support create a conda environment
instead of creating a virtual environment.

```bash
mamba create --name bartender dirac-grid python=3.10
mamba create --name bartender dirac-grid python=3.10 poetry=1.5.1
conda activate bartender
poetry install
```

The conda environment contains all DIRAC dependencies.
Install DIRAC itself with

```bash
pip install DIRAC==8.0
```
The conda environment contains all DIRAC dependencies and DIRAC itself.

(Cannot use `poetry install --with=dirac` as Poetry gets stuck resolving
dependencies because it ignores the already installed DIRAC dependencies.)
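
A quick, hedged way to confirm the environment indeed provides DIRAC (the printed path is simply wherever the conda package installed it):

```python
# Minimal sanity check, assuming the bartender conda environment is activated.
import DIRAC

print(DIRAC.__file__)  # should point inside the conda environment
```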
19 changes: 19 additions & 0 deletions docs/develop.md
@@ -213,3 +213,22 @@ do this outside container with
docker compose -f tests_dirac/docker-compose.yml rm -fs dirac-tuto
docker compose -f tests_dirac/docker-compose.yml up dirac-tuto
```

When the DIRAC server container is running, you can log in to it with

```shell
docker exec -ti tests_dirac-dirac-tuto-1 bash
. bashrc
dirac-proxy-init -K ~diracuser/.globus/userkey.pem -C ~diracuser/.globus/usercert.pem
# to fetch job status
dirac-wms-job-status 1
dirac-wms-job-logging-info 1
# to download raw logs
dirac-wms-job-get-output 1
# to download output files
dirac-wms-job-get-output-data 1
# to browse storage
dirac-dms-filecatalog-cli
# pilot logs
cat ~diracpilot/localsite/output/*
```
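
The same lookups can be scripted from Python by shelling out to the CLI commands above. A minimal sketch, assuming it runs inside the container with `bashrc` sourced and a valid proxy; `dirac_job_status` is a hypothetical helper, not part of bartender:

```python
# Hedged sketch: wrap the dirac-wms-job-status CLI shown above from Python.
import subprocess


def dirac_job_status(job_id: int) -> str:
    """Return the raw output of `dirac-wms-job-status` for one job."""
    result = subprocess.run(
        ["dirac-wms-job-status", str(job_id)],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


print(dirac_job_status(1))
```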
3 changes: 3 additions & 0 deletions src/bartender/filesystems/dirac.py
@@ -108,6 +108,9 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
if not result["OK"]:
raise RuntimeError(result["Message"])
archive_fn_in_tmpdir = Path(tmpdirname) / archive_base_fn
if not archive_fn_in_tmpdir.exists():
# A failed job does not have an output.tar
return
# TODO what happens if file in job_dir already exists?
logger.warning(f"Unpacking {archive_fn_in_tmpdir} to {target.job_dir}")
await async_wrap(unpack_archive)(archive_fn_in_tmpdir, target.job_dir)
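
For context, a rough illustration of the `async_wrap(unpack_archive)` pattern used above: the blocking unpack runs in a worker thread so the event loop is not blocked. This is only a sketch of the idea, not bartender's actual `async_wrap` from `bartender.async_utils`, and the archive/directory names are placeholders:

```python
# Hedged sketch of wrapping a blocking callable (shutil.unpack_archive) as a coroutine.
import asyncio
from functools import partial, wraps
from shutil import unpack_archive
from typing import Any, Callable


def async_wrap(func: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(func)
    async def run(*args: Any, **kwargs: Any) -> Any:
        loop = asyncio.get_running_loop()
        # Run the blocking call in the default thread pool executor.
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    return run


async def main() -> None:
    # Placeholder paths; same call shape as in the download() method above.
    await async_wrap(unpack_archive)("output.tar", "/tmp/jobdir")


# asyncio.run(main()) would run it, given an output.tar in the working directory.
```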
23 changes: 20 additions & 3 deletions src/bartender/filesystems/dirac_config.py
@@ -1,6 +1,7 @@
import re
from typing import Literal

from pydantic import BaseModel
from pydantic import BaseModel, validator

from bartender.shared.dirac_config import ProxyConfig

@@ -9,8 +10,11 @@ class DiracFileSystemConfig(BaseModel):
"""Configuration for DIRAC file system.
Args:
lfn_root: Location on grid storage where files of jobs can be stored. Used to
localize description.
lfn_root: Location on grid storage where files of jobs can be stored.
Used to localize description.
To stage output files the root should be located within
the user's home directory.
Home directory is formatted like `/<VO>/user/<initial>/<username>`.
storage_element: Storage element for lfn_root.
proxy: Proxy configuration.
"""
@@ -19,3 +23,16 @@ class DiracFileSystemConfig(BaseModel):
lfn_root: str
storage_element: str
proxy: ProxyConfig = ProxyConfig()

@validator("lfn_root")
def _validate_lfn_root(
cls, # noqa: N805 signature of validator
v: str, # noqa: WPS111 signature of validator
) -> str:
pattern = r"^\/\w+\/user\/([a-zA-Z])\/\1\w+\/.*$"
if not re.match(pattern, v):
template = "/<VO>/user/<initial>/<username>/<whatever>"
raise ValueError(
f"{v} should match the format `{template}`",
)
return v
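
For illustration, a hedged example of how the validator above behaves; the VO, username and storage element names are made up, not defaults shipped with bartender:

```python
# Illustrative only: lfn_root must follow /<VO>/user/<initial>/<username>/<whatever>.
from pydantic import ValidationError

from bartender.filesystems.dirac_config import DiracFileSystemConfig

# Accepted: root lives under the user's home directory.
config = DiracFileSystemConfig(
    lfn_root="/tutoVO/user/c/ciuser/bartenderjobs",
    storage_element="StorageElementOne",
)
print(config.lfn_root)

# Rejected: not under /<VO>/user/<initial>/<username>/.
try:
    DiracFileSystemConfig(
        lfn_root="/tutoVO/jobs",
        storage_element="StorageElementOne",
    )
except ValidationError as exc:
    print(exc)
```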
23 changes: 22 additions & 1 deletion src/bartender/schedulers/abstract.py
@@ -1,8 +1,9 @@
from abc import ABC, abstractmethod
from pathlib import Path
from types import TracebackType
from typing import Optional, Type
from typing import Optional, Tuple, Type

import aiofiles
from pydantic import BaseModel

from bartender.db.models.job_model import State
@@ -75,6 +76,26 @@ async def cancel(self, job_id: str) -> None:
job_id: Identifier of job.
"""

async def logs(self, job_id: str, job_dir: Path) -> Tuple[str, str]:
"""Get stdout and stderr of a job.
If the job has not completed, this will raise an exception.
If the job completed, then stdout.txt and stderr.txt are read from job_dir.
Args:
job_id: Identifier of job.
job_dir: Directory where stdout.txt and stderr.txt files
of job are stored.
Returns:
Tuple of stdout and stderr.
"""
async with aiofiles.open(job_dir / "stdout.txt", mode="r") as fout:
stdout = await fout.read()
async with aiofiles.open(job_dir / "stderr.txt", mode="r") as ferr:
stderr = await ferr.read()
return stdout, stderr

async def __aenter__(self) -> "AbstractScheduler":
return self
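
As a rough usage sketch of the default `logs` implementation above (the concrete scheduler, job id and directory are placeholders, not part of this PR):

```python
# Hedged sketch: read a finished job's logs through AbstractScheduler.logs().
from pathlib import Path

from bartender.schedulers.abstract import AbstractScheduler


async def print_logs(scheduler: AbstractScheduler, job_id: str, job_dir: Path) -> None:
    # Reads job_dir / "stdout.txt" and job_dir / "stderr.txt";
    # raises FileNotFoundError when the job has not written them yet.
    stdout, stderr = await scheduler.logs(job_id, job_dir)
    print(stdout)
    print(stderr)
```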

159 changes: 119 additions & 40 deletions src/bartender/schedulers/dirac.py
@@ -1,12 +1,17 @@
import logging
import tarfile
from io import BytesIO
from pathlib import Path
from textwrap import dedent
from typing import Tuple

import aiofiles
from aiofiles.tempfile import TemporaryDirectory
from DIRAC.Core.Utilities.ReturnValues import DReturnType
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import (
JobMonitoringClient,
)
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient

from bartender.async_utils import async_wrap
@@ -165,6 +170,50 @@ async def close(self) -> None:
"""Close scheduler."""
await teardown_proxy_renewer()

async def logs(self, job_id: str, job_dir: Path) -> Tuple[str, str]:
"""Get stdout and stderr of a job.
If the job has not completed, this will raise an exception.
If the job completed successfully, then stdout.txt and stderr.txt are read
from job_dir.
If the job failed, then stdout and stderr are read from grid storage.
Logs of a failed job include logs of unpacking/packing the
input and output files.
The logs can be fetched on command line
with `dirac-wms-job-get-output <job id>`.
Args:
job_id: Identifier of job.
job_dir: Directory where stdout.txt and stderr.txt files
of job are stored.
Raises:
RuntimeError: When fetching of logs fails.
Returns:
Tuple of stdout and stderr.
"""
try:
return await super().logs(job_id, job_dir)
except FileNotFoundError:
logger.info("Failed to fetch logs from job_dir, trying grid storage")
download_sandbox = async_wrap(SandboxStoreClient().downloadSandboxForJob)
sandbox: DReturnType[bytes] = await download_sandbox(
job_id,
"Output",
inMemory=True,
)
if "Message" in sandbox:
message = sandbox.get("Message")
raise RuntimeError(f"Failed to fetch logs for {job_id}: {message}")
sandbox_bytes = sandbox["Value"]
with tarfile.open(fileobj=BytesIO(sandbox_bytes)) as tar:
await _extract_text_file(tar, "jobstdout.txt", job_dir / "stdout.txt")
await _extract_text_file(tar, "jobstderr.txt", job_dir / "stderr.txt")
return await super().logs(job_id, job_dir)

async def _job_script(self, description: JobDescription, scriptdir: Path) -> Path:
file = Path(scriptdir / "job.sh")
# default OutputSandboxLimit is 10Mb which is too small sometimes,
@@ -176,56 +225,51 @@ async def _job_script(self, description: JobDescription, scriptdir: Path) -> Path:
return file

def _job_script_content(self, description: JobDescription) -> str:
stage_in = self._stage_in_script(description)
stage_out = self._stage_out_script(description)
command = self._command_script(description)
# TODO when command has non-zero return code then
# output.tar is not uploaded,
# you can get logs with scheduler.raw_logs()
# but output.tar is gone, would be nice to have it
# see possible options at
# https://github.com/i-VRESSE/bartender/pull/72
return dedent(
f"""\
#!/bin/bash
set -e
{stage_in}
{command}
{stage_out}
""",
)

def _stage_in_script(self, description: JobDescription) -> str:
fn = "input.tar"
fn_on_grid = description.job_dir / fn
return dedent(
f"""\
# Download & unpack input files
dirac-dms-get-file {fn_on_grid}
tar -xf {fn}
rm {fn}
# Unpack input files
tar -xf input.tar
rm input.tar
find . -type f > .input_files.txt
""",
)

def _stage_out_script(self, description: JobDescription) -> str:
fn = "output.tar"
fn_on_grid = description.job_dir / fn
se = self.config.storage_element
return dedent(
f"""\
# Pack & upload output files
echo {fn} >> .input_files.txt
tar -cf {fn} --exclude-from=.input_files.txt .
dirac-dms-add-file {fn_on_grid} {fn} {se}
rm {fn}
{command}
# Pack output files
echo output.tar >> .input_files.txt
tar -cf output.tar --exclude-from=.input_files.txt .
exit $(cat returncode)
""",
)

def _command_script(self, description: JobDescription) -> str:
command = description.command
if self.config.apptainer_image:
image = self.config.apptainer_image
# TODO if command is complex then quotes are likely needed
command = f"apptainer run {image} {description.command}"
# added echo so DIRAC
# uploads sandbox and output.tar
# if stdout and stderr are empty then
# sandbox (containing jobstderr.txt and jobstdout.txt)
# is not uploaded and uploading output.tar fails due to
# UnboundLocalError: local variable 'result_sbUpload'

# added cat of stdout.txt and stderr.txt
# so if job fails then logs are available in output sandbox.
return dedent(
f"""\
# Run command
echo 'Running command for {description.job_dir}'
({command}) > stdout.txt 2> stderr.txt
echo -n $? > returncode
cat stdout.txt
cat stderr.txt >&2
""",
)

@@ -234,20 +278,55 @@ async def _jdl_script(self, description: JobDescription, scriptdir: Path) -> str
abs_job_sh = jobsh.absolute()

job_name = description.job_dir.name
# The jobstdout.txt and jobstderr.txt can be fetched
# with `dirac-wms-job-get-output <job id>`.
# TODO add input.tar in inputsandbox instead of dirac-dms-get-file
# tried but got `Failed Input Data Resolution` error
# TODO add output.tar in OutputData+OutputSE instead of dirac-dms-add-file
# tried but got
# `JobWrapperError: No output SEs defined in VO configuration` error
# jobstdout.txt and jobstderr.txt contain logs of the whole job,
# including logs of the execution of the command.

# OutputPath must be relative to user's home directory
# to prevent files being uploaded outside user's home directory.
output_path = _relative_output_dir(description)
return dedent(
f"""\
JobName = "{job_name}";
Executable = "{jobsh.name}";
InputSandbox = {{"{abs_job_sh}"}};
InputData = {{ "{description.job_dir}/input.tar" }};
InputDataModule = "DIRAC.WorkloadManagementSystem.Client.InputDataResolution";
InputDataPolicy = "DIRAC.WorkloadManagementSystem.Client.DownloadInputData";
OutputData = {{ "output.tar" }};
OutputSE = {{ "{self.config.storage_element}" }};
OutputPath = "{output_path}";
StdOutput = "jobstdout.txt";
StdError = "jobstderr.txt";
OutputSandbox = {{ "jobstdout.txt", "jobstderr.txt" }};
""",
""", # noqa: E501, WPS237
)


def _relative_output_dir(description: JobDescription) -> Path:
"""Return description.job_dir relative to the user's home directory.
The user's home directory is formatted like /<vo>/user/<initial>/<user>.
For example, to write /tutoVO/user/c/ciuser/bartenderjobs/job1/input.tar
the OutputPath must be bartenderjobs/job1.
Args:
description: Description of job.
Returns:
description.job_dir relative to the user's home directory.
"""
nr_home_dir_parts = 5
return Path(*description.job_dir.parts[nr_home_dir_parts:])


async def _extract_text_file(tar: tarfile.TarFile, src: str, target: Path) -> None:
"""Extract text file from tarfile.
Args:
tar: Tarfile to extract file from.
src: Name of file to extract.
target: Path to write file to.
"""
aextract = async_wrap(tar.extract)
await aextract(src, target.parent)
(target.parent / src).rename(target)
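
A quick worked example of the `OutputPath` computation in `_relative_output_dir`, using the path from its docstring (plain `pathlib`, runnable outside bartender):

```python
# Worked example of stripping the 5 home-directory parts from a job directory.
from pathlib import Path

job_dir = Path("/tutoVO/user/c/ciuser/bartenderjobs/job1")
# parts[0:5] are '/', 'tutoVO', 'user', 'c', 'ciuser' -- the user's home directory.
print(Path(*job_dir.parts[5:]))  # bartenderjobs/job1
```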