Skip to content

Commit

Permalink
fix: fix when scale replicant pod to 0, core pod can not ready
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jan 10, 2024
1 parent dd376af commit 818e5a4
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 88 deletions.
4 changes: 2 additions & 2 deletions apis/apps/v2beta1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type EMQXStatus struct {
CoreNodes []EMQXNode `json:"coreNodes,omitempty"`
CoreNodesStatus EMQXNodesStatus `json:"coreNodesStatus,omitempty"`

ReplicantNodes []EMQXNode `json:"replicantNodes,omitempty"`
ReplicantNodesStatus *EMQXNodesStatus `json:"replicantNodesStatus,omitempty"`
ReplicantNodes []EMQXNode `json:"replicantNodes,omitempty"`
ReplicantNodesStatus EMQXNodesStatus `json:"replicantNodesStatus,omitempty"`

NodeEvacuationsStatus []NodeEvacuationStatus `json:"nodEvacuationsStatus,omitempty"`
}
Expand Down
6 changes: 1 addition & 5 deletions apis/apps/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion controllers/apps/v2beta1/add_emqx_repl_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
},
}
instance.Status = appsv2beta1.EMQXStatus{
ReplicantNodesStatus: &appsv2beta1.EMQXNodesStatus{
ReplicantNodesStatus: appsv2beta1.EMQXNodesStatus{
Replicas: 3,
},
Conditions: []metav1.Condition{
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/v2beta1/add_emqx_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGetNewReplicaSet(t *testing.T) {
Replicas: pointer.Int32(3),
},
}
instance.Status.ReplicantNodesStatus = &appsv2beta1.EMQXNodesStatus{
instance.Status.ReplicantNodesStatus = appsv2beta1.EMQXNodesStatus{
CollisionCount: pointer.Int32(0),
}

Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/v2beta1/status_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *coreNodesProgressingStatus) nextStatus(ctx context.Context) {
emqx := s.emqxStatusMachine.GetEMQX()

updateSts, _, _ := getStateFulSetList(ctx, s.emqxStatusMachine.client, emqx)
if updateSts != nil && updateSts.Status.ReadyReplicas == emqx.Status.CoreNodesStatus.Replicas {
if updateSts != nil && updateSts.Status.ReadyReplicas != 0 && updateSts.Status.ReadyReplicas == emqx.Status.CoreNodesStatus.Replicas {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesReady,
Status: metav1.ConditionTrue,
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *replicantNodesProgressingStatus) nextStatus(ctx context.Context) {
}

updateRs, _, _ := getReplicaSetList(ctx, s.emqxStatusMachine.client, emqx)
if updateRs != nil && updateRs.Status.ReadyReplicas == emqx.Status.ReplicantNodesStatus.Replicas {
if updateRs != nil && updateRs.Status.ReadyReplicas != 0 && updateRs.Status.ReadyReplicas == emqx.Status.ReplicantNodesStatus.Replicas {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesReady,
Status: metav1.ConditionTrue,
Expand Down
1 change: 0 additions & 1 deletion controllers/apps/v2beta1/sync_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (s *syncPods) reconcile(ctx context.Context, logger logr.Logger, instance *
}
}
}

} else {
if updateSts != nil {
for _, node := range instance.Status.CoreNodes {
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/v2beta1/sync_pods_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = Describe("Check sync pods controller", Ordered, Label("node"), func() {
ReadyReplicas: 2,
Replicas: 1,
},
ReplicantNodesStatus: &appsv2beta1.EMQXNodesStatus{
ReplicantNodesStatus: appsv2beta1.EMQXNodesStatus{
UpdateRevision: "update",
UpdateReplicas: 1,
CurrentRevision: "current",
Expand Down Expand Up @@ -334,7 +334,7 @@ var _ = Describe("check can be scale down", func() {
Replicas: pointer.Int32Ptr(3),
},
}
instance.Status.ReplicantNodesStatus = &appsv2beta1.EMQXNodesStatus{
instance.Status.ReplicantNodesStatus = appsv2beta1.EMQXNodesStatus{
UpdateRevision: "update",
CurrentRevision: "current",
}
Expand Down
78 changes: 39 additions & 39 deletions controllers/apps/v2beta1/update_emqx_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,51 @@ type updateStatus struct {
}

func (u *updateStatus) reconcile(ctx context.Context, logger logr.Logger, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
if instance.Spec.ReplicantTemplate != nil && instance.Status.ReplicantNodesStatus == nil {
instance.Status.ReplicantNodesStatus = &appsv2beta1.EMQXNodesStatus{
Replicas: *instance.Spec.ReplicantTemplate.Spec.Replicas,
}
instance.Status.CoreNodesStatus.Replicas = *instance.Spec.CoreTemplate.Spec.Replicas
if instance.Status.CoreNodesStatus.UpdateRevision != "" && instance.Status.CoreNodesStatus.CurrentRevision == "" {
instance.Status.CoreNodesStatus.CurrentRevision = instance.Status.CoreNodesStatus.UpdateRevision
}

updateRs, currentRs, oldRsList := getReplicaSetList(ctx, u.Client, instance)
if updateRs != nil {
if currentRs == nil || currentRs.Status.Replicas == 0 {
instance.Status.ReplicantNodesStatus.Replicas = *instance.Spec.ReplicantTemplate.Spec.Replicas
if instance.Status.ReplicantNodesStatus.UpdateRevision != "" && instance.Status.ReplicantNodesStatus.CurrentRevision == "" {
instance.Status.ReplicantNodesStatus.CurrentRevision = instance.Status.ReplicantNodesStatus.UpdateRevision
}

updateSts, currentSts, oldStsList := getStateFulSetList(ctx, u.Client, instance)
if updateSts != nil && updateSts.UID != currentSts.UID {
if currentSts.Status.Replicas == 0 {
var i int
for i = 0; i < len(oldRsList); i++ {
if oldRsList[i].Status.Replicas > 0 {
currentRs = oldRsList[i]
for i = 0; i < len(oldStsList); i++ {
if oldStsList[i].Status.Replicas > 0 {
currentSts = oldStsList[i]
break
}
}
if i == len(oldRsList) {
currentRs = updateRs
if i == len(oldStsList) {
currentSts = updateSts
}
instance.Status.ReplicantNodesStatus.CurrentRevision = currentRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
instance.Status.CoreNodesStatus.CurrentRevision = currentSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
if err := u.Client.Status().Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update status")}
}
return subResult{}
}
}

updateSts, currentSts, oldStsList := getStateFulSetList(ctx, u.Client, instance)
if updateSts != nil {
if currentSts == nil || currentSts.Status.Replicas == 0 {
updateRs, currentRs, oldRsList := getReplicaSetList(ctx, u.Client, instance)
if updateRs != nil && updateRs.UID != currentRs.UID {
if currentRs.Status.Replicas == 0 {
var i int
for i = 0; i < len(oldStsList); i++ {
if oldStsList[i].Status.Replicas > 0 {
currentSts = oldStsList[i]
for i = 0; i < len(oldRsList); i++ {
if oldRsList[i].Status.Replicas > 0 {
currentRs = oldRsList[i]
break
}
}
if i == len(oldStsList) {
currentSts = updateSts
if i == len(oldRsList) {
currentRs = updateRs
}
instance.Status.CoreNodesStatus.CurrentRevision = currentSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
instance.Status.ReplicantNodesStatus.CurrentRevision = currentRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
if err := u.Client.Status().Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update status")}
}
Expand All @@ -80,7 +84,6 @@ func (u *updateStatus) reconcile(ctx context.Context, logger logr.Logger, instan
}

instance.Status.CoreNodes = coreNodes
instance.Status.CoreNodesStatus.Replicas = *instance.Spec.CoreTemplate.Spec.Replicas
instance.Status.CoreNodesStatus.ReadyReplicas = 0
instance.Status.CoreNodesStatus.CurrentReplicas = 0
instance.Status.CoreNodesStatus.UpdateReplicas = 0
Expand All @@ -96,22 +99,19 @@ func (u *updateStatus) reconcile(ctx context.Context, logger logr.Logger, instan
}
}

if len(replNodes) > 0 {
instance.Status.ReplicantNodes = replNodes
instance.Status.ReplicantNodesStatus.Replicas = *instance.Spec.ReplicantTemplate.Spec.Replicas
instance.Status.ReplicantNodesStatus.ReadyReplicas = 0
instance.Status.ReplicantNodesStatus.CurrentReplicas = 0
instance.Status.ReplicantNodesStatus.UpdateReplicas = 0
for _, node := range replNodes {
if node.NodeStatus == "running" {
instance.Status.ReplicantNodesStatus.ReadyReplicas++
}
if currentRs != nil && node.ControllerUID == currentRs.UID {
instance.Status.ReplicantNodesStatus.CurrentReplicas++
}
if updateRs != nil && node.ControllerUID == updateRs.UID {
instance.Status.ReplicantNodesStatus.UpdateReplicas++
}
instance.Status.ReplicantNodes = replNodes
instance.Status.ReplicantNodesStatus.ReadyReplicas = 0
instance.Status.ReplicantNodesStatus.CurrentReplicas = 0
instance.Status.ReplicantNodesStatus.UpdateReplicas = 0
for _, node := range replNodes {
if node.NodeStatus == "running" {
instance.Status.ReplicantNodesStatus.ReadyReplicas++
}
if currentRs != nil && node.ControllerUID == currentRs.UID {
instance.Status.ReplicantNodesStatus.CurrentReplicas++
}
if updateRs != nil && node.ControllerUID == updateRs.UID {
instance.Status.ReplicantNodesStatus.UpdateReplicas++
}
}

Expand Down
45 changes: 24 additions & 21 deletions controllers/apps/v2beta1/update_pod_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,43 @@ func (u *updatePodConditions) reconcile(ctx context.Context, logger logr.Logger,
}

onServingCondition := corev1.PodCondition{
Type: appsv2beta1.PodOnServing,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Type: appsv2beta1.PodOnServing,
}
for _, condition := range pod.Status.Conditions {
if condition.Type == appsv2beta1.PodOnServing {
onServingCondition.Status = condition.Status
onServingCondition.LastTransitionTime = condition.LastTransitionTime
}
}

if instance.Status.IsConditionTrue(appsv2beta1.Available) {
if (updateSts != nil && controllerRef.UID == updateSts.UID) ||
(updateRs != nil && controllerRef.UID == updateRs.UID) {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
onServingCondition.Status = u.checkInCluster(instance, r, pod)
break
switch controllerRef.UID {
case updateSts.UID, updateRs.UID:
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
status := u.checkInCluster(instance, r, pod)
if status != onServingCondition.Status {
onServingCondition.Status = status
onServingCondition.LastTransitionTime = metav1.Now()
}
break
}
}
} else {
if (currentSts != nil && controllerRef.UID == currentSts.UID) ||
(currentRs != nil && controllerRef.UID == currentRs.UID) ||
(updateSts != nil && controllerRef.UID == updateSts.UID) ||
(updateRs != nil && controllerRef.UID == updateRs.UID) {
case currentSts.UID, currentRs.UID:
// When available condition is true, need clean currentSts / currentRs pod
if instance.Status.IsConditionTrue(appsv2beta1.Available) {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
onServingCondition.Status = u.checkInCluster(instance, r, pod)
status := corev1.ConditionFalse
if status != onServingCondition.Status {
onServingCondition.Status = status
onServingCondition.LastTransitionTime = metav1.Now()
}
break
}
}
}
}

for _, condition := range pod.Status.Conditions {
if condition.Type == appsv2beta1.PodOnServing && condition.Status == onServingCondition.Status {
onServingCondition.LastTransitionTime = condition.LastTransitionTime
}
}
patchBytes, _ := json.Marshal(corev1.Pod{
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{onServingCondition},
Expand Down
7 changes: 0 additions & 7 deletions controllers/apps/v2beta1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,6 @@ func getReplicaSetList(ctx context.Context, k8sClient client.Client, instance *a
client.InNamespace(instance.Namespace),
client.MatchingLabels(labels),
)
if instance.Spec.ReplicantTemplate == nil {
for _, rs := range list.Items {
oldRsList = append(oldRsList, rs.DeepCopy())
}
sort.Sort(ReplicaSetsByCreationTimestamp(oldRsList))
return
}

for _, rs := range list.Items {
if hash, ok := rs.Labels[appsv2beta1.LabelsPodTemplateHashKey]; ok {
Expand Down
14 changes: 7 additions & 7 deletions e2e/v2beta1/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, BeNil()),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
),
Expand Down Expand Up @@ -130,7 +130,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, BeNil()),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
),
Expand Down Expand Up @@ -180,7 +180,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, BeNil()),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
),
Expand Down Expand Up @@ -220,7 +220,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
HaveField("CurrentRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
HaveField("UpdateRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
),
Expand Down Expand Up @@ -332,7 +332,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
HaveField("UpdateRevision", Not(BeEmpty())),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
Expand Down Expand Up @@ -389,7 +389,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, HaveLen(int(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
Expand Down Expand Up @@ -436,7 +436,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
HaveField("CurrentRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
HaveField("UpdateRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("CurrentRevision", Not(Equal(storage.Status.ReplicantNodesStatus.CurrentRevision))),
Expand Down

0 comments on commit 818e5a4

Please sign in to comment.