Skip to content

Commit

Permalink
Fix metrics double counting (#3416)
Browse files Browse the repository at this point in the history
* Update post-scheduling metrics collection

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* lint fix

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* move preempted metrics updates to UpdateFailed

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* wip

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* wip

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* wip

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* Add comments for some metrics updated approaches

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* some clean up

Signed-off-by: Mohamed Abdelfatah <[email protected]>

---------

Signed-off-by: Mohamed Abdelfatah <[email protected]>
  • Loading branch information
Mo-Fatah authored Feb 27, 2024
1 parent b1bb423 commit 9d554ea
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
42 changes: 16 additions & 26 deletions internal/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,6 @@ func (m *Metrics) Update(
return err
}
}
if jst.Leased {
if err := m.UpdateLeased(jst.Job, nil); err != nil {
return err
}
}
if jst.Pending {
if err := m.UpdatePending(jst.Job); err != nil {
return err
Expand All @@ -217,10 +212,12 @@ func (m *Metrics) Update(
}
}
if jst.Preempted {
if err := m.UpdatePreempted(jst.Job, nil); err != nil {
if err := m.UpdatePreempted(jst.Job); err != nil {
return err
}
}
// UpdateLeased is called by the scheduler directly once a job is leased.
// It is not called here to avoid double counting.
return nil
}

Expand Down Expand Up @@ -275,6 +272,12 @@ func (m *Metrics) UpdateCancelled(job *jobdb.Job) error {

func (m *Metrics) UpdateFailed(ctx *armadacontext.Context, job *jobdb.Job, jobRunErrorsByRunId map[uuid.UUID]*armadaevents.Error) error {
category, subCategory := m.failedCategoryAndSubCategoryFromJob(ctx, job, jobRunErrorsByRunId)
if category == jobRunPreempted {
// It is safer to call UpdatePreempted based on preemption errors rather than on the scheduler cycle result.
// E.g., the scheduler might decide to preempt a job, but the job may succeed before it is actually preempted,
// in which case it should be reported as a success, not a preemption.
return m.UpdatePreempted(job)
}
latestRun := job.LatestRun()
priorState, priorStateTime := getPriorState(job, latestRun, latestRun.TerminatedTime())
labels := m.buffer[0:0]
Expand Down Expand Up @@ -310,23 +313,17 @@ func (m *Metrics) UpdateSucceeded(job *jobdb.Job) error {
return nil
}

func (m *Metrics) UpdateLeased(job *jobdb.Job, jctx *schedulercontext.JobSchedulingContext) error {
if job == nil {
job = jctx.Job.(*jobdb.Job)
}
func (m *Metrics) UpdateLeased(jctx *schedulercontext.JobSchedulingContext) error {
job := jctx.Job.(*jobdb.Job)
latestRun := job.LatestRun()
priorState, priorStateTime := getPriorState(job, latestRun, latestRun.LeaseTime())
priorState, priorStateTime := getPriorState(job, latestRun, &jctx.Created)
labels := m.buffer[0:0]
labels = append(labels, priorState)
labels = append(labels, leased)
labels = append(labels, "") // No category for leased.
labels = append(labels, "") // No subCategory for leased.
if jctx != nil {
labels = appendLabelsFromJobSchedulingContext(labels, jctx)
} else {
labels = appendLabelsFromJob(labels, job)
}
if err := m.updateResourceSecondsCounterVec(m.resourceSeconds, labels, job, latestRun.LeaseTime(), priorStateTime); err != nil {
labels = appendLabelsFromJobSchedulingContext(labels, jctx)
if err := m.updateResourceSecondsCounterVec(m.resourceSeconds, labels, job, &jctx.Created, priorStateTime); err != nil {
return err
}
if err := m.updateCounterVecFromJob(m.transitions, labels[1:], job); err != nil {
Expand All @@ -335,22 +332,15 @@ func (m *Metrics) UpdateLeased(job *jobdb.Job, jctx *schedulercontext.JobSchedul
return nil
}

func (m *Metrics) UpdatePreempted(job *jobdb.Job, jctx *schedulercontext.JobSchedulingContext) error {
if job == nil {
job = jctx.Job.(*jobdb.Job)
}
func (m *Metrics) UpdatePreempted(job *jobdb.Job) error {
latestRun := job.LatestRun()
priorState, priorStateTime := getPriorState(job, latestRun, latestRun.PreemptedTime())
labels := m.buffer[0:0]
labels = append(labels, priorState)
labels = append(labels, preempted)
labels = append(labels, "") // No category for preempted.
labels = append(labels, "") // No subCategory for preempted.
if jctx != nil {
labels = appendLabelsFromJobSchedulingContext(labels, jctx)
} else {
labels = appendLabelsFromJob(labels, job)
}
labels = appendLabelsFromJob(labels, job)
if err := m.updateResourceSecondsCounterVec(m.resourceSeconds, labels, job, latestRun.PreemptedTime(), priorStateTime); err != nil {
return err
}
Expand Down
9 changes: 3 additions & 6 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,7 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context,
return nil
}
for _, jctx := range overallSchedulerResult.ScheduledJobs {
if err := s.schedulerMetrics.UpdateLeased(nil, jctx); err != nil {
return err
}
}
for _, jctx := range overallSchedulerResult.PreemptedJobs {
if err := s.schedulerMetrics.UpdatePreempted(nil, jctx); err != nil {
if err := s.schedulerMetrics.UpdateLeased(jctx); err != nil {
return err
}
}
Expand All @@ -382,6 +377,8 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context,
return err
}
}
// UpdatePreempted is called from within UpdateFailed if the job has a JobRunPreemptedError.
// This is to make sure that preemption is counted only when the job is actually preempted, not when the scheduler decides to preempt it.
return nil
}

Expand Down

0 comments on commit 9d554ea

Please sign in to comment.