Skip to content

Commit

Permalink
feat: add support for metallb shared addresses
Browse files Browse the repository at this point in the history
Add support for shared IP addresses with MetalLB in the CRD
configuration.

Signed-off-by: Tim Jones <[email protected]>
  • Loading branch information
TimJones committed Nov 8, 2024
1 parent 0de9ce3 commit 4a88d59
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 38 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,6 @@ If `MetalLB` management is enabled, then CCM does the following.
- If there is no other service, delete all CCM managed `bgpeers` and the default `bgpadvertisement`
- delete the Elastic IP reservation from Equinix Metal

**NOTE:** [IP Address sharing](https://metallb.universe.tf/usage/#ip-address-sharing) is not yet supported in Cloud Provider Equinix Metal.

CCM itself does **not** install/deploy the load-balancer and it may exists before enable it. This can be deployed by the administrator separately, using the manifest provided in the releases page, or in any other manner. Not having metallb installed but enabled in the CCM configuration will end up allowing you to continue deploying kubernetes services, but the external ip assignment will remain pending, making it useless.

In order to instruct metallb which IPs to announce and from where, CCM takes direct responsibility for managing the
Expand Down
62 changes: 51 additions & 11 deletions metal/loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"

"golang.org/x/exp/slices"
"sigs.k8s.io/cloud-provider-equinix-metal/metal/loadbalancers"
"sigs.k8s.io/cloud-provider-equinix-metal/metal/loadbalancers/emlb"
"sigs.k8s.io/cloud-provider-equinix-metal/metal/loadbalancers/empty"
Expand Down Expand Up @@ -286,17 +287,32 @@ func (l *loadBalancers) EnsureLoadBalancerDeleted(ctx context.Context, clusterNa
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: no IP reservation found for %s, nothing to delete", svcName)
return nil
}

// remove it from any implementation-specific parts
svcIPCidr = fmt.Sprintf("%s/%d", ipReservation.GetAddress(), ipReservation.GetCidr())
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s entry %s", svcName, svcIPCidr)

if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr, service); err != nil {
if errors.Is(err, metallb.ErrIPStillInUse) {
// IP is still in use by another service, just remove this service tag
klog.V(2).Info("EnsureLoadBalancerDeleted(): remove: not removing IP, still in use")
tags := slices.DeleteFunc(ipReservation.GetTags(), func(s string) bool {
return s == svcTag
})
if _, _, err = l.client.IPAddressesApi.UpdateIPAddress(context.Background(), ipReservation.GetId()).IPAssignmentUpdateInput(metal.IPAssignmentUpdateInput{Tags: tags}).Execute(); err != nil {
return fmt.Errorf("failed to update IP removing old service tag: %w", err)
}
return nil
}
return fmt.Errorf("error removing IP %s: %w", ipReservation.GetAddress(), err)
}

// delete the reservation
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s EIP ID %s", svcName, ipReservation.GetId())
if _, err := l.client.IPAddressesApi.DeleteIPAddress(context.Background(), ipReservation.GetId()).Execute(); err != nil {
return fmt.Errorf("failed to remove IP address reservation %s from project: %w", ipReservation.GetAddress(), err)
}
// remove it from any implementation-specific parts
svcIPCidr = fmt.Sprintf("%s/%d", ipReservation.GetAddress(), ipReservation.GetCidr())
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s entry %s", svcName, svcIPCidr)
}

if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr, service); err != nil {
} else if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr, service); err != nil {
return fmt.Errorf("error removing IP from configmap for %s: %w", svcName, err)
}
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: removed service %s from implementation", svcName)
Expand Down Expand Up @@ -408,6 +424,8 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, nodes [
n []loadbalancers.Node
ips *metal.IPReservationList
)
// our default CIDR for each address is 32
cidr := int32(32)

if l.usesBGP {
// get IP address reservations and check if they any exists for this svc
Expand Down Expand Up @@ -501,14 +519,36 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, nodes [
}
klog.V(2).Infof("successfully assigned %s update service %s", svcIP, svcName)
}
// our default CIDR for each address is 32
cidr := int32(32)
if ipReservation != nil {
cidr = ipReservation.GetCidr()
}
svcIPCidr = fmt.Sprintf("%s/%d", svcIP, cidr)
// now need to pass it the nodes
}

