Skip to content

Commit

Permalink
transfer leadership before removing the leader node (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
bschimke95 authored Apr 9, 2024
1 parent 3adb61e commit 0602a3c
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 20 deletions.
42 changes: 36 additions & 6 deletions src/k8s/pkg/client/dqlite/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package dqlite
import (
"context"
"fmt"
"time"

"github.com/canonical/k8s/pkg/utils/control"
)

func (c *Client) RemoveNodeByAddress(ctx context.Context, address string) error {
Expand All @@ -18,6 +21,7 @@ func (c *Client) RemoveNodeByAddress(ctx context.Context, address string) error
var (
memberExists, clusterHasOtherVoters bool
memberToRemove NodeInfo
freeSpareNode *NodeInfo
)
for _, member := range members {
switch {
Expand All @@ -27,21 +31,47 @@ func (c *Client) RemoveNodeByAddress(ctx context.Context, address string) error

case member.Address != address && member.Role == Voter:
clusterHasOtherVoters = true

case member.Address != address && member.Role == Spare:
// This is only used in a two node setup, where the leader node is removed.
// The free spare node will be promoted to leader before the existing leader is removed.
freeSpareNode = &member
}
}

if !memberExists {
return fmt.Errorf("cluster does not have a node with address %v", address)
}

// TODO: consider using client.Transfer() for a different node to become leader
if !clusterHasOtherVoters {
return fmt.Errorf("not removing node because there are no other voter members")
}
if freeSpareNode == nil {
// This normally should not happen. There should always be a backup node, except
// if one tries to remove the last node in the cluster.
return fmt.Errorf("cannot transfer dqlite leadership as there is no remaining spare node")
}

if err := client.Remove(ctx, memberToRemove.ID); err != nil {
return fmt.Errorf("failed to remove node %#v from dqlite cluster: %w", memberToRemove, err)
// Leadership can only be transferred to a voter or standby node.
// Therefore, the remaining node in the cluster needs to be promoted first.
if err := client.Assign(ctx, freeSpareNode.ID, Voter); err != nil {
return fmt.Errorf("failed to assign voter role to %d: %w", freeSpareNode.ID, err)
}
// Transfer leadership to remaining node in cluster.
if err := client.Transfer(ctx, freeSpareNode.ID); err != nil {
return fmt.Errorf("failed to transfer leadership to %d: %w", freeSpareNode.ID, err)
}
// Recreate client to point to the new leader.
client, err = c.clientGetter(ctx)
if err != nil {
return fmt.Errorf("failed to create dqlite client: %w", err)
}
}

return nil
// Remove the node from the cluster. Retry as the leadership transfer might still be in progress.
// For a large database this might take some time.
return control.RetryFor(ctx, 10, 5*time.Second, func() error {
if err := client.Remove(ctx, memberToRemove.ID); err != nil {
return fmt.Errorf("failed to remove node %v from dqlite cluster: %w", memberToRemove, err)
}
return nil
})
}
20 changes: 14 additions & 6 deletions src/k8s/pkg/client/dqlite/remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestRemoveNodeByAddress(t *testing.T) {
})
})

