Improve event handling using a workqueue

This commit is contained in:
Manuel de Brito Fontes 2016-03-22 15:01:04 -03:00
parent f5892e06fe
commit 13c21386e2
18 changed files with 1384 additions and 206 deletions

View file

@ -18,6 +18,7 @@ package main
import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"
@ -33,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/contrib/ingress/controllers/nginx-third-party/nginx"
@ -43,6 +43,10 @@ const (
defUpstreamName = "upstream-default-backend"
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
)
// loadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer
type loadBalancerController struct {
@ -59,6 +63,8 @@ type loadBalancerController struct {
nxgConfigMap string
tcpConfigMap string
syncQueue *taskQueue
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
@ -80,19 +86,35 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
defaultSvc: defaultSvc,
}
lbc.syncQueue = NewTaskQueue(lbc.sync)
eventHandler := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
lbc.syncQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
lbc.syncQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.syncQueue.enqueue(cur)
}
},
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
&cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace),
WatchFunc: ingressWatchFunc(lbc.client, namespace),
},
&extensions.Ingress{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
&extensions.Ingress{}, resyncPeriod, eventHandler)
lbc.endpLister.Store, lbc.endpController = framework.NewInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(lbc.client, namespace),
WatchFunc: endpointsWatchFunc(lbc.client, namespace),
},
&api.Endpoints{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
&api.Endpoints{}, resyncPeriod, eventHandler)
lbc.svcLister.Store, lbc.svcController = framework.NewInformer(
&cache.ListWatch{
@ -140,6 +162,10 @@ func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOption
}
}
func (lbc *loadBalancerController) controllersInSync() bool {
return lbc.ingController.HasSynced() && lbc.svcController.HasSynced() && lbc.endpController.HasSynced()
}
func (lbc *loadBalancerController) getConfigMap(ns, name string) (*api.ConfigMap, error) {
return lbc.client.ConfigMaps(ns).Get(name)
}
@ -148,7 +174,12 @@ func (lbc *loadBalancerController) getTCPConfigMap(ns, name string) (*api.Config
return lbc.client.ConfigMaps(ns).Get(name)
}
func (lbc *loadBalancerController) sync() {
func (lbc *loadBalancerController) sync(key string) {
if !lbc.controllersInSync() {
lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
return
}
ings := lbc.ingLister.Store.List()
upstreams, servers := lbc.getUpstreamServers(ings)
@ -160,11 +191,7 @@ func (lbc *loadBalancerController) sync() {
cfg = &api.ConfigMap{}
}
ngxConfig, err := lbc.nginx.ReadConfig(cfg)
if err != nil {
glog.Warningf("%v", err)
}
ngxConfig := lbc.nginx.ReadConfig(cfg)
tcpServices := lbc.getTCPServices()
lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
Upstreams: upstreams,
@ -239,6 +266,13 @@ func (lbc *loadBalancerController) getTCPServices() []*nginx.Location {
}
}
// tcp upstreams cannot contain empty upstreams and there is no
// default backend equivalent for TCP
if len(endps) == 0 {
glog.Warningf("service %v/%v does no have any active endpoints", svcNs, svcName)
continue
}
tcpSvcs = append(tcpSvcs, &nginx.Location{
Path: k,
Upstream: nginx.Upstream{
@ -441,14 +475,13 @@ func (lbc *loadBalancerController) getPemsFromIngress(data []interface{}) map[st
continue
}
cn, err := lbc.nginx.CheckSSLCertificate(secretName)
pemFileName := lbc.nginx.AddOrUpdateCertAndKey(secretName, string(cert), string(key))
cn, err := lbc.nginx.CheckSSLCertificate(pemFileName)
if err != nil {
glog.Warningf("No valid SSL certificate found in secret %v", secretName)
continue
}
pemFileName := lbc.nginx.AddOrUpdateCertAndKey(secretName, string(cert), string(key))
for _, host := range tls.Hosts {
if isHostValid(host, cn) {
pems[host] = pemFileName
@ -513,6 +546,7 @@ func (lbc *loadBalancerController) Stop() {
close(lbc.stopCh)
glog.Infof("shutting down controller queues")
lbc.shutdown = true
lbc.syncQueue.shutdown()
}
}
@ -525,8 +559,7 @@ func (lbc *loadBalancerController) Run() {
go lbc.endpController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
// periodic check for changes in configuration
go wait.Until(lbc.sync, 5*time.Second, wait.NeverStop)
go lbc.syncQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh
glog.Infof("shutting down NGINX loadbalancer controller")