Skip to content

Commit

Permalink
Enhance etcd lease to add peer url enabled status as an annotation (#530)
Browse files Browse the repository at this point in the history

* fix for Issue#529

* fixed tests

* Update pkg/health/heartbeat/heartbeat.go

Co-authored-by: Tim Usner <[email protected]>

* resolved compilation error

* incorporated check recommendations

* addressed review comments

* addressed review comments

* corrected case for multi-node and added tests

* corrected peer url in integration test and added additional unit tests

* corrected integration test

* add error to the fatal log messages

* fixed indentation issue with yaml config file

* corrected initial-advertise-peer-urls

* corrected initial-advertise-peer-urls

Co-authored-by: Tim Usner <[email protected]>
  • Loading branch information
unmarshall and timuthy authored Sep 5, 2022
1 parent b68a54e commit 7590f01
Show file tree
Hide file tree
Showing 19 changed files with 388 additions and 132 deletions.
21 changes: 18 additions & 3 deletions pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
const (
// podName is presumably the name of the environment variable that carries the pod's name;
// the lookup site is not visible here — confirm against usage.
podName = "POD_NAME"
// podNamespace is presumably the name of the environment variable that carries the pod's
// namespace; the lookup site is not visible here — confirm against usage.
podNamespace = "POD_NAMESPACE"
// PeerURLTLSEnabledKey is the annotation key added to the member lease. Its value is the
// string form of a boolean ("true"/"false") indicating whether TLS has been enabled for the peer URL.
PeerURLTLSEnabledKey = "member.etcd.gardener.cloud/tls-enabled"
)

// Heartbeat contains information to perform regular heart beats in a Kubernetes cluster.
Expand All @@ -46,10 +48,11 @@ type Heartbeat struct {
k8sClient client.Client
podName string
podNamespace string
metadata map[string]string // metadata is currently added as annotations to the k8s lease object
}

// NewHeartbeat returns the heartbeat object.
func NewHeartbeat(logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, clientSet client.Client) (*Heartbeat, error) {
func NewHeartbeat(logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, clientSet client.Client, metadata map[string]string) (*Heartbeat, error) {
if etcdConfig == nil {
return nil, &errors.EtcdError{
Message: "nil etcd config passed, can not create heartbeat",
Expand All @@ -74,6 +77,7 @@ func NewHeartbeat(logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig
k8sClient: clientSet,
podName: memberName,
podNamespace: namespace,
metadata: metadata,
}, nil
}

Expand Down Expand Up @@ -129,6 +133,13 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error {
renewedMemberLease.Spec.HolderIdentity = &memberID
renewedTime := time.Now()
renewedMemberLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
// Update only keys from metadata
if renewedMemberLease.Annotations == nil {
renewedMemberLease.Annotations = map[string]string{}
}
for k, v := range hb.metadata {
renewedMemberLease.Annotations[k] = v
}

err = hb.k8sClient.Patch(ctx, renewedMemberLease, client.MergeFrom(memberLease))
if err != nil {
Expand Down Expand Up @@ -306,14 +317,18 @@ func DeltaSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, k8s
}

// RenewMemberLeasePeriodically has a timer and will periodically call RenewMemberLease to renew the member lease until stopped.
func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hconfig *brtypes.HealthConfig, logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig) error {
func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hconfig *brtypes.HealthConfig, logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, peerURLTLSEnabled bool) error {

clientSet, err := miscellaneous.GetKubernetesClientSetOrError()
if err != nil {
return fmt.Errorf("failed to create clientset: %v", err)
}

hb, err := NewHeartbeat(logger, etcdConfig, clientSet)
metadata := map[string]string{
PeerURLTLSEnabledKey: strconv.FormatBool(peerURLTLSEnabled),
}

hb, err := NewHeartbeat(logger, etcdConfig, clientSet, metadata)
if err != nil {
return fmt.Errorf("failed to initialize new heartbeat: %v", err)
}
Expand Down
79 changes: 46 additions & 33 deletions pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"time"

heartbeat "github.com/gardener/etcd-backup-restore/pkg/health/heartbeat"
"github.com/gardener/etcd-backup-restore/pkg/health/heartbeat"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/gardener/etcd-backup-restore/pkg/wrappers"
Expand All @@ -18,47 +18,58 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"
fake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var _ = Describe("Heartbeat", func() {
var (
etcdConnectionConfig *brtypes.EtcdConnectionConfig
metadata map[string]string
)

BeforeEach(func() {
etcdConnectionConfig = brtypes.NewEtcdConnectionConfig()
etcdConnectionConfig.Endpoints = []string{etcd.Clients[0].Addr().String()}
etcdConnectionConfig.ConnectionTimeout.Duration = 5 * time.Second
metadata = map[string]string{}
})

Describe("creating Heartbeat", func() {
BeforeEach(func() {
os.Setenv("POD_NAME", "test_pod")
os.Setenv("POD_NAMESPACE", "test_namespace")
Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed())
Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed())
})
AfterEach(func() {
os.Unsetenv("POD_NAME")
os.Unsetenv("POD_NAMESPACE")
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed())
})
Context("With valid config", func() {
It("should not return error", func() {
_, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet())
_, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet(), metadata)
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("With invalid etcdconnection config passed", func() {
It("should return error", func() {
_, err := heartbeat.NewHeartbeat(logger, nil, miscellaneous.GetFakeKubernetesClientSet())
_, err := heartbeat.NewHeartbeat(logger, nil, miscellaneous.GetFakeKubernetesClientSet(), metadata)
Expect(err).Should(HaveOccurred())
})
})
Context("With invalid clientset passed", func() {
It("should return error", func() {
_, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, nil)
_, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, nil, metadata)
Expect(err).Should(HaveOccurred())
})
})
Context("With valid config and metadata", func() {
BeforeEach(func() {
metadata[heartbeat.PeerURLTLSEnabledKey] = "true"
})
It("should not return error", func() {
_, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet(), metadata)
Expect(err).ToNot(HaveOccurred())
})
})
})

Describe("Renewing snapshot lease", func() {
Expand All @@ -68,8 +79,8 @@ var _ = Describe("Heartbeat", func() {
)
Context("With valid full snapshot lease present", func() {
BeforeEach(func() {
os.Setenv("POD_NAME", "test_pod")
os.Setenv("POD_NAMESPACE", "test_namespace")
Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed())
Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed())
k8sClientset = fake.NewClientBuilder().Build()
lease = &v1.Lease{
TypeMeta: metav1.TypeMeta{
Expand All @@ -83,8 +94,8 @@ var _ = Describe("Heartbeat", func() {
}
})
AfterEach(func() {
os.Unsetenv("POD_NAME")
os.Unsetenv("POD_NAMESPACE")
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed())
})
It("Should Correctly update holder identity of full snapshot lease", func() {
Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod"))
Expand All @@ -104,10 +115,10 @@ var _ = Describe("Heartbeat", func() {
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
k8sClientset.Get(context.TODO(), client.ObjectKey{
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)
}, l)).To(Succeed())

Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))
Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -119,7 +130,7 @@ var _ = Describe("Heartbeat", func() {
Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod"))
Expect(os.Getenv("POD_NAMESPACE")).Should(Equal("test_namespace"))

k8sClientset.Create(context.TODO(), lease)
Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).Should(HaveOccurred())
Expand All @@ -130,8 +141,8 @@ var _ = Describe("Heartbeat", func() {
})
Context("With valid delta snapshot lease present", func() {
BeforeEach(func() {
os.Setenv("POD_NAME", "test_pod")
os.Setenv("POD_NAMESPACE", "test_namespace")
Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed())
Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed())
k8sClientset = fake.NewClientBuilder().Build()
lease = &v1.Lease{
TypeMeta: metav1.TypeMeta{
Expand All @@ -145,8 +156,8 @@ var _ = Describe("Heartbeat", func() {
}
})
AfterEach(func() {
os.Unsetenv("POD_NAME")
os.Unsetenv("POD_NAMESPACE")
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed())
})
It("Should renew and correctly update holder identity of delta snapshot lease when delta snapshot list is passed", func() {
Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod"))
Expand Down Expand Up @@ -251,8 +262,8 @@ var _ = Describe("Heartbeat", func() {
)
Context("With corresponding member lease present", func() {
BeforeEach(func() {
os.Setenv("POD_NAME", "test_pod")
os.Setenv("POD_NAMESPACE", "test_namespace")
Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed())
Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed())
lease = &v1.Lease{
TypeMeta: metav1.TypeMeta{
Kind: "Lease",
Expand All @@ -273,22 +284,23 @@ var _ = Describe("Heartbeat", func() {
Namespace: os.Getenv("POD_NAMESPACE"),
},
}
metadata = map[string]string{heartbeat.PeerURLTLSEnabledKey: "true"}
})
AfterEach(func() {
os.Unsetenv("POD_NAME")
os.Unsetenv("POD_NAMESPACE")
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed())
})
It("Should correctly update the member lease", func() {
clientSet := miscellaneous.GetFakeKubernetesClientSet()
heartbeat, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, clientSet)
hb, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, clientSet, metadata)
Expect(err).ShouldNot(HaveOccurred())

err = clientSet.Create(context.TODO(), lease)
Expect(err).ShouldNot(HaveOccurred())
err = clientSet.Create(context.TODO(), pod)
Expect(err).ShouldNot(HaveOccurred())

err = heartbeat.RenewMemberLease(context.TODO())
err = hb.RenewMemberLease(context.TODO())
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
Expand All @@ -298,7 +310,8 @@ var _ = Describe("Heartbeat", func() {
}, l)
Expect(err).ShouldNot(HaveOccurred())
Expect(l.Spec.HolderIdentity).ToNot(BeNil())

Expect(l.Annotations).ToNot(BeEmpty())
Expect(l.Annotations[heartbeat.PeerURLTLSEnabledKey]).To(Equal("true"))
err = clientSet.Delete(context.TODO(), l)
Expect(err).ShouldNot(HaveOccurred())
err = clientSet.Delete(context.TODO(), pod)
Expand All @@ -307,15 +320,15 @@ var _ = Describe("Heartbeat", func() {
})
Context("With corresponding member lease not present", func() {
BeforeEach(func() {
os.Setenv("POD_NAME", "test_pod")
os.Setenv("POD_NAMESPACE", "test_namespace")
Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed())
Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed())
})
AfterEach(func() {
os.Unsetenv("POD_NAME")
os.Unsetenv("POD_NAMESPACE")
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed())
})
It("Should return an error", func() {
heartbeat, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet())
heartbeat, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet(), metadata)
Expect(err).ShouldNot(HaveOccurred())

err = heartbeat.RenewMemberLease(context.TODO())
Expand All @@ -339,7 +352,7 @@ var _ = Describe("Heartbeat", func() {
})
Context("With fail to create clientset", func() {
It("Should return an error", func() {
err := heartbeat.RenewMemberLeasePeriodically(testCtx, mmStopCh, hConfig, logger, etcdConnectionConfig)
err := heartbeat.RenewMemberLeasePeriodically(testCtx, mmStopCh, hConfig, logger, etcdConnectionConfig, true)
Expect(err).Should(HaveOccurred())
})
})
Expand Down
18 changes: 9 additions & 9 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ import (
)

// Initialize has the following steps:
// * Check if data directory exists.
// - If data directory exists
// * Check for data corruption.
// - If data directory is in corrupted state, clear the data directory.
// - If data directory does not exist.
// * Check if Latest snapshot available.
// - Try to perform an Etcd data restoration from the latest snapshot.
// - No snapshots are available, start etcd as a fresh installation.
// - Check if data directory exists.
// - If data directory exists
// - Check for data corruption.
// - If data directory is in corrupted state, clear the data directory.
// - If data directory does not exist.
// - Check if Latest snapshot available.
// - Try to perform an Etcd data restoration from the latest snapshot.
// - No snapshots are available, start etcd as a fresh installation.
func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
metrics.CurrentClusterSize.With(prometheus.Labels{}).Set(float64(e.Validator.OriginalClusterSize))
start := time.Now()
Expand Down Expand Up @@ -113,7 +113,7 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6
return nil
}

//NewInitializer creates an etcd initializer object.
// NewInitializer creates an etcd initializer object.
func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.SnapstoreConfig, etcdConnectionConfig *brtypes.EtcdConnectionConfig, logger *logrus.Logger) *EtcdInitializer {
zapLogger, _ := zap.NewProduction()
etcdInit := &EtcdInitializer{
Expand Down
2 changes: 1 addition & 1 deletion pkg/initializer/validator/datavalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion pkg/leaderelection/leaderelection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
Loading

0 comments on commit 7590f01

Please sign in to comment.