Improve resource usage in nginx controller

This commit is contained in:
Manuel de Brito Fontes 2017-09-18 20:53:26 -03:00
parent 1a68536e29
commit cd288b9993
17 changed files with 388 additions and 794 deletions

View file

@ -67,15 +67,11 @@ func (ic *GenericController) syncSecret(key string) {
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
// It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
secretInterface, exists, err := ic.secrLister.Store.GetByKey(secretName)
secret, err := ic.listers.Secret.GetByName(secretName)
if err != nil {
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
}
if !exists {
return nil, fmt.Errorf("secret named %v does not exist", secretName)
}
secret := secretInterface.(*apiv1.Secret)
cert, okcert := secret.Data[apiv1.TLSCertKey]
key, okkey := secret.Data[apiv1.TLSPrivateKeyKey]

View file

@ -86,6 +86,13 @@ func buildSecrListerForBackendSSL() store.SecretLister {
return secrLister
}
func buildListers() *ingress.StoreLister {
sl := &ingress.StoreLister{}
sl.Ingress.Store = buildIngListenerForBackendSSL()
sl.Secret.Store = buildSecrListerForBackendSSL()
return sl
}
func buildControllerForBackendSSL() cache_client.Controller {
cfg := &cache_client.Config{
Queue: &MockQueue{Synced: true},
@ -99,8 +106,7 @@ func buildGenericControllerForBackendSSL() *GenericController {
cfg: &Configuration{
Client: buildSimpleClientSetForBackendSSL(),
},
ingLister: buildIngListenerForBackendSSL(),
secrLister: buildSecrListerForBackendSSL(),
listers: buildListers(),
ingController: buildControllerForBackendSSL(),
endpController: buildControllerForBackendSSL(),
@ -162,7 +168,7 @@ func TestSyncSecret(t *testing.T) {
secret.SetNamespace("default")
secret.SetName("foo_secret")
secret.Data = foo.Data
ic.secrLister.Add(secret)
ic.listers.Secret.Add(secret)
key := "default/foo_secret"
// for add
@ -209,7 +215,7 @@ func TestGetPemCertificate(t *testing.T) {
ic := buildGenericControllerForBackendSSL()
secret := buildSecretForBackendSSL()
secret.Data = foo.Data
ic.secrLister.Add(secret)
ic.listers.Secret.Add(secret)
sslCert, err := ic.getPemCertificate(foo.secretName)
if foo.eErr {

View file

@ -31,7 +31,6 @@ import (
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -39,7 +38,6 @@ import (
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
@ -51,7 +49,6 @@ import (
"k8s.io/ingress/core/pkg/ingress/defaults"
"k8s.io/ingress/core/pkg/ingress/resolver"
"k8s.io/ingress/core/pkg/ingress/status"
"k8s.io/ingress/core/pkg/ingress/store"
"k8s.io/ingress/core/pkg/k8s"
"k8s.io/ingress/core/pkg/net/ssl"
local_strings "k8s.io/ingress/core/pkg/strings"
@ -87,12 +84,7 @@ type GenericController struct {
secrController cache.Controller
mapController cache.Controller
ingLister store.IngressLister
svcLister store.ServiceLister
nodeLister store.NodeLister
endpLister store.EndpointLister
secrLister store.SecretLister
mapLister store.ConfigMapLister
listers *ingress.StoreLister
annotations annotationExtractor
@ -119,6 +111,8 @@ type GenericController struct {
runningConfig *ingress.Configuration
forceReload bool
initialSyncDone bool
}
// Configuration contains all the settings required by an Ingress controller
@ -171,177 +165,18 @@ func newIngressController(config *Configuration) *GenericController {
Component: "ingress-controller",
}),
sslCertTracker: newSSLCertTracker(),
listers: &ingress.StoreLister{},
}
ic.syncQueue = task.NewTaskQueue(ic.syncIngress)
// from here to the end of the method all the code is just boilerplate
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
// This is used to detect new content, updates or removals and act accordingly
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
ic.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
ic.syncQueue.Enqueue(cur)
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.syncSecret(key)
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.sslCertTracker.DeleteAll(key)
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
ic.syncQueue.Enqueue(cur)
}
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
ic.cfg.Backend.SetConfig(upCmap)
ic.forceReload = true
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
ic.cfg.Backend.SetConfig(upCmap)
ic.forceReload = true
}
// updates to configuration configmaps can trigger an update
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
ic.syncQueue.Enqueue(cur)
}
}
},
}
watchNs := apiv1.NamespaceAll
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll {
watchNs = ic.cfg.Namespace
}
ic.ingLister.Store, ic.ingController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
ic.endpLister.Store, ic.endpController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
ic.secrLister.Store, ic.secrController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
ic.mapLister.Store, ic.mapController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
ic.svcLister.Store, ic.svcController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
var nodeListerWatcher cache.ListerWatcher
if config.DisableNodeList {
nodeListerWatcher = fcache.NewFakeControllerSource()
} else {
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
}
ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
nodeListerWatcher,
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
ic.createListers(config.DisableNodeList)
if config.UpdateStatus {
ic.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: ic.cfg.PublishService,
IngressLister: ic.ingLister,
IngressLister: ic.listers.Ingress,
ElectionID: config.ElectionID,
IngressClass: config.IngressClass,
DefaultIngressClass: config.DefaultIngressClass,
@ -353,14 +188,7 @@ func newIngressController(config *Configuration) *GenericController {
}
ic.annotations = newAnnotationExtractor(ic)
ic.cfg.Backend.SetListers(ingress.StoreLister{
Ingress: ic.ingLister,
Service: ic.svcLister,
Node: ic.nodeLister,
Endpoint: ic.endpLister,
Secret: ic.secrLister,
ConfigMap: ic.mapLister,
})
ic.cfg.Backend.SetListers(ic.listers)
cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs)
@ -384,7 +212,7 @@ func (ic GenericController) GetDefaultBackend() defaults.Backend {
// GetPublishService returns the configured service used to set ingress status
func (ic GenericController) GetPublishService() *apiv1.Service {
s, err := ic.GetService(ic.cfg.PublishService)
s, err := ic.listers.Service.GetByName(ic.cfg.PublishService)
if err != nil {
return nil
}
@ -399,37 +227,12 @@ func (ic GenericController) GetRecorder() record.EventRecorder {
// GetSecret searches for a secret in the local secrets Store
func (ic GenericController) GetSecret(name string) (*apiv1.Secret, error) {
s, exists, err := ic.secrLister.Store.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("secret %v was not found", name)
}
return s.(*apiv1.Secret), nil
return ic.listers.Secret.GetByName(name)
}
// GetService searches for a service in the local secrets Store
func (ic GenericController) GetService(name string) (*apiv1.Service, error) {
s, exists, err := ic.svcLister.Store.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service %v was not found", name)
}
return s.(*apiv1.Service), nil
}
func (ic *GenericController) getConfigMap(ns, name string) (*apiv1.ConfigMap, error) {
s, exists, err := ic.mapLister.Store.GetByKey(fmt.Sprintf("%v/%v", ns, name))
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("configmap %v was not found", name)
}
return s.(*apiv1.ConfigMap), nil
return ic.listers.Service.GetByName(name)
}
// sync collects all the pieces required to assemble the configuration file and
@ -443,7 +246,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
}
if name, ok := key.(string); ok {
if obj, exists, _ := ic.ingLister.GetByKey(name); exists {
if obj, exists, _ := ic.listers.Ingress.GetByKey(name); exists {
ing := obj.(*extensions.Ingress)
ic.readSecrets(ing)
}
@ -511,15 +314,15 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
return []ingress.L4Service{}
}
ns, name, err := k8s.ParseNameNS(configmapName)
_, _, err := k8s.ParseNameNS(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", name, err)
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
return []ingress.L4Service{}
}
configmap, err := ic.getConfigMap(ns, name)
configmap, err := ic.listers.ConfigMap.GetByName(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", name, err)
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
return []ingress.L4Service{}
}
@ -562,7 +365,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
continue
}
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(nsName)
svcObj, svcExists, err := ic.listers.Service.GetByKey(nsName)
if err != nil {
glog.Warningf("error getting service %v: %v", nsName, err)
continue
@ -578,7 +381,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
glog.V(3).Infof("searching service %v/%v endpoints using the name '%v'", svcNs, svcName, svcPort)
glog.V(3).Infof("searching service %v endpoints using the name '%v'", svcNs, svcName, svcPort)
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
@ -631,7 +434,7 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
Name: defUpstreamName,
}
svcKey := ic.cfg.DefaultService
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", ic.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint())
@ -656,36 +459,19 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
return upstream
}
type ingressByRevision []interface{}
func (c ingressByRevision) Len() int { return len(c) }
func (c ingressByRevision) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ingressByRevision) Less(i, j int) bool {
ir := c[i].(*extensions.Ingress).ResourceVersion
jr := c[j].(*extensions.Ingress).ResourceVersion
return ir < jr
}
// getBackendServers returns a list of Upstream and Server to be used by the backend
// An upstream can be used in multiple servers if the namespace, service name and port are the same
func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress.Server) {
ings := ic.ingLister.Store.List()
sort.Sort(ingressByRevision(ings))
ings := ic.listers.Ingress.List()
sort.Slice(ings, func(i, j int) bool {
ir := ings[i].(*extensions.Ingress).ResourceVersion
jr := ings[j].(*extensions.Ingress).ResourceVersion
return ir < jr
})
upstreams := ic.createUpstreams(ings)
servers := ic.createServers(ings, upstreams)
// If a server has a hostname equivalent to a pre-existing alias, then we
// remove the alias to avoid conflicts.
for _, server := range servers {
for j, alias := range servers {
if server.Hostname == alias.Alias {
glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.",
server.Hostname, alias.Hostname, alias.Hostname)
servers[j].Alias = ""
}
}
}
du := ic.getDefaultUpstream()
upstreams := ic.createUpstreams(ings, du)
servers := ic.createServers(ings, upstreams, du)
for _, ingIf := range ings {
ing := ingIf.(*extensions.Ingress)
@ -860,15 +646,22 @@ func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress
}
if ic.cfg.SortBackends {
sort.Sort(ingress.BackendByNameServers(aUpstreams))
sort.Slice(aUpstreams, func(a, b int) bool {
return aUpstreams[a].Name < aUpstreams[b].Name
})
}
aServers := make([]*ingress.Server, 0, len(servers))
for _, value := range servers {
sort.Sort(ingress.LocationByPath(value.Locations))
sort.Slice(value.Locations, func(i, j int) bool {
return value.Locations[i].Path > value.Locations[j].Path
})
aServers = append(aServers, value)
}
sort.Sort(ingress.ServerByName(aServers))
sort.Slice(aServers, func(i, j int) bool {
return aServers[i].Hostname < aServers[j].Hostname
})
return aUpstreams, aServers
}
@ -879,7 +672,7 @@ func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.Aut
ic.syncSecret(secretName)
}
_, err := ic.GetSecret(secretName)
_, err := ic.listers.Secret.GetByName(secretName)
if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
}
@ -898,9 +691,9 @@ func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.Aut
// createUpstreams creates the NGINX upstreams for each service referenced in
// Ingress rules. The servers inside the upstream are endpoints.
func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ingress.Backend {
func (ic *GenericController) createUpstreams(data []interface{}, du *ingress.Backend) map[string]*ingress.Backend {
upstreams := make(map[string]*ingress.Backend)
upstreams[defUpstreamName] = ic.getDefaultUpstream()
upstreams[defUpstreamName] = du
for _, ingIf := range data {
ing := ingIf.(*extensions.Ingress)
@ -994,18 +787,13 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
upstreams[name].Endpoints = endp
}
s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
s, err := ic.listers.Service.GetByName(svcKey)
if err != nil {
glog.Warningf("error obtaining service: %v", err)
continue
}
if !exists {
glog.Warningf("service %v does not exists", svcKey)
continue
}
upstreams[name].Service = s.(*apiv1.Service)
upstreams[name].Service = s
}
}
}
@ -1014,7 +802,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
}
func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey)
if !svcExists {
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
@ -1035,19 +823,13 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e
// to a service.
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
svc, err := ic.listers.Service.GetByName(svcKey)
var upstreams []ingress.Endpoint
if err != nil {
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
}
if !svcExists {
err = fmt.Errorf("service %v does not exist", svcKey)
return upstreams, err
}
svc := svcObj.(*apiv1.Service)
glog.V(3).Infof("obtaining port information for service %v", svcKey)
for _, servicePort := range svc.Spec.Ports {
// targetPort could be a string, use the name or the port (int)
@ -1061,7 +843,15 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
}
if ic.cfg.SortBackends {
sort.Sort(ingress.EndpointByAddrPort(endps))
sort.Slice(endps, func(i, j int) bool {
iName := endps[i].Address
jName := endps[j].Address
if iName != jName {
return iName < jName
}
return endps[i].Port < endps[j].Port
})
}
upstreams = append(upstreams, endps...)
break
@ -1084,11 +874,16 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
// SSL certificates. Each server is configured with location / using a default
// backend specified by the user or the one inside the ingress spec.
func (ic *GenericController) createServers(data []interface{},
upstreams map[string]*ingress.Backend) map[string]*ingress.Server {
servers := make(map[string]*ingress.Server)
upstreams map[string]*ingress.Backend,
du *ingress.Backend) map[string]*ingress.Server {
servers := make(map[string]*ingress.Server, len(data))
// If a server has a hostname equivalent to a pre-existing alias, then we
// remove the alias to avoid conflicts.
aliases := make(map[string]string, len(data))
bdef := ic.GetDefaultBackend()
ngxProxy := proxy.Configuration{
ngxProxy := &proxy.Configuration{
BodySize: bdef.ProxyBodySize,
ConnectTimeout: bdef.ProxyConnectTimeout,
SendTimeout: bdef.ProxySendTimeout,
@ -1111,7 +906,6 @@ func (ic *GenericController) createServers(data []interface{},
}
// initialize the default server
du := ic.getDefaultUpstream()
servers[defServerName] = &ingress.Server{
Hostname: defServerName,
SSLCertificate: defaultPemFileName,
@ -1137,7 +931,6 @@ func (ic *GenericController) createServers(data []interface{},
sslpt := ic.annotations.SSLPassthrough(ing)
// default upstream server
du := ic.getDefaultUpstream()
un := du.Name
if ing.Spec.Backend != nil {
@ -1178,7 +971,9 @@ func (ic *GenericController) createServers(data []interface{},
Proxy: ngxProxy,
Service: &apiv1.Service{},
},
}, SSLPassthrough: sslpt}
},
SSLPassthrough: sslpt,
}
}
}
@ -1200,6 +995,11 @@ func (ic *GenericController) createServers(data []interface{},
// setup server aliases
servers[host].Alias = aliasAnnotation
if aliasAnnotation != "" {
if _, ok := aliases[aliasAnnotation]; !ok {
aliases[aliasAnnotation] = host
}
}
// only add a certificate if the server does not have one previously configured
if servers[host].SSLCertificate != "" {
@ -1258,6 +1058,12 @@ func (ic *GenericController) createServers(data []interface{},
}
}
for alias, host := range aliases {
if _, ok := servers[alias]; ok {
glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.", alias, host)
servers[host].Alias = ""
}
}
return servers
}
@ -1292,7 +1098,7 @@ func (ic *GenericController) getEndpoints(
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := ic.endpLister.GetServiceEndpoints(s)
ep, err := ic.listers.Endpoint.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers
@ -1402,9 +1208,16 @@ func (ic GenericController) Start() {
}
// initial sync of secrets to avoid unnecessary reloads
for _, key := range ic.ingLister.ListKeys() {
if obj, exists, _ := ic.ingLister.GetByKey(key); exists {
for _, key := range ic.listers.Ingress.ListKeys() {
if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}
ic.readSecrets(ing)
}
}
@ -1417,6 +1230,12 @@ func (ic GenericController) Start() {
go ic.syncStatus.Run(ic.stopCh)
}
ic.initialSyncDone = true
time.Sleep(5 * time.Second)
// force initial sync
ic.syncQueue.Enqueue(&extensions.Ingress{})
<-ic.stopCh
}

View file

@ -0,0 +1,200 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"reflect"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/ingress/core/pkg/ingress/annotations/class"
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
)
func (ic *GenericController) createListers(disableNodeLister bool) {
// from here to the end of the method all the code is just boilerplate
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
// This is used to detect new content, updates or removals and act accordingly
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
if ic.initialSyncDone {
ic.syncQueue.Enqueue(obj)
}
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
ic.syncQueue.Enqueue(cur)
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.syncSecret(key)
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.sslCertTracker.DeleteAll(key)
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
ic.syncQueue.Enqueue(cur)
}
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
ic.cfg.Backend.SetConfig(upCmap)
ic.forceReload = true
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
ic.cfg.Backend.SetConfig(upCmap)
ic.forceReload = true
}
// updates to configuration configmaps can trigger an update
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
ic.syncQueue.Enqueue(cur)
}
}
},
}
watchNs := apiv1.NamespaceAll
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll {
watchNs = ic.cfg.Namespace
}
ic.listers.Ingress.Store, ic.ingController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
ic.listers.Endpoint.Store, ic.endpController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
ic.listers.Secret.Store, ic.secrController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
ic.listers.ConfigMap.Store, ic.mapController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
ic.listers.Service.Store, ic.svcController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
var nodeListerWatcher cache.ListerWatcher
if disableNodeLister {
nodeListerWatcher = fcache.NewFakeControllerSource()
} else {
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
}
ic.listers.Node.Store, ic.nodeController = cache.NewInformer(
nodeListerWatcher,
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
}

View file

@ -51,7 +51,7 @@ func TestMergeLocationAnnotations(t *testing.T) {
"Redirect": redirect.Redirect{},
"Rewrite": rewrite.Redirect{},
"Whitelist": ipwhitelist.SourceRange{},
"Proxy": proxy.Configuration{},
"Proxy": &proxy.Configuration{},
"UsePortInRedirects": true,
}