t.Run("LastVoterFails", func(t *testing.T) {
t.Run("LastVoter", func(t *testing.T) {
withDqliteCluster(t, 2, func(ctx context.Context, dirs []string) {
g := NewWithT(t)
client, err := dqlite.NewClient(ctx, dqlite.ClientOpts{
Expand All @@ -48,17 +48,25 @@ func TestRemoveNodeByAddress(t *testing.T) {
g.Expect(err).To(BeNil())
g.Expect(members).To(HaveLen(2))

memberToRemove := members[0].Address
memberToRemove := members[0]
remainingNode := members[1]
if members[0].Role != dqlite.Voter {
memberToRemove = members[1].Address
memberToRemove = members[1]
remainingNode = members[0]
}
g.Expect(memberToRemove.Role).To(Equal(dqlite.Voter))
g.Expect(remainingNode.Role).To(Equal(dqlite.Spare))

// Removing the last Voter should fail
g.Expect(client.RemoveNodeByAddress(ctx, memberToRemove)).ToNot(BeNil())
// Removing the last voter should succeed and leadership should be transferred.
g.Expect(client.RemoveNodeByAddress(ctx, memberToRemove.Address)).To(Succeed())

members, err = client.ListMembers(ctx)
g.Expect(err).To(BeNil())
g.Expect(members).To(HaveLen(2))
g.Expect(members).To(HaveLen(1))
g.Expect(members[0].Role == dqlite.Voter)
g.Expect(members[0].Address).ToNot(Equal(memberToRemove.Address))

g.Expect(client.RemoveNodeByAddress(ctx, remainingNode.Address)).ToNot(Succeed())
})
})
}
6 changes: 6 additions & 0 deletions src/k8s/pkg/client/dqlite/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ type NodeInfo = client.NodeInfo

// Voter is the role for nodes that participate in the Raft quorum.
var Voter = client.Voter

// StandBy is the role for nodes that do not participate in quorum but replicate the database.
var StandBy = client.StandBy

// Spare is the role for nodes that do not participate in quorum and do not replicate the database.
var Spare = client.Spare
4 changes: 2 additions & 2 deletions src/k8s/pkg/component/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func UpdateGatewayComponent(ctx context.Context, s snap.Snap, isRefresh bool) er
// while we try to restart them, which fails with:
// the object has been modified; please apply your changes to the latest version and try again
attempts := 3
if err := control.RetryFor(attempts, func() error {
if err := control.RetryFor(ctx, attempts, 0, func() error {
if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
}
Expand All @@ -57,7 +57,7 @@ func UpdateGatewayComponent(ctx context.Context, s snap.Snap, isRefresh bool) er
return fmt.Errorf("failed to restart cilium-operator deployment after %d attempts: %w", attempts, err)
}

if err := control.RetryFor(attempts, func() error {
if err := control.RetryFor(ctx, attempts, 0, func() error {
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions src/k8s/pkg/component/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func UpdateIngressComponent(ctx context.Context, s snap.Snap, isRefresh bool, de
}

attempts := 3
if err := control.RetryFor(attempts, func() error {
if err := control.RetryFor(ctx, attempts, 0, func() error {
if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
}
Expand All @@ -46,7 +46,7 @@ func UpdateIngressComponent(ctx context.Context, s snap.Snap, isRefresh bool, de
return fmt.Errorf("failed to restart cilium-operator deployment after %d attempts: %w", attempts, err)
}

if err := control.RetryFor(attempts, func() error {
if err := control.RetryFor(ctx, attempts, 0, func() error {
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions src/k8s/pkg/component/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo
}

attempts := 3
if err := control.RetryFor(attempts, func() error {
if err := control.RetryFor(ctx, attempts, 0, func() error {
if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
}
Expand All @@ -121,7 +121,7 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo
return fmt.Errorf("failed to restart cilium-operator deployment after %d attempts: %w", attempts, err)
}

if err := control.RetryFor(attempts, func() error {
if err := control.RetryFor(ctx, attempts, 0, func() error {
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
}
Expand Down
16 changes: 14 additions & 2 deletions src/k8s/pkg/utils/control/retry.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
package control

func RetryFor(retryCount int, retryFunc func() error) error {
import (
"context"
"time"
)

// RetryFor retries the given function up to the given number of times,
// waiting for the backoff delay between attempts.
func RetryFor(ctx context.Context, retryCount int, delayBetweenRetry time.Duration, retryFunc func() error) error {
var err error = nil
for i := 0; i < retryCount; i++ {
if err = retryFunc(); err != nil {
continue
select {
case <-ctx.Done():
return context.Canceled
case <-time.After(delayBetweenRetry):
continue
}
}
break
}
Expand Down
64 changes: 64 additions & 0 deletions src/k8s/pkg/utils/control/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package control

import (
"context"
"errors"
"testing"
"time"
)

// TestRetryFor exercises the RetryFor helper in three scenarios: a function
// that eventually succeeds within the retry budget, a function cut short by
// context cancellation, and a function that exhausts every retry.
func TestRetryFor(t *testing.T) {
	t.Run("Retry succeeds", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		retryCount := 3
		count := 0

		err := RetryFor(ctx, retryCount, 50*time.Millisecond, func() error {
			count++
			if count < retryCount {
				return errors.New("failed")
			}
			return nil
		})

		if err != nil {
			t.Errorf("Expected nil error, got: %v", err)
		}
		if count != retryCount {
			t.Errorf("Expected retry count %d, got: %d", retryCount, count)
		}
	})

	t.Run("Retry fails with context cancellation", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		retryCount := 3

		err := RetryFor(ctx, retryCount, time.Second, func() error {
			// Outlive the 100ms context deadline so the retry loop observes
			// ctx.Done() when it waits between attempts.
			time.Sleep(200 * time.Millisecond)
			return errors.New("failed")
		})

		// Use errors.Is rather than == so the assertion keeps working if
		// RetryFor ever wraps the cancellation error.
		if !errors.Is(err, context.Canceled) {
			t.Errorf("Expected context.Canceled error, got: %v", err)
		}
	})

	t.Run("Retry exhausts without success", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		retryCount := 3

		err := RetryFor(ctx, retryCount, 100*time.Millisecond, func() error {
			return errors.New("failed")
		})

		if err == nil {
			t.Error("Expected non-nil error, got nil")
		}
	})
}

0 comments on commit 0602a3c

Please sign in to comment.