This commit is contained in:
Manuel de Brito Fontes 2017-09-17 15:42:31 -03:00
parent f478084cd8
commit 0661eaa08c
29 changed files with 264 additions and 281 deletions

View file

@ -28,7 +28,7 @@ import (
"github.com/golang/glog"
api "k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
@ -37,11 +37,12 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/ingress/core/pkg/ingress"
"k8s.io/ingress/core/pkg/ingress/annotations/class"
"k8s.io/ingress/core/pkg/ingress/annotations/healthcheck"
@ -157,7 +158,7 @@ func newIngressController(config *Configuration) *GenericController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: config.Client.CoreV1().Events(config.Namespace),
})
@ -166,7 +167,7 @@ func newIngressController(config *Configuration) *GenericController {
stopLock: &sync.Mutex{},
stopCh: make(chan struct{}),
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, api.EventSource{
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "ingress-controller",
}),
sslCertTracker: newSSLCertTracker(),
@ -185,7 +186,7 @@ func newIngressController(config *Configuration) *GenericController {
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
ic.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
ic.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
@ -207,7 +208,7 @@ func newIngressController(config *Configuration) *GenericController {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
ic.recorder.Eventf(delIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
@ -217,12 +218,12 @@ func newIngressController(config *Configuration) *GenericController {
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
ic.recorder.Eventf(curIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
ic.syncQueue.Enqueue(cur)
@ -232,13 +233,13 @@ func newIngressController(config *Configuration) *GenericController {
secrEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*api.Secret)
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.syncSecret(key)
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*api.Secret)
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -246,7 +247,7 @@ func newIngressController(config *Configuration) *GenericController {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*api.Secret)
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
@ -265,8 +266,8 @@ func newIngressController(config *Configuration) *GenericController {
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*api.Endpoints)
ocur := cur.(*api.Endpoints)
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
ic.syncQueue.Enqueue(cur)
}
@ -275,7 +276,7 @@ func newIngressController(config *Configuration) *GenericController {
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
upCmap := obj.(*api.ConfigMap)
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
@ -285,7 +286,7 @@ func newIngressController(config *Configuration) *GenericController {
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*api.ConfigMap)
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
@ -294,15 +295,15 @@ func newIngressController(config *Configuration) *GenericController {
}
// updates to configuration configmaps can trigger an update
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
ic.recorder.Eventf(upCmap, api.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
ic.syncQueue.Enqueue(cur)
}
}
},
}
watchNs := api.NamespaceAll
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != api.NamespaceAll {
watchNs := apiv1.NamespaceAll
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll {
watchNs = ic.cfg.Namespace
}
@ -312,29 +313,29 @@ func newIngressController(config *Configuration) *GenericController {
ic.endpLister.Store, ic.endpController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
&api.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
ic.secrLister.Store, ic.secrController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&api.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
ic.mapLister.Store, ic.mapController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&api.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
ic.svcLister.Store, ic.svcController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
&api.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
var nodeListerWatcher cache.ListerWatcher
if config.DisableNodeList {
nodeListerWatcher = fcache.NewFakeControllerSource()
} else {
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", api.NamespaceAll, fields.Everything())
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
}
ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
nodeListerWatcher,
&api.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
if config.UpdateStatus {
ic.syncStatus = status.NewStatusSyncer(status.Config{
@ -382,7 +383,7 @@ func (ic GenericController) GetDefaultBackend() defaults.Backend {
}
// GetPublishService returns the configured service used to set ingress status
func (ic GenericController) GetPublishService() *api.Service {
func (ic GenericController) GetPublishService() *apiv1.Service {
s, err := ic.GetService(ic.cfg.PublishService)
if err != nil {
return nil
@ -397,7 +398,7 @@ func (ic GenericController) GetRecorder() record.EventRecorder {
}
// GetSecret searches for a secret in the local secrets Store
func (ic GenericController) GetSecret(name string) (*api.Secret, error) {
func (ic GenericController) GetSecret(name string) (*apiv1.Secret, error) {
s, exists, err := ic.secrLister.Store.GetByKey(name)
if err != nil {
return nil, err
@ -405,11 +406,11 @@ func (ic GenericController) GetSecret(name string) (*api.Secret, error) {
if !exists {
return nil, fmt.Errorf("secret %v was not found", name)
}
return s.(*api.Secret), nil
return s.(*apiv1.Secret), nil
}
// GetService searches for a service in the local secrets Store
func (ic GenericController) GetService(name string) (*api.Service, error) {
func (ic GenericController) GetService(name string) (*apiv1.Service, error) {
s, exists, err := ic.svcLister.Store.GetByKey(name)
if err != nil {
return nil, err
@ -417,10 +418,10 @@ func (ic GenericController) GetService(name string) (*api.Service, error) {
if !exists {
return nil, fmt.Errorf("service %v was not found", name)
}
return s.(*api.Service), nil
return s.(*apiv1.Service), nil
}
func (ic *GenericController) getConfigMap(ns, name string) (*api.ConfigMap, error) {
func (ic *GenericController) getConfigMap(ns, name string) (*apiv1.ConfigMap, error) {
s, exists, err := ic.mapLister.Store.GetByKey(fmt.Sprintf("%v/%v", ns, name))
if err != nil {
return nil, err
@ -428,7 +429,7 @@ func (ic *GenericController) getConfigMap(ns, name string) (*api.ConfigMap, erro
if !exists {
return nil, fmt.Errorf("configmap %v was not found", name)
}
return s.(*api.ConfigMap), nil
return s.(*apiv1.ConfigMap), nil
}
// sync collects all the pieces required to assemble the configuration file and
@ -474,8 +475,8 @@ func (ic *GenericController) syncIngress(key interface{}) error {
pcfg := ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
}
@ -503,7 +504,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
return nil
}
func (ic *GenericController) getStreamServices(configmapName string, proto api.Protocol) []ingress.L4Service {
func (ic *GenericController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
glog.V(3).Infof("obtaining information about stream services of type %v located in configmap %v", proto, configmapName)
if configmapName == "" {
// no configmap configured
@ -549,7 +550,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto api.P
useProxyProtocol := false
// Proxy protocol is possible if the service is TCP
if len(nsSvcPort) == 3 && proto == api.ProtocolTCP {
if len(nsSvcPort) == 3 && proto == apiv1.ProtocolTCP {
if strings.ToUpper(nsSvcPort[2]) == "PROXY" {
useProxyProtocol = true
}
@ -572,7 +573,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto api.P
continue
}
svc := svcObj.(*api.Service)
svc := svcObj.(*apiv1.Service)
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
@ -643,8 +644,8 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
return upstream
}
svc := svcObj.(*api.Service)
endps := ic.getEndpoints(svc, &svc.Spec.Ports[0], api.ProtocolTCP, &healthcheck.Upstream{})
svc := svcObj.(*apiv1.Service)
endps := ic.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Upstream{})
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
endps = []ingress.Endpoint{ic.cfg.Backend.DefaultEndpoint()}
@ -811,7 +812,7 @@ func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress
// check if the location contains endpoints and a custom default backend
if location.DefaultBackend != nil {
sp := location.DefaultBackend.Spec.Ports[0]
endps := ic.getEndpoints(location.DefaultBackend, &sp, api.ProtocolTCP, &healthcheck.Upstream{})
endps := ic.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Upstream{})
if len(endps) > 0 {
glog.V(3).Infof("using custom default backend in server %v location %v (service %v/%v)",
server.Hostname, location.Path, location.DefaultBackend.Namespace, location.DefaultBackend.Name)
@ -1004,7 +1005,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
continue
}
upstreams[name].Service = s.(*api.Service)
upstreams[name].Service = s.(*apiv1.Service)
}
}
}
@ -1019,7 +1020,7 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
}
svc := svcObj.(*api.Service)
svc := svcObj.(*apiv1.Service)
if svc.Spec.ClusterIP == "" {
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
}
@ -1046,7 +1047,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
return upstreams, err
}
svc := svcObj.(*api.Service)
svc := svcObj.(*apiv1.Service)
glog.V(3).Infof("obtaining port information for service %v", svcKey)
for _, servicePort := range svc.Spec.Ports {
// targetPort could be a string, use the name or the port (int)
@ -1054,7 +1055,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {
endps := ic.getEndpoints(svc, &servicePort, api.ProtocolTCP, hz)
endps := ic.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
}
@ -1175,7 +1176,7 @@ func (ic *GenericController) createServers(data []interface{},
IsDefBackend: true,
Backend: un,
Proxy: ngxProxy,
Service: &api.Service{},
Service: &apiv1.Service{},
},
}, SSLPassthrough: sslpt}
}
@ -1262,9 +1263,9 @@ func (ic *GenericController) createServers(data []interface{},
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (ic *GenericController) getEndpoints(
s *api.Service,
servicePort *api.ServicePort,
proto api.Protocol,
s *apiv1.Service,
servicePort *apiv1.ServicePort,
proto apiv1.Protocol,
hz *healthcheck.Upstream) []ingress.Endpoint {
upsServers := []ingress.Endpoint{}
@ -1275,7 +1276,7 @@ func (ic *GenericController) getEndpoints(
adus := make(map[string]bool)
// ExternalName services
if s.Spec.Type == api.ServiceTypeExternalName {
if s.Spec.Type == apiv1.ServiceTypeExternalName {
targetPort := servicePort.TargetPort.IntValue()
// check for invalid port value
if targetPort <= 0 {