feat: support topology aware hints (#9165)

* support topology aware hints

Signed-off-by: tombokombo <tombo@sysart.tech>

* add flag to enable topology and fixes

Signed-off-by: tombokombo <tombo@sysart.tech>

* update readme

Signed-off-by: tombokombo <tombo@sysart.tech>

* add e2e test

Signed-off-by: tombokombo <tombo@sysart.tech>

* isolate topology test

Signed-off-by: tombokombo <tombo@sysart.tech>

* gofmt fix

Signed-off-by: tombokombo <tombo@sysart.tech>

Signed-off-by: tombokombo <tombo@sysart.tech>
This commit is contained in:
Tomas Hulata 2023-01-16 03:46:50 +01:00 committed by GitHub
parent ada114315e
commit 5b2a9475dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 564 additions and 18 deletions

View file

@ -53,6 +53,7 @@ const (
defUpstreamName = "upstream-default-backend"
defServerName = "_"
rootLocation = "/"
emptyZone = ""
)
// Configuration contains all the settings required by an Ingress controller
@ -131,6 +132,21 @@ type Configuration struct {
DynamicConfigurationRetries int
DisableSyncEvents bool
EnableTopologyAwareRouting bool
}
func getIngressPodZone(svc *apiv1.Service) string {
svcKey := k8s.MetaNamespaceKey(svc)
if svcZoneAnnotation, ok := svc.ObjectMeta.GetAnnotations()[apiv1.AnnotationTopologyAwareHints]; ok {
if strings.ToLower(svcZoneAnnotation) == "auto" {
if foundZone, ok := k8s.IngressNodeDetails.GetLabels()[apiv1.LabelTopologyZone]; ok {
klog.V(3).Infof("Svc has topology aware annotation enabled, try to use zone %q where controller pod is running for Service %q ", foundZone, svcKey)
return foundZone
}
}
}
return emptyZone
}
// GetPublishService returns the Service used to set the load-balancer status of Ingresses.
@ -429,6 +445,13 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
var endps []ingress.Endpoint
/* #nosec */
targetPort, err := strconv.Atoi(svcPort) // #nosec
var zone string
if n.cfg.EnableTopologyAwareRouting {
zone = getIngressPodZone(svc)
} else {
zone = emptyZone
}
if err != nil {
// not a port number, fall back to using port name
klog.V(3).Infof("Searching Endpoints with %v port name %q for Service %q", proto, svcPort, nsName)
@ -436,7 +459,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
sp := svc.Spec.Ports[i]
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices)
endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
break
}
}
@ -447,7 +470,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
sp := svc.Spec.Ports[i]
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices)
endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
break
}
}
@ -498,8 +521,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
var zone string
if n.cfg.EnableTopologyAwareRouting {
zone = getIngressPodZone(svc)
} else {
zone = emptyZone
}
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
if len(endps) == 0 {
klog.Warningf("Service %q does not have any active Endpoint", svcKey)
endps = []ingress.Endpoint{n.DefaultEndpoint()}
@ -827,7 +855,13 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in
}
sp := location.DefaultBackend.Spec.Ports[0]
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
var zone string
if n.cfg.EnableTopologyAwareRouting {
zone = getIngressPodZone(location.DefaultBackend)
} else {
zone = emptyZone
}
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
// custom backend is valid only if contains at least one endpoint
if len(endps) > 0 {
name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName())
@ -1083,7 +1117,12 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
if err != nil {
return upstreams, err
}
var zone string
if n.cfg.EnableTopologyAwareRouting {
zone = getIngressPodZone(svc)
} else {
zone = emptyZone
}
klog.V(3).Infof("Obtaining ports information for Service %q", svcKey)
// Ingress with an ExternalName Service and no port defined for that Service
if svc.Spec.Type == apiv1.ServiceTypeExternalName {
@ -1092,7 +1131,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
return upstreams, nil
}
servicePort := externalNamePorts(backendPort, svc)
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
if len(endps) == 0 {
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
return upstreams, nil
@ -1109,7 +1148,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
if len(endps) == 0 {
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
}