Skip to content

Commit

Permalink
refactor Policy controller (#6388)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabelf5 authored Sep 10, 2024
1 parent 890af8e commit 7c62ede
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 101 deletions.
66 changes: 0 additions & 66 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,6 @@ func (nsi *namespacedInformer) addVirtualServerRouteHandler(handlers cache.Resou
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

func (nsi *namespacedInformer) addPolicyHandler(handlers cache.ResourceEventHandlerFuncs) {
informer := nsi.confSharedInformerFactory.K8s().V1().Policies().Informer()
informer.AddEventHandler(handlers)
nsi.policyLister = informer.GetStore()

nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

func (lbc *LoadBalancerController) addNamespaceHandler(handlers cache.ResourceEventHandlerFuncs, nsLabel string) {
optionsModifier := func(options *meta_v1.ListOptions) {
options.LabelSelector = nsLabel
Expand Down Expand Up @@ -1172,64 +1164,6 @@ func (lbc *LoadBalancerController) cleanupUnwatchedNamespacedResources(nsi *name
nsi.stop()
}

func (lbc *LoadBalancerController) syncPolicy(task task) {
key := task.Key
var obj interface{}
var polExists bool
var err error

ns, _, _ := cache.SplitMetaNamespaceKey(key)
obj, polExists, err = lbc.getNamespacedInformer(ns).policyLister.GetByKey(key)
if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

glog.V(2).Infof("Adding, Updating or Deleting Policy: %v\n", key)

if polExists && lbc.HasCorrectIngressClass(obj) {
pol := obj.(*conf_v1.Policy)
err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled)
if err != nil {
msg := fmt.Sprintf("Policy %v/%v is invalid and was rejected: %v", pol.Namespace, pol.Name, err)
lbc.recorder.Eventf(pol, api_v1.EventTypeWarning, "Rejected", msg)

if lbc.reportCustomResourceStatusEnabled() {
err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateInvalid, "Rejected", msg)
if err != nil {
glog.V(3).Infof("Failed to update policy %s status: %v", key, err)
}
}
} else {
msg := fmt.Sprintf("Policy %v/%v was added or updated", pol.Namespace, pol.Name)
lbc.recorder.Eventf(pol, api_v1.EventTypeNormal, "AddedOrUpdated", msg)

if lbc.reportCustomResourceStatusEnabled() {
err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateValid, "AddedOrUpdated", msg)
if err != nil {
glog.V(3).Infof("Failed to update policy %s status: %v", key, err)
}
}
}
}

// it is safe to ignore the error
namespace, name, _ := ParseNamespaceName(key)

resources := lbc.configuration.FindResourcesForPolicy(namespace, name)
resourceExes := lbc.createExtendedResources(resources)

// Only VirtualServers support policies
if len(resourceExes.VirtualServerExes) == 0 {
return
}

warnings, updateErr := lbc.configurator.AddOrUpdateVirtualServers(resourceExes.VirtualServerExes)
lbc.updateResourcesStatusAndEvents(resources, warnings, updateErr)

// Note: updating the status of a policy based on a reload is not needed.
}

func (lbc *LoadBalancerController) syncVirtualServer(task task) {
key := task.Key
var obj interface{}
Expand Down
35 changes: 0 additions & 35 deletions internal/k8s/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,41 +326,6 @@ func createVirtualServerRouteHandlers(lbc *LoadBalancerController) cache.Resourc
}
}

func createPolicyHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pol := obj.(*conf_v1.Policy)
glog.V(3).Infof("Adding Policy: %v", pol.Name)
lbc.AddSyncQueue(pol)
},
DeleteFunc: func(obj interface{}) {
pol, isPol := obj.(*conf_v1.Policy)
if !isPol {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
pol, ok = deletedState.Obj.(*conf_v1.Policy)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Policy object: %v", deletedState.Obj)
return
}
}
glog.V(3).Infof("Removing Policy: %v", pol.Name)
lbc.AddSyncQueue(pol)
},
UpdateFunc: func(old, cur interface{}) {
curPol := cur.(*conf_v1.Policy)
oldPol := old.(*conf_v1.Policy)
if !reflect.DeepEqual(oldPol.Spec, curPol.Spec) {
glog.V(3).Infof("Policy %v changed, syncing", curPol.Name)
lbc.AddSyncQueue(curPol)
}
},
}
}

