Fix duplicated system stats if you run multiple ETLs in parallel (#38)
If two or more ETLs run at the same time and both produce a system statistic at the exact same millisecond, one of the runs would fail because the statistic could not be added to the DB.

Now:

- A new run_id column contains the run_id of the run that produced the system statistic. Old rows are automatically migrated to run_id -1; with the default retention period of 30 days, they should be gone after 30 days.
- We have a compound PK (run_id, timestamp), but unfortunately upgraded DBs do not pick this up automatically (see the manual migration sketch below).
- For that reason we now simply ignore bad writes for system statistics events: if the mara DB was upgraded without the compound PK, this catches those cases.
- The UI now only shows the statistics for the specific run (with a fallback for any -1 run_ids in the stats table).

Replaces #29
Closes: #22 #29
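For databases upgraded without the compound PK, a manual migration along these lines should work. This is a minimal sketch, not part of this commit; the connection parameters and the default _pkey constraint name are assumptions:

    # Hypothetical manual migration for a mara DB that was upgraded without the compound PK.
    import psycopg2

    with psycopg2.connect('dbname=mara') as connection:  # assumed connection parameters
        with connection.cursor() as cursor:
            # add the run_id column with the -1 default for old rows (no-op if it already exists)
            cursor.execute('''ALTER TABLE data_integration_system_statistics
                                  ADD COLUMN IF NOT EXISTS run_id INTEGER NOT NULL DEFAULT -1''')
            # swap the single-column PK for the compound one (assumes Postgres' default constraint name)
            cursor.execute('''ALTER TABLE data_integration_system_statistics
                                  DROP CONSTRAINT IF EXISTS data_integration_system_statistics_pkey''')
            cursor.execute('''ALTER TABLE data_integration_system_statistics
                                  ADD PRIMARY KEY (run_id, timestamp)''')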
martin-loetzsch authored Jun 9, 2020
2 parents b72e6cb + fb50438 commit 82ee8bf
Showing 3 changed files with 46 additions and 22 deletions.
10 changes: 5 additions & 5 deletions data_integration/execution.py
@@ -56,11 +56,6 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
     # The function that is run in a sub process
     def run():
 
-        # collect system stats in a separate Process
-        statistics_process = multiprocessing_context.Process(
-            target=lambda: system_statistics.generate_system_statistics(event_queue), name='system_statistics')
-        statistics_process.start()
-
         try:
             # capture output of print statements and other unplanned output
             logger.redirect_output(event_queue, pipeline.path())
@@ -183,6 +178,11 @@ def track_finished_pipelines():
                                is_root_pipeline=(pipeline.parent is None))
             )
 
+        # collect system stats in a separate Process
+        statistics_process = multiprocessing.Process(
+            target=lambda: system_statistics.generate_system_statistics(event_queue), name='system_statistics')
+        statistics_process.start()
+
         # run as long
         # - as task processes are still running
         # - as there is still stuff in the node queue
27 changes: 19 additions & 8 deletions data_integration/logging/run_log.py
@@ -56,6 +56,9 @@ class SystemStatistics(Base):
     __tablename__ = 'data_integration_system_statistics'
 
     timestamp = sqlalchemy.Column(sqlalchemy.TIMESTAMP(timezone=True), primary_key=True, index=True)
+    # server_default is needed here so that old rows can be migrated to run_id -1
+    run_id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False,
+                               server_default=sqlalchemy.text('-1'))
     disc_read = sqlalchemy.Column(sqlalchemy.FLOAT)
     disc_write = sqlalchemy.Column(sqlalchemy.FLOAT)
     net_recv = sqlalchemy.Column(sqlalchemy.FLOAT)
@@ -128,14 +131,22 @@ def handle_event(self, event: events.Event):
                                RETURNING node_run_id''', (self.run_id, event.node_path, event.start_time, event.is_pipeline))
 
         elif isinstance(event, system_statistics.SystemStatistics):
-            with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
-                cursor.execute(f'''
-                    INSERT INTO data_integration_system_statistics (timestamp, disc_read, disc_write, net_recv, net_sent,
-                                                                    cpu_usage, mem_usage, swap_usage, iowait)
-                    VALUES ({"%s, %s, %s, %s, %s, %s, %s, %s, %s"})''',
-                               (event.timestamp, event.disc_read, event.disc_write, event.net_recv,
-                                event.net_sent, event.cpu_usage, event.mem_usage, event.swap_usage, event.iowait))
-
+            try:
+                with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
+                    cursor.execute(f'''
+                        INSERT INTO data_integration_system_statistics (timestamp, run_id, disc_read, disc_write, net_recv, net_sent,
+                                                                        cpu_usage, mem_usage, swap_usage, iowait)
+                        VALUES ({"%s, %s, %s, %s, %s, %s, %s, %s, %s, %s"})''',
+                                   (event.timestamp, self.run_id, event.disc_read, event.disc_write, event.net_recv,
+                                    event.net_sent, event.cpu_usage, event.mem_usage, event.swap_usage, event.iowait))
+            except Exception as e:
+                # The old version of the table had a PK on timestamp only. If multiple ETLs run at the
+                # same time, two statistics can get inserted with the same timestamp and the insert fails.
+                # We now have a compound PK, but the migration scripts do not pick this up, so upgraded
+                # tables may still have the single-column PK.
+                # Since no single statistic is critical, we simply discard the failed one; the next
+                # arrives one second later (by default).
+                print(f'Ignored problem while inserting a system statistics event: {e!r}', flush=True)
         elif isinstance(event, pipeline_events.NodeFinished):
             with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
                 cursor.execute(f'''
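An alternative to catching the exception would be to have Postgres discard the duplicate row itself. A sketch, not what this commit does; it only helps once the compound PK (or another unique constraint) actually exists on the table:

    # Sketch: idempotent insert via ON CONFLICT DO NOTHING instead of try/except.
    with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute('''
            INSERT INTO data_integration_system_statistics (timestamp, run_id, disc_read, disc_write, net_recv, net_sent,
                                                            cpu_usage, mem_usage, swap_usage, iowait)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING''',
            (event.timestamp, self.run_id, event.disc_read, event.disc_write, event.net_recv,
             event.net_sent, event.cpu_usage, event.mem_usage, event.swap_usage, event.iowait))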
31 changes: 22 additions & 9 deletions data_integration/ui/last_runs.py
@@ -104,15 +104,15 @@ def run_output(path: str, run_id: int, limit: bool):
                 SELECT node_path, message, format, is_error
                 FROM data_integration_node_run
                 JOIN data_integration_node_output USING (node_run_id)
-                WHERE node_path [1:{"%s"}] = %s 
+                WHERE node_path [1:{"%s"}] = %s
                   AND run_id = %s
-                ORDER BY timestamp 
+                ORDER BY timestamp
                 ''' + ('LIMIT ' + str(line_limit + 1) if limit else ''), (len(node.path()), node.path(), run_id))
 
     rows = cursor.fetchall()
     return str(_.script[f"""
         nodePage.showOutput({json.dumps(rows[:line_limit] if limit else rows)},
-                            "{path}", 
+                            "{path}",
                             {'true' if len(rows) == line_limit + 1 else 'false'});
         """])
 
@@ -132,10 +132,23 @@ def system_stats(path: str, run_id: int):
 
     with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
         cursor.execute(f'''
-            SELECT data_integration_system_statistics.*
-            FROM data_integration_system_statistics
-            JOIN data_integration_node_run ON timestamp BETWEEN start_time AND end_time
-            WHERE run_id = {"%s"} AND node_path = {"%s"};''', (run_id, node.path()))
+            SELECT
+              -- columns are spelled out so that the row post-processing can rely on their order
+              -- (run_id is not needed in the frontend)
+              stats.timestamp,
+              stats.disc_read,
+              stats.disc_write,
+              stats.net_recv,
+              stats.net_sent,
+              stats.cpu_usage,
+              stats.mem_usage,
+              stats.swap_usage,
+              stats.iowait
+            FROM data_integration_node_run nr
+            JOIN data_integration_system_statistics stats ON stats.timestamp BETWEEN nr.start_time AND nr.end_time
+              -- -1 is a fallback for old rows that were created without a run ID -> can be removed after 2021-01-01 or so
+              AND (stats.run_id = nr.run_id OR stats.run_id = -1)
+            WHERE nr.run_id = {"%s"} AND nr.node_path = {"%s"};''', (run_id, node.path()))
 
     data = [[row[0].isoformat()] + list(row[1:]) for row in cursor.fetchall()]
     if len(data) >= 15:
@@ -160,10 +173,10 @@ def timeline_chart(path: str, run_id: int):
 
     with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
         cursor.execute(f'''
-            SELECT node_path, start_time, end_time, max(end_time) over () AS max_end_time, succeeded, is_pipeline 
+            SELECT node_path, start_time, end_time, max(end_time) over () AS max_end_time, succeeded, is_pipeline
             FROM data_integration_node_run
             WHERE node_path [1 :{'%(level)s'}] = {'%(node_path)s'}
-              AND array_length(node_path, 1) > {'%(level)s'} 
+              AND array_length(node_path, 1) > {'%(level)s'}
               AND run_id = {'%(run_id)s'};''', {'level': len(node.path()), 'node_path': node.path(), 'run_id': run_id})
 
     nodes = [{'label': ' / '.join(node_path[len(node.path()):]),
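Once the 30-day retention period has passed, the -1 fallback in the system_stats query above becomes dead weight. A hypothetical cleanup for any leftover migrated rows, not part of this commit:

    # Hypothetical cleanup: drop the migrated rows so the -1 fallback can later be removed.
    with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute('DELETE FROM data_integration_system_statistics WHERE run_id = -1')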
