Refactoring of kubernetes informers and local caches

This commit is contained in:
Manuel de Brito Fontes 2018-01-18 16:14:42 -03:00
parent 8975800740
commit e9a00ff916
23 changed files with 1704 additions and 817 deletions

View file

@ -36,14 +36,9 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/task"
)
@ -101,14 +96,9 @@ type Configuration struct {
SyncRateLimit float32
}
// GetDefaultBackend returns the default backend
func (n NGINXController) GetDefaultBackend() defaults.Backend {
return n.backendDefaults
}
// GetPublishService returns the configured service used to set ingress status
func (n NGINXController) GetPublishService() *apiv1.Service {
s, err := n.listers.Service.GetByName(n.cfg.PublishService)
s, err := n.store.GetService(n.cfg.PublishService)
if err != nil {
return nil
}
@ -116,16 +106,6 @@ func (n NGINXController) GetPublishService() *apiv1.Service {
return s
}
// GetSecret searches for a secret in the local secrets Store
func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) {
return n.listers.Secret.GetByName(name)
}
// GetService searches for a service in the local secrets Store
func (n NGINXController) GetService(name string) (*apiv1.Service, error) {
return n.listers.Service.GetByName(name)
}
// sync collects all the pieces required to assemble the configuration file and
// then sends the content to the backend (OnUpdate) receiving the populated
// template as response reloading the backend if is required.
@ -138,33 +118,21 @@ func (n *NGINXController) syncIngress(item interface{}) error {
if element, ok := item.(task.Element); ok {
if name, ok := element.Key.(string); ok {
if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists {
ing := obj.(*extensions.Ingress)
n.readSecrets(ing)
if ing, err := n.store.GetIngress(name); err == nil {
n.store.ReadSecrets(ing)
}
}
}
// Sort ingress rules using the ResourceVersion field
ings := n.listers.Ingress.List()
ings := n.store.ListIngresses()
sort.SliceStable(ings, func(i, j int) bool {
ir := ings[i].(*extensions.Ingress).ResourceVersion
jr := ings[j].(*extensions.Ingress).ResourceVersion
ir := ings[i].ResourceVersion
jr := ings[j].ResourceVersion
return ir < jr
})
// filter ingress rules
var ingresses []*extensions.Ingress
for _, ingIf := range ings {
ing := ingIf.(*extensions.Ingress)
if !class.IsValid(ing) {
continue
}
ingresses = append(ingresses, ing)
}
upstreams, servers := n.getBackendServers(ingresses)
upstreams, servers := n.getBackendServers(ings)
var passUpstreams []*ingress.SSLPassthroughBackend
for _, server := range servers {
@ -232,7 +200,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
return []ingress.L4Service{}
}
configmap, err := n.listers.ConfigMap.GetByName(configmapName)
configmap, err := n.store.GetConfigMap(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
return []ingress.L4Service{}
@ -290,19 +258,12 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
continue
}
svcObj, svcExists, err := n.listers.Service.GetByKey(nsName)
svc, err := n.store.GetService(nsName)
if err != nil {
glog.Warningf("error getting service %v: %v", nsName, err)
continue
}
if !svcExists {
glog.Warningf("service %v was not found", nsName)
continue
}
svc := svcObj.(*apiv1.Service)
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
@ -359,20 +320,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
Name: defUpstreamName,
}
svcKey := n.cfg.DefaultService
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
svc, err := n.store.GetService(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
if !svcExists {
glog.Warningf("service %v does not exist", svcKey)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
svc := svcObj.(*apiv1.Service)
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{})
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
@ -392,7 +346,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
servers := n.createServers(ingresses, upstreams, du)
for _, ing := range ingresses {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -603,29 +560,6 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
return aUpstreams, aServers
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, exists := n.sslCertTracker.Get(name); !exists {
n.syncSecret(name)
}
_, err := n.listers.Secret.GetByName(name)
if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
}
bc, exists := n.sslCertTracker.Get(name)
if !exists {
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name)
}
cert := bc.(*ingress.SSLCert)
return &resolver.AuthSSLCert{
Secret: name,
CAFileName: cert.CAFileName,
PemSHA: cert.PemSHA,
}, nil
}
// createUpstreams creates the NGINX upstreams for each service referenced in
// Ingress rules. The servers inside the upstream are endpoints.
func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
@ -633,7 +567,10 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[defUpstreamName] = du
for _, ing := range data {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
var defBackend string
if ing.Spec.Backend != nil {
@ -730,7 +667,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[name].Endpoints = endp
}
s, err := n.listers.Service.GetByName(svcKey)
s, err := n.store.GetService(svcKey)
if err != nil {
glog.Warningf("error obtaining service: %v", err)
continue
@ -745,13 +682,11 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
if !svcExists {
svc, err := n.store.GetService(svcKey)
if err != nil {
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
}
svc := svcObj.(*apiv1.Service)
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
}
@ -783,7 +718,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
// to a service.
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Config) ([]ingress.Endpoint, error) {
svc, err := n.listers.Service.GetByName(svcKey)
svc, err := n.store.GetService(svcKey)
var upstreams []ingress.Endpoint
if err != nil {
@ -865,7 +800,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// remove the alias to avoid conflicts.
aliases := make(map[string]string, len(data))
bdef := n.GetDefaultBackend()
bdef := n.store.GetDefaultBackend()
ngxProxy := proxy.Config{
BodySize: bdef.ProxyBodySize,
ConnectTimeout: bdef.ProxyConnectTimeout,
@ -885,7 +820,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// Tries to fetch the default Certificate from nginx configuration.
// If it does not exists, use the ones generated on Start()
defaultCertificate, err := n.getPemCertificate(n.cfg.DefaultSSLCertificate)
defaultCertificate, err := n.store.GetLocalSecret(n.cfg.DefaultSSLCertificate)
if err == nil {
defaultPemFileName = defaultCertificate.PemFileName
defaultPemSHA = defaultCertificate.PemSHA
@ -908,7 +843,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// initialize all the servers
for _, ing := range data {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
// default upstream server
un := du.Name
@ -976,7 +914,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// configure default location, alias, and SSL
for _, ing := range data {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -1041,13 +982,12 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
bc, exists := n.sslCertTracker.Get(key)
if !exists {
cert, err := n.store.GetLocalSecret(key)
if err != nil {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
continue
}
cert := bc.(*ingress.SSLCert)
err = cert.Certificate.VerifyHostname(host)
if err != nil {
glog.Warningf("unexpected error validating SSL certificate %v for host %v. Reason: %v", key, host, err)
@ -1124,7 +1064,7 @@ func (n *NGINXController) getEndpoints(
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := n.listers.Endpoint.GetServiceEndpoints(s)
ep, err := n.store.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers
@ -1173,24 +1113,6 @@ func (n *NGINXController) getEndpoints(
return upsServers
}
// readSecrets extracts information about secrets from an Ingress rule
func (n *NGINXController) readSecrets(ing *extensions.Ingress) {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
n.syncSecret(key)
}
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing)
if key == "" {
return
}
n.syncSecret(key)
}
func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}
@ -1204,27 +1126,3 @@ func (n *NGINXController) SetForceReload(shouldReload bool) {
atomic.StoreInt32(&n.forceReload, 0)
}
}
func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) {
glog.V(3).Infof("updating annotations information for ingress %v/%v", ing.Namespace, ing.Name)
anns := n.annotations.Extract(ing)
err := n.listers.IngressAnnotation.Update(anns)
if err != nil {
glog.Errorf("unexpected error updating annotations information for ingress %v/%v: %v", anns.Namespace, anns.Name, err)
}
}
// getByIngress returns the parsed annotations from an Ingress
func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
item, exists, err := n.listers.IngressAnnotation.GetByKey(key)
if err != nil {
glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
return &annotations.Ingress{}
}
if !exists {
glog.Errorf("ingress annotation %v was not found", key)
return &annotations.Ingress{}
}
return item.(*annotations.Ingress)
}