Allow custom health checks
This commit is contained in:
parent
a38fcda255
commit
675ce396ac
14 changed files with 340 additions and 41 deletions
|
|
@ -40,6 +40,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"k8s.io/contrib/ingress/controllers/nginx/healthcheck"
|
||||
"k8s.io/contrib/ingress/controllers/nginx/nginx"
|
||||
)
|
||||
|
||||
|
|
@ -327,9 +328,6 @@ func (lbc *loadBalancerController) sync(key string) {
|
|||
return
|
||||
}
|
||||
|
||||
ings := lbc.ingLister.Store.List()
|
||||
upstreams, servers := lbc.getUpstreamServers(ings)
|
||||
|
||||
var cfg *api.ConfigMap
|
||||
|
||||
ns, name, _ := parseNsName(lbc.nxgConfigMap)
|
||||
|
|
@ -339,6 +337,10 @@ func (lbc *loadBalancerController) sync(key string) {
|
|||
}
|
||||
|
||||
ngxConfig := lbc.nginx.ReadConfig(cfg)
|
||||
|
||||
ings := lbc.ingLister.Store.List()
|
||||
upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings)
|
||||
|
||||
lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
|
||||
Upstreams: upstreams,
|
||||
Servers: servers,
|
||||
|
|
@ -489,7 +491,7 @@ func (lbc *loadBalancerController) getStreamServices(data map[string]string, pro
|
|||
if err != nil {
|
||||
for _, sp := range svc.Spec.Ports {
|
||||
if sp.Name == svcPort {
|
||||
endps = lbc.getEndpoints(svc, sp.TargetPort, proto)
|
||||
endps = lbc.getEndpoints(svc, sp.TargetPort, proto, &healthcheck.Upstream{})
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -497,7 +499,7 @@ func (lbc *loadBalancerController) getStreamServices(data map[string]string, pro
|
|||
// we need to use the TargetPort (where the endpoints are running)
|
||||
for _, sp := range svc.Spec.Ports {
|
||||
if sp.Port == int32(targetPort) {
|
||||
endps = lbc.getEndpoints(svc, sp.TargetPort, proto)
|
||||
endps = lbc.getEndpoints(svc, sp.TargetPort, proto, &healthcheck.Upstream{})
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -542,7 +544,7 @@ func (lbc *loadBalancerController) getDefaultUpstream() *nginx.Upstream {
|
|||
|
||||
svc := svcObj.(*api.Service)
|
||||
|
||||
endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort, api.ProtocolTCP)
|
||||
endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort, api.ProtocolTCP, &healthcheck.Upstream{})
|
||||
if len(endps) == 0 {
|
||||
glog.Warningf("service %v does no have any active endpoints", svcKey)
|
||||
upstream.Backends = append(upstream.Backends, nginx.NewDefaultServer())
|
||||
|
|
@ -553,8 +555,8 @@ func (lbc *loadBalancerController) getDefaultUpstream() *nginx.Upstream {
|
|||
return upstream
|
||||
}
|
||||
|
||||
func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*nginx.Upstream, []*nginx.Server) {
|
||||
upstreams := lbc.createUpstreams(data)
|
||||
func (lbc *loadBalancerController) getUpstreamServers(ngxCfg nginx.NginxConfiguration, data []interface{}) ([]*nginx.Upstream, []*nginx.Server) {
|
||||
upstreams := lbc.createUpstreams(ngxCfg, data)
|
||||
upstreams[defUpstreamName] = lbc.getDefaultUpstream()
|
||||
|
||||
servers := lbc.createServers(data)
|
||||
|
|
@ -655,12 +657,14 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
|
|||
|
||||
// createUpstreams creates the NGINX upstreams for each service referenced in
|
||||
// Ingress rules. The servers inside the upstream are endpoints.
|
||||
func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[string]*nginx.Upstream {
|
||||
func (lbc *loadBalancerController) createUpstreams(ngxCfg nginx.NginxConfiguration, data []interface{}) map[string]*nginx.Upstream {
|
||||
upstreams := make(map[string]*nginx.Upstream)
|
||||
|
||||
for _, ingIf := range data {
|
||||
ing := ingIf.(*extensions.Ingress)
|
||||
|
||||
hz := healthcheck.ParseAnnotations(ngxCfg, ing)
|
||||
|
||||
for _, rule := range ing.Spec.Rules {
|
||||
if rule.IngressRuleValue.HTTP == nil {
|
||||
continue
|
||||
|
|
@ -693,7 +697,7 @@ func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[strin
|
|||
for _, servicePort := range svc.Spec.Ports {
|
||||
// targetPort could be a string, use the name or the port (int)
|
||||
if strconv.Itoa(int(servicePort.Port)) == bp || servicePort.TargetPort.String() == bp || servicePort.Name == bp {
|
||||
endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP)
|
||||
endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP, hz)
|
||||
if len(endps) == 0 {
|
||||
glog.Warningf("service %v does no have any active endpoints", svcKey)
|
||||
}
|
||||
|
|
@ -801,7 +805,7 @@ func (lbc *loadBalancerController) getPemsFromIngress(data []interface{}) map[st
|
|||
}
|
||||
|
||||
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
|
||||
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString, proto api.Protocol) []nginx.UpstreamServer {
|
||||
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString, proto api.Protocol, hz *healthcheck.Upstream) []nginx.UpstreamServer {
|
||||
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
|
||||
ep, err := lbc.endpLister.GetServiceEndpoints(s)
|
||||
if err != nil {
|
||||
|
|
@ -859,7 +863,12 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
|
|||
}
|
||||
|
||||
for _, epAddress := range ss.Addresses {
|
||||
ups := nginx.UpstreamServer{Address: epAddress.IP, Port: fmt.Sprintf("%v", targetPort)}
|
||||
ups := nginx.UpstreamServer{
|
||||
Address: epAddress.IP,
|
||||
Port: fmt.Sprintf("%v", targetPort),
|
||||
MaxFails: hz.MaxFails,
|
||||
FailTimeout: hz.FailTimeout,
|
||||
}
|
||||
upsServers = append(upsServers, ups)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue