Configure nginx using a ConfigMap

This commit is contained in:
Manuel de Brito Fontes 2016-03-19 20:29:29 -03:00
parent 28f9cb0b2b
commit d9934ec4db
17 changed files with 378 additions and 417 deletions

View file

@ -20,6 +20,8 @@ import (
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -30,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/wait"
@ -40,59 +41,44 @@ import (
)
const (
// Name of the default config map that contains the configuration for nginx.
// Takes the form namespace/name.
// If the annotation does not exists the controller will create a new annotation with the default
// configuration.
lbConfigName = "lbconfig"
// If you have pure tcp services or https services that need L3 routing, you
// must specify them by name. Note that you are responsible for:
// 1. Making sure there is no collision between the service ports of these services.
// - You can have multiple <mysql svc name>:3306 specifications in this map, and as
// long as the service ports of your mysql service don't clash, you'll get
// loadbalancing for each one.
// 2. Exposing the service ports as node ports on a pod.
// 3. Adding firewall rules so these ports can ingress traffic.
defUpstreamName = "upstream-default-backend"
)
// loadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer
type loadBalancerController struct {
client *client.Client
ingController *framework.Controller
configController *framework.Controller
endpController *framework.Controller
svcController *framework.Controller
ingLister StoreToIngressLister
svcLister cache.StoreToServiceLister
configLister StoreToConfigMapLister
endpLister cache.StoreToEndpointsLister
stopCh chan struct{}
nginx *nginx.NginxManager
lbInfo *lbInfo
nxgConfigMap string
tcpConfigMap string
client *client.Client
ingController *framework.Controller
endpController *framework.Controller
svcController *framework.Controller
ingLister StoreToIngressLister
svcLister cache.StoreToServiceLister
endpLister cache.StoreToEndpointsLister
nginx *nginx.NginxManager
lbInfo *lbInfo
defaultSvc string
nxgConfigMap string
tcpConfigMap string
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
stopCh chan struct{}
}
// newLoadBalancerController creates a controller for nginx loadbalancer
func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc nginx.Service,
namespace, nxgConfigMapName, tcpConfigMapName string, lbInfo *lbInfo) (*loadBalancerController, error) {
func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc,
namespace, nxgConfigMapName, tcpConfigMapName string, lbRuntimeInfo *lbInfo) (*loadBalancerController, error) {
lbc := loadBalancerController{
client: kubeClient,
stopCh: make(chan struct{}),
lbInfo: lbInfo,
nginx: nginx.NewManager(kubeClient, defaultSvc),
lbInfo: lbRuntimeInfo,
nginx: nginx.NewManager(kubeClient),
nxgConfigMap: nxgConfigMapName,
tcpConfigMap: tcpConfigMapName,
defaultSvc: defaultSvc,
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
@ -102,24 +88,17 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
},
&extensions.Ingress{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.configLister.Store, lbc.configController = framework.NewInformer(
&cache.ListWatch{
ListFunc: configListFunc(kubeClient, lbc.lbInfo.DeployType, namespace, lbInfo.ObjectName),
WatchFunc: configWatchFunc(kubeClient, lbc.lbInfo.DeployType, namespace, lbInfo.ObjectName),
},
&api.ReplicationController{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.endpLister.Store, lbc.endpController = framework.NewInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(kubeClient, namespace),
WatchFunc: endpointsWatchFunc(kubeClient, namespace),
ListFunc: endpointsListFunc(lbc.client, namespace),
WatchFunc: endpointsWatchFunc(lbc.client, namespace),
},
&api.Endpoints{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.svcLister.Store, lbc.svcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(kubeClient, namespace),
WatchFunc: serviceWatchFunc(kubeClient, namespace),
ListFunc: serviceListFunc(lbc.client, namespace),
WatchFunc: serviceWatchFunc(lbc.client, namespace),
},
&api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
@ -150,39 +129,6 @@ func serviceWatchFunc(c *client.Client, ns string) func(options api.ListOptions)
}
}
func configListFunc(c *client.Client, deployType runtime.Object, ns, name string) func(api.ListOptions) (runtime.Object, error) {
return func(api.ListOptions) (runtime.Object, error) {
switch deployType.(type) {
case *api.ReplicationController:
rc, err := c.ReplicationControllers(ns).Get(name)
return &api.ReplicationControllerList{
Items: []api.ReplicationController{*rc},
}, err
case *extensions.DaemonSet:
ds, err := c.Extensions().DaemonSets(ns).Get(name)
return &extensions.DaemonSetList{
Items: []extensions.DaemonSet{*ds},
}, err
default:
return nil, errInvalidKind
}
}
}
func configWatchFunc(c *client.Client, deployType runtime.Object, ns, name string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
switch deployType.(type) {
case *api.ReplicationController:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": name})
return c.ReplicationControllers(ns).Watch(options)
case *extensions.DaemonSet:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": name})
return c.Extensions().DaemonSets(ns).Watch(options)
default:
return nil, errInvalidKind
}
}
}
func endpointsListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Endpoints(ns).List(opts)
@ -195,12 +141,12 @@ func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOption
}
}
func (lbc *loadBalancerController) getConfigMap(name string) (api.ConfigMap, error) {
return lbc.client.ConfigMaps(lbc.lbInfo.PodNamespace).Get(name)
func (lbc *loadBalancerController) getConfigMap(ns, name string) (*api.ConfigMap, error) {
return lbc.client.ConfigMaps(ns).Get(name)
}
func (lbc *loadBalancerController) getTCPConfigMap(name string) (api.ConfigMap, error) {
return lbc.client.ConfigMaps(lbc.lbInfo.PodNamespace).Get(name)
func (lbc *loadBalancerController) getTCPConfigMap(ns, name string) (*api.ConfigMap, error) {
return lbc.client.ConfigMaps(ns).Get(name)
}
func (lbc *loadBalancerController) registerHandlers() {
@ -226,22 +172,141 @@ func (lbc *loadBalancerController) sync() {
ings := lbc.ingLister.Store.List()
upstreams, servers := lbc.getUpstreamServers(ings)
cfg, err := lbc.getConfigMap(lbc.nxgConfigMap)
var cfg *api.ConfigMap
ngxConfig, err := lbc.nginx.ReadConfig("")
ns, name, _ := parseNsName(lbc.nxgConfigMap)
cfg, err := lbc.getConfigMap(ns, name)
if err != nil {
cfg = &api.ConfigMap{}
}
ngxConfig, err := lbc.nginx.ReadConfig(cfg)
if err != nil {
glog.Warningf("%v", err)
}
tcpServices := lbc.getTCPServices()
lbc.nginx.CheckAndReload(ngxConfig, upstreams, servers, tcpServices)
lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
Upstreams: upstreams,
Servers: servers,
TCPUpstreams: tcpServices,
})
}
func (lbc *loadBalancerController) getTCPServices() []*nginx.Location {
if lbc.tcpConfigMap == "" {
// no configmap for TCP services
return []*nginx.Location{}
}
ns, name, err := parseNsName(lbc.tcpConfigMap)
if err != nil {
glog.Warningf("%v", err)
return []*nginx.Location{}
}
tcpMap, err := lbc.getTCPConfigMap(ns, name)
if err != nil {
glog.V(3).Infof("no configured tcp services found: %v", err)
return []*nginx.Location{}
}
var tcpSvcs []*nginx.Location
// k -> port to expose in nginx
// v -> <namespace>/<service name>:<port from service to be used>
for k, v := range tcpMap.Data {
port, err := strconv.Atoi(k)
if err != nil {
glog.Warningf("%v is not valid as a TCP port", k)
continue
}
svcPort := strings.Split(v, ":")
if len(svcPort) != 2 {
glog.Warningf("invalid format (namespace/name:port) '%v'", k)
continue
}
svcNs, svcName, err := parseNsName(svcPort[0])
if err != nil {
glog.Warningf("%v", err)
continue
}
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcPort[0])
if err != nil {
glog.Warningf("error getting service %v: %v", svcPort[0], err)
continue
}
if !svcExists {
glog.Warningf("service %v was not found", svcPort[0])
continue
}
svc := svcObj.(*api.Service)
var endps []nginx.UpstreamServer
targetPort, err := strconv.Atoi(svcPort[1])
if err != nil {
endps = lbc.getEndpoints(svc, intstr.FromString(svcPort[1]))
} else {
// we need to use the TargetPort (where the endpoints are running)
for _, sp := range svc.Spec.Ports {
if sp.Port == targetPort {
endps = lbc.getEndpoints(svc, sp.TargetPort)
break
}
}
}
tcpSvcs = append(tcpSvcs, &nginx.Location{
Path: k,
Upstream: nginx.Upstream{
Name: fmt.Sprintf("%v-%v-%v", svcNs, svcName, port),
Backends: endps,
},
})
}
return tcpSvcs
}
func (lbc *loadBalancerController) getDefaultUpstream() *nginx.Upstream {
upstream := &nginx.Upstream{
Name: defUpstreamName,
}
svcKey := lbc.defaultSvc
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", lbc.defaultSvc, err)
upstream.Backends = append(upstream.Backends, nginx.NewDefaultServer())
return upstream
}
if !svcExists {
glog.Warningf("service %v does no exists", svcKey)
upstream.Backends = append(upstream.Backends, nginx.NewDefaultServer())
return upstream
}
svc := svcObj.(*api.Service)
endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort)
if len(endps) == 0 {
glog.Warningf("service %v does no have any active endpoints", svcKey)
upstream.Backends = append(upstream.Backends, nginx.NewDefaultServer())
} else {
upstream.Backends = append(upstream.Backends, endps...)
}
return upstream
}
func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*nginx.Upstream, []*nginx.Server) {
upstreams := lbc.createUpstreams(data)
servers := lbc.createServers(data)
//TODO: add default backend upstream
upstreams[defUpstreamName] = lbc.getDefaultUpstream()
for _, ingIf := range data {
ing := ingIf.(*extensions.Ingress)
@ -252,13 +317,13 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
}
server := servers[rule.Host]
var locations []nginx.Location
locations := []*nginx.Location{}
for _, path := range rule.HTTP.Paths {
upsName := ing.GetNamespace() + "-" + path.Backend.ServiceName
upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.IntValue())
ups := upstreams[upsName]
svcKey := ing.GetNamespace() + "/" + path.Backend.ServiceName
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
if err != nil {
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
@ -286,7 +351,7 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
for _, ups := range upstreams {
if upsName == ups.Name {
loc := nginx.Location{Path: path.Path}
loc := &nginx.Location{Path: path.Path}
loc.Upstream = *ups
locations = append(locations, loc)
break
@ -294,7 +359,9 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
}
}
server.Locations = append(server.Locations, locations...)
for _, loc := range locations {
server.Locations = append(server.Locations, loc)
}
}
}
@ -334,7 +401,7 @@ func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[strin
}
for _, path := range rule.HTTP.Paths {
name := ing.GetNamespace() + "-" + path.Backend.ServiceName
name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.IntValue())
if _, ok := upstreams[name]; !ok {
upstreams[name] = nginx.NewUpstream(name)
}
@ -355,7 +422,7 @@ func (lbc *loadBalancerController) createServers(data []interface{}) map[string]
for _, rule := range ing.Spec.Rules {
if _, ok := servers[rule.Host]; !ok {
servers[rule.Host] = &nginx.Server{Name: rule.Host}
servers[rule.Host] = &nginx.Server{Name: rule.Host, Locations: []*nginx.Location{}}
}
if pemFile, ok := pems[rule.Host]; ok {
@ -417,18 +484,18 @@ func (lbc *loadBalancerController) getPemsFromIngress(data []interface{}) map[st
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString) []nginx.UpstreamServer {
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := lbc.endpLister.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return []nginx.UpstreamServer{}
}
var upsServers []nginx.UpstreamServer
upsServers := []nginx.UpstreamServer{}
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
var targetPort int
switch servicePort.Type {
case intstr.Int:
if epPort.Port == servicePort.IntValue() {
@ -451,6 +518,7 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
}
}
glog.V(3).Infof("endpoints found: %v", upsServers)
return upsServers
}
@ -474,7 +542,6 @@ func (lbc *loadBalancerController) Run() {
go lbc.nginx.Start()
go lbc.registerHandlers()
go lbc.configController.Run(lbc.stopCh)
go lbc.ingController.Run(lbc.stopCh)
go lbc.endpController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)