-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
33 changed files
with
2,132 additions
and
121 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
FROM ghcr.io/ecoextreml/stemmus_scope:1.5.0 | ||
|
||
LABEL maintainer="Bart Schilperoort <[email protected]>" | ||
LABEL org.opencontainers.image.source = "https://github.com/EcoExtreML/STEMMUS_SCOPE_Processing" | ||
|
||
# Requirements for building Python 3.10 | ||
RUN apt-get update && apt-get -y upgrade | ||
RUN apt-get install -y build-essential zlib1g-dev libncurses5-dev libgdbm-dev \ | ||
libnss3-dev libssl-dev libreadline-dev libffi-dev libsqlite3-dev wget libbz2-dev | ||
RUN apt-get install -y libhdf5-serial-dev | ||
|
||
# Get Python source and compile | ||
WORKDIR /python | ||
RUN wget https://www.python.org/ftp/python/3.10.12/Python-3.10.12.tgz --no-check-certificate | ||
RUN tar -xf Python-3.10.*.tgz | ||
WORKDIR /python/Python-3.10.12 | ||
RUN ./configure --prefix=/usr/local --enable-optimizations --enable-shared LDFLAGS="-Wl,-rpath /usr/local/lib" | ||
RUN make -j $(nproc) | ||
RUN make altinstall | ||
WORKDIR / | ||
|
||
# Pip install PyStemmusScope and dependencies | ||
COPY . /opt/PyStemmusScope | ||
RUN pip3.10 install /opt/PyStemmusScope/[docker] | ||
RUN pip3.10 install grpc4bmi==0.5.0 | ||
|
||
# # Set the STEMMUS_SCOPE environmental variable, so the BMI can find the executable | ||
WORKDIR / | ||
ENV STEMMUS_SCOPE /STEMMUS_SCOPE | ||
|
||
EXPOSE 55555 | ||
# Start grpc4bmi server | ||
CMD run-bmi-server --name "PyStemmusScope.bmi.implementation.StemmusScopeBmi" --port 55555 --debug |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
"""The Docker STEMMUS_SCOPE model process wrapper.""" | ||
import os | ||
import socket as pysocket | ||
import warnings | ||
from time import sleep | ||
from typing import Any | ||
from PyStemmusScope.bmi.docker_utils import check_tags | ||
from PyStemmusScope.bmi.docker_utils import find_image | ||
from PyStemmusScope.bmi.docker_utils import make_docker_vols_binds | ||
from PyStemmusScope.bmi.utils import MATLAB_ERROR | ||
from PyStemmusScope.bmi.utils import PROCESS_FINALIZED | ||
from PyStemmusScope.bmi.utils import PROCESS_READY | ||
from PyStemmusScope.bmi.utils import MatlabError | ||
from PyStemmusScope.config_io import read_config | ||
|
||
|
||
try: | ||
import docker | ||
except ImportError: | ||
docker = None | ||
|
||
|
||
def _model_is_ready(socket: Any, client: Any, container_id: Any) -> None: | ||
return _wait_for_model(PROCESS_READY, socket, client, container_id) | ||
|
||
|
||
def _model_is_finalized(socket: Any, client: Any, container_id: Any) -> None: | ||
return _wait_for_model(PROCESS_FINALIZED, socket, client, container_id) | ||
|
||
|
||
def _wait_for_model(phrase: bytes, socket: Any, client: Any, container_id: Any) -> None: | ||
"""Wait for the model to be ready to receive (more) commands, or is finalized.""" | ||
output = b"" | ||
|
||
while phrase not in output: | ||
try: | ||
data = socket.read(1) | ||
except TimeoutError as err: | ||
client.stop(container_id) | ||
logs = client.logs(container_id).decode("utf-8") | ||
msg = ( | ||
f"Container connection timed out '{container_id['Id']}'." | ||
f"\nPlease inspect logs:\n{logs}" | ||
) | ||
raise TimeoutError(msg) from err | ||
|
||
if data is None: | ||
msg = "Could not read data from socket. Docker container might be dead." | ||
raise ConnectionError(msg) | ||
else: | ||
output += bytes(data) | ||
|
||
if MATLAB_ERROR in output: | ||
client.stop(container_id) | ||
logs = client.logs(container_id).decode("utf-8") | ||
msg = ( | ||
f"Error in container '{container_id['Id']}'.\n" | ||
f"Please inspect logs:\n{logs}" | ||
) | ||
raise MatlabError(msg) | ||
|
||
|
||
def _attach_socket(client, container_id) -> Any: | ||
"""Attach a socket to a container and add a timeout to it.""" | ||
connection_timeout = 30 # seconds | ||
|
||
socket = client.attach_socket(container_id, {"stdin": 1, "stdout": 1, "stream": 1}) | ||
if isinstance(socket, pysocket.SocketIO): | ||
socket._sock.settimeout(connection_timeout) # type: ignore | ||
else: | ||
warnings.warn( | ||
message=( | ||
"Unknown socket type found. This might cause issues with the Docker" | ||
" connection. \nPlease report this to the developers in an issue " | ||
"on: https://github.com/EcoExtreML/STEMMUS_SCOPE_Processing/issues" | ||
), | ||
stacklevel=1, | ||
) | ||
return socket | ||
|
||
|
||
class StemmusScopeDocker: | ||
"""Communicate with a STEMMUS_SCOPE Docker container.""" | ||
|
||
# Default image, can be overridden with config: | ||
compatible_tags = ("1.5.0",) | ||
|
||
_process_ready_phrase = b"Select BMI mode:" | ||
_process_finalized_phrase = b"Finished clean up." | ||
|
||
def __init__(self, cfg_file: str): | ||
"""Create the Docker container..""" | ||
self.cfg_file = cfg_file | ||
config = read_config(cfg_file) | ||
|
||
self.image = config["DockerImage"] | ||
find_image(self.image) | ||
check_tags(self.image, self.compatible_tags) | ||
|
||
self.client = docker.APIClient() | ||
|
||
vols, binds = make_docker_vols_binds(cfg_file) | ||
self.container_id = self.client.create_container( | ||
self.image, | ||
stdin_open=True, | ||
tty=True, | ||
detach=True, | ||
user=f"{os.getuid()}:{os.getgid()}", # ensure correct user for writing files. | ||
volumes=vols, | ||
host_config=self.client.create_host_config(binds=binds), | ||
) | ||
|
||
self.running = False | ||
|
||
def _wait_for_model(self) -> None: | ||
"""Wait for the model to be ready to receive (more) commands.""" | ||
_model_is_ready(self.socket, self.client, self.container_id) | ||
|
||
def is_alive(self) -> bool: | ||
"""Return if the process is alive.""" | ||
return self.running | ||
|
||
def initialize(self) -> None: | ||
"""Initialize the model and wait for it to be ready.""" | ||
if self.is_alive(): | ||
self.client.stop(self.container_id) | ||
|
||
self.client.start(self.container_id) | ||
self.socket = _attach_socket(self.client, self.container_id) | ||
|
||
self._wait_for_model() | ||
os.write( | ||
self.socket.fileno(), | ||
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8"), | ||
) | ||
self._wait_for_model() | ||
|
||
self.running = True | ||
|
||
def update(self) -> None: | ||
"""Update the model and wait for it to be ready.""" | ||
if self.is_alive(): | ||
os.write(self.socket.fileno(), b"update\n") | ||
self._wait_for_model() | ||
else: | ||
msg = "Docker container is not alive. Please restart the model." | ||
raise ConnectionError(msg) | ||
|
||
def finalize(self) -> None: | ||
"""Finalize the model.""" | ||
if self.is_alive(): | ||
os.write(self.socket.fileno(), b"finalize\n") | ||
_model_is_finalized( | ||
self.socket, | ||
self.client, | ||
self.container_id, | ||
) | ||
sleep(0.5) # Ensure the container can stop cleanly. | ||
self.client.stop(self.container_id) | ||
self.running = False | ||
self.client.remove_container(self.container_id, v=True) | ||
else: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
"""Utility functions for making the docker process work.""" | ||
import warnings | ||
from pathlib import Path | ||
from PyStemmusScope.config_io import read_config | ||
|
||
|
||
try: | ||
import docker | ||
except ImportError: | ||
docker = None | ||
|
||
|
||
def make_docker_vols_binds(cfg_file: str) -> tuple[list[str], list[str]]: | ||
"""Make docker volume mounting configs. | ||
Args: | ||
cfg_file: Location of the config file | ||
Returns: | ||
volumes, binds | ||
""" | ||
cfg = read_config(cfg_file) | ||
cfg_dir = Path(cfg_file).parent | ||
volumes = [] | ||
binds = [] | ||
|
||
# Make sure no subpaths are mounted: | ||
if not cfg_dir.is_relative_to(cfg["InputPath"]): | ||
volumes.append(str(cfg_dir)) | ||
binds.append(f"{str(cfg_dir)}:{str(cfg_dir)}") | ||
if (not Path(cfg["InputPath"]).is_relative_to(cfg_dir)) or ( | ||
Path(cfg["InputPath"]) == cfg_dir | ||
): | ||
volumes.append(cfg["InputPath"]) | ||
binds.append(f"{cfg['InputPath']}:{cfg['InputPath']}") | ||
if not Path(cfg["OutputPath"]).is_relative_to(cfg_dir): | ||
volumes.append(cfg["OutputPath"]) | ||
binds.append(f"{cfg['OutputPath']}:{cfg['OutputPath']}") | ||
|
||
return volumes, binds | ||
|
||
|
||
def check_tags(image: str, compatible_tags: tuple[str, ...]): | ||
"""Check if the tag is compatible with this version of the BMI. | ||
Args: | ||
image: The full image name (including tag) | ||
compatible_tags: Tags which are known to be compatible with this version of the | ||
BMI. | ||
""" | ||
if ":" not in image: | ||
msg = ( | ||
"Could not validate the Docker image tag, as no tag was provided.\n" | ||
"Please set the Docker image tag in the configuration file." | ||
) | ||
warnings.warn(UserWarning(msg), stacklevel=1) | ||
|
||
tag = image.split(":")[-1] | ||
if tag not in compatible_tags: | ||
msg = ( | ||
f"Docker image tag '{tag}' not found in compatible tags " | ||
f"({compatible_tags}).\n" | ||
"You might experience issues or unexpected results." | ||
) | ||
warnings.warn(UserWarning(msg), stacklevel=1) | ||
|
||
|
||
def find_image(image: str) -> None: | ||
"""See if the desired image is available, and if not, try to pull it.""" | ||
client = docker.APIClient() | ||
images = client.images() | ||
tags = [] | ||
for img in images: | ||
for tag in img["RepoTags"]: | ||
tags.append(tag) | ||
if image not in set(tags): | ||
pull_image(image) | ||
|
||
|
||
def pull_image(image: str) -> None: | ||
"""Pull the image from ghcr/dockerhub.""" | ||
if ":" in image: | ||
image, tag = image.split(":") | ||
else: | ||
tag = None | ||
client = docker.from_env() | ||
image = client.images.pull(image, tag) |
Oops, something went wrong.