git mv Ingress ingress

This commit is contained in:
Prashanth Balasubramanian 2016-02-21 16:13:08 -08:00
parent 34b949c134
commit 3da4e74e5a
2185 changed files with 754743 additions and 0 deletions

View file

@ -0,0 +1,173 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"k8s.io/contrib/ingress/controllers/gce/backends"
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
"k8s.io/contrib/ingress/controllers/gce/instances"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/cloudprovider"
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)
const (
defaultPort = 80
defaultHealthCheckPath = "/"
// A single instance-group is created per cluster manager.
// Tagged with the name of the controller.
instanceGroupPrefix = "k8s-ig"
// A backend is created per nodePort, tagged with the nodeport.
// This allows sharing of backends across loadbalancers.
backendPrefix = "k8s-be"
// A single target proxy/urlmap/forwarding rule is created per loadbalancer.
// Tagged with the namespace/name of the Ingress.
targetProxyPrefix = "k8s-tp"
forwardingRulePrefix = "k8s-fw"
urlMapPrefix = "k8s-um"
// Used in the test RunServer method to denote a delete request.
deleteType = "del"
// port 0 is used as a signal for port not found/no such port etc.
invalidPort = 0
// Names longer than this are truncated, because of GCE restrictions.
nameLenLimit = 62
)
// ClusterManager manages cluster resource pools.
type ClusterManager struct {
ClusterNamer utils.Namer
defaultBackendNodePort int64
instancePool instances.NodePool
backendPool backends.BackendPool
l7Pool loadbalancers.LoadBalancerPool
}
// IsHealthy returns an error if the cluster manager is unhealthy.
func (c *ClusterManager) IsHealthy() (err error) {
// TODO: Expand on this, for now we just want to detect when the GCE client
// is broken.
_, err = c.backendPool.List()
return
}
func (c *ClusterManager) shutdown() error {
if err := c.l7Pool.Shutdown(); err != nil {
return err
}
// The backend pool will also delete instance groups.
return c.backendPool.Shutdown()
}
// Checkpoint performs a checkpoint with the cloud.
// - lbNames are the names of L7 loadbalancers we wish to exist. If they already
// exist, they should not have any broken links between say, a UrlMap and
// TargetHttpProxy.
// - nodeNames are the names of nodes we wish to add to all loadbalancer
// instance groups.
// - nodePorts are the ports for which we require BackendServices. Each of
// these ports must also be opened on the corresponding Instance Group.
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []int64) error {
if err := c.backendPool.Sync(nodePorts); err != nil {
return err
}
if err := c.instancePool.Sync(nodeNames); err != nil {
return err
}
if err := c.l7Pool.Sync(lbs); err != nil {
return err
}
return nil
}
// GC garbage collects unused resources.
// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
// this list are removed from the cloud.
// - nodePorts are the ports for which we want BackendServies. BackendServices
// for ports not in this list are deleted.
// This method ignores googleapi 404 errors (StatusNotFound).
func (c *ClusterManager) GC(lbNames []string, nodePorts []int64) error {
// On GC:
// * Loadbalancers need to get deleted before backends.
// * Backends are refcounted in a shared pool.
// * We always want to GC backends even if there was an error in GCing
// loadbalancers, because the next Sync could rely on the GC for quota.
// * There are at least 2 cases for backend GC:
// 1. The loadbalancer has been deleted.
// 2. An update to the url map drops the refcount of a backend. This can
// happen when an Ingress is updated, if we don't GC after the update
// we'll leak the backend.
lbErr := c.l7Pool.GC(lbNames)
beErr := c.backendPool.GC(nodePorts)
if lbErr != nil {
return lbErr
}
if beErr != nil {
return beErr
}
return nil
}
func defaultInstanceGroupName(clusterName string) string {
return fmt.Sprintf("%v-%v", instanceGroupPrefix, clusterName)
}
// NewClusterManager creates a cluster manager for shared resources.
// - name: is the name used to tag cluster wide shared resources. This is the
// string passed to glbc via --gce-cluster-name.
// - defaultBackendNodePort: is the node port of glbc's default backend. This is
// the kubernetes Service that serves the 404 page if no urls match.
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz"
func NewClusterManager(
name string,
defaultBackendNodePort int64,
defaultHealthCheckPath string) (*ClusterManager, error) {
cloudInterface, err := cloudprovider.GetCloudProvider("gce", nil)
if err != nil {
return nil, err
}
cloud := cloudInterface.(*gce.GCECloud)
cluster := ClusterManager{ClusterNamer: utils.Namer{name}}
zone, err := cloud.GetZone()
if err != nil {
return nil, err
}
cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
cluster.backendPool = backends.NewBackendPool(
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer)
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
defaultBackendPool := backends.NewBackendPool(
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer)
cluster.defaultBackendNodePort = defaultBackendNodePort
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
return &cluster, nil
}

