Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful termination of shell scripts #1644

Merged
merged 7 commits into from
Oct 24, 2023
13 changes: 13 additions & 0 deletions buildtest/builders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,19 @@ def _write_build_script(self, modules=None, modulepurge=None, unload_modules=Non

lines = ["#!/bin/bash"]

trap_msg = """
# Signal handler: report the interrupted command and exit with its status.
# Must be installed via `trap`; on entry $? still holds the status of the
# command that was running when the signal arrived.
function cleanup() {
    # Capture the status FIRST -- any later command (even echo) overwrites $?
    exitcode=$?
    echo "Signal trapped. Performing cleanup before exiting."
    echo "buildtest: command \`$BASH_COMMAND' failed (exit code: $exitcode)"
    exit "$exitcode"
}

# Trap the catchable termination and job-control signals and call the cleanup function.
# SIGKILL is intentionally not listed: it can never be caught by a process.
trap cleanup SIGINT SIGTERM SIGHUP SIGQUIT SIGABRT SIGALRM SIGPIPE SIGTSTP SIGTTIN SIGTTOU
"""
lines.append(trap_msg)
lines += self._default_test_variables()
lines.append("# source executor startup script")

Expand Down
8 changes: 5 additions & 3 deletions buildtest/cli/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,7 @@ def discover_buildspecs(

# if no files discovered let's stop now
if not buildspec_dict["included"]:
msg = "There are no config files to process."
sys.exit(msg)
sys.exit("There are no config files to process.")

logger.debug(
f"buildtest discovered the following Buildspecs: {buildspec_dict['included']}"
Expand Down Expand Up @@ -1223,7 +1222,10 @@ def run_phase(self):
"""

console.rule("[bold red]Running Tests")
self.buildexecutor.run(self.builders)
try:
self.buildexecutor.run(self.builders)
except KeyboardInterrupt:
raise KeyboardInterrupt

builders = self.buildexecutor.get_validbuilders()
########## TEST SUMMARY ####################
Expand Down
132 changes: 80 additions & 52 deletions buildtest/executors/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import multiprocessing as mp
import os
import shutil
import time

from rich.table import Column, Table
Expand Down Expand Up @@ -306,69 +307,94 @@ def run(self, builders):
pool = mp.Pool(num_workers)
console.print(f"Spawning {num_workers} processes for processing builders")
count = 0
while True:
active_builders = []
count += 1
console.rule(f"Iteration {count}")

for builder in self.builders:
if builder.is_pending():
active_builders.append(builder)

run_builders = self.select_builders_to_run(active_builders)

# if max_jobs property is set then reduce the number of jobs to run to max_jobs
if self.max_jobs:
run_builders = run_builders[: self.max_jobs]
try:
while True:
count += 1
console.rule(f"Iteration {count}")
active_builders = [
builder for builder in self.builders if builder.is_pending()
]

run_builders = self.select_builders_to_run(active_builders)

# if max_jobs property is set then reduce the number of jobs to run to max_jobs
if self.max_jobs:
run_builders = run_builders[: self.max_jobs]

if not run_builders:
raise BuildTestError("Unable to run tests")

run_table = Table(
Column("Builder", overflow="fold", style="red"),
title="Builders Eligible to Run",
header_style="blue",
)
for builder in run_builders:
run_table.add_row(f"{str(builder)}")
console.print(run_table)

if not run_builders:
raise BuildTestError("Unable to run tests ")
results = []

run_table = Table(
Column("Builder", overflow="fold", style="red"),
title="Builders Eligible to Run",
header_style="blue",
)
for builder in run_builders:
run_table.add_row(f"{str(builder)}")
console.print(run_table)
for builder in run_builders:
executor = self._choose_executor(builder)
results.append(pool.apply_async(executor.run, args=(builder,)))
self.builders.remove(builder)

results = []
for result in results:
task = result.get()
if isinstance(task, BuilderBase):
self.builders.add(task)

for builder in run_builders:
executor = self._choose_executor(builder)
results.append(pool.apply_async(executor.run, args=(builder,)))
self.builders.remove(builder)
pending_jobs = {
builder
for builder in self.builders
if builder.is_batch_job() and builder.is_running()
}

for result in results:
task = result.get()
if isinstance(task, BuilderBase):
self.builders.add(task)
self.poll(pending_jobs)

pending_jobs = set()
for builder in self.builders:
# returns True if attribute builder.job is an instance of class Job. Only add jobs that are actively running to the set of pending jobs
if builder.is_batch_job() and builder.is_running():
pending_jobs.add(builder)
# remove any failed jobs from list
# for builder in self.builders:
# if builder.is_failed():
# self.builders.remove(builder)

self.poll(pending_jobs)
# set Terminate to True if no builders are pending or running

# remove any failed jobs from list
# for builder in self.builders:
# if builder.is_failed():
# self.builders.remove(builder)
terminate = not any(
builder.is_pending() or builder.is_running()
for builder in self.builders
)

terminate = True
if terminate:
break
except KeyboardInterrupt:
console.print("[red]Caught KeyboardInterrupt, terminating workers")

# the condition below checks whether all tests are complete; if any are pending or running
# we need to stay in the loop until those jobs are finished
for builder in self.builders:
if builder.is_pending() or builder.is_running():
terminate = False

if terminate:
break

console.print(
f"[blue]{builder}[/blue]: [red]Removing test directory: {builder.test_root}"
)
try:
shutil.rmtree(builder.test_root)
except OSError as err:
console.print(
f"[blue]{builder}[/blue]: [red]Unable to delete test directory {builder.test_root} with error: {err.strerror}"
)
continue

if builder.is_batch_job():
console.print(
f"[blue]{builder}[/blue]: [red]Cancelling Job {builder.job.get()}"
)
builder.job.cancel()

# close the worker pool by preventing any more tasks from being submitted
pool.close()

# wait for all worker processes to exit (join() blocks until they finish; it does not terminate them)
pool.join()

raise KeyboardInterrupt
# close the worker pool by preventing any more tasks from being submitted
pool.close()

Expand All @@ -385,6 +411,8 @@ def poll(self, pending_jobs):
while pending_jobs:
print(f"Polling Jobs in {self.pollinterval} seconds")
time.sleep(self.pollinterval)

# time.sleep(self.pollinterval)
jobs = pending_jobs.copy()

# for every pending job poll job and mark if job is finished or cancelled
Expand Down
80 changes: 46 additions & 34 deletions buildtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import shutil
import sys
import tempfile
import webbrowser

Expand Down Expand Up @@ -145,40 +146,51 @@ def main():
if args.subcommands in ["build", "bd"]:
stdout_file = tempfile.NamedTemporaryFile(delete=True, suffix=".txt")
with Tee(stdout_file.name):
cmd = BuildTest(
configuration=configuration,
buildspecs=args.buildspec,
exclude_buildspecs=args.exclude,
executors=args.executor,
tags=args.tags,
name=args.name,
exclude_tags=args.exclude_tags,
filter_buildspecs=args.filter,
rebuild=args.rebuild,
stage=args.stage,
testdir=args.testdir,
buildtest_system=system,
report_file=report_file,
maxpendtime=args.maxpendtime,
poll_interval=args.pollinterval,
remove_stagedir=args.remove_stagedir,
retry=args.retry,
account=args.account,
helpfilter=args.helpfilter,
numprocs=args.procs,
numnodes=args.nodes,
modules=args.modules,
modulepurge=args.module_purge,
unload_modules=args.unload_modules,
rerun=args.rerun,
executor_type=args.executor_type,
timeout=args.timeout,
limit=args.limit,
save_profile=args.save_profile,
profile=args.profile,
max_jobs=args.max_jobs,
)
cmd.build()
try:
cmd = BuildTest(
configuration=configuration,
buildspecs=args.buildspec,
exclude_buildspecs=args.exclude,
executors=args.executor,
tags=args.tags,
name=args.name,
exclude_tags=args.exclude_tags,
filter_buildspecs=args.filter,
rebuild=args.rebuild,
stage=args.stage,
testdir=args.testdir,
buildtest_system=system,
report_file=report_file,
maxpendtime=args.maxpendtime,
poll_interval=args.pollinterval,
remove_stagedir=args.remove_stagedir,
retry=args.retry,
account=args.account,
helpfilter=args.helpfilter,
numprocs=args.procs,
numnodes=args.nodes,
modules=args.modules,
modulepurge=args.module_purge,
unload_modules=args.unload_modules,
rerun=args.rerun,
executor_type=args.executor_type,
timeout=args.timeout,
limit=args.limit,
save_profile=args.save_profile,
profile=args.profile,
max_jobs=args.max_jobs,
)
cmd.build()
except KeyboardInterrupt as err:
console.print(
"[red]Unable to complete buildtest build command, signal: KeyboardInterrupt detected"
)
console.print(err)
sys.exit(1)
except SystemExit as err:
console.print("[red]buildtest build command failed")
console.print(err)
sys.exit(1)

if cmd.build_success():
build_history_dir = cmd.get_build_history_dir()
Expand Down
Loading