Refactor annotations

This commit is contained in:
Manuel de Brito Fontes 2017-11-07 13:36:51 -03:00
parent f215828b1b
commit fb33c58d18
33 changed files with 370 additions and 401 deletions

View file

@ -37,6 +37,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/pkg/ingress"
"k8s.io/ingress-nginx/pkg/ingress/annotations"
"k8s.io/ingress-nginx/pkg/ingress/annotations/class"
"k8s.io/ingress-nginx/pkg/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/pkg/ingress/annotations/parser"
@ -316,7 +317,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{})
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Config{})
break
}
}
@ -327,7 +328,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{})
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Config{})
break
}
}
@ -379,7 +380,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
}
svc := svcObj.(*apiv1.Service)
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Upstream{})
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{})
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
endps = []ingress.Endpoint{n.DefaultEndpoint()}
@ -398,8 +399,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
servers := n.createServers(ingresses, upstreams, du)
for _, ing := range ingresses {
affinity := n.annotations.SessionAffinity(ing)
anns := n.annotations.Extract(ing)
anns := n.getIngressAnnotations(ing)
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -418,13 +418,11 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
}
if server.CertificateAuth.CAFileName == "" {
ca := n.annotations.CertificateAuth(ing)
if ca != nil {
server.CertificateAuth = *ca
// It is possible that no CAFileName is found in the secret
if server.CertificateAuth.CAFileName == "" {
glog.V(3).Infof("secret %v does not contain 'ca.crt', mutual authentication not enabled - ingress rule %v/%v.", server.CertificateAuth.Secret, ing.Namespace, ing.Name)
}
server.CertificateAuth = anns.CertificateAuth
// It is possible that no CAFileName is found in the secret
if server.CertificateAuth.CAFileName == "" {
glog.V(3).Infof("secret %v does not contain 'ca.crt', mutual authentication not enabled - ingress rule %v/%v.", server.CertificateAuth.Secret, ing.Namespace, ing.Name)
}
} else {
glog.V(3).Infof("server %v already contains a mutual authentication configuration - ingress rule %v/%v", server.Hostname, ing.Namespace, ing.Name)
@ -461,7 +459,19 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
loc.Port = ups.Port
loc.Service = ups.Service
loc.Ingress = ing
mergeLocationAnnotations(loc, anns)
loc.BasicDigestAuth = anns.BasicDigestAuth
loc.ClientBodyBufferSize = anns.ClientBodyBufferSize
loc.ConfigurationSnippet = anns.ConfigurationSnippet
loc.CorsConfig = anns.CorsConfig
loc.ExternalAuth = anns.ExternalAuth
loc.Proxy = anns.Proxy
loc.RateLimit = anns.RateLimit
loc.Redirect = anns.Redirect
loc.Rewrite = anns.Rewrite
loc.UpstreamVhost = anns.UpstreamVhost
loc.VtsFilterKey = anns.VtsFilterKey
loc.Whitelist = anns.Whitelist
if loc.Redirect.FromToWWW {
server.RedirectFromToWWW = true
}
@ -472,14 +482,26 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
if addLoc {
glog.V(3).Infof("adding location %v in ingress rule %v/%v upstream %v", nginxPath, ing.Namespace, ing.Name, ups.Name)
loc := &ingress.Location{
Path: nginxPath,
Backend: ups.Name,
IsDefBackend: false,
Service: ups.Service,
Port: ups.Port,
Ingress: ing,
Path: nginxPath,
Backend: ups.Name,
IsDefBackend: false,
Service: ups.Service,
Port: ups.Port,
Ingress: ing,
BasicDigestAuth: anns.BasicDigestAuth,
ClientBodyBufferSize: anns.ClientBodyBufferSize,
ConfigurationSnippet: anns.ConfigurationSnippet,
CorsConfig: anns.CorsConfig,
ExternalAuth: anns.ExternalAuth,
Proxy: anns.Proxy,
RateLimit: anns.RateLimit,
Redirect: anns.Redirect,
Rewrite: anns.Rewrite,
UpstreamVhost: anns.UpstreamVhost,
VtsFilterKey: anns.VtsFilterKey,
Whitelist: anns.Whitelist,
}
mergeLocationAnnotations(loc, anns)
if loc.Redirect.FromToWWW {
server.RedirectFromToWWW = true
}
@ -487,12 +509,12 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
}
if ups.SessionAffinity.AffinityType == "" {
ups.SessionAffinity.AffinityType = affinity.AffinityType
ups.SessionAffinity.AffinityType = anns.SessionAffinity.Type
}
if affinity.AffinityType == "cookie" {
ups.SessionAffinity.CookieSessionAffinity.Name = affinity.CookieConfig.Name
ups.SessionAffinity.CookieSessionAffinity.Hash = affinity.CookieConfig.Hash
if anns.SessionAffinity.Type == "cookie" {
ups.SessionAffinity.CookieSessionAffinity.Name = anns.SessionAffinity.Cookie.Name
ups.SessionAffinity.CookieSessionAffinity.Hash = anns.SessionAffinity.Cookie.Hash
locs := ups.SessionAffinity.CookieSessionAffinity.Locations
if _, ok := locs[host]; !ok {
@ -519,7 +541,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
// check if the location contains endpoints and a custom default backend
if location.DefaultBackend != nil {
sp := location.DefaultBackend.Spec.Ports[0]
endps := n.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Upstream{})
endps := n.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Config{})
if len(endps) > 0 {
glog.V(3).Infof("using custom default backend in server %v location %v (service %v/%v)",
server.Hostname, location.Path, location.DefaultBackend.Namespace, location.DefaultBackend.Name)
@ -617,10 +639,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[defUpstreamName] = du
for _, ing := range data {
secUpstream := n.annotations.SecureUpstream(ing)
hz := n.annotations.HealthCheck(ing)
serviceUpstream := n.annotations.ServiceUpstream(ing)
upstreamHashBy := n.annotations.UpstreamHashBy(ing)
anns := n.getIngressAnnotations(ing)
var defBackend string
if ing.Spec.Backend != nil {
@ -635,7 +654,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
if anns.ServiceUpstream {
endpoint, err := n.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
@ -645,7 +664,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
if len(upstreams[defBackend].Endpoints) == 0 {
endps, err := n.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
endps, err := n.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), &anns.HealthCheck)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
@ -674,22 +693,22 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[name].Port = path.Backend.ServicePort
if !upstreams[name].Secure {
upstreams[name].Secure = secUpstream.Secure
upstreams[name].Secure = anns.SecureUpstream.Secure
}
if upstreams[name].SecureCACert.Secret == "" {
upstreams[name].SecureCACert = secUpstream.CACert
upstreams[name].SecureCACert = anns.SecureUpstream.CACert
}
if upstreams[name].UpstreamHashBy == "" {
upstreams[name].UpstreamHashBy = upstreamHashBy
upstreams[name].UpstreamHashBy = anns.UpstreamHashBy
}
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
if anns.ServiceUpstream {
endpoint, err := n.getServiceClusterEndpoint(svcKey, &path.Backend)
if err != nil {
glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err)
@ -699,7 +718,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
if len(upstreams[name].Endpoints) == 0 {
endp, err := n.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
endp, err := n.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), &anns.HealthCheck)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
continue
@ -759,7 +778,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
hz *healthcheck.Config) ([]ingress.Endpoint, error) {
svc, err := n.listers.Service.GetByName(svcKey)
var upstreams []ingress.Endpoint
@ -843,7 +862,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
aliases := make(map[string]string, len(data))
bdef := n.GetDefaultBackend()
ngxProxy := proxy.Configuration{
ngxProxy := proxy.Config{
BodySize: bdef.ProxyBodySize,
ConnectTimeout: bdef.ProxyConnectTimeout,
SendTimeout: bdef.ProxySendTimeout,
@ -884,9 +903,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// initialize all the servers
for _, ing := range data {
// check if ssl passthrough is configured
sslpt := n.annotations.SSLPassthrough(ing)
anns := n.getIngressAnnotations(ing)
// default upstream server
un := du.Name
@ -930,16 +947,14 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
Service: &apiv1.Service{},
},
},
SSLPassthrough: sslpt,
SSLPassthrough: anns.SSLPassthrough,
}
}
}
// configure default location, alias, and SSL
for _, ing := range data {
// setup server-alias based on annotations
aliasAnnotation := n.annotations.Alias(ing)
srvsnippet := n.annotations.ServerSnippet(ing)
anns := n.getIngressAnnotations(ing)
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -948,11 +963,11 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
}
// setup server aliases
if aliasAnnotation != "" {
if anns.Alias != "" {
if servers[host].Alias == "" {
servers[host].Alias = aliasAnnotation
if _, ok := aliases[aliasAnnotation]; !ok {
aliases[aliasAnnotation] = host
servers[host].Alias = anns.Alias
if _, ok := aliases["Alias"]; !ok {
aliases["Alias"] = host
}
} else {
glog.Warningf("ingress %v/%v for host %v contains an Alias but one has already been configured.",
@ -961,14 +976,14 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
}
//notifying the user that it has already been configured.
if servers[host].ServerSnippet != "" && srvsnippet != "" {
if servers[host].ServerSnippet != "" && anns.ServerSnippet != "" {
glog.Warningf("ingress %v/%v for host %v contains a Server Snippet section that it has already been configured.",
ing.Namespace, ing.Name, host)
}
// only add a server snippet if the server does not have one previously configured
if servers[host].ServerSnippet == "" && srvsnippet != "" {
servers[host].ServerSnippet = srvsnippet
if servers[host].ServerSnippet == "" && anns.ServerSnippet != "" {
servers[host].ServerSnippet = anns.ServerSnippet
}
// only add a certificate if the server does not have one previously configured
@ -1044,7 +1059,7 @@ func (n *NGINXController) getEndpoints(
s *apiv1.Service,
servicePort *apiv1.ServicePort,
proto apiv1.Protocol,
hz *healthcheck.Upstream) []ingress.Endpoint {
hz *healthcheck.Config) []ingress.Endpoint {
upsServers := []ingress.Endpoint{}
@ -1152,6 +1167,7 @@ func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}
// SetForceReload sets if the ingress controller should be reloaded or not
func (n *NGINXController) SetForceReload(shouldReload bool) {
if shouldReload {
atomic.StoreInt32(&n.forceReload, 1)
@ -1160,3 +1176,24 @@ func (n *NGINXController) SetForceReload(shouldReload bool) {
atomic.StoreInt32(&n.forceReload, 0)
}
}
func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) {
anns := n.annotations.Extract(ing)
glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name)
n.listers.IngressAnnotation.Update(anns)
}
// getByIngress returns the parsed annotations from an Ingress
func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
item, exists, err := n.listers.IngressAnnotation.GetByKey(key)
if err != nil {
glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
return &annotations.Ingress{}
}
if !exists {
glog.Errorf("ingress annotation %v was not found", key)
return &annotations.Ingress{}
}
return item.(*annotations.Ingress)
}