e2e tests: test fix for issue k8snetworkplumbingwg#182
Issue [0] reports an error when a pod that belongs to a `StatefulSet`
whose IPPool is already full is deleted. The replacement pod - scheduled
by the `StatefulSet` - cannot run because the IPPool is already full,
and the old pod's IP cannot be garbage collected because allocations are
matched by pod reference - leaving the "new" pod stuck in the
`creating` phase.

[0] - k8snetworkplumbingwg#182
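
For context, the recovery behaviour this test exercises can be sketched
as follows. This is a minimal, self-contained illustration rather than
the actual whereabouts implementation: the `reclaimStaleAllocation`
helper and the trimmed-down `IPAllocation` struct are assumptions,
although the `PodRef` and `ContainerID` fields mirror the ones the e2e
test reads from the IPPool. A `StatefulSet` replacement pod reuses its
predecessor's name, so a stale allocation can be recognised by a
matching pod reference combined with a different container ID.

package main

import "fmt"

// IPAllocation is a trimmed-down stand-in for the whereabouts
// allocation entry; only the two fields the e2e test inspects are
// modelled here.
type IPAllocation struct {
	ContainerID string
	PodRef      string
}

// reclaimStaleAllocation frees an allocation whose pod reference
// matches the requesting pod but whose container ID differs, i.e. an
// allocation left behind by a deleted StatefulSet pod. It returns the
// freed pool index, if any.
func reclaimStaleAllocation(allocations map[string]IPAllocation, podRef, containerID string) (string, bool) {
	for index, allocation := range allocations {
		if allocation.PodRef == podRef && allocation.ContainerID != containerID {
			delete(allocations, index)
			return index, true
		}
	}
	return "", false
}

func main() {
	// A /30 range leaves two assignable IPs, both already allocated;
	// index "1" belongs to the pod that was just deleted.
	pool := map[string]IPAllocation{
		"1": {ContainerID: "old-container", PodRef: "default/statefulthingy-0"},
		"2": {ContainerID: "other", PodRef: "default/statefulthingy-1"},
	}
	if index, ok := reclaimStaleAllocation(pool, "default/statefulthingy-0", "new-container"); ok {
		fmt.Printf("reclaimed index %s for the replacement pod\n", index)
	}
}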

Signed-off-by: Miguel Duarte Barroso <[email protected]>
maiqueb committed Mar 10, 2022
1 parent fedb06c commit aa764a3
Showing 2 changed files with 205 additions and 83 deletions.
273 changes: 190 additions & 83 deletions e2e/e2e_test.go
@@ -59,7 +59,7 @@ var _ = Describe("Whereabouts functionality", func() {
clientInfo, err = NewClientInfo(config)
Expect(err).NotTo(HaveOccurred())

-netAttachDef = macvlanNetworkWithWhereaboutsIPAMNetwork()
netAttachDef = macvlanNetworkWithWhereaboutsIPAMNetwork(testNetworkName, testNamespace, ipv4TestRange)

By("creating a NetworkAttachmentDefinition for whereabouts")
_, err = clientInfo.addNetAttachDef(netAttachDef)
@@ -104,87 +104,185 @@ var _ = Describe("Whereabouts functionality", func() {
statefulSetName = "statefulthingy"
)

-BeforeEach(func() {
-var err error
-_, err = clientInfo.provisionStatefulSet(statefulSetName, namespace, serviceName, initialReplicaNumber)
-Expect(err).NotTo(HaveOccurred())
-Expect(
-clientInfo.Client.CoreV1().Pods(namespace).List(
-context.TODO(), metav1.ListOptions{LabelSelector: selector})).To(
-WithTransform(func(podList *core.PodList) int { return len(podList.Items) }, Equal(initialReplicaNumber)))
Context("regular sized network", func() {
BeforeEach(func() {
var err error
_, err = clientInfo.provisionStatefulSet(statefulSetName, namespace, serviceName, initialReplicaNumber, testNetworkName)
Expect(err).NotTo(HaveOccurred())
Expect(
clientInfo.Client.CoreV1().Pods(namespace).List(
context.TODO(), metav1.ListOptions{LabelSelector: selector})).To(
WithTransform(func(podList *core.PodList) int { return len(podList.Items) }, Equal(initialReplicaNumber)))
})

AfterEach(func() {
Expect(clientInfo.deleteStatefulSet(namespace, serviceName, selector)).To(Succeed())
Expect(
clientInfo.Client.CoreV1().Pods(namespace).List(
context.TODO(), metav1.ListOptions{LabelSelector: selector})).To(
WithTransform(func(podList *core.PodList) []core.Pod { return podList.Items }, BeEmpty()),
"cannot have leaked pods in the system")

poolAllocations := func(ipPool *v1alpha1.IPPool) map[string]v1alpha1.IPAllocation {
return ipPool.Spec.Allocations
}
Expect(
clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(
context.TODO(),
wbstorage.NormalizeRange(ipv4TestRange),
metav1.GetOptions{})).To(
WithTransform(poolAllocations, BeEmpty()),
"cannot have leaked IPAllocations in the system")
})

It("IPPools feature allocations", func() {
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(context.TODO(), wbstorage.NormalizeRange(ipv4TestRange), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(len(ipPool.Spec.Allocations)).To(Equal(initialReplicaNumber))
})

table.DescribeTable("stateful sets scale up / down", func(testSetup func(int), instanceDelta int) {
const scaleTimeout = createTimeout * 6

testSetup(instanceDelta)

Eventually(func() (int, error) {
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(
context.TODO(), wbstorage.NormalizeRange(ipv4TestRange), metav1.GetOptions{})
if err != nil {
return -1, err
}

return len(ipPool.Spec.Allocations), nil
}, scaleTimeout).Should(
Equal(initialReplicaNumber), "we should have one allocation for each live pod")
},
table.Entry("scale up then down 5 replicas", func(deltaInstances int) {
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
}, 5),
table.Entry("scale up then down 10 replicas", func(deltaInstances int) {
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
}, 10),
table.Entry("scale up then down 20 replicas", func(deltaInstances int) {
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
}, 20),
table.Entry("scale down then up 5 replicas", func(deltaInstances int) {
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
}, 5),
table.Entry("scale down then up 10 replicas", func(deltaInstances int) {
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
}, 10),
table.Entry("scale down then up 20 replicas", func(deltaInstances int) {
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
}, 20),
)
})

-AfterEach(func() {
-Expect(clientInfo.deleteStatefulSet(namespace, serviceName, selector)).To(Succeed())
-Expect(
-clientInfo.Client.CoreV1().Pods(namespace).List(
-context.TODO(), metav1.ListOptions{LabelSelector: selector})).To(
-WithTransform(func(podList *core.PodList) []core.Pod { return podList.Items }, BeEmpty()),
-"cannot have leaked pods in the system")
-
-poolAllocations := func(ipPool *v1alpha1.IPPool) map[string]v1alpha1.IPAllocation {
-return ipPool.Spec.Allocations
-}
-Expect(
-clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(
-context.TODO(),
-wbstorage.NormalizeRange(ipv4TestRange),
-metav1.GetOptions{})).To(
-WithTransform(poolAllocations, BeEmpty()),
-"cannot have leaked IPAllocations in the system")
-})
Context("network with very few IPs", func() {
const (
namespace = "default"
networkName = "meganet2000"
rangeWithTwoIPs = "10.10.0.0/30"
replicaNumber = 2
statefulSetCreateTimeout = 20 * time.Second
)

It("IPPools feature allocations", func() {
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(context.TODO(), wbstorage.NormalizeRange(ipv4TestRange), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(len(ipPool.Spec.Allocations)).To(Equal(initialReplicaNumber))
var tinyNetwork *nettypes.NetworkAttachmentDefinition

BeforeEach(func() {
var err error
tinyNetwork, err = clientInfo.addNetAttachDef(
macvlanNetworkWithWhereaboutsIPAMNetwork(networkName, namespace, rangeWithTwoIPs))
Expect(err).NotTo(HaveOccurred())

_, err = clientInfo.provisionStatefulSet(statefulSetName, namespace, serviceName, replicaNumber, networkName)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(clientInfo.delNetAttachDef(tinyNetwork)).To(Succeed())
Expect(clientInfo.deleteStatefulSet(namespace, serviceName, selector)).To(Succeed())
})

It("IPPool is exhausted", func() {
const scaleUpReplicas = 1
Expect(clientInfo.scaleStatefulSet(serviceName, namespace, scaleUpReplicas)).To(Succeed())
Expect(
WaitForStatefulSetReady(
clientInfo.Client,
namespace,
serviceName,
replicaNumber+scaleUpReplicas,
statefulSetCreateTimeout)).To(HaveOccurred(), "the IPPool is already at its limits")
})

Context("deleting a pod from the statefulset", func() {
var (
containerID string
podRef string
)

BeforeEach(func() {
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(
context.TODO(),
wbstorage.NormalizeRange(rangeWithTwoIPs),
metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(ipPool.Spec.Allocations).NotTo(BeEmpty())

containerID = ipPool.Spec.Allocations["1"].ContainerID
podRef = ipPool.Spec.Allocations["1"].PodRef

decomposedPodRef := strings.Split(podRef, "/")
Expect(len(decomposedPodRef)).To(Equal(2))
podName := decomposedPodRef[1]

rightNow := int64(0)
Expect(clientInfo.Client.CoreV1().Pods(namespace).Delete(
context.TODO(), podName, metav1.DeleteOptions{GracePeriodSeconds: &rightNow})).To(Succeed())

Eventually(func() error {
return WaitForStatefulSet(time.Second, isStatefulSetRecovering(clientInfo.Client, serviceName, namespace, replicaNumber))
}, statefulSetCreateTimeout).Should(Succeed())

scaleUpTimeout := 10 * time.Second
Eventually(func() error {
return WaitForStatefulSetReady(
clientInfo.Client, namespace, serviceName, replicaNumber, scaleUpTimeout)
}, statefulSetCreateTimeout).Should(Succeed())
})

It("can recover from an exhausted IP pool", func() {
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(
context.TODO(),
wbstorage.NormalizeRange(rangeWithTwoIPs),
metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(ipPool.Spec.Allocations).NotTo(BeEmpty())

Expect(allocationForPodRef(podRef, *ipPool).ContainerID).NotTo(Equal(containerID))
})
})
})

-table.DescribeTable("stateful sets scale up / down", func(testSetup func(int), instanceDelta int) {
-const scaleTimeout = createTimeout * 6
-
-testSetup(instanceDelta)
-
-Eventually(func() (int, error) {
-ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(
-context.TODO(), wbstorage.NormalizeRange(ipv4TestRange), metav1.GetOptions{})
-if err != nil {
-return -1, err
-}
-
-return len(ipPool.Spec.Allocations), nil
-}, scaleTimeout).Should(
-Equal(initialReplicaNumber), "we should have one allocation for each live pod")
-},
-table.Entry("scale up then down 5 replicas", func(deltaInstances int) {
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
-}, 5),
-table.Entry("scale up then down 10 replicas", func(deltaInstances int) {
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
-}, 10),
-table.Entry("scale up then down 20 replicas", func(deltaInstances int) {
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
-}, 20),
-table.Entry("scale down then up 5 replicas", func(deltaInstances int) {
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
-}, 5),
-table.Entry("scale down then up 10 replicas", func(deltaInstances int) {
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
-}, 10),
-table.Entry("scale down then up 20 replicas", func(deltaInstances int) {
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, -deltaInstances)).To(Succeed())
-Expect(clientInfo.scaleStatefulSet(serviceName, namespace, deltaInstances)).To(Succeed())
-}, 20),
-)
})
})
})

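// allocationForPodRef returns the first allocation in the pool whose pod
// reference matches podRef, or nil when none does.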
func allocationForPodRef(podRef string, ipPool v1alpha1.IPPool) *v1alpha1.IPAllocation {
for _, allocation := range ipPool.Spec.Allocations {
if allocation.PodRef == podRef {
return &allocation
}
}
return nil
}

func clusterConfig() (*rest.Config, error) {
const kubeconfig = "KUBECONFIG"

@@ -277,11 +375,11 @@ func (c *ClientInfo) deletePod(pod *core.Pod) error {
return nil
}

-func (c *ClientInfo) provisionStatefulSet(statefulSetName string, namespace string, serviceName string, replicas int) (*v1.StatefulSet, error) {
func (c *ClientInfo) provisionStatefulSet(statefulSetName string, namespace string, serviceName string, replicas int, networkNames ...string) (*v1.StatefulSet, error) {
const statefulSetCreateTimeout = 6 * createTimeout
statefulSet, err := c.Client.AppsV1().StatefulSets(namespace).Create(
context.TODO(),
-statefulSetSpec(statefulSetName, serviceName, replicas, podNetworkSelectionElements(testNetworkName)),
statefulSetSpec(statefulSetName, serviceName, replicas, podNetworkSelectionElements(networkNames...)),
metav1.CreateOptions{})
if err != nil {
return nil, err
@@ -300,14 +398,23 @@ func (c *ClientInfo) provisionStatefulSet(statefulSetName string, namespace string, serviceName string, replicas int) (*v1.StatefulSet, error) {

func (c *ClientInfo) deleteStatefulSet(namespace string, serviceName string, labelSelector string) error {
const statefulSetDeleteTimeout = 6 * deleteTimeout
-rightNow := int64(0)
-if err := c.Client.AppsV1().StatefulSets(namespace).Delete(context.TODO(), serviceName, metav1.DeleteOptions{GracePeriodSeconds: &rightNow}); err != nil {

if err := c.Client.AppsV1().StatefulSets(namespace).Delete(
context.TODO(), serviceName, deleteRightNowAndBlockUntilAssociatedPodsAreGone()); err != nil {
return err
}

return WaitForStatefulSetGone(c.Client, namespace, serviceName, labelSelector, statefulSetDeleteTimeout)
}

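// deleteRightNowAndBlockUntilAssociatedPodsAreGone builds delete options with a
// zero grace period and foreground cascading deletion, so the StatefulSet is
// only removed from the API once its pods are gone as well.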
func deleteRightNowAndBlockUntilAssociatedPodsAreGone() metav1.DeleteOptions {
var (
blockUntilAssociatedPodsAreGone = metav1.DeletePropagationForeground
rightNow = int64(0)
)
return metav1.DeleteOptions{GracePeriodSeconds: &rightNow, PropagationPolicy: &blockUntilAssociatedPodsAreGone}
}

func (c *ClientInfo) scaleStatefulSet(statefulSetName string, namespace string, deltaInstance int) error {
statefulSet, err := c.Client.AppsV1().StatefulSets(namespace).Get(context.TODO(), statefulSetName, metav1.GetOptions{})
if err != nil {
@@ -339,8 +446,8 @@ func generateNetAttachDefSpec(name, namespace, config string) *nettypes.NetworkAttachmentDefinition {
}
}

-func macvlanNetworkWithWhereaboutsIPAMNetwork() *nettypes.NetworkAttachmentDefinition {
-macvlanConfig := `{
func macvlanNetworkWithWhereaboutsIPAMNetwork(networkName string, namespaceName string, ipRange string) *nettypes.NetworkAttachmentDefinition {
macvlanConfig := fmt.Sprintf(`{
"cniVersion": "0.3.0",
"disableCheck": true,
"plugins": [
@@ -353,14 +460,14 @@ func macvlanNetworkWithWhereaboutsIPAMNetwork() *nettypes.NetworkAttachmentDefinition {
"leader_lease_duration": 1500,
"leader_renew_deadline": 1000,
"leader_retry_period": 500,
"range": "10.10.0.0/16",
"range": "%s",
"log_level": "debug",
"log_file": "/tmp/wb"
}
}
]
-}`
-return generateNetAttachDefSpec(testNetworkName, testNamespace, macvlanConfig)
}`, ipRange)
return generateNetAttachDefSpec(networkName, namespaceName, macvlanConfig)
}

func podObject(podName string, label, annotations map[string]string) *core.Pod {
15 changes: 15 additions & 0 deletions e2e/pod_status.go
@@ -61,6 +61,17 @@ func isStatefulSetRunning(cs *kubernetes.Clientset, serviceName string, namespace string, expectedReplicas int) wait.ConditionFunc {
}
}

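// isStatefulSetRecovering reports whether the StatefulSet still has fewer
// ready replicas than expected, i.e. a replacement pod has not become ready yet.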
func isStatefulSetRecovering(cs *kubernetes.Clientset, serviceName string, namespace string, expectedReplicas int) wait.ConditionFunc {
return func() (bool, error) {
statefulSet, err := cs.AppsV1().StatefulSets(namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
return false, err
}

return statefulSet.Status.ReadyReplicas < int32(expectedReplicas), nil
}
}

func isStatefulSetGone(cs *kubernetes.Clientset, serviceName string, namespace string, labelSelector string) wait.ConditionFunc {
return func() (done bool, err error) {
statefulSet, err := cs.AppsV1().StatefulSets(namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
@@ -138,6 +149,10 @@ func WaitForStatefulSetReady(cs *kubernetes.Clientset, namespace, serviceName string, expectedReplicas int, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, isStatefulSetRunning(cs, serviceName, namespace, expectedReplicas))
}

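// WaitForStatefulSet polls the given condition every second until it is met or
// the timeout expires.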
func WaitForStatefulSet(timeout time.Duration, conditionFunc wait.ConditionFunc) error {
return wait.PollImmediate(time.Second, timeout, conditionFunc)
}

// WaitForStatefulSetGone ...
func WaitForStatefulSetGone(cs *kubernetes.Clientset, namespace, serviceName string, labelSelector string, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, isStatefulSetGone(cs, serviceName, namespace, labelSelector))
