Skip to content

Commit

Permalink
more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dtomcej committed May 11, 2020
1 parent 80f3d87 commit 0d53e2d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/controller/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (t portMapperMock) Remove(svc k8s.ServiceWithPort) (int32, error) {
return t.removeFunc(svc)
}

//nolint:gocognit // This returns a false positive due to the portmapper funcs defined in the test loop.
func TestShadowServiceManager_Create(t *testing.T) {
tests := []struct {
name string
Expand Down
111 changes: 62 additions & 49 deletions pkg/topology/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,24 +223,7 @@ func (b *Builder) evaluateTrafficSplit(topology *Topology, trafficSplit *split.T

// As required by the SMI specification, backends must expose at least the same ports as the Service on
// which the TrafficSplit is.
for _, svcPort := range svc.Ports {
var portFound bool

for _, backendPort := range backendSvc.Ports {
if svcPort.Port == backendPort.Port {
portFound = true
break
}
}

if !portFound {
err := fmt.Errorf("port %d must be exposed by Service %q in order to be used as a backend", svcPort.Port, backendSvcKey)
setTrafficSplitWithErr(topology, trafficSplit, svcKey, err)
b.Logger.Errorf("Error building topology for TrafficSplit %q: %v", tsKey, err)

return
}
}
b.validateServiceAndBackendPorts(svc.Ports, backendSvc.Ports, topology, trafficSplit, svcKey, tsKey, backendSvcKey)

backends[i] = TrafficSplitBackend{
Weight: backend.Weight,
Expand All @@ -260,6 +243,27 @@ func (b *Builder) evaluateTrafficSplit(topology *Topology, trafficSplit *split.T
svc.TrafficSplits = append(svc.TrafficSplits, tsKey)
}

func (b *Builder) validateServiceAndBackendPorts(svcPorts []corev1.ServicePort, backendPorts []corev1.ServicePort, topology *Topology, trafficSplit *split.TrafficSplit, svcKey Key, tsKey Key, backendSvcKey Key) {
for _, svcPort := range svcPorts {
var portFound bool

for _, backendPort := range backendPorts {
if svcPort.Port == backendPort.Port {
portFound = true
break
}
}

if !portFound {
err := fmt.Errorf("port %d must be exposed by Service %q in order to be used as a backend", svcPort.Port, backendSvcKey)
setTrafficSplitWithErr(topology, trafficSplit, svcKey, err)
b.Logger.Errorf("Error building topology for TrafficSplit %q: %v", tsKey, err)

return
}
}
}

func setTrafficSplitWithErr(topology *Topology, trafficSplit *split.TrafficSplit, svcKey Key, err error) {
ts := &TrafficSplit{
Name: trafficSplit.Name,
Expand Down Expand Up @@ -349,20 +353,7 @@ func (b *Builder) getIncomingPodsForService(topology *Topology, svcKey Key, visi
}

if len(svc.TrafficSplits) == 0 {
var pods []Key

for _, ttKey := range svc.TrafficTargets {
tt, ok := topology.ServiceTrafficTargets[ttKey]
if !ok {
return nil, fmt.Errorf("unable to find TrafficTarget %q", ttKey)
}

for _, source := range tt.Sources {
pods = append(pods, source.Pods...)
}
}

return pods, nil
return getPodsForServiceWithNoTrafficSplits(topology, svc)
}

for _, tsKey := range svc.TrafficSplits {
Expand All @@ -386,6 +377,23 @@ func (b *Builder) getIncomingPodsForService(topology *Topology, svcKey Key, visi
return union, nil
}

func getPodsForServiceWithNoTrafficSplits(topology *Topology, svc *Service) ([]Key, error) {
var pods []Key

for _, ttKey := range svc.TrafficTargets {
tt, ok := topology.ServiceTrafficTargets[ttKey]
if !ok {
return nil, fmt.Errorf("unable to find TrafficTarget %q", ttKey)
}

for _, source := range tt.Sources {
pods = append(pods, source.Pods...)
}
}

return pods, nil
}

// unionPod returns the union of the given two slices.
func unionPod(pods1, pods2 []Key) []Key {
var union []Key
Expand Down Expand Up @@ -701,30 +709,35 @@ func (r *resources) indexPodsByService(ignoredResources mk8s.IgnoreWrapper, eps

for _, subset := range ep.Subsets {
for _, address := range subset.Addresses {
if address.TargetRef == nil {
continue
}
r.indexPodByService(ep, address, podsByName)
}
}
}
}

keyPod := Key{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
func (r *resources) indexPodByService(ep *corev1.Endpoints, address corev1.EndpointAddress, podsByName map[Key]*corev1.Pod) {
if address.TargetRef == nil {
return
}

pod, ok := podsByName[keyPod]
if !ok {
continue
}
keyPod := Key{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}

keySA := Key{Name: pod.Spec.ServiceAccountName, Namespace: pod.Namespace}
keyEP := Key{Name: ep.Name, Namespace: ep.Namespace}
pod, ok := podsByName[keyPod]
if !ok {
return
}

if _, ok := r.PodsBySvcBySa[keySA]; !ok {
r.PodsBySvcBySa[keySA] = make(map[Key][]*corev1.Pod)
}
keySA := Key{Name: pod.Spec.ServiceAccountName, Namespace: pod.Namespace}
keyEP := Key{Name: ep.Name, Namespace: ep.Namespace}

r.PodsBySvcBySa[keySA][keyEP] = append(r.PodsBySvcBySa[keySA][keyEP], pod)
r.PodsBySvc[keyEP] = append(r.PodsBySvc[keyEP], pod)
}
}
if _, ok := r.PodsBySvcBySa[keySA]; !ok {
r.PodsBySvcBySa[keySA] = make(map[Key][]*corev1.Pod)
}

r.PodsBySvcBySa[keySA][keyEP] = append(r.PodsBySvcBySa[keySA][keyEP], pod)
r.PodsBySvc[keyEP] = append(r.PodsBySvc[keyEP], pod)
}

func (r *resources) indexSMIResources(ignoredResources mk8s.IgnoreWrapper, tts []*access.TrafficTarget, tss []*split.TrafficSplit, tcpRts []*spec.TCPRoute, httpRtGrps []*spec.HTTPRouteGroup) {
for _, httpRouteGroup := range httpRtGrps {
if ignoredResources.IsIgnored(httpRouteGroup.ObjectMeta) {
Expand Down

0 comments on commit 0d53e2d

Please sign in to comment.