From cec8e18a3578f2b17de9d83bf7fc2d6411d3df03 Mon Sep 17 00:00:00 2001 From: Noah Held <41909795+zuqq@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:19:10 +0000 Subject: [PATCH] Add `jsonb` backend to Lookout (#3408) --- internal/common/database/lookout/jobstates.go | 4 +- internal/lookout/ui/src/index.tsx | 6 +- .../src/services/lookoutV2/GetJobsService.ts | 13 +- .../services/lookoutV2/GroupJobsService.ts | 12 +- internal/lookout/ui/src/utils.tsx | 6 + internal/lookoutv2/application.go | 18 +- internal/lookoutv2/conversions/convert.go | 29 +- .../lookoutv2/conversions/convert_test.go | 10 +- .../lookoutv2/gen/restapi/embedded_spec.go | 46 +++ .../restapi/operations/get_jobs_parameters.go | 44 +++ .../restapi/operations/get_jobs_urlbuilder.go | 16 + .../operations/group_jobs_parameters.go | 44 +++ .../operations/group_jobs_urlbuilder.go | 16 + internal/lookoutv2/model/model.go | 46 ++- internal/lookoutv2/repository/getjobs.go | 148 +++++++-- internal/lookoutv2/repository/getjobs_test.go | 17 +- internal/lookoutv2/repository/groupjobs.go | 35 +- .../lookoutv2/repository/groupjobs_test.go | 17 +- internal/lookoutv2/repository/querybuilder.go | 306 +++++++++++++++++- internal/lookoutv2/repository/util.go | 46 ++- internal/lookoutv2/swagger.yaml | 14 +- 21 files changed, 758 insertions(+), 135 deletions(-) diff --git a/internal/common/database/lookout/jobstates.go b/internal/common/database/lookout/jobstates.go index 20ea463dde5..b3df64386c1 100644 --- a/internal/common/database/lookout/jobstates.go +++ b/internal/common/database/lookout/jobstates.go @@ -25,7 +25,6 @@ const ( JobPreemptedOrdinal = 7 JobLeasedOrdinal = 8 - JobRunLeased JobRunState = "RUN_LEASED" JobRunPending JobRunState = "RUN_PENDING" JobRunRunning JobRunState = "RUN_RUNNING" JobRunSucceeded JobRunState = "RUN_SUCCEEDED" @@ -36,6 +35,7 @@ const ( JobRunLeaseReturned JobRunState = "RUN_LEASE_RETURNED" JobRunLeaseExpired JobRunState = "RUN_LEASE_EXPIRED" JobRunMaxRunsExceeded JobRunState = "RUN_MAX_RUNS_EXCEEDED" + JobRunLeased JobRunState = "RUN_LEASED" JobRunPendingOrdinal = 1 JobRunRunningOrdinal = 2 @@ -89,4 +89,6 @@ var ( JobRunLeaseExpiredOrdinal: JobRunLeaseExpired, JobRunMaxRunsExceededOrdinal: JobRunMaxRunsExceeded, } + + JobRunStateOrdinalMap = util.InverseMap(JobRunStateMap) ) diff --git a/internal/lookout/ui/src/index.tsx b/internal/lookout/ui/src/index.tsx index 564b9f74eb5..d2fb28b09c4 100644 --- a/internal/lookout/ui/src/index.tsx +++ b/internal/lookout/ui/src/index.tsx @@ -35,8 +35,10 @@ import "./index.css" const fakeDataEnabled = uiConfig.fakeDataEnabled const v2TestJobs = fakeDataEnabled ? makeRandomJobs(10000, 42) : [] - const v2GetJobsService = fakeDataEnabled ? new FakeGetJobsService(v2TestJobs) : new GetJobsService() - const v2GroupJobsService = fakeDataEnabled ? new FakeGroupJobsService(v2TestJobs) : new GroupJobsService() + const v2GetJobsService = fakeDataEnabled ? new FakeGetJobsService(v2TestJobs) : new GetJobsService(uiConfig.backend) + const v2GroupJobsService = fakeDataEnabled + ? new FakeGroupJobsService(v2TestJobs) + : new GroupJobsService(uiConfig.backend) const v2RunErrorService = fakeDataEnabled ? new FakeGetRunErrorService() : new GetRunErrorService() const v2LogService = fakeDataEnabled ? new FakeLogService() diff --git a/internal/lookout/ui/src/services/lookoutV2/GetJobsService.ts b/internal/lookout/ui/src/services/lookoutV2/GetJobsService.ts index 91108161ce1..f8f973a239a 100644 --- a/internal/lookout/ui/src/services/lookoutV2/GetJobsService.ts +++ b/internal/lookout/ui/src/services/lookoutV2/GetJobsService.ts @@ -16,6 +16,12 @@ export type GetJobsResponse = { } export class GetJobsService implements IGetJobsService { + private backend: string | undefined + + constructor(backend: string | undefined) { + this.backend = backend + } + async getJobs( filters: JobFilter[], activeJobSets: boolean, @@ -24,7 +30,11 @@ export class GetJobsService implements IGetJobsService { take: number, abortSignal?: AbortSignal, ): Promise { - const response = await fetch("/api/v1/jobs", { + let path = "/api/v1/jobs" + if (this.backend) { + path += "?" + new URLSearchParams({ backend: this.backend }) + } + const response = await fetch(path, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ @@ -36,7 +46,6 @@ export class GetJobsService implements IGetJobsService { }), signal: abortSignal, }) - const json = await response.json() return { jobs: json.jobs ?? [], diff --git a/internal/lookout/ui/src/services/lookoutV2/GroupJobsService.ts b/internal/lookout/ui/src/services/lookoutV2/GroupJobsService.ts index 8a65b0fa301..eb0a9d43992 100644 --- a/internal/lookout/ui/src/services/lookoutV2/GroupJobsService.ts +++ b/internal/lookout/ui/src/services/lookoutV2/GroupJobsService.ts @@ -23,6 +23,12 @@ export type GroupJobsResponse = { } export class GroupJobsService implements IGroupJobsService { + private backend: string | undefined + + constructor(backend: string | undefined) { + this.backend = backend + } + async groupJobs( filters: JobFilter[], activeJobSets: boolean, @@ -33,7 +39,11 @@ export class GroupJobsService implements IGroupJobsService { take: number, abortSignal?: AbortSignal, ): Promise { - const response = await fetch("/api/v1/jobGroups", { + let path = "/api/v1/jobGroups" + if (this.backend) { + path += "?" + new URLSearchParams({ backend: this.backend }) + } + const response = await fetch(path, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ diff --git a/internal/lookout/ui/src/utils.tsx b/internal/lookout/ui/src/utils.tsx index c60413d08ff..5a6bc0c0e63 100644 --- a/internal/lookout/ui/src/utils.tsx +++ b/internal/lookout/ui/src/utils.tsx @@ -24,6 +24,7 @@ interface UIConfig { oidcEnabled: boolean oidc?: OidcConfig commandSpecs: CommandSpec[] + backend: string | undefined } export type RequestStatus = "Loading" | "Idle" @@ -52,6 +53,7 @@ export async function getUIConfig(): Promise { oidcEnabled: false, oidc: undefined, commandSpecs: [], + backend: undefined, } try { @@ -76,6 +78,7 @@ export async function getUIConfig(): Promise { }) } } + if (json.Backend) config.backend = json.Backend } catch (e) { console.error(e) } @@ -91,6 +94,9 @@ export async function getUIConfig(): Promise { if (window.location.pathname === "/oidc") config.oidcEnabled = true + const backend = searchParams.get("backend") + if (backend) config.backend = backend + return config } diff --git a/internal/lookoutv2/application.go b/internal/lookoutv2/application.go index bf60d59b836..3b6100292fe 100644 --- a/internal/lookoutv2/application.go +++ b/internal/lookoutv2/application.go @@ -31,8 +31,10 @@ func Serve(configuration configuration.LookoutV2Config) error { return err } - getJobsRepo := repository.NewSqlGetJobsRepository(db) - groupJobsRepo := repository.NewSqlGroupJobsRepository(db) + getJobsRepo := repository.NewSqlGetJobsRepository(db, false) + getJobsJsonbRepo := repository.NewSqlGetJobsRepository(db, true) + groupJobsRepo := repository.NewSqlGroupJobsRepository(db, false) + groupJobsJsonbRepo := repository.NewSqlGroupJobsRepository(db, true) decompressor := compress.NewThreadSafeZlibDecompressor() getJobRunErrorRepo := repository.NewSqlGetJobRunErrorRepository(db, decompressor) getJobSpecRepo := repository.NewSqlGetJobSpecRepository(db, decompressor) @@ -54,7 +56,11 @@ func Serve(configuration configuration.LookoutV2Config) error { func(params operations.GetJobsParams) middleware.Responder { filters := util.Map(params.GetJobsRequest.Filters, conversions.FromSwaggerFilter) order := conversions.FromSwaggerOrder(params.GetJobsRequest.Order) - result, err := getJobsRepo.GetJobs( + repo := getJobsRepo + if backend := params.Backend; backend != nil && *backend == "jsonb" { + repo = getJobsJsonbRepo + } + result, err := repo.GetJobs( armadacontext.New(params.HTTPRequest.Context(), logger), filters, params.GetJobsRequest.ActiveJobSets, @@ -75,7 +81,11 @@ func Serve(configuration configuration.LookoutV2Config) error { func(params operations.GroupJobsParams) middleware.Responder { filters := util.Map(params.GroupJobsRequest.Filters, conversions.FromSwaggerFilter) order := conversions.FromSwaggerOrder(params.GroupJobsRequest.Order) - result, err := groupJobsRepo.GroupBy( + repo := groupJobsRepo + if backend := params.Backend; backend != nil && *backend == "jsonb" { + repo = groupJobsJsonbRepo + } + result, err := repo.GroupBy( armadacontext.New(params.HTTPRequest.Context(), logger), filters, params.GroupJobsRequest.ActiveJobSets, diff --git a/internal/lookoutv2/conversions/convert.go b/internal/lookoutv2/conversions/convert.go index a32ddb332e2..5d53a84897a 100644 --- a/internal/lookoutv2/conversions/convert.go +++ b/internal/lookoutv2/conversions/convert.go @@ -5,6 +5,7 @@ import ( "github.com/go-openapi/strfmt" + "github.com/armadaproject/armada/internal/common/database/lookout" "github.com/armadaproject/armada/internal/lookoutv2/gen/models" "github.com/armadaproject/armada/internal/lookoutv2/gen/restapi/operations" "github.com/armadaproject/armada/internal/lookoutv2/model" @@ -17,7 +18,7 @@ func ToSwaggerJob(job *model.Job) *models.Job { } return &models.Job{ Annotations: job.Annotations, - Cancelled: toSwaggerTimePtr(job.Cancelled), + Cancelled: ToSwaggerTime(job.Cancelled), CPU: job.Cpu, Duplicate: job.Duplicate, EphemeralStorage: job.EphemeralStorage, @@ -43,13 +44,13 @@ func ToSwaggerRun(run *model.Run) *models.Run { return &models.Run{ Cluster: run.Cluster, ExitCode: run.ExitCode, - Finished: toSwaggerTimePtr(run.Finished), - JobRunState: run.JobRunState, + Finished: PostgreSQLTimeToSwaggerTime(run.Finished), + JobRunState: string(lookout.JobRunStateMap[run.JobRunState]), Node: run.Node, - Leased: toSwaggerTimePtr(run.Leased), - Pending: toSwaggerTimePtr(run.Pending), + Leased: PostgreSQLTimeToSwaggerTime(run.Leased), + Pending: PostgreSQLTimeToSwaggerTime(run.Pending), RunID: run.RunId, - Started: toSwaggerTimePtr(run.Started), + Started: PostgreSQLTimeToSwaggerTime(run.Started), } } @@ -90,10 +91,18 @@ func FromSwaggerGroupedField(groupedField *operations.GroupJobsParamsBodyGrouped } } -func toSwaggerTimePtr(ts *time.Time) *strfmt.DateTime { - if ts == nil { +func ToSwaggerTime(t *time.Time) *strfmt.DateTime { + if t == nil { return nil } - swaggerTs := strfmt.DateTime(*ts) - return &swaggerTs + s := strfmt.DateTime(*t) + return &s +} + +func PostgreSQLTimeToSwaggerTime(t *model.PostgreSQLTime) *strfmt.DateTime { + if t == nil { + return nil + } + s := strfmt.DateTime(t.Time) + return &s } diff --git a/internal/lookoutv2/conversions/convert_test.go b/internal/lookoutv2/conversions/convert_test.go index 898034c5113..c14ce732a90 100644 --- a/internal/lookoutv2/conversions/convert_test.go +++ b/internal/lookoutv2/conversions/convert_test.go @@ -77,13 +77,13 @@ var ( { Cluster: "cluster", ExitCode: pointer.Int32(322), - Finished: &baseTime, - JobRunState: string(lookout.JobRunLeaseReturned), + Finished: model.NewPostgreSQLTime(&baseTime), + JobRunState: lookout.JobRunStateOrdinalMap[lookout.JobRunLeaseReturned], Node: pointer.String("node"), - Leased: &baseTime, - Pending: &baseTime, + Leased: model.NewPostgreSQLTime(&baseTime), + Pending: model.NewPostgreSQLTime(&baseTime), RunId: "run-id", - Started: &baseTime, + Started: model.NewPostgreSQLTime(&baseTime), }, }, State: string(lookout.JobFailed), diff --git a/internal/lookoutv2/gen/restapi/embedded_spec.go b/internal/lookoutv2/gen/restapi/embedded_spec.go index ed1a7acecd3..eb940a1a35d 100644 --- a/internal/lookoutv2/gen/restapi/embedded_spec.go +++ b/internal/lookoutv2/gen/restapi/embedded_spec.go @@ -103,6 +103,9 @@ func init() { } } } + }, + { + "$ref": "#/parameters/backend" } ], "responses": { @@ -301,6 +304,9 @@ func init() { } } } + }, + { + "$ref": "#/parameters/backend" } ], "responses": { @@ -655,6 +661,17 @@ func init() { } } } + }, + "parameters": { + "backend": { + "enum": [ + "jsonb" + ], + "type": "string", + "description": "The backend to use for this request.", + "name": "backend", + "in": "query" + } } }`)) FlatSwaggerJSON = json.RawMessage([]byte(`{ @@ -743,6 +760,15 @@ func init() { } } } + }, + { + "enum": [ + "jsonb" + ], + "type": "string", + "description": "The backend to use for this request.", + "name": "backend", + "in": "query" } ], "responses": { @@ -941,6 +967,15 @@ func init() { } } } + }, + { + "enum": [ + "jsonb" + ], + "type": "string", + "description": "The backend to use for this request.", + "name": "backend", + "in": "query" } ], "responses": { @@ -1312,6 +1347,17 @@ func init() { } } } + }, + "parameters": { + "backend": { + "enum": [ + "jsonb" + ], + "type": "string", + "description": "The backend to use for this request.", + "name": "backend", + "in": "query" + } } }`)) } diff --git a/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go b/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go index de30e30006d..3085016524a 100644 --- a/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go +++ b/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go @@ -13,6 +13,7 @@ import ( "github.com/go-openapi/errors" "github.com/go-openapi/runtime" "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" "github.com/go-openapi/validate" ) @@ -33,6 +34,10 @@ type GetJobsParams struct { // HTTP Request Object HTTPRequest *http.Request `json:"-"` + /*The backend to use for this request. + In: query + */ + Backend *string /* Required: true In: body @@ -49,6 +54,13 @@ func (o *GetJobsParams) BindRequest(r *http.Request, route *middleware.MatchedRo o.HTTPRequest = r + qs := runtime.Values(r.URL.Query()) + + qBackend, qhkBackend, _ := qs.GetOK("backend") + if err := o.bindBackend(qBackend, qhkBackend, route.Formats); err != nil { + res = append(res, err) + } + if runtime.HasBody(r) { defer r.Body.Close() var body GetJobsBody @@ -81,3 +93,35 @@ func (o *GetJobsParams) BindRequest(r *http.Request, route *middleware.MatchedRo } return nil } + +// bindBackend binds and validates parameter Backend from query. +func (o *GetJobsParams) bindBackend(rawData []string, hasKey bool, formats strfmt.Registry) error { + var raw string + if len(rawData) > 0 { + raw = rawData[len(rawData)-1] + } + + // Required: false + // AllowEmptyValue: false + + if raw == "" { // empty values pass all other validations + return nil + } + o.Backend = &raw + + if err := o.validateBackend(formats); err != nil { + return err + } + + return nil +} + +// validateBackend carries on validations for parameter Backend +func (o *GetJobsParams) validateBackend(formats strfmt.Registry) error { + + if err := validate.EnumCase("backend", "query", *o.Backend, []interface{}{"jsonb"}, true); err != nil { + return err + } + + return nil +} diff --git a/internal/lookoutv2/gen/restapi/operations/get_jobs_urlbuilder.go b/internal/lookoutv2/gen/restapi/operations/get_jobs_urlbuilder.go index dbf9f5ba85d..57679cd2ce2 100644 --- a/internal/lookoutv2/gen/restapi/operations/get_jobs_urlbuilder.go +++ b/internal/lookoutv2/gen/restapi/operations/get_jobs_urlbuilder.go @@ -13,7 +13,11 @@ import ( // GetJobsURL generates an URL for the get jobs operation type GetJobsURL struct { + Backend *string + _basePath string + // avoid unkeyed usage + _ struct{} } // WithBasePath sets the base path for this url builder, only required when it's different from the @@ -40,6 +44,18 @@ func (o *GetJobsURL) Build() (*url.URL, error) { _basePath := o._basePath _result.Path = golangswaggerpaths.Join(_basePath, _path) + qs := make(url.Values) + + var backendQ string + if o.Backend != nil { + backendQ = *o.Backend + } + if backendQ != "" { + qs.Set("backend", backendQ) + } + + _result.RawQuery = qs.Encode() + return &_result, nil } diff --git a/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go b/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go index 6fea992693d..6df300eeabb 100644 --- a/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go +++ b/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go @@ -13,6 +13,7 @@ import ( "github.com/go-openapi/errors" "github.com/go-openapi/runtime" "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" "github.com/go-openapi/validate" ) @@ -33,6 +34,10 @@ type GroupJobsParams struct { // HTTP Request Object HTTPRequest *http.Request `json:"-"` + /*The backend to use for this request. + In: query + */ + Backend *string /* Required: true In: body @@ -49,6 +54,13 @@ func (o *GroupJobsParams) BindRequest(r *http.Request, route *middleware.Matched o.HTTPRequest = r + qs := runtime.Values(r.URL.Query()) + + qBackend, qhkBackend, _ := qs.GetOK("backend") + if err := o.bindBackend(qBackend, qhkBackend, route.Formats); err != nil { + res = append(res, err) + } + if runtime.HasBody(r) { defer r.Body.Close() var body GroupJobsBody @@ -81,3 +93,35 @@ func (o *GroupJobsParams) BindRequest(r *http.Request, route *middleware.Matched } return nil } + +// bindBackend binds and validates parameter Backend from query. +func (o *GroupJobsParams) bindBackend(rawData []string, hasKey bool, formats strfmt.Registry) error { + var raw string + if len(rawData) > 0 { + raw = rawData[len(rawData)-1] + } + + // Required: false + // AllowEmptyValue: false + + if raw == "" { // empty values pass all other validations + return nil + } + o.Backend = &raw + + if err := o.validateBackend(formats); err != nil { + return err + } + + return nil +} + +// validateBackend carries on validations for parameter Backend +func (o *GroupJobsParams) validateBackend(formats strfmt.Registry) error { + + if err := validate.EnumCase("backend", "query", *o.Backend, []interface{}{"jsonb"}, true); err != nil { + return err + } + + return nil +} diff --git a/internal/lookoutv2/gen/restapi/operations/group_jobs_urlbuilder.go b/internal/lookoutv2/gen/restapi/operations/group_jobs_urlbuilder.go index 6c086342ca7..6c72a5424a0 100644 --- a/internal/lookoutv2/gen/restapi/operations/group_jobs_urlbuilder.go +++ b/internal/lookoutv2/gen/restapi/operations/group_jobs_urlbuilder.go @@ -13,7 +13,11 @@ import ( // GroupJobsURL generates an URL for the group jobs operation type GroupJobsURL struct { + Backend *string + _basePath string + // avoid unkeyed usage + _ struct{} } // WithBasePath sets the base path for this url builder, only required when it's different from the @@ -40,6 +44,18 @@ func (o *GroupJobsURL) Build() (*url.URL, error) { _basePath := o._basePath _result.Path = golangswaggerpaths.Join(_basePath, _path) + qs := make(url.Values) + + var backendQ string + if o.Backend != nil { + backendQ = *o.Backend + } + if backendQ != "" { + qs.Set("backend", backendQ) + } + + _result.RawQuery = qs.Encode() + return &_result, nil } diff --git a/internal/lookoutv2/model/model.go b/internal/lookoutv2/model/model.go index 8ffd27b0a9d..c3f88ec69f0 100644 --- a/internal/lookoutv2/model/model.go +++ b/internal/lookoutv2/model/model.go @@ -42,16 +42,52 @@ type Job struct { CancelReason *string } +// PostgreSQLTime is a wrapper around time.Time that converts to UTC when +// deserializing from JSON. +// +// It exists to work around the following issue: +// +// 1. PostgreSQL serializes UTC timestamps within a JSON object in the format +// "2023-11-03T09:10:42.201577+00:00"; in particular, this format uses a +// timezone offset instead of "Z" to indicate UTC. +// 2. When deserializing this UTC timestamp, Go sets the location of the +// resulting time.Time value to "local". +// 3. Tests compare timestamps with == instead of Equal, which means that two +// time.Time values with different locations are not considered equal +// (even if they represent the same instants in time). +type PostgreSQLTime struct { + Time time.Time +} + +func NewPostgreSQLTime(t *time.Time) *PostgreSQLTime { + if t == nil { + return nil + } + return &PostgreSQLTime{Time: *t} +} + +func (t PostgreSQLTime) MarshalJSON() ([]byte, error) { + return t.Time.MarshalJSON() +} + +func (t *PostgreSQLTime) UnmarshalJSON(b []byte) error { + if err := t.Time.UnmarshalJSON(b); err != nil { + return err + } + t.Time = t.Time.UTC() + return nil +} + type Run struct { Cluster string ExitCode *int32 - Finished *time.Time - JobRunState string + Finished *PostgreSQLTime + JobRunState int Node *string - Leased *time.Time - Pending *time.Time + Leased *PostgreSQLTime + Pending *PostgreSQLTime RunId string - Started *time.Time + Started *PostgreSQLTime } type JobGroup struct { diff --git a/internal/lookoutv2/repository/getjobs.go b/internal/lookoutv2/repository/getjobs.go index 39566cd3263..7e283003f6d 100644 --- a/internal/lookoutv2/repository/getjobs.go +++ b/internal/lookoutv2/repository/getjobs.go @@ -2,6 +2,7 @@ package repository import ( "database/sql" + "encoding/json" "fmt" "sort" "time" @@ -22,8 +23,9 @@ type GetJobsRepository interface { } type SqlGetJobsRepository struct { - db *pgxpool.Pool - lookoutTables *LookoutTables + db *pgxpool.Pool + lookoutTables *LookoutTables + useJsonbBackend bool } type GetJobsResult struct { @@ -70,14 +72,23 @@ type annotationRow struct { annotationValue string } -func NewSqlGetJobsRepository(db *pgxpool.Pool) *SqlGetJobsRepository { +func NewSqlGetJobsRepository(db *pgxpool.Pool, useJsonbBackend bool) *SqlGetJobsRepository { return &SqlGetJobsRepository{ - db: db, - lookoutTables: NewTables(), + db: db, + lookoutTables: NewTables(), + useJsonbBackend: useJsonbBackend, } } func (r *SqlGetJobsRepository) GetJobs(ctx *armadacontext.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) (*GetJobsResult, error) { + getJobs := r.getJobs + if r.useJsonbBackend { + getJobs = r.getJobsJsonb + } + return getJobs(ctx, filters, activeJobSets, order, skip, take) +} + +func (r *SqlGetJobsRepository) getJobs(ctx *armadacontext.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) (*GetJobsResult, error) { var jobRows []*jobRow var runRows []*runRow var annotationRows []*annotationRow @@ -134,33 +145,77 @@ func (r *SqlGetJobsRepository) GetJobs(ctx *armadacontext.Context, filters []*mo }, nil } +func (r *SqlGetJobsRepository) getJobsJsonb(ctx *armadacontext.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) (*GetJobsResult, error) { + query, err := NewQueryBuilder(r.lookoutTables).GetJobsJsonb(filters, activeJobSets, order, skip, take) + if err != nil { + return nil, err + } + logQuery(query, "GetJobs") + var jobs []*model.Job + if err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ + IsoLevel: pgx.RepeatableRead, + AccessMode: pgx.ReadWrite, + DeferrableMode: pgx.Deferrable, + }, func(tx pgx.Tx) error { + rows, err := tx.Query(ctx, query.Sql, query.Args...) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var row jobRow + var annotations sql.NullString + var runs sql.NullString + if err := rows.Scan( + &row.jobId, + &row.queue, + &row.owner, + &row.namespace, + &row.jobSet, + &row.cpu, + &row.memory, + &row.ephemeralStorage, + &row.gpu, + &row.priority, + &row.submitted, + &row.cancelled, + &row.state, + &row.lastTransitionTime, + &row.duplicate, + &row.priorityClass, + &row.latestRunId, + &row.cancelReason, + &annotations, + &runs, + ); err != nil { + return err + } + job := jobRowToModel(&row) + if annotations.Valid { + if err := json.Unmarshal([]byte(annotations.String), &job.Annotations); err != nil { + return err + } + } + if runs.Valid { + if err := json.Unmarshal([]byte(runs.String), &job.Runs); err != nil { + return err + } + } + jobs = append(jobs, job) + } + return nil + }); err != nil { + return nil, err + } + return &GetJobsResult{Jobs: jobs}, nil +} + func rowsToJobs(jobRows []*jobRow, runRows []*runRow, annotationRows []*annotationRow) ([]*model.Job, error) { jobMap := make(map[string]*model.Job) // Map from Job ID to Job orderedJobIds := make([]string, len(jobRows)) for i, row := range jobRows { - job := &model.Job{ - Annotations: make(map[string]string), - Cancelled: database.ParseNullTime(row.cancelled), - Cpu: row.cpu, - Duplicate: row.duplicate, - EphemeralStorage: row.ephemeralStorage, - Gpu: row.gpu, - JobId: row.jobId, - JobSet: row.jobSet, - LastActiveRunId: database.ParseNullString(row.latestRunId), - LastTransitionTime: row.lastTransitionTime, - Memory: row.memory, - Owner: row.owner, - Namespace: database.ParseNullString(row.namespace), - Priority: row.priority, - PriorityClass: database.ParseNullString(row.priorityClass), - Queue: row.queue, - Runs: []*model.Run{}, - State: string(lookout.JobStateMap[row.state]), - Submitted: row.submitted, - CancelReason: database.ParseNullString(row.cancelReason), - } + job := jobRowToModel(row) jobMap[row.jobId] = job orderedJobIds[i] = row.jobId } @@ -169,13 +224,13 @@ func rowsToJobs(jobRows []*jobRow, runRows []*runRow, annotationRows []*annotati run := &model.Run{ Cluster: row.cluster, ExitCode: database.ParseNullInt32(row.exitCode), - Finished: database.ParseNullTime(row.finished), - JobRunState: string(lookout.JobRunStateMap[row.jobRunState]), + Finished: model.NewPostgreSQLTime(database.ParseNullTime(row.finished)), + JobRunState: row.jobRunState, Node: database.ParseNullString(row.node), - Leased: database.ParseNullTime(row.leased), - Pending: database.ParseNullTime(row.pending), + Leased: model.NewPostgreSQLTime(database.ParseNullTime(row.leased)), + Pending: model.NewPostgreSQLTime(database.ParseNullTime(row.pending)), RunId: row.runId, - Started: database.ParseNullTime(row.started), + Started: model.NewPostgreSQLTime(database.ParseNullTime(row.started)), } job, ok := jobMap[row.jobId] if !ok { @@ -202,6 +257,31 @@ func rowsToJobs(jobRows []*jobRow, runRows []*runRow, annotationRows []*annotati return jobs, nil } +func jobRowToModel(row *jobRow) *model.Job { + return &model.Job{ + Annotations: make(map[string]string), + Cancelled: database.ParseNullTime(row.cancelled), + Cpu: row.cpu, + Duplicate: row.duplicate, + EphemeralStorage: row.ephemeralStorage, + Gpu: row.gpu, + JobId: row.jobId, + JobSet: row.jobSet, + LastActiveRunId: database.ParseNullString(row.latestRunId), + LastTransitionTime: row.lastTransitionTime, + Memory: row.memory, + Owner: row.owner, + Namespace: database.ParseNullString(row.namespace), + Priority: row.priority, + PriorityClass: database.ParseNullString(row.priorityClass), + Queue: row.queue, + Runs: make([]*model.Run, 0), + State: string(lookout.JobStateMap[row.state]), + Submitted: row.submitted, + CancelReason: database.ParseNullString(row.cancelReason), + } +} + func sortRuns(runs []*model.Run) { sort.Slice(runs, func(i, j int) bool { timeA, err := getJobRunTime(runs[i]) @@ -220,10 +300,10 @@ func sortRuns(runs []*model.Run) { func getJobRunTime(run *model.Run) (time.Time, error) { if run.Leased != nil { - return *run.Leased, nil + return run.Leased.Time, nil } if run.Pending != nil { - return *run.Pending, nil + return run.Pending.Time, nil } return time.Time{}, errors.Errorf("error when getting run time for run with id %s", run.RunId) } diff --git a/internal/lookoutv2/repository/getjobs_test.go b/internal/lookoutv2/repository/getjobs_test.go index 85b948e007d..3fc3342e29b 100644 --- a/internal/lookoutv2/repository/getjobs_test.go +++ b/internal/lookoutv2/repository/getjobs_test.go @@ -53,12 +53,17 @@ var ( ) func withGetJobsSetup(f func(*instructions.InstructionConverter, *lookoutdb.LookoutDb, *SqlGetJobsRepository) error) error { - return lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}, true) - store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) - repo := NewSqlGetJobsRepository(db) - return f(converter, store, repo) - }) + for _, useJsonbBackend := range []bool{false, true} { + if err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}, true) + store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) + repo := NewSqlGetJobsRepository(db, useJsonbBackend) + return f(converter, store, repo) + }); err != nil { + return err + } + } + return nil } func TestGetJobsSingle(t *testing.T) { diff --git a/internal/lookoutv2/repository/groupjobs.go b/internal/lookoutv2/repository/groupjobs.go index 43b3e296f34..7ee8f53c098 100644 --- a/internal/lookoutv2/repository/groupjobs.go +++ b/internal/lookoutv2/repository/groupjobs.go @@ -30,16 +30,18 @@ type GroupJobsRepository interface { } type SqlGroupJobsRepository struct { - db *pgxpool.Pool - lookoutTables *LookoutTables + db *pgxpool.Pool + lookoutTables *LookoutTables + useJsonbBackend bool } const stateAggregatePrefix = "state_" -func NewSqlGroupJobsRepository(db *pgxpool.Pool) *SqlGroupJobsRepository { +func NewSqlGroupJobsRepository(db *pgxpool.Pool, useJsonbBackend bool) *SqlGroupJobsRepository { return &SqlGroupJobsRepository{ - db: db, - lookoutTables: NewTables(), + db: db, + lookoutTables: NewTables(), + useJsonbBackend: useJsonbBackend, } } @@ -53,26 +55,31 @@ func (r *SqlGroupJobsRepository) GroupBy( skip int, take int, ) (*GroupByResult, error) { + qb := NewQueryBuilder(r.lookoutTables) + groupBy := qb.GroupBy + if r.useJsonbBackend { + groupBy = qb.GroupByJsonb + } + query, err := groupBy(filters, activeJobSets, order, groupedField, aggregates, skip, take) + if err != nil { + return nil, err + } + logQuery(query, "GroupBy") + var groups []*model.JobGroup - err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ + if err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly, DeferrableMode: pgx.Deferrable, }, func(tx pgx.Tx) error { - groupByQuery, err := NewQueryBuilder(r.lookoutTables).GroupBy(filters, activeJobSets, order, groupedField, aggregates, skip, take) - if err != nil { - return err - } - logQuery(groupByQuery, "GroupBy") - groupRows, err := tx.Query(ctx, groupByQuery.Sql, groupByQuery.Args...) + groupRows, err := tx.Query(ctx, query.Sql, query.Args...) if err != nil { return err } groups, err = rowsToGroups(groupRows, groupedField, aggregates, filters) return err - }) - if err != nil { + }); err != nil { return nil, err } diff --git a/internal/lookoutv2/repository/groupjobs_test.go b/internal/lookoutv2/repository/groupjobs_test.go index 40626c3d059..328d44650e0 100644 --- a/internal/lookoutv2/repository/groupjobs_test.go +++ b/internal/lookoutv2/repository/groupjobs_test.go @@ -21,12 +21,17 @@ import ( ) func withGroupJobsSetup(f func(*instructions.InstructionConverter, *lookoutdb.LookoutDb, *SqlGroupJobsRepository) error) error { - return lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}, false) - store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) - repo := NewSqlGroupJobsRepository(db) - return f(converter, store, repo) - }) + for _, useJsonbBackend := range []bool{false, true} { + if err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}, false) + store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) + repo := NewSqlGroupJobsRepository(db, useJsonbBackend) + return f(converter, store, repo) + }); err != nil { + return err + } + } + return nil } func TestGroupByQueue(t *testing.T) { diff --git a/internal/lookoutv2/repository/querybuilder.go b/internal/lookoutv2/repository/querybuilder.go index c66ac410144..0f77b5fab67 100644 --- a/internal/lookoutv2/repository/querybuilder.go +++ b/internal/lookoutv2/repository/querybuilder.go @@ -22,6 +22,20 @@ const ( activeJobSetsTableAbbrev = "active_job_sets" ) +var ( + activeJobSetsTable = fmt.Sprintf( + `( + SELECT DISTINCT %s, %s + FROM %s + WHERE state IN (%d, %d, %d, %d) +)`, + queueCol, jobSetCol, + jobTable, + lookout.JobQueuedOrdinal, lookout.JobPendingOrdinal, lookout.JobRunningOrdinal, lookout.JobLeasedOrdinal, + ) + joinWithActiveJobSetsTable = fmt.Sprintf("INNER JOIN %s AS %s USING (%s, %s)", activeJobSetsTable, activeJobSetsTableAbbrev, queueCol, jobSetCol) +) + type Query struct { Sql string Args []interface{} @@ -147,7 +161,101 @@ func (qb *QueryBuilder) InsertIntoTempTable(tempTableName string, filters []*mod }, nil } -// GroupBy returns Query that performs a group by on filters +func (qb *QueryBuilder) GetJobsJsonb( + filters []*model.Filter, + activeJobSets bool, + order *model.Order, + skip int, + take int, +) (*Query, error) { + if err := qb.validateFilters(filters); err != nil { + return nil, errors.Wrap(err, "filters are invalid") + } + if err := qb.validateOrder(order); err != nil { + return nil, errors.Wrap(err, "order is invalid") + } + + activeJobSetsFilter := "" + if activeJobSets { + activeJobSetsFilter = joinWithActiveJobSetsTable + } + + where, err := qb.makeWhereJsonb(filters) + if err != nil { + return nil, err + } + + orderBy, err := qb.makeOrderByJsonb(order) + if err != nil { + return nil, err + } + + query := fmt.Sprintf( + `SELECT + selected_jobs.job_id, + selected_jobs.queue, + selected_jobs.owner, + selected_jobs.namespace, + selected_jobs.jobset, + selected_jobs.cpu, + selected_jobs.memory, + selected_jobs.ephemeral_storage, + selected_jobs.gpu, + selected_jobs.priority, + selected_jobs.submitted, + selected_jobs.cancelled, + selected_jobs.state, + selected_jobs.last_transition_time, + selected_jobs.duplicate, + selected_jobs.priority_class, + selected_jobs.latest_run_id, + selected_jobs.cancel_reason, + selected_jobs.annotations, + selected_runs.runs +FROM ( + SELECT * + FROM %s AS %s + %s + %s + %s + %s +) AS selected_jobs +CROSS JOIN LATERAL ( + SELECT + COALESCE( + json_agg( + json_strip_nulls( + json_build_object( + 'runId', run_id, + 'cluster', cluster, + 'node', node, + 'leased', leased AT TIME ZONE 'UTC', + 'pending', pending AT TIME ZONE 'UTC', + 'started', started AT TIME ZONE 'UTC', + 'finished', finished AT TIME ZONE 'UTC', + 'jobRunState', job_run_state, + 'exitCode', exit_code + ) + ) + ORDER BY COALESCE(leased, pending) + ) FILTER (WHERE run_id IS NOT NULL), + '[]' + ) AS runs + FROM %s + WHERE job_id = selected_jobs.job_id +) AS selected_runs`, + jobTable, jobTableAbbrev, + activeJobSetsFilter, + where, + orderBy, + limitOffsetSql(skip, take), + jobRunTable, + ) + + query, args := templateSql(query, qb.queryValues) + return &Query{Sql: query, Args: args}, nil +} + func (qb *QueryBuilder) GroupBy( filters []*model.Filter, activeJobSets bool, @@ -257,6 +365,89 @@ func (qb *QueryBuilder) GroupBy( }, nil } +func (qb *QueryBuilder) GroupByJsonb( + filters []*model.Filter, + activeJobSets bool, + order *model.Order, + groupedField *model.GroupedField, + aggregates []string, + skip int, + take int, +) (*Query, error) { + err := qb.validateFilters(filters) + if err != nil { + return nil, errors.Wrap(err, "filters are invalid") + } + err = qb.validateGroupOrder(order) + if err != nil { + return nil, errors.Wrap(err, "group order is invalid") + } + err = qb.validateAggregates(aggregates) + if err != nil { + return nil, errors.Wrap(err, "aggregates are invalid") + } + err = qb.validateGroupedField(groupedField) + if err != nil { + return nil, errors.Wrap(err, "group field is invalid") + } + + activeJobSetsFilter := "" + if activeJobSets { + activeJobSetsFilter = joinWithActiveJobSetsTable + } + + groupByColumn := queryColumn{table: jobTable, abbrev: jobTableAbbrev} + if groupedField.IsAnnotation { + groupByColumn.name = qb.annotationColumnJsonb(groupedField.Field) + } else { + groupByColumn.name = groupedField.Field + } + + queryAggregators, err := qb.getQueryAggregators(aggregates, filters, map[string]bool{jobTable: true}) + if err != nil { + return nil, err + } + selectList, err := qb.getAggregatesSql(queryAggregators) + if err != nil { + return nil, err + } + + where, err := qb.makeWhereJsonb(filters) + if err != nil { + return nil, err + } + + groupBy, err := qb.createGroupBySQL(order, &groupByColumn, aggregates) + if err != nil { + return nil, err + } + + orderBy, err := qb.groupByOrderSql(order) + if err != nil { + return nil, err + } + + query := fmt.Sprintf( + `SELECT %s.%s, %s +FROM %s as %s +%s +%s +%s +%s +%s`, + groupByColumn.abbrev, groupByColumn.name, selectList, + jobTable, jobTableAbbrev, + activeJobSetsFilter, + where, + groupBy, + orderBy, + limitOffsetSql(skip, take), + ) + + query, args := templateSql(query, qb.queryValues) + return &Query{Sql: query, Args: args}, nil +} + func (qb *QueryBuilder) createGroupBySQL(order *model.Order, groupCol *queryColumn, aggregates []string) (string, error) { expr := fmt.Sprintf("GROUP BY %s.%s", groupCol.abbrev, groupCol.name) isInAggregators := len(aggregates) > 0 && func(sl []string, t string) bool { @@ -457,16 +648,12 @@ func (qb *QueryBuilder) makeFromSql(queryTables map[string]bool, normalFilters [ } if activeJobSets { - activeJobSetsTable := ` - SELECT DISTINCT queue, jobset - FROM job - WHERE state IN (1, 2, 3, 8) - ` fromBuilder.Join( Inner, - fmt.Sprintf("( %s )", activeJobSetsTable), + activeJobSetsTable, activeJobSetsTableAbbrev, - []string{queueCol, jobSetCol}) + []string{queueCol, jobSetCol}, + ) } return fromBuilder, nil @@ -550,7 +737,7 @@ func (qb *QueryBuilder) annotationFilterCondition(annotationFilter *model.Filter if annotationFilter.Match == model.MatchExists { return fmt.Sprintf("%s = %s", annotationKeyCol, key), nil } - comparator, err := qb.comparatorForMatch(annotationFilter.Match) + comparator, err := operatorForMatch(annotationFilter.Match) if err != nil { return "", err } @@ -594,7 +781,7 @@ func (qb *QueryBuilder) makeQueryFilters(filters []*model.Filter, queryTables ma } value := filter.Value if col == stateCol { - value, err = parseValueForState(filter) + value, err = parseValueForState(value) if err != nil { return nil, err } @@ -612,8 +799,8 @@ func (qb *QueryBuilder) makeQueryFilters(filters []*model.Filter, queryTables ma return result, nil } -func parseValueForState(filter *model.Filter) (interface{}, error) { - switch v := filter.Value.(type) { +func parseValueForState(value interface{}) (interface{}, error) { + switch v := value.(type) { case string: ordinal, err := stateToOrdinal(v) if err != nil { @@ -642,7 +829,7 @@ func parseValueForState(filter *model.Filter) (interface{}, error) { } return result, nil default: - return nil, errors.Errorf("unsupported type for state: %v: %T", filter.Value, filter.Value) + return nil, errors.Errorf("unsupported type for state: %v: %T", value, value) } } @@ -666,7 +853,7 @@ func (qb *QueryBuilder) queryFiltersToSql(filters []*queryFilter, useAbbrev bool // Given a value, a match, a table abbreviation and a column name, returns the corresponding comparison expression for // use in a WHERE clause func (qb *QueryBuilder) comparisonExpr(value interface{}, match, abbrev, colName string, useAbbrev bool) (string, error) { - comparator, err := qb.comparatorForMatch(match) + comparator, err := operatorForMatch(match) if err != nil { return "", err } @@ -684,8 +871,93 @@ func (qb *QueryBuilder) comparisonExpr(value interface{}, match, abbrev, colName abbrev, colName, comparator, formattedValue), nil } -// Given a match string, return the corresponding SQL compare operation -func (qb *QueryBuilder) comparatorForMatch(match string) (string, error) { +func (qb *QueryBuilder) makeWhereJsonb(filters []*model.Filter) (string, error) { + if len(filters) == 0 { + return "", nil + } + var clauses []string + for _, filter := range filters { + clause, err := qb.makeWhereClauseJsonb(filter) + if err != nil { + return "", err + } + clauses = append(clauses, clause) + } + return fmt.Sprintf("WHERE %s", strings.Join(clauses, " AND ")), nil +} + +func (qb *QueryBuilder) makeWhereClauseJsonb(filter *model.Filter) (string, error) { + var column string + if filter.IsAnnotation { + switch filter.Match { + case model.MatchExact: + placeholder := qb.recordValue(map[string]interface{}{filter.Field: filter.Value}) + // GIN indexes are very particular about the kinds of predicates they + // support; for example, neither + // + // annotations->>'host_instance_id' = '35170439' + // + // nor + // + // annotations['host_instance_id'] = '35170439' + // + // can use the GIN index on the annotations column, as jsonb_path_ops + // GIN indexes only support the operators @>, @?, and @@: + // + // https://www.postgresql.org/docs/current/datatype-json.html#JSON-INDEXING + return fmt.Sprintf("%s.annotations @> %s", jobTableAbbrev, placeholder), nil + case model.MatchExists: + placeholder := qb.recordValue(filter.Field) + return fmt.Sprintf("%s.annotations ? %s", jobTableAbbrev, placeholder), nil + default: + column = qb.annotationColumnJsonb(filter.Field) + } + } else { + var err error + column, err = qb.lookoutTables.ColumnFromField(filter.Field) + if err != nil { + return "", err + } + } + + operator, err := operatorForMatch(filter.Match) + if err != nil { + return "", err + } + + value := filter.Value + if column == stateCol { + var err error + value, err = parseValueForState(value) + if err != nil { + return "", err + } + } + placeholder, err := qb.valueForMatch(value, filter.Match) + if err != nil { + return "", err + } + + return fmt.Sprintf("%s.%s %s %s", jobTableAbbrev, column, operator, placeholder), nil +} + +func (qb *QueryBuilder) annotationColumnJsonb(key string) string { + placeholder := qb.recordValue(key) + return fmt.Sprintf("annotations->>%s", placeholder) +} + +func (qb *QueryBuilder) makeOrderByJsonb(order *model.Order) (string, error) { + if orderIsNull(order) { + return "", nil + } + column, err := qb.lookoutTables.ColumnFromField(order.Field) + if err != nil { + return "", err + } + return fmt.Sprintf("ORDER BY %s.%s %s", jobTableAbbrev, column, order.Direction), nil +} + +func operatorForMatch(match string) (string, error) { switch match { case model.MatchExact: return "=", nil @@ -840,7 +1112,7 @@ func (qb *QueryBuilder) getQueryAggregators(aggregates []string, filters []*mode } func (qb *QueryBuilder) getAggregatesSql(aggregators []QueryAggregator) (string, error) { - selectList := []string{"COUNT(*) AS count"} + selectList := []string{fmt.Sprintf("COUNT(*) AS %s", countCol)} for _, agg := range aggregators { sql, err := agg.AggregateSql() if err != nil { diff --git a/internal/lookoutv2/repository/util.go b/internal/lookoutv2/repository/util.go index 9e4c98bb2fc..a7b1de95cac 100644 --- a/internal/lookoutv2/repository/util.go +++ b/internal/lookoutv2/repository/util.go @@ -55,7 +55,7 @@ type runPatch struct { cluster *string exitCode *int32 finished *time.Time - jobRunState *string + jobRunState lookout.JobRunState node *string leased *time.Time pending *time.Time @@ -188,7 +188,7 @@ func (js *JobSimulator) Lease(runId string, timestamp time.Time) *JobSimulator { js.job.State = string(lookout.JobLeased) updateRun(js.job, &runPatch{ runId: runId, - jobRunState: pointer.String(string(lookout.JobRunLeased)), + jobRunState: lookout.JobRunLeased, leased: &ts, }) return js @@ -228,7 +228,7 @@ func (js *JobSimulator) Pending(runId string, cluster string, timestamp time.Tim rp := &runPatch{ runId: runId, cluster: &cluster, - jobRunState: pointer.String(string(lookout.JobRunPending)), + jobRunState: lookout.JobRunPending, pending: &ts, } if js.converter.IsLegacy() { @@ -266,7 +266,7 @@ func (js *JobSimulator) Running(runId string, node string, timestamp time.Time) js.job.State = string(lookout.JobRunning) updateRun(js.job, &runPatch{ runId: runId, - jobRunState: pointer.String(string(lookout.JobRunRunning)), + jobRunState: lookout.JobRunRunning, node: &node, started: &ts, }) @@ -291,7 +291,7 @@ func (js *JobSimulator) RunSucceeded(runId string, timestamp time.Time) *JobSimu runId: runId, exitCode: pointer.Int32(0), finished: &ts, - jobRunState: pointer.String(string(lookout.JobRunSucceeded)), + jobRunState: lookout.JobRunSucceeded, }) return js } @@ -339,7 +339,7 @@ func (js *JobSimulator) LeaseReturned(runId string, message string, timestamp ti updateRun(js.job, &runPatch{ runId: runId, finished: &ts, - jobRunState: pointer.String(string(lookout.JobRunLeaseReturned)), + jobRunState: lookout.JobRunLeaseReturned, }) return js } @@ -413,7 +413,7 @@ func (js *JobSimulator) RunFailed(runId string, node string, exitCode int32, mes runId: runId, exitCode: &exitCode, finished: &ts, - jobRunState: pointer.String(string(lookout.JobRunFailed)), + jobRunState: lookout.JobRunFailed, node: &node, }) return js @@ -506,7 +506,7 @@ func (js *JobSimulator) RunTerminated(runId string, cluster string, node string, runId: runId, cluster: &cluster, finished: &ts, - jobRunState: pointer.String(string(lookout.JobRunTerminated)), + jobRunState: lookout.JobRunTerminated, node: &node, }) return js @@ -543,7 +543,7 @@ func (js *JobSimulator) RunUnschedulable(runId string, cluster string, node stri runId: runId, cluster: &cluster, finished: &ts, - jobRunState: pointer.String(string(lookout.JobRunUnableToSchedule)), + jobRunState: lookout.JobRunUnableToSchedule, node: &node, }) return js @@ -573,7 +573,7 @@ func (js *JobSimulator) LeaseExpired(timestamp time.Time) *JobSimulator { updateRun(js.job, &runPatch{ runId: eventutil.LEGACY_RUN_ID, finished: &ts, - jobRunState: pointer.String(string(lookout.JobRunLeaseExpired)), + jobRunState: lookout.JobRunLeaseExpired, }) return js } @@ -623,20 +623,16 @@ func updateRun(job *model.Job, patch *runPatch) { if patch.cluster != nil { cluster = *patch.cluster } - state := "" - if patch.jobRunState != nil { - state = *patch.jobRunState - } job.Runs = append(job.Runs, &model.Run{ Cluster: cluster, ExitCode: patch.exitCode, - Finished: patch.finished, - JobRunState: state, + Finished: model.NewPostgreSQLTime(patch.finished), + JobRunState: lookout.JobRunStateOrdinalMap[patch.jobRunState], Node: patch.node, - Leased: patch.leased, - Pending: patch.pending, + Leased: model.NewPostgreSQLTime(patch.leased), + Pending: model.NewPostgreSQLTime(patch.pending), RunId: patch.runId, - Started: patch.started, + Started: model.NewPostgreSQLTime(patch.started), }) } @@ -648,22 +644,20 @@ func patchRun(run *model.Run, patch *runPatch) { run.ExitCode = patch.exitCode } if patch.finished != nil { - run.Finished = patch.finished - } - if patch.jobRunState != nil { - run.JobRunState = *patch.jobRunState + run.Finished = model.NewPostgreSQLTime(patch.finished) } + run.JobRunState = lookout.JobRunStateOrdinalMap[patch.jobRunState] if patch.node != nil { run.Node = patch.node } if patch.leased != nil { - run.Leased = patch.leased + run.Leased = model.NewPostgreSQLTime(patch.leased) } if patch.pending != nil { - run.Pending = patch.pending + run.Pending = model.NewPostgreSQLTime(patch.pending) } if patch.started != nil { - run.Started = patch.started + run.Started = model.NewPostgreSQLTime(patch.started) } } diff --git a/internal/lookoutv2/swagger.yaml b/internal/lookoutv2/swagger.yaml index d2a9e2d52f5..39024f5737d 100644 --- a/internal/lookoutv2/swagger.yaml +++ b/internal/lookoutv2/swagger.yaml @@ -242,6 +242,16 @@ definitions: minLength: 1 x-nullable: false +parameters: + backend: + name: backend + in: query + description: The backend to use for this request. + required: false + type: string + enum: + - jsonb + paths: /health: get: @@ -291,7 +301,7 @@ paths: take: type: integer description: "Number of jobs to fetch." - + - $ref: "#/parameters/backend" produces: - application/json responses: @@ -445,7 +455,7 @@ paths: take: type: integer description: "Number of job groups to fetch." - + - $ref: "#/parameters/backend" produces: - application/json responses: