Update GCE ingress controller

This commit is contained in:
Manuel de Brito Fontes 2016-11-10 20:31:49 -03:00
parent 1bc383f9c5
commit 4d1887310b
21 changed files with 125 additions and 111 deletions

View file

@ -23,14 +23,15 @@ import (
"sync"
"time"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/ingress/controllers/gce/loadbalancers"
"k8s.io/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
@ -39,7 +40,7 @@ import (
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
// DefaultClusterUID is the uid to use for clusters resources created by an
// L7 controller created without specifying the --cluster-uid flag.
@ -52,11 +53,11 @@ var (
// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
client *client.Client
ingController *framework.Controller
nodeController *framework.Controller
svcController *framework.Controller
podController *framework.Controller
client client.Interface
ingController *cache.Controller
nodeController *cache.Controller
svcController *cache.Controller
podController *cache.Controller
ingLister StoreToIngressLister
nodeLister cache.StoreToNodeLister
svcLister cache.StoreToServiceLister
@ -86,11 +87,12 @@ type LoadBalancerController struct {
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient *client.Client, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
func NewLoadBalancerController(kubeClient client.Interface, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
eventBroadcaster.StartRecordingToSink(unversionedcore.EventSinkImpl{
Interface: kubeClient.Core().Events(""),
})
lbc := LoadBalancerController{
client: kubeClient,
CloudClusterManager: clusterManager,
@ -103,7 +105,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
lbc.hasSynced = lbc.storesSynced
// Ingress watch handlers
pathHandlers := framework.ResourceEventHandlerFuncs{
pathHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) {
@ -133,7 +135,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
lbc.ingQueue.enqueue(cur)
},
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
lbc.ingLister.Store, lbc.ingController = cache.NewInformer(
&cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace),
WatchFunc: ingressWatchFunc(lbc.client, namespace),
@ -141,7 +143,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
&extensions.Ingress{}, resyncPeriod, pathHandlers)
// Service watch handlers
svcHandlers := framework.ResourceEventHandlerFuncs{
svcHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
@ -151,36 +153,39 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
// Ingress deletes matter, service deletes don't.
}
lbc.svcLister.Store, lbc.svcController = framework.NewInformer(
cache.NewListWatchFromClient(
lbc.client, "services", namespace, fields.Everything()),
&api.Service{}, resyncPeriod, svcHandlers)
lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()),
&api.Pod{},
lbc.svcLister.Indexer, lbc.svcController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "services", namespace, fields.Everything()),
&api.Service{},
resyncPeriod,
framework.ResourceEventHandlerFuncs{},
svcHandlers,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodeHandlers := framework.ResourceEventHandlerFuncs{
lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "pods", namespace, fields.Everything()),
&api.Pod{},
resyncPeriod,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodeHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: lbc.nodeQueue.enqueue,
DeleteFunc: lbc.nodeQueue.enqueue,
// Nodes are updated every 10s and we don't care, so no update handler.
}
// Node watch handlers
lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
lbc.nodeLister.Store, lbc.nodeController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(opts api.ListOptions) (runtime.Object, error) {
return lbc.client.Get().
return lbc.client.Core().RESTClient().Get().
Resource("nodes").
FieldsSelectorParam(fields.Everything()).
Do().
Get()
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return lbc.client.Get().
return lbc.client.Core().RESTClient().Get().
Prefix("watch").
Resource("nodes").
FieldsSelectorParam(fields.Everything()).
@ -196,15 +201,15 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
return &lbc, nil
}
func ingressListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
func ingressListFunc(c client.Interface, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Extensions().Ingress(ns).List(opts)
return c.Extensions().Ingresses(ns).List(opts)
}
}
func ingressWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
func ingressWatchFunc(c client.Interface, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Extensions().Ingress(ns).Watch(options)
return c.Extensions().Ingresses(ns).Watch(options)
}
}
@ -364,7 +369,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingress(ing.Namespace)
ingClient := lbc.client.Extensions().Ingresses(ing.Namespace)
// Update IP through update/status endpoint
ip := l7.GetIP()