Skip to content

Commit

Permalink
Add jsonb backend to Lookout (#3408)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq authored Feb 26, 2024
1 parent 817f7d3 commit cec8e18
Show file tree
Hide file tree
Showing 21 changed files with 758 additions and 135 deletions.
4 changes: 3 additions & 1 deletion internal/common/database/lookout/jobstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const (
JobPreemptedOrdinal = 7
JobLeasedOrdinal = 8

JobRunLeased JobRunState = "RUN_LEASED"
JobRunPending JobRunState = "RUN_PENDING"
JobRunRunning JobRunState = "RUN_RUNNING"
JobRunSucceeded JobRunState = "RUN_SUCCEEDED"
Expand All @@ -36,6 +35,7 @@ const (
JobRunLeaseReturned JobRunState = "RUN_LEASE_RETURNED"
JobRunLeaseExpired JobRunState = "RUN_LEASE_EXPIRED"
JobRunMaxRunsExceeded JobRunState = "RUN_MAX_RUNS_EXCEEDED"
JobRunLeased JobRunState = "RUN_LEASED"

JobRunPendingOrdinal = 1
JobRunRunningOrdinal = 2
Expand Down Expand Up @@ -89,4 +89,6 @@ var (
JobRunLeaseExpiredOrdinal: JobRunLeaseExpired,
JobRunMaxRunsExceededOrdinal: JobRunMaxRunsExceeded,
}

JobRunStateOrdinalMap = util.InverseMap(JobRunStateMap)
)
6 changes: 4 additions & 2 deletions internal/lookout/ui/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import "./index.css"
const fakeDataEnabled = uiConfig.fakeDataEnabled

const v2TestJobs = fakeDataEnabled ? makeRandomJobs(10000, 42) : []
const v2GetJobsService = fakeDataEnabled ? new FakeGetJobsService(v2TestJobs) : new GetJobsService()
const v2GroupJobsService = fakeDataEnabled ? new FakeGroupJobsService(v2TestJobs) : new GroupJobsService()
const v2GetJobsService = fakeDataEnabled ? new FakeGetJobsService(v2TestJobs) : new GetJobsService(uiConfig.backend)
const v2GroupJobsService = fakeDataEnabled
? new FakeGroupJobsService(v2TestJobs)
: new GroupJobsService(uiConfig.backend)
const v2RunErrorService = fakeDataEnabled ? new FakeGetRunErrorService() : new GetRunErrorService()
const v2LogService = fakeDataEnabled
? new FakeLogService()
Expand Down
13 changes: 11 additions & 2 deletions internal/lookout/ui/src/services/lookoutV2/GetJobsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export type GetJobsResponse = {
}

export class GetJobsService implements IGetJobsService {
private backend: string | undefined

constructor(backend: string | undefined) {
this.backend = backend
}

async getJobs(
filters: JobFilter[],
activeJobSets: boolean,
Expand All @@ -24,7 +30,11 @@ export class GetJobsService implements IGetJobsService {
take: number,
abortSignal?: AbortSignal,
): Promise<GetJobsResponse> {
const response = await fetch("/api/v1/jobs", {
let path = "/api/v1/jobs"
if (this.backend) {
path += "?" + new URLSearchParams({ backend: this.backend })
}
const response = await fetch(path, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
Expand All @@ -36,7 +46,6 @@ export class GetJobsService implements IGetJobsService {
}),
signal: abortSignal,
})

const json = await response.json()
return {
jobs: json.jobs ?? [],
Expand Down
12 changes: 11 additions & 1 deletion internal/lookout/ui/src/services/lookoutV2/GroupJobsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ export type GroupJobsResponse = {
}

export class GroupJobsService implements IGroupJobsService {
private backend: string | undefined

constructor(backend: string | undefined) {
this.backend = backend
}

async groupJobs(
filters: JobFilter[],
activeJobSets: boolean,
Expand All @@ -33,7 +39,11 @@ export class GroupJobsService implements IGroupJobsService {
take: number,
abortSignal?: AbortSignal,
): Promise<GroupJobsResponse> {
const response = await fetch("/api/v1/jobGroups", {
let path = "/api/v1/jobGroups"
if (this.backend) {
path += "?" + new URLSearchParams({ backend: this.backend })
}
const response = await fetch(path, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
Expand Down
6 changes: 6 additions & 0 deletions internal/lookout/ui/src/utils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interface UIConfig {
oidcEnabled: boolean
oidc?: OidcConfig
commandSpecs: CommandSpec[]
backend: string | undefined
}

export type RequestStatus = "Loading" | "Idle"
Expand Down Expand Up @@ -52,6 +53,7 @@ export async function getUIConfig(): Promise<UIConfig> {
oidcEnabled: false,
oidc: undefined,
commandSpecs: [],
backend: undefined,
}

try {
Expand All @@ -76,6 +78,7 @@ export async function getUIConfig(): Promise<UIConfig> {
})
}
}
if (json.Backend) config.backend = json.Backend
} catch (e) {
console.error(e)
}
Expand All @@ -91,6 +94,9 @@ export async function getUIConfig(): Promise<UIConfig> {

if (window.location.pathname === "/oidc") config.oidcEnabled = true

const backend = searchParams.get("backend")
if (backend) config.backend = backend

return config
}

Expand Down
18 changes: 14 additions & 4 deletions internal/lookoutv2/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ func Serve(configuration configuration.LookoutV2Config) error {
return err
}

getJobsRepo := repository.NewSqlGetJobsRepository(db)
groupJobsRepo := repository.NewSqlGroupJobsRepository(db)
getJobsRepo := repository.NewSqlGetJobsRepository(db, false)
getJobsJsonbRepo := repository.NewSqlGetJobsRepository(db, true)
groupJobsRepo := repository.NewSqlGroupJobsRepository(db, false)
groupJobsJsonbRepo := repository.NewSqlGroupJobsRepository(db, true)
decompressor := compress.NewThreadSafeZlibDecompressor()
getJobRunErrorRepo := repository.NewSqlGetJobRunErrorRepository(db, decompressor)
getJobSpecRepo := repository.NewSqlGetJobSpecRepository(db, decompressor)
Expand All @@ -54,7 +56,11 @@ func Serve(configuration configuration.LookoutV2Config) error {
func(params operations.GetJobsParams) middleware.Responder {
filters := util.Map(params.GetJobsRequest.Filters, conversions.FromSwaggerFilter)
order := conversions.FromSwaggerOrder(params.GetJobsRequest.Order)
result, err := getJobsRepo.GetJobs(
repo := getJobsRepo
if backend := params.Backend; backend != nil && *backend == "jsonb" {
repo = getJobsJsonbRepo
}
result, err := repo.GetJobs(
armadacontext.New(params.HTTPRequest.Context(), logger),
filters,
params.GetJobsRequest.ActiveJobSets,
Expand All @@ -75,7 +81,11 @@ func Serve(configuration configuration.LookoutV2Config) error {
func(params operations.GroupJobsParams) middleware.Responder {
filters := util.Map(params.GroupJobsRequest.Filters, conversions.FromSwaggerFilter)
order := conversions.FromSwaggerOrder(params.GroupJobsRequest.Order)
result, err := groupJobsRepo.GroupBy(
repo := groupJobsRepo
if backend := params.Backend; backend != nil && *backend == "jsonb" {
repo = groupJobsJsonbRepo
}
result, err := repo.GroupBy(
armadacontext.New(params.HTTPRequest.Context(), logger),
filters,
params.GroupJobsRequest.ActiveJobSets,
Expand Down
29 changes: 19 additions & 10 deletions internal/lookoutv2/conversions/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-openapi/strfmt"

"github.com/armadaproject/armada/internal/common/database/lookout"
"github.com/armadaproject/armada/internal/lookoutv2/gen/models"
"github.com/armadaproject/armada/internal/lookoutv2/gen/restapi/operations"
"github.com/armadaproject/armada/internal/lookoutv2/model"
Expand All @@ -17,7 +18,7 @@ func ToSwaggerJob(job *model.Job) *models.Job {
}
return &models.Job{
Annotations: job.Annotations,
Cancelled: toSwaggerTimePtr(job.Cancelled),
Cancelled: ToSwaggerTime(job.Cancelled),
CPU: job.Cpu,
Duplicate: job.Duplicate,
EphemeralStorage: job.EphemeralStorage,
Expand All @@ -43,13 +44,13 @@ func ToSwaggerRun(run *model.Run) *models.Run {
return &models.Run{
Cluster: run.Cluster,
ExitCode: run.ExitCode,
Finished: toSwaggerTimePtr(run.Finished),
JobRunState: run.JobRunState,
Finished: PostgreSQLTimeToSwaggerTime(run.Finished),
JobRunState: string(lookout.JobRunStateMap[run.JobRunState]),
Node: run.Node,
Leased: toSwaggerTimePtr(run.Leased),
Pending: toSwaggerTimePtr(run.Pending),
Leased: PostgreSQLTimeToSwaggerTime(run.Leased),
Pending: PostgreSQLTimeToSwaggerTime(run.Pending),
RunID: run.RunId,
Started: toSwaggerTimePtr(run.Started),
Started: PostgreSQLTimeToSwaggerTime(run.Started),
}
}

Expand Down Expand Up @@ -90,10 +91,18 @@ func FromSwaggerGroupedField(groupedField *operations.GroupJobsParamsBodyGrouped
}
}

func toSwaggerTimePtr(ts *time.Time) *strfmt.DateTime {
if ts == nil {
func ToSwaggerTime(t *time.Time) *strfmt.DateTime {
if t == nil {
return nil
}
swaggerTs := strfmt.DateTime(*ts)
return &swaggerTs
s := strfmt.DateTime(*t)
return &s
}

func PostgreSQLTimeToSwaggerTime(t *model.PostgreSQLTime) *strfmt.DateTime {
if t == nil {
return nil
}
s := strfmt.DateTime(t.Time)
return &s
}
10 changes: 5 additions & 5 deletions internal/lookoutv2/conversions/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ var (
{
Cluster: "cluster",
ExitCode: pointer.Int32(322),
Finished: &baseTime,
JobRunState: string(lookout.JobRunLeaseReturned),
Finished: model.NewPostgreSQLTime(&baseTime),
JobRunState: lookout.JobRunStateOrdinalMap[lookout.JobRunLeaseReturned],
Node: pointer.String("node"),
Leased: &baseTime,
Pending: &baseTime,
Leased: model.NewPostgreSQLTime(&baseTime),
Pending: model.NewPostgreSQLTime(&baseTime),
RunId: "run-id",
Started: &baseTime,
Started: model.NewPostgreSQLTime(&baseTime),
},
},
State: string(lookout.JobFailed),
Expand Down
46 changes: 46 additions & 0 deletions internal/lookoutv2/gen/restapi/embedded_spec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cec8e18

Please sign in to comment.