[GLBC] Support backside re-encryption (#519)
Support backside re-encryption
This commit is contained in:
parent
7f3763590a
commit
642cb74cc7
21 changed files with 1046 additions and 433 deletions
|
|
@ -65,7 +65,7 @@ const (
|
|||
// ClusterManager manages cluster resource pools.
|
||||
type ClusterManager struct {
|
||||
ClusterNamer *utils.Namer
|
||||
defaultBackendNodePort int64
|
||||
defaultBackendNodePort backends.ServicePort
|
||||
instancePool instances.NodePool
|
||||
backendPool backends.BackendPool
|
||||
l7Pool loadbalancers.LoadBalancerPool
|
||||
|
|
@ -83,9 +83,7 @@ type ClusterManager struct {
|
|||
// Init initializes the cluster manager.
|
||||
func (c *ClusterManager) Init(tr *GCETranslator) {
|
||||
c.instancePool.Init(tr)
|
||||
for _, h := range c.healthCheckers {
|
||||
h.Init(tr)
|
||||
}
|
||||
c.backendPool.Init(tr)
|
||||
// TODO: Initialize other members as needed.
|
||||
}
|
||||
|
||||
|
|
@ -126,17 +124,17 @@ func (c *ClusterManager) shutdown() error {
|
|||
// these ports must also be opened on the corresponding Instance Group.
|
||||
// If in performing the checkpoint the cluster manager runs out of quota, a
|
||||
// googleapi 403 is returned.
|
||||
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []int64) error {
|
||||
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error {
|
||||
// Multiple ingress paths can point to the same service (and hence nodePort)
|
||||
// but each nodePort can only have one set of cloud resources behind it. So
|
||||
// don't waste time double validating GCE BackendServices.
|
||||
portMap := map[int64]struct{}{}
|
||||
portMap := map[int64]backends.ServicePort{}
|
||||
for _, p := range nodePorts {
|
||||
portMap[p] = struct{}{}
|
||||
portMap[p.Port] = p
|
||||
}
|
||||
nodePorts = []int64{}
|
||||
for p := range portMap {
|
||||
nodePorts = append(nodePorts, p)
|
||||
nodePorts = []backends.ServicePort{}
|
||||
for _, sp := range portMap {
|
||||
nodePorts = append(nodePorts, sp)
|
||||
}
|
||||
if err := c.backendPool.Sync(nodePorts); err != nil {
|
||||
return err
|
||||
|
|
@ -158,7 +156,12 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
|
|||
// we shouldn't leak the firewall rule.
|
||||
fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort)
|
||||
}
|
||||
if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil {
|
||||
|
||||
var np []int64
|
||||
for _, p := range fwNodePorts {
|
||||
np = append(np, p.Port)
|
||||
}
|
||||
if err := c.firewallPool.Sync(np, nodeNames); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -171,7 +174,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
|
|||
// - nodePorts are the ports for which we want BackendServies. BackendServices
|
||||
// for ports not in this list are deleted.
|
||||
// This method ignores googleapi 404 errors (StatusNotFound).
|
||||
func (c *ClusterManager) GC(lbNames []string, nodePorts []int64) error {
|
||||
func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort) error {
|
||||
|
||||
// On GC:
|
||||
// * Loadbalancers need to get deleted before backends.
|
||||
|
|
@ -240,7 +243,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud {
|
|||
func NewClusterManager(
|
||||
configFilePath string,
|
||||
namer *utils.Namer,
|
||||
defaultBackendNodePort int64,
|
||||
defaultBackendNodePort backends.ServicePort,
|
||||
defaultHealthCheckPath string) (*ClusterManager, error) {
|
||||
|
||||
// TODO: Make this more resilient. Currently we create the cloud client
|
||||
|
|
@ -279,15 +282,12 @@ func NewClusterManager(
|
|||
cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}
|
||||
|
||||
// TODO: This needs to change to a consolidated management of the default backend.
|
||||
cluster.backendPool = backends.NewBackendPool(
|
||||
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
|
||||
defaultBackendPool := backends.NewBackendPool(
|
||||
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
|
||||
cluster.backendPool = backends.NewBackendPool(cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.Port}, true)
|
||||
defaultBackendPool := backends.NewBackendPool(cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
|
||||
cluster.defaultBackendNodePort = defaultBackendNodePort
|
||||
|
||||
// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
|
||||
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
|
||||
cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
|
||||
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
|
||||
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer)
|
||||
return &cluster, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,12 +29,11 @@ import (
|
|||
"k8s.io/ingress/controllers/gce/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
testDefaultBeNodePort = int64(3000)
|
||||
var (
|
||||
testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP}
|
||||
testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
|
||||
)
|
||||
|
||||
var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
|
||||
|
||||
// ClusterManager fake
|
||||
type fakeClusterManager struct {
|
||||
*ClusterManager
|
||||
|
|
@ -48,14 +47,13 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
|
|||
fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName)
|
||||
fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
|
||||
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
||||
fakeHCs := healthchecks.NewFakeHealthChecks()
|
||||
fakeHCP := healthchecks.NewFakeHealthCheckProvider()
|
||||
namer := utils.NewNamer(clusterName, firewallName)
|
||||
|
||||
nodePool := instances.NewNodePool(fakeIGs)
|
||||
nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})
|
||||
|
||||
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
|
||||
healthChecker.Init(&healthchecks.FakeHealthCheckGetter{})
|
||||
healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer)
|
||||
|
||||
backendPool := backends.NewBackendPool(
|
||||
fakeBackends,
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
api_v1 "k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/ingress/controllers/gce/backends"
|
||||
"k8s.io/ingress/controllers/gce/utils"
|
||||
)
|
||||
|
||||
// Pods created in loops start from this time, for routines that
|
||||
|
|
@ -94,17 +96,18 @@ func TestInstancesAddedToZones(t *testing.T) {
|
|||
func TestProbeGetter(t *testing.T) {
|
||||
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||
lbc := newLoadBalancerController(t, cm)
|
||||
nodePortToHealthCheck := map[int64]string{
|
||||
3001: "/healthz",
|
||||
3002: "/foo",
|
||||
|
||||
nodePortToHealthCheck := map[backends.ServicePort]string{
|
||||
{Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz",
|
||||
{Port: 3002, Protocol: utils.ProtocolHTTPS}: "/foo",
|
||||
}
|
||||
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
||||
for p, exp := range nodePortToHealthCheck {
|
||||
got, err := lbc.tr.HealthCheck(p)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get health check for node port %v: %v", p, err)
|
||||
} else if got.RequestPath != exp {
|
||||
t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
|
||||
got, err := lbc.tr.GetProbe(p)
|
||||
if err != nil || got == nil {
|
||||
t.Errorf("Failed to get probe for node port %v: %v", p, err)
|
||||
} else if getProbePath(got) != exp {
|
||||
t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -112,8 +115,8 @@ func TestProbeGetter(t *testing.T) {
|
|||
func TestProbeGetterNamedPort(t *testing.T) {
|
||||
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||
lbc := newLoadBalancerController(t, cm)
|
||||
nodePortToHealthCheck := map[int64]string{
|
||||
3001: "/healthz",
|
||||
nodePortToHealthCheck := map[backends.ServicePort]string{
|
||||
{Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz",
|
||||
}
|
||||
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
||||
for _, p := range lbc.podLister.Indexer.List() {
|
||||
|
|
@ -122,11 +125,11 @@ func TestProbeGetterNamedPort(t *testing.T) {
|
|||
pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"}
|
||||
}
|
||||
for p, exp := range nodePortToHealthCheck {
|
||||
got, err := lbc.tr.HealthCheck(p)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get health check for node port %v: %v", p, err)
|
||||
} else if got.RequestPath != exp {
|
||||
t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
|
||||
got, err := lbc.tr.GetProbe(p)
|
||||
if err != nil || got == nil {
|
||||
t.Errorf("Failed to get probe for node port %v: %v", p, err)
|
||||
} else if getProbePath(got) != exp {
|
||||
t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -167,31 +170,31 @@ func TestProbeGetterCrossNamespace(t *testing.T) {
|
|||
},
|
||||
}
|
||||
lbc.podLister.Indexer.Add(firstPod)
|
||||
nodePortToHealthCheck := map[int64]string{
|
||||
3001: "/healthz",
|
||||
nodePortToHealthCheck := map[backends.ServicePort]string{
|
||||
{Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz",
|
||||
}
|
||||
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
||||
|
||||
for p, exp := range nodePortToHealthCheck {
|
||||
got, err := lbc.tr.HealthCheck(p)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get health check for node port %v: %v", p, err)
|
||||
} else if got.RequestPath != exp {
|
||||
t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
|
||||
got, err := lbc.tr.GetProbe(p)
|
||||
if err != nil || got == nil {
|
||||
t.Errorf("Failed to get probe for node port %v: %v", p, err)
|
||||
} else if getProbePath(got) != exp {
|
||||
t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string, ns string) {
|
||||
func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[backends.ServicePort]string, ns string) {
|
||||
delay := time.Minute
|
||||
for np, u := range nodePortToHealthCheck {
|
||||
l := map[string]string{fmt.Sprintf("app-%d", np): "test"}
|
||||
l := map[string]string{fmt.Sprintf("app-%d", np.Port): "test"}
|
||||
svc := &api_v1.Service{
|
||||
Spec: api_v1.ServiceSpec{
|
||||
Selector: l,
|
||||
Ports: []api_v1.ServicePort{
|
||||
{
|
||||
NodePort: int32(np),
|
||||
NodePort: int32(np.Port),
|
||||
TargetPort: intstr.IntOrString{
|
||||
Type: intstr.Int,
|
||||
IntVal: 80,
|
||||
|
|
@ -200,14 +203,14 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string
|
|||
},
|
||||
},
|
||||
}
|
||||
svc.Name = fmt.Sprintf("%d", np)
|
||||
svc.Name = fmt.Sprintf("%d", np.Port)
|
||||
svc.Namespace = ns
|
||||
lbc.svcLister.Indexer.Add(svc)
|
||||
|
||||
pod := &api_v1.Pod{
|
||||
ObjectMeta: meta_v1.ObjectMeta{
|
||||
Labels: l,
|
||||
Name: fmt.Sprintf("%d", np),
|
||||
Name: fmt.Sprintf("%d", np.Port),
|
||||
Namespace: ns,
|
||||
CreationTimestamp: meta_v1.NewTime(firstPodCreationTime.Add(delay)),
|
||||
},
|
||||
|
|
@ -218,7 +221,7 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string
|
|||
ReadinessProbe: &api_v1.Probe{
|
||||
Handler: api_v1.Handler{
|
||||
HTTPGet: &api_v1.HTTPGetAction{
|
||||
Scheme: api_v1.URISchemeHTTP,
|
||||
Scheme: api_v1.URIScheme(string(np.Protocol)),
|
||||
Path: u,
|
||||
Port: intstr.IntOrString{
|
||||
Type: intstr.Int,
|
||||
|
|
@ -257,3 +260,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) {
|
|||
}
|
||||
lbc.CloudClusterManager.instancePool.Init(lbc.tr)
|
||||
}
|
||||
|
||||
func getProbePath(p *api_v1.Probe) string {
|
||||
return p.Handler.HTTPGet.Path
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
|
@ -37,6 +38,7 @@ import (
|
|||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"k8s.io/ingress/controllers/gce/backends"
|
||||
"k8s.io/ingress/controllers/gce/loadbalancers"
|
||||
"k8s.io/ingress/controllers/gce/utils"
|
||||
)
|
||||
|
|
@ -63,6 +65,12 @@ const (
|
|||
// to the target proxies of the Ingress.
|
||||
preSharedCertKey = "ingress.gcp.kubernetes.io/pre-shared-cert"
|
||||
|
||||
// serviceApplicationProtocolKey is a stringified JSON map of port names to
|
||||
// protocol strings. Possible values are HTTP, HTTPS
|
||||
// Example:
|
||||
// '{"my-https-port":"HTTPS","my-http-port":"HTTP"}'
|
||||
serviceApplicationProtocolKey = "service.alpha.kubernetes.io/app-protocols"
|
||||
|
||||
// ingressClassKey picks a specific "class" for the Ingress. The controller
|
||||
// only processes Ingresses with this annotation either unset, or set
|
||||
// to either gceIngessClass or the empty string.
|
||||
|
|
@ -116,6 +124,30 @@ func (ing ingAnnotations) ingressClass() string {
|
|||
return val
|
||||
}
|
||||
|
||||
// svcAnnotations represents Service annotations.
|
||||
type svcAnnotations map[string]string
|
||||
|
||||
func (svc svcAnnotations) ApplicationProtocols() (map[string]utils.AppProtocol, error) {
|
||||
val, ok := svc[serviceApplicationProtocolKey]
|
||||
if !ok {
|
||||
return map[string]utils.AppProtocol{}, nil
|
||||
}
|
||||
|
||||
var portToProtos map[string]utils.AppProtocol
|
||||
err := json.Unmarshal([]byte(val), &portToProtos)
|
||||
|
||||
// Verify protocol is an accepted value
|
||||
for _, proto := range portToProtos {
|
||||
switch proto {
|
||||
case utils.ProtocolHTTP, utils.ProtocolHTTPS:
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid port application protocol: %v", proto)
|
||||
}
|
||||
}
|
||||
|
||||
return portToProtos, err
|
||||
}
|
||||
|
||||
// isGCEIngress returns true if the given Ingress either doesn't specify the
|
||||
// ingress.class annotation, or it's set to "gce".
|
||||
func isGCEIngress(ing *extensions.Ingress) bool {
|
||||
|
|
@ -134,6 +166,15 @@ func (e errorNodePortNotFound) Error() string {
|
|||
e.backend, e.origErr)
|
||||
}
|
||||
|
||||
type errorSvcAppProtosParsing struct {
|
||||
svc *api_v1.Service
|
||||
origErr error
|
||||
}
|
||||
|
||||
func (e errorSvcAppProtosParsing) Error() string {
|
||||
return fmt.Sprintf("could not parse %v annotation on Service %v/%v, err: %v", serviceApplicationProtocolKey, e.svc.Namespace, e.svc.Name, e.origErr)
|
||||
}
|
||||
|
||||
// taskQueue manages a work queue through an independent worker that
|
||||
// invokes the given sync function for every work item inserted.
|
||||
type taskQueue struct {
|
||||
|
|
@ -221,6 +262,7 @@ type StoreToPodLister struct {
|
|||
cache.Indexer
|
||||
}
|
||||
|
||||
// List returns a list of all pods based on selector
|
||||
func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, err error) {
|
||||
err = ListAll(s.Indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api_v1.Pod))
|
||||
|
|
@ -228,6 +270,7 @@ func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, er
|
|||
return ret, err
|
||||
}
|
||||
|
||||
// ListAll iterates a store and passes selected item to a func
|
||||
func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendFunc) error {
|
||||
for _, m := range store.List() {
|
||||
metadata, err := meta.Accessor(m)
|
||||
|
|
@ -362,17 +405,16 @@ func (t *GCETranslator) toGCEBackend(be *extensions.IngressBackend, ns string) (
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
backend, err := t.CloudClusterManager.backendPool.Get(int64(port))
|
||||
backend, err := t.CloudClusterManager.backendPool.Get(port.Port)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"no GCE backend exists for port %v, kube backend %+v", port, be)
|
||||
return nil, fmt.Errorf("no GCE backend exists for port %v, kube backend %+v", port, be)
|
||||
}
|
||||
return backend, nil
|
||||
}
|
||||
|
||||
// getServiceNodePort looks in the svc store for a matching service:port,
|
||||
// and returns the nodeport.
|
||||
func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (int, error) {
|
||||
func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (backends.ServicePort, error) {
|
||||
obj, exists, err := t.svcLister.Indexer.Get(
|
||||
&api_v1.Service{
|
||||
ObjectMeta: meta_v1.ObjectMeta{
|
||||
|
|
@ -381,37 +423,51 @@ func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespa
|
|||
},
|
||||
})
|
||||
if !exists {
|
||||
return invalidPort, errorNodePortNotFound{be, fmt.Errorf(
|
||||
"service %v/%v not found in store", namespace, be.ServiceName)}
|
||||
return backends.ServicePort{}, errorNodePortNotFound{be, fmt.Errorf("service %v/%v not found in store", namespace, be.ServiceName)}
|
||||
}
|
||||
if err != nil {
|
||||
return invalidPort, errorNodePortNotFound{be, err}
|
||||
return backends.ServicePort{}, errorNodePortNotFound{be, err}
|
||||
}
|
||||
var nodePort int
|
||||
for _, p := range obj.(*api_v1.Service).Spec.Ports {
|
||||
svc := obj.(*api_v1.Service)
|
||||
appProtocols, err := svcAnnotations(svc.GetAnnotations()).ApplicationProtocols()
|
||||
if err != nil {
|
||||
return backends.ServicePort{}, errorSvcAppProtosParsing{svc, err}
|
||||
}
|
||||
|
||||
var port *api_v1.ServicePort
|
||||
PortLoop:
|
||||
for _, p := range svc.Spec.Ports {
|
||||
np := p
|
||||
switch be.ServicePort.Type {
|
||||
case intstr.Int:
|
||||
if p.Port == be.ServicePort.IntVal {
|
||||
nodePort = int(p.NodePort)
|
||||
break
|
||||
port = &np
|
||||
break PortLoop
|
||||
}
|
||||
default:
|
||||
if p.Name == be.ServicePort.StrVal {
|
||||
nodePort = int(p.NodePort)
|
||||
break
|
||||
port = &np
|
||||
break PortLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
if nodePort != invalidPort {
|
||||
return nodePort, nil
|
||||
|
||||
if port == nil {
|
||||
return backends.ServicePort{}, errorNodePortNotFound{be, fmt.Errorf("could not find matching nodeport from service")}
|
||||
}
|
||||
return invalidPort, errorNodePortNotFound{be, fmt.Errorf(
|
||||
"could not find matching nodeport from service")}
|
||||
|
||||
proto := utils.ProtocolHTTP
|
||||
if protoStr, exists := appProtocols[port.Name]; exists {
|
||||
proto = utils.AppProtocol(protoStr)
|
||||
}
|
||||
|
||||
p := backends.ServicePort{Port: int64(port.NodePort), Protocol: proto}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// toNodePorts converts a pathlist to a flat list of nodeports.
|
||||
func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
|
||||
knownPorts := []int64{}
|
||||
func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort {
|
||||
var knownPorts []backends.ServicePort
|
||||
for _, ing := range ings.Items {
|
||||
defaultBackend := ing.Spec.Backend
|
||||
if defaultBackend != nil {
|
||||
|
|
@ -419,7 +475,7 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
|
|||
if err != nil {
|
||||
glog.Infof("%v", err)
|
||||
} else {
|
||||
knownPorts = append(knownPorts, int64(port))
|
||||
knownPorts = append(knownPorts, port)
|
||||
}
|
||||
}
|
||||
for _, rule := range ing.Spec.Rules {
|
||||
|
|
@ -433,7 +489,7 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
|
|||
glog.Infof("%v", err)
|
||||
continue
|
||||
}
|
||||
knownPorts = append(knownPorts, int64(port))
|
||||
knownPorts = append(knownPorts, port)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -479,7 +535,7 @@ func (t *GCETranslator) ListZones() ([]string, error) {
|
|||
|
||||
// geHTTPProbe returns the http readiness probe from the first container
|
||||
// that matches targetPort, from the set of pods matching the given labels.
|
||||
func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString) (*api_v1.Probe, error) {
|
||||
func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString, protocol utils.AppProtocol) (*api_v1.Probe, error) {
|
||||
l := svc.Spec.Selector
|
||||
|
||||
// Lookup any container with a matching targetPort from the set of pods
|
||||
|
|
@ -498,12 +554,13 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr
|
|||
}
|
||||
logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort)
|
||||
for _, c := range pod.Spec.Containers {
|
||||
if !isSimpleHTTPProbe(c.ReadinessProbe) {
|
||||
if !isSimpleHTTPProbe(c.ReadinessProbe) || string(protocol) != string(c.ReadinessProbe.HTTPGet.Scheme) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, p := range c.Ports {
|
||||
if targetPort.Type == intstr.Int && targetPort.IntVal == p.ContainerPort ||
|
||||
targetPort.Type == intstr.String && targetPort.StrVal == p.Name {
|
||||
if (targetPort.Type == intstr.Int && targetPort.IntVal == p.ContainerPort) ||
|
||||
(targetPort.Type == intstr.String && targetPort.StrVal == p.Name) {
|
||||
|
||||
readinessProbePort := c.ReadinessProbe.Handler.HTTPGet.Port
|
||||
switch readinessProbePort.Type {
|
||||
|
|
@ -529,80 +586,39 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr
|
|||
|
||||
// isSimpleHTTPProbe returns true if the given Probe is:
|
||||
// - an HTTPGet probe, as opposed to a tcp or exec probe
|
||||
// - has a scheme of HTTP, as opposed to HTTPS
|
||||
// - has no special host or headers fields
|
||||
func isSimpleHTTPProbe(probe *api_v1.Probe) bool {
|
||||
return (probe != nil && probe.Handler.HTTPGet != nil && probe.Handler.HTTPGet.Host == "" &&
|
||||
probe.Handler.HTTPGet.Scheme == api_v1.URISchemeHTTP && len(probe.Handler.HTTPGet.HTTPHeaders) == 0)
|
||||
len(probe.Handler.HTTPGet.HTTPHeaders) == 0)
|
||||
}
|
||||
|
||||
// HealthCheck returns the http readiness probe for the endpoint backing the
|
||||
// given nodePort. If no probe is found it returns a health check with "" as
|
||||
// the request path, callers are responsible for swapping this out for the
|
||||
// appropriate default.
|
||||
func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
|
||||
// GetProbe returns a probe that's used for the given nodeport
|
||||
func (t *GCETranslator) GetProbe(port backends.ServicePort) (*api_v1.Probe, error) {
|
||||
sl := t.svcLister.List()
|
||||
var ingresses []extensions.Ingress
|
||||
var healthCheck *compute.HttpHealthCheck
|
||||
// Find the label and target port of the one service with the given nodePort
|
||||
for _, as := range sl {
|
||||
s := as.(*api_v1.Service)
|
||||
for _, p := range s.Spec.Ports {
|
||||
|
||||
// Find the label and target port of the one service with the given nodePort
|
||||
var service api_v1.Service
|
||||
var svcPort api_v1.ServicePort
|
||||
var found bool
|
||||
OuterLoop:
|
||||
for _, as := range sl {
|
||||
service = *as.(*api_v1.Service)
|
||||
for _, sp := range service.Spec.Ports {
|
||||
svcPort = sp
|
||||
// only one Service can match this nodePort, try and look up
|
||||
// the readiness probe of the pods behind it
|
||||
if int32(port) != p.NodePort {
|
||||
continue
|
||||
if int32(port.Port) == sp.NodePort {
|
||||
found = true
|
||||
break OuterLoop
|
||||
}
|
||||
rp, err := t.getHTTPProbe(*s, p.TargetPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rp == nil {
|
||||
glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port)
|
||||
break
|
||||
}
|
||||
|
||||
healthPath := rp.Handler.HTTPGet.Path
|
||||
// GCE requires a leading "/" for health check urls.
|
||||
if string(healthPath[0]) != "/" {
|
||||
healthPath = fmt.Sprintf("/%v", healthPath)
|
||||
}
|
||||
|
||||
host := rp.Handler.HTTPGet.Host
|
||||
glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath)
|
||||
// remember the ingresses that use this Service so we can send
|
||||
// the right events
|
||||
ingresses, err = t.ingLister.GetServiceIngress(s)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to list ingresses for service %v", s.Name)
|
||||
}
|
||||
|
||||
healthCheck = &compute.HttpHealthCheck{
|
||||
Port: port,
|
||||
RequestPath: healthPath,
|
||||
Host: host,
|
||||
Description: "kubernetes L7 health check from readiness probe.",
|
||||
// set a low health threshold and a high failure threshold.
|
||||
// We're just trying to detect if the node networking is
|
||||
// borked, service level outages will get detected sooner
|
||||
// by kube-proxy.
|
||||
CheckIntervalSec: int64(rp.PeriodSeconds + utils.DefaultHealthCheckInterval),
|
||||
TimeoutSec: int64(rp.TimeoutSeconds),
|
||||
HealthyThreshold: utils.DefaultHealthyThreshold,
|
||||
UnhealthyThreshold: utils.DefaultUnhealthyThreshold,
|
||||
// TODO: include headers after updating compute godep.
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if healthCheck == nil {
|
||||
healthCheck = utils.DefaultHealthCheckTemplate(port)
|
||||
|
||||
if !found {
|
||||
return nil, fmt.Errorf("unable to find nodeport %v in any service", port)
|
||||
}
|
||||
for _, ing := range ingresses {
|
||||
t.recorder.Eventf(&ing, api_v1.EventTypeNormal, "GCE", fmt.Sprintf("health check using %v:%v%v", healthCheck.Host, healthCheck.Port, healthCheck.RequestPath))
|
||||
}
|
||||
return healthCheck, nil
|
||||
|
||||
return t.getHTTPProbe(service, svcPort.TargetPort, port.Protocol)
|
||||
}
|
||||
|
||||
// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue