Remove interface

This commit is contained in:
Manuel de Brito Fontes 2016-11-11 20:43:35 -03:00
parent ed9a416b01
commit f2b627486d
13 changed files with 72 additions and 872 deletions

View file

@ -76,17 +76,7 @@ var (
reservedPorts = []string{"80", "443", "8181", "18080"}
)
// Interface holds the methods to handle an Ingress backend
type Interface interface {
Start()
Stop() error
Info() string
healthz.HealthzChecker
}
// GenericController watches the kubernetes api and adds/removes services from the loadbalancer
// GenericController holds the boilerplate code required to build an Ingress controlller.
type GenericController struct {
healthz.HealthzChecker
@ -143,12 +133,13 @@ type Configuration struct {
DefaultHealthzURL string
// optional
PublishService string
// Backend is the particular implementation to be used.
// (for instance NGINX)
Backend ingress.Controller
}
// newIngressController creates an Ingress controller
func newIngressController(config *Configuration) Interface {
func newIngressController(config *Configuration) *GenericController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
@ -280,7 +271,7 @@ func newIngressController(config *Configuration) Interface {
IngressLister: ic.ingLister,
})
return ic
return &ic
}
func (ic *GenericController) controllersInSync() bool {
@ -365,8 +356,8 @@ func (ic *GenericController) sync(key interface{}) error {
}
}
upstreams, servers := ic.getUpstreamServers()
var passUpstreams []*ingress.SSLPassthroughUpstreams
upstreams, servers := ic.getBackendServers()
var passUpstreams []*ingress.SSLPassthroughBackend
for _, server := range servers {
if !server.SSLPassthrough {
continue
@ -376,8 +367,8 @@ func (ic *GenericController) sync(key interface{}) error {
if loc.Path != rootLocation {
continue
}
passUpstreams = append(passUpstreams, &ingress.SSLPassthroughUpstreams{
Upstream: loc.Upstream,
passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
Upstream: loc.Backend,
Host: server.Name,
})
break
@ -388,8 +379,8 @@ func (ic *GenericController) sync(key interface{}) error {
HealthzURL: ic.cfg.DefaultHealthzURL,
Upstreams: upstreams,
Servers: servers,
TCPUpstreams: ic.getTCPServices(),
UDPUpstreams: ic.getUDPServices(),
TCPEndpoints: ic.getTCPServices(),
UPDEndpoints: ic.getUDPServices(),
PassthroughUpstreams: passUpstreams,
})
if err != nil {
@ -497,7 +488,7 @@ func (ic *GenericController) getStreamServices(data map[string]string, proto api
svc := svcObj.(*api.Service)
var endps []ingress.UpstreamServer
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
for _, sp := range svc.Spec.Ports {
@ -516,7 +507,7 @@ func (ic *GenericController) getStreamServices(data map[string]string, proto api
}
}
sort.Sort(ingress.UpstreamServerByAddrPort(endps))
sort.Sort(ingress.EndpointByAddrPort(endps))
// tcp upstreams cannot contain empty upstreams and there is no
// default backend equivalent for TCP
@ -527,9 +518,9 @@ func (ic *GenericController) getStreamServices(data map[string]string, proto api
svcs = append(svcs, &ingress.Location{
Path: k,
Upstream: ingress.Upstream{
Upstream: ingress.Backend{
Name: fmt.Sprintf("%v-%v-%v", svcNs, svcName, port),
Backends: endps,
Endpoints: endps,
},
})
}
@ -540,21 +531,21 @@ func (ic *GenericController) getStreamServices(data map[string]string, proto api
// getDefaultUpstream returns an upstream associated with the
// default backend service. In case of error retrieving information
// configure the upstream to return http code 503.
func (ic *GenericController) getDefaultUpstream() *ingress.Upstream {
upstream := &ingress.Upstream{
func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
upstream := &ingress.Backend{
Name: defUpstreamName,
}
svcKey := ic.cfg.DefaultService
svcObj, svcExists, err := ic.svcLister.Indexer.GetByKey(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", ic.cfg.DefaultService, err)
upstream.Backends = append(upstream.Backends, newDefaultServer())
upstream.Endpoints = append(upstream.Endpoints, newDefaultServer())
return upstream
}
if !svcExists {
glog.Warningf("service %v does not exists", svcKey)
upstream.Backends = append(upstream.Backends, newDefaultServer())
upstream.Endpoints = append(upstream.Endpoints, newDefaultServer())
return upstream
}
@ -562,10 +553,10 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Upstream {
endps := ic.getEndpoints(svc, svc.Spec.Ports[0].TargetPort, api.ProtocolTCP, &healthcheck.Upstream{})
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
endps = []ingress.UpstreamServer{newDefaultServer()}
endps = []ingress.Endpoint{newDefaultServer()}
}
upstream.Backends = append(upstream.Backends, endps...)
upstream.Endpoints = append(upstream.Endpoints, endps...)
return upstream
}
@ -579,9 +570,9 @@ func (c ingressByRevision) Less(i, j int) bool {
return ir < jr
}
// getUpstreamServers returns a list of Upstream and Server to be used by the backend
// getBackendServers returns a list of Upstream and Server to be used by the backend
// An upstream can be used in multiple servers if the namespace, service name and port are the same
func (ic *GenericController) getUpstreamServers() ([]*ingress.Upstream, []*ingress.Server) {
func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress.Server) {
ings := ic.ingLister.Store.List()
sort.Sort(ingressByRevision(ings))
@ -699,19 +690,19 @@ func (ic *GenericController) getUpstreamServers() ([]*ingress.Upstream, []*ingre
addLoc = false
if !loc.IsDefBackend {
glog.V(3).Infof("avoiding replacement of ingress rule %v/%v location %v upstream %v (%v)", ing.Namespace, ing.Name, loc.Path, ups.Name, loc.Upstream.Name)
glog.V(3).Infof("avoiding replacement of ingress rule %v/%v location %v upstream %v (%v)", ing.Namespace, ing.Name, loc.Path, ups.Name, loc.Backend.Name)
break
}
glog.V(3).Infof("replacing ingress rule %v/%v location %v upstream %v (%v)", ing.Namespace, ing.Name, loc.Path, ups.Name, loc.Upstream.Name)
loc.Upstream = *ups
glog.V(3).Infof("replacing ingress rule %v/%v location %v upstream %v (%v)", ing.Namespace, ing.Name, loc.Path, ups.Name, loc.Backend.Name)
loc.Backend = *ups
loc.IsDefBackend = false
loc.BasicDigestAuth = *nginxAuth
loc.RateLimit = *rl
loc.Redirect = *locRew
loc.SecureUpstream = secUpstream
loc.Whitelist = *wl
loc.Upstream = *ups
loc.Backend = *ups
loc.EnableCORS = eCORS
loc.ExternalAuth = ra
loc.Proxy = *prx
@ -744,16 +735,16 @@ func (ic *GenericController) getUpstreamServers() ([]*ingress.Upstream, []*ingre
// TODO: find a way to make this more readable
// The structs must be ordered to always generate the same file
// if the content does not change.
aUpstreams := make([]*ingress.Upstream, 0, len(upstreams))
aUpstreams := make([]*ingress.Backend, 0, len(upstreams))
for _, value := range upstreams {
if len(value.Backends) == 0 {
if len(value.Endpoints) == 0 {
glog.V(3).Infof("upstream %v does not have any active endpoints. Using default backend", value.Name)
value.Backends = append(value.Backends, newDefaultServer())
value.Endpoints = append(value.Endpoints, newDefaultServer())
}
sort.Sort(ingress.UpstreamServerByAddrPort(value.Backends))
sort.Sort(ingress.EndpointByAddrPort(value.Endpoints))
aUpstreams = append(aUpstreams, value)
}
sort.Sort(ingress.UpstreamByNameServers(aUpstreams))
sort.Sort(ingress.BackendByNameServers(aUpstreams))
aServers := make([]*ingress.Server, 0, len(servers))
for _, value := range servers {
@ -781,8 +772,8 @@ func (ic *GenericController) getAuthCertificate(secretName string) (*authtls.SSL
// createUpstreams creates the NGINX upstreams for each service referenced in
// Ingress rules. The servers inside the upstream are endpoints.
func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ingress.Upstream {
upstreams := make(map[string]*ingress.Upstream)
func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ingress.Backend {
upstreams := make(map[string]*ingress.Backend)
upstreams[defUpstreamName] = ic.getDefaultUpstream()
upsDefaults := ic.cfg.Backend.UpstreamDefaults()
@ -803,7 +794,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Backends = append(upstreams[defBackend].Backends, endps...)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
}
@ -833,7 +824,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
glog.Warningf("error obtaining service endpoints: %v", err)
continue
}
upstreams[name].Backends = endp
upstreams[name].Endpoints = endp
}
}
}
@ -844,10 +835,10 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.UpstreamServer, error) {
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
svcObj, svcExists, err := ic.svcLister.Indexer.GetByKey(svcKey)
var upstreams []ingress.UpstreamServer
var upstreams []ingress.Endpoint
if err != nil {
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
}
@ -878,7 +869,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
return upstreams, nil
}
func (ic *GenericController) createServers(data []interface{}, upstreams map[string]*ingress.Upstream) map[string]*ingress.Server {
func (ic *GenericController) createServers(data []interface{}, upstreams map[string]*ingress.Backend) map[string]*ingress.Server {
servers := make(map[string]*ingress.Server)
ngxProxy := *proxy.ParseAnnotations(ic.cfg.Backend.UpstreamDefaults(), nil)
@ -973,15 +964,15 @@ func (ic *GenericController) getEndpoints(
s *api.Service,
servicePort intstr.IntOrString,
proto api.Protocol,
hz *healthcheck.Upstream) []ingress.UpstreamServer {
hz *healthcheck.Upstream) []ingress.Endpoint {
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := ic.endpLister.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return []ingress.UpstreamServer{}
return []ingress.Endpoint{}
}
upsServers := []ingress.UpstreamServer{}
upsServers := []ingress.Endpoint{}
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
@ -1023,7 +1014,7 @@ func (ic *GenericController) getEndpoints(
}
for _, epAddress := range ss.Addresses {
ups := ingress.UpstreamServer{
ups := ingress.Endpoint{
Address: epAddress.IP,
Port: fmt.Sprintf("%v", targetPort),
MaxFails: hz.MaxFails,