Update gce controller
This commit is contained in:
parent
ea7f943160
commit
c7c2a564a9
26 changed files with 275 additions and 236 deletions
|
|
@ -23,20 +23,21 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
listers "k8s.io/client-go/listers/core/v1"
|
||||
base_api "k8s.io/client-go/pkg/api"
|
||||
api "k8s.io/client-go/pkg/api/v1"
|
||||
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"k8s.io/ingress/controllers/gce/loadbalancers"
|
||||
"k8s.io/ingress/controllers/gce/utils"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -57,16 +58,16 @@ var (
|
|||
// LoadBalancerController watches the kubernetes api and adds/removes services
|
||||
// from the loadbalancer, via loadBalancerConfig.
|
||||
type LoadBalancerController struct {
|
||||
client client.Interface
|
||||
ingController *cache.Controller
|
||||
nodeController *cache.Controller
|
||||
svcController *cache.Controller
|
||||
podController *cache.Controller
|
||||
client kubernetes.Interface
|
||||
ingController cache.Controller
|
||||
nodeController cache.Controller
|
||||
svcController cache.Controller
|
||||
podController cache.Controller
|
||||
ingLister StoreToIngressLister
|
||||
nodeLister cache.StoreToNodeLister
|
||||
svcLister cache.StoreToServiceLister
|
||||
nodeLister StoreToNodeLister
|
||||
svcLister StoreToServiceLister
|
||||
// Health checks are the readiness probes of containers on pods.
|
||||
podLister cache.StoreToPodLister
|
||||
podLister StoreToPodLister
|
||||
// TODO: Watch secrets
|
||||
CloudClusterManager *ClusterManager
|
||||
recorder record.EventRecorder
|
||||
|
|
@ -91,7 +92,7 @@ type LoadBalancerController struct {
|
|||
// - clusterManager: A ClusterManager capable of creating all cloud resources
|
||||
// required for L7 loadbalancing.
|
||||
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
|
||||
func NewLoadBalancerController(kubeClient client.Interface, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
|
||||
func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
|
||||
|
|
@ -101,7 +102,7 @@ func NewLoadBalancerController(kubeClient client.Interface, clusterManager *Clus
|
|||
client: kubeClient,
|
||||
CloudClusterManager: clusterManager,
|
||||
stopCh: make(chan struct{}),
|
||||
recorder: eventBroadcaster.NewRecorder(
|
||||
recorder: eventBroadcaster.NewRecorder(base_api.Scheme,
|
||||
api.EventSource{Component: "loadbalancer-controller"}),
|
||||
}
|
||||
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
|
||||
|
|
@ -140,10 +141,7 @@ func NewLoadBalancerController(kubeClient client.Interface, clusterManager *Clus
|
|||
},
|
||||
}
|
||||
lbc.ingLister.Store, lbc.ingController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: ingressListFunc(lbc.client, namespace),
|
||||
WatchFunc: ingressWatchFunc(lbc.client, namespace),
|
||||
},
|
||||
cache.NewListWatchFromClient(lbc.client.Extensions().RESTClient(), "ingresses", namespace, fields.Everything()),
|
||||
&extensions.Ingress{}, resyncPeriod, pathHandlers)
|
||||
|
||||
// Service watch handlers
|
||||
|
|
@ -173,30 +171,14 @@ func NewLoadBalancerController(kubeClient client.Interface, clusterManager *Clus
|
|||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
nodeHandlers := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: lbc.nodeQueue.enqueue,
|
||||
DeleteFunc: lbc.nodeQueue.enqueue,
|
||||
// Nodes are updated every 10s and we don't care, so no update handler.
|
||||
}
|
||||
// Node watch handlers
|
||||
lbc.nodeLister.Store, lbc.nodeController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(opts api.ListOptions) (runtime.Object, error) {
|
||||
return lbc.client.Core().RESTClient().Get().
|
||||
Resource("nodes").
|
||||
FieldsSelectorParam(fields.Everything()).
|
||||
Do().
|
||||
Get()
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return lbc.client.Core().RESTClient().Get().
|
||||
Prefix("watch").
|
||||
Resource("nodes").
|
||||
FieldsSelectorParam(fields.Everything()).
|
||||
Param("resourceVersion", options.ResourceVersion).Watch()
|
||||
},
|
||||
},
|
||||
&api.Node{}, 0, nodeHandlers)
|
||||
lbc.nodeLister.Indexer, lbc.nodeController = cache.NewIndexerInformer(
|
||||
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
|
||||
&api.Node{},
|
||||
resyncPeriod,
|
||||
cache.ResourceEventHandlerFuncs{},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
lbc.tr = &GCETranslator{&lbc}
|
||||
lbc.tlsLoader = &apiServerTLSLoader{client: lbc.client}
|
||||
|
|
@ -205,18 +187,6 @@ func NewLoadBalancerController(kubeClient client.Interface, clusterManager *Clus
|
|||
return &lbc, nil
|
||||
}
|
||||
|
||||
func ingressListFunc(c client.Interface, ns string) func(api.ListOptions) (runtime.Object, error) {
|
||||
return func(opts api.ListOptions) (runtime.Object, error) {
|
||||
return c.Extensions().Ingresses(ns).List(opts)
|
||||
}
|
||||
}
|
||||
|
||||
func ingressWatchFunc(c client.Interface, ns string) func(options api.ListOptions) (watch.Interface, error) {
|
||||
return func(options api.ListOptions) (watch.Interface, error) {
|
||||
return c.Extensions().Ingresses(ns).Watch(options)
|
||||
}
|
||||
}
|
||||
|
||||
// enqueueIngressForService enqueues all the Ingress' for a Service.
|
||||
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
|
||||
svc := obj.(*api.Service)
|
||||
|
|
@ -377,7 +347,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
|
|||
|
||||
// Update IP through update/status endpoint
|
||||
ip := l7.GetIP()
|
||||
currIng, err := ingClient.Get(ing.Name)
|
||||
currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -401,7 +371,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
|
|||
}
|
||||
}
|
||||
// Update annotations through /update endpoint
|
||||
currIng, err = ingClient.Get(ing.Name)
|
||||
currIng, err = ingClient.Get(ing.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -464,7 +434,7 @@ func (lbc *LoadBalancerController) syncNodes(key string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func getNodeReadyPredicate() cache.NodeConditionPredicate {
|
||||
func getNodeReadyPredicate() listers.NodeConditionPredicate {
|
||||
return func(node *api.Node) bool {
|
||||
for ix := range node.Status.Conditions {
|
||||
condition := &node.Status.Conditions[ix]
|
||||
|
|
@ -479,7 +449,7 @@ func getNodeReadyPredicate() cache.NodeConditionPredicate {
|
|||
// getReadyNodeNames returns names of schedulable, ready nodes from the node lister.
|
||||
func (lbc *LoadBalancerController) getReadyNodeNames() ([]string, error) {
|
||||
nodeNames := []string{}
|
||||
nodes, err := lbc.nodeLister.NodeCondition(getNodeReadyPredicate()).List()
|
||||
nodes, err := listers.NewNodeLister(lbc.nodeLister.Indexer).ListWithPredicate(getNodeReadyPredicate())
|
||||
if err != nil {
|
||||
return nodeNames, err
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue