Skip to content

Commit

Permalink
add option to disable the collection of system statistics #72
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-schick committed Jul 18, 2022
1 parent d93b078 commit 1a2f830
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
8 changes: 6 additions & 2 deletions mara_pipelines/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ def bash_command_string() -> str:
return '/usr/bin/env bash -o pipefail'


def system_statistics_collection_period() -> int:
"""How often should system statistics be collected in seconds"""
def system_statistics_collection_period() -> typing.Union[float, None]:
"""
How often should system statistics be collected in seconds.
When zero or None, the collection of system statistics is disabled.
"""
return 1


Expand Down
19 changes: 12 additions & 7 deletions mara_pipelines/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
# The function that is run in a sub process
def run():

statistics_process: multiprocessing.Process = None

try:
# capture output of print statements and other unplanned output
logger.redirect_output(event_queue, pipeline.path())
Expand Down Expand Up @@ -125,7 +127,8 @@ def ensure_task_processes_killed():
if tp.is_alive():
# give it a chance to gracefully shutdown
tp.terminate()
statistics_process.kill()
if statistics_process:
statistics_process.kill()
except BaseException as e:
print(f"Exception during TaskProcess cleanup: {repr(e)}", file=sys.stderr, flush=True)
return
Expand Down Expand Up @@ -188,9 +191,10 @@ def track_finished_pipelines():
)

# collect system stats in a separate Process
statistics_process = multiprocessing.Process(
target=lambda: system_statistics.generate_system_statistics(event_queue), name='system_statistics')
statistics_process.start()
if config.system_statistics_collection_period():
statistics_process = multiprocessing.Process(
target=lambda: system_statistics.generate_system_statistics(event_queue), name='system_statistics')
statistics_process.start()

# run as long
# - as task processes are still running
Expand Down Expand Up @@ -309,9 +313,10 @@ def track_finished_pipelines():
# run again because `dequeue` might have moved more nodes to `finished_nodes`
track_finished_pipelines()

# kill the stats process (joining or terminating does not work in gunicorn)
os.kill(statistics_process.pid, signal.SIGKILL)
statistics_process.join()
if statistics_process:
# kill the stats process (joining or terminating does not work in gunicorn)
os.kill(statistics_process.pid, signal.SIGKILL)
statistics_process.join()

# run finished
event_queue.put(pipeline_events.RunFinished(node_path=pipeline.path(), end_time=datetime.datetime.now(tz.utc),
Expand Down
6 changes: 5 additions & 1 deletion mara_pipelines/logging/system_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def generate_system_statistics(event_queue: multiprocessing.Queue) -> None:
:param event_queue: The queue to write the events to
"""
period = config.system_statistics_collection_period()
if not period:
# the collection of system statistics is disabled.
return

import psutil

def cpu_usage():
Expand All @@ -68,7 +73,6 @@ def swap_usage():
# immediately send event for current cpu, mem and swap usage
event_queue.put(SystemStatistics(
datetime.datetime.now(), cpu_usage=cpu_usage(), mem_usage=mem_usage(), swap_usage=swap_usage()))
period = config.system_statistics_collection_period()

n = 0

Expand Down

0 comments on commit 1a2f830

Please sign in to comment.