svcIPCidr = fmt.Sprintf("%s/%d", svcIP, cidr)
if err = l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n, svc, nodes, loadBalancerName); err != nil {
return svcIPCidr, err
}

if l.usesBGP {
// Need to ensure the service tag is on the IP for shared IP Services
klog.V(2).Infof("service tag %s not found on IP %s, adding", svcTag, svcIP)
ips, _, err := l.client.IPAddressesApi.FindIPReservations(context.Background(), l.project).Execute()
if err != nil {
return svcIPCidr, fmt.Errorf("failed to list project IPs: %w", err)
}
for _, ip := range ips.GetIpAddresses() {
if *ip.IPReservation.Address == svcIP {
if !slices.Contains(ip.IPReservation.Tags, svcTag) {
tags := append(ip.IPReservation.Tags, svcTag)
if _, _, err = l.client.IPAddressesApi.UpdateIPAddress(context.Background(), *ip.IPReservation.Id).IPAssignmentUpdateInput(metal.IPAssignmentUpdateInput{Tags: tags}).Execute(); err != nil {
return svcIPCidr, fmt.Errorf("failed to update IP with new service tag: %w", err)
}
}
break
}
}

// now need to pass it the nodes
for _, node := range nodes {
// get the node provider ID
id := node.Spec.ProviderID
Expand Down Expand Up @@ -543,7 +583,7 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, nodes [
}
}

return svcIPCidr, l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n, svc, nodes, loadBalancerName)
return svcIPCidr, nil
}

