Merge pull request #981 from chrismoos/service_upstream

Add annotation to allow use of service ClusterIP for NGINX upstream.
This commit is contained in:
Manuel Alejandro de Brito Fontes 2017-07-19 12:20:30 -04:00 committed by GitHub
commit fbb96f4c83
5 changed files with 227 additions and 10 deletions

View file

@ -31,6 +31,7 @@ import (
"k8s.io/ingress/core/pkg/ingress/annotations/ratelimit"
"k8s.io/ingress/core/pkg/ingress/annotations/rewrite"
"k8s.io/ingress/core/pkg/ingress/annotations/secureupstream"
"k8s.io/ingress/core/pkg/ingress/annotations/serviceupstream"
"k8s.io/ingress/core/pkg/ingress/annotations/sessionaffinity"
"k8s.io/ingress/core/pkg/ingress/annotations/snippet"
"k8s.io/ingress/core/pkg/ingress/annotations/sslpassthrough"
@ -64,6 +65,7 @@ func newAnnotationExtractor(cfg extractorConfig) annotationExtractor {
"RateLimit": ratelimit.NewParser(),
"Redirect": rewrite.NewParser(cfg),
"SecureUpstream": secureupstream.NewParser(cfg),
"ServiceUpstream": serviceupstream.NewParser(),
"SessionAffinity": sessionaffinity.NewParser(),
"SSLPassthrough": sslpassthrough.NewParser(),
"ConfigurationSnippet": snippet.NewParser(),
@ -104,8 +106,14 @@ const (
healthCheck = "HealthCheck"
sslPassthrough = "SSLPassthrough"
sessionAffinity = "SessionAffinity"
serviceUpstream = "ServiceUpstream"
)
func (e *annotationExtractor) ServiceUpstream(ing *extensions.Ingress) bool {
val, _ := e.annotations[serviceUpstream].Parse(ing)
return val.(bool)
}
func (e *annotationExtractor) SecureUpstream(ing *extensions.Ingress) *secureupstream.Secure {
val, err := e.annotations[secureUpstream].Parse(ing)
if err != nil {

View file

@ -782,6 +782,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
secUpstream := ic.annotations.SecureUpstream(ing)
hz := ic.annotations.HealthCheck(ing)
serviceUpstream := ic.annotations.ServiceUpstream(ing)
var defBackend string
if ing.Spec.Backend != nil {
@ -792,13 +793,27 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
glog.V(3).Infof("creating upstream %v", defBackend)
upstreams[defBackend] = newUpstream(defBackend)
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[defBackend].Endpoints = []ingress.Endpoint{endpoint}
}
}
if len(upstreams[defBackend].Endpoints) == 0 {
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
}
}
}
for _, rule := range ing.Spec.Rules {
@ -827,12 +842,26 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
}
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
continue
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[name].Endpoints = []ingress.Endpoint{endpoint}
}
}
if len(upstreams[name].Endpoints) == 0 {
endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
continue
}
upstreams[name].Endpoints = endp
}
upstreams[name].Endpoints = endp
s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
if err != nil {
@ -853,6 +882,24 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
return upstreams
}
func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
if !svcExists {
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
}
svc := svcObj.(*api.Service)
if svc.Spec.ClusterIP == "" {
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
}
endpoint.Address = svc.Spec.ClusterIP
endpoint.Port = backend.ServicePort.String()
return endpoint, err
}
// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,