View file

@ -0,0 +1,435 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"net/http"
"reflect"
"sync"
"time"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
// DefaultClusterUID is the uid to use for clusters resources created by an
// L7 controller created without specifying the --cluster-uid flag.
DefaultClusterUID = ""
)
// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
client *client.Client
ingController *framework.Controller
nodeController *framework.Controller
svcController *framework.Controller
ingLister StoreToIngressLister
nodeLister cache.StoreToNodeLister
svcLister cache.StoreToServiceLister
CloudClusterManager *ClusterManager
recorder record.EventRecorder
nodeQueue *taskQueue
ingQueue *taskQueue
tr *GCETranslator
stopCh chan struct{}
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
}
// NewLoadBalancerController creates a controller for gce loadbalancers.
// - kubeClient: A kubernetes REST client.
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient *client.Client, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
lbc := LoadBalancerController{
client: kubeClient,
CloudClusterManager: clusterManager,
stopCh: make(chan struct{}),
recorder: eventBroadcaster.NewRecorder(
api.EventSource{Component: "loadbalancer-controller"}),
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
// Ingress watch handlers
pathHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
},
DeleteFunc: lbc.ingQueue.enqueue,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("Ingress %v changed, syncing",
cur.(*extensions.Ingress).Name)
}
lbc.ingQueue.enqueue(cur)
},
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
&cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace),
WatchFunc: ingressWatchFunc(lbc.client, namespace),
},
&extensions.Ingress{}, resyncPeriod, pathHandlers)
// Service watch handlers
svcHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.enqueueIngressForService(cur)
}
},
// Ingress deletes matter, service deletes don't.
}
lbc.svcLister.Store, lbc.svcController = framework.NewInformer(
cache.NewListWatchFromClient(
lbc.client, "services", namespace, fields.Everything()),
&api.Service{}, resyncPeriod, svcHandlers)
nodeHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: lbc.nodeQueue.enqueue,
DeleteFunc: lbc.nodeQueue.enqueue,
// Nodes are updated every 10s and we don't care, so no update handler.
}
// Node watch handlers
lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(opts api.ListOptions) (runtime.Object, error) {
return lbc.client.Get().
Resource("nodes").
FieldsSelectorParam(fields.Everything()).
Do().
Get()
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return lbc.client.Get().
Prefix("watch").
Resource("nodes").
FieldsSelectorParam(fields.Everything()).
Param("resourceVersion", options.ResourceVersion).Watch()
},
},
&api.Node{}, 0, nodeHandlers)
lbc.tr = &GCETranslator{&lbc}
glog.V(3).Infof("Created new loadbalancer controller")
return &lbc, nil
}
func ingressListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Extensions().Ingress(ns).List(opts)
}
}
func ingressWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Extensions().Ingress(ns).Watch(options)
}
}
// enqueueIngressForService enqueues all the Ingress' for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*api.Service)
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
return
}
for _, ing := range ings {
lbc.ingQueue.enqueue(&ing)
}
}
// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
go lbc.ingController.Run(lbc.stopCh)
go lbc.nodeController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.nodeQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh
glog.Infof("Shutting down Loadbalancer Controller")
}
// Stop stops the loadbalancer controller. It also deletes cluster resources
// if deleteAll is true.
func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
// Stop is invoked from the http endpoint.
lbc.stopLock.Lock()
defer lbc.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !lbc.shutdown {
close(lbc.stopCh)
glog.Infof("Shutting down controller queues.")
lbc.ingQueue.shutdown()
lbc.nodeQueue.shutdown()
lbc.shutdown = true
}
// Deleting shared cluster resources is idempotent.
if deleteAll {
glog.Infof("Shutting down cluster manager.")
return lbc.CloudClusterManager.shutdown()
}
return nil
}
// sync manages Ingress create/updates/deletes.
func (lbc *LoadBalancerController) sync(key string) {
glog.V(3).Infof("Syncing %v", key)
paths, err := lbc.ingLister.List()
if err != nil {
lbc.ingQueue.requeue(key, err)
return
}
nodePorts := lbc.tr.toNodePorts(&paths)
lbNames := lbc.ingLister.Store.ListKeys()
lbs, _ := lbc.ListRuntimeInfo()
nodeNames, err := lbc.getReadyNodeNames()
if err != nil {
lbc.ingQueue.requeue(key, err)
return
}
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
lbc.ingQueue.requeue(key, err)
return
}
// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
// successful checkpoint we know that existing L7s are WAI, and the L7
// for the Ingress associated with "key" is ready for a UrlMap update.
// If this encounters an error, eg for quota reasons, we want to invoke
// Phase 2 right away and retry checkpointing.
// * Phase 2 performs GC by refcounting shared resources. This needs to
// happen periodically whether or not stage 1 fails. At the end of a
// successful GC we know that there are no dangling cloud resources that
// don't have an associated Kubernetes Ingress/Service/Endpoint.
defer func() {
if err := lbc.CloudClusterManager.GC(lbNames, nodePorts); err != nil {
lbc.ingQueue.requeue(key, err)
}
glog.V(3).Infof("Finished syncing %v", key)
}()
if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"
if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
eventMsg += " :Quota"
}
if ingExists {
lbc.recorder.Eventf(obj.(*extensions.Ingress), api.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v Error: %v", eventMsg, err)
}
lbc.ingQueue.requeue(key, err)
return
}
if !ingExists {
return
}
// Update the UrlMap of the single loadbalancer that came through the watch.
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
if err != nil {
lbc.ingQueue.requeue(key, err)
return
}
ing := *obj.(*extensions.Ingress)
if urlMap, err := lbc.tr.toUrlMap(&ing); err != nil {
lbc.ingQueue.requeue(key, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
lbc.ingQueue.requeue(key, err)
} else if lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
lbc.ingQueue.requeue(key, err)
}
return
}
// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingress(ing.Namespace)
// Update IP through update/status endpoint
ip := l7.GetIP()
currIng, err := ingClient.Get(ing.Name)
if err != nil {
return err
}
currIng.Status = extensions.IngressStatus{
LoadBalancer: api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: ip},
},
},
}
lbIPs := ing.Status.LoadBalancer.Ingress
if len(lbIPs) == 0 && ip != "" || lbIPs[0].IP != ip {
// TODO: If this update fails it's probably resource version related,
// which means it's advantageous to retry right away vs requeuing.
glog.Infof("Updating loadbalancer %v/%v with IP %v", ing.Namespace, ing.Name, ip)
if _, err := ingClient.UpdateStatus(currIng); err != nil {
return err
}
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", ip)
}
// Update annotations through /update endpoint
currIng, err = ingClient.Get(ing.Name)
if err != nil {
return err
}
currIng.Annotations = loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
if !reflect.DeepEqual(ing.Annotations, currIng.Annotations) {
glog.V(3).Infof("Updating annotations of %v/%v", ing.Namespace, ing.Name)
if _, err := ingClient.Update(currIng); err != nil {
return err
}
}
return nil
}
// ListRuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module.
func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) {
for _, m := range lbc.ingLister.Store.List() {
ing := m.(*extensions.Ingress)
k, err := keyFunc(ing)
if err != nil {
glog.Warningf("Cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
continue
}
tls, err := lbc.loadSecrets(ing)
if err != nil {
glog.Warningf("Cannot get certs for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}
lbs = append(lbs, &loadbalancers.L7RuntimeInfo{
Name: k,
TLS: tls,
AllowHTTP: ingAnnotations(ing.ObjectMeta.Annotations).allowHTTP(),
})
}
return lbs, nil
}
func (lbc *LoadBalancerController) loadSecrets(ing *extensions.Ingress) (*loadbalancers.TLSCerts, error) {
if len(ing.Spec.TLS) == 0 {
return nil, nil
}
// GCE L7s currently only support a single cert.
if len(ing.Spec.TLS) > 1 {
glog.Warningf("Ignoring %d certs and taking the first for ingress %v/%v",
len(ing.Spec.TLS)-1, ing.Namespace, ing.Name)
}
secretName := ing.Spec.TLS[0].SecretName
// TODO: Replace this for a secret watcher.
glog.V(3).Infof("Retrieving secret for ing %v with name %v", ing.Name, secretName)
secret, err := lbc.client.Secrets(ing.Namespace).Get(secretName)
if err != nil {
return nil, err
}
cert, ok := secret.Data[api.TLSCertKey]
if !ok {
return nil, fmt.Errorf("Secret %v has no private key", secretName)
}
key, ok := secret.Data[api.TLSPrivateKeyKey]
if !ok {
return nil, fmt.Errorf("Secret %v has no cert", secretName)
}
// TODO: Validate certificate with hostnames in ingress?
return &loadbalancers.TLSCerts{Key: string(key), Cert: string(cert)}, nil
}
// syncNodes manages the syncing of kubernetes nodes to gce instance groups.
// The instancegroups are referenced by loadbalancer backends.
func (lbc *LoadBalancerController) syncNodes(key string) {
nodeNames, err := lbc.getReadyNodeNames()
if err != nil {
lbc.nodeQueue.requeue(key, err)
return
}
if err := lbc.CloudClusterManager.instancePool.Sync(nodeNames); err != nil {
lbc.nodeQueue.requeue(key, err)
}
return
}
func nodeReady(node api.Node) bool {
for ix := range node.Status.Conditions {
condition := &node.Status.Conditions[ix]
if condition.Type == api.NodeReady {
return condition.Status == api.ConditionTrue
}
}
return false
}
// getReadyNodeNames returns names of schedulable, ready nodes from the node lister.
func (lbc *LoadBalancerController) getReadyNodeNames() ([]string, error) {
nodeNames := []string{}
nodes, err := lbc.nodeLister.NodeCondition(nodeReady).List()
if err != nil {
return nodeNames, err
}
for _, n := range nodes.Items {
if n.Spec.Unschedulable {
continue
}
nodeNames = append(nodeNames, n.Name)
}
return nodeNames, nil
}

View file

@ -0,0 +1,375 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"math/rand"
"testing"
"time"
compute "google.golang.org/api/compute/v1"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apis/extensions"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/intstr"
)
const testClusterName = "testcluster"
var (
testPathMap = map[string]string{"/foo": defaultBackendName(testClusterName)}
testIPManager = testIP{}
)
// TODO: Use utils.Namer instead of this function.
func defaultBackendName(clusterName string) string {
return fmt.Sprintf("%v-%v", backendPrefix, clusterName)
}
// newLoadBalancerController create a loadbalancer controller.
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl string) *LoadBalancerController {
client := client.NewOrDie(&client.Config{Host: masterUrl, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
lb, err := NewLoadBalancerController(client, cm.ClusterManager, 1*time.Second, api.NamespaceAll)
if err != nil {
t.Fatalf("%v", err)
}
return lb
}
// toHTTPIngressPaths converts the given pathMap to a list of HTTPIngressPaths.
func toHTTPIngressPaths(pathMap map[string]string) []extensions.HTTPIngressPath {
httpPaths := []extensions.HTTPIngressPath{}
for path, backend := range pathMap {
httpPaths = append(httpPaths, extensions.HTTPIngressPath{
Path: path,
Backend: extensions.IngressBackend{
ServiceName: backend,
ServicePort: testBackendPort,
},
})
}
return httpPaths
}
// toIngressRules converts the given ingressRule map to a list of IngressRules.
func toIngressRules(hostRules map[string]utils.FakeIngressRuleValueMap) []extensions.IngressRule {
rules := []extensions.IngressRule{}
for host, pathMap := range hostRules {
rules = append(rules, extensions.IngressRule{
Host: host,
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: toHTTPIngressPaths(pathMap),
},
},
})
}
return rules
}
// newIngress returns a new Ingress with the given path map.
func newIngress(hostRules map[string]utils.FakeIngressRuleValueMap) *extensions.Ingress {
return &extensions.Ingress{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%v", util.NewUUID()),
Namespace: api.NamespaceNone,
},
Spec: extensions.IngressSpec{
Backend: &extensions.IngressBackend{
ServiceName: defaultBackendName(testClusterName),
ServicePort: testBackendPort,
},
Rules: toIngressRules(hostRules),
},
Status: extensions.IngressStatus{
LoadBalancer: api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: testIPManager.ip()},
},
},
},
}
}
// validIngress returns a valid Ingress.
func validIngress() *extensions.Ingress {
return newIngress(map[string]utils.FakeIngressRuleValueMap{
"foo.bar.com": testPathMap,
})
}
// getKey returns the key for an ingress.
func getKey(ing *extensions.Ingress, t *testing.T) string {
key, err := keyFunc(ing)
if err != nil {
t.Fatalf("Unexpected error getting key for Ingress %v: %v", ing.Name, err)
}
return key
}
// nodePortManager is a helper to allocate ports to services and
// remember the allocations.
type nodePortManager struct {
portMap map[string]int
start int
end int
namer utils.Namer
}
// randPort generated pseudo random port numbers.
func (p *nodePortManager) getNodePort(svcName string) int {
if port, ok := p.portMap[svcName]; ok {
return port
}
p.portMap[svcName] = rand.Intn(p.end-p.start) + p.start
return p.portMap[svcName]
}
// toNodePortSvcNames converts all service names in the given map to gce node
// port names, eg foo -> k8-be-<foo nodeport>
func (p *nodePortManager) toNodePortSvcNames(inputMap map[string]utils.FakeIngressRuleValueMap) map[string]utils.FakeIngressRuleValueMap {
expectedMap := map[string]utils.FakeIngressRuleValueMap{}
for host, rules := range inputMap {
ruleMap := utils.FakeIngressRuleValueMap{}
for path, svc := range rules {
ruleMap[path] = p.namer.BeName(int64(p.portMap[svc]))
}
expectedMap[host] = ruleMap
}
return expectedMap
}
func newPortManager(st, end int) *nodePortManager {
return &nodePortManager{map[string]int{}, st, end, utils.Namer{}}
}
// addIngress adds an ingress to the loadbalancer controllers ingress store. If
// a nodePortManager is supplied, it also adds all backends to the service store
// with a nodePort acquired through it.
func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePortManager) {
lbc.ingLister.Store.Add(ing)
if pm == nil {
return
}
for _, rule := range ing.Spec.Rules {
for _, path := range rule.HTTP.Paths {
svc := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: path.Backend.ServiceName,
Namespace: ing.Namespace,
},
}
var svcPort api.ServicePort
switch path.Backend.ServicePort.Type {
case intstr.Int:
svcPort = api.ServicePort{Port: int(path.Backend.ServicePort.IntVal)}
default:
svcPort = api.ServicePort{Name: path.Backend.ServicePort.StrVal}
}
svcPort.NodePort = pm.getNodePort(path.Backend.ServiceName)
svc.Spec.Ports = []api.ServicePort{svcPort}
lbc.svcLister.Store.Add(svc)
}
}
}
func TestLbCreateDelete(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
inputMap1 := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {
"/foo1": "foo1svc",
"/foo2": "foo2svc",
},
"bar.example.com": {
"/bar1": "bar1svc",
"/bar2": "bar2svc",
},
}
inputMap2 := map[string]utils.FakeIngressRuleValueMap{
"baz.foobar.com": {
"/foo": "foo1svc",
"/bar": "bar1svc",
},
}
pm := newPortManager(1, 65536)
ings := []*extensions.Ingress{}
for _, m := range []map[string]utils.FakeIngressRuleValueMap{inputMap1, inputMap2} {
newIng := newIngress(m)
addIngress(lbc, newIng, pm)
ingStoreKey := getKey(newIng, t)
lbc.sync(ingStoreKey)
l7, err := cm.l7Pool.Get(ingStoreKey)
if err != nil {
t.Fatalf("%v", err)
}
cm.fakeLbs.CheckURLMap(t, l7, pm.toNodePortSvcNames(m))
ings = append(ings, newIng)
}
lbc.ingLister.Store.Delete(ings[0])
lbc.sync(getKey(ings[0], t))
// BackendServices associated with ports of deleted Ingress' should get gc'd
// when the Ingress is deleted, regardless of the service. At the same time
// we shouldn't pull shared backends out from existing loadbalancers.
unexpected := []int{pm.portMap["foo2svc"], pm.portMap["bar2svc"]}
expected := []int{pm.portMap["foo1svc"], pm.portMap["bar1svc"]}
for _, port := range expected {
if _, err := cm.backendPool.Get(int64(port)); err != nil {
t.Fatalf("%v", err)
}
}
for _, port := range unexpected {
if be, err := cm.backendPool.Get(int64(port)); err == nil {
t.Fatalf("Found backend %+v for port %v", be, port)
}
}
lbc.ingLister.Store.Delete(ings[1])
lbc.sync(getKey(ings[1], t))
// No cluster resources (except the defaults used by the cluster manager)
// should exist at this point.
for _, port := range expected {
if be, err := cm.backendPool.Get(int64(port)); err == nil {
t.Fatalf("Found backend %+v for port %v", be, port)
}
}
if len(cm.fakeLbs.Fw) != 0 || len(cm.fakeLbs.Um) != 0 || len(cm.fakeLbs.Tp) != 0 {
t.Fatalf("Loadbalancer leaked resources")
}
for _, lbName := range []string{getKey(ings[0], t), getKey(ings[1], t)} {
if l7, err := cm.l7Pool.Get(lbName); err == nil {
t.Fatalf("Found unexpected loadbalandcer %+v: %v", l7, err)
}
}
}
func TestLbFaultyUpdate(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
inputMap := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {
"/foo1": "foo1svc",
"/foo2": "foo2svc",
},
"bar.example.com": {
"/bar1": "bar1svc",
"/bar2": "bar2svc",
},
}
ing := newIngress(inputMap)
pm := newPortManager(1, 65536)
addIngress(lbc, ing, pm)
ingStoreKey := getKey(ing, t)
lbc.sync(ingStoreKey)
l7, err := cm.l7Pool.Get(ingStoreKey)
if err != nil {
t.Fatalf("%v", err)
}
cm.fakeLbs.CheckURLMap(t, l7, pm.toNodePortSvcNames(inputMap))
// Change the urlmap directly through the lb pool, resync, and
// make sure the controller corrects it.
l7.UpdateUrlMap(utils.GCEURLMap{
"foo.example.com": {
"/foo1": &compute.BackendService{SelfLink: "foo2svc"},
},
})
lbc.sync(ingStoreKey)
cm.fakeLbs.CheckURLMap(t, l7, pm.toNodePortSvcNames(inputMap))
}
func TestLbDefaulting(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
// Make sure the controller plugs in the default values accepted by GCE.
ing := newIngress(map[string]utils.FakeIngressRuleValueMap{"": {"": "foo1svc"}})
pm := newPortManager(1, 65536)
addIngress(lbc, ing, pm)
ingStoreKey := getKey(ing, t)
lbc.sync(ingStoreKey)
l7, err := cm.l7Pool.Get(ingStoreKey)
if err != nil {
t.Fatalf("%v", err)
}
expectedMap := map[string]utils.FakeIngressRuleValueMap{loadbalancers.DefaultHost: {loadbalancers.DefaultPath: "foo1svc"}}
cm.fakeLbs.CheckURLMap(t, l7, pm.toNodePortSvcNames(expectedMap))
}
func TestLbNoService(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
inputMap := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {
"/foo1": "foo1svc",
},
}
ing := newIngress(inputMap)
ing.Spec.Backend.ServiceName = "foo1svc"
ingStoreKey := getKey(ing, t)
// Adds ingress to store, but doesn't create an associated service.
// This will still create the associated loadbalancer, it will just
// have empty rules. The rules will get corrected when the service
// pops up.
addIngress(lbc, ing, nil)
lbc.sync(ingStoreKey)
l7, err := cm.l7Pool.Get(ingStoreKey)
if err != nil {
t.Fatalf("%v", err)
}
// Creates the service, next sync should have complete url map.
pm := newPortManager(1, 65536)
addIngress(lbc, ing, pm)
lbc.enqueueIngressForService(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: "foo1svc",
Namespace: ing.Namespace,
},
})
// TODO: This will hang if the previous step failed to insert into queue
key, _ := lbc.ingQueue.queue.Get()
lbc.sync(key.(string))
inputMap[utils.DefaultBackendKey] = map[string]string{
utils.DefaultBackendKey: "foo1svc",
}
expectedMap := pm.toNodePortSvcNames(inputMap)
cm.fakeLbs.CheckURLMap(t, l7, expectedMap)
}
type testIP struct {
start int
}
func (t *testIP) ip() string {
t.start++
return fmt.Sprintf("0.0.0.%v", t.start)
}
// TODO: Test lb status update when annotation stabilize

View file

@ -0,0 +1,52 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This is the structure of the gce l7 controller:
// apiserver <-> controller ---> pools --> cloud
// | |
// |-> Ingress |-> backends
// |-> Services | |-> health checks
// |-> Nodes |
// |-> instance groups
// | |-> port per backend
// |
// |-> loadbalancers
// |-> http proxy
// |-> forwarding rule
// |-> urlmap
// * apiserver: kubernetes api serer.
// * controller: gce l7 controller, watches apiserver and interacts
// with sync pools. The controller doesn't know anything about the cloud.
// Communication between the controller and pools is 1 way.
// * pool: the controller tells each pool about desired state by inserting
// into shared memory store. The pools sync this with the cloud. Pools are
// also responsible for periodically checking the edge links between various
// cloud resources.
//
// A note on sync pools: this package has 3 sync pools: for node, instances and
// loadbalancer resources. A sync pool is meant to record all creates/deletes
// performed by a controller and periodically verify that links are not broken.
// For example, the controller might create a backend via backendPool.Add(),
// the backend pool remembers this and continuously verifies that the backend
// is connected to the right instance group, and that the instance group has
// the right ports open.
//
// A note on naming convention: per golang style guide for Initialisms, Http
// should be HTTP and Url should be URL, however because these interfaces
// must match their siblings in the Kubernetes cloud provider, which are in turn
// consistent with GCE compute API, there might be inconsistencies.
package controller

View file

@ -0,0 +1,70 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"k8s.io/contrib/ingress/controllers/gce/backends"
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
"k8s.io/contrib/ingress/controllers/gce/instances"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
)
const (
testDefaultBeNodePort = int64(3000)
defaultZone = "default-zone"
)
var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
// ClusterManager fake
type fakeClusterManager struct {
*ClusterManager
fakeLbs *loadbalancers.FakeLoadBalancers
fakeBackends *backends.FakeBackendServices
fakeIGs *instances.FakeInstanceGroups
}
// NewFakeClusterManager creates a new fake ClusterManager.
func NewFakeClusterManager(clusterName string) *fakeClusterManager {
fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName)
fakeBackends := backends.NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
fakeHCs := healthchecks.NewFakeHealthChecks()
namer := utils.Namer{clusterName}
nodePool := instances.NewNodePool(fakeIGs, defaultZone)
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
backendPool := backends.NewBackendPool(
fakeBackends,
healthChecker, nodePool, namer)
l7Pool := loadbalancers.NewLoadBalancerPool(
fakeLbs,
// TODO: change this
backendPool,
testDefaultBeNodePort,
namer,
)
cm := &ClusterManager{
ClusterNamer: namer,
instancePool: nodePool,
backendPool: backendPool,
l7Pool: l7Pool,
}
return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs}
}

