Skip to content

Commit

Permalink
fix job not stopped after execute findished
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Zhao <[email protected]>
  • Loading branch information
PetrusZ committed Dec 23, 2024
1 parent c877c27 commit 927cd23
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2023 The KodeRover Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mongodb

import (
"context"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/mongo"

"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
)

type UpdateWorkflowTaskLogColl struct {
*mongo.Collection

coll string
}

type UpdateWorkflowTaskLog struct {
WorkflowName string `bson:"workflow_name" json:"workflow_name"`
TaskID int64 `bson:"task_id" json:"task_id"`
StartTime int64 `bson:"start_time" json:"start_time"`
EndTime int64 `bson:"end_time" json:"end_time"`
Status string `bson:"status" json:"status"`
Data interface{} `bson:"data" json:"data"`
}

func (c UpdateWorkflowTaskLog) TableName() string {
return "update_workflow_task_log"
}

func NewUpdateWorkflowTaskLogColl() *UpdateWorkflowTaskLogColl {
name := UpdateWorkflowTaskLog{}.TableName()
return &UpdateWorkflowTaskLogColl{
Collection: mongotool.Database(config.MongoDatabase()).Collection(name),
coll: name,
}
}

func (c *UpdateWorkflowTaskLogColl) GetCollectionName() string {
return c.coll
}

func (c *UpdateWorkflowTaskLogColl) EnsureIndex(ctx context.Context) error {
return nil
}

func (c *UpdateWorkflowTaskLogColl) Create(args *UpdateWorkflowTaskLog) error {
if args == nil {
return errors.New("nil UpdateWorkflowTaskLog")
}

_, err := c.InsertOne(context.Background(), args)
return err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2023 The KodeRover Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mongodb

import (
"context"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/mongo"

"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
)

type WaitPodFinishLogColl struct {
*mongo.Collection

coll string
}

type WaitPodFinishLog struct {
JobName string `bson:"job_name" json:"job_name"`
JobStatus string `bson:"job_status" json:"job_status"`
Namespace string `bson:"namespace" json:"namespace"`
PodName string `bson:"pod_name" json:"pod_name"`
Status string `bson:"status" json:"status"`
}

func (c WaitPodFinishLog) TableName() string {
return "wait_pod_finish_log"
}

func NewWaitPodFinishLogColl() *WaitPodFinishLogColl {
name := WaitPodFinishLog{}.TableName()
return &WaitPodFinishLogColl{
Collection: mongotool.Database(config.MongoDatabase()).Collection(name),
coll: name,
}
}

func (c *WaitPodFinishLogColl) GetCollectionName() string {
return c.coll
}

func (c *WaitPodFinishLogColl) EnsureIndex(ctx context.Context) error {
return nil
}

func (c *WaitPodFinishLogColl) Create(args *WaitPodFinishLog) error {
if args == nil {
return errors.New("nil WaitPodFinishLog")
}

_, err := c.InsertOne(context.Background(), args)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -836,10 +836,8 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time.
select {
case <-ctx.Done():
return config.StatusCancelled, ""

case <-taskTimeout:
return config.StatusTimeout, ""

default:
job, err := jobLister.Get(jobName)
if err != nil {
Expand All @@ -864,6 +862,15 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time.
return config.StatusFailed, errMsg
}
for _, pod := range pods {
commonrepo.NewWaitPodFinishLogColl().Create(
&commonrepo.WaitPodFinishLog{
Namespace: namespace,
PodName: pod.Name,
JobName: jobName,
JobStatus: job.Status.String(),
Status: string(pod.Status.Phase),
})

ipod := wrapper.Pod(pod)
if ipod.Pending() {
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,17 @@ func updateworkflowStatus(workflow *commonmodels.WorkflowTask) {
}

func (c *workflowCtl) updateWorkflowTask() {
c.workflowTaskMutex.Lock()
commonrepo.NewUpdateWorkflowTaskLogColl().Create(&commonrepo.UpdateWorkflowTaskLog{
WorkflowName: c.workflowTask.WorkflowName,
TaskID: c.workflowTask.TaskID,
StartTime: c.workflowTask.StartTime,
EndTime: c.workflowTask.EndTime,
Status: string(c.workflowTask.Status),
Data: c.workflowTask,
})
c.workflowTaskMutex.Unlock()

taskInColl, err := commonrepo.NewworkflowTaskv4Coll().Find(c.workflowTask.WorkflowName, c.workflowTask.TaskID)
if err != nil {
c.logger.Errorf("find workflow task v4 %s failed,error: %v", c.workflowTask.WorkflowName, err)
Expand Down

0 comments on commit 927cd23

Please sign in to comment.