Skip to content

Commit

Permalink
Merge pull request #78 from seung-lab/igneous_nfs
Browse files (browse the repository at this point in the history)
Make more use of the NFS server for igneous tasks
  • Loading branch information
ranlu authored Feb 12, 2024
2 parents 70f266e + 69ee346 commit 4b92ad4
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cloud/google/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def GenerateConfig(context):
resources['resources'] += easyseg_worker_resource

if "nfsServer" in context.properties:
nfs_server_resource = GenerateNFSServer(context, hostname_nfs_server)
nfs_server_resource = GenerateNFSServer(context, hostname_manager, hostname_nfs_server)
resources['resources'] += nfs_server_resource

return resources
35 changes: 26 additions & 9 deletions cloud/google/nfs_server.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
from common import ZonalComputeUrl, GenerateDisk, GenerateBootDisk, GenerateNetworkInterface
from common import ZonalComputeUrl, GenerateDisk, GenerateBootDisk, GenerateNetworkInterface, INSTALL_DOCKER_CMD, GenerateAirflowVar
from workers import GenerateDockerCommand, GenerateCeleryWorkerCommand, GenerateEnvironVar


def GenerateNFSServerStartupScript():
startup_script = '''
def GenerateNFSServerStartupScript(context, hostname_manager):
env_variables = GenerateAirflowVar(context, hostname_manager)

docker_env = [f'-e {k}' for k in env_variables]
docker_image = context.properties['seuronImage']

oom_canary_cmd = GenerateDockerCommand(docker_image, docker_env) + ' ' + "python utils/memory_monitor.py ${AIRFLOW__CELERY__BROKER_URL} bot-message-queue >& /dev/null"
worker_cmd = GenerateCeleryWorkerCommand(docker_image, docker_env+['-p 8793:8793'], queue="nfs", concurrency=1)

startup_script = f'''
#!/bin/bash
set -e
mkdir -p /share
#DRIVES=($(lsblk | grep -oE 'nvme[a-z0-9A-Z]*' | cut -d' ' -f1 | awk '{ print "/dev/"$1 }'))
#mdadm --create /dev/md0 --level=0 --raid-devices=${#DRIVES[@]} ${DRIVES[@]}
#mkfs.ext4 -F /dev/md0
#mount /dev/md0 /share
mkdir -p /var/log/airflow/logs
chmod 777 /var/log/airflow/logs
DEBIAN_FRONTEND=noninteractive apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" dist-upgrade
{INSTALL_DOCKER_CMD}
{GenerateEnvironVar(context, env_variables)}
if [ ! -f "/etc/bootstrap_done" ]; then
Expand All @@ -35,14 +47,17 @@ def GenerateNFSServerStartupScript():
mount /dev/sdb /share
chmod 777 /share
systemctl restart nfs-kernel-server.service
{oom_canary_cmd} &
{worker_cmd}
'''
return startup_script


def GenerateNFSServer(context, hostname_nfs_server):
def GenerateNFSServer(context, hostname_manager, hostname_nfs_server):
nfs_server_param = context.properties["nfsServer"]

startup_script = GenerateNFSServerStartupScript()
startup_script = GenerateNFSServerStartupScript(context, hostname_manager)

diskType = ZonalComputeUrl(
context.env['project'],
Expand Down Expand Up @@ -78,8 +93,10 @@ def GenerateNFSServer(context, hostname_nfs_server):
'networkInterfaces': [GenerateNetworkInterface(context, nfs_server_param['subnetwork'])],
'serviceAccounts': [{
'scopes': [
'https://www.googleapis.com/auth/compute',
'https://www.googleapis.com/auth/logging.write',
'https://www.googleapis.com/auth/monitoring.write',
'https://www.googleapis.com/auth/devstorage.read_write',
],
}],
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/google/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def GenerateWorkers(context, hostname_manager, hostname_nfs_server, worker):
else:
env_variables["HAVE_GPUS"] = "False"

if use_shared_volume:
env_variables["CLOUD_VOLUME_CACHE_DIR"] = "/share/.cloudvolume"

docker_env = [f'-e {k}' for k in env_variables]

Expand Down
12 changes: 8 additions & 4 deletions dags/igneous_and_cloudvolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,23 @@ def check_queue(queue, agg=None, refill_threshold=0):
import requests
from time import sleep
from slack_message import slack_message
from urllib.parse import urlparse
from kombu import Connection
import traceback

broker = configuration.get('celery', 'BROKER_URL')
totalTries = 2
nTries = totalTries
count = 0
parsed_uri = urlparse(broker)
rq_host = parsed_uri.hostname
with Connection(broker) as conn:
ret_queue = conn.SimpleQueue(queue+"_ret")
err_queue = conn.SimpleQueue(queue+"_err")
while True:
sleep(5)
try:
ret = requests.get("http://rabbitmq:15672/api/queues/%2f/{}".format(queue), auth=('guest', 'guest'))
ret = requests.get(f"http://{rq_host}:15672/api/queues/%2f/{queue}", auth=('guest', 'guest'))
queue_status = ret.json()
nTasks = queue_status["messages"]
except Exception as e:
Expand Down Expand Up @@ -737,13 +740,14 @@ def submit_igneous_tasks():

ret = globals()["submit_tasks"]()
if isinstance(ret, dict):
tasks = list(ret["task_list"])
ret["task_list"] = tasks
tasks = list(ret.get("task_list", []))
if tasks:
ret["task_list"] = tasks
else:
tasks = list(ret)

if not tasks:
return
return ret

if len(tasks) > 1000000:
slack_message(":exclamation:*Error* too many ({}) tasks, bail".format(len(tasks)))
Expand Down
15 changes: 11 additions & 4 deletions dags/igneous_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from airflow.utils.weight_rule import WeightRule
from datetime import datetime
from slack_message import task_failure_alert
from helper_ops import scale_down_cluster_op, collect_metrics_op, toggle_nfs_server_op
from helper_ops import scale_down_cluster_op, collect_metrics_op, toggle_nfs_server_op, placeholder_op
from dag_utils import get_connection

igneous_default_args = {
'owner': 'seuronbot',
Expand All @@ -17,16 +18,22 @@
dag_igneous = DAG("igneous", default_args=igneous_default_args, schedule_interval=None, tags=['igneous tasks'])
scaling_igneous_finish = scale_down_cluster_op(dag_igneous, "igneous_finish", "igneous", 0, "cluster")

start_nfs_server = toggle_nfs_server_op(dag_igneous, on=True)
stop_nfs_server = toggle_nfs_server_op(dag_igneous, on=False)
if get_connection("NFSServer"):
start_nfs_server = toggle_nfs_server_op(dag_igneous, on=True)
stop_nfs_server = toggle_nfs_server_op(dag_igneous, on=False)
queue = "nfs"
else:
start_nfs_server = placeholder_op(dag_igneous, on=True)
stop_nfs_server = placeholder_op(dag_igneous, on=False)
queue = "manager"

submit_igneous_tasks = PythonOperator(
task_id="submit_igneous_tasks",
python_callable=submit_igneous_tasks,
priority_weight=100000,
on_failure_callback=task_failure_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag_igneous
)

Expand Down
16 changes: 9 additions & 7 deletions dags/igneous_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ def create_igneous_ops(param, dag):

if get_connection("NFSServer"):
nfs_kwargs = {"frag_path": f"file:///share/{run_name}"}
queue = "nfs"
else:
nfs_kwargs = {"frag_path": None}
queue = "manager"

if not param.get("SKIP_DOWNSAMPLE", False):
if not param.get("SKIP_MESHING", False):
Expand All @@ -25,7 +27,7 @@ def create_igneous_ops(param, dag):
op_args=[run_name, seg_cloudpath, param.get("SIZE_THRESHOLDED_MESH", False), ],
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)
ops[-1] >> current_op
Expand All @@ -39,7 +41,7 @@ def create_igneous_ops(param, dag):
op_kwargs=nfs_kwargs,
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)

Expand All @@ -54,7 +56,7 @@ def create_igneous_ops(param, dag):
op_kwargs=nfs_kwargs,
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)
else:
Expand All @@ -65,7 +67,7 @@ def create_igneous_ops(param, dag):
op_kwargs=nfs_kwargs,
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)

Expand All @@ -83,7 +85,7 @@ def create_igneous_ops(param, dag):
op_args=[run_name, downsample_target],
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)

Expand All @@ -98,7 +100,7 @@ def create_igneous_ops(param, dag):
op_kwargs=nfs_kwargs,
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)

Expand All @@ -112,7 +114,7 @@ def create_igneous_ops(param, dag):
op_kwargs=nfs_kwargs,
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
queue=queue,
dag=dag
)

Expand Down

0 comments on commit 4b92ad4

Please sign in to comment.