diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go b/pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go new file mode 100644 index 0000000000..a7c1fab40a --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go @@ -0,0 +1,71 @@ +/* + * Copyright 2023 The KodeRover Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mongodb + +import ( + "context" + + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/mongo" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo" +) + +type UpdateWorkflowTaskLogColl struct { + *mongo.Collection + + coll string +} + +type UpdateWorkflowTaskLog struct { + WorkflowName string `bson:"workflow_name" json:"workflow_name"` + TaskID int64 `bson:"task_id" json:"task_id"` + StartTime int64 `bson:"start_time" json:"start_time"` + EndTime int64 `bson:"end_time" json:"end_time"` + Status string `bson:"status" json:"status"` + Data interface{} `bson:"data" json:"data"` +} + +func (c UpdateWorkflowTaskLog) TableName() string { + return "update_workflow_task_log" +} + +func NewUpdateWorkflowTaskLogColl() *UpdateWorkflowTaskLogColl { + name := UpdateWorkflowTaskLog{}.TableName() + return &UpdateWorkflowTaskLogColl{ + Collection: mongotool.Database(config.MongoDatabase()).Collection(name), + coll: name, + } +} + +func (c *UpdateWorkflowTaskLogColl) GetCollectionName() string { + return c.coll +} + +func (c *UpdateWorkflowTaskLogColl) EnsureIndex(ctx context.Context) error { + return nil +} + +func (c *UpdateWorkflowTaskLogColl) Create(args *UpdateWorkflowTaskLog) error { + if args == nil { + return errors.New("nil UpdateWorkflowTaskLog") + } + + _, err := c.InsertOne(context.Background(), args) + return err +} diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go b/pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go new file mode 100644 index 0000000000..19d377ecdc --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go @@ -0,0 +1,70 @@ +/* + * Copyright 2023 The KodeRover Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mongodb + +import ( + "context" + + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/mongo" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo" +) + +type WaitPodFinishLogColl struct { + *mongo.Collection + + coll string +} + +type WaitPodFinishLog struct { + JobName string `bson:"job_name" json:"job_name"` + JobStatus string `bson:"job_status" json:"job_status"` + Namespace string `bson:"namespace" json:"namespace"` + PodName string `bson:"pod_name" json:"pod_name"` + Status string `bson:"status" json:"status"` +} + +func (c WaitPodFinishLog) TableName() string { + return "wait_pod_finish_log" +} + +func NewWaitPodFinishLogColl() *WaitPodFinishLogColl { + name := WaitPodFinishLog{}.TableName() + return &WaitPodFinishLogColl{ + Collection: mongotool.Database(config.MongoDatabase()).Collection(name), + coll: name, + } +} + +func (c *WaitPodFinishLogColl) GetCollectionName() string { + return c.coll +} + +func (c *WaitPodFinishLogColl) EnsureIndex(ctx context.Context) error { + return nil +} + +func (c *WaitPodFinishLogColl) Create(args *WaitPodFinishLog) error { + if args == nil { + return errors.New("nil WaitPodFinishLog") + } + + _, err := c.InsertOne(context.Background(), args) + return err +} diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go index 70f5407968..419ca2f2c2 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go @@ -836,10 +836,8 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time. select { case <-ctx.Done(): return config.StatusCancelled, "" - case <-taskTimeout: return config.StatusTimeout, "" - default: job, err := jobLister.Get(jobName) if err != nil { @@ -864,6 +862,15 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time. return config.StatusFailed, errMsg } for _, pod := range pods { + commonrepo.NewWaitPodFinishLogColl().Create( + &commonrepo.WaitPodFinishLog{ + Namespace: namespace, + PodName: pod.Name, + JobName: jobName, + JobStatus: job.Status.String(), + Status: string(pod.Status.Phase), + }) + ipod := wrapper.Pod(pod) if ipod.Pending() { continue diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go index 557f676624..e8a69fc619 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go @@ -549,6 +549,17 @@ func updateworkflowStatus(workflow *commonmodels.WorkflowTask) { } func (c *workflowCtl) updateWorkflowTask() { + c.workflowTaskMutex.Lock() + commonrepo.NewUpdateWorkflowTaskLogColl().Create(&commonrepo.UpdateWorkflowTaskLog{ + WorkflowName: c.workflowTask.WorkflowName, + TaskID: c.workflowTask.TaskID, + StartTime: c.workflowTask.StartTime, + EndTime: c.workflowTask.EndTime, + Status: string(c.workflowTask.Status), + Data: c.workflowTask, + }) + c.workflowTaskMutex.Unlock() + taskInColl, err := commonrepo.NewworkflowTaskv4Coll().Find(c.workflowTask.WorkflowName, c.workflowTask.TaskID) if err != nil { c.logger.Errorf("find workflow task v4 %s failed,error: %v", c.workflowTask.WorkflowName, err)