// areResourcesDifferent returns true if the resources are different based on their spec.
func areResourcesDifferent(oldresource, resource *unstructured.Unstructured) (bool, error) {
oldSpec, found, err := unstructured.NestedMap(oldresource.Object, "spec")
Expand Down
113 changes: 113 additions & 0 deletions internal/k8s/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package k8s

import (
"fmt"
"reflect"

"github.com/golang/glog"
conf_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1"
"github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/validation"
api_v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

func createPolicyHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pol := obj.(*conf_v1.Policy)
glog.V(3).Infof("Adding Policy: %v", pol.Name)
lbc.AddSyncQueue(pol)
},
DeleteFunc: func(obj interface{}) {
pol, isPol := obj.(*conf_v1.Policy)
if !isPol {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
pol, ok = deletedState.Obj.(*conf_v1.Policy)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Policy object: %v", deletedState.Obj)
return
}
}
glog.V(3).Infof("Removing Policy: %v", pol.Name)
lbc.AddSyncQueue(pol)
},
UpdateFunc: func(old, cur interface{}) {
curPol := cur.(*conf_v1.Policy)
oldPol := old.(*conf_v1.Policy)
if !reflect.DeepEqual(oldPol.Spec, curPol.Spec) {
glog.V(3).Infof("Policy %v changed, syncing", curPol.Name)
lbc.AddSyncQueue(curPol)
}
},
}
}

func (nsi *namespacedInformer) addPolicyHandler(handlers cache.ResourceEventHandlerFuncs) {
informer := nsi.confSharedInformerFactory.K8s().V1().Policies().Informer()
informer.AddEventHandler(handlers) //nolint:errcheck,gosec
nsi.policyLister = informer.GetStore()

nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

func (lbc *LoadBalancerController) syncPolicy(task task) {
key := task.Key
var obj interface{}
var polExists bool
var err error

ns, _, _ := cache.SplitMetaNamespaceKey(key)
obj, polExists, err = lbc.getNamespacedInformer(ns).policyLister.GetByKey(key)
if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

glog.V(2).Infof("Adding, Updating or Deleting Policy: %v\n", key)

if polExists && lbc.HasCorrectIngressClass(obj) {
pol := obj.(*conf_v1.Policy)
err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled)
if err != nil {
msg := fmt.Sprintf("Policy %v/%v is invalid and was rejected: %v", pol.Namespace, pol.Name, err)
lbc.recorder.Eventf(pol, api_v1.EventTypeWarning, "Rejected", msg)

if lbc.reportCustomResourceStatusEnabled() {
err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateInvalid, "Rejected", msg)
if err != nil {
glog.V(3).Infof("Failed to update policy %s status: %v", key, err)
}
}
} else {
msg := fmt.Sprintf("Policy %v/%v was added or updated", pol.Namespace, pol.Name)
lbc.recorder.Eventf(pol, api_v1.EventTypeNormal, "AddedOrUpdated", msg)

if lbc.reportCustomResourceStatusEnabled() {
err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateValid, "AddedOrUpdated", msg)
if err != nil {
glog.V(3).Infof("Failed to update policy %s status: %v", key, err)
}
}
}
}

// it is safe to ignore the error
namespace, name, _ := ParseNamespaceName(key)

resources := lbc.configuration.FindResourcesForPolicy(namespace, name)
resourceExes := lbc.createExtendedResources(resources)

// Only VirtualServers support policies
if len(resourceExes.VirtualServerExes) == 0 {
return
}

warnings, updateErr := lbc.configurator.AddOrUpdateVirtualServers(resourceExes.VirtualServerExes)
lbc.updateResourcesStatusAndEvents(resources, warnings, updateErr)

// Note: updating the status of a policy based on a reload is not needed.
}

0 comments on commit 7c62ede

Please sign in to comment.