Add configuration for retries in non-idempotent requests

This commit is contained in:
Manuel de Brito Fontes 2016-03-30 00:47:20 -03:00
parent c9f8a06399
commit 7abc7a77f6
8 changed files with 105 additions and 32 deletions

View file

@ -198,11 +198,11 @@ func (lbc *loadBalancerController) sync(key string) {
}
ngxConfig := lbc.nginx.ReadConfig(cfg)
tcpServices := lbc.getTCPServices()
lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
Upstreams: upstreams,
Servers: servers,
TCPUpstreams: tcpServices,
TCPUpstreams: lbc.getTCPServices(),
UDPUpstreams: lbc.getUDPServices(),
})
}
@ -285,12 +285,12 @@ func (lbc *loadBalancerController) getServices(data map[string]string, proto api
var endps []nginx.UpstreamServer
targetPort, err := strconv.Atoi(svcPort[1])
if err != nil {
endps = lbc.getEndpoints(svc, intstr.FromString(svcPort[1]))
endps = lbc.getEndpoints(svc, intstr.FromString(svcPort[1]), proto)
} else {
// we need to use the TargetPort (where the endpoints are running)
for _, sp := range svc.Spec.Ports {
if sp.Port == targetPort {
endps = lbc.getEndpoints(svc, sp.TargetPort)
endps = lbc.getEndpoints(svc, sp.TargetPort, proto)
break
}
}
@ -335,7 +335,7 @@ func (lbc *loadBalancerController) getDefaultUpstream() *nginx.Upstream {
svc := svcObj.(*api.Service)
endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort)
endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort, api.ProtocolTCP)
if len(endps) == 0 {
glog.Warningf("service %v does no have any active endpoints", svcKey)
upstream.Backends = append(upstream.Backends, nginx.NewDefaultServer())
@ -383,7 +383,7 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
for _, servicePort := range svc.Spec.Ports {
if servicePort.Port == path.Backend.ServicePort.IntValue() {
endps := lbc.getEndpoints(svc, servicePort.TargetPort)
endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP)
if len(endps) == 0 {
glog.Warningf("service %v does no have any active endpoints", svcKey)
}
@ -526,7 +526,7 @@ func (lbc *loadBalancerController) getPemsFromIngress(data []interface{}) map[st
}
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString) []nginx.UpstreamServer {
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString, proto api.Protocol) []nginx.UpstreamServer {
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := lbc.endpLister.GetServiceEndpoints(s)
if err != nil {
@ -538,6 +538,11 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
if !reflect.DeepEqual(epPort.Protocol, proto) {
continue
}
var targetPort int
switch servicePort.Type {
case intstr.Int: