Merge pull request #1181 from freehan/shared-informer

switch to use shared informer
Nick Sardo 2017-08-28 16:50:05 -07:00 committed by GitHub
commit 3e8bc53443
200 changed files with 11638 additions and 145 deletions
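In broad strokes, this change drops the per-controller cache.NewInformer / cache.NewIndexerInformer construction and instead builds shared informers (from k8s.io/client-go/informers) once, inside a new ControllerContext, so controllers only register event handlers against the same caches. A minimal standalone sketch of that pattern, assuming client-go; the package and function names below are illustrative and not part of this change:

package sketch

import (
	"time"

	informerv1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// wireSharedInformer shows the shared-informer pattern: one informer per
// resource type, created once, with handlers attached by each consumer.
func wireSharedInformer(client kubernetes.Interface, namespace string, resync time.Duration, stopCh chan struct{}) {
	// Created once and shared by every interested controller.
	svcInformer := informerv1.NewServiceInformer(client, namespace, resync,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

	// Controllers only attach handlers; they no longer own the list/watch.
	svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { /* enqueue work */ },
	})

	go svcInformer.Run(stopCh)
	cache.WaitForCacheSync(stopCh, svcInformer.HasSynced)
}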


@@ -17,16 +17,10 @@ limitations under the License.
package controller
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"os"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/cloudprovider"
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/ingress/controllers/gce/backends"
@@ -59,9 +53,6 @@ const (
// Names longer than this are truncated, because of GCE restrictions.
nameLenLimit = 62
// Sleep interval to retry cloud client creation.
cloudClientRetryInterval = 10 * time.Second
)
// ClusterManager manages cluster resource pools.
@@ -211,81 +202,17 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort)
return nil
}
func getGCEClient(config io.Reader) *gce.GCECloud {
getConfigReader := func() io.Reader { return nil }
if config != nil {
allConfig, err := ioutil.ReadAll(config)
if err != nil {
glog.Fatalf("Error while reading entire config: %v", err)
}
glog.V(2).Infof("Using cloudprovider config file:\n%v ", string(allConfig))
getConfigReader = func() io.Reader {
return bytes.NewReader(allConfig)
}
} else {
glog.V(2).Infoln("No cloudprovider config file provided. Continuing with default values.")
}
// Creating the cloud interface involves resolving the metadata server to get
// an oauth token. If this fails, the token provider assumes it's not on GCE.
// No errors are thrown. So we need to keep retrying till it works because
// we know we're on GCE.
for {
cloudInterface, err := cloudprovider.GetCloudProvider("gce", getConfigReader())
if err == nil {
cloud := cloudInterface.(*gce.GCECloud)
// If this controller is scheduled on a node without compute/rw
// it won't be allowed to list backends. We can assume that the
// user has no need for Ingress in this case. If they grant
// permissions to the node they will have to restart the controller
// manually to re-create the client.
if _, err = cloud.ListGlobalBackendServices(); err == nil || utils.IsHTTPErrorCode(err, http.StatusForbidden) {
return cloud
}
glog.Warningf("Failed to list backend services, retrying: %v", err)
} else {
glog.Warningf("Failed to retrieve cloud interface, retrying: %v", err)
}
time.Sleep(cloudClientRetryInterval)
}
}
// NewClusterManager creates a cluster manager for shared resources.
// - namer: is the namer used to tag cluster wide shared resources.
// - defaultBackendNodePort: is the node port of glbc's default backend. This is
// the kubernetes Service that serves the 404 page if no urls match.
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz".
func NewClusterManager(
configFilePath string,
cloud *gce.GCECloud,
namer *utils.Namer,
defaultBackendNodePort backends.ServicePort,
defaultHealthCheckPath string) (*ClusterManager, error) {
// TODO: Make this more resilient. Currently we create the cloud client
// and pass it through to all the pools. This makes unit testing easier.
// However if the cloud client suddenly fails, we should try to re-create it
// and continue.
var cloud *gce.GCECloud
if configFilePath != "" {
glog.Infof("Reading config from path %v", configFilePath)
config, err := os.Open(configFilePath)
if err != nil {
return nil, err
}
defer config.Close()
cloud = getGCEClient(config)
glog.Infof("Successfully loaded cloudprovider using config %q", configFilePath)
} else {
// While you might be tempted to refactor so we simply assign nil to the
// config and only invoke getGCEClient once, that will not do the right
// thing because a nil check against an interface isn't true in golang.
cloud = getGCEClient(nil)
glog.Infof("Created GCE client without a config file")
}
// Names are fundamental to the cluster; the uid allocator makes sure names don't collide.
cluster := ClusterManager{ClusterNamer: namer}
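The "nil check against an interface" caveat in the comment above is Go's typed-nil behavior: an interface value that wraps a nil concrete pointer is itself non-nil, which is why getGCEClient builds a fresh getConfigReader closure rather than accepting a possibly typed-nil reader. A tiny standalone illustration (not from this repo):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	var buf *bytes.Reader   // nil pointer
	var r io.Reader = buf   // interface now holds (type *bytes.Reader, value nil)
	fmt.Println(buf == nil) // true
	fmt.Println(r == nil)   // false: the interface carries a type, so it is not nil
}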


@@ -24,10 +24,11 @@ import (
"github.com/golang/glog"
api_v1 "k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
informerv1 "k8s.io/client-go/informers/core/v1"
informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -53,17 +54,45 @@ var (
storeSyncPollPeriod = 5 * time.Second
)
// ControllerContext holds the informers and the stop channel shared by the controllers.
type ControllerContext struct {
IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
// StopCh is the stop channel shared among the controllers
StopCh chan struct{}
}
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext {
return &ControllerContext{
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
StopCh: make(chan struct{}),
}
}
func (ctx *ControllerContext) Start() {
go ctx.IngressInformer.Run(ctx.StopCh)
go ctx.ServiceInformer.Run(ctx.StopCh)
go ctx.PodInformer.Run(ctx.StopCh)
go ctx.NodeInformer.Run(ctx.StopCh)
}
// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
client kubernetes.Interface
ingController cache.Controller
nodeController cache.Controller
svcController cache.Controller
podController cache.Controller
ingLister StoreToIngressLister
nodeLister StoreToNodeLister
svcLister StoreToServiceLister
client kubernetes.Interface
ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
podSynced cache.InformerSynced
nodeSynced cache.InformerSynced
ingLister StoreToIngressLister
nodeLister StoreToNodeLister
svcLister StoreToServiceLister
// Health checks are the readiness probes of containers on pods.
podLister StoreToPodLister
// TODO: Watch secrets
@@ -90,7 +119,7 @@ type LoadBalancerController struct {
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
@@ -99,23 +128,32 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
lbc := LoadBalancerController{
client: kubeClient,
CloudClusterManager: clusterManager,
stopCh: make(chan struct{}),
stopCh: ctx.StopCh,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme,
api_v1.EventSource{Component: "loadbalancer-controller"}),
apiv1.EventSource{Component: "loadbalancer-controller"}),
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
lbc.hasSynced = lbc.storesSynced
// Ingress watch handlers
pathHandlers := cache.ResourceEventHandlerFuncs{
lbc.ingressSynced = ctx.IngressInformer.HasSynced
lbc.serviceSynced = ctx.ServiceInformer.HasSynced
lbc.podSynced = ctx.PodInformer.HasSynced
lbc.nodeSynced = ctx.NodeInformer.HasSynced
lbc.ingLister.Store = ctx.IngressInformer.GetStore()
lbc.svcLister.Indexer = ctx.ServiceInformer.GetIndexer()
lbc.podLister.Indexer = ctx.PodInformer.GetIndexer()
lbc.nodeLister.Indexer = ctx.NodeInformer.GetIndexer()
// ingress event handler
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) {
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey)
return
}
lbc.recorder.Eventf(addIng, api_v1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.recorder.Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
@@ -137,13 +175,10 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
}
lbc.ingQueue.enqueue(cur)
},
}
lbc.ingLister.Store, lbc.ingController = cache.NewInformer(
cache.NewListWatchFromClient(lbc.client.Extensions().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, pathHandlers)
})
// Service watch handlers
svcHandlers := cache.ResourceEventHandlerFuncs{
// service event handler
ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
@@ -151,38 +186,14 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
}
},
// Ingress deletes matter, service deletes don't.
}
})
lbc.svcLister.Indexer, lbc.svcController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "services", namespace, fields.Everything()),
&api_v1.Service{},
resyncPeriod,
svcHandlers,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "pods", namespace, fields.Everything()),
&api_v1.Pod{},
resyncPeriod,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
// Node watch handlers
nodeHandlers := cache.ResourceEventHandlerFuncs{
// node event handler
ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.nodeQueue.enqueue,
DeleteFunc: lbc.nodeQueue.enqueue,
// Nodes are updated every 10s and we don't care, so no update handler.
}
lbc.nodeLister.Indexer, lbc.nodeController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "nodes", api_v1.NamespaceAll, fields.Everything()),
&api_v1.Node{},
resyncPeriod,
nodeHandlers,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
})
lbc.tr = &GCETranslator{&lbc}
lbc.tlsLoader = &apiServerTLSLoader{client: lbc.client}
@@ -193,7 +204,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
// enqueueIngressForService enqueues all the Ingresses for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*api_v1.Service)
svc := obj.(*apiv1.Service)
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
@@ -210,10 +221,6 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
go lbc.ingController.Run(lbc.stopCh)
go lbc.nodeController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.podController.Run(lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.nodeQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh
@@ -250,14 +257,14 @@ func (lbc *LoadBalancerController) storesSynced() bool {
return (
// wait for pods to sync so we don't allocate a default health check when
// an endpoint has a readiness probe.
lbc.podController.HasSynced() &&
lbc.podSynced() &&
// wait for services so we don't thrash on backend creation.
lbc.svcController.HasSynced() &&
lbc.serviceSynced() &&
// wait for nodes so we don't disconnect a backend from an instance
// group just because we don't realize there are nodes in that zone.
lbc.nodeController.HasSynced() &&
lbc.nodeSynced() &&
// Wait for ingresses as a safety measure. We don't really need this.
lbc.ingController.HasSynced())
lbc.ingressSynced())
}
// sync manages Ingress create/updates/deletes.
@@ -312,7 +319,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"
if ingExists {
lbc.recorder.Eventf(obj.(*extensions.Ingress), api_v1.EventTypeWarning, eventMsg, err.Error())
lbc.recorder.Eventf(obj.(*extensions.Ingress), apiv1.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v, error: %v", eventMsg, err)
}
@@ -333,10 +340,10 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "UrlMap", err.Error())
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Status", err.Error())
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
@@ -354,8 +361,8 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
return err
}
currIng.Status = extensions.IngressStatus{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: []api_v1.LoadBalancerIngress{
LoadBalancer: apiv1.LoadBalancerStatus{
Ingress: []apiv1.LoadBalancerIngress{
{IP: ip},
},
},
@@ -369,7 +376,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
if _, err := ingClient.UpdateStatus(currIng); err != nil {
return err
}
lbc.recorder.Eventf(currIng, api_v1.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.recorder.Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
}
}
// Update annotations through /update endpoint
@@ -437,11 +444,11 @@ func (lbc *LoadBalancerController) syncNodes(key string) error {
}
func getNodeReadyPredicate() listers.NodeConditionPredicate {
return func(node *api_v1.Node) bool {
return func(node *apiv1.Node) bool {
for ix := range node.Status.Conditions {
condition := &node.Status.Conditions[ix]
if condition.Type == api_v1.NodeReady {
return condition.Status == api_v1.ConditionTrue
if condition.Type == apiv1.NodeReady {
return condition.Status == apiv1.ConditionTrue
}
}
return false
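Since the informers now hang off ControllerContext rather than living inside LoadBalancerController, another controller in this binary could reuse the same caches without opening extra API watches. A hypothetical sketch, assuming the controller package's existing imports (newNodeWatcher is illustrative, not part of this change):

// Hypothetical second consumer of the shared ControllerContext.
func newNodeWatcher(ctx *ControllerContext) {
	ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* react to new nodes */ },
		DeleteFunc: func(obj interface{}) { /* react to deleted nodes */ },
	})
	// The informer itself is started once, by ctx.Start(); consumers only
	// wait for its cache before reading from the shared store.
	cache.WaitForCacheSync(ctx.StopCh, ctx.NodeInformer.HasSynced)
}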


@@ -53,7 +53,8 @@ func defaultBackendName(clusterName string) string {
// newLoadBalancerController create a loadbalancer controller.
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
lb, err := NewLoadBalancerController(kubeClient, cm.ClusterManager, 1*time.Second, api_v1.NamespaceAll)
ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second)
lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager)
if err != nil {
t.Fatalf("%v", err)
}


@@ -17,8 +17,11 @@ limitations under the License.
package main
import (
"bytes"
go_flag "flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/signal"
@@ -46,6 +49,8 @@ import (
"k8s.io/ingress/controllers/gce/loadbalancers"
"k8s.io/ingress/controllers/gce/storage"
"k8s.io/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)
// Entrypoint of GLBC. Example invocation:
@@ -72,6 +77,9 @@ const (
// Key used to persist UIDs to configmaps.
uidConfigMapName = "ingress-uid"
// Sleep interval to retry cloud client creation.
cloudClientRetryInterval = 10 * time.Second
)
var (
@@ -241,13 +249,36 @@ func main() {
SvcPort: intstr.FromInt(int(port)),
}
var cloud *gce.GCECloud
if *inCluster || *useRealCloud {
// Create cluster manager
namer, err := newNamer(kubeClient, *clusterName, controller.DefaultFirewallName)
if err != nil {
glog.Fatalf("%v", err)
}
clusterManager, err = controller.NewClusterManager(*configFilePath, namer, defaultBackendNodePort, *healthCheckPath)
// TODO: Make this more resilient. Currently we create the cloud client
// and pass it through to all the pools. This makes unit testing easier.
// However if the cloud client suddenly fails, we should try to re-create it
// and continue.
if *configFilePath != "" {
glog.Infof("Reading config from path %v", configFilePath)
config, err := os.Open(*configFilePath)
if err != nil {
glog.Fatalf("%v", err)
}
defer config.Close()
cloud = getGCEClient(config)
glog.Infof("Successfully loaded cloudprovider using config %q", configFilePath)
} else {
// While you might be tempted to refactor so we simply assign nil to the
// config and only invoke getGCEClient once, that will not do the right
// thing because a nil check against an interface isn't true in golang.
cloud = getGCEClient(nil)
glog.Infof("Created GCE client without a config file")
}
clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath)
if err != nil {
glog.Fatalf("%v", err)
}
@@ -256,11 +287,14 @@ func main() {
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}
ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod)
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, clusterManager, *resyncPeriod, *watchNamespace)
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
if err != nil {
glog.Fatalf("%v", err)
}
if clusterManager.ClusterNamer.GetClusterName() != "" {
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
}
@@ -268,6 +302,7 @@ func main() {
go registerHandlers(lbc)
go handleSigterm(lbc, *deleteAllOnQuit)
ctx.Start()
lbc.Run()
for {
glog.Infof("Handled quit, awaiting pod deletion.")
@@ -437,3 +472,45 @@ func getNodePort(client kubernetes.Interface, ns, name string) (port, nodePort i
})
return
}
func getGCEClient(config io.Reader) *gce.GCECloud {
getConfigReader := func() io.Reader { return nil }
if config != nil {
allConfig, err := ioutil.ReadAll(config)
if err != nil {
glog.Fatalf("Error while reading entire config: %v", err)
}
glog.V(2).Infof("Using cloudprovider config file:\n%v ", string(allConfig))
getConfigReader = func() io.Reader {
return bytes.NewReader(allConfig)
}
} else {
glog.V(2).Infoln("No cloudprovider config file provided. Continuing with default values.")
}
// Creating the cloud interface involves resolving the metadata server to get
// an oauth token. If this fails, the token provider assumes it's not on GCE.
// No errors are thrown. So we need to keep retrying till it works because
// we know we're on GCE.
for {
cloudInterface, err := cloudprovider.GetCloudProvider("gce", getConfigReader())
if err == nil {
cloud := cloudInterface.(*gce.GCECloud)
// If this controller is scheduled on a node without compute/rw
// it won't be allowed to list backends. We can assume that the
// user has no need for Ingress in this case. If they grant
// permissions to the node they will have to restart the controller
// manually to re-create the client.
if _, err = cloud.ListGlobalBackendServices(); err == nil || utils.IsHTTPErrorCode(err, http.StatusForbidden) {
return cloud
}
glog.Warningf("Failed to list backend services, retrying: %v", err)
} else {
glog.Warningf("Failed to retrieve cloud interface, retrying: %v", err)
}
time.Sleep(cloudClientRetryInterval)
}
}