func (l *loadBalancers) retrieveIPByTag(ctx context.Context, svc *v1.Service, tag string) (string, error) {
Expand Down
5 changes: 5 additions & 0 deletions metal/loadbalancers/metallb/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,10 @@ func (m *CMConfigurer) RemoveAddressPoolByAddress(ctx context.Context, addr stri
return nil
}

// RemoveFromAddressPool remove service from a pool by name. If the matching pool is not found, do not change anything
func (m *CMConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error {
return nil
}

// RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything
func (m *CMConfigurer) RemoveAddressPool(ctx context.Context, pool string) error { return nil }
133 changes: 111 additions & 22 deletions metal/loadbalancers/metallb/cr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@ import (
"strings"

metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
defaultBgpAdvertisement = "equinix-metal-bgp-adv"
cpemLabelKey = "cloud-provider"
cpemLabelValue = "equinix-metal"
svcLabelKeyPrefix = "service-"
svcLabelValuePrefix = "namespace-"
defaultBgpAdvertisement = "equinix-metal-bgp-adv"
cpemLabelKey = "cloud-provider"
cpemLabelValue = "equinix-metal"
svcLabelKeyPrefix = "service-"
svcLabelValuePrefix = "namespace-"
svcAnnotationSharedPrefix = "shared-"
metallbAnnotationSharedIP = "metallb.universe.tf/allow-shared-ip" // Not exported as a const from metallb package :(
)

type CRDConfigurer struct {
Expand Down Expand Up @@ -149,36 +153,62 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv

addIPAddr := convertToIPAddr(*add, m.namespace, svcNamespace, svcName)

svc := corev1.Service{}
if err = m.client.Get(ctx, client.ObjectKey{Namespace: svcNamespace, Name: svcName}, &svc); err != nil {
return false, fmt.Errorf("unable to retrieve service: %w", err)
}

// go through the pools and see if we have one that matches
// - if same service name return false
//
// TODO (ocobleseqx)
// - Metallb allows ip address sharing for services, so we need to find a way to share a pool
// EnsureLoadBalancerDeleted filters ips by service tags, so when ip is specified and already exists
// it must be updted to include the new serviceNamespace/service
for _, o := range olds.Items {
var updateLabels, updateAddresses bool
var updateLabels, updateAddresses, updateAnnotations bool
// if same name check services labels
if o.GetName() == addIPAddr.GetName() {
for k := range o.GetLabels() {
if strings.HasPrefix(k, svcLabelKeyPrefix) {
osvc := strings.TrimPrefix(k, svcLabelKeyPrefix)
if osvc == svcName {
// already exists
// if service label and key matches
klog.V(2).Info("found matching address pool")
if o.Labels[serviceLabelKey(svcName)] == serviceLabelValue(svcNamespace) {
// if is shared and service exsits in shared annotation
if k, ok := svc.Annotations[metallbAnnotationSharedIP]; ok {
if containsSharedService(o.Annotations[sharedAnnotationKey(k)], svcNamespace, svcName) {
// already exists, and in shared annotation
return false, nil
} else {
updateAnnotations = true
}
} else {
// already exists, and not shared
return false, nil
}
}
// if we got here, none matched exactly, update labels
updateLabels = true
}
for _, addr := range addIPAddr.Spec.Addresses {
if slices.Contains(o.Spec.Addresses, addr) {
updateAddresses = true
break

// If we already need to update the annotations, then this is the owning service and it's just adding a shared-ip annotation
if !updateAnnotations {
// Otherwise we need to check that the IP is new or can be shared
for _, addr := range addIPAddr.Spec.Addresses {
if slices.Contains(o.Spec.Addresses, addr) {
klog.V(2).Info("found matching ip in other address pool, checking if it can be shared")
// Check the Service is configured to share the IP
sharedIpKey, ok := svc.Annotations[metallbAnnotationSharedIP]
if !ok {
return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and no %s annotation found", addr, metallbAnnotationSharedIP)
}

// Check the shared IP key matches the pool annotation
if _, ok := o.Annotations[sharedAnnotationKey(sharedIpKey)]; !ok {
return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and %s annotation does not match", addr, metallbAnnotationSharedIP)
}

updateAnnotations = true
updateAddresses = true
break
}
}
}
if updateLabels || updateAddresses {

if updateLabels || updateAddresses || updateAnnotations {
// update pool
patch := client.MergeFrom(o.DeepCopy())
if updateLabels {
Expand All @@ -189,7 +219,19 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv
addresses := append(o.Spec.Addresses, addIPAddr.Spec.Addresses...)
slices.Sort(addresses)
o.Spec.Addresses = slices.Compact(addresses)
o.Spec.Addresses = addresses
}
if updateAnnotations {
sharedIpKey := sharedAnnotationKey(svc.Annotations[metallbAnnotationSharedIP])
if sharedSvcs, ok := o.Annotations[sharedIpKey]; !ok {
// Safer way to set annotations in case the annotation map itself is nil
o.SetAnnotations(map[string]string{sharedIpKey: sharedServiceName(svcNamespace, svcName)})
} else {
sharedSvcs := strings.Split(sharedSvcs, ",")
sharedSvcs = append(sharedSvcs, sharedServiceName(svcNamespace, svcName))
slices.Sort(sharedSvcs)
sharedSvcs = slices.Compact(sharedSvcs)
o.Annotations[sharedIpKey] = strings.Join(sharedSvcs, ",")
}
}
err := m.client.Patch(ctx, &o, patch)
if err != nil {
Expand Down Expand Up @@ -236,6 +278,53 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv
return true, nil
}

// RemoveFromAddressPool removes a service from a pool by name. If the matching pool is not found, do not change anything
func (m *CRDConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error {
if svcNamespace == "" || svcName == "" {
return nil
}

olds, err := m.listIPAddressPools(ctx)
if err != nil {
return err
}

// go through the pools and see if we have a match
pool := poolName(svcNamespace, svcName)
for _, o := range olds.Items {
if slices.ContainsFunc(maps.Keys(o.GetAnnotations()), func(s string) bool {
return strings.HasPrefix(s, svcAnnotationSharedPrefix) && containsSharedService(o.Annotations[s], svcNamespace, svcName)
}) {
// If there are more services sharing this pool, we only need to remove this service from the annotation
for k, v := range o.GetAnnotations() {
if strings.HasPrefix(k, svcAnnotationSharedPrefix) && containsSharedService(v, svcNamespace, svcName) {
svcList := slices.DeleteFunc(strings.Split(v, ","), func(s string) bool {
return s == sharedServiceName(svcNamespace, svcName)
})
if len(svcList) == 0 {
// No other shared services with this key
return m.RemoveAddressPool(ctx, o.GetName())
} else {
patch := client.MergeFrom(o.DeepCopy())
delete(o.Labels, serviceLabelKey(svcName))
o.Annotations[k] = strings.Join(svcList, ",")
if err = m.client.Patch(ctx, &o, patch); err != nil {
return fmt.Errorf("unable to update IPAddressPool %s: %w", o.GetName(), err)
}
// Other Services still use this IP
return ErrIPStillInUse
}

}
}
} else if o.GetName() == pool {
// Not shared, so just delete the pool
return m.RemoveAddressPool(ctx, pool)
}
}
return nil
}

// RemoveAddressPool removes a pool by name. If the matching pool does not exist, do not change anything
func (m *CRDConfigurer) RemoveAddressPool(ctx context.Context, pool string) error {
if pool == "" {
Expand Down
15 changes: 15 additions & 0 deletions metal/loadbalancers/metallb/cr_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package metallb
import (
"fmt"
"regexp"
"slices"
"sort"
"strings"
"time"

metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
Expand All @@ -24,6 +26,19 @@ func serviceLabelValue(svcNamespace string) string {
return svcLabelValuePrefix + svcNamespace
}

func sharedAnnotationKey(sharedKey string) string {
return svcAnnotationSharedPrefix + sharedKey
}

func sharedServiceName(svcNamespace, svcName string) string {
return fmt.Sprintf("%s.%s", svcNamespace, svcName)
}

func containsSharedService(poolAnnotationValue, svcNamespace, svcName string) bool {
svcList := strings.Split(poolAnnotationValue, ",")
return slices.Contains(svcList, sharedServiceName(svcNamespace, svcName))
}

func convertToIPAddr(addr AddressPool, namespace, svcNamespace, svcName string) metallbv1beta1.IPAddressPool {
ip := metallbv1beta1.IPAddressPool{
Spec: metallbv1beta1.IPAddressPoolSpec{
Expand Down
13 changes: 10 additions & 3 deletions metal/loadbalancers/metallb/metallb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metallb

import (
"context"
"errors"
"fmt"
"net/url"
"strconv"
Expand Down Expand Up @@ -44,6 +45,9 @@ type Configurer interface {
// Returns if anything changed
AddAddressPool(ctx context.Context, add *AddressPool, svcNamespace, svcName string) (bool, error)

// RemoveFromAddressPool remove service from a pool by name. If the matching pool if not found, do not change anything
RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error

// RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything
RemoveAddressPool(ctx context.Context, pool string) error

Expand All @@ -62,6 +66,8 @@ type LB struct {
var (
_ loadbalancers.LB = (*LB)(nil)
crdConfiguration = false

ErrIPStillInUse = errors.New("ip address still in use")
)

// func NewLB(k8sclient kubernetes.Interface, k8sApiextensionsClientset *k8sapiextensionsclient.Clientset, config string) *LB {
Expand Down Expand Up @@ -102,6 +108,7 @@ func NewLB(k8sclient kubernetes.Interface, config string, featureFlags url.Value
if crdConfiguration {
scheme := runtime.NewScheme()
_ = metallbv1beta1.AddToScheme(scheme)
_ = v1.AddToScheme(scheme)
cl, err := client.New(clientconfig.GetConfigOrDie(), client.Options{Scheme: scheme})
if err != nil {
panic(err)
Expand Down Expand Up @@ -267,9 +274,9 @@ func updateIP(ctx context.Context, config Configurer, addr, svcNamespace, svcNam
return fmt.Errorf("error removing IP: %w", err)
}
} else {
if err := config.RemoveAddressPool(ctx, name); err != nil {
klog.V(2).Infof("error removing IPAddressPool: %v", err)
return fmt.Errorf("error removing IPAddressPool: %w", err)
if err := config.RemoveFromAddressPool(ctx, svcNamespace, svcName); err != nil {
klog.V(2).Infof("error removing from IPAddressPool: %v", err)
return fmt.Errorf("error removing from IPAddressPool: %w", err)
}
}
}
Expand Down

0 comments on commit 4a88d59

Please sign in to comment.