diff --git a/pkg/health/heartbeat/heartbeat.go b/pkg/health/heartbeat/heartbeat.go index bd64bf588..78347eb51 100644 --- a/pkg/health/heartbeat/heartbeat.go +++ b/pkg/health/heartbeat/heartbeat.go @@ -36,6 +36,8 @@ import ( const ( podName = "POD_NAME" podNamespace = "POD_NAMESPACE" + // PeerURLTLSEnabledKey is the name of the annotation that will be added to the lease and will indicate whether TLS has been enabled for peer URL + PeerURLTLSEnabledKey = "member.etcd.gardener.cloud/tls-enabled" ) // Heartbeat contains information to perform regular heart beats in a Kubernetes cluster. @@ -46,10 +48,11 @@ type Heartbeat struct { k8sClient client.Client podName string podNamespace string + metadata map[string]string // metadata is currently added as annotations to the k8s lease object } // NewHeartbeat returns the heartbeat object. -func NewHeartbeat(logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, clientSet client.Client) (*Heartbeat, error) { +func NewHeartbeat(logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, clientSet client.Client, metadata map[string]string) (*Heartbeat, error) { if etcdConfig == nil { return nil, &errors.EtcdError{ Message: "nil etcd config passed, can not create heartbeat", @@ -74,6 +77,7 @@ func NewHeartbeat(logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig k8sClient: clientSet, podName: memberName, podNamespace: namespace, + metadata: metadata, }, nil } @@ -129,6 +133,13 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error { renewedMemberLease.Spec.HolderIdentity = &memberID renewedTime := time.Now() renewedMemberLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime} + // Update only keys from metadata + if renewedMemberLease.Annotations == nil { + renewedMemberLease.Annotations = map[string]string{} + } + for k, v := range hb.metadata { + renewedMemberLease.Annotations[k] = v + } err = hb.k8sClient.Patch(ctx, renewedMemberLease, client.MergeFrom(memberLease)) if err != nil { @@ 
-306,14 +317,18 @@ func DeltaSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, k8s } // RenewMemberLeasePeriodically has a timer and will periodically call RenewMemberLeases to renew the member lease until stopped -func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hconfig *brtypes.HealthConfig, logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig) error { +func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hconfig *brtypes.HealthConfig, logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, peerURLTLSEnabled bool) error { clientSet, err := miscellaneous.GetKubernetesClientSetOrError() if err != nil { return fmt.Errorf("failed to create clientset: %v", err) } - hb, err := NewHeartbeat(logger, etcdConfig, clientSet) + metadata := map[string]string{ + PeerURLTLSEnabledKey: strconv.FormatBool(peerURLTLSEnabled), + } + + hb, err := NewHeartbeat(logger, etcdConfig, clientSet, metadata) if err != nil { return fmt.Errorf("failed to initialize new heartbeat: %v", err) } diff --git a/pkg/health/heartbeat/heartbeat_test.go b/pkg/health/heartbeat/heartbeat_test.go index d90d8c1e7..6d0cf5e4c 100644 --- a/pkg/health/heartbeat/heartbeat_test.go +++ b/pkg/health/heartbeat/heartbeat_test.go @@ -5,7 +5,7 @@ import ( "os" "time" - heartbeat "github.com/gardener/etcd-backup-restore/pkg/health/heartbeat" + "github.com/gardener/etcd-backup-restore/pkg/health/heartbeat" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/gardener/etcd-backup-restore/pkg/wrappers" @@ -18,47 +18,58 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - fake "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/fake" ) var _ = Describe("Heartbeat", func() { var ( etcdConnectionConfig *brtypes.EtcdConnectionConfig + metadata map[string]string ) 
BeforeEach(func() { etcdConnectionConfig = brtypes.NewEtcdConnectionConfig() etcdConnectionConfig.Endpoints = []string{etcd.Clients[0].Addr().String()} etcdConnectionConfig.ConnectionTimeout.Duration = 5 * time.Second + metadata = map[string]string{} }) Describe("creating Heartbeat", func() { BeforeEach(func() { - os.Setenv("POD_NAME", "test_pod") - os.Setenv("POD_NAMESPACE", "test_namespace") + Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed()) + Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed()) }) AfterEach(func() { - os.Unsetenv("POD_NAME") - os.Unsetenv("POD_NAMESPACE") + Expect(os.Unsetenv("POD_NAME")).To(Succeed()) + Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed()) }) Context("With valid config", func() { It("should not return error", func() { - _, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet()) + _, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet(), metadata) Expect(err).ShouldNot(HaveOccurred()) }) }) Context("With invalid etcdconnection config passed", func() { It("should return error", func() { - _, err := heartbeat.NewHeartbeat(logger, nil, miscellaneous.GetFakeKubernetesClientSet()) + _, err := heartbeat.NewHeartbeat(logger, nil, miscellaneous.GetFakeKubernetesClientSet(), metadata) Expect(err).Should(HaveOccurred()) }) }) Context("With invalid clientset passed", func() { It("should return error", func() { - _, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, nil) + _, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, nil, metadata) Expect(err).Should(HaveOccurred()) }) }) + Context("With valid config and metadata", func() { + BeforeEach(func() { + metadata[heartbeat.PeerURLTLSEnabledKey] = "true" + }) + It("should not return error", func() { + _, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet(), metadata) + Expect(err).ToNot(HaveOccurred()) + }) + 
}) }) Describe("Renewing snapshot lease", func() { @@ -68,8 +79,8 @@ var _ = Describe("Heartbeat", func() { ) Context("With valid full snapshot lease present", func() { BeforeEach(func() { - os.Setenv("POD_NAME", "test_pod") - os.Setenv("POD_NAMESPACE", "test_namespace") + Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed()) + Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed()) k8sClientset = fake.NewClientBuilder().Build() lease = &v1.Lease{ TypeMeta: metav1.TypeMeta{ @@ -83,8 +94,8 @@ var _ = Describe("Heartbeat", func() { } }) AfterEach(func() { - os.Unsetenv("POD_NAME") - os.Unsetenv("POD_NAMESPACE") + Expect(os.Unsetenv("POD_NAME")).To(Succeed()) + Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed()) }) It("Should Correctly update holder identity of full snapshot lease", func() { Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod")) @@ -104,10 +115,10 @@ var _ = Describe("Heartbeat", func() { Expect(err).ShouldNot(HaveOccurred()) l := &v1.Lease{} - k8sClientset.Get(context.TODO(), client.ObjectKey{ + Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{ Namespace: lease.Namespace, Name: lease.Name, - }, l) + }, l)).To(Succeed()) Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989"))) Expect(err).ShouldNot(HaveOccurred()) @@ -119,7 +130,7 @@ var _ = Describe("Heartbeat", func() { Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod")) Expect(os.Getenv("POD_NAMESPACE")).Should(Equal("test_namespace")) - k8sClientset.Create(context.TODO(), lease) + Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed()) err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName) Expect(err).Should(HaveOccurred()) @@ -130,8 +141,8 @@ var _ = Describe("Heartbeat", func() { }) Context("With valid delta snapshot lease present", func() { BeforeEach(func() { - os.Setenv("POD_NAME", "test_pod") - os.Setenv("POD_NAMESPACE", "test_namespace") + Expect(os.Setenv("POD_NAME", 
"test_pod")).To(Succeed()) + Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed()) k8sClientset = fake.NewClientBuilder().Build() lease = &v1.Lease{ TypeMeta: metav1.TypeMeta{ @@ -145,8 +156,8 @@ var _ = Describe("Heartbeat", func() { } }) AfterEach(func() { - os.Unsetenv("POD_NAME") - os.Unsetenv("POD_NAMESPACE") + Expect(os.Unsetenv("POD_NAME")).To(Succeed()) + Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed()) }) It("Should renew and correctly update holder identity of delta snapshot lease when delta snapshot list is passed", func() { Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod")) @@ -251,8 +262,8 @@ var _ = Describe("Heartbeat", func() { ) Context("With corresponding member lease present", func() { BeforeEach(func() { - os.Setenv("POD_NAME", "test_pod") - os.Setenv("POD_NAMESPACE", "test_namespace") + Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed()) + Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed()) lease = &v1.Lease{ TypeMeta: metav1.TypeMeta{ Kind: "Lease", @@ -273,14 +284,15 @@ var _ = Describe("Heartbeat", func() { Namespace: os.Getenv("POD_NAMESPACE"), }, } + metadata = map[string]string{heartbeat.PeerURLTLSEnabledKey: "true"} }) AfterEach(func() { - os.Unsetenv("POD_NAME") - os.Unsetenv("POD_NAMESPACE") + Expect(os.Unsetenv("POD_NAME")).To(Succeed()) + Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed()) }) It("Should correctly update the member lease", func() { clientSet := miscellaneous.GetFakeKubernetesClientSet() - heartbeat, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, clientSet) + hb, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, clientSet, metadata) Expect(err).ShouldNot(HaveOccurred()) err = clientSet.Create(context.TODO(), lease) @@ -288,7 +300,7 @@ var _ = Describe("Heartbeat", func() { err = clientSet.Create(context.TODO(), pod) Expect(err).ShouldNot(HaveOccurred()) - err = heartbeat.RenewMemberLease(context.TODO()) + err = hb.RenewMemberLease(context.TODO()) 
Expect(err).ShouldNot(HaveOccurred()) l := &v1.Lease{} @@ -298,7 +310,8 @@ var _ = Describe("Heartbeat", func() { }, l) Expect(err).ShouldNot(HaveOccurred()) Expect(l.Spec.HolderIdentity).ToNot(BeNil()) - + Expect(l.Annotations).ToNot(BeEmpty()) + Expect(l.Annotations[heartbeat.PeerURLTLSEnabledKey]).To(Equal("true")) err = clientSet.Delete(context.TODO(), l) Expect(err).ShouldNot(HaveOccurred()) err = clientSet.Delete(context.TODO(), pod) @@ -307,15 +320,15 @@ var _ = Describe("Heartbeat", func() { }) Context("With corresponding member lease not present", func() { BeforeEach(func() { - os.Setenv("POD_NAME", "test_pod") - os.Setenv("POD_NAMESPACE", "test_namespace") + Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed()) + Expect(os.Setenv("POD_NAMESPACE", "test_namespace")).To(Succeed()) }) AfterEach(func() { - os.Unsetenv("POD_NAME") - os.Unsetenv("POD_NAMESPACE") + Expect(os.Unsetenv("POD_NAME")).To(Succeed()) + Expect(os.Unsetenv("POD_NAMESPACE")).To(Succeed()) }) It("Should return an error", func() { - heartbeat, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet()) + heartbeat, err := heartbeat.NewHeartbeat(logger, etcdConnectionConfig, miscellaneous.GetFakeKubernetesClientSet(), metadata) Expect(err).ShouldNot(HaveOccurred()) err = heartbeat.RenewMemberLease(context.TODO()) @@ -339,7 +352,7 @@ var _ = Describe("Heartbeat", func() { }) Context("With fail to create clientset", func() { It("Should return an error", func() { - err := heartbeat.RenewMemberLeasePeriodically(testCtx, mmStopCh, hConfig, logger, etcdConnectionConfig) + err := heartbeat.RenewMemberLeasePeriodically(testCtx, mmStopCh, hConfig, logger, etcdConnectionConfig, true) Expect(err).Should(HaveOccurred()) }) }) diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index 97b85100b..924bc88b8 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -36,14 +36,14 @@ import ( ) // Initialize has 
the following steps: -// * Check if data directory exists. -// - If data directory exists -// * Check for data corruption. -// - If data directory is in corrupted state, clear the data directory. -// - If data directory does not exist. -// * Check if Latest snapshot available. -// - Try to perform an Etcd data restoration from the latest snapshot. -// - No snapshots are available, start etcd as a fresh installation. +// - Check if data directory exists. +// - If data directory exists +// - Check for data corruption. +// - If data directory is in corrupted state, clear the data directory. +// - If data directory does not exist. +// - Check if Latest snapshot available. +// - Try to perform an Etcd data restoration from the latest snapshot. +// - No snapshots are available, start etcd as a fresh installation. func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error { metrics.CurrentClusterSize.With(prometheus.Labels{}).Set(float64(e.Validator.OriginalClusterSize)) start := time.Now() @@ -113,7 +113,7 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6 return nil } -//NewInitializer creates an etcd initializer object. +// NewInitializer creates an etcd initializer object. func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.SnapstoreConfig, etcdConnectionConfig *brtypes.EtcdConnectionConfig, logger *logrus.Logger) *EtcdInitializer { zapLogger, _ := zap.NewProduction() etcdInit := &EtcdInitializer{ diff --git a/pkg/initializer/validator/datavalidator_test.go b/pkg/initializer/validator/datavalidator_test.go index 87da93f0f..816ec33db 100644 --- a/pkg/initializer/validator/datavalidator_test.go +++ b/pkg/initializer/validator/datavalidator_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/leaderelection/leaderelection_test.go b/pkg/leaderelection/leaderelection_test.go index ab6b6f006..1ee9ddb1b 100644 --- a/pkg/leaderelection/leaderelection_test.go +++ b/pkg/leaderelection/leaderelection_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/member/member_control.go b/pkg/member/member_control.go index 00d9d217a..bf545dc95 100644 --- a/pkg/member/member_control.go +++ b/pkg/member/member_control.go @@ -4,9 +4,7 @@ import ( "context" "errors" "fmt" - "os" "strconv" - "strings" "time" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" @@ -19,7 +17,6 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/etcdserverpb" - "gopkg.in/yaml.v2" "k8s.io/client-go/util/retry" ) @@ -48,8 +45,8 @@ type Control interface { // This will succeed if and only if learner is in a healthy state and the learner is in sync with leader. PromoteMember(context.Context) error - // UpdateMember updates the peer address of a specified etcd cluster member. - UpdateMember(context.Context, client.ClusterCloser) error + // UpdateMemberPeerURL updates the peer address of a specified etcd cluster member. + UpdateMemberPeerURL(context.Context, client.ClusterCloser) (string, error) // RemoveMember removes the member from the etcd cluster. 
RemoveMember(context.Context) error @@ -92,7 +89,7 @@ func NewMemberControl(etcdConnConfig *brtypes.EtcdConnectionConfig) Control { // AddMemberAsLearner add a member as a learner to the etcd cluster func (m *memberControl) AddMemberAsLearner(ctx context.Context) error { //Add member as learner to cluster - memberURL, err := getMemberURL(m.configFile, m.podName) + memberURL, err := getMemberPeerURL(m.configFile, m.podName) if err != nil { m.logger.Fatalf("Error fetching etcd member URL : %v", err) } @@ -174,54 +171,38 @@ func (m *memberControl) IsMemberInCluster(ctx context.Context) (bool, error) { return false, nil } -func getMemberURL(configFile string, podName string) (string, error) { - configYML, err := os.ReadFile(configFile) +func getMemberPeerURL(configFile string, podName string) (string, error) { + config, err := miscellaneous.ReadConfigFileAsMap(configFile) if err != nil { - return "", fmt.Errorf("unable to read etcd config file: %v", err) + return "", err } - - config := map[string]interface{}{} - if err := yaml.Unmarshal([]byte(configYML), &config); err != nil { - return "", fmt.Errorf("unable to unmarshal etcd config yaml file: %v", err) - } - initAdPeerURL := config["initial-advertise-peer-urls"] - peerURL, err := parsePeerURL(initAdPeerURL.(string), podName) + peerURL, err := miscellaneous.ParsePeerURL(initAdPeerURL.(string), podName) if err != nil { return "", fmt.Errorf("could not parse peer URL from the config file : %v", err) } return peerURL, nil } -func parsePeerURL(peerURL, podName string) (string, error) { - tokens := strings.Split(peerURL, "@") - if len(tokens) < 4 { - return "", fmt.Errorf("invalid peer URL : %s", peerURL) - } - domaiName := fmt.Sprintf("%s.%s.%s", tokens[1], tokens[2], "svc") - - return fmt.Sprintf("%s://%s.%s:%s", tokens[0], podName, domaiName, tokens[3]), nil -} - -// updateMemberPeerAddress updated the peer address of a specified etcd member -func (m *memberControl) updateMemberPeerAddress(ctx context.Context, cli 
client.ClusterCloser, id uint64) error { +// doUpdateMemberPeerAddress updates the peer address of a specified etcd member +func (m *memberControl) doUpdateMemberPeerAddress(ctx context.Context, cli client.ClusterCloser, id uint64) (string, error) { // Already existing clusters have `http://localhost:2380` as the peer address. This needs to explicitly updated to the new address // TODO: Remove this peer address updation logic on etcd-br v0.20.0 m.logger.Infof("Updating member peer URL for %s", m.podName) - memberURL, err := getMemberURL(m.configFile, m.podName) + memberPeerURL, err := getMemberPeerURL(m.configFile, m.podName) if err != nil { - return fmt.Errorf("could not fetch member URL : %v", err) + return "", fmt.Errorf("could not fetch member URL : %v", err) } memberUpdateCtx, cancel := context.WithTimeout(ctx, EtcdTimeout) defer cancel() - if _, err = cli.MemberUpdate(memberUpdateCtx, id, []string{memberURL}); err == nil { + if _, err = cli.MemberUpdate(memberUpdateCtx, id, []string{memberPeerURL}); err == nil { m.logger.Info("Successfully updated the member peer URL") - return nil + return memberPeerURL, nil } - return err + return "", err } // PromoteMember promotes an etcd member from a learner to a voting member of the cluster. This will succeed only if its logs are caught up with the leader @@ -261,18 +242,18 @@ func findMember(existingMembers []*etcdserverpb.Member, memberName string) *etcd return nil } -// UpdateMember updates the peer address of a specified etcd cluster member. -func (m *memberControl) UpdateMember(ctx context.Context, cli client.ClusterCloser) error { +// UpdateMemberPeerURL updates the peer address of a specified etcd cluster member.
+func (m *memberControl) UpdateMemberPeerURL(ctx context.Context, cli client.ClusterCloser) (string, error) { m.logger.Infof("Attempting to update the member Info: %v", m.podName) ctx, cancel := context.WithTimeout(ctx, brtypes.DefaultEtcdConnectionTimeout) defer cancel() membersInfo, err := cli.MemberList(ctx) if err != nil { - return fmt.Errorf("error listing members: %v", err) + return "", fmt.Errorf("error listing members: %v", err) } - return m.updateMemberPeerAddress(ctx, cli, membersInfo.Header.GetMemberId()) + return m.doUpdateMemberPeerAddress(ctx, cli, membersInfo.Header.GetMemberId()) } // RemoveMember removes the member from the etcd cluster. diff --git a/pkg/member/member_control_test.go b/pkg/member/member_control_test.go index acc457f96..c284ecd19 100644 --- a/pkg/member/member_control_test.go +++ b/pkg/member/member_control_test.go @@ -23,6 +23,9 @@ var _ = Describe("Membercontrol", func() { factory *mockfactory.MockFactory cl *mockfactory.MockClusterCloser ) + const ( + podName = "test-pod" + ) BeforeEach(func() { etcdConnectionConfig = brtypes.NewEtcdConnectionConfig() @@ -31,7 +34,7 @@ var _ = Describe("Membercontrol", func() { etcdConnectionConfig.SnapshotTimeout.Duration = 30 * time.Second etcdConnectionConfig.DefragTimeout.Duration = 30 * time.Second - os.Setenv("POD_NAME", "test-pod") + Expect(os.Setenv("POD_NAME", podName)).To(Succeed()) ctrl = gomock.NewController(GinkgoT()) factory = mockfactory.NewMockFactory(ctrl) @@ -132,8 +135,10 @@ var _ = Describe("Membercontrol", func() { cl.EXPECT().MemberUpdate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) - err = m.UpdateMember(context.TODO(), client) + peerURL, err := m.UpdateMemberPeerURL(context.TODO(), client) Expect(err).ShouldNot(HaveOccurred()) + expectedPeerURL := fmt.Sprintf("http://%s.%s.%s.svc:2380", podName, "etcd-main-peer", "default") + Expect(peerURL).To(Equal(expectedPeerURL)) }) }) @@ -161,7 +166,7 @@ var _ = Describe("Membercontrol", func() { cl.EXPECT().MemberUpdate(gomock.Any(),
gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("unable to connect to dummy etcd")) - err = m.UpdateMember(context.TODO(), client) + _, err = m.UpdateMemberPeerURL(context.TODO(), client) Expect(err).Should(HaveOccurred()) }) }) diff --git a/pkg/miscellaneous/miscellaneous.go b/pkg/miscellaneous/miscellaneous.go index 59bf66bad..a3c75430b 100644 --- a/pkg/miscellaneous/miscellaneous.go +++ b/pkg/miscellaneous/miscellaneous.go @@ -489,3 +489,27 @@ func RemoveMemberFromCluster(ctx context.Context, cli etcdClient.ClusterCloser, logger.Infof("successfully removed member [ID: %v] from the cluster", strconv.FormatUint(memberID, 16)) return nil } + +// ReadConfigFileAsMap reads the config file given a path and converts it into a map[string]interface{} +func ReadConfigFileAsMap(path string) (map[string]interface{}, error) { + configYML, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("unable to read etcd config file at path: %s : %v", path, err) + } + + c := map[string]interface{}{} + if err := yaml.Unmarshal(configYML, &c); err != nil { + return nil, fmt.Errorf("unable to unmarshal etcd config yaml file at path: %s : %v", path, err) + } + return c, nil +} + +// ParsePeerURL forms a PeerURL, given podName by parsing the initial-advertise-peer-urls +func ParsePeerURL(initialAdvertisePeerURLs, podName string) (string, error) { + tokens := strings.Split(initialAdvertisePeerURLs, "@") + if len(tokens) < 4 { + return "", fmt.Errorf("invalid peer URL : %s", initialAdvertisePeerURLs) + } + domainName := fmt.Sprintf("%s.%s.%s", tokens[1], tokens[2], "svc") + return fmt.Sprintf("%s://%s.%s:%s", tokens[0], podName, domainName, tokens[3]), nil +} diff --git a/pkg/miscellaneous/miscellaneous_test.go b/pkg/miscellaneous/miscellaneous_test.go index 05312b737..bafc9543b 100644 --- a/pkg/miscellaneous/miscellaneous_test.go +++ b/pkg/miscellaneous/miscellaneous_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "path/filepath" "reflect" "time" @@ -576,6 +577,61 @@ var _ =
Describe("Miscellaneous Tests", func() { }) }) }) + Describe("parse peer urls config", func() { + var ( + initialAdPeerURL string + podName string + ) + BeforeEach(func() { + podName = "etcd-test-pod-0" + }) + Context("parse peer url", func() { + It("parsing well-defined initial-advertise-peer-urls", func() { + initialAdPeerURL = "https@etcd-events-peer@shoot--dev--test@2380" + peerURL, err := ParsePeerURL(initialAdPeerURL, podName) + Expect(err).To(BeNil()) + Expect(peerURL).To(Equal("https://etcd-test-pod-0.etcd-events-peer.shoot--dev--test.svc:2380")) + }) + It("parsing malformed initial-advertise-peer-urls", func() { + initialAdPeerURL = "https@etcd-events-peer@shoot--dev--test" + _, err := ParsePeerURL(initialAdPeerURL, podName) + Expect(err).ToNot(BeNil()) + }) + }) + }) + Describe("read config file into a map", func() { + const testdataPath = "testdata" + var ( + configPath string + ) + Context("valid config file", func() { + BeforeEach(func() { + configPath = filepath.Join(testdataPath, "valid_config.yaml") + }) + It("test read and parse yaml", func() { + configAsMap, err := ReadConfigFileAsMap(configPath) + Expect(err).To(BeNil()) + Expect(configAsMap).ToNot(BeNil()) + Expect(configAsMap["name"]).To(Equal("etcd-57c38d")) //just testing one property + }) + }) + Context("invalid file path", func() { + It("test read and parse for a non-existent path", func() { + configPath = "file-does-not-exist.yaml" + _, err := ReadConfigFileAsMap(configPath) + Expect(err).ToNot(BeNil()) + }) + }) + Context("invalid yaml file", func() { + BeforeEach(func() { + configPath = filepath.Join(testdataPath, "invalid_config.yaml") + }) + It("test read and parse an invalid config yaml", func() { + _, err := ReadConfigFileAsMap(configPath) + Expect(err).ToNot(BeNil()) + }) + }) + }) }) diff --git a/pkg/miscellaneous/testdata/invalid_config.yaml b/pkg/miscellaneous/testdata/invalid_config.yaml new file mode 100644 index 000000000..c8e29b3a9 --- /dev/null +++ 
b/pkg/miscellaneous/testdata/invalid_config.yaml @@ -0,0 +1,27 @@ +advertise-client-urls: https@etcd-events-peer@shoot--dev--test@2379 +auto-compaction-mode: periodic + auto-compaction-retention: 30m +client-transport-security: + auto-tls: false + cert-file: /var/etcd/ssl/client/server/tls.crt + client-cert-auth: true + key-file: /var/etcd/ssl/client/server/tls.key + trusted-ca-file: /var/etcd/ssl/client/ca/bundle.crt +data-dir: /var/etcd/data/new.etcd +enable-v2: false +initial-advertise-peer-urls: https@etcd-events-peer@shoot--dev--test@2380 +initial-cluster: "etcd-events-0=https://etcd-events-0.etcd-events-peer.shoot--dev--test.svc:2380,etcd-events-1=https://etcd-events-1.etcd-events-peer.shoot--dev--test.svc:2380,etcd-events-2=https://etcd-events-2.etcd-events-peer.shoot--dev--test.svc:2380" +initial-cluster-state: new +initial-cluster-token: etcd-cluster +listen-client-urls: "https://0.0.0.0:2379" +listen-peer-urls: "https://0.0.0.0:2380" +metrics: basic +name: etcd-57c38d +peer-transport-security: + auto-tls: false + cert-file: /var/etcd/ssl/peer/server/tls.crt + client-cert-auth: true + key-file: /var/etcd/ssl/peer/server/tls.key + trusted-ca-file: /var/etcd/ssl/peer/ca/bundle.crt +quota-backend-bytes: 8589934592 +snapshot-count: 75000 diff --git a/pkg/miscellaneous/testdata/valid_config.yaml b/pkg/miscellaneous/testdata/valid_config.yaml new file mode 100644 index 000000000..f711d5ccd --- /dev/null +++ b/pkg/miscellaneous/testdata/valid_config.yaml @@ -0,0 +1,69 @@ +name: etcd-57c38d +# Path to the data directory. +data-dir: /var/etcd/data/new.etcd +# metrics configuration +metrics: basic +# Number of committed transactions to trigger a snapshot to disk. +snapshot-count: 75000 + +# Accept etcd V2 client requests +enable-v2: false +# Raise alarms when backend size exceeds the given quota. 0 means use the +# default quota. +quota-backend-bytes: 8589934592 +client-transport-security: + # Path to the client server TLS cert file. 
+ cert-file: /var/etcd/ssl/client/server/tls.crt + + # Path to the client server TLS key file. + key-file: /var/etcd/ssl/client/server/tls.key + + # Enable client cert authentication. + client-cert-auth: true + + # Path to the client server TLS trusted CA cert file. + trusted-ca-file: /var/etcd/ssl/client/ca/bundle.crt + + # Client TLS using generated certificates + auto-tls: false +# List of comma separated URLs to listen on for client traffic. +listen-client-urls: https://0.0.0.0:2379 + +# List of this member's client URLs to advertise to the public. +# The URLs needed to be a comma-separated list. +advertise-client-urls: https@etcd-events-peer@shoot--dev--test@2379 +peer-transport-security: + # Path to the peer server TLS cert file. + cert-file: /var/etcd/ssl/peer/server/tls.crt + + # Path to the peer server TLS key file. + key-file: /var/etcd/ssl/peer/server/tls.key + + # Enable peer cert authentication. + client-cert-auth: true + + # Path to the peer server TLS trusted CA cert file. + trusted-ca-file: /var/etcd/ssl/peer/ca/bundle.crt + + # Peer TLS using generated certificates + auto-tls: false +# List of comma separated URLs to listen on for peer traffic. +listen-peer-urls: https://0.0.0.0:2380 + +# List of this member's peer URLs to advertise to the public. +# The URLs needed to be a comma-separated list. +initial-advertise-peer-urls: https@etcd-events-peer@shoot--dev--test@2380 + +# Initial cluster token for the etcd cluster during bootstrap. +initial-cluster-token: etcd-cluster + +# Initial cluster state ('new' or 'existing'). +initial-cluster-state: new + +# Initial cluster +initial-cluster: etcd-events-0=https://etcd-events-0.etcd-events-peer.shoot--dev--test.svc:2380,etcd-events-1=https://etcd-events-1.etcd-events-peer.shoot--dev--test.svc:2380,etcd-events-2=https://etcd-events-2.etcd-events-peer.shoot--dev--test.svc:2380 + +# auto-compaction-mode ("periodic" or "revision"). 
+auto-compaction-mode: periodic +# auto-compaction-retention defines Auto compaction retention length for etcd. +auto-compaction-retention: 30m \ No newline at end of file diff --git a/pkg/server/backrestoreserver_test.go b/pkg/server/backrestoreserver_test.go new file mode 100644 index 000000000..6e7d3380e --- /dev/null +++ b/pkg/server/backrestoreserver_test.go @@ -0,0 +1,38 @@ +package server + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("backrestoreserver tests", func() { + const podName = "etcd-test-pod-0" + var ( + initialAdvertisePeerURLs string + ) + + Describe("testing isPeerURLTLSEnabled", func() { + Context("testing with non-TLS enabled peer url", func() { + BeforeEach(func() { + initialAdvertisePeerURLs = "http@etcd-main-peer@default@2380" + }) + It("test", func() { + enabled, err := isPeerURLTLSEnabled(initialAdvertisePeerURLs, podName) + Expect(err).To(BeNil()) + Expect(enabled).To(BeFalse()) + }) + + }) + Context("testing with TLS enabled peer url", func() { + BeforeEach(func() { + initialAdvertisePeerURLs = "https@etcd-main-peer@default@2380" + }) + It("test", func() { + enabled, err := isPeerURLTLSEnabled(initialAdvertisePeerURLs, podName) + Expect(err).To(BeNil()) + Expect(enabled).To(BeTrue()) + }) + }) + }) + +}) diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index c38fd99d5..e0ba2014d 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -19,28 +19,26 @@ import ( "fmt" "net" "net/http" - "os" + "net/url" "sync" "sync/atomic" "time" "github.com/gardener/etcd-backup-restore/pkg/backoff" "github.com/gardener/etcd-backup-restore/pkg/common" - "github.com/gardener/etcd-backup-restore/pkg/leaderelection" - "github.com/gardener/etcd-backup-restore/pkg/metrics" - "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" - brtypes "github.com/gardener/etcd-backup-restore/pkg/types" - "github.com/ghodss/yaml" -
"github.com/gardener/etcd-backup-restore/pkg/defragmentor" "github.com/gardener/etcd-backup-restore/pkg/errors" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" "github.com/gardener/etcd-backup-restore/pkg/health/heartbeat" "github.com/gardener/etcd-backup-restore/pkg/health/membergarbagecollector" "github.com/gardener/etcd-backup-restore/pkg/initializer" + "github.com/gardener/etcd-backup-restore/pkg/leaderelection" "github.com/gardener/etcd-backup-restore/pkg/member" + "github.com/gardener/etcd-backup-restore/pkg/metrics" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" cron "github.com/robfig/cron/v3" @@ -64,6 +62,11 @@ var ( // runServerWithSnapshotter indicates whether to start server with or without snapshotter. runServerWithSnapshotter bool = true retryTimeout = 5 * time.Second + peerURLTLSEnabled bool +) + +const ( + https = "https" ) // NewBackupRestoreServer return new backup restore server. @@ -95,21 +98,31 @@ func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponen // Run starts the backup restore server. 
func (b *BackupRestoreServer) Run(ctx context.Context) error { - var inputFileName string + var etcdConfigPath string var err error - inputFileName = miscellaneous.GetConfigFilePath() + etcdConfigPath = miscellaneous.GetConfigFilePath() - configYML, err := os.ReadFile(inputFileName) + config, err := miscellaneous.ReadConfigFileAsMap(etcdConfigPath) if err != nil { - b.logger.Fatalf("Unable to read etcd config file: %v", err) + b.logger.WithFields(logrus.Fields{ + "configFile": etcdConfigPath, + }).Fatalf("failed to read etcd config file: %v", err) return err } - config := map[string]interface{}{} - if err := yaml.Unmarshal([]byte(configYML), &config); err != nil { - b.logger.Fatalf("Unable to unmarshal etcd config yaml file: %v", err) - return err + podName, err := miscellaneous.GetEnvVarOrError("POD_NAME") + if err != nil { + b.logger.Fatalf("Error reading POD_NAME env var : %v", err) + } + + initialAdvertisePeerURLs := config["initial-advertise-peer-urls"].(string) + peerURLTLSEnabled, err = isPeerURLTLSEnabled(initialAdvertisePeerURLs, podName) + if err != nil { + b.logger.WithFields(logrus.Fields{ + "podName": podName, + "initial-advertise-peer-urls": initialAdvertisePeerURLs, + }).Fatalf("failed to parse initial-advertise-peer-urls: %v", err) } initialClusterSize, err := miscellaneous.GetClusterSize(fmt.Sprint(config["initial-cluster"])) @@ -157,7 +170,7 @@ func (b *BackupRestoreServer) startHTTPServer(initializer initializer.Initialize EnableProfiling: b.config.ServerConfig.EnableProfiling, ReqCh: make(chan struct{}), AckCh: make(chan struct{}), - EnableTLS: (b.config.ServerConfig.TLSCertFile != "" && b.config.ServerConfig.TLSKeyFile != ""), + EnableTLS: b.config.ServerConfig.TLSCertFile != "" && b.config.ServerConfig.TLSKeyFile != "", ServerTLSCertFile: b.config.ServerConfig.TLSCertFile, ServerTLSKeyFile: b.config.ServerConfig.TLSKeyFile, HTTPHandlerMutex: &sync.Mutex{}, @@ -195,6 +208,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, 
restoreOpts *brtype metrics.CurrentClusterSize.With(prometheus.Labels{}).Set(float64(restoreOpts.OriginalClusterSize)) // Promotes member if it is a learner + if restoreOpts.OriginalClusterSize > 1 { for { select { @@ -208,7 +222,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype if err == nil { break } - miscellaneous.SleepWithContext(ctx, retryTimeout) + _ = miscellaneous.SleepWithContext(ctx, retryTimeout) } } else { // when OriginalClusterSize = 1 @@ -220,7 +234,8 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype if err != nil { return err } - return m.UpdateMember(ctx, cli) + _, err = m.UpdateMemberPeerURL(ctx, cli) + return err }) if err != nil { b.logger.Error("unable to update the member") @@ -280,7 +295,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype mmStopCh = make(chan struct{}) if b.config.HealthConfig.MemberLeaseRenewalEnabled { go func() { - if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig); err != nil { + if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig, peerURLTLSEnabled); err != nil { b.logger.Fatalf("failed RenewMemberLeases: %v", err) } }() @@ -320,7 +335,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype if b.config.HealthConfig.MemberLeaseRenewalEnabled { go func() { - if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig); err != nil { + if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig, peerURLTLSEnabled); err != nil { b.logger.Fatalf("failed RenewMemberLeases: %v", err) } }() @@ -621,3 +636,15 @@ func (b *BackupRestoreServer) stopSnapshotter(handler *HTTPHandler) { handler.Logger.Info("Waiting for 
acknowledgment...") <-handler.AckCh } + +func isPeerURLTLSEnabled(initialAdvertisePeerURLs string, podName string) (bool, error) { + rawPeerURL, err := miscellaneous.ParsePeerURL(initialAdvertisePeerURLs, podName) + if err != nil { + return false, err + } + peerURL, err := url.Parse(rawPeerURL) + if err != nil { + return false, err + } + return peerURL.Scheme == https, nil +} diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index a4376d915..4a34d659e 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -127,7 +127,7 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed. return e, err } defer clientCluster.Close() - m.UpdateMember(context.TODO(), clientCluster) + m.UpdateMemberPeerURL(context.TODO(), clientCluster) } return e, nil } diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 33f2b0a30..1cc8123cf 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -928,8 +928,8 @@ func corruptEtcdDir() error { return os.RemoveAll(etcdDir) } -//takeValidSnaps saves valid snaps in the v1 prefix dir of snapstore so that restorer could restore from them -//TODO: Consider removing when backward compatibility no longer needed +// takeValidSnaps saves valid snaps in the v1 prefix dir of snapstore so that restorer could restore from them +// TODO: Consider removing when backward compatibility no longer needed func takeValidSnaps(logger *logrus.Entry, container string, resp *utils.EtcdDataPopulationResponse, deltaSnapshotPeriod time.Duration, endpoints []string, mode int, backupVersion int) error { //Here we run the snapshotter to take snapshots. The snapshotter by default stores the snaps in the v2 directory. 
//We then move those snaps into the v1 dir under a 'Backup-xxxxxx' dir @@ -1026,8 +1026,8 @@ func takeValidSnaps(logger *logrus.Entry, container string, resp *utils.EtcdData return nil } -//takeInvalidV1Snaps saves an invalid snap in the v1 prefix dir of the snapstore -//TODO: Consider removing when backward compatibility no longer needed +// takeInvalidV1Snaps saves an invalid snap in the v1 prefix dir of the snapstore +// TODO: Consider removing when backward compatibility no longer needed func takeInvalidV1Snaps(container string) error { //V1 snapstore object store, err := snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: container, Provider: "Local", Prefix: "v2"}) diff --git a/pkg/snapshot/snapshotter/snapshotter_test.go b/pkg/snapshot/snapshotter/snapshotter_test.go index 71cdb48a5..5545d9f8b 100644 --- a/pkg/snapshot/snapshotter/snapshotter_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_test.go @@ -910,7 +910,7 @@ func prepareStoreForBackwardCompatibleGC(forTime time.Time, storeContainer strin return store, snapstoreConfig } -//validateLimitBasedSnapshots verifies whether the snapshot list after being garbage collected using the limit-based configuration is a valid snapshot list +// validateLimitBasedSnapshots verifies whether the snapshot list after being garbage collected using the limit-based configuration is a valid snapshot list func validateLimitBasedSnapshots(list brtypes.SnapList, maxBackups uint, mode string) { incr := false fullSnapCount := 0 diff --git a/pkg/snapstore/ocs_s3_snapstore.go b/pkg/snapstore/ocs_s3_snapstore.go index 663bc3fcd..d2d08ec5d 100644 --- a/pkg/snapstore/ocs_s3_snapstore.go +++ b/pkg/snapstore/ocs_s3_snapstore.go @@ -177,7 +177,7 @@ func isOCSConfigEmpty(config ocsAuthOptions) error { return fmt.Errorf("ocs s3 credentials: region, secretAccessKey, endpoint or accessKeyID is missing") } -//OCSSnapStoreHash calculates and returns the hash of OCS snapstore secret. 
+// OCSSnapStoreHash calculates and returns the hash of OCS snapstore secret. func OCSSnapStoreHash(config *brtypes.SnapstoreConfig) (string, error) { ao, err := getOCSAuthOptions("") if err != nil { diff --git a/test/e2e/integration/cloud_backup_test.go b/test/e2e/integration/cloud_backup_test.go index 7f43bee75..31d9dcd33 100644 --- a/test/e2e/integration/cloud_backup_test.go +++ b/test/e2e/integration/cloud_backup_test.go @@ -132,26 +132,27 @@ var _ = Describe("CloudBackup", func() { // Create and place a ETCD config yaml outfile := "/tmp/etcd.conf.yaml" etcdConfigYaml := `# Human-readable name for this member. - name: etcd1 - data-dir: ` + os.Getenv("ETCD_DATA_DIR") + ` - metrics: extensive - snapshot-count: 75000 - enable-v2: false - quota-backend-bytes: 1073741824 - listen-client-urls: http://0.0.0.0:2379 - advertise-client-urls: http://0.0.0.0:2379 - initial-advertise-peer-urls: http://0.0.0.0:2380 - initial-cluster: etcd1=http://0.0.0.0:2380 - initial-cluster-token: new - initial-cluster-state: new - auto-compaction-mode: periodic - auto-compaction-retention: 30m` +name: etcd1 +data-dir: ` + os.Getenv("ETCD_DATA_DIR") + ` +metrics: extensive +snapshot-count: 75000 +enable-v2: false +quota-backend-bytes: 1073741824 +listen-client-urls: http://0.0.0.0:2379 +advertise-client-urls: http://0.0.0.0:2379 +initial-advertise-peer-urls: http@etcd-main-peer@default@2380 +initial-cluster: etcd1=http://0.0.0.0:2380 +initial-cluster-token: new +initial-cluster-state: new +auto-compaction-mode: periodic +auto-compaction-retention: 30m` err := os.WriteFile(outfile, []byte(etcdConfigYaml), 0755) Expect(err).ShouldNot(HaveOccurred()) - os.Setenv("ETCD_CONF", outfile) + Expect(os.Setenv("ETCD_CONF", outfile)).To(Succeed()) // Required as the config file for embedded ETCD fetches ETCD instance name from the POD_NAME variable - os.Setenv("POD_NAME", "etcd1") + Expect(os.Setenv("POD_NAME", "etcd1")).To(Succeed()) + Expect(os.Setenv("POD_NAMESPACE", "etcd-test")).To(Succeed()) }) 
Describe("Regular backups", func() { @@ -168,7 +169,7 @@ var _ = Describe("CloudBackup", func() { snapList, err := store.List() Expect(err).ShouldNot(HaveOccurred()) for _, snap := range snapList { - store.Delete(*snap) + Expect(store.Delete(*snap)).To(Succeed()) } cmdEtcd, etcdErrChan = startEtcd() diff --git a/test/utils/utils.go b/test/utils/utils.go index d3b601a7a..8e7dfa657 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -191,7 +191,7 @@ func ContextWithWaitGroup(parent context.Context, wg *sync.WaitGroup) context.Co } // ContextWithGracePeriod returns a new context, whose Done channel is closed when parent -//context is closed with additional . +// context is closed with additional . func ContextWithGracePeriod(parent context.Context, gracePeriod time.Duration) context.Context { ctx, cancel := context.WithCancel(context.TODO()) go func() {