Use nginx upstreams and reload only if configuration changes

This commit is contained in:
Manuel de Brito Fontes 2016-03-14 23:29:13 -03:00
parent d0a15b1267
commit cad814cbb3
50 changed files with 370 additions and 10432 deletions

View file

@ -19,7 +19,7 @@ package main
import (
"fmt"
"net/http"
"reflect"
"sort"
"sync"
"time"
@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/contrib/ingress/controllers/nginx-third-party/nginx"
@ -58,28 +59,26 @@ const (
// namespace/serviceName:portToExport pairings. This assumes you've opened up the right
// hostPorts for each service that serves ingress traffic. Te value of portToExport indicates the
// port to listen inside nginx, not the port of the service.
lbTcpServices = "tcpservices"
lbTCPServices = "tcpservices"
k8sAnnotationPrefix = "nginx-ingress.kubernetes.io"
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
)
// loadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer
type loadBalancerController struct {
client *client.Client
ingController *framework.Controller
configController *framework.Controller
endpController *framework.Controller
svcController *framework.Controller
ingLister StoreToIngressLister
svcLister cache.StoreToServiceLister
configLister StoreToConfigMapLister
endpLister cache.StoreToEndpointsLister
recorder record.EventRecorder
ingQueue *taskQueue
configQueue *taskQueue
stopCh chan struct{}
ngx *nginx.NginxManager
nginx *nginx.NginxManager
lbInfo *lbInfo
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
@ -95,97 +94,47 @@ func (a annotations) getNginxConfig() (string, bool) {
return val, ok
}
func (a annotations) getTcpServices() (string, bool) {
val, ok := a[fmt.Sprintf("%v/%v", k8sAnnotationPrefix, lbTcpServices)]
func (a annotations) getTCPServices() (string, bool) {
val, ok := a[fmt.Sprintf("%v/%v", k8sAnnotationPrefix, lbTCPServices)]
return val, ok
}
// NewLoadBalancerController creates a controller for nginx loadbalancer
func NewLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc, customErrorSvc nginx.Service, namespace string, lbInfo *lbInfo) (*loadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
// newLoadBalancerController creates a controller for nginx loadbalancer
func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc, customErrorSvc nginx.Service, namespace string, lbInfo *lbInfo) (*loadBalancerController, error) {
lbc := loadBalancerController{
client: kubeClient,
stopCh: make(chan struct{}),
recorder: eventBroadcaster.NewRecorder(
api.EventSource{Component: "nginx-lb-controller"}),
lbInfo: lbInfo,
nginx: nginx.NewManager(kubeClient, defaultSvc, customErrorSvc),
}
lbc.ingQueue = NewTaskQueue(lbc.syncIngress)
lbc.configQueue = NewTaskQueue(lbc.syncConfig)
lbc.ngx = nginx.NewManager(kubeClient, defaultSvc, customErrorSvc)
// Ingress watch handlers
pathHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "ADD", fmt.Sprintf("Adding ingress %s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
},
DeleteFunc: lbc.ingQueue.enqueue,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
glog.V(2).Infof("Ingress %v changed, syncing", cur.(*extensions.Ingress).Name)
}
lbc.ingQueue.enqueue(cur)
},
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
&cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace),
WatchFunc: ingressWatchFunc(lbc.client, namespace),
},
&extensions.Ingress{}, resyncPeriod, pathHandlers)
// Config watch handlers
configHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
lbc.configQueue.enqueue(obj)
},
DeleteFunc: lbc.configQueue.enqueue,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
glog.V(2).Infof("nginx rc changed, syncing")
lbc.configQueue.enqueue(cur)
}
},
}
&extensions.Ingress{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.configLister.Store, lbc.configController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(api.ListOptions) (runtime.Object, error) {
switch lbInfo.DeployType.(type) {
case *api.ReplicationController:
rc, err := kubeClient.ReplicationControllers(lbInfo.PodNamespace).Get(lbInfo.ObjectName)
return &api.ReplicationControllerList{
Items: []api.ReplicationController{*rc},
}, err
case *extensions.DaemonSet:
ds, err := kubeClient.Extensions().DaemonSets(lbInfo.PodNamespace).Get(lbInfo.ObjectName)
return &extensions.DaemonSetList{
Items: []extensions.DaemonSet{*ds},
}, err
default:
return nil, errInvalidKind
}
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
switch lbInfo.DeployType.(type) {
case *api.ReplicationController:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.ObjectName})
return kubeClient.ReplicationControllers(lbInfo.PodNamespace).Watch(options)
case *extensions.DaemonSet:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.ObjectName})
return kubeClient.Extensions().DaemonSets(lbInfo.PodNamespace).Watch(options)
default:
return nil, errInvalidKind
}
},
ListFunc: configListFunc(kubeClient, lbc.lbInfo.DeployType, namespace, lbInfo.ObjectName),
WatchFunc: configWatchFunc(kubeClient, lbc.lbInfo.DeployType, namespace, lbInfo.ObjectName),
},
&api.ReplicationController{}, resyncPeriod, configHandlers)
&api.ReplicationController{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.endpLister.Store, lbc.endpController = framework.NewInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(kubeClient, namespace),
WatchFunc: endpointsWatchFunc(kubeClient, namespace),
},
&api.Endpoints{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.svcLister.Store, lbc.svcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(kubeClient, namespace),
WatchFunc: serviceWatchFunc(kubeClient, namespace),
},
&api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
return &lbc, nil
}
@ -202,117 +151,66 @@ func ingressWatchFunc(c *client.Client, ns string) func(options api.ListOptions)
}
}
// syncIngress manages Ingress create/updates/deletes.
func (lbc *loadBalancerController) syncIngress(key string) {
glog.V(2).Infof("Syncing Ingress %v", key)
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
lbc.ingQueue.requeue(key, err)
return
func serviceListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Services(ns).List(opts)
}
if !ingExists {
glog.Errorf("Ingress not found: %v", key)
return
}
// this means some Ingress rule changed. There is no need to reload nginx but
// we need to update the rules to use invoking "POST /update-ingress" with the
// list of Ingress rules
ingList := lbc.ingLister.Store.List()
if err := lbc.ngx.SyncIngress(ingList); err != nil {
lbc.ingQueue.requeue(key, err)
return
}
ing := *obj.(*extensions.Ingress)
if err := lbc.updateIngressStatus(ing); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
lbc.ingQueue.requeue(key, err)
}
return
}
// syncConfig manages changes in nginx configuration.
func (lbc *loadBalancerController) syncConfig(key string) {
glog.Infof("Syncing nginx configuration")
if !lbc.ingController.HasSynced() {
glog.Infof("deferring sync till endpoints controller has synced")
time.Sleep(100 * time.Millisecond)
func serviceWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Services(ns).Watch(options)
}
// we only need to sync nginx
if key != fmt.Sprintf("%v/%v", lbc.lbInfo.PodNamespace, lbc.lbInfo.ObjectName) {
glog.Warningf("skipping sync because the event is not related to a change in configuration")
return
}
obj, configExists, err := lbc.configLister.Store.GetByKey(key)
if err != nil {
lbc.configQueue.requeue(key, err)
return
}
if !configExists {
glog.Errorf("Configuration not found: %v", key)
return
}
glog.V(2).Infof("Syncing config %v", key)
var kindAnnotations map[string]string
switch obj.(type) {
case *api.ReplicationController:
rc := *obj.(*api.ReplicationController)
kindAnnotations = rc.Annotations
case *extensions.DaemonSet:
rc := *obj.(*extensions.DaemonSet)
kindAnnotations = rc.Annotations
}
ngxCfgAnn, _ := annotations(kindAnnotations).getNginxConfig()
tcpSvcAnn, _ := annotations(kindAnnotations).getTcpServices()
ngxConfig, err := lbc.ngx.ReadConfig(ngxCfgAnn)
if err != nil {
glog.Warningf("%v", err)
}
// TODO: tcp services can change (new item in the annotation list)
// TODO: skip get everytime
tcpServices := getTcpServices(lbc.client, tcpSvcAnn)
lbc.ngx.Reload(ngxConfig, tcpServices)
return
}
// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *loadBalancerController) updateIngressStatus(ing extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingress(ing.Namespace)
ip := lbc.lbInfo.PodIP
currIng, err := ingClient.Get(ing.Name)
if err != nil {
return err
}
currIng.Status = extensions.IngressStatus{
LoadBalancer: api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: ip},
},
},
func configListFunc(c *client.Client, deployType runtime.Object, ns, name string) func(api.ListOptions) (runtime.Object, error) {
return func(api.ListOptions) (runtime.Object, error) {
switch deployType.(type) {
case *api.ReplicationController:
rc, err := c.ReplicationControllers(ns).Get(name)
return &api.ReplicationControllerList{
Items: []api.ReplicationController{*rc},
}, err
case *extensions.DaemonSet:
ds, err := c.Extensions().DaemonSets(ns).Get(name)
return &extensions.DaemonSetList{
Items: []extensions.DaemonSet{*ds},
}, err
default:
return nil, errInvalidKind
}
}
}
glog.Infof("Updating loadbalancer %v/%v with IP %v", ing.Namespace, ing.Name, ip)
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", ip)
return nil
func configWatchFunc(c *client.Client, deployType runtime.Object, ns, name string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
switch deployType.(type) {
case *api.ReplicationController:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": name})
return c.ReplicationControllers(ns).Watch(options)
case *extensions.DaemonSet:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": name})
return c.Extensions().DaemonSets(ns).Watch(options)
default:
return nil, errInvalidKind
}
}
}
func endpointsListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Endpoints(ns).List(opts)
}
}
func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Endpoints(ns).Watch(options)
}
}
func (lbc *loadBalancerController) registerHandlers() {
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
if err := lbc.ngx.IsHealthy(); err != nil {
if err := lbc.nginx.IsHealthy(); err != nil {
w.WriteHeader(500)
w.Write([]byte("nginx error"))
return
@ -329,6 +227,136 @@ func (lbc *loadBalancerController) registerHandlers() {
glog.Fatalf(fmt.Sprintf("%v", http.ListenAndServe(fmt.Sprintf(":%v", *healthzPort), nil)))
}
func (lbc *loadBalancerController) sync() {
ings := lbc.ingLister.Store.List()
upstreams, servers, update := lbc.updateNGINX(ings)
if update {
glog.V(2).Infof("syncing NGINX config")
var kindAnnotations map[string]string
ngxCfgAnn, _ := annotations(kindAnnotations).getNginxConfig()
tcpSvcAnn, _ := annotations(kindAnnotations).getTCPServices()
ngxConfig, err := lbc.nginx.ReadConfig(ngxCfgAnn)
if err != nil {
glog.Warningf("%v", err)
}
tcpServices := getTCPServices(lbc.client, tcpSvcAnn)
lbc.nginx.CheckAndReload(ngxConfig, upstreams, servers, tcpServices)
}
}
func (lbc *loadBalancerController) updateNGINX(data []interface{}) ([]nginx.Upstream, []nginx.Server, bool) {
pems := make(map[string]string)
upstreams := make(map[string]nginx.Upstream)
var servers []nginx.Server
for _, ingIf := range data {
ing := ingIf.(extensions.Ingress)
for _, rule := range ing.Spec.Rules {
if rule.IngressRuleValue.HTTP == nil {
continue
}
for _, path := range rule.HTTP.Paths {
name := ing.Namespace + "-" + path.Backend.ServiceName
var ups nginx.Upstream
if existent, ok := upstreams[name]; ok {
ups = existent
} else {
ups := nginx.NewUpstreamWithDefaultServer(name)
upstreams[name] = ups
}
svcKey := ing.Namespace + "/" + path.Backend.ServiceName
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
if err != nil {
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
} else {
if svcExists {
svc := svcObj.(*api.Service)
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
upsServer := nginx.UpstreamServer{Address: svc.Spec.ClusterIP, Port: path.Backend.ServicePort.String()}
ups.Backends = []nginx.UpstreamServer{upsServer}
} else if svc.Spec.ClusterIP == "None" {
endps, err := lbc.endpLister.GetServiceEndpoints(svc)
if err != nil {
glog.Infof("error getting endpoints for service %v from the cache: %v", svc, err)
} else {
upsServers := endpointsToUpstreamServers(endps, path.Backend.ServicePort.IntValue())
if len(upsServers) > 0 {
ups.Backends = upsServers
}
}
}
}
}
//upstreams[name] = append(upstreams[name], ups)
}
}
for _, rule := range ing.Spec.Rules {
server := nginx.Server{Name: rule.Host}
if pemFile, ok := pems[rule.Host]; ok {
server.SSL = true
server.SSLCertificate = pemFile
server.SSLCertificateKey = pemFile
}
var locations []nginx.Location
for _, path := range rule.HTTP.Paths {
loc := nginx.Location{Path: path.Path}
upsName := ing.GetName() + "-" + path.Backend.ServiceName
for _, ups := range upstreams {
if upsName == ups.Name {
loc.Upstream = ups
}
}
locations = append(locations, loc)
}
server.Locations = locations
servers = append(servers, server)
}
}
uValues := make([]nginx.Upstream, 0, len(upstreams))
for _, value := range upstreams {
sort.Sort(nginx.UpstreamServerByAddrPort(value.Backends))
uValues = append(uValues, value)
}
sort.Sort(nginx.UpstreamByNameServers(uValues))
sort.Sort(nginx.ServerByNamePort(servers))
return uValues, servers, true
}
func endpointsToUpstreamServers(endps api.Endpoints, servicePort int) []nginx.UpstreamServer {
var upsServers []nginx.UpstreamServer
for _, subset := range endps.Subsets {
for _, port := range subset.Ports {
if port.Port == servicePort {
for _, address := range subset.Addresses {
ups := nginx.UpstreamServer{Address: address.IP, Port: fmt.Sprintf("%v", servicePort)}
upsServers = append(upsServers, ups)
}
break
}
}
}
return upsServers
}
// Stop stops the loadbalancer controller.
func (lbc *loadBalancerController) Stop() {
// Stop is invoked from the http endpoint.
@ -339,29 +367,22 @@ func (lbc *loadBalancerController) Stop() {
if !lbc.shutdown {
close(lbc.stopCh)
glog.Infof("Shutting down controller queues")
lbc.ingQueue.shutdown()
lbc.configQueue.shutdown()
lbc.shutdown = true
}
}
// Run starts the loadbalancer controller.
func (lbc *loadBalancerController) Run() {
glog.Infof("Starting nginx loadbalancer controller")
go lbc.ngx.Start()
glog.Infof("Starting NGINX loadbalancer controller")
go lbc.nginx.Start()
go lbc.registerHandlers()
go lbc.configController.Run(lbc.stopCh)
go lbc.configQueue.run(time.Second, lbc.stopCh)
// Initial nginx configuration.
lbc.syncConfig(lbc.lbInfo.PodNamespace + "/" + lbc.lbInfo.ObjectName)
time.Sleep(5 * time.Second)
go lbc.ingController.Run(lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
// periodic check for changes in configuration
go wait.Until(lbc.sync, 5*time.Second, wait.NeverStop)
<-lbc.stopCh
glog.Infof("Shutting down nginx loadbalancer controller")
glog.Infof("Shutting down NGINX loadbalancer controller")
}