From d21fca650c8152d992ad5f7f590f70b1368bc60b Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Wed, 8 Jan 2025 12:24:22 -0300 Subject: [PATCH] fix(backend): Synced ScheduledWorkflow CRs on apiserver startup (#11469) Signed-off-by: Helber Belmiro --- .../client/scheduled_workflow_fake.go | 6 +- backend/src/apiserver/list/list.go | 19 ++++- backend/src/apiserver/list/list_test.go | 7 ++ backend/src/apiserver/main.go | 18 ++++- .../apiserver/resource/resource_manager.go | 73 +++++++++++++++++++ .../resource/resource_manager_test.go | 29 ++++++++ 6 files changed, 145 insertions(+), 7 deletions(-) diff --git a/backend/src/apiserver/client/scheduled_workflow_fake.go b/backend/src/apiserver/client/scheduled_workflow_fake.go index 5b81722ee35..970fd26e658 100644 --- a/backend/src/apiserver/client/scheduled_workflow_fake.go +++ b/backend/src/apiserver/client/scheduled_workflow_fake.go @@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name) } -func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil +func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) { + c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow + return scheduledWorkflow, nil } func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error { diff --git a/backend/src/apiserver/list/list.go b/backend/src/apiserver/list/list.go index e38be8f7339..174eff961d8 100644 --- a/backend/src/apiserver/list/list.go +++ b/backend/src/apiserver/list/list.go @@ -22,6 +22,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "math" "reflect" "strings" @@ -97,6 +98,13 @@ type Options struct { *token } +func EmptyOptions() *Options { + return &Options{ + math.MaxInt32, + &token{}, + } +} + // Matches returns trues if the sorting and filtering criteria in o matches that // of the one supplied in opts. func (o *Options) Matches(opts *Options) bool { @@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild if o.IsDesc { order = "DESC" } - sqlBuilder = sqlBuilder. - OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)). - OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order)) + + if o.SortByFieldName != "" { + sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)) + } + + if o.KeyFieldName != "" { + sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order)) + } return sqlBuilder } diff --git a/backend/src/apiserver/list/list_test.go b/backend/src/apiserver/list/list_test.go index 1806e158eec..e207cd900ab 100644 --- a/backend/src/apiserver/list/list_test.go +++ b/backend/src/apiserver/list/list_test.go @@ -15,6 +15,8 @@ package list import ( + "fmt" + "math" "reflect" "strings" "testing" @@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) { wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124", wantArgs: nil, }, + { + in: EmptyOptions(), + wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1), + wantArgs: nil, + }, { in: &Options{ PageSize: 123, diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index 5430674897b..9841503a32f 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -28,6 +28,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/fsnotify/fsnotify" @@ -106,10 +107,25 @@ func main() { } log.SetLevel(level) + backgroundCtx, backgroundCancel := context.WithCancel(context.Background()) + defer backgroundCancel() + wg := sync.WaitGroup{} + wg.Add(1) + go reconcileSwfCrs(resourceManager, backgroundCtx, &wg) go startRpcServer(resourceManager) + // This is blocking startHttpProxy(resourceManager) - + backgroundCancel() clientManager.Close() + wg.Wait() +} + +func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + err := resourceManager.ReconcileSwfCrs(ctx) + if err != nil { + log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err) + } } // A custom http request header matcher to pass on the user identity diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 0e804de1dc7..94fdc578c62 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "fmt" + scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" "io" "net" + "reflect" "strconv" "github.com/cenkalti/backoff" @@ -567,6 +569,77 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model return newRun, nil } +// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs. +func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error { + filterContext := &model.FilterContext{ + ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()}, + } + + opts := list.EmptyOptions() + + jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts) + + if err != nil { + return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") + } + + for i := range jobs { + select { + case <-ctx.Done(): + return nil + default: + } + + tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) + if err != nil { + return failedToReconcileSwfCrsError(err) + } + + newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i]) + if err != nil { + return failedToReconcileSwfCrsError(err) + } + + for { + currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{}) + if err != nil { + if util.IsNotFound(err) { + break + } + return failedToReconcileSwfCrsError(err) + } + + if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) { + currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec + err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow) + if err != nil { + if apierrors.IsConflict(errors.Unwrap(err)) { + continue + } else if util.IsNotFound(errors.Cause(err)) { + break + } + return failedToReconcileSwfCrsError(err) + } + } + break + } + } + + return nil +} + +func failedToReconcileSwfCrsError(err error) error { + return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") +} + +func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error { + _, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow) + if err != nil { + return util.Wrap(err, "Failed to update ScheduledWorkflow") + } + return nil +} + // Fetches a run with a given id. func (r *ResourceManager) GetRun(runId string) (*model.Run, error) { run, err := r.runStore.GetRun(runId) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index f02792a3a08..3a29893fec5 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -3146,6 +3146,35 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing. assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) } +func TestReconcileSwfCrs(t *testing.T) { + store, manager, job := initWithJobV2(t) + defer store.Close() + + fetchedJob, err := manager.GetJob(job.UUID) + require.Nil(t, err) + require.NotNil(t, fetchedJob) + + swfClient := store.SwfClient().ScheduledWorkflow("ns1") + + options := v1.GetOptions{} + ctx := context.Background() + + swf, err := swfClient.Get(ctx, "job-", options) + require.Nil(t, err) + + // emulates an invalid/outdated spec + swf.Spec.Workflow.Spec = nil + swf, err = swfClient.Update(ctx, swf) + require.Nil(t, swf.Spec.Workflow.Spec) + + err = manager.ReconcileSwfCrs(ctx) + require.Nil(t, err) + + swf, err = swfClient.Get(ctx, "job-", options) + require.Nil(t, err) + require.NotNil(t, swf.Spec.Workflow.Spec) +} + func TestReportScheduledWorkflowResource_Error(t *testing.T) { store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close()