Merge remote-tracking branch 'upstream/master' into nginx/extauth_headers

# Conflicts:
#	core/pkg/ingress/annotations/authreq/main.go
This commit is contained in:
rsafronov 2017-03-13 15:04:37 -04:00
commit 7034e1de69
82 changed files with 3053 additions and 724 deletions

View file

@ -8,4 +8,4 @@ Configuring a webserver or loadbalancer is harder than it should be. Most webser
## What is an Ingress Controller?
An Ingress Controller is a daemon, deployed as a Kubernetes Pod, that watches the ApiServer's `/ingresses` endpoint for updates to the [Ingress resource](https://github.com/kubernetes/kubernetes/blob/master/docs/user-guide/ingress.md). Its job is to satisfy requests for ingress.
An Ingress Controller is a daemon, deployed as a Kubernetes Pod, that watches the apiserver's `/ingresses` endpoint for updates to the [Ingress resource](https://github.com/kubernetes/kubernetes/blob/master/docs/user-guide/ingress.md). Its job is to satisfy requests for ingress.

View file

@ -12,23 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: use radial/busyboxplus:curl or alping instead
FROM ubuntu:14.04
MAINTAINER Prashanth B <beeps@google.com>
FROM alpine:3.5
# so apt-get doesn't complain
ENV DEBIAN_FRONTEND=noninteractive
RUN sed -i 's/^exit 101/exit 0/' /usr/sbin/policy-rc.d
RUN apk add --no-cache ca-certificates
# TODO: Move to using haproxy:1.5 image instead. Honestly,
# that image isn't much smaller and the convenience of having
# an ubuntu container for dev purposes trumps the tiny amounts
# of disk and bandwidth we'd save in doing so.
RUN \
apt-get update && \
apt-get install -y ca-certificates && \
apt-get install -y curl && \
rm -rf /var/lib/apt/lists/*
ADD glbc glbc
COPY glbc glbc
ENTRYPOINT ["/glbc"]

View file

@ -1,7 +1,7 @@
all: push
# 0.0 shouldn't clobber any released builds
TAG = 0.9.1
TAG = 0.9.2
PREFIX = gcr.io/google_containers/glbc
server:

View file

@ -327,7 +327,7 @@ So simply delete the replication controller:
$ kubectl get rc glbc
CONTROLLER CONTAINER(S) IMAGE(S) SELECTOR REPLICAS AGE
glbc default-http-backend gcr.io/google_containers/defaultbackend:1.0 k8s-app=glbc,version=v0.5 1 2m
l7-lb-controller gcr.io/google_containers/glbc:0.9.1
l7-lb-controller gcr.io/google_containers/glbc:0.9.2
$ kubectl delete rc glbc
replicationcontroller "glbc" deleted
@ -340,7 +340,7 @@ glbc-6m6b6 1/1 Terminating 0 13m
__The prod way__: If you didn't start the controller with `--delete-all-on-quit`, you can execute a GET on the `/delete-all-and-quit` endpoint. This endpoint is deliberately not exported.
```
$ kubectl exec -it glbc-6m6b6 -- curl http://localhost:8081/delete-all-and-quit
$ kubectl exec -it glbc-6m6b6 -- wget -q -O- http://localhost:8081/delete-all-and-quit
..Hangs till quit is done..
$ kubectl logs glbc-6m6b6 --follow

View file

@ -46,6 +46,10 @@ var (
// L7 controller created without specifying the --cluster-uid flag.
DefaultClusterUID = ""
// DefaultFirewallName is the name to user for firewall rules created
// by an L7 controller when the --fireall-rule is not used.
DefaultFirewallName = ""
// Frequency to poll on local stores to sync.
storeSyncPollPeriod = 5 * time.Second
)
@ -423,14 +427,23 @@ func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7Run
glog.Warningf("Cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
continue
}
tls, err := lbc.tlsLoader.load(&ing)
if err != nil {
glog.Warningf("Cannot get certs for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}
var tls *loadbalancers.TLSCerts
annotations := ingAnnotations(ing.ObjectMeta.Annotations)
// Load the TLS cert from the API Spec if it is not specified in the annotation.
// TODO: enforce this with validation.
if annotations.useNamedTLS() == "" {
tls, err = lbc.tlsLoader.load(&ing)
if err != nil {
glog.Warningf("Cannot get certs for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}
}
lbs = append(lbs, &loadbalancers.L7RuntimeInfo{
Name: k,
TLS: tls,
TLSName: annotations.useNamedTLS(),
AllowHTTP: annotations.allowHTTP(),
StaticIPName: annotations.staticIPName(),
})

View file

@ -199,7 +199,8 @@ func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePo
}
func TestLbCreateDelete(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
testFirewallName := "quux"
cm := NewFakeClusterManager(DefaultClusterUID, testFirewallName)
lbc := newLoadBalancerController(t, cm, "")
inputMap1 := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {
@ -240,6 +241,7 @@ func TestLbCreateDelete(t *testing.T) {
unexpected := []int{pm.portMap["foo2svc"], pm.portMap["bar2svc"]}
expected := []int{pm.portMap["foo1svc"], pm.portMap["bar1svc"]}
firewallPorts := sets.NewString()
pm.namer.SetFirewallName(testFirewallName)
firewallName := pm.namer.FrName(pm.namer.FrSuffix())
if firewallRule, err := cm.firewallPool.(*firewalls.FirewallRules).GetFirewall(firewallName); err != nil {
@ -290,7 +292,7 @@ func TestLbCreateDelete(t *testing.T) {
}
func TestLbFaultyUpdate(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
inputMap := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {
@ -327,7 +329,7 @@ func TestLbFaultyUpdate(t *testing.T) {
}
func TestLbDefaulting(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
// Make sure the controller plugs in the default values accepted by GCE.
ing := newIngress(map[string]utils.FakeIngressRuleValueMap{"": {"": "foo1svc"}})
@ -345,7 +347,7 @@ func TestLbDefaulting(t *testing.T) {
}
func TestLbNoService(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
inputMap := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {
@ -389,7 +391,7 @@ func TestLbNoService(t *testing.T) {
}
func TestLbChangeStaticIP(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
inputMap := map[string]utils.FakeIngressRuleValueMap{
"foo.example.com": {

View file

@ -44,12 +44,12 @@ type fakeClusterManager struct {
}
// NewFakeClusterManager creates a new fake ClusterManager.
func NewFakeClusterManager(clusterName string) *fakeClusterManager {
func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager {
fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName)
fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
fakeHCs := healthchecks.NewFakeHealthChecks()
namer := utils.NewNamer(clusterName)
namer := utils.NewNamer(clusterName, firewallName)
nodePool := instances.NewNodePool(fakeIGs)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})

View file

@ -32,7 +32,7 @@ import (
var firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC)
func TestZoneListing(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
zoneToNode := map[string][]string{
"zone-1": {"n1"},
@ -57,7 +57,7 @@ func TestZoneListing(t *testing.T) {
}
func TestInstancesAddedToZones(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
zoneToNode := map[string][]string{
"zone-1": {"n1", "n2"},
@ -92,7 +92,7 @@ func TestInstancesAddedToZones(t *testing.T) {
}
func TestProbeGetter(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
nodePortToHealthCheck := map[int64]string{
3001: "/healthz",
@ -110,7 +110,7 @@ func TestProbeGetter(t *testing.T) {
}
func TestProbeGetterNamedPort(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
nodePortToHealthCheck := map[int64]string{
3001: "/healthz",
@ -133,7 +133,7 @@ func TestProbeGetterNamedPort(t *testing.T) {
}
func TestProbeGetterCrossNamespace(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm, "")
firstPod := &api.Pod{

View file

@ -52,6 +52,13 @@ const (
// responsibility to create/delete it.
staticIPNameKey = "kubernetes.io/ingress.global-static-ip-name"
// preSharedCertKey represents the specific pre-shared SSL
// certicate for the Ingress controller to use. The controller *does not*
// manage this certificate, it is the users responsibility to create/delete it.
// In GCP, the Ingress controller assigns the SSL certificate with this name
// to the target proxies of the Ingress.
preSharedCertKey = "ingress.gcp.kubernetes.io/pre-shared-cert"
// ingressClassKey picks a specific "class" for the Ingress. The controller
// only processes Ingresses with this annotation either unset, or set
// to either gceIngessClass or the empty string.
@ -79,6 +86,16 @@ func (ing ingAnnotations) allowHTTP() bool {
return v
}
// useNamedTLS returns the name of the GCE SSL certificate. Empty by default.
func (ing ingAnnotations) useNamedTLS() string {
val, ok := ing[preSharedCertKey]
if !ok {
return ""
}
return val
}
func (ing ingAnnotations) staticIPName() string {
val, ok := ing[staticIPNameKey]
if !ok {

View file

@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecks
import (
"testing"
"k8s.io/ingress/controllers/gce/utils"
)
func TestFakeHealthCheckActions(t *testing.T) {
namer := &utils.Namer{}
healthChecks := NewHealthChecker(NewFakeHealthChecks(), "/", namer)
healthChecks.Init(&FakeHealthCheckGetter{DefaultHealthCheck: nil})
err := healthChecks.Add(80)
if err != nil {
t.Fatalf("unexpected error")
}
_, err1 := healthChecks.Get(8080)
if err1 == nil {
t.Errorf("expected error")
}
hc, err2 := healthChecks.Get(80)
if err2 != nil {
t.Errorf("unexpected error")
} else {
if hc == nil {
t.Errorf("expected a *compute.HttpHealthCheck")
}
}
err = healthChecks.Delete(8080)
if err == nil {
t.Errorf("expected error")
}
err = healthChecks.Delete(80)
if err != nil {
t.Errorf("unexpected error")
}
_, err3 := healthChecks.Get(80)
if err3 == nil {
t.Errorf("expected error")
}
}

View file

@ -246,6 +246,8 @@ type L7RuntimeInfo struct {
IP string
// TLS are the tls certs to use in termination.
TLS *TLSCerts
// TLSName is the name of/for the tls cert to use.
TLSName string
// AllowHTTP will not setup :80, if TLS is nil and AllowHTTP is set,
// no loadbalancer is created.
AllowHTTP bool
@ -350,6 +352,24 @@ func (l *L7) deleteOldSSLCert() (err error) {
}
func (l *L7) checkSSLCert() (err error) {
certName := l.runtimeInfo.TLSName
// Use the named GCE cert when it is specified by the annotation.
if certName != "" {
// Ask GCE for the cert, checking for problems and existence.
cert, err := l.cloud.GetSslCertificate(certName)
if err != nil {
return err
}
if cert == nil {
return fmt.Errorf("Cannot find existing sslCertificate %v for %v", certName, l.Name)
}
glog.Infof("Using existing sslCertificate %v for %v", certName, l.Name)
l.sslCert = cert
return nil
}
// TODO: Currently, GCE only supports a single certificate per static IP
// so we don't need to bother with disambiguation. Naming the cert after
// the loadbalancer is a simplification.
@ -363,7 +383,7 @@ func (l *L7) checkSSLCert() (err error) {
// TODO: Clean this code up into a ring buffer.
primaryCertName := l.namer.Truncate(fmt.Sprintf("%v-%v", sslCertPrefix, l.Name))
secondaryCertName := l.namer.Truncate(fmt.Sprintf("%v-%d-%v", sslCertPrefix, 1, l.Name))
certName := primaryCertName
certName = primaryCertName
if l.sslCert != nil {
certName = l.sslCert.Name
}
@ -580,13 +600,14 @@ func (l *L7) edgeHop() error {
return err
}
}
// Defer promoting an emphemral to a static IP till it's really needed.
if l.runtimeInfo.AllowHTTP && l.runtimeInfo.TLS != nil {
// Defer promoting an ephemeral to a static IP until it's really needed.
if l.runtimeInfo.AllowHTTP && (l.runtimeInfo.TLS != nil || l.runtimeInfo.TLSName != "") {
glog.V(3).Infof("checking static ip for %v", l.Name)
if err := l.checkStaticIP(); err != nil {
return err
}
}
if l.runtimeInfo.TLS != nil {
if l.runtimeInfo.TLS != nil || l.runtimeInfo.TLSName != "" {
glog.V(3).Infof("validating https for %v", l.Name)
if err := l.edgeHopHttps(); err != nil {
return err
@ -846,7 +867,8 @@ func (l *L7) Cleanup() error {
}
l.tps = nil
}
if l.sslCert != nil {
// Delete the SSL cert if it is from a secret, not referencing a pre-created GCE cert.
if l.sslCert != nil && l.runtimeInfo.TLSName == "" {
glog.Infof("Deleting sslcert %v", l.sslCert.Name)
if err := l.cloud.DeleteSslCertificate(l.sslCert.Name); err != nil {
if !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
@ -936,6 +958,9 @@ func GetLBAnnotations(l7 *L7, existing map[string]string, backendPool backends.B
if l7.ip != nil {
existing[fmt.Sprintf("%v/static-ip", utils.K8sAnnotationPrefix)] = l7.ip.Name
}
if l7.sslCert != nil {
existing[fmt.Sprintf("%v/ssl-cert", utils.K8sAnnotationPrefix)] = l7.sslCert.Name
}
// TODO: We really want to know *when* a backend flipped states.
existing[fmt.Sprintf("%v/backends", utils.K8sAnnotationPrefix)] = jsonBackendState
return existing

View file

@ -103,6 +103,40 @@ func TestCreateHTTPSLoadBalancer(t *testing.T) {
}
}
func TestCreateHTTPSLoadBalancerAnnotationCert(t *testing.T) {
// This should NOT create the forwarding rule and target proxy
// associated with the HTTP branch of this loadbalancer.
tlsName := "external-cert-name"
lbInfo := &L7RuntimeInfo{
Name: "test",
AllowHTTP: false,
TLSName: tlsName,
}
f := NewFakeLoadBalancers(lbInfo.Name)
f.CreateSslCertificate(&compute.SslCertificate{
Name: tlsName,
})
pool := newFakeLoadBalancerPool(f, t)
pool.Sync([]*L7RuntimeInfo{lbInfo})
l7, err := pool.Get(lbInfo.Name)
if err != nil || l7 == nil {
t.Fatalf("Expected l7 not created")
}
um, err := f.GetUrlMap(f.umName())
if err != nil ||
um.DefaultService != pool.(*L7s).glbcDefaultBackend.SelfLink {
t.Fatalf("%v", err)
}
tps, err := f.GetTargetHttpsProxy(f.tpName(true))
if err != nil || tps.UrlMap != um.SelfLink {
t.Fatalf("%v", err)
}
fws, err := f.GetGlobalForwardingRule(f.fwName(true))
if err != nil || fws.Target != tps.SelfLink {
t.Fatalf("%v", err)
}
}
func TestCreateBothLoadBalancers(t *testing.T) {
// This should create 2 forwarding rules and target proxies
// but they should use the same urlmap, and have the same
@ -236,7 +270,8 @@ func TestUpdateUrlMapNoChanges(t *testing.T) {
func TestNameParsing(t *testing.T) {
clusterName := "123"
namer := utils.NewNamer(clusterName)
firewallName := clusterName
namer := utils.NewNamer(clusterName, firewallName)
fullName := namer.Truncate(fmt.Sprintf("%v-%v", forwardingRulePrefix, namer.LBName("testlb")))
annotationsMap := map[string]string{
fmt.Sprintf("%v/forwarding-rule", utils.K8sAnnotationPrefix): fullName,
@ -308,7 +343,7 @@ func TestClusterNameChange(t *testing.T) {
}
func TestInvalidClusterNameChange(t *testing.T) {
namer := utils.NewNamer("test--123")
namer := utils.NewNamer("test--123", "test--123")
if got := namer.GetClusterName(); got != "123" {
t.Fatalf("Expected name 123, got %v", got)
}

View file

@ -62,7 +62,7 @@ const (
alphaNumericChar = "0"
// Current docker image version. Only used in debug logging.
imageVersion = "glbc:0.9.1"
imageVersion = "glbc:0.9.2"
// Key used to persist UIDs to configmaps.
uidConfigMapName = "ingress-uid"
@ -70,7 +70,7 @@ const (
var (
flags = flag.NewFlagSet(
`gclb: gclb --runngin-in-cluster=false --default-backend-node-port=123`,
`glbc: glbc --running-in-cluster=false`,
flag.ExitOnError)
clusterName = flags.String("cluster-uid", controller.DefaultClusterUID,
@ -215,7 +215,7 @@ func main() {
if *inCluster || *useRealCloud {
// Create cluster manager
namer, err := newNamer(kubeClient, *clusterName)
namer, err := newNamer(kubeClient, *clusterName, controller.DefaultFirewallName)
if err != nil {
glog.Fatalf("%v", err)
}
@ -225,7 +225,7 @@ func main() {
}
} else {
// Create fake cluster manager
clusterManager = controller.NewFakeClusterManager(*clusterName).ClusterManager
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}
// Start loadbalancer controller
@ -247,32 +247,100 @@ func main() {
}
}
func newNamer(kubeClient client.Interface, clusterName string) (*utils.Namer, error) {
func newNamer(kubeClient client.Interface, clusterName string, fwName string) (*utils.Namer, error) {
name, err := getClusterUID(kubeClient, clusterName)
if err != nil {
return nil, err
}
fw_name, err := getFirewallName(kubeClient, fwName, name)
if err != nil {
return nil, err
}
namer := utils.NewNamer(name)
vault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
namer := utils.NewNamer(name, fw_name)
uidVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
// Start a goroutine to poll the cluster UID config map
// We don't watch because we know exactly which configmap we want and this
// controller already watches 5 other resources, so it isn't worth the cost
// of another connection and complexity.
go wait.Forever(func() {
uid, found, err := vault.Get()
existing := namer.GetClusterName()
if found && uid != existing {
glog.Infof("Cluster uid changed from %v -> %v", existing, uid)
namer.SetClusterName(uid)
} else if err != nil {
glog.Errorf("Failed to reconcile cluster uid %v, currently set to %v", err, existing)
for _, key := range [...]string{storage.UidDataKey, storage.ProviderDataKey} {
val, found, err := uidVault.Get(key)
if err != nil {
glog.Errorf("Can't read uidConfigMap %v", uidConfigMapName)
} else if !found {
errmsg := fmt.Sprintf("Can't read %v from uidConfigMap %v", key, uidConfigMapName)
if key == storage.UidDataKey {
glog.Errorf(errmsg)
} else {
glog.V(4).Infof(errmsg)
}
} else {
switch key {
case storage.UidDataKey:
if uid := namer.GetClusterName(); uid != val {
glog.Infof("Cluster uid changed from %v -> %v", uid, val)
namer.SetClusterName(val)
}
case storage.ProviderDataKey:
if fw_name := namer.GetFirewallName(); fw_name != val {
glog.Infof("Cluster firewall name changed from %v -> %v", fw_name, val)
namer.SetFirewallName(val)
}
}
}
}
}, 5*time.Second)
return namer, nil
}
// useDefaultOrLookupVault returns either a 'default_name' or if unset, obtains a name from a ConfigMap.
// The returned value follows this priority:
// If the provided 'default_name' is not empty, that name is used.
// This is effectively a client override via a command line flag.
// else, check cfgVault with 'cm_key' as a key and if found, use the associated value
// else, return an empty 'name' and pass along an error iff the configmap lookup is erroneous.
func useDefaultOrLookupVault(cfgVault *storage.ConfigMapVault, cm_key, default_name string) (string, error) {
if default_name != "" {
glog.Infof("Using user provided %v %v", cm_key, default_name)
// Don't save the uid in the vault, so users can rollback through
// setting the accompany flag to ""
return default_name, nil
}
val, found, err := cfgVault.Get(cm_key)
if err != nil {
// This can fail because of:
// 1. No such config map - found=false, err=nil
// 2. No such key in config map - found=false, err=nil
// 3. Apiserver flake - found=false, err!=nil
// It is not safe to proceed in 3.
return "", fmt.Errorf("Failed to retrieve %v: %v, returning empty name", cm_key, err)
} else if !found {
// Not found but safe to proceed.
return "", nil
}
glog.Infof("Using %v = %q saved in ConfigMap", cm_key, val)
return val, nil
}
// getFirewallName returns the firewall rule name to use for this cluster. For
// backwards compatibility, the firewall name will default to the cluster UID.
// Use getFlagOrLookupVault to obtain a stored or overridden value for the firewall name.
// else, use the cluster UID as a backup (this retains backwards compatibility).
func getFirewallName(kubeClient client.Interface, name, cluster_uid string) (string, error) {
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
if fw_name, err := useDefaultOrLookupVault(cfgVault, storage.ProviderDataKey, name); err != nil {
return "", err
} else if fw_name != "" {
return fw_name, cfgVault.Put(storage.ProviderDataKey, fw_name)
} else {
glog.Infof("Using cluster UID %v as firewall name", cluster_uid)
return cluster_uid, cfgVault.Put(storage.ProviderDataKey, cluster_uid)
}
}
// getClusterUID returns the cluster UID. Rules for UID generation:
// If the user specifies a --cluster-uid param it overwrites everything
// else, check UID config map for a previously recorded uid
@ -281,26 +349,12 @@ func newNamer(kubeClient client.Interface, clusterName string) (*utils.Namer, er
// else, allocate a new uid
func getClusterUID(kubeClient client.Interface, name string) (string, error) {
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
if name != "" {
glog.Infof("Using user provided cluster uid %v", name)
// Don't save the uid in the vault, so users can rollback through
// --cluster-uid=""
if name, err := useDefaultOrLookupVault(cfgVault, storage.UidDataKey, name); err != nil {
return "", err
} else if name != "" {
return name, nil
}
existingUID, found, err := cfgVault.Get()
if found {
glog.Infof("Using saved cluster uid %q", existingUID)
return existingUID, nil
} else if err != nil {
// This can fail because of:
// 1. No such config map - found=false, err=nil
// 2. No such key in config map - found=false, err=nil
// 3. Apiserver flake - found=false, err!=nil
// It is not safe to proceed in 3.
return "", fmt.Errorf("Failed to retrieve current uid: %v, using %q as name", err, name)
}
// Check if the cluster has an Ingress with ip
ings, err := kubeClient.Extensions().Ingresses(api.NamespaceAll).List(api.ListOptions{LabelSelector: labels.Everything()})
if err != nil {
@ -311,10 +365,10 @@ func getClusterUID(kubeClient client.Interface, name string) (string, error) {
if len(ing.Status.LoadBalancer.Ingress) != 0 {
c := namer.ParseName(loadbalancers.GCEResourceName(ing.Annotations, "forwarding-rule"))
if c.ClusterName != "" {
return c.ClusterName, cfgVault.Put(c.ClusterName)
return c.ClusterName, cfgVault.Put(storage.UidDataKey, c.ClusterName)
}
glog.Infof("Found a working Ingress, assuming uid is empty string")
return "", cfgVault.Put("")
return "", cfgVault.Put(storage.UidDataKey, "")
}
}
@ -329,7 +383,7 @@ func getClusterUID(kubeClient client.Interface, name string) (string, error) {
return "", err
}
uid := fmt.Sprintf("%x", b)
return uid, cfgVault.Put(uid)
return uid, cfgVault.Put(storage.UidDataKey, uid)
}
// getNodePort waits for the Service, and returns it's first node port.

View file

@ -61,7 +61,7 @@ spec:
requests:
cpu: 10m
memory: 20Mi
- image: gcr.io/google_containers/glbc:0.9.1
- image: gcr.io/google_containers/glbc:0.9.2
livenessProbe:
httpGet:
path: /healthz

View file

@ -19,6 +19,7 @@ package storage
import (
"fmt"
"strings"
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
@ -27,73 +28,86 @@ import (
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)
// UIDVault stores UIDs.
type UIDVault interface {
Get() (string, bool, error)
Put(string) error
Delete() error
}
// uidDataKey is the key used in config maps to store the UID.
const uidDataKey = "uid"
const (
// UidDataKey is the key used in config maps to store the UID.
UidDataKey = "uid"
// ProviderDataKey is the key used in config maps to store the Provider
// UID which we use to ensure unique firewalls.
ProviderDataKey = "provider-uid"
)
// ConfigMapVault stores cluster UIDs in config maps.
// It's a layer on top of ConfigMapStore that just implements the utils.uidVault
// interface.
type ConfigMapVault struct {
storeLock sync.Mutex
ConfigMapStore cache.Store
namespace string
name string
}
// Get retrieves the cluster UID from the cluster config map.
// Get retrieves the value associated to the provided 'key' from the cluster config map.
// If this method returns an error, it's guaranteed to be apiserver flake.
// If the error is a not found error it sets the boolean to false and
// returns and error of nil instead.
func (c *ConfigMapVault) Get() (string, bool, error) {
key := fmt.Sprintf("%v/%v", c.namespace, c.name)
item, found, err := c.ConfigMapStore.GetByKey(key)
func (c *ConfigMapVault) Get(key string) (string, bool, error) {
keyStore := fmt.Sprintf("%v/%v", c.namespace, c.name)
item, found, err := c.ConfigMapStore.GetByKey(keyStore)
if err != nil || !found {
return "", false, err
}
cfg := item.(*api.ConfigMap)
if k, ok := cfg.Data[uidDataKey]; ok {
data := item.(*api.ConfigMap).Data
c.storeLock.Lock()
defer c.storeLock.Unlock()
if k, ok := data[key]; ok {
return k, true, nil
}
return "", false, fmt.Errorf("Found config map %v but it doesn't contain uid key: %+v", key, cfg.Data)
glog.Infof("Found config map %v but it doesn't contain key %v: %+v", keyStore, key, data)
return "", false, nil
}
// Put stores the given UID in the cluster config map.
func (c *ConfigMapVault) Put(uid string) error {
// Put inserts a key/value pair in the cluster config map.
// If the key already exists, the value provided is stored.
func (c *ConfigMapVault) Put(key, val string) error {
c.storeLock.Lock()
defer c.storeLock.Unlock()
apiObj := &api.ConfigMap{
ObjectMeta: api.ObjectMeta{
Name: c.name,
Namespace: c.namespace,
},
Data: map[string]string{uidDataKey: uid},
}
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey)
if err == nil && exists {
data := item.(*api.ConfigMap).Data
if k, ok := data[uidDataKey]; ok && k == uid {
existingVal, ok := data[key]
if ok && existingVal == val {
// duplicate, no need to update.
return nil
} else if ok {
glog.Infof("Configmap %v has key %v but wrong value %v, updating", cfgMapKey, k, uid)
}
data[key] = val
apiObj.Data = data
if existingVal != val {
glog.Infof("Configmap %v has key %v but wrong value %v, updating to %v", cfgMapKey, key, existingVal, val)
} else {
glog.Infof("Configmap %v will be updated with %v = %v", cfgMapKey, key, val)
}
if err := c.ConfigMapStore.Update(apiObj); err != nil {
return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err)
}
} else if err := c.ConfigMapStore.Add(apiObj); err != nil {
return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err)
} else {
apiObj.Data = map[string]string{key: val}
if err := c.ConfigMapStore.Add(apiObj); err != nil {
return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err)
}
}
glog.Infof("Successfully stored uid %q in config map %v", uid, cfgMapKey)
glog.Infof("Successfully stored key %v = %v in config map %v", key, val, cfgMapKey)
return nil
}
// Delete deletes the cluster UID storing config map.
// Delete deletes the ConfigMapStore.
func (c *ConfigMapVault) Delete() error {
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
item, _, err := c.ConfigMapStore.GetByKey(cfgMapKey)
@ -108,13 +122,19 @@ func (c *ConfigMapVault) Delete() error {
// This client is essentially meant to abstract out the details of
// configmaps and the API, and just store/retrieve a single value, the cluster uid.
func NewConfigMapVault(c client.Interface, uidNs, uidConfigMapName string) *ConfigMapVault {
return &ConfigMapVault{NewConfigMapStore(c), uidNs, uidConfigMapName}
return &ConfigMapVault{
ConfigMapStore: NewConfigMapStore(c),
namespace: uidNs,
name: uidConfigMapName}
}
// NewFakeConfigMapVault is an implementation of the ConfigMapStore that doesn't
// persist configmaps. Only used in testing.
func NewFakeConfigMapVault(ns, name string) *ConfigMapVault {
return &ConfigMapVault{cache.NewStore(cache.MetaNamespaceKeyFunc), ns, name}
return &ConfigMapVault{
ConfigMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
namespace: ns,
name: name}
}
// ConfigMapStore wraps the store interface. Implementations usually persist

View file

@ -24,31 +24,51 @@ import (
func TestConfigMapUID(t *testing.T) {
vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid")
uid := ""
k, exists, err := vault.Get()
// Get value from an empty vault.
val, exists, err := vault.Get(UidDataKey)
if exists {
t.Errorf("Got a key from an empyt vault")
t.Errorf("Got value from an empty vault")
}
vault.Put(uid)
k, exists, err = vault.Get()
// Store empty value for UidDataKey.
uid := ""
vault.Put(UidDataKey, uid)
val, exists, err = vault.Get(UidDataKey)
if !exists || err != nil {
t.Errorf("Failed to retrieve value from vault")
t.Errorf("Failed to retrieve value from vault: %v", err)
}
if k != "" {
if val != "" {
t.Errorf("Failed to store empty string as a key in the vault")
}
vault.Put("newuid")
k, exists, err = vault.Get()
// Store actual value in key.
storedVal := "newuid"
vault.Put(UidDataKey, storedVal)
val, exists, err = vault.Get(UidDataKey)
if !exists || err != nil {
t.Errorf("Failed to retrieve value from vault")
} else if val != storedVal {
t.Errorf("Failed to store empty string as a key in the vault")
}
if k != "newuid" {
t.Errorf("Failed to modify uid")
// Store second value which will have the affect of updating to Store
// rather than adding.
secondVal := "bar"
vault.Put("foo", secondVal)
val, exists, err = vault.Get("foo")
if !exists || err != nil || val != secondVal {
t.Errorf("Failed to retrieve second value from vault")
}
val, exists, err = vault.Get(UidDataKey)
if !exists || err != nil || val != storedVal {
t.Errorf("Failed to retrieve first value from vault")
}
// Delete value.
if err := vault.Delete(); err != nil {
t.Errorf("Failed to delete uid %v", err)
}
if uid, exists, _ := vault.Get(); exists {
t.Errorf("Found uid %v, expected none", uid)
if _, exists, _ := vault.Get(UidDataKey); exists {
t.Errorf("Found uid but expected none after deletion")
}
}

View file

@ -92,14 +92,16 @@ const (
// Namer handles centralized naming for the cluster.
type Namer struct {
clusterName string
nameLock sync.Mutex
clusterName string
firewallName string
nameLock sync.Mutex
}
// NewNamer creates a new namer.
func NewNamer(clusterName string) *Namer {
// NewNamer creates a new namer with a Cluster and Firewall name.
func NewNamer(clusterName, firewallName string) *Namer {
namer := &Namer{}
namer.SetClusterName(clusterName)
namer.SetFirewallName(firewallName)
return namer
}
@ -123,6 +125,16 @@ func (n *Namer) SetClusterName(name string) {
n.clusterName = name
}
// SetFirewallName sets the firewall name of this cluster.
func (n *Namer) SetFirewallName(firewall_name string) {
n.nameLock.Lock()
defer n.nameLock.Unlock()
if n.firewallName != firewall_name {
glog.Infof("Changing firewall name from %v to %v", n.firewallName, firewall_name)
n.firewallName = firewall_name
}
}
// GetClusterName returns the UID/name of this cluster.
func (n *Namer) GetClusterName() string {
n.nameLock.Lock()
@ -130,6 +142,18 @@ func (n *Namer) GetClusterName() string {
return n.clusterName
}
// GetFirewallName returns the firewall name of this cluster.
func (n *Namer) GetFirewallName() string {
n.nameLock.Lock()
defer n.nameLock.Unlock()
// Retain backwards compatible behavior where firewallName == clusterName.
if n.firewallName == "" {
return n.clusterName
} else {
return n.firewallName
}
}
// Truncate truncates the given key to a GCE length limit.
func (n *Namer) Truncate(key string) string {
if len(key) > nameLenLimit {
@ -216,12 +240,12 @@ func (n *Namer) IGName() string {
// FrSuffix constructs the glbc specific suffix for the FirewallRule.
func (n *Namer) FrSuffix() string {
clusterName := n.GetClusterName()
firewallName := n.GetFirewallName()
// The entire cluster only needs a single firewall rule.
if clusterName == "" {
if firewallName == "" {
return globalFirewallSuffix
}
return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, clusterName))
return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, firewallName))
}
// FrName constructs the full firewall rule name, this is the name assigned by

View file

@ -6,6 +6,7 @@ BUILDTAGS=
RELEASE?=0.9.0-beta.2
PREFIX?=gcr.io/google_containers/nginx-ingress-controller
GOOS?=linux
DOCKER?=gcloud docker --
REPO_INFO=$(shell git config --get remote.origin.url)
@ -20,11 +21,11 @@ build: clean
-ldflags "-s -w -X ${PKG}/pkg/version.RELEASE=${RELEASE} -X ${PKG}/pkg/version.COMMIT=${COMMIT} -X ${PKG}/pkg/version.REPO=${REPO_INFO}" \
-o rootfs/nginx-ingress-controller ${PKG}/pkg/cmd/controller
container: build
docker build --pull -t $(PREFIX):$(RELEASE) rootfs
container:
$(DOCKER) build --pull -t $(PREFIX):$(RELEASE) rootfs
push: container
gcloud docker -- push $(PREFIX):$(RELEASE)
$(DOCKER) push $(PREFIX):$(RELEASE)
fmt:
@echo "+ $@"

View file

@ -1,6 +1,6 @@
# Nginx Ingress Controller
This is an nginx Ingress controller that uses [ConfigMap](https://github.com/kubernetes/kubernetes/blob/master/docs/design/configmap.md) to store the nginx configuration. See [Ingress controller documentation](../README.md) for details on how it works.
This is an nginx Ingress controller that uses [ConfigMap](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/configmap.md) to store the nginx configuration. See [Ingress controller documentation](../README.md) for details on how it works.
## Contents
* [Conventions](#conventions)

View file

@ -40,6 +40,7 @@ The following annotations are supported:
|Name |type|
|---------------------------|------|
|[ingress.kubernetes.io/add-base-url](#rewrite)|true or false|
|[ingress.kubernetes.io/affinity](#session-affinity)|true or false|
|[ingress.kubernetes.io/auth-realm](#authentication)|string|
|[ingress.kubernetes.io/auth-secret](#authentication)|string|
|[ingress.kubernetes.io/auth-type](#authentication)|basic or digest|
@ -47,18 +48,18 @@ The following annotations are supported:
|[ingress.kubernetes.io/auth-tls-secret](#Certificate Authentication)|string|
|[ingress.kubernetes.io/auth-tls-verify-depth](#Certificate Authentication)|number|
|[ingress.kubernetes.io/enable-cors](#enable-cors)|true or false|
|[ingress.kubernetes.io/force-ssl-redirect](#server-side-https-enforcement-through-redirect)|true or false|
|[ingress.kubernetes.io/limit-connections](#rate-limiting)|number|
|[ingress.kubernetes.io/limit-rps](#rate-limiting)|number|
|[ingress.kubernetes.io/proxy-body-size](#custom-max-body-size)|string|
|[ingress.kubernetes.io/rewrite-target](#rewrite)|URI|
|[ingress.kubernetes.io/secure-backends](#secure-backends)|true or false|
|[ingress.kubernetes.io/session-cookie-name](#cookie-affinity)|string|
|[ingress.kubernetes.io/session-cookie-hash](#cookie-affinity)|string|
|[ingress.kubernetes.io/ssl-redirect](#server-side-https-enforcement-through-redirect)|true or false|
|[ingress.kubernetes.io/upstream-max-fails](#custom-nginx-upstream-checks)|number|
|[ingress.kubernetes.io/upstream-fail-timeout](#custom-nginx-upstream-checks)|number|
|[ingress.kubernetes.io/whitelist-source-range](#whitelist-source-range)|CIDR|
|[ingress.kubernetes.io/affinity](#session-affinity)|true or false|
|[ingress.kubernetes.io/session-cookie-name](#cookie-affinity)|string|
|[ingress.kubernetes.io/session-cookie-hash](#cookie-affinity)|string|
@ -126,7 +127,7 @@ The secret must be created in the same namespace as the Ingress rule.
ingress.kubernetes.io/auth-realm: "realm string"
```
Please check the [auth](examples/auth/README.md) example.
Please check the [auth](/examples/auth/nginx/README.md) example.
### Certificate Authentication
@ -146,7 +147,7 @@ ingress.kubernetes.io/auth-tls-verify-depth
The validation depth between the provided client certificate and the Certification Authority chain.
Please check the [tls-auth](examples/auth/client-certs/README.md) example.
Please check the [tls-auth](/examples/auth/client-certs/nginx/README.md) example.
### Enable CORS
@ -163,7 +164,7 @@ Additionally it is possible to set `ingress.kubernetes.io/auth-method` to specif
ingress.kubernetes.io/auth-url: "URL to the authentication service"
```
Please check the [external-auth](examples/external-auth/README.md) example.
Please check the [external-auth](/examples/auth/external-auth/nginx/README.md) example.
### Rewrite
@ -198,6 +199,8 @@ By default the controller redirects (301) to `HTTPS` if TLS is enabled for that
To configure this feature for specific ingress resources, you can use the `ingress.kubernetes.io/ssl-redirect: "false"` annotation in the particular resource.
When using SSL offloading outside of cluster (e.g. AWS ELB) it may be usefull to enforce a redirect to `HTTPS` even when there is not TLS cert available. This can be achieved by using the `ingress.kubernetes.io/force-ssl-redirect: "true"` annotation in the particular resource.
### Whitelist source range
@ -207,7 +210,7 @@ To configure this setting globally for all Ingress rules, the `whitelist-source-
*Note:* Adding an annotation to an Ingress rule overrides any global restriction.
Please check the [whitelist](examples/affinity/cookie/nginx/README.md) example.
Please check the [whitelist](/examples/affinity/cookie/nginx/README.md) example.
### Session Affinity
@ -221,7 +224,7 @@ If you use the ``cookie`` type you can also specify the name of the cookie that
In case of NGINX the annotation `ingress.kubernetes.io/session-cookie-hash` defines which algorithm will be used to 'hash' the used upstream. Default value is `md5` and possible values are `md5`, `sha1` and `index`.
The `index` option is not hashed, an in-memory index is used instead, it's quicker and the overhead is shorter Warning: the matching against upstream servers list is inconsistent. So, at reload, if upstreams servers has changed, index values are not guaranted to correspond to the same server as before! USE IT WITH CAUTION and only if you need to!
In NGINX this feature is implemented by the third party module [nginx-sticky-module-ng](https://bitbucket.org/nginx-goodies/nginx-sticky-module-ng). The workflow used to define which upstream server will be used is explained [here]https://bitbucket.org/nginx-goodies/nginx-sticky-module-ng/raw/08a395c66e425540982c00482f55034e1fee67b6/docs/sticky.pdf
In NGINX this feature is implemented by the third party module [nginx-sticky-module-ng](https://bitbucket.org/nginx-goodies/nginx-sticky-module-ng). The workflow used to define which upstream server will be used is explained [here](https://bitbucket.org/nginx-goodies/nginx-sticky-module-ng/raw/08a395c66e425540982c00482f55034e1fee67b6/docs/sticky.pdf)
@ -239,6 +242,9 @@ Example usage: `custom-http-errors: 404,415`
**disable-access-log:** Disables the Access Log from the entire Ingress Controller. This is 'false' by default.
**disable-ipv6:** Disable listening on IPV6. This is 'false' by default.
**enable-dynamic-tls-records:** Enables dynamically sized TLS records to improve time-to-first-byte. Enabled by default. See [CloudFlare's blog](https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency) for more information.
@ -330,7 +336,7 @@ The recommendation above prioritizes algorithms that provide perfect [forward se
Please check the [Mozilla SSL Configuration Generator](https://mozilla.github.io/server-side-tls/ssl-config-generator/).
**ssl-dh-param:** sets the Base64 string that contains Diffie-Hellman key to help with "Perfect Forward Secrecy".
**ssl-dh-param:** Sets the name of the secret that contains Diffie-Hellman key to help with "Perfect Forward Secrecy".
https://www.openssl.org/docs/manmaster/apps/dhparam.html
https://wiki.mozilla.org/Security/Server_Side_TLS#DHE_handshake_and_dhparam
http://nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_dhparam

View file

@ -17,217 +17,79 @@ limitations under the License.
package main
import (
"path/filepath"
"github.com/golang/glog"
common "github.com/ncabatoff/process-exporter"
"github.com/ncabatoff/process-exporter/proc"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress/controllers/nginx/pkg/metric/collector"
)
type exeMatcher struct {
name string
args []string
}
const (
ngxStatusPath = "/internal_nginx_status"
ngxVtsPath = "/nginx_status/format/json"
)
func (em exeMatcher) MatchAndName(nacl common.NameAndCmdline) (bool, string) {
if len(nacl.Cmdline) == 0 {
return false, ""
func (n *NGINXController) setupMonitor(sm statusModule) {
csm := n.statusModule
if csm != sm {
glog.Infof("changing prometheus collector from %v to %v", csm, sm)
n.stats.stop(csm)
n.stats.start(sm)
n.statusModule = sm
}
cmd := filepath.Base(nacl.Cmdline[0])
return em.name == cmd, ""
}
func (n *NGINXController) setupMonitor(args []string) {
pc, err := newProcessCollector(true, exeMatcher{"nginx", args})
type statsCollector struct {
process prometheus.Collector
basic collector.Stopable
vts collector.Stopable
namespace string
watchClass string
}
func (s *statsCollector) stop(sm statusModule) {
switch sm {
case defaultStatusModule:
s.basic.Stop()
prometheus.Unregister(s.basic)
break
case vtsStatusModule:
s.vts.Stop()
prometheus.Unregister(s.vts)
break
}
}
func (s *statsCollector) start(sm statusModule) {
switch sm {
case defaultStatusModule:
s.basic = collector.NewNginxStatus(s.namespace, s.watchClass, ngxHealthPort, ngxStatusPath)
prometheus.Register(s.basic)
break
case vtsStatusModule:
s.vts = collector.NewNGINXVTSCollector(s.namespace, s.watchClass, ngxHealthPort, ngxVtsPath)
prometheus.Register(s.vts)
break
}
}
func newStatsCollector(ns, class, binary string) *statsCollector {
glog.Infof("starting new nginx stats collector for Ingress controller running in namespace %v (class %v)", ns, class)
pc, err := collector.NewNamedProcess(true, collector.BinaryNameMatcher{
Name: "nginx",
Binary: binary,
})
if err != nil {
glog.Fatalf("unexpected error registering nginx collector: %v", err)
}
err = prometheus.Register(pc)
if err != nil {
glog.Warningf("unexpected error registering nginx collector: %v", err)
}
}
var (
numprocsDesc = prometheus.NewDesc(
"nginx_num_procs",
"number of processes",
nil, nil)
cpuSecsDesc = prometheus.NewDesc(
"nginx_cpu_seconds_total",
"Cpu usage in seconds",
nil, nil)
readBytesDesc = prometheus.NewDesc(
"nginx_read_bytes_total",
"number of bytes read",
nil, nil)
writeBytesDesc = prometheus.NewDesc(
"nginx_write_bytes_total",
"number of bytes written",
nil, nil)
memResidentbytesDesc = prometheus.NewDesc(
"nginx_resident_memory_bytes",
"number of bytes of memory in use",
nil, nil)
memVirtualbytesDesc = prometheus.NewDesc(
"nginx_virtual_memory_bytes",
"number of bytes of memory in use",
nil, nil)
startTimeDesc = prometheus.NewDesc(
"nginx_oldest_start_time_seconds",
"start time in seconds since 1970/01/01",
nil, nil)
activeDesc = prometheus.NewDesc(
"nginx_active_connections",
"total number of active connections",
nil, nil)
acceptedDesc = prometheus.NewDesc(
"nginx_accepted_connections",
"total number of accepted client connections",
nil, nil)
handledDesc = prometheus.NewDesc(
"nginx_handled_connections",
"total number of handled connections",
nil, nil)
requestsDesc = prometheus.NewDesc(
"nginx_total_requests",
"total number of client requests",
nil, nil)
readingDesc = prometheus.NewDesc(
"nginx_current_reading_connections",
"current number of connections where nginx is reading the request header",
nil, nil)
writingDesc = prometheus.NewDesc(
"nginx_current_writing_connections",
"current number of connections where nginx is writing the response back to the client",
nil, nil)
waitingDesc = prometheus.NewDesc(
"nginx_current_waiting_connections",
"current number of idle client connections waiting for a request",
nil, nil)
)
type (
scrapeRequest struct {
results chan<- prometheus.Metric
done chan struct{}
}
namedProcessCollector struct {
scrapeChan chan scrapeRequest
*proc.Grouper
fs *proc.FS
}
)
func newProcessCollector(
children bool,
n common.MatchNamer) (*namedProcessCollector, error) {
fs, err := proc.NewFS("/proc")
if err != nil {
return nil, err
}
p := &namedProcessCollector{
scrapeChan: make(chan scrapeRequest),
Grouper: proc.NewGrouper(children, n),
fs: fs,
}
_, err = p.Update(p.fs.AllProcs())
if err != nil {
return nil, err
}
go p.start()
return p, nil
}
// Describe implements prometheus.Collector.
func (p *namedProcessCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- cpuSecsDesc
ch <- numprocsDesc
ch <- readBytesDesc
ch <- writeBytesDesc
ch <- memResidentbytesDesc
ch <- memVirtualbytesDesc
ch <- startTimeDesc
}
// Collect implements prometheus.Collector.
func (p *namedProcessCollector) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
func (p *namedProcessCollector) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
req.done <- struct{}{}
}
}
func (p *namedProcessCollector) scrape(ch chan<- prometheus.Metric) {
s, err := getNginxStatus()
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
ch <- prometheus.MustNewConstMetric(activeDesc,
prometheus.GaugeValue, float64(s.Active))
ch <- prometheus.MustNewConstMetric(acceptedDesc,
prometheus.GaugeValue, float64(s.Accepted))
ch <- prometheus.MustNewConstMetric(handledDesc,
prometheus.GaugeValue, float64(s.Handled))
ch <- prometheus.MustNewConstMetric(requestsDesc,
prometheus.GaugeValue, float64(s.Requests))
ch <- prometheus.MustNewConstMetric(readingDesc,
prometheus.GaugeValue, float64(s.Reading))
ch <- prometheus.MustNewConstMetric(writingDesc,
prometheus.GaugeValue, float64(s.Writing))
ch <- prometheus.MustNewConstMetric(waitingDesc,
prometheus.GaugeValue, float64(s.Waiting))
_, err = p.Update(p.fs.AllProcs())
if err != nil {
glog.Warningf("unexpected error obtaining nginx process info: %v", err)
return
}
for gname, gcounts := range p.Groups() {
glog.Infof("%v", gname)
glog.Infof("%v", gcounts)
ch <- prometheus.MustNewConstMetric(numprocsDesc,
prometheus.GaugeValue, float64(gcounts.Procs))
ch <- prometheus.MustNewConstMetric(memResidentbytesDesc,
prometheus.GaugeValue, float64(gcounts.Memresident))
ch <- prometheus.MustNewConstMetric(memVirtualbytesDesc,
prometheus.GaugeValue, float64(gcounts.Memvirtual))
ch <- prometheus.MustNewConstMetric(startTimeDesc,
prometheus.GaugeValue, float64(gcounts.OldestStartTime.Unix()))
ch <- prometheus.MustNewConstMetric(cpuSecsDesc,
prometheus.CounterValue, gcounts.Cpu)
ch <- prometheus.MustNewConstMetric(readBytesDesc,
prometheus.CounterValue, float64(gcounts.ReadBytes))
ch <- prometheus.MustNewConstMetric(writeBytesDesc,
prometheus.CounterValue, float64(gcounts.WriteBytes))
glog.Fatalf("unexpected error registering nginx collector: %v", err)
}
return &statsCollector{
namespace: ns,
watchClass: class,
process: pc,
}
}

View file

@ -29,28 +29,35 @@ import (
"time"
"github.com/golang/glog"
"github.com/mitchellh/mapstructure"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
"strings"
"k8s.io/ingress/controllers/nginx/pkg/config"
ngx_template "k8s.io/ingress/controllers/nginx/pkg/template"
"k8s.io/ingress/controllers/nginx/pkg/version"
"k8s.io/ingress/core/pkg/ingress"
"k8s.io/ingress/core/pkg/ingress/defaults"
"k8s.io/ingress/core/pkg/net/ssl"
)
type statusModule string
const (
ngxHealthPort = 18080
ngxHealthPath = "/healthz"
ngxStatusPath = "/internal_nginx_status"
defaultStatusModule statusModule = "default"
vtsStatusModule statusModule = "vts"
)
var (
tmplPath = "/etc/nginx/template/nginx.tmpl"
cfgPath = "/etc/nginx/nginx.conf"
binary = "/usr/sbin/nginx"
tmplPath = "/etc/nginx/template/nginx.tmpl"
cfgPath = "/etc/nginx/nginx.conf"
binary = "/usr/sbin/nginx"
defIngressClass = "nginx"
)
// newNGINXController creates a new NGINX Ingress controller.
@ -61,7 +68,7 @@ func newNGINXController() ingress.Controller {
if ngx == "" {
ngx = binary
}
n := NGINXController{
n := &NGINXController{
binary: ngx,
configmap: &api.ConfigMap{},
}
@ -93,7 +100,7 @@ Error loading new template : %v
go n.Start()
return ingress.Controller(&n)
return ingress.Controller(n)
}
// NGINXController ...
@ -105,10 +112,18 @@ type NGINXController struct {
storeLister ingress.StoreLister
binary string
cmdArgs []string
watchClass string
namespace string
stats *statsCollector
statusModule statusModule
}
// Start start a new NGINX master process running in foreground.
func (n NGINXController) Start() {
func (n *NGINXController) Start() {
glog.Info("starting NGINX process...")
done := make(chan error, 1)
@ -155,7 +170,7 @@ func (n *NGINXController) start(cmd *exec.Cmd, done chan error) {
return
}
n.setupMonitor(cmd.Args)
n.cmdArgs = cmd.Args
go func() {
done <- cmd.Wait()
@ -175,6 +190,7 @@ func (n NGINXController) Reload(data []byte) ([]byte, bool, error) {
}
o, e := exec.Command(n.binary, "-s", "reload").CombinedOutput()
return o, true, e
}
@ -185,23 +201,7 @@ func (n NGINXController) BackendDefaults() defaults.Backend {
return d.Backend
}
return n.backendDefaults()
}
func (n *NGINXController) backendDefaults() defaults.Backend {
d := config.NewDefault()
config := &mapstructure.DecoderConfig{
Metadata: nil,
WeaklyTypedInput: true,
Result: &d,
TagName: "json",
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
glog.Warningf("unexpected error merging defaults: %v", err)
}
decoder.Decode(n.configmap.Data)
return d.Backend
return ngx_template.ReadConfig(n.configmap.Data).Backend
}
// isReloadRequired check if the new configuration file is different
@ -218,6 +218,7 @@ func (n NGINXController) isReloadRequired(data []byte) bool {
}
if !bytes.Equal(src, data) {
tmpfile, err := ioutil.TempFile("", "nginx-cfg-diff")
if err != nil {
glog.Errorf("error creating temporal file: %s", err)
@ -239,6 +240,7 @@ func (n NGINXController) isReloadRequired(data []byte) bool {
glog.Infof("NGINX configuration diff\n")
glog.Infof("%v", string(diffOutput))
}
os.Remove(tmpfile.Name())
return len(diffOutput) > 0
}
return false
@ -255,8 +257,25 @@ func (n NGINXController) Info() *ingress.BackendInfo {
}
// OverrideFlags customize NGINX controller flags
func (n NGINXController) OverrideFlags(flags *pflag.FlagSet) {
flags.Set("ingress-class", "nginx")
func (n *NGINXController) OverrideFlags(flags *pflag.FlagSet) {
ic, _ := flags.GetString("ingress-class")
wc, _ := flags.GetString("watch-namespace")
if ic == "" {
ic = defIngressClass
}
if ic != defIngressClass {
glog.Warningf("only Ingress with class %v will be processed by this ingress controller", ic)
}
flags.Set("ingress-class", ic)
n.stats = newStatsCollector(ic, wc, n.binary)
}
// DefaultIngressClass just return the default ingress class
func (n NGINXController) DefaultIngressClass() string {
return defIngressClass
}
// testTemplate checks if the NGINX configuration inside the byte array is valid
@ -267,7 +286,10 @@ func (n NGINXController) testTemplate(cfg []byte) error {
return err
}
defer tmpfile.Close()
ioutil.WriteFile(tmpfile.Name(), cfg, 0644)
err = ioutil.WriteFile(tmpfile.Name(), cfg, 0644)
if err != nil {
return err
}
out, err := exec.Command(n.binary, "-t", "-c", tmpfile.Name()).CombinedOutput()
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
@ -314,6 +336,13 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) ([]byte, er
cfg := ngx_template.ReadConfig(n.configmap.Data)
// we need to check if the status module configuration changed
if cfg.EnableVtsStatus {
n.setupMonitor(vtsStatusModule)
} else {
n.setupMonitor(defaultStatusModule)
}
// NGINX cannot resize the has tables used to store server names.
// For this reason we check if the defined size defined is correct
// for the FQDN defined in the ingress rules adjusting the value
@ -349,6 +378,32 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) ([]byte, er
}
}
sslDHParam := ""
if cfg.SSLDHParam != "" {
secretName := cfg.SSLDHParam
s, exists, err := n.storeLister.Secret.GetByKey(secretName)
if err != nil {
glog.Warningf("unexpected error reading secret %v: %v", secretName, err)
}
if exists {
secret := s.(*api.Secret)
nsSecName := strings.Replace(secretName, "/", "-", -1)
dh, ok := secret.Data["dhparam.pem"]
if ok {
pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh)
if err != nil {
glog.Warningf("unexpected error adding or updating dhparam %v file: %v", nsSecName, err)
} else {
sslDHParam = pemFileName
}
}
}
}
cfg.SSLDHParam = sslDHParam
content, err := n.t.Write(config.TemplateConfig{
ProxySetHeaders: setHeaders,
MaxOpenFiles: maxOpenFiles,

View file

@ -1,99 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type nginxStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
func getNginxStatus() (*nginxStatus, error) {
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%v%v", ngxHealthPort, ngxStatusPath))
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx status page (status %v)", resp.StatusCode)
}
return parse(string(data)), nil
}
func parse(data string) *nginxStatus {
acr := ac.FindStringSubmatch(data)
sahrr := sahr.FindStringSubmatch(data)
readingr := reading.FindStringSubmatch(data)
writingr := writing.FindStringSubmatch(data)
waitingr := waiting.FindStringSubmatch(data)
return &nginxStatus{
toInt(acr, 1),
toInt(sahrr, 1),
toInt(sahrr, 2),
toInt(sahrr, 3),
toInt(readingr, 1),
toInt(writingr, 1),
toInt(waitingr, 1),
}
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
}
if pos > len(data) {
return 0
}
if v, err := strconv.Atoi(data[pos]); err == nil {
return v
}
return 0
}

View file

@ -17,11 +17,11 @@ limitations under the License.
package config
import (
"fmt"
"runtime"
"github.com/golang/glog"
"fmt"
"k8s.io/ingress/core/pkg/ingress"
"k8s.io/ingress/core/pkg/ingress/defaults"
)
@ -47,9 +47,9 @@ const (
gzipTypes = "application/atom+xml application/javascript application/x-javascript application/json application/rss+xml application/vnd.ms-fontobject application/x-font-ttf application/x-web-app-manifest+json application/xhtml+xml application/xml font/opentype image/svg+xml image/x-icon text/css text/plain text/x-component"
logFormatUpstream = "'%v - [$proxy_add_x_forwarded_for] - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" $request_length $request_time [$proxy_upstream_name] $upstream_addr $upstream_response_length $upstream_response_time $upstream_status'"
logFormatUpstream = `%v - [$proxy_add_x_forwarded_for] - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_length $request_time [$proxy_upstream_name] $upstream_addr $upstream_response_length $upstream_response_time $upstream_status"`
logFormatStream = "'$remote_addr [$time_local] $protocol [$ssl_preread_server_name] [$stream_upstream] $status $bytes_sent $bytes_received $session_time'"
logFormatStream = `[$time_local] $protocol [$ssl_preread_server_name] [$stream_upstream] $status $bytes_sent $bytes_received $session_time`
// http://nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_buffer_size
// Sets the size of the buffer used for sending data.
@ -97,10 +97,8 @@ type Configuration struct {
//http://nginx.org/en/docs/http/ngx_http_log_module.html
DisableAccessLog bool `json:"disable-access-log,omitempty"`
// EnableSPDY enables spdy and use ALPN and NPN to advertise the availability of the two protocols
// https://blog.cloudflare.com/open-sourcing-our-nginx-http-2-spdy-code
// By default this is enabled
EnableSPDY bool `json:"enable-spdy"`
// DisableIpv6 disable listening on ipv6 address
DisableIpv6 bool `json:"disable-ipv6,omitempty"`
// EnableStickySessions enabled sticky sessions using cookies
// https://bitbucket.org/nginx-goodies/nginx-sticky-module-ng
@ -123,6 +121,14 @@ type Configuration struct {
// Log levels above are listed in the order of increasing severity
ErrorLogLevel string `json:"error-log-level,omitempty"`
// https://nginx.org/en/docs/http/ngx_http_v2_module.html#http2_max_field_size
// HTTP2MaxFieldSize Limits the maximum size of an HPACK-compressed request header field
HTTP2MaxFieldSize string `json:"http2-max-field-size,omitempty"`
// https://nginx.org/en/docs/http/ngx_http_v2_module.html#http2_max_header_size
// HTTP2MaxHeaderSize Limits the maximum size of the entire request header list after HPACK decompression
HTTP2MaxHeaderSize string `json:"http2-max-header-size,omitempty"`
// Enables or disables the header HSTS in servers running SSL
HSTS bool `json:"hsts,omitempty"`
@ -193,7 +199,7 @@ type Configuration struct {
// http://nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_ciphers
SSLCiphers string `json:"ssl-ciphers,omitempty"`
// Base64 string that contains Diffie-Hellman key to help with "Perfect Forward Secrecy"
// The secret that contains Diffie-Hellman key to help with "Perfect Forward Secrecy"
// https://www.openssl.org/docs/manmaster/apps/dhparam.html
// https://wiki.mozilla.org/Security/Server_Side_TLS#DHE_handshake_and_dhparam
// http://nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_dhparam
@ -253,10 +259,10 @@ type Configuration struct {
func NewDefault() Configuration {
cfg := Configuration{
ClientHeaderBufferSize: "1k",
DisableAccessLog: false,
EnableDynamicTLSRecords: true,
EnableSPDY: false,
ErrorLogLevel: errorLevel,
HTTP2MaxFieldSize: "4k",
HTTP2MaxHeaderSize: "16k",
HSTS: true,
HSTSIncludeSubdomains: true,
HSTSMaxAge: hstsMaxAge,
@ -264,7 +270,7 @@ func NewDefault() Configuration {
KeepAlive: 75,
LargeClientHeaderBuffers: "4 8k",
LogFormatStream: logFormatStream,
LogFormatUpstream: BuildLogFormatUpstream(false),
LogFormatUpstream: logFormatUpstream,
MaxWorkerConnections: 16384,
MapHashBucketSize: 64,
ProxyRealIPCIDR: defIPCIDR,
@ -278,7 +284,6 @@ func NewDefault() Configuration {
SSLSessionCacheSize: sslSessionCacheSize,
SSLSessionTickets: true,
SSLSessionTimeout: sslSessionTimeout,
UseProxyProtocol: false,
UseGzip: true,
WorkerProcesses: runtime.NumCPU(),
VtsStatusZoneSize: "10m",
@ -295,7 +300,6 @@ func NewDefault() Configuration {
CustomHTTPErrors: []int{},
WhitelistSourceRange: []string{},
SkipAccessLogURLs: []string{},
UsePortInRedirects: false,
},
}
@ -306,13 +310,18 @@ func NewDefault() Configuration {
return cfg
}
// BuildLogFormatUpstream format the log_format upstream based on proxy_protocol
func BuildLogFormatUpstream(useProxyProtocol bool) string {
if useProxyProtocol {
return fmt.Sprintf(logFormatUpstream, "$proxy_protocol_addr")
// BuildLogFormatUpstream format the log_format upstream using
// proxy_protocol_addr as remote client address if UseProxyProtocol
// is enabled.
func (cfg Configuration) BuildLogFormatUpstream() string {
if cfg.LogFormatUpstream == logFormatUpstream {
if cfg.UseProxyProtocol {
return fmt.Sprintf(cfg.LogFormatUpstream, "$proxy_protocol_addr")
}
return fmt.Sprintf(cfg.LogFormatUpstream, "$remote_addr")
}
return fmt.Sprintf(logFormatUpstream, "$remote_addr")
return cfg.LogFormatUpstream
}
// TemplateConfig contains the nginx configuration to render the file nginx.conf

View file

@ -1,3 +1,19 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
@ -9,19 +25,22 @@ func TestBuildLogFormatUpstream(t *testing.T) {
testCases := []struct {
useProxyProtocol bool // use proxy protocol
curLogFormat string
expected string
}{
{true, fmt.Sprintf(logFormatUpstream, "$proxy_protocol_addr")},
{false, fmt.Sprintf(logFormatUpstream, "$remote_addr")},
{true, logFormatUpstream, fmt.Sprintf(logFormatUpstream, "$proxy_protocol_addr")},
{false, logFormatUpstream, fmt.Sprintf(logFormatUpstream, "$remote_addr")},
{true, "my-log-format", "my-log-format"},
{false, "john-log-format", "john-log-format"},
}
for _, testCase := range testCases {
result := BuildLogFormatUpstream(testCase.useProxyProtocol)
cfg := NewDefault()
cfg.UseProxyProtocol = testCase.useProxyProtocol
cfg.LogFormatUpstream = testCase.curLogFormat
result := cfg.BuildLogFormatUpstream()
if result != testCase.expected {
t.Errorf(" expected %v but return %v", testCase.expected, result)
}
}
}

View file

@ -0,0 +1,160 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"fmt"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
type (
nginxStatusCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxVtsPath string
data *nginxStatusData
}
nginxStatusData struct {
active *prometheus.Desc
accepted *prometheus.Desc
handled *prometheus.Desc
requests *prometheus.Desc
reading *prometheus.Desc
writing *prometheus.Desc
waiting *prometheus.Desc
}
)
func buildNS(namespace, class string) string {
if namespace == "" {
namespace = "all"
}
if class == "" {
class = "all"
}
return fmt.Sprintf("%v_%v", namespace, class)
}
// NewNginxStatus returns a new prometheus collector the default nginx status module
func NewNginxStatus(namespace, class string, ngxHealthPort int, ngxVtsPath string) Stopable {
p := nginxStatusCollector{
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxVtsPath: ngxVtsPath,
}
ns := buildNS(namespace, class)
p.data = &nginxStatusData{
active: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "active_connections"),
"total number of active connections",
nil, nil),
accepted: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "accepted_connections"),
"total number of accepted client connections",
nil, nil),
handled: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "handled_connections"),
"total number of handled connections",
nil, nil),
requests: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "total_requests"),
"total number of client requests",
nil, nil),
reading: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "current_reading_connections"),
"current number of connections where nginx is reading the request header",
nil, nil),
writing: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "current_writing_connections"),
"current number of connections where nginx is writing the response back to the client",
nil, nil),
waiting: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "current_waiting_connections"),
"current number of idle client connections waiting for a request",
nil, nil),
}
go p.start()
return p
}
// Describe implements prometheus.Collector.
func (p nginxStatusCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- p.data.active
ch <- p.data.accepted
ch <- p.data.handled
ch <- p.data.requests
ch <- p.data.reading
ch <- p.data.writing
ch <- p.data.waiting
}
// Collect implements prometheus.Collector.
func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
func (p nginxStatusCollector) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
req.done <- struct{}{}
}
}
func (p nginxStatusCollector) Stop() {
close(p.scrapeChan)
}
// nginxStatusCollector scrap the nginx status
func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
s, err := getNginxStatus(p.ngxHealthPort, p.ngxVtsPath)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
ch <- prometheus.MustNewConstMetric(p.data.active,
prometheus.GaugeValue, float64(s.Active))
ch <- prometheus.MustNewConstMetric(p.data.accepted,
prometheus.GaugeValue, float64(s.Accepted))
ch <- prometheus.MustNewConstMetric(p.data.handled,
prometheus.GaugeValue, float64(s.Handled))
ch <- prometheus.MustNewConstMetric(p.data.requests,
prometheus.GaugeValue, float64(s.Requests))
ch <- prometheus.MustNewConstMetric(p.data.reading,
prometheus.GaugeValue, float64(s.Reading))
ch <- prometheus.MustNewConstMetric(p.data.writing,
prometheus.GaugeValue, float64(s.Writing))
ch <- prometheus.MustNewConstMetric(p.data.waiting,
prometheus.GaugeValue, float64(s.Waiting))
}

View file

@ -0,0 +1,173 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"path/filepath"
"github.com/golang/glog"
common "github.com/ncabatoff/process-exporter"
"github.com/ncabatoff/process-exporter/proc"
"github.com/prometheus/client_golang/prometheus"
)
// BinaryNameMatcher ...
type BinaryNameMatcher struct {
Name string
Binary string
}
// MatchAndName returns false if the match failed, otherwise
// true and the resulting name.
func (em BinaryNameMatcher) MatchAndName(nacl common.NameAndCmdline) (bool, string) {
if len(nacl.Cmdline) == 0 {
return false, ""
}
cmd := filepath.Base(em.Binary)
return em.Name == cmd, ""
}
type namedProcessData struct {
numProcs *prometheus.Desc
cpuSecs *prometheus.Desc
readBytes *prometheus.Desc
writeBytes *prometheus.Desc
memResidentbytes *prometheus.Desc
memVirtualbytes *prometheus.Desc
startTime *prometheus.Desc
}
type namedProcess struct {
*proc.Grouper
scrapeChan chan scrapeRequest
fs *proc.FS
data namedProcessData
}
// NewNamedProcess returns a new prometheus collector for the nginx process
func NewNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
fs, err := proc.NewFS("/proc")
if err != nil {
return nil, err
}
p := namedProcess{
scrapeChan: make(chan scrapeRequest),
Grouper: proc.NewGrouper(children, mn),
fs: fs,
}
_, err = p.Update(p.fs.AllProcs())
if err != nil {
return nil, err
}
p.data = namedProcessData{
numProcs: prometheus.NewDesc(
"num_procs",
"number of processes",
nil, nil),
cpuSecs: prometheus.NewDesc(
"cpu_seconds_total",
"Cpu usage in seconds",
nil, nil),
readBytes: prometheus.NewDesc(
"read_bytes_total",
"number of bytes read",
nil, nil),
writeBytes: prometheus.NewDesc(
"write_bytes_total",
"number of bytes written",
nil, nil),
memResidentbytes: prometheus.NewDesc(
"resident_memory_bytes",
"number of bytes of memory in use",
nil, nil),
memVirtualbytes: prometheus.NewDesc(
"virtual_memory_bytes",
"number of bytes of memory in use",
nil, nil),
startTime: prometheus.NewDesc(
"oldest_start_time_seconds",
"start time in seconds since 1970/01/01",
nil, nil),
}
go p.start()
return p, nil
}
// Describe implements prometheus.Collector.
func (p namedProcess) Describe(ch chan<- *prometheus.Desc) {
ch <- p.data.cpuSecs
ch <- p.data.numProcs
ch <- p.data.readBytes
ch <- p.data.writeBytes
ch <- p.data.memResidentbytes
ch <- p.data.memVirtualbytes
ch <- p.data.startTime
}
// Collect implements prometheus.Collector.
func (p namedProcess) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
func (p namedProcess) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
req.done <- struct{}{}
}
}
func (p namedProcess) Stop() {
close(p.scrapeChan)
}
func (p namedProcess) scrape(ch chan<- prometheus.Metric) {
_, err := p.Update(p.fs.AllProcs())
if err != nil {
glog.Warningf("unexpected error obtaining nginx process info: %v", err)
return
}
for _, gcounts := range p.Groups() {
ch <- prometheus.MustNewConstMetric(p.data.numProcs,
prometheus.GaugeValue, float64(gcounts.Procs))
ch <- prometheus.MustNewConstMetric(p.data.memResidentbytes,
prometheus.GaugeValue, float64(gcounts.Memresident))
ch <- prometheus.MustNewConstMetric(p.data.memVirtualbytes,
prometheus.GaugeValue, float64(gcounts.Memvirtual))
ch <- prometheus.MustNewConstMetric(p.data.startTime,
prometheus.GaugeValue, float64(gcounts.OldestStartTime.Unix()))
ch <- prometheus.MustNewConstMetric(p.data.cpuSecs,
prometheus.CounterValue, gcounts.Cpu)
ch <- prometheus.MustNewConstMetric(p.data.readBytes,
prometheus.CounterValue, float64(gcounts.ReadBytes))
ch <- prometheus.MustNewConstMetric(p.data.writeBytes,
prometheus.CounterValue, float64(gcounts.WriteBytes))
}
}

View file

@ -0,0 +1,30 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import "github.com/prometheus/client_golang/prometheus"
// Stopable defines a prometheus collector that can be stopped
type Stopable interface {
prometheus.Collector
Stop()
}
type scrapeRequest struct {
results chan<- prometheus.Metric
done chan struct{}
}

View file

@ -0,0 +1,225 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"github.com/golang/glog"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type basicStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
// https://github.com/vozlt/nginx-module-vts
type vts struct {
NginxVersion string `json:"nginxVersion"`
LoadMsec int `json:"loadMsec"`
NowMsec int `json:"nowMsec"`
// Total connections and requests(same as stub_status_module in NGINX)
Connections connections `json:"connections"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone
ServerZones map[string]serverZone `json:"serverZones"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone filtered through
// the vhost_traffic_status_filter_by_set_key directive
FilterZones map[string]map[string]filterZone `json:"filterZones"`
// Traffic(in/out) and request and response counts per server in each upstream group
UpstreamZones map[string][]upstreamZone `json:"upstreamZones"`
}
type serverZone struct {
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
Responses response `json:"responses"`
Cache cache `json:"cache"`
}
type filterZone struct {
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
Cache cache `json:"cache"`
Responses response `json:"responses"`
}
type upstreamZone struct {
Responses response `json:"responses"`
Server string `json:"server"`
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
ResponseMsec float64 `json:"responseMsec"`
Weight float64 `json:"weight"`
MaxFails float64 `json:"maxFails"`
FailTimeout float64 `json:"failTimeout"`
Backup BoolToFloat64 `json:"backup"`
Down BoolToFloat64 `json:"down"`
}
type cache struct {
Miss float64 `json:"miss"`
Bypass float64 `json:"bypass"`
Expired float64 `json:"expired"`
Stale float64 `json:"stale"`
Updating float64 `json:"updating"`
Revalidated float64 `json:"revalidated"`
Hit float64 `json:"hit"`
Scarce float64 `json:"scarce"`
}
type response struct {
OneXx float64 `json:"1xx"`
TwoXx float64 `json:"2xx"`
TheeXx float64 `json:"3xx"`
FourXx float64 `json:"4xx"`
FiveXx float64 `json:"5xx"`
}
type connections struct {
Active float64 `json:"active"`
Reading float64 `json:"reading"`
Writing float64 `json:"writing"`
Waiting float64 `json:"waiting"`
Accepted float64 `json:"accepted"`
Handled float64 `json:"handled"`
Requests float64 `json:"requests"`
}
// BoolToFloat64 ...
type BoolToFloat64 float64
// UnmarshalJSON ...
func (bit BoolToFloat64) UnmarshalJSON(data []byte) error {
asString := string(data)
if asString == "1" || asString == "true" {
bit = 1
} else if asString == "0" || asString == "false" {
bit = 0
} else {
return fmt.Errorf(fmt.Sprintf("Boolean unmarshal error: invalid input %s", asString))
}
return nil
}
func getNginxStatus(ngxHealthPort int, ngxStatusPath string) (*basicStatus, error) {
url := fmt.Sprintf("http://localhost:%v%v", ngxHealthPort, ngxStatusPath)
glog.V(3).Infof("start scrapping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
return parse(string(data)), nil
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
func getNginxVtsMetrics(ngxHealthPort int, ngxVtsPath string) (*vts, error) {
url := fmt.Sprintf("http://localhost:%v%v", ngxHealthPort, ngxVtsPath)
glog.V(3).Infof("start scrapping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx vts (%v)", err)
}
var vts *vts
err = json.Unmarshal(data, &vts)
if err != nil {
return nil, fmt.Errorf("unexpected error json unmarshal (%v)", err)
}
glog.V(3).Infof("scrap returned : %v", vts)
return vts, nil
}
func parse(data string) *basicStatus {
acr := ac.FindStringSubmatch(data)
sahrr := sahr.FindStringSubmatch(data)
readingr := reading.FindStringSubmatch(data)
writingr := writing.FindStringSubmatch(data)
waitingr := waiting.FindStringSubmatch(data)
return &basicStatus{
toInt(acr, 1),
toInt(sahrr, 1),
toInt(sahrr, 2),
toInt(sahrr, 3),
toInt(readingr, 1),
toInt(writingr, 1),
toInt(waitingr, 1),
}
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
}
if pos > len(data) {
return 0
}
if v, err := strconv.Atoi(data[pos]); err == nil {
return v
}
return 0
}

View file

@ -14,35 +14,37 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package main
package collector
import (
"reflect"
"testing"
"github.com/kylelemons/godebug/pretty"
)
func TestParseStatus(t *testing.T) {
tests := []struct {
in string
out *nginxStatus
out *basicStatus
}{
{`Active connections: 43
server accepts handled requests
7368 7368 10993
Reading: 0 Writing: 5 Waiting: 38`,
&nginxStatus{43, 7368, 7368, 10993, 0, 5, 38},
&basicStatus{43, 7368, 7368, 10993, 0, 5, 38},
},
{`Active connections: 0
server accepts handled requests
1 7 0
Reading: A Writing: B Waiting: 38`,
&nginxStatus{0, 1, 7, 0, 0, 0, 38},
&basicStatus{0, 1, 7, 0, 0, 0, 38},
},
}
for _, test := range tests {
r := parse(test.in)
if !reflect.DeepEqual(r, test.out) {
if diff := pretty.Compare(r, test.out); diff != "" {
t.Logf("%v", diff)
t.Fatalf("expected %v but returned %v", test.out, r)
}
}

View file

@ -0,0 +1,269 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"reflect"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
const system = "nginx"
type (
vtsCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxVtsPath string
data *vtsData
}
vtsData struct {
bytes *prometheus.Desc
cache *prometheus.Desc
connections *prometheus.Desc
response *prometheus.Desc
request *prometheus.Desc
filterZoneBytes *prometheus.Desc
filterZoneResponse *prometheus.Desc
filterZoneCache *prometheus.Desc
upstreamBackup *prometheus.Desc
upstreamBytes *prometheus.Desc
upstreamDown *prometheus.Desc
upstreamFailTimeout *prometheus.Desc
upstreamMaxFails *prometheus.Desc
upstreamResponses *prometheus.Desc
upstreamRequest *prometheus.Desc
upstreamResponseMsec *prometheus.Desc
upstreamWeight *prometheus.Desc
}
)
// NewNGINXVTSCollector returns a new prometheus collector for the VTS module
func NewNGINXVTSCollector(namespace, class string, ngxHealthPort int, ngxVtsPath string) Stopable {
p := vtsCollector{
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxVtsPath: ngxVtsPath,
}
ns := buildNS(namespace, class)
p.data = &vtsData{
bytes: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "bytes_total"),
"Nginx bytes count",
[]string{"server_zone", "direction"}, nil),
cache: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "cache_total"),
"Nginx cache count",
[]string{"server_zone", "type"}, nil),
connections: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "connections_total"),
"Nginx connections count",
[]string{"type"}, nil),
response: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "responses_total"),
"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"server_zone", "status_code"}, nil),
request: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "requests_total"),
"The total number of requested client connections.",
[]string{"server_zone"}, nil),
filterZoneBytes: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "filterzone_bytes_total"),
"Nginx bytes count",
[]string{"server_zone", "country", "direction"}, nil),
filterZoneResponse: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "filterzone_responses_total"),
"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"server_zone", "country", "status_code"}, nil),
filterZoneCache: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "filterzone_cache_total"),
"Nginx cache count",
[]string{"server_zone", "country", "type"}, nil),
upstreamBackup: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_backup"),
"Current backup setting of the server.",
[]string{"upstream", "server"}, nil),
upstreamBytes: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_bytes_total"),
"The total number of bytes sent to this server.",
[]string{"upstream", "server", "direction"}, nil),
upstreamDown: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "vts_upstream_down_total"),
"Current down setting of the server.",
[]string{"upstream", "server"}, nil),
upstreamFailTimeout: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_fail_timeout"),
"Current fail_timeout setting of the server.",
[]string{"upstream", "server"}, nil),
upstreamMaxFails: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_maxfails"),
"Current max_fails setting of the server.",
[]string{"upstream", "server"}, nil),
upstreamResponses: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_responses_total"),
"The number of upstream responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"upstream", "server", "status_code"}, nil),
upstreamRequest: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_requests_total"),
"The total number of client connections forwarded to this server.",
[]string{"upstream", "server"}, nil),
upstreamResponseMsec: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_response_msecs_avg"),
"The average of only upstream response processing times in milliseconds.",
[]string{"upstream", "server"}, nil),
upstreamWeight: prometheus.NewDesc(
prometheus.BuildFQName(system, ns, "upstream_weight"),
"Current upstream weight setting of the server.",
[]string{"upstream", "server"}, nil),
}
go p.start()
return p
}
// Describe implements prometheus.Collector.
func (p vtsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- p.data.bytes
ch <- p.data.cache
ch <- p.data.connections
ch <- p.data.request
ch <- p.data.response
ch <- p.data.upstreamBackup
ch <- p.data.upstreamBytes
ch <- p.data.upstreamDown
ch <- p.data.upstreamFailTimeout
ch <- p.data.upstreamMaxFails
ch <- p.data.upstreamRequest
ch <- p.data.upstreamResponseMsec
ch <- p.data.upstreamResponses
ch <- p.data.upstreamWeight
ch <- p.data.filterZoneBytes
ch <- p.data.filterZoneCache
ch <- p.data.filterZoneResponse
}
// Collect implements prometheus.Collector.
func (p vtsCollector) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
func (p vtsCollector) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrapeVts(ch)
req.done <- struct{}{}
}
}
func (p vtsCollector) Stop() {
close(p.scrapeChan)
}
// scrapeVts scrape nginx vts metrics
func (p vtsCollector) scrapeVts(ch chan<- prometheus.Metric) {
nginxMetrics, err := getNginxVtsMetrics(p.ngxHealthPort, p.ngxVtsPath)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
reflectMetrics(&nginxMetrics.Connections, p.data.connections, ch)
for name, zones := range nginxMetrics.UpstreamZones {
for pos, value := range zones {
reflectMetrics(&zones[pos].Responses, p.data.upstreamResponses, ch, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamRequest,
prometheus.CounterValue, zones[pos].RequestCounter, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamDown,
prometheus.CounterValue, float64(zones[pos].Down), name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamWeight,
prometheus.CounterValue, zones[pos].Weight, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamResponseMsec,
prometheus.CounterValue, zones[pos].ResponseMsec, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamBackup,
prometheus.CounterValue, float64(zones[pos].Backup), name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamFailTimeout,
prometheus.CounterValue, zones[pos].FailTimeout, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamMaxFails,
prometheus.CounterValue, zones[pos].MaxFails, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
prometheus.CounterValue, zones[pos].InBytes, name, value.Server, "in")
ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
prometheus.CounterValue, zones[pos].OutBytes, name, value.Server, "out")
}
}
for name, zone := range nginxMetrics.ServerZones {
reflectMetrics(&zone.Responses, p.data.response, ch, name)
reflectMetrics(&zone.Cache, p.data.cache, ch, name)
ch <- prometheus.MustNewConstMetric(p.data.request,
prometheus.CounterValue, zone.RequestCounter, name)
ch <- prometheus.MustNewConstMetric(p.data.bytes,
prometheus.CounterValue, zone.InBytes, name, "in")
ch <- prometheus.MustNewConstMetric(p.data.bytes,
prometheus.CounterValue, zone.OutBytes, name, "out")
}
for serverZone, countries := range nginxMetrics.FilterZones {
for country, zone := range countries {
reflectMetrics(&zone.Responses, p.data.filterZoneResponse, ch, serverZone, country)
reflectMetrics(&zone.Cache, p.data.filterZoneCache, ch, serverZone, country)
ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
prometheus.CounterValue, float64(zone.InBytes), serverZone, country, "in")
ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
prometheus.CounterValue, float64(zone.OutBytes), serverZone, country, "out")
}
}
}
func reflectMetrics(value interface{}, desc *prometheus.Desc, ch chan<- prometheus.Metric, labels ...string) {
val := reflect.ValueOf(value).Elem()
for i := 0; i < val.NumField(); i++ {
tag := val.Type().Field(i).Tag
l := append(labels, tag.Get("json"))
ch <- prometheus.MustNewConstMetric(desc,
prometheus.CounterValue, float64(val.Field(i).Interface().(float64)),
l...)
}
}

View file

@ -31,7 +31,6 @@ import (
"github.com/golang/glog"
"k8s.io/ingress/controllers/nginx/pkg/config"
nginxconfig "k8s.io/ingress/controllers/nginx/pkg/config"
"k8s.io/ingress/core/pkg/ingress"
ing_net "k8s.io/ingress/core/pkg/net"
"k8s.io/ingress/core/pkg/watch"
@ -250,14 +249,12 @@ func buildAuthResponseHeaders(input interface{}) []string {
}
func buildLogFormatUpstream(input interface{}) string {
config, ok := input.(config.Configuration)
cfg, ok := input.(config.Configuration)
if !ok {
glog.Errorf("error an ingress.buildLogFormatUpstream type but %T was returned", input)
}
return nginxconfig.BuildLogFormatUpstream(config.UseProxyProtocol)
return cfg.BuildLogFormatUpstream()
}
// buildProxyPass produces the proxy pass string, if the ingress has redirects

View file

@ -14,7 +14,7 @@ worker_rlimit_nofile {{ .MaxOpenFiles }};
events {
multi_accept on;
worker_connections {{ $cfg.MaxWorkerConnections }};
use epoll;
use epoll;
}
http {
@ -26,7 +26,7 @@ http {
real_ip_header X-Forwarded-For;
set_real_ip_from 0.0.0.0/0;
{{ end }}
real_ip_recursive on;
{{/* databases used to determine the country depending on the client IP address */}}
@ -51,7 +51,7 @@ http {
aio threads;
tcp_nopush on;
tcp_nodelay on;
log_subrequest on;
reset_timedout_connection on;
@ -60,6 +60,9 @@ http {
client_header_buffer_size {{ $cfg.ClientHeaderBufferSize }};
large_client_header_buffers {{ $cfg.LargeClientHeaderBuffers }};
http2_max_field_size {{ $cfg.HTTP2MaxFieldSize }};
http2_max_header_size {{ $cfg.HTTP2MaxHeaderSize }};
types_hash_max_size 2048;
server_names_hash_max_size {{ $cfg.ServerNameHashMaxSize }};
@ -73,13 +76,13 @@ http {
gzip_comp_level 5;
gzip_http_version 1.1;
gzip_min_length 256;
gzip_types {{ $cfg.GzipTypes }};
gzip_types {{ $cfg.GzipTypes }};
gzip_proxied any;
{{ end }}
server_tokens {{ if $cfg.ShowServerTokens }}on{{ else }}off{{ end }};
log_format upstreaminfo {{ buildLogFormatUpstream $cfg }};
log_format upstreaminfo '{{ buildLogFormatUpstream $cfg }}';
{{/* map urls that should not appear in access.log */}}
{{/* http://nginx.org/en/docs/http/ngx_http_log_module.html#access_log */}}
@ -207,10 +210,10 @@ http {
{{ range $index, $server := .Servers }}
server {
server_name {{ $server.Hostname }};
listen [::]:80{{ if $cfg.UseProxyProtocol }} proxy_protocol{{ end }}{{ if eq $index 0 }} ipv6only=off{{end}}{{ if eq $server.Hostname "_"}} default_server reuseport backlog={{ $backlogSize }}{{end}};
listen {{ if not $cfg.DisableIpv6 }}[::]:{{ end }}80{{ if $cfg.UseProxyProtocol }} proxy_protocol{{ end }}{{ if eq $server.Hostname "_"}} default_server {{ if not $cfg.DisableIpv6 }}ipv6only=off{{end}} reuseport backlog={{ $backlogSize }}{{end}};
{{/* Listen on 442 because port 443 is used in the stream section */}}
{{/* This listen cannot contains proxy_protocol directive because port 443 is in charge of decoding the protocol */}}
{{ if not (empty $server.SSLCertificate) }}listen {{ if gt (len $passthroughBackends) 0 }}442{{ else }}[::]:443 {{ end }}{{ if eq $server.Hostname "_"}} default_server reuseport backlog={{ $backlogSize }}{{end}} ssl {{ if $cfg.UseHTTP2 }}http2{{ end }};
{{/* This listen on port 442 cannot contains proxy_protocol directive because port 443 is in charge of decoding the protocol */}}
{{ if not (empty $server.SSLCertificate) }}listen {{ if gt (len $passthroughBackends) 0 }}442{{ else }}{{ if not $cfg.DisableIpv6 }}[::]:{{ end }}443 {{ if $cfg.UseProxyProtocol }} proxy_protocol {{ end }}{{ end }} {{ if eq $server.Hostname "_"}} default_server {{ if not $cfg.DisableIpv6 }}ipv6only=off{{end}} reuseport backlog={{ $backlogSize }}{{end}} ssl {{ if $cfg.UseHTTP2 }}http2{{ end }};
{{/* comment PEM sha is required to detect changes in the generated configuration and force a reload */}}
# PEM sha: {{ $server.SSLPemChecksum }}
ssl_certificate {{ $server.SSLCertificate }};
@ -237,20 +240,24 @@ http {
{{ if not (empty $authPath) }}
location = {{ $authPath }} {
internal;
set $proxy_upstream_name "internal";
{{ if not $location.ExternalAuth.SendBody }}
proxy_pass_request_body off;
proxy_set_header Content-Length "";
{{ end }}
{{ if not (empty $location.ExternalAuth.Method) }}
{{ if not (empty $location.ExternalAuth.Method) }}
proxy_method {{ $location.ExternalAuth.Method }};
proxy_set_header X-Original-URI $request_uri;
proxy_set_header X-Scheme $pass_access_scheme;
{{ end }}
proxy_set_header Host $host;
proxy_set_header Host $host;
proxy_pass_request_headers on;
set $target {{ $location.ExternalAuth.URL }};
proxy_pass $target;
}
{{ end }}
location {{ $path }} {
set $proxy_upstream_name "{{ $location.Backend }}";
@ -260,7 +267,7 @@ http {
allow {{ $ip }};{{ end }}
deny all;
{{ end }}
port_in_redirect {{ if $location.UsePortInRedirects }}on{{ else }}off{{ end }};
{{ if not (empty $authPath) }}
@ -270,10 +277,14 @@ http {
{{ $line }}
{{- end }}
{{ end }}
{{ if (and (not (empty $server.SSLCertificate)) $location.Redirect.SSLRedirect) }}
{{ if not (empty $location.ExternalAuth.SigninURL) }}
error_page 401 = {{ $location.ExternalAuth.SigninURL }};
{{ end }}
{{ if (or $location.Redirect.ForceSSLRedirect (and (not (empty $server.SSLCertificate)) $location.Redirect.SSLRedirect)) }}
# enforce ssl on server side
if ($scheme = http) {
if ($pass_access_scheme = http) {
return 301 https://$host$request_uri;
}
{{ end }}
@ -281,7 +292,7 @@ http {
{{ $limits := buildRateLimit $location }}
{{ range $limit := $limits }}
{{ $limit }}{{ end }}
{{ if $location.BasicDigestAuth.Secured }}
{{ if eq $location.BasicDigestAuth.Type "basic" }}
auth_basic "{{ $location.BasicDigestAuth.Realm }}";
@ -292,7 +303,7 @@ http {
{{ end }}
proxy_set_header Authorization "";
{{ end }}
{{ if $location.EnableCORS }}
{{ template "CORS" }}
{{ end }}
@ -317,6 +328,8 @@ http {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port $pass_port;
proxy_set_header X-Forwarded-Proto $pass_access_scheme;
proxy_set_header X-Original-URI $request_uri;
proxy_set_header X-Scheme $pass_access_scheme;
# mitigate HTTPoxy Vulnerability
# https://www.nginx.com/blog/mitigating-the-httpoxy-vulnerability-with-nginx/
@ -334,6 +347,7 @@ http {
proxy_redirect off;
proxy_buffering off;
proxy_buffer_size "{{ $location.Proxy.BufferSize }}";
proxy_buffers 4 "{{ $location.Proxy.BufferSize }}";
proxy_http_version 1.1;
@ -355,7 +369,7 @@ http {
{{ end }}
}
{{ end }}
{{ if eq $server.Hostname "_" }}
# health checks in cloud providers require the use of port 80
location {{ $healthzURI }} {
@ -367,7 +381,7 @@ http {
# with an external software (like sysdig)
location /nginx_status {
allow 127.0.0.1;
allow ::1;
{{ if not $cfg.DisableIpv6 }}allow ::1;{{ end }}
deny all;
access_log off;
@ -377,22 +391,24 @@ http {
{{ template "CUSTOM_ERRORS" $cfg }}
}
{{ end }}
# default server, used for NGINX healthcheck and access to nginx stats
server {
# Use the port 18080 (random value just to avoid known ports) as default port for nginx.
# Changing this value requires a change in:
# https://github.com/kubernetes/contrib/blob/master/ingress/controllers/nginx/nginx/command.go#L104
listen [::]:18080 ipv6only=off default_server reuseport backlog={{ .BacklogSize }};
listen {{ if not $cfg.DisableIpv6 }}[::]:{{ end }}18080 {{ if not $cfg.DisableIpv6 }}ipv6only=off{{end}} default_server reuseport backlog={{ .BacklogSize }};
location {{ $healthzURI }} {
access_log off;
return 200;
}
location /nginx_status {
set $proxy_upstream_name "internal";
{{ if $cfg.EnableVtsStatus }}
vhost_traffic_status_display;
vhost_traffic_status_display_format html;
@ -406,8 +422,10 @@ http {
# using prometheus.
# TODO: enable extraction for vts module.
location /internal_nginx_status {
set $proxy_upstream_name "internal";
allow 127.0.0.1;
allow ::1;
{{ if not $cfg.DisableIpv6 }}allow ::1;{{ end }}
deny all;
access_log off;
@ -445,7 +463,7 @@ stream {
{{ range $i, $passthrough := .PassthroughBackends }}
{{ $passthrough.Hostname }} {{ $passthrough.Backend }};
{{ end }}
# send SSL traffic to this nginx in a different port
# send SSL traffic to this nginx in a different port
default nginx-ssl-backend;
}
@ -467,20 +485,20 @@ stream {
{{ buildSSLPassthroughUpstreams $backends .PassthroughBackends }}
server {
listen [::]:443 ipv6only=off{{ if $cfg.UseProxyProtocol }} proxy_protocol{{ end }};
listen {{ if not $cfg.DisableIpv6 }}[::]:{{ end }}443 {{ if not $cfg.DisableIpv6 }}ipv6only=off{{ end }}{{ if $cfg.UseProxyProtocol }} proxy_protocol{{ end }};
proxy_pass $stream_upstream;
ssl_preread on;
}
{{ end }}
# TCP services
# TCP services
{{ range $i, $tcpServer := .TCPBackends }}
upstream {{ $tcpServer.Backend.Namespace }}-{{ $tcpServer.Backend.Name }}-{{ $tcpServer.Backend.Port }} {
{{ range $j, $endpoint := $tcpServer.Endpoints }}
server {{ $endpoint.Address }}:{{ $endpoint.Port }};
{{ end }}
}
server {
listen {{ $tcpServer.Port }};
proxy_pass {{ $tcpServer.Backend.Namespace }}-{{ $tcpServer.Backend.Name }}-{{ $tcpServer.Backend.Port }};
@ -494,11 +512,11 @@ stream {
server {{ $endpoint.Address }}:{{ $endpoint.Port }};
{{ end }}
}
server {
listen {{ $udpServer.Port }};
proxy_responses 1;
proxy_pass {{ $udpServer.Backend.Namespace }}-{{ $udpServer.Backend.Name }}-{{ $udpServer.Backend.Port }};
proxy_pass {{ $udpServer.Backend.Namespace }}-{{ $udpServer.Backend.Name }}-{{ $udpServer.Backend.Port }};
}
{{ end }}
}
@ -511,7 +529,7 @@ stream {
content_by_lua_block {
openURL(ngx.req.get_headers(0), {{ $errCode }})
}
}
}
{{ end }}
{{ end }}