View file

@ -0,0 +1,306 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"strconv"
"time"
compute "google.golang.org/api/compute/v1"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"github.com/golang/glog"
)
const allowHTTPKey = "kubernetes.io/ingress.allowHTTP"
// ingAnnotations represents Ingress annotations.
type ingAnnotations map[string]string
// allowHTTP returns the allowHTTP flag. True by default.
func (ing ingAnnotations) allowHTTP() bool {
val, ok := ing[allowHTTPKey]
if !ok {
return true
}
v, err := strconv.ParseBool(val)
if err != nil {
return true
}
return v
}
// errorNodePortNotFound is an implementation of error.
type errorNodePortNotFound struct {
backend extensions.IngressBackend
origErr error
}
func (e errorNodePortNotFound) Error() string {
return fmt.Sprintf("Could not find nodeport for backend %+v: %v",
e.backend, e.origErr)
}
// taskQueue manages a work queue through an independent worker that
// invokes the given sync function for every work item inserted.
type taskQueue struct {
// queue is the work queue the worker polls
queue *workqueue.Type
// sync is called for each item in the queue
sync func(string)
// workerDone is closed when the worker exits
workerDone chan struct{}
}
func (t *taskQueue) run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *taskQueue) enqueue(obj interface{}) {
key, err := keyFunc(obj)
if err != nil {
glog.Infof("Couldn't get key for object %+v: %v", obj, err)
return
}
t.queue.Add(key)
}
func (t *taskQueue) requeue(key string, err error) {
glog.Errorf("Requeuing %v, err %v", key, err)
t.queue.Add(key)
}
// worker processes work in the queue through sync.
func (t *taskQueue) worker() {
for {
key, quit := t.queue.Get()
if quit {
close(t.workerDone)
return
}
glog.V(3).Infof("Syncing %v", key)
t.sync(key.(string))
t.queue.Done(key)
}
}
// shutdown shuts down the work queue and waits for the worker to ACK
func (t *taskQueue) shutdown() {
t.queue.ShutDown()
<-t.workerDone
}
// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
func NewTaskQueue(syncFn func(string)) *taskQueue {
return &taskQueue{
queue: workqueue.New(),
sync: syncFn,
workerDone: make(chan struct{}),
}
}
// compareLinks returns true if the 2 self links are equal.
func compareLinks(l1, l2 string) bool {
// TODO: These can be partial links
return l1 == l2 && l1 != ""
}
// StoreToIngressLister makes a Store that lists Ingress.
// TODO: Move this to cache/listers post 1.1.
type StoreToIngressLister struct {
cache.Store
}
// List lists all Ingress' in the store.
func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) {
for _, m := range s.Store.List() {
ing.Items = append(ing.Items, *(m.(*extensions.Ingress)))
}
return ing, nil
}
// GetServiceIngress gets all the Ingress' that have rules pointing to a service.
// Note that this ignores services without the right nodePorts.
func (s *StoreToIngressLister) GetServiceIngress(svc *api.Service) (ings []extensions.Ingress, err error) {
for _, m := range s.Store.List() {
ing := *m.(*extensions.Ingress)
if ing.Namespace != svc.Namespace {
continue
}
for _, rules := range ing.Spec.Rules {
if rules.IngressRuleValue.HTTP == nil {
continue
}
for _, p := range rules.IngressRuleValue.HTTP.Paths {
if p.Backend.ServiceName == svc.Name {
ings = append(ings, ing)
}
}
}
}
if len(ings) == 0 {
err = fmt.Errorf("No ingress for service %v", svc.Name)
}
return
}
// GCETranslator helps with kubernetes -> gce api conversion.
type GCETranslator struct {
*LoadBalancerController
}
// toUrlMap converts an ingress to a map of subdomain: url-regex: gce backend.
func (t *GCETranslator) toUrlMap(ing *extensions.Ingress) (utils.GCEURLMap, error) {
hostPathBackend := utils.GCEURLMap{}
for _, rule := range ing.Spec.Rules {
if rule.HTTP == nil {
glog.Errorf("Ignoring non http Ingress rule")
continue
}
pathToBackend := map[string]*compute.BackendService{}
for _, p := range rule.HTTP.Paths {
backend, err := t.toGCEBackend(&p.Backend, ing.Namespace)
if err != nil {
// If a service doesn't have a nodeport we can still forward traffic
// to all other services under the assumption that the user will
// modify nodeport.
if _, ok := err.(errorNodePortNotFound); ok {
glog.Infof("%v", err)
continue
}
// If a service doesn't have a backend, there's nothing the user
// can do to correct this (the admin might've limited quota).
// So keep requeuing the l7 till all backends exist.
return utils.GCEURLMap{}, err
}
// The Ingress spec defines empty path as catch-all, so if a user
// asks for a single host and multiple empty paths, all traffic is
// sent to one of the last backend in the rules list.
path := p.Path
if path == "" {
path = loadbalancers.DefaultPath
}
pathToBackend[path] = backend
}
// If multiple hostless rule sets are specified, last one wins
host := rule.Host
if host == "" {
host = loadbalancers.DefaultHost
}
hostPathBackend[host] = pathToBackend
}
defaultBackend, _ := t.toGCEBackend(ing.Spec.Backend, ing.Namespace)
hostPathBackend.PutDefaultBackend(defaultBackend)
return hostPathBackend, nil
}
func (t *GCETranslator) toGCEBackend(be *extensions.IngressBackend, ns string) (*compute.BackendService, error) {
if be == nil {
return nil, nil
}
port, err := t.getServiceNodePort(*be, ns)
if err != nil {
return nil, err
}
backend, err := t.CloudClusterManager.backendPool.Get(int64(port))
if err != nil {
return nil, fmt.Errorf(
"No GCE backend exists for port %v, kube backend %+v", port, be)
}
return backend, nil
}
// getServiceNodePort looks in the svc store for a matching service:port,
// and returns the nodeport.
func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (int, error) {
obj, exists, err := t.svcLister.Store.Get(
&api.Service{
ObjectMeta: api.ObjectMeta{
Name: be.ServiceName,
Namespace: namespace,
},
})
if !exists {
return invalidPort, errorNodePortNotFound{be, fmt.Errorf(
"Service %v/%v not found in store", namespace, be.ServiceName)}
}
if err != nil {
return invalidPort, errorNodePortNotFound{be, err}
}
var nodePort int
for _, p := range obj.(*api.Service).Spec.Ports {
switch be.ServicePort.Type {
case intstr.Int:
if p.Port == int(be.ServicePort.IntVal) {
nodePort = p.NodePort
break
}
default:
if p.Name == be.ServicePort.StrVal {
nodePort = p.NodePort
break
}
}
}
if nodePort != invalidPort {
return nodePort, nil
}
return invalidPort, errorNodePortNotFound{be, fmt.Errorf(
"Could not find matching nodeport from service.")}
}
// toNodePorts converts a pathlist to a flat list of nodeports.
func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
knownPorts := []int64{}
for _, ing := range ings.Items {
defaultBackend := ing.Spec.Backend
if defaultBackend != nil {
port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
} else {
knownPorts = append(knownPorts, int64(port))
}
}
for _, rule := range ing.Spec.Rules {
if rule.HTTP == nil {
glog.Errorf("Ignoring non http Ingress rule.")
continue
}
for _, path := range rule.HTTP.Paths {
port, err := t.getServiceNodePort(path.Backend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
continue
}
knownPorts = append(knownPorts, int64(port))
}
}
}
return knownPorts
}