From 9d554eaf3a5d26f8edd87d1805750435cdc56b36 Mon Sep 17 00:00:00 2001 From: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com> Date: Tue, 27 Feb 2024 18:23:54 +0200 Subject: [PATCH] Fix metrics double counting (#3416) * Update post-scheduling metrics collection Signed-off-by: Mohamed Abdelfatah * lint fix Signed-off-by: Mohamed Abdelfatah * move preempted metrics updates to UpdateFailed Signed-off-by: Mohamed Abdelfatah * wip Signed-off-by: Mohamed Abdelfatah * wip Signed-off-by: Mohamed Abdelfatah * wip Signed-off-by: Mohamed Abdelfatah * Add comments for some metrics updated approaches Signed-off-by: Mohamed Abdelfatah * some clean up Signed-off-by: Mohamed Abdelfatah --------- Signed-off-by: Mohamed Abdelfatah --- internal/scheduler/metrics/metrics.go | 42 ++++++++++----------------- internal/scheduler/scheduler.go | 9 ++---- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/internal/scheduler/metrics/metrics.go b/internal/scheduler/metrics/metrics.go index 3da8956b4c4..c6875fa9659 100644 --- a/internal/scheduler/metrics/metrics.go +++ b/internal/scheduler/metrics/metrics.go @@ -186,11 +186,6 @@ func (m *Metrics) Update( return err } } - if jst.Leased { - if err := m.UpdateLeased(jst.Job, nil); err != nil { - return err - } - } if jst.Pending { if err := m.UpdatePending(jst.Job); err != nil { return err @@ -217,10 +212,12 @@ func (m *Metrics) Update( } } if jst.Preempted { - if err := m.UpdatePreempted(jst.Job, nil); err != nil { + if err := m.UpdatePreempted(jst.Job); err != nil { return err } } + // UpdateLeased is called by the scheduler directly once a job is leased. + // It is not called here to avoid double counting. return nil } @@ -275,6 +272,12 @@ func (m *Metrics) UpdateCancelled(job *jobdb.Job) error { func (m *Metrics) UpdateFailed(ctx *armadacontext.Context, job *jobdb.Job, jobRunErrorsByRunId map[uuid.UUID]*armadaevents.Error) error { category, subCategory := m.failedCategoryAndSubCategoryFromJob(ctx, job, jobRunErrorsByRunId) + if category == jobRunPreempted { + // It is safer to UpdatePreempted from preemption errors and not from the scheduler cycle result. + // e.g. The scheduler might decide to preempt a job, but before the job is preempted, it happens to succeed, + // in which case it should be reported as a success, not a preemption. + return m.UpdatePreempted(job) + } latestRun := job.LatestRun() priorState, priorStateTime := getPriorState(job, latestRun, latestRun.TerminatedTime()) labels := m.buffer[0:0] @@ -310,23 +313,17 @@ func (m *Metrics) UpdateSucceeded(job *jobdb.Job) error { return nil } -func (m *Metrics) UpdateLeased(job *jobdb.Job, jctx *schedulercontext.JobSchedulingContext) error { - if job == nil { - job = jctx.Job.(*jobdb.Job) - } +func (m *Metrics) UpdateLeased(jctx *schedulercontext.JobSchedulingContext) error { + job := jctx.Job.(*jobdb.Job) latestRun := job.LatestRun() - priorState, priorStateTime := getPriorState(job, latestRun, latestRun.LeaseTime()) + priorState, priorStateTime := getPriorState(job, latestRun, &jctx.Created) labels := m.buffer[0:0] labels = append(labels, priorState) labels = append(labels, leased) labels = append(labels, "") // No category for leased. labels = append(labels, "") // No subCategory for leased. - if jctx != nil { - labels = appendLabelsFromJobSchedulingContext(labels, jctx) - } else { - labels = appendLabelsFromJob(labels, job) - } - if err := m.updateResourceSecondsCounterVec(m.resourceSeconds, labels, job, latestRun.LeaseTime(), priorStateTime); err != nil { + labels = appendLabelsFromJobSchedulingContext(labels, jctx) + if err := m.updateResourceSecondsCounterVec(m.resourceSeconds, labels, job, &jctx.Created, priorStateTime); err != nil { return err } if err := m.updateCounterVecFromJob(m.transitions, labels[1:], job); err != nil { @@ -335,10 +332,7 @@ func (m *Metrics) UpdateLeased(job *jobdb.Job, jctx *schedulercontext.JobSchedul return nil } -func (m *Metrics) UpdatePreempted(job *jobdb.Job, jctx *schedulercontext.JobSchedulingContext) error { - if job == nil { - job = jctx.Job.(*jobdb.Job) - } +func (m *Metrics) UpdatePreempted(job *jobdb.Job) error { latestRun := job.LatestRun() priorState, priorStateTime := getPriorState(job, latestRun, latestRun.PreemptedTime()) labels := m.buffer[0:0] @@ -346,11 +340,7 @@ func (m *Metrics) UpdatePreempted(job *jobdb.Job, jctx *schedulercontext.JobSche labels = append(labels, preempted) labels = append(labels, "") // No category for preempted. labels = append(labels, "") // No subCategory for preempted. - if jctx != nil { - labels = appendLabelsFromJobSchedulingContext(labels, jctx) - } else { - labels = appendLabelsFromJob(labels, job) - } + labels = appendLabelsFromJob(labels, job) if err := m.updateResourceSecondsCounterVec(m.resourceSeconds, labels, job, latestRun.PreemptedTime(), priorStateTime); err != nil { return err } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index cacc6e263a4..556ce651b61 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -368,12 +368,7 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context, return nil } for _, jctx := range overallSchedulerResult.ScheduledJobs { - if err := s.schedulerMetrics.UpdateLeased(nil, jctx); err != nil { - return err - } - } - for _, jctx := range overallSchedulerResult.PreemptedJobs { - if err := s.schedulerMetrics.UpdatePreempted(nil, jctx); err != nil { + if err := s.schedulerMetrics.UpdateLeased(jctx); err != nil { return err } } @@ -382,6 +377,8 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context, return err } } + // UpdatePreempted is called from within UpdateFailed if the job has a JobRunPreemptedError. + // This is to make sure that preempttion is counted only when the job is actually preempted, not when the scheduler decides to preempt it. return nil }