Merge branch 'master' into xff

This commit is contained in:
Dario Nieuwenhuis 2018-08-16 18:15:14 +02:00 committed by GitHub
commit b5bcb93a4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
1532 changed files with 65966 additions and 34963 deletions

View file

@ -26,6 +26,8 @@ import (
"github.com/pkg/errors"
)
const nginxPID = "/tmp/nginx.pid"
// Name returns the healthcheck name
func (n NGINXController) Name() string {
return "nginx-ingress-controller"
@ -33,7 +35,7 @@ func (n NGINXController) Name() string {
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (n *NGINXController) Check(_ *http.Request) error {
res, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath))
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath))
if err != nil {
return err
}
@ -43,7 +45,7 @@ func (n *NGINXController) Check(_ *http.Request) error {
}
if n.cfg.DynamicConfigurationEnabled {
res, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v/is-dynamic-lb-initialized", n.cfg.ListenPorts.Status))
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%v/is-dynamic-lb-initialized", n.cfg.ListenPorts.Status))
if err != nil {
return err
}
@ -58,13 +60,13 @@ func (n *NGINXController) Check(_ *http.Request) error {
if err != nil {
return errors.Wrap(err, "unexpected error reading /proc directory")
}
f, err := n.fileSystem.ReadFile("/run/nginx.pid")
f, err := n.fileSystem.ReadFile(nginxPID)
if err != nil {
return errors.Wrap(err, "unexpected error reading /run/nginx.pid")
return errors.Wrapf(err, "unexpected error reading %v", nginxPID)
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return errors.Wrap(err, "unexpected error reading the PID from /run/nginx.pid")
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginxPID)
}
_, err = fs.NewProc(pid)

View file

@ -27,6 +27,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/ingress-nginx/internal/file"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
)
@ -60,8 +61,8 @@ func TestNginxCheck(t *testing.T) {
})
// create pid file
fs.MkdirAll("/run", 0655)
pidFile, err := fs.Create("/run/nginx.pid")
fs.MkdirAll("/tmp", file.ReadWriteByUser)
pidFile, err := fs.Create(nginxPID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View file

@ -77,7 +77,7 @@ const (
sslSessionCacheSize = "10m"
// Default setting for load balancer algorithm
defaultLoadBalancerAlgorithm = "least_conn"
defaultLoadBalancerAlgorithm = ""
// Parameters for a shared memory zone that will keep states for various keys.
// http://nginx.org/en/docs/http/ngx_http_limit_conn_module.html#limit_conn_zone
@ -161,31 +161,6 @@ type Configuration struct {
// By default this is enabled
IgnoreInvalidHeaders bool `json:"ignore-invalid-headers"`
// EnableVtsStatus allows the replacement of the default status page with a third party module named
// nginx-module-vts - https://github.com/vozlt/nginx-module-vts
// By default this is disabled
EnableVtsStatus bool `json:"enable-vts-status,omitempty"`
// Vts config on http level
// Description: Sets parameters for a shared memory zone that will keep states for various keys. The cache is shared between all worker processe
// https://github.com/vozlt/nginx-module-vts#vhost_traffic_status_zone
// Default value is 10m
VtsStatusZoneSize string `json:"vts-status-zone-size,omitempty"`
// Vts config on http level
// Description: Enables the keys by user defined variable. The key is a key string to calculate traffic.
// The name is a group string to calculate traffic. The key and name can contain variables such as $host,
// $server_name. The name's group belongs to filterZones if specified. The key's group belongs to serverZones
// if not specified second argument name. The example with geoip module is as follows:
// https://github.com/vozlt/nginx-module-vts#vhost_traffic_status_filter_by_set_key
// Default value is $geoip_country_code country::*
VtsDefaultFilterKey string `json:"vts-default-filter-key,omitempty"`
// Description: Sets sum key used by vts json output, and the sum label in prometheus output.
// These indicate metrics values for all server zones combined, rather than for a specific one.
// Default value is *
VtsSumKey string `json:"vts-sum-key,omitempty"`
// RetryNonIdempotent since 1.9.13 NGINX will not retry non-idempotent requests (POST, LOCK, PATCH)
// in case of an error. The previous behavior can be restored using the value true
RetryNonIdempotent bool `json:"retry-non-idempotent"`
@ -247,6 +222,12 @@ type Configuration struct {
// http://nginx.org/en/docs/http/ngx_http_log_module.html#log_format
LogFormatStream string `json:"log-format-stream,omitempty"`
// If disabled, a worker process will accept one new connection at a time.
// Otherwise, a worker process will accept all new connections at a time.
// http://nginx.org/en/docs/ngx_core_module.html#multi_accept
// Default: true
EnableMultiAccept bool `json:"enable-multi-accept,omitempty"`
// Maximum number of simultaneous connections that can be opened by each worker process
// http://nginx.org/en/docs/ngx_core_module.html#worker_connections
MaxWorkerConnections int `json:"max-worker-connections,omitempty"`
@ -328,7 +309,7 @@ type Configuration struct {
// Sets the secret key used to encrypt and decrypt TLS session tickets.
// http://nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_session_tickets
// By default, a randomly generated key is used.
// Example: openssl rand 80 | base64 -w0
// Example: openssl rand 80 | openssl enc -A -base64
SSLSessionTicketKey string `json:"ssl-session-ticket-key,omitempty"`
// Time during which a client may reuse the session parameters stored in a cache.
@ -375,6 +356,9 @@ type Configuration struct {
// Default: true
UseHTTP2 bool `json:"use-http2,omitempty"`
// gzip Compression Level that will be used
GzipLevel int `json:"gzip-level,omitempty"`
// MIME types in addition to "text/html" to compress. The special value “*” matches any MIME type.
// Responses with the “text/html” type are always compressed if UseGzip is enabled
GzipTypes string `json:"gzip-types,omitempty"`
@ -461,6 +445,10 @@ type Configuration struct {
// Default: nginx
ZipkinServiceName string `json:"zipkin-service-name"`
// ZipkinSampleRate specifies sampling rate for traces
// Default: 1.0
ZipkinSampleRate float32 `json:"zipkin-sample-rate"`
// JaegerCollectorHost specifies the host to use when uploading traces
JaegerCollectorHost string `json:"jaeger-collector-host"`
@ -480,6 +468,9 @@ type Configuration struct {
// Default: 1
JaegerSamplerParam string `json:"jaeger-sampler-param"`
// MainSnippet adds custom configuration to the main section of the nginx configuration
MainSnippet string `json:"main-snippet"`
// HTTPSnippet adds custom configuration to the http section of the nginx configuration
HTTPSnippet string `json:"http-snippet"`
@ -497,8 +488,7 @@ type Configuration struct {
// ReusePort instructs NGINX to create an individual listening socket for
// each worker process (using the SO_REUSEPORT socket option), allowing a
// kernel to distribute incoming connections between worker processes
// Default: false
// Reason for the default: https://trac.nginx.org/nginx/ticket/1300
// Default: true
ReusePort bool `json:"reuse-port"`
// HideHeaders sets additional header that will not be passed from the upstream
@ -534,6 +524,9 @@ type Configuration struct {
// http://github.com/influxdata/nginx-influxdb-module/
// By default this is disabled
EnableInfluxDB bool `json:"enable-influxdb"`
// Checksum contains a checksum of the configmap configuration
Checksum string `json:"-"`
}
// NewDefault returns the default nginx configuration
@ -575,6 +568,7 @@ func NewDefault() Configuration {
HSTSMaxAge: hstsMaxAge,
HSTSPreload: false,
IgnoreInvalidHeaders: true,
GzipLevel: 5,
GzipTypes: gzipTypes,
KeepAlive: 75,
KeepAliveRequests: 100,
@ -582,6 +576,7 @@ func NewDefault() Configuration {
LogFormatEscapeJSON: false,
LogFormatStream: logFormatStream,
LogFormatUpstream: logFormatUpstream,
EnableMultiAccept: true,
MaxWorkerConnections: 16384,
MapHashBucketSize: 64,
NginxStatusIpv4Whitelist: defNginxStatusIpv4Whitelist,
@ -592,6 +587,7 @@ func NewDefault() Configuration {
ProxyHeadersHashMaxSize: 512,
ProxyHeadersHashBucketSize: 64,
ProxyStreamResponses: 1,
ReusePort: true,
ShowServerTokens: true,
SSLBufferSize: sslBufferSize,
SSLCiphers: sslCiphers,
@ -607,9 +603,6 @@ func NewDefault() Configuration {
WorkerProcesses: strconv.Itoa(runtime.NumCPU()),
WorkerShutdownTimeout: "10s",
LoadBalanceAlgorithm: defaultLoadBalancerAlgorithm,
VtsStatusZoneSize: "10m",
VtsDefaultFilterKey: "$geoip_country_code country::*",
VtsSumKey: "*",
VariablesHashBucketSize: 128,
VariablesHashMaxSize: 2048,
UseHTTP2: true,
@ -641,6 +634,7 @@ func NewDefault() Configuration {
BindAddressIpv6: defBindAddress,
ZipkinCollectorPort: 9411,
ZipkinServiceName: "nginx",
ZipkinSampleRate: 1.0,
JaegerCollectorPort: 6831,
JaegerServiceName: "nginx",
JaegerSamplerType: "const",

File diff suppressed because it is too large Load diff

View file

@ -17,6 +17,9 @@ limitations under the License.
package controller
import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/asn1"
"testing"
extensions "k8s.io/api/extensions/v1beta1"
@ -25,13 +28,13 @@ import (
)
func TestExtractTLSSecretName(t *testing.T) {
tests := []struct {
testCases := map[string]struct {
host string
ingress *extensions.Ingress
fn func(string) (*ingress.SSLCert, error)
expName string
}{
{
"nil ingress": {
"foo.bar",
nil,
func(string) (*ingress.SSLCert, error) {
@ -39,7 +42,7 @@ func TestExtractTLSSecretName(t *testing.T) {
},
"",
},
{
"empty ingress": {
"foo.bar",
&extensions.Ingress{},
func(string) (*ingress.SSLCert, error) {
@ -47,7 +50,7 @@ func TestExtractTLSSecretName(t *testing.T) {
},
"",
},
{
"ingress tls, nil secret": {
"foo.bar",
&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
@ -69,7 +72,7 @@ func TestExtractTLSSecretName(t *testing.T) {
},
"",
},
{
"ingress tls, no host, matching cert cn": {
"foo.bar",
&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
@ -88,12 +91,38 @@ func TestExtractTLSSecretName(t *testing.T) {
},
func(string) (*ingress.SSLCert, error) {
return &ingress.SSLCert{
CN: []string{"foo.bar", "example.com"},
Certificate: fakeX509Cert([]string{"foo.bar", "example.com"}),
}, nil
},
"demo",
},
{
"ingress tls, no host, wildcard cert with matching cn": {
"foo.bar",
&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: extensions.IngressSpec{
TLS: []extensions.IngressTLS{
{
SecretName: "demo",
},
},
Rules: []extensions.IngressRule{
{
Host: "test.foo.bar",
},
},
},
},
func(string) (*ingress.SSLCert, error) {
return &ingress.SSLCert{
Certificate: fakeX509Cert([]string{"*.foo.bar", "foo.bar"}),
}, nil
},
"demo",
},
"ingress tls, hosts, matching cert cn": {
"foo.bar",
&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
@ -114,18 +143,29 @@ func TestExtractTLSSecretName(t *testing.T) {
},
},
func(string) (*ingress.SSLCert, error) {
return &ingress.SSLCert{
CN: []string{"foo.bar", "example.com"},
}, nil
return nil, nil
},
"demo",
},
}
for _, testCase := range tests {
name := extractTLSSecretName(testCase.host, testCase.ingress, testCase.fn)
if name != testCase.expName {
t.Errorf("expected %v as the name of the secret but got %v", testCase.expName, name)
}
for title, tc := range testCases {
t.Run(title, func(t *testing.T) {
name := extractTLSSecretName(tc.host, tc.ingress, tc.fn)
if name != tc.expName {
t.Errorf("Expected Secret name %q (got %q)", tc.expName, name)
}
})
}
}
// oidExtensionSubjectAltName is the ASN.1 object identifier of the X.509
// Subject Alternative Name extension (id-ce-subjectAltName, 2.5.29.17).
var oidExtensionSubjectAltName = asn1.ObjectIdentifier{2, 5, 29, 17}

// fakeX509Cert builds a minimal certificate for tests: it carries the given
// DNS names plus a contentless SAN extension, so code that only checks for
// the extension's presence treats the certificate as SAN-bearing.
func fakeX509Cert(dnsNames []string) *x509.Certificate {
	cert := &x509.Certificate{DNSNames: dnsNames}
	cert.Extensions = append(cert.Extensions, pkix.Extension{Id: oidExtensionSubjectAltName})
	return cert
}

View file

@ -27,16 +27,12 @@ import (
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/internal/k8s"
)
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func getEndpoints(
s *corev1.Service,
port *corev1.ServicePort,
proto corev1.Protocol,
hz *healthcheck.Config,
getServiceEndpoints func(*corev1.Service) (*corev1.Endpoints, error),
) []ingress.Endpoint {
// getEndpoints returns a list of Endpoint structs for a given service/target port combination.
func getEndpoints(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, hz *healthcheck.Config,
getServiceEndpoints func(string) (*corev1.Endpoints, error)) []ingress.Endpoint {
upsServers := []ingress.Endpoint{}
@ -44,26 +40,26 @@ func getEndpoints(
return upsServers
}
// avoid duplicated upstream servers when the service
// contains multiple port definitions sharing the same
// targetport.
adus := make(map[string]bool)
// using a map avoids duplicated upstream servers when the service
// contains multiple port definitions sharing the same targetport
processedUpstreamServers := make(map[string]struct{})
svcKey := k8s.MetaNamespaceKey(s)
// ExternalName services
if s.Spec.Type == corev1.ServiceTypeExternalName {
glog.V(3).Infof("Ingress using a service %v of type=ExternalName : %v", s.Name)
glog.V(3).Infof("Ingress using Service %q of type ExternalName.", svcKey)
targetPort := port.TargetPort.IntValue()
// check for invalid port value
if targetPort <= 0 {
glog.Errorf("ExternalName service with an invalid port: %v", targetPort)
glog.Errorf("ExternalName Service %q has an invalid port (%v)", svcKey, targetPort)
return upsServers
}
if net.ParseIP(s.Spec.ExternalName) == nil {
_, err := net.LookupHost(s.Spec.ExternalName)
if err != nil {
glog.Errorf("unexpected error resolving host %v: %v", s.Spec.ExternalName, err)
glog.Errorf("Error resolving host %q: %v", s.Spec.ExternalName, err)
return upsServers
}
}
@ -76,10 +72,10 @@ func getEndpoints(
})
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, port.String())
ep, err := getServiceEndpoints(s)
glog.V(3).Infof("Getting Endpoints for Service %q and port %v", svcKey, port.String())
ep, err := getServiceEndpoints(svcKey)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
glog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
return upsServers
}
@ -99,14 +95,13 @@ func getEndpoints(
targetPort = epPort.Port
}
// check for invalid port value
if targetPort <= 0 {
continue
}
for _, epAddress := range ss.Addresses {
ep := fmt.Sprintf("%v:%v", epAddress.IP, targetPort)
if _, exists := adus[ep]; exists {
if _, exists := processedUpstreamServers[ep]; exists {
continue
}
ups := ingress.Endpoint{
@ -117,11 +112,11 @@ func getEndpoints(
Target: epAddress.TargetRef,
}
upsServers = append(upsServers, ups)
adus[ep] = true
processedUpstreamServers[ep] = struct{}{}
}
}
}
glog.V(3).Infof("endpoints found: %v", upsServers)
glog.V(3).Infof("Endpoints found for Service %q: %v", svcKey, upsServers)
return upsServers
}

View file

@ -33,44 +33,44 @@ func TestGetEndpoints(t *testing.T) {
port *corev1.ServicePort
proto corev1.Protocol
hz *healthcheck.Config
fn func(*corev1.Service) (*corev1.Endpoints, error)
fn func(string) (*corev1.Endpoints, error)
result []ingress.Endpoint
}{
{
"no service should return 0 endpoints",
"no service should return 0 endpoint",
nil,
nil,
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return nil, nil
},
[]ingress.Endpoint{},
},
{
"no service port should return 0 endpoints",
"no service port should return 0 endpoint",
&corev1.Service{},
nil,
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return nil, nil
},
[]ingress.Endpoint{},
},
{
"a service without endpoints should return 0 endpoints",
"a service without endpoint should return 0 endpoint",
&corev1.Service{},
&corev1.ServicePort{Name: "default"},
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{},
},
{
"a service type ServiceTypeExternalName service with an invalid port should return 0 endpoints",
"a service type ServiceTypeExternalName service with an invalid port should return 0 endpoint",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeExternalName,
@ -79,7 +79,7 @@ func TestGetEndpoints(t *testing.T) {
&corev1.ServicePort{Name: "default"},
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{},
@ -107,7 +107,7 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{
@ -142,13 +142,13 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{},
},
{
"should return no endpoints when there is an error searching for endpoints",
"should return no endpoint when there is an error searching for endpoints",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
@ -170,13 +170,13 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
return nil, fmt.Errorf("unexpected error")
},
[]ingress.Endpoint{},
},
{
"should return no endpoints when the protocol does not match",
"should return no endpoint when the protocol does not match",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
@ -198,7 +198,7 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
@ -221,7 +221,7 @@ func TestGetEndpoints(t *testing.T) {
[]ingress.Endpoint{},
},
{
"should return no endpoints when there is no ready Addresses",
"should return no endpoint when there is no ready Addresses",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
@ -243,7 +243,7 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
@ -266,7 +266,7 @@ func TestGetEndpoints(t *testing.T) {
[]ingress.Endpoint{},
},
{
"should return no endpoints when the name of the port name do not match any port in the endpoint Subsets",
"should return no endpoint when the name of the port name do not match any port in the endpoint Subsets",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
@ -288,7 +288,7 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
@ -335,7 +335,7 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
@ -389,7 +389,7 @@ func TestGetEndpoints(t *testing.T) {
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
func(string) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
@ -431,7 +431,7 @@ func TestGetEndpoints(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
result := getEndpoints(testCase.svc, testCase.port, testCase.proto, testCase.hz, testCase.fn)
if len(testCase.result) != len(result) {
t.Errorf("expected %v Endpoints but got %v", testCase.result, len(result))
t.Errorf("Expected %d Endpoints but got %d", len(testCase.result), len(result))
}
})
}

View file

@ -1,123 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
type (
nginxStatusCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxVtsPath string
data *nginxStatusData
watchNamespace string
ingressClass string
}
nginxStatusData struct {
connectionsTotal *prometheus.Desc
requestsTotal *prometheus.Desc
connections *prometheus.Desc
}
)
// NewNginxStatus returns a new Prometheus collector for the default NGINX
// status module, scraped from the health port at the given path. The
// returned Stopable owns a background goroutine; call Stop to terminate it.
func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxVtsPath string) Stopable {
p := nginxStatusCollector{
// unbuffered on purpose: Collect blocks until start() accepts the request
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxVtsPath: ngxVtsPath,
watchNamespace: watchNamespace,
ingressClass: ingressClass,
}
p.data = &nginxStatusData{
connectionsTotal: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connections_total"),
"total number of connections with state {active, accepted, handled}",
[]string{"ingress_class", "namespace", "state"}, nil),
requestsTotal: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "requests_total"),
"total number of client requests",
[]string{"ingress_class", "namespace"}, nil),
connections: prometheus.NewDesc(
// NOTE(review): "connnections" is misspelled, but renaming it would
// change the exported metric name and break existing dashboards.
prometheus.BuildFQName(ns, "", "connnections"),
"current number of client connections with state {reading, writing, waiting}",
[]string{"ingress_class", "namespace", "state"}, nil),
}
// serve scrape requests until Stop closes scrapeChan
go p.start()
return p
}
// Describe implements prometheus.Collector.
func (p nginxStatusCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, d := range []*prometheus.Desc{
		p.data.connectionsTotal,
		p.data.requestsTotal,
		p.data.connections,
	} {
		ch <- d
	}
}
// Collect implements prometheus.Collector.
// It hands the metrics channel to the scrape goroutine and blocks until that
// goroutine signals completion, guaranteeing ch is not written after return.
func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
// start serializes scrapes: it services scrapeChan one request at a time,
// signalling done after each scrape, and exits when Stop closes the channel.
func (p nginxStatusCollector) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
req.done <- struct{}{}
}
}
// Stop ends the goroutine started by NewNginxStatus by closing scrapeChan;
// the collector must not be used for Collect calls afterwards.
func (p nginxStatusCollector) Stop() {
close(p.scrapeChan)
}
// scrape fetches the current nginx status counters and emits them as
// Prometheus metrics on ch; on a fetch error it logs and emits nothing.
func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
s, err := getNginxStatus(p.ngxHealthPort, p.ngxVtsPath)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
// lifetime connection totals, labelled by state
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Active), p.ingressClass, p.watchNamespace, "active")
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Accepted), p.ingressClass, p.watchNamespace, "accepted")
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Handled), p.ingressClass, p.watchNamespace, "handled")
ch <- prometheus.MustNewConstMetric(p.data.requestsTotal,
prometheus.CounterValue, float64(s.Requests), p.ingressClass, p.watchNamespace)
// point-in-time connection counts, labelled by state
ch <- prometheus.MustNewConstMetric(p.data.connections,
prometheus.GaugeValue, float64(s.Reading), p.ingressClass, p.watchNamespace, "reading")
ch <- prometheus.MustNewConstMetric(p.data.connections,
prometheus.GaugeValue, float64(s.Writing), p.ingressClass, p.watchNamespace, "writing")
ch <- prometheus.MustNewConstMetric(p.data.connections,
prometheus.GaugeValue, float64(s.Waiting), p.ingressClass, p.watchNamespace, "waiting")
}

View file

@ -1,174 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"path/filepath"
"github.com/golang/glog"
common "github.com/ncabatoff/process-exporter"
"github.com/ncabatoff/process-exporter/proc"
"github.com/prometheus/client_golang/prometheus"
)
// BinaryNameMatcher selects processes for the process-exporter by comparing
// a fixed expected name against the base name of a configured binary path.
type BinaryNameMatcher struct {
// Name is the expected process name (e.g. "nginx")
Name string
// Binary is the full path of the binary; only filepath.Base of it is compared
Binary string
}
// MatchAndName reports whether the process matches. The returned group name
// is always the empty string.
// NOTE(review): beyond requiring a non-empty Cmdline, the decision ignores
// nacl entirely — it compares em.Name against em.Binary's base name, both of
// which are fixed at construction. Confirm this is intentional.
func (em BinaryNameMatcher) MatchAndName(nacl common.NameAndCmdline) (bool, string) {
if len(nacl.Cmdline) == 0 {
return false, ""
}
cmd := filepath.Base(em.Binary)
return em.Name == cmd, ""
}
type namedProcessData struct {
numProcs *prometheus.Desc
cpuSecs *prometheus.Desc
readBytes *prometheus.Desc
writeBytes *prometheus.Desc
memResidentbytes *prometheus.Desc
memVirtualbytes *prometheus.Desc
startTime *prometheus.Desc
}
type namedProcess struct {
*proc.Grouper
scrapeChan chan scrapeRequest
fs *proc.FS
data namedProcessData
}
// NewNamedProcess returns a new Prometheus collector reporting resource
// usage (process count, CPU, I/O bytes, memory, start time) for processes
// matched by mn, optionally aggregating their children as well.
func NewNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
fs, err := proc.NewFS("/proc")
if err != nil {
return nil, err
}
p := namedProcess{
// unbuffered: Collect blocks until the scrape goroutine accepts the request
scrapeChan: make(chan scrapeRequest),
Grouper: proc.NewGrouper(children, mn),
fs: fs,
}
// initial Update seeds the grouper's process state; presumably so the
// first scrape has a baseline — TODO confirm against proc.Grouper docs
_, err = p.Update(p.fs.AllProcs())
if err != nil {
return nil, err
}
p.data = namedProcessData{
numProcs: prometheus.NewDesc(
"num_procs",
"number of processes",
nil, nil),
cpuSecs: prometheus.NewDesc(
"cpu_seconds_total",
"Cpu usage in seconds",
nil, nil),
readBytes: prometheus.NewDesc(
"read_bytes_total",
"number of bytes read",
nil, nil),
writeBytes: prometheus.NewDesc(
"write_bytes_total",
"number of bytes written",
nil, nil),
memResidentbytes: prometheus.NewDesc(
"resident_memory_bytes",
"number of bytes of memory in use",
nil, nil),
memVirtualbytes: prometheus.NewDesc(
"virtual_memory_bytes",
"number of bytes of memory in use",
nil, nil),
startTime: prometheus.NewDesc(
"oldest_start_time_seconds",
"start time in seconds since 1970/01/01",
nil, nil),
}
// serve Collect requests until Stop closes scrapeChan
go p.start()
return p, nil
}
// Describe implements prometheus.Collector.
func (p namedProcess) Describe(ch chan<- *prometheus.Desc) {
	for _, d := range []*prometheus.Desc{
		p.data.cpuSecs,
		p.data.numProcs,
		p.data.readBytes,
		p.data.writeBytes,
		p.data.memResidentbytes,
		p.data.memVirtualbytes,
		p.data.startTime,
	} {
		ch <- d
	}
}
// Collect implements prometheus.Collector.
// It hands the metrics channel to the scrape goroutine and blocks until that
// goroutine signals completion, guaranteeing ch is not written after return.
func (p namedProcess) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
// start serializes scrapes: it services scrapeChan one request at a time,
// signalling done after each scrape, and exits when Stop closes the channel.
func (p namedProcess) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
req.done <- struct{}{}
}
}
// Stop ends the scrape goroutine started by NewNamedProcess by closing
// scrapeChan; the collector must not be used for Collect calls afterwards.
func (p namedProcess) Stop() {
close(p.scrapeChan)
}
// scrape refreshes process information from /proc and emits one set of
// metrics per matched process group; on error it logs and emits nothing.
func (p namedProcess) scrape(ch chan<- prometheus.Metric) {
_, err := p.Update(p.fs.AllProcs())
if err != nil {
glog.Warningf("unexpected error obtaining nginx process info: %v", err)
return
}
for _, gcounts := range p.Groups() {
ch <- prometheus.MustNewConstMetric(p.data.numProcs,
prometheus.GaugeValue, float64(gcounts.Procs))
ch <- prometheus.MustNewConstMetric(p.data.memResidentbytes,
prometheus.GaugeValue, float64(gcounts.Memresident))
ch <- prometheus.MustNewConstMetric(p.data.memVirtualbytes,
prometheus.GaugeValue, float64(gcounts.Memvirtual))
ch <- prometheus.MustNewConstMetric(p.data.startTime,
prometheus.GaugeValue, float64(gcounts.OldestStartTime.Unix()))
ch <- prometheus.MustNewConstMetric(p.data.cpuSecs,
prometheus.CounterValue, gcounts.Cpu)
ch <- prometheus.MustNewConstMetric(p.data.readBytes,
prometheus.CounterValue, float64(gcounts.ReadBytes))
ch <- prometheus.MustNewConstMetric(p.data.writeBytes,
prometheus.CounterValue, float64(gcounts.WriteBytes))
}
}

View file

@ -1,30 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import "github.com/prometheus/client_golang/prometheus"
// Stopable defines a prometheus collector that can be stopped
type Stopable interface {
prometheus.Collector
Stop()
}
// scrapeRequest couples the channel a Collect call wants metrics written to
// with a done channel used to signal that the scrape has finished.
type scrapeRequest struct {
// results receives the scraped metrics
results chan<- prometheus.Metric
// done receives one value once the scrape goroutine has finished writing
done chan struct{}
}

View file

@ -1,225 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"github.com/golang/glog"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type basicStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
// https://github.com/vozlt/nginx-module-vts
type vts struct {
NginxVersion string `json:"nginxVersion"`
LoadMsec int `json:"loadMsec"`
NowMsec int `json:"nowMsec"`
// Total connections and requests(same as stub_status_module in NGINX)
Connections connections `json:"connections"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone
ServerZones map[string]serverZone `json:"serverZones"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone filtered through
// the vhost_traffic_status_filter_by_set_key directive
FilterZones map[string]map[string]filterZone `json:"filterZones"`
// Traffic(in/out) and request and response counts per server in each upstream group
UpstreamZones map[string][]upstreamZone `json:"upstreamZones"`
}
// serverZone aggregates traffic, request/response counters and cache
// statistics for a single NGINX server zone.
type serverZone struct {
	RequestCounter float64 `json:"requestCounter"`
	InBytes float64 `json:"inBytes"`
	OutBytes float64 `json:"outBytes"`
	Responses response `json:"responses"`
	Cache cache `json:"cache"`
}

// filterZone carries the same counters as serverZone, for the zones
// created by the vhost_traffic_status_filter_by_set_key directive.
type filterZone struct {
	RequestCounter float64 `json:"requestCounter"`
	InBytes float64 `json:"inBytes"`
	OutBytes float64 `json:"outBytes"`
	Cache cache `json:"cache"`
	Responses response `json:"responses"`
}

// upstreamZone reports per-server traffic counters plus the configured
// upstream parameters (weight, max_fails, fail_timeout, backup, down)
// for each member of an upstream group.
type upstreamZone struct {
	Responses response `json:"responses"`
	Server string `json:"server"`
	RequestCounter float64 `json:"requestCounter"`
	InBytes float64 `json:"inBytes"`
	OutBytes float64 `json:"outBytes"`
	ResponseMsec float64 `json:"responseMsec"`
	Weight float64 `json:"weight"`
	MaxFails float64 `json:"maxFails"`
	FailTimeout float64 `json:"failTimeout"`
	// Backup and Down arrive as JSON booleans and are decoded to 0/1 so
	// they can be exported as numeric metrics.
	Backup BoolToFloat64 `json:"backup"`
	Down BoolToFloat64 `json:"down"`
}
// cache breaks down cache lookups by outcome (miss, hit, stale, ...).
type cache struct {
	Miss float64 `json:"miss"`
	Bypass float64 `json:"bypass"`
	Expired float64 `json:"expired"`
	Stale float64 `json:"stale"`
	Updating float64 `json:"updating"`
	Revalidated float64 `json:"revalidated"`
	Hit float64 `json:"hit"`
	Scarce float64 `json:"scarce"`
}

// response counts responses grouped by status-code class (1xx..5xx).
type response struct {
	OneXx float64 `json:"1xx"`
	TwoXx float64 `json:"2xx"`
	// NOTE(review): "TheeXx" is a typo for "ThreeXx", but the field is
	// only addressed through its json tag, so renaming is cosmetic.
	TheeXx float64 `json:"3xx"`
	FourXx float64 `json:"4xx"`
	FiveXx float64 `json:"5xx"`
}

// connections holds the global connection counters, matching the values
// of the stub_status module.
type connections struct {
	Active float64 `json:"active"`
	Reading float64 `json:"reading"`
	Writing float64 `json:"writing"`
	Waiting float64 `json:"waiting"`
	Accepted float64 `json:"accepted"`
	Handled float64 `json:"handled"`
	Requests float64 `json:"requests"`
}
// BoolToFloat64 is a float64 that decodes the JSON values "true"/"1" to 1
// and "false"/"0" to 0, so boolean upstream flags can be exported as
// numeric metrics.
type BoolToFloat64 float64

// UnmarshalJSON implements json.Unmarshaler.
//
// The receiver must be a pointer: the previous value receiver assigned to
// a local copy, so the decoded result was silently discarded and fields
// of this type always stayed 0.
func (bit *BoolToFloat64) UnmarshalJSON(data []byte) error {
	switch string(data) {
	case "1", "true":
		*bit = 1
	case "0", "false":
		*bit = 0
	default:
		return fmt.Errorf("boolean unmarshal error: invalid input %s", data)
	}
	return nil
}
// getNginxStatus scrapes the NGINX stub_status page served on the given
// local port and path and returns the parsed counters.
func getNginxStatus(port int, path string) (*basicStatus, error) {
	// Scrape over the loopback address: 0.0.0.0 is not a valid
	// destination address on every platform.
	url := fmt.Sprintf("http://127.0.0.1:%v%v", port, path)
	glog.V(3).Infof("start scraping url: %v", url)

	data, err := httpBody(url)
	if err != nil {
		return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
	}

	return parse(string(data)), nil
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
// getNginxVtsMetrics scrapes the JSON status page exposed by the
// nginx-module-vts module and decodes it into a vts struct.
func getNginxVtsMetrics(port int, path string) (*vts, error) {
	// Scrape over the loopback address: 0.0.0.0 is not a valid
	// destination address on every platform.
	url := fmt.Sprintf("http://127.0.0.1:%v%v", port, path)
	glog.V(3).Infof("start scraping url: %v", url)

	data, err := httpBody(url)
	if err != nil {
		return nil, fmt.Errorf("unexpected error scraping nginx vts (%v)", err)
	}

	// Named "metrics" instead of "vts" so the variable does not shadow
	// the vts type for the rest of the function.
	var metrics *vts
	err = json.Unmarshal(data, &metrics)
	if err != nil {
		return nil, fmt.Errorf("unexpected error json unmarshal (%v)", err)
	}

	glog.V(3).Infof("scrape returned : %v", metrics)
	return metrics, nil
}
// parse extracts the stub_status counters from the plain-text status page
// and packs them into a basicStatus. Counters whose pattern is missing or
// non-numeric default to zero (see toInt).
func parse(data string) *basicStatus {
	connTotals := sahr.FindStringSubmatch(data)

	return &basicStatus{
		Active:   toInt(ac.FindStringSubmatch(data), 1),
		Accepted: toInt(connTotals, 1),
		Handled:  toInt(connTotals, 2),
		Requests: toInt(connTotals, 3),
		Reading:  toInt(reading.FindStringSubmatch(data), 1),
		Writing:  toInt(writing.FindStringSubmatch(data), 1),
		Waiting:  toInt(waiting.FindStringSubmatch(data), 1),
	}
}
// toInt converts the submatch at index pos to an integer. It returns 0
// when pos is out of range or the value is not a base-10 integer.
func toInt(data []string, pos int) int {
	// The original check was `pos > len(data)`, which let pos == len(data)
	// through and panicked with an index out of range on data[pos].
	if pos < 0 || pos >= len(data) {
		return 0
	}
	if v, err := strconv.Atoi(data[pos]); err == nil {
		return v
	}
	return 0
}

View file

@ -1,72 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"testing"
"github.com/kylelemons/godebug/pretty"
)
// TestParseStatus verifies that parse extracts all seven stub_status
// counters, and that non-numeric fields (second case: "Reading: A")
// fall back to zero instead of failing.
func TestParseStatus(t *testing.T) {
	tests := []struct {
		in string
		out *basicStatus
	}{
		{`Active connections: 43
server accepts handled requests
7368 7368 10993
Reading: 0 Writing: 5 Waiting: 38`,
			&basicStatus{43, 7368, 7368, 10993, 0, 5, 38},
		},
		{`Active connections: 0
server accepts handled requests
1 7 0
Reading: A Writing: B Waiting: 38`,
			&basicStatus{0, 1, 7, 0, 0, 0, 38},
		},
	}

	for _, test := range tests {
		r := parse(test.in)
		if diff := pretty.Compare(r, test.out); diff != "" {
			t.Logf("%v", diff)
			t.Fatalf("expected %v but returned %v", test.out, r)
		}
	}
}
// TestToint exercises toInt with empty slices, out-of-range positions,
// non-numeric values and valid integers.
func TestToint(t *testing.T) {
	cases := []struct {
		input    []string
		position int
		want     int
	}{
		{[]string{}, 0, 0},
		{[]string{}, 1, 0},
		{[]string{"A"}, 0, 0},
		{[]string{"1"}, 0, 1},
		{[]string{"a", "2"}, 1, 2},
	}

	for _, c := range cases {
		if got := toInt(c.input, c.position); got != c.want {
			t.Fatalf("expected %v but returned %v", c.want, got)
		}
	}
}

View file

@ -1,273 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"reflect"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
// ns is the namespace prefixed to the name of every metric exported by
// this collector.
const ns = "nginx"
type (
	// vtsCollector scrapes the NGINX VTS JSON endpoint and exposes the
	// counters as Prometheus metrics. Scrape requests are serialized
	// through scrapeChan so at most one scrape runs at a time.
	vtsCollector struct {
		scrapeChan chan scrapeRequest
		// port and path locate the local VTS status endpoint.
		port int
		path string
		data *vtsData
		// watchNamespace and ingressClass are attached to every metric
		// as identifying labels.
		watchNamespace string
		ingressClass string
	}

	// vtsData holds one prometheus.Desc per exported metric family.
	vtsData struct {
		bytes *prometheus.Desc
		cache *prometheus.Desc
		connections *prometheus.Desc
		responses *prometheus.Desc
		requests *prometheus.Desc
		filterZoneBytes *prometheus.Desc
		filterZoneResponses *prometheus.Desc
		filterZoneCache *prometheus.Desc
		upstreamBackup *prometheus.Desc
		upstreamBytes *prometheus.Desc
		upstreamDown *prometheus.Desc
		upstreamFailTimeout *prometheus.Desc
		upstreamMaxFails *prometheus.Desc
		upstreamResponses *prometheus.Desc
		upstreamRequests *prometheus.Desc
		upstreamResponseMsec *prometheus.Desc
		upstreamWeight *prometheus.Desc
	}
)
// NewNGINXVTSCollector returns a new prometheus collector for the VTS
// module. port and path locate the local VTS JSON endpoint to scrape;
// watchNamespace and ingressClass are added as labels to every metric.
// A goroutine servicing scrape requests is started; it runs until Stop
// is called.
func NewNGINXVTSCollector(watchNamespace, ingressClass string, port int, path string) Stopable {
	p := vtsCollector{
		scrapeChan: make(chan scrapeRequest),
		port: port,
		path: path,
		watchNamespace: watchNamespace,
		ingressClass: ingressClass,
	}

	// One descriptor per metric family. The label sets must match the
	// label values supplied in scrapeVts/reflectMetrics, in order.
	p.data = &vtsData{
		bytes: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "bytes_total"),
			"Nginx bytes count",
			[]string{"ingress_class", "namespace", "server_zone", "direction"}, nil),

		cache: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "cache_total"),
			"Nginx cache count",
			[]string{"ingress_class", "namespace", "server_zone", "type"}, nil),

		connections: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "connections_total"),
			"Nginx connections count",
			[]string{"ingress_class", "namespace", "type"}, nil),

		responses: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "responses_total"),
			"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
			[]string{"ingress_class", "namespace", "server_zone", "status_code"}, nil),

		requests: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "requests_total"),
			"The total number of requested client connections.",
			[]string{"ingress_class", "namespace", "server_zone"}, nil),

		filterZoneBytes: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "filterzone_bytes_total"),
			"Nginx bytes count",
			[]string{"ingress_class", "namespace", "server_zone", "key", "direction"}, nil),

		filterZoneResponses: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "filterzone_responses_total"),
			"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
			[]string{"ingress_class", "namespace", "server_zone", "key", "status_code"}, nil),

		filterZoneCache: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "filterzone_cache_total"),
			"Nginx cache count",
			[]string{"ingress_class", "namespace", "server_zone", "key", "type"}, nil),

		upstreamBackup: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_backup"),
			"Current backup setting of the server.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),

		upstreamBytes: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_bytes_total"),
			"The total number of bytes sent to this server.",
			[]string{"ingress_class", "namespace", "upstream", "server", "direction"}, nil),

		upstreamDown: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "vts_upstream_down_total"),
			"Current down setting of the server.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),

		upstreamFailTimeout: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_fail_timeout"),
			"Current fail_timeout setting of the server.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),

		upstreamMaxFails: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_maxfails"),
			"Current max_fails setting of the server.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),

		upstreamResponses: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_responses_total"),
			"The number of upstream responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
			[]string{"ingress_class", "namespace", "upstream", "server", "status_code"}, nil),

		upstreamRequests: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_requests_total"),
			"The total number of client connections forwarded to this server.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),

		upstreamResponseMsec: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_response_msecs_avg"),
			"The average of only upstream response processing times in milliseconds.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),

		upstreamWeight: prometheus.NewDesc(
			prometheus.BuildFQName(ns, "", "upstream_weight"),
			"Current upstream weight setting of the server.",
			[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
	}

	// Serve Collect() requests until Stop() closes scrapeChan.
	go p.start()

	return p
}
// Describe implements prometheus.Collector. It sends every metric
// descriptor owned by this collector to ch.
func (p vtsCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- p.data.bytes
	ch <- p.data.cache
	ch <- p.data.connections
	ch <- p.data.requests
	ch <- p.data.responses
	ch <- p.data.upstreamBackup
	ch <- p.data.upstreamBytes
	ch <- p.data.upstreamDown
	ch <- p.data.upstreamFailTimeout
	ch <- p.data.upstreamMaxFails
	ch <- p.data.upstreamRequests
	ch <- p.data.upstreamResponseMsec
	ch <- p.data.upstreamResponses
	ch <- p.data.upstreamWeight
	ch <- p.data.filterZoneBytes
	ch <- p.data.filterZoneCache
	ch <- p.data.filterZoneResponses
}
// Collect implements prometheus.Collector. It hands the result channel
// to the scrape goroutine and blocks until that scrape completes, which
// serializes concurrent Collect calls.
func (p vtsCollector) Collect(ch chan<- prometheus.Metric) {
	req := scrapeRequest{results: ch, done: make(chan struct{})}
	p.scrapeChan <- req
	<-req.done
}
// start services scrape requests one at a time until scrapeChan is
// closed by Stop.
func (p vtsCollector) start() {
	for req := range p.scrapeChan {
		ch := req.results
		p.scrapeVts(ch)
		req.done <- struct{}{}
	}
}
// Stop closes the scrape channel, terminating the goroutine started by
// NewNGINXVTSCollector.
func (p vtsCollector) Stop() {
	close(p.scrapeChan)
}
// scrapeVts fetches the VTS JSON document and emits one metric per
// counter into ch. On scrape failure it logs a warning and emits
// nothing. Label values must be supplied in the exact order declared in
// the corresponding prometheus.Desc.
func (p vtsCollector) scrapeVts(ch chan<- prometheus.Metric) {
	nginxMetrics, err := getNginxVtsMetrics(p.port, p.path)
	if err != nil {
		glog.Warningf("unexpected error obtaining nginx status info: %v", err)
		return
	}

	// global connection counters
	reflectMetrics(&nginxMetrics.Connections, p.data.connections, ch, p.ingressClass, p.watchNamespace)

	// per-server counters for each upstream group. Zones are addressed by
	// index (zones[pos]) rather than the loop copy when emitting values.
	// NOTE(review): configuration settings (weight, backup, down, ...) are
	// emitted as CounterValue although they behave like gauges.
	for name, zones := range nginxMetrics.UpstreamZones {
		for pos, value := range zones {
			reflectMetrics(&zones[pos].Responses, p.data.upstreamResponses, ch, p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamRequests,
				prometheus.CounterValue, zones[pos].RequestCounter, p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamDown,
				prometheus.CounterValue, float64(zones[pos].Down), p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamWeight,
				prometheus.CounterValue, zones[pos].Weight, p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamResponseMsec,
				prometheus.CounterValue, zones[pos].ResponseMsec, p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamBackup,
				prometheus.CounterValue, float64(zones[pos].Backup), p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamFailTimeout,
				prometheus.CounterValue, zones[pos].FailTimeout, p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamMaxFails,
				prometheus.CounterValue, zones[pos].MaxFails, p.ingressClass, p.watchNamespace, name, value.Server)

			ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
				prometheus.CounterValue, zones[pos].InBytes, p.ingressClass, p.watchNamespace, name, value.Server, "in")

			ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
				prometheus.CounterValue, zones[pos].OutBytes, p.ingressClass, p.watchNamespace, name, value.Server, "out")
		}
	}

	// per-server-zone traffic, responses and cache counters
	for name, zone := range nginxMetrics.ServerZones {
		reflectMetrics(&zone.Responses, p.data.responses, ch, p.ingressClass, p.watchNamespace, name)
		reflectMetrics(&zone.Cache, p.data.cache, ch, p.ingressClass, p.watchNamespace, name)

		ch <- prometheus.MustNewConstMetric(p.data.requests,
			prometheus.CounterValue, zone.RequestCounter, p.ingressClass, p.watchNamespace, name)

		ch <- prometheus.MustNewConstMetric(p.data.bytes,
			prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, name, "in")

		ch <- prometheus.MustNewConstMetric(p.data.bytes,
			prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, name, "out")
	}

	// filter zones are nested: server zone -> filter key -> counters
	for serverZone, keys := range nginxMetrics.FilterZones {
		for name, zone := range keys {
			reflectMetrics(&zone.Responses, p.data.filterZoneResponses, ch, p.ingressClass, p.watchNamespace, serverZone, name)
			reflectMetrics(&zone.Cache, p.data.filterZoneCache, ch, p.ingressClass, p.watchNamespace, serverZone, name)

			ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
				prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, serverZone, name, "in")

			ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
				prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, serverZone, name, "out")
		}
	}
}
// reflectMetrics walks the fields of the struct pointed to by value and
// emits one counter metric per field, appending the field's json tag as
// the final label value.
// NOTE(review): the float64 type assertion panics on fields of any other
// type — callers only pass response, cache and connections structs,
// whose fields are all float64.
func reflectMetrics(value interface{}, desc *prometheus.Desc, ch chan<- prometheus.Metric, labels ...string) {
	val := reflect.ValueOf(value).Elem()

	for i := 0; i < val.NumField(); i++ {
		tag := val.Type().Field(i).Tag
		l := append(labels, tag.Get("json"))
		ch <- prometheus.MustNewConstMetric(desc,
			prometheus.CounterValue, val.Field(i).Interface().(float64),
			l...)
	}
}

View file

@ -1,120 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress"
)
const (
	// ns is the namespace prefixed to every metric exported by the controller.
	ns = "ingress_controller"
	// operation is the label name of the (deprecated) reload counters.
	operation = "count"
	// reloadLabel is the label value recorded for every reload operation.
	reloadLabel = "reloads"
	sslLabelExpire = "ssl_expire_time_seconds"
	sslLabelHost = "host"
)
// init registers all controller metrics with the default Prometheus
// registry. MustRegister panics on duplicate registration, so this file
// must only be linked into the binary once.
func init() {
	prometheus.MustRegister(reloadOperation)
	prometheus.MustRegister(reloadOperationErrors)
	prometheus.MustRegister(sslExpireTime)
	prometheus.MustRegister(configSuccess)
	prometheus.MustRegister(configSuccessTime)
}
var (
	// NOTE(review): "successfull" is misspelled, but it is part of the
	// exported metric names; renaming would break existing dashboards and
	// alerts, so the spelling is kept.
	configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: ns,
		Name: "config_last_reload_successfull",
		Help: `Whether the last configuration reload attemp was successful.
Prometheus alert example:
alert: IngressControllerFailedReload
expr: ingress_controller_config_last_reload_successfull == 0
for: 10m`,
	})
	configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: ns,
		Name: "config_last_reload_successfull_timestamp_seconds",
		Help: "Timestamp of the last successful configuration reload.",
	})
	// TODO depreciate this metrics in favor of ingress_controller_config_last_reload_successfull_timestamp_seconds
	reloadOperation = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: ns,
			Name: "success",
			Help: `DEPRECATED: use ingress_controller_config_last_reload_successfull_timestamp_seconds or ingress_controller_config_last_reload_successfull instead.
Cumulative number of Ingress controller reload operations`,
		},
		[]string{operation},
	)
	// TODO depreciate this metrics in favor of ingress_controller_config_last_reload_successfull_timestamp_seconds
	reloadOperationErrors = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: ns,
			Name: "errors",
			Help: `DEPRECATED: use ingress_controller_config_last_reload_successfull_timestamp_seconds or ingress_controller_config_last_reload_successfull instead.
Cumulative number of Ingress controller errors during reload operations`,
		},
		[]string{operation},
	)
	// sslExpireTime is labeled by hostname; see setSSLExpireTime.
	sslExpireTime = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: ns,
			Name: sslLabelExpire,
			Help: "Number of seconds since 1970 to the SSL Certificate expire. An example to check if this " +
				"certificate will expire in 10 days is: \"ingress_controller_ssl_expire_time_seconds < (time() + (10 * 24 * 3600))\"",
		},
		[]string{sslLabelHost},
	)
)
// IncReloadCount increments the counter of successful NGINX reload
// operations (deprecated metric; see reloadOperation).
func IncReloadCount() {
	reloadOperation.WithLabelValues(reloadLabel).Inc()
}

// IncReloadErrorCount increments the counter of failed NGINX reload
// operations (deprecated metric; see reloadOperationErrors).
func IncReloadErrorCount() {
	reloadOperationErrors.WithLabelValues(reloadLabel).Inc()
}
// ConfigSuccess set a boolean flag according to the output of the controller configuration reload
func ConfigSuccess(success bool) {
if success {
ConfigSuccessTime()
configSuccess.Set(1)
} else {
configSuccess.Set(0)
}
}
// ConfigSuccessTime records the current wall-clock time (Unix seconds) as
// the timestamp of the last successful configuration reload.
func ConfigSuccessTime() {
	configSuccessTime.Set(float64(time.Now().Unix()))
}
// setSSLExpireTime publishes the certificate expiration time (Unix
// seconds) of every configured server, labeled by hostname. The
// catch-all default server is skipped.
func setSSLExpireTime(servers []*ingress.Server) {
	for _, s := range servers {
		if s.Hostname != defServerName {
			sslExpireTime.WithLabelValues(s.Hostname).Set(float64(s.SSLExpireTime.Unix()))
		}
	}
}

View file

@ -31,6 +31,7 @@ import (
"strings"
"sync"
"syscall"
"text/template"
"time"
"github.com/golang/glog"
@ -38,7 +39,6 @@ import (
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
@ -53,6 +53,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/controller/process"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/ingress/status"
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
@ -61,30 +62,16 @@ import (
"k8s.io/ingress-nginx/internal/watch"
)
type statusModule string
const (
ngxHealthPath = "/healthz"
defaultStatusModule statusModule = "default"
vtsStatusModule statusModule = "vts"
)
var (
tmplPath = "/etc/nginx/template/nginx.tmpl"
cfgPath = "/etc/nginx/nginx.conf"
nginxBinary = "/usr/sbin/nginx"
tmplPath = "/etc/nginx/template/nginx.tmpl"
)
// NewNGINXController creates a new NGINX Ingress controller.
// If the environment variable NGINX_BINARY exists it will be used
// as source for nginx commands
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = nginxBinary
}
func NewNGINXController(config *Configuration, mc metric.Collector, fs file.Filesystem) *NGINXController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
@ -93,12 +80,10 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
h, err := dns.GetSystemNameServers()
if err != nil {
glog.Warningf("unexpected error reading system nameservers: %v", err)
glog.Warningf("Error reading system nameservers: %v", err)
}
n := &NGINXController{
binary: ngx,
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
@ -116,10 +101,11 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
fileSystem: fs,
// create an empty configuration.
runningConfig: &ingress.Configuration{},
runningConfig: new(ingress.Configuration),
Proxy: &TCPProxy{},
metricCollector: mc,
}
n.store = store.New(
@ -134,8 +120,6 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
fs,
n.updateCh)
n.stats = newStatsCollector(config.Namespace, class.IngressClass, n.binary, n.cfg.ListenPorts.Status)
n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.annotations = annotations.NewAnnotationExtractor(n.store)
@ -153,7 +137,7 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
UseNodeInternalIP: config.UseNodeInternalIP,
})
} else {
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
glog.Warning("Update of Ingress status is disabled (flag --update-status)")
}
onTemplateChange := func() {
@ -162,68 +146,66 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
// this error is different from the rest because it must be clear why nginx is not working
glog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template : %v
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)
return
}
n.t = template
glog.Info("new NGINX template loaded")
n.SetForceReload(true)
glog.Info("New NGINX configuration template loaded.")
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}
ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
if err != nil {
glog.Fatalf("invalid NGINX template: %v", err)
glog.Fatalf("Invalid NGINX configuration template: %v", err)
}
n.t = ngxTpl
// TODO: refactor
if _, ok := fs.(filesystem.DefaultFs); !ok {
watch.NewDummyFileWatcher(tmplPath, onTemplateChange)
} else {
// do not setup watchers on tests
return n
}
_, err = watch.NewFileWatcher(tmplPath, onTemplateChange)
_, err = watch.NewFileWatcher(tmplPath, onTemplateChange)
if err != nil {
glog.Fatalf("Error creating file watcher for %v: %v", tmplPath, err)
}
filesToWatch := []string{}
err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
if err != nil {
glog.Fatalf("unexpected error creating file watcher: %v", err)
return err
}
filesToWatch := []string{}
err := filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
filesToWatch = append(filesToWatch, path)
if info.IsDir() {
return nil
}
filesToWatch = append(filesToWatch, path)
return nil
})
if err != nil {
glog.Fatalf("Error creating file watchers: %v", err)
}
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
glog.Info("File %v changed. Reloading NGINX", f)
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
if err != nil {
glog.Fatalf("unexpected error creating file watcher: %v", err)
glog.Fatalf("Error creating file watcher for %v: %v", f, err)
}
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
glog.Info("file %v changed. Reloading NGINX", f)
n.SetForceReload(true)
})
if err != nil {
glog.Fatalf("unexpected error creating file watcher: %v", err)
}
}
}
return n
}
// NGINXController ...
// NGINXController describes a NGINX Ingress controller.
type NGINXController struct {
cfg *Configuration
@ -237,31 +219,24 @@ type NGINXController struct {
syncRateLimiter flowcontrol.RateLimiter
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// stopLock is used to enforce that only a single call to Stop send at
// a given time. We allow stopping through an HTTP endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock *sync.Mutex
stopCh chan struct{}
updateCh *channels.RingChannel
// ngxErrCh channel used to detect errors with the nginx processes
// ngxErrCh is used to detect errors with the NGINX processes
ngxErrCh chan error
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration
forceReload int32
t *ngx_template.Template
binary string
resolver []net.IP
stats *statsCollector
statusModule statusModule
// returns true if IPV6 is enabled in the pod
isIPV6Enabled bool
isShuttingDown bool
@ -271,11 +246,13 @@ type NGINXController struct {
store store.Storer
fileSystem filesystem.Filesystem
metricCollector metric.Collector
}
// Start start a new NGINX master process running in foreground.
// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
glog.Infof("starting Ingress controller")
glog.Infof("Starting NGINX Ingress controller")
n.store.Run(n.stopCh)
@ -283,9 +260,9 @@ func (n *NGINXController) Start() {
go n.syncStatus.Run()
}
cmd := exec.Command(n.binary, "-c", cfgPath)
cmd := nginxExecCommand()
// put nginx in another process group to prevent it
// put NGINX in another process group to prevent it
// to receive signals meant for the controller
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
@ -296,12 +273,12 @@ func (n *NGINXController) Start() {
n.setupSSLProxy()
}
glog.Info("starting NGINX process...")
glog.Info("Starting NGINX process")
n.start(cmd)
go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.Enqueue(&extensions.Ingress{})
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
for {
select {
@ -320,7 +297,7 @@ func (n *NGINXController) Start() {
// release command resources
cmd.Process.Release()
// start a new nginx master process if the controller is not being stopped
cmd = exec.Command(n.binary, "-c", cfgPath)
cmd = nginxExecCommand()
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
@ -334,12 +311,14 @@ func (n *NGINXController) Start() {
if evt, ok := event.(store.Event); ok {
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
n.syncQueue.Enqueue(evt.Obj)
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
glog.Warningf("unexpected event type received %T", event)
glog.Warningf("Unexpected event type received %T", event)
}
case <-n.stopCh:
break
@ -354,21 +333,20 @@ func (n *NGINXController) Stop() error {
n.stopLock.Lock()
defer n.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if n.syncQueue.IsShuttingDown() {
return fmt.Errorf("shutdown already in progress")
}
glog.Infof("shutting down controller queues")
glog.Infof("Shutting down controller queues")
close(n.stopCh)
go n.syncQueue.Shutdown()
if n.syncStatus != nil {
n.syncStatus.Shutdown()
}
// Send stop signal to Nginx
glog.Info("stopping NGINX process...")
cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit")
// send stop signal to NGINX
glog.Info("Stopping NGINX process")
cmd := nginxExecCommand("-s", "quit")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@ -376,7 +354,7 @@ func (n *NGINXController) Stop() error {
return err
}
// Wait for the Nginx process disappear
// wait for the NGINX process to terminate
timer := time.NewTicker(time.Second * 1)
for range timer.C {
if !process.IsNginxRunning() {
@ -393,7 +371,7 @@ func (n *NGINXController) start(cmd *exec.Cmd) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
glog.Fatalf("nginx error: %v", err)
glog.Fatalf("NGINX error: %v", err)
n.ngxErrCh <- err
return
}
@ -416,18 +394,18 @@ func (n NGINXController) DefaultEndpoint() ingress.Endpoint {
// running the command "nginx -t" using a temporal file.
func (n NGINXController) testTemplate(cfg []byte) error {
if len(cfg) == 0 {
return fmt.Errorf("invalid nginx configuration (empty)")
return fmt.Errorf("Invalid NGINX configuration (empty)")
}
tmpfile, err := ioutil.TempFile("", "nginx-cfg")
if err != nil {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), cfg, 0644)
err = ioutil.WriteFile(tmpfile.Name(), cfg, file.ReadWriteByUser)
if err != nil {
return err
}
out, err := exec.Command(n.binary, "-t", "-c", tmpfile.Name()).CombinedOutput()
out, err := nginxTestCommand(tmpfile.Name()).CombinedOutput()
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
oe := fmt.Sprintf(`
@ -443,14 +421,10 @@ Error: %v
return nil
}
// OnUpdate is called periodically by syncQueue to keep the configuration in sync.
//
// 1. converts configmap configuration to custom configuration object
// 2. write the custom template (the complexity depends on the implementation)
// 3. write the configuration file
//
// returning nil implies the backend will be reloaded.
// if an error is returned means requeue the update
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
@ -460,7 +434,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
glog.Warningf("missing service for PassthroughBackends %v", pb.Backend)
glog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
continue
}
port, err := strconv.Atoi(pb.Port.String())
@ -480,7 +454,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
}
//TODO: Allow PassthroughBackends to specify they support proxy-protocol
// TODO: Allow PassthroughBackends to specify they support proxy-protocol
servers = append(servers, &TCPServer{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
@ -492,17 +466,10 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
n.Proxy.ServerList = servers
}
// we need to check if the status module configuration changed
if cfg.EnableVtsStatus {
n.setupMonitor(vtsStatusModule)
} else {
n.setupMonitor(defaultStatusModule)
}
// NGINX cannot resize the hash tables used to store server names.
// For this reason we check if the defined size defined is correct
// for the FQDN defined in the ingress rules adjusting the value
// if is required.
// NGINX cannot resize the hash tables used to store server names. For
// this reason we check if the current size is correct for the host
// names defined in the Ingress rules and adjust the value if
// necessary.
// https://trac.nginx.org/nginx/ticket/352
// https://trac.nginx.org/nginx/ticket/631
var longestName int
@ -520,7 +487,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
} else {
n = fmt.Sprintf("www.%v", srv.Hostname)
}
glog.V(3).Infof("creating redirect from %v to %v", srv.Hostname, n)
glog.V(3).Infof("Creating redirect from %q to %q", srv.Hostname, n)
if _, ok := redirectServers[n]; !ok {
found := false
for _, esrv := range ingressCfg.Servers {
@ -537,24 +504,24 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
if cfg.ServerNameHashBucketSize == 0 {
nameHashBucketSize := nginxHashBucketSize(longestName)
glog.V(3).Infof("adjusting ServerNameHashBucketSize variable to %v", nameHashBucketSize)
glog.V(3).Infof("Adjusting ServerNameHashBucketSize variable to %q", nameHashBucketSize)
cfg.ServerNameHashBucketSize = nameHashBucketSize
}
serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
glog.V(3).Infof("adjusting ServerNameHashMaxSize variable to %v", serverNameHashMaxSize)
glog.V(3).Infof("Adjusting ServerNameHashMaxSize variable to %q", serverNameHashMaxSize)
cfg.ServerNameHashMaxSize = serverNameHashMaxSize
}
// the limit of open files is per worker process
// and we leave some room to avoid consuming all the FDs available
wp, err := strconv.Atoi(cfg.WorkerProcesses)
glog.V(3).Infof("number of worker processes: %v", wp)
glog.V(3).Infof("Number of worker processes: %d", wp)
if err != nil {
wp = 1
}
maxOpenFiles := (sysctlFSFileMax() / wp) - 1024
glog.V(2).Infof("maximum number of open file descriptors : %v", maxOpenFiles)
glog.V(2).Infof("Maximum number of open file descriptors: %d", maxOpenFiles)
if maxOpenFiles < 1024 {
// this means the value of RLIMIT_NOFILE is too low.
maxOpenFiles = 1024
@ -564,7 +531,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
if cfg.ProxySetHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
if err != nil {
glog.Warningf("unexpected error reading configmap %v: %v", cfg.ProxySetHeaders, err)
glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
}
setHeaders = cmap.Data
@ -574,7 +541,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
if cfg.AddHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
if err != nil {
glog.Warningf("unexpected error reading configmap %v: %v", cfg.AddHeaders, err)
glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
}
addHeaders = cmap.Data
@ -586,7 +553,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
secret, err := n.store.GetSecret(secretName)
if err != nil {
glog.Warningf("unexpected error reading secret %v: %v", secretName, err)
glog.Warningf("Error reading Secret %q from local store: %v", secretName, err)
}
nsSecName := strings.Replace(secretName, "/", "-", -1)
@ -595,7 +562,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
if ok {
pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh, n.fileSystem)
if err != nil {
glog.Warningf("unexpected error adding or updating dhparam %v file: %v", nsSecName, err)
glog.Warningf("Error adding or updating dhparam file %v: %v", nsSecName, err)
} else {
sslDHParam = pemFileName
}
@ -628,12 +595,20 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
DisableLua: n.cfg.DisableLua,
}
content, err := n.t.Write(tc)
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
content, err := n.t.Write(tc)
if err != nil {
return err
}
if cfg.EnableOpentracing {
err := createOpentracingCfg(cfg)
if err != nil {
return err
}
}
err = n.testTemplate(content)
if err != nil {
return err
@ -647,31 +622,28 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), content, 0644)
err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
if err != nil {
return err
}
// executing diff can return exit code != 0
// TODO: executing diff can return exit code != 0
diffOutput, _ := exec.Command("diff", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
glog.Infof("NGINX configuration diff\n")
glog.Infof("%v\n", string(diffOutput))
glog.Infof("NGINX configuration diff:\n%v", string(diffOutput))
// Do not use defer to remove the temporal file.
// This is helpful when there is an error in the
// temporal configuration (we can manually inspect the file).
// Only remove the file when no error occurred.
// we do not defer the deletion of temp files in order
// to keep them around for inspection in case of error
os.Remove(tmpfile.Name())
}
}
err = ioutil.WriteFile(cfgPath, content, 0644)
err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
if err != nil {
return err
}
o, err := exec.Command(n.binary, "-s", "reload", "-c", cfgPath).CombinedOutput()
o, err := nginxExecCommand("-s", "reload").CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}
@ -679,9 +651,10 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return nil
}
// nginxHashBucketSize computes the correct nginx hash_bucket_size for a hash with the given longest key
// nginxHashBucketSize computes the correct NGINX hash_bucket_size for a hash
// with the given longest key.
func nginxHashBucketSize(longestString int) int {
// See https://github.com/kubernetes/ingress-nginxs/issues/623 for an explanation
// see https://github.com/kubernetes/ingress-nginx/issues/623 for an explanation
wordSize := 8 // Assume 64 bit CPU
n := longestString + 2
aligned := (n + wordSize - 1) & ^(wordSize - 1)
@ -708,7 +681,7 @@ func (n *NGINXController) setupSSLProxy() {
sslPort := n.cfg.ListenPorts.HTTPS
proxyPort := n.cfg.ListenPorts.SSLProxy
glog.Info("starting TLS proxy for SSL passthrough")
glog.Info("Starting TLS proxy for SSL Passthrough")
n.Proxy = &TCPProxy{
Default: &TCPServer{
Hostname: "localhost",
@ -725,32 +698,33 @@ func (n *NGINXController) setupSSLProxy() {
proxyList := &proxyproto.Listener{Listener: listener, ProxyHeaderTimeout: cfg.ProxyProtocolHeaderTimeout}
// start goroutine that accepts tcp connections in port 443
// accept TCP connections on the configured HTTPS port
go func() {
for {
var conn net.Conn
var err error
if n.store.GetBackendConfiguration().UseProxyProtocol {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
// wrap the listener in order to decode Proxy
// Protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
glog.Warningf("Error accepting TCP connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
glog.V(3).Infof("Handling connection from remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.Proxy.Handle(conn)
}
}()
}
// IsDynamicConfigurationEnough decides if the new configuration changes can be dynamically applied without reloading
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
copyOfRunningConfig := *n.runningConfig
copyOfPcfg := *pcfg
@ -761,12 +735,16 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
return copyOfRunningConfig.Equal(&copyOfPcfg)
}
// configureDynamically JSON encodes new Backends and POSTs it to an internal HTTP endpoint
// that is handled by Lua
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration, port int) error {
backends := make([]*ingress.Backend, len(pcfg.Backends))
for i, backend := range pcfg.Backends {
var service *apiv1.Service
if backend.Service != nil {
service = &apiv1.Service{Spec: backend.Service.Spec}
}
luaBackend := &ingress.Backend{
Name: backend.Name,
Port: backend.Port,
@ -775,6 +753,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing,
Service: service,
}
var endpoints []ingress.Endpoint
@ -796,7 +775,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
return err
}
glog.V(2).Infof("posting backends configuration: %s", buf)
glog.V(2).Infof("Posting backends configuration: %s", buf)
url := fmt.Sprintf("http://localhost:%d/configuration/backends", port)
resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
@ -806,7 +785,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
defer func() {
if err := resp.Body.Close(); err != nil {
glog.Warningf("error while closing response body: \n%v", err)
glog.Warningf("Error while closing response body:\n%v", err)
}
}()
@ -816,3 +795,48 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
return nil
}
// zipkinTmpl is the template for the Zipkin tracer configuration written to
// the OpenTracing configuration file. Fields are filled from the backend
// configuration (see createOpentracingCfg).
const zipkinTmpl = `{
"service_name": "{{ .ZipkinServiceName }}",
"collector_host": "{{ .ZipkinCollectorHost }}",
"collector_port": {{ .ZipkinCollectorPort }},
"sample_rate": {{ .ZipkinSampleRate }}
}`

// jaegerTmpl is the template for the Jaeger tracer configuration written to
// the OpenTracing configuration file. Fields are filled from the backend
// configuration (see createOpentracingCfg).
const jaegerTmpl = `{
"service_name": "{{ .JaegerServiceName }}",
"sampler": {
"type": "{{ .JaegerSamplerType }}",
"param": {{ .JaegerSamplerParam }}
},
"reporter": {
"localAgentHostPort": "{{ .JaegerCollectorHost }}:{{ .JaegerCollectorPort }}"
}
}`
// createOpentracingCfg renders the OpenTracing tracer configuration from the
// backend configuration and writes it to /etc/nginx/opentracing.json.
// Zipkin settings take precedence over Jaeger; when neither collector host is
// configured an empty JSON object is written.
func createOpentracingCfg(cfg ngx_config.Configuration) error {
	var tmpl *template.Template
	var err error

	switch {
	case cfg.ZipkinCollectorHost != "":
		tmpl, err = template.New("zipkin").Parse(zipkinTmpl)
	case cfg.JaegerCollectorHost != "":
		// NOTE: template name fixed from the earlier misspelling "jarger".
		tmpl, err = template.New("jaeger").Parse(jaegerTmpl)
	default:
		// No tracer configured: write an empty object.
		tmpl, err = template.New("empty").Parse("{}")
	}
	if err != nil {
		return err
	}

	tmplBuf := bytes.NewBuffer(make([]byte, 0))
	err = tmpl.Execute(tmplBuf, cfg)
	if err != nil {
		return err
	}

	return ioutil.WriteFile("/etc/nginx/opentracing.json", tmplBuf.Bytes(), file.ReadWriteByUser)
}

View file

@ -144,12 +144,12 @@ func TestConfigureDynamically(t *testing.T) {
t.Fatal(err)
}
body := string(b)
if strings.Index(body, "target") != -1 {
if strings.Contains(body, "target") {
t.Errorf("unexpected target reference in JSON content: %v", body)
}
if strings.Index(body, "service") != -1 {
t.Errorf("unexpected service reference in JSON content: %v", body)
if !strings.Contains(body, "service") {
t.Errorf("service reference should be present in JSON content: %v", body)
}
}))

View file

@ -1,97 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress/controller/metric/collector"
)
const (
ngxStatusPath = "/nginx_status"
ngxVtsPath = "/nginx_status/format/json"
)
// setupMonitor switches the Prometheus stats collector to the given status
// module, stopping the currently active one first. It is a no-op when the
// requested module is already active.
func (n *NGINXController) setupMonitor(sm statusModule) {
	current := n.statusModule
	if current == sm {
		return
	}
	glog.Infof("changing prometheus collector from %v to %v", current, sm)
	n.stats.stop(current)
	n.stats.start(sm)
	n.statusModule = sm
}
// statsCollector groups the Prometheus collectors used by the controller.
type statsCollector struct {
	process prometheus.Collector // NGINX process metrics collector
	basic   collector.Stopable   // scraper for the stub_status endpoint
	vts     collector.Stopable   // scraper for the VTS module endpoint

	namespace  string // namespace the controller is running in
	watchClass string // ingress class the controller handles
	port       int    // local port the NGINX status endpoints listen on
}
// stop halts and unregisters the Prometheus collector associated with the
// given status module. Unknown modules are ignored.
func (s *statsCollector) stop(sm statusModule) {
	switch sm {
	case defaultStatusModule:
		s.basic.Stop()
		prometheus.Unregister(s.basic)
	case vtsStatusModule:
		s.vts.Stop()
		prometheus.Unregister(s.vts)
	}
}
// start creates and registers the Prometheus collector for the given status
// module. Unknown modules are ignored.
func (s *statsCollector) start(sm statusModule) {
	// Go switch cases do not fall through, so the explicit break
	// statements the original carried were redundant and are removed.
	switch sm {
	case defaultStatusModule:
		s.basic = collector.NewNginxStatus(s.namespace, s.watchClass, s.port, ngxStatusPath)
		// NOTE(review): the Register error is discarded, so a duplicate
		// registration would go unnoticed — confirm this is intended.
		prometheus.Register(s.basic)
	case vtsStatusModule:
		s.vts = collector.NewNGINXVTSCollector(s.namespace, s.watchClass, s.port, ngxVtsPath)
		prometheus.Register(s.vts)
	}
}
// newStatsCollector registers the NGINX process collector and returns a
// statsCollector scoped to the given namespace, ingress class and status port.
// Any registration failure is fatal, matching controller startup semantics.
func newStatsCollector(ns, class, binary string, port int) *statsCollector {
	glog.Infof("starting new nginx stats collector for Ingress controller running in namespace %v (class %v)", ns, class)
	glog.Infof("collector extracting information from port %v", port)

	pc, err := collector.NewNamedProcess(true, collector.BinaryNameMatcher{
		Name:   "nginx",
		Binary: binary,
	})
	if err != nil {
		// The original logged "registering" here too, which made a
		// creation failure indistinguishable from a registration failure.
		glog.Fatalf("unexpected error creating nginx process collector: %v", err)
	}

	if err := prometheus.Register(pc); err != nil {
		glog.Fatalf("unexpected error registering nginx collector: %v", err)
	}

	return &statsCollector{
		namespace:  ns,
		watchClass: class,
		process:    pc,
		port:       port,
	}
}

View file

@ -33,20 +33,19 @@ import (
"k8s.io/ingress-nginx/internal/net/ssl"
)
// syncSecret keeps in sync Secrets used by Ingress rules with the files on
// disk to allow copy of the content of the secret to disk to be used
// by external processes.
// syncSecret synchronizes the content of a TLS Secret (certificate(s), secret
// key) with the filesystem. The resulting files can be used by NGINX.
func (s k8sStore) syncSecret(key string) {
s.mu.Lock()
defer s.mu.Unlock()
glog.V(3).Infof("starting syncing of secret %v", key)
glog.V(3).Infof("Syncing Secret %q", key)
// TODO: getPemCertificate should not write to disk to avoid unnecessary overhead
cert, err := s.getPemCertificate(key)
if err != nil {
if !isErrSecretForAuth(err) {
glog.Warningf("error obtaining PEM from secret %v: %v", key, err)
glog.Warningf("Error obtaining X.509 certificate: %v", err)
}
return
}
@ -58,7 +57,7 @@ func (s k8sStore) syncSecret(key string) {
// no need to update
return
}
glog.Infof("updating secret %v in the local store", key)
glog.Infof("Updating Secret %q in the local store", key)
s.sslStore.Update(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
@ -66,7 +65,7 @@ func (s k8sStore) syncSecret(key string) {
return
}
glog.Infof("adding secret %v to the local store", key)
glog.Infof("Adding Secret %q to the local store", key)
s.sslStore.Add(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
@ -78,7 +77,7 @@ func (s k8sStore) syncSecret(key string) {
func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
secret, err := s.listers.Secret.ByKey(secretName)
if err != nil {
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
return nil, err
}
cert, okcert := secret.Data[apiv1.TLSCertKey]
@ -93,40 +92,42 @@ func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error)
var sslCert *ingress.SSLCert
if okcert && okkey {
if cert == nil {
return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName)
return nil, fmt.Errorf("key 'tls.crt' missing from Secret %q", secretName)
}
if key == nil {
return nil, fmt.Errorf("secret %v has no 'tls.key'", secretName)
return nil, fmt.Errorf("key 'tls.key' missing from Secret %q", secretName)
}
// If 'ca.crt' is also present, it will allow this secret to be used in the
// 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation
sslCert, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca, s.filesystem)
if err != nil {
return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
return nil, err
}
glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, sslCert.CN)
msg := fmt.Sprintf("Configuring Secret %q for TLS encryption (CN: %v)", secretName, sslCert.CN)
if ca != nil {
glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName)
msg += " and authentication"
}
glog.V(3).Info(msg)
} else if ca != nil {
sslCert, err = ssl.AddCertAuth(nsSecName, ca, s.filesystem)
if err != nil {
return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
return nil, err
}
// makes this secret in 'syncSecret' to be used for Certificate Authentication
// this does not enable Certificate Authentication
glog.V(3).Infof("found only 'ca.crt', configuring %v as an Certificate Authentication Secret", secretName)
glog.V(3).Infof("Configuring Secret %q for TLS authentication", secretName)
} else {
if auth != nil {
return nil, ErrSecretForAuth
}
return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName)
return nil, fmt.Errorf("Secret %q contains no keypair or CA certificate", secretName)
}
sslCert.Name = secret.Name
@ -137,8 +138,8 @@ func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error)
func (s k8sStore) checkSSLChainIssues() {
for _, item := range s.ListLocalSSLCerts() {
secretName := k8s.MetaNamespaceKey(item)
secret, err := s.GetLocalSSLCert(secretName)
secrKey := k8s.MetaNamespaceKey(item)
secret, err := s.GetLocalSSLCert(secrKey)
if err != nil {
continue
}
@ -150,7 +151,7 @@ func (s k8sStore) checkSSLChainIssues() {
data, err := ssl.FullChainCert(secret.PemFileName, s.filesystem)
if err != nil {
glog.Errorf("unexpected error generating SSL certificate with full intermediate chain CA certs: %v", err)
glog.Errorf("Error generating CA certificate chain for Secret %q: %v", secrKey, err)
continue
}
@ -158,13 +159,13 @@ func (s k8sStore) checkSSLChainIssues() {
file, err := s.filesystem.Create(fullChainPemFileName)
if err != nil {
glog.Errorf("unexpected error creating SSL certificate file %v: %v", fullChainPemFileName, err)
glog.Errorf("Error creating SSL certificate file for Secret %q: %v", secrKey, err)
continue
}
_, err = file.Write(data)
if err != nil {
glog.Errorf("unexpected error creating SSL certificate: %v", err)
glog.Errorf("Error creating SSL certificate for Secret %q: %v", secrKey, err)
continue
}
@ -172,14 +173,14 @@ func (s k8sStore) checkSSLChainIssues() {
err = mergo.MergeWithOverwrite(dst, secret)
if err != nil {
glog.Errorf("unexpected error creating SSL certificate: %v", err)
glog.Errorf("Error creating SSL certificate for Secret %q: %v", secrKey, err)
continue
}
dst.FullChainPemFileName = fullChainPemFileName
glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName)
s.sslStore.Update(secretName, dst)
glog.Infof("Updating local copy of SSL certificate %q with missing intermediate CA certs", secrKey)
s.sslStore.Update(secrKey, dst)
// this update must trigger an update
// (like an update event from a change in Ingress)
s.sendDummyEvent()

View file

@ -17,8 +17,6 @@ limitations under the License.
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
@ -28,14 +26,14 @@ type ConfigMapLister struct {
cache.Store
}
// ByKey searches for a configmap in the local configmaps Store
// ByKey returns the ConfigMap matching key in the local ConfigMap Store.
func (cml *ConfigMapLister) ByKey(key string) (*apiv1.ConfigMap, error) {
s, exists, err := cml.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("configmap %v was not found", key)
return nil, NotExistsError(key)
}
return s.(*apiv1.ConfigMap), nil
}

View file

@ -17,8 +17,6 @@ limitations under the License.
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
@ -28,15 +26,14 @@ type EndpointLister struct {
cache.Store
}
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
key := fmt.Sprintf("%v/%v", svc.Namespace, svc.Name)
// ByKey returns the Endpoints of the Service matching key in the local Endpoint Store.
func (s *EndpointLister) ByKey(key string) (*apiv1.Endpoints, error) {
eps, exists, err := s.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("could not find endpoints for service %v", key)
return nil, NotExistsError(key)
}
return eps.(*apiv1.Endpoints), nil
}

View file

@ -17,8 +17,6 @@ limitations under the License.
package store
import (
"fmt"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
)
@ -28,14 +26,14 @@ type IngressLister struct {
cache.Store
}
// ByKey searches for an ingress in the local ingress Store
// ByKey returns the Ingress matching key in the local Ingress Store.
func (il IngressLister) ByKey(key string) (*extensions.Ingress, error) {
i, exists, err := il.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("ingress %v was not found", key)
return nil, NotExistsError(key)
}
return i.(*extensions.Ingress), nil
}

View file

@ -18,9 +18,22 @@ package store
import (
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/ingress/annotations"
)
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
type IngressAnnotationsLister struct {
	cache.Store
}

// ByKey returns the Ingress annotations matching key in the local Ingress
// annotations Store, or NotExistsError when no such object is stored.
func (il IngressAnnotationsLister) ByKey(key string) (*annotations.Ingress, error) {
	obj, exists, err := il.GetByKey(key)
	switch {
	case err != nil:
		return nil, err
	case !exists:
		return nil, NotExistsError(key)
	}
	return obj.(*annotations.Ingress), nil
}

View file

@ -17,8 +17,6 @@ limitations under the License.
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
@ -28,14 +26,14 @@ type SecretLister struct {
cache.Store
}
// ByKey searches for a secret in the local secrets Store
// ByKey returns the Secret matching key in the local Secret Store.
func (sl *SecretLister) ByKey(key string) (*apiv1.Secret, error) {
s, exists, err := sl.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("secret %v was not found", key)
return nil, NotExistsError(key)
}
return s.(*apiv1.Secret), nil
}

View file

@ -17,8 +17,6 @@ limitations under the License.
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
@ -28,14 +26,14 @@ type ServiceLister struct {
cache.Store
}
// ByKey searches for a service in the local secrets Store
// ByKey returns the Service matching key in the local Service Store.
func (sl *ServiceLister) ByKey(key string) (*apiv1.Service, error) {
s, exists, err := sl.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service %v was not found", key)
return nil, NotExistsError(key)
}
return s.(*apiv1.Service), nil
}

View file

@ -58,25 +58,26 @@ type Storer interface {
// GetBackendConfiguration returns the nginx configuration stored in a configmap
GetBackendConfiguration() ngx_config.Configuration
// GetConfigMap returns a ConfigmMap using the namespace and name as key
// GetConfigMap returns the ConfigMap matching key.
GetConfigMap(key string) (*corev1.ConfigMap, error)
// GetSecret returns a Secret using the namespace and name as key
// GetSecret returns the Secret matching key.
GetSecret(key string) (*corev1.Secret, error)
// GetService returns a Service using the namespace and name as key
// GetService returns the Service matching key.
GetService(key string) (*corev1.Service, error)
GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error)
// GetServiceEndpoints returns the Endpoints of a Service matching key.
GetServiceEndpoints(key string) (*corev1.Endpoints, error)
// GetSecret returns an Ingress using the namespace and name as key
// GetIngress returns the Ingress matching key.
GetIngress(key string) (*extensions.Ingress, error)
// ListIngresses returns the list of Ingresses
// ListIngresses returns a list of all Ingresses in the store.
ListIngresses() []*extensions.Ingress
// GetIngressAnnotations returns the annotations associated to an Ingress
GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error)
// GetIngressAnnotations returns the parsed annotations of an Ingress matching key.
GetIngressAnnotations(key string) (*annotations.Ingress, error)
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
@ -110,7 +111,7 @@ const (
ConfigurationEvent EventType = "CONFIGURATION"
)
// Event holds the context of an event
// Event holds the context of an event.
type Event struct {
Type EventType
Obj interface{}
@ -125,7 +126,7 @@ type Informer struct {
ConfigMap cache.SharedIndexInformer
}
// Lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
// Lister contains object listers (stores).
type Lister struct {
Ingress IngressLister
Service ServiceLister
@ -135,6 +136,14 @@ type Lister struct {
IngressAnnotation IngressAnnotationsLister
}
// NotExistsError is returned when an object does not exist in a local store.
type NotExistsError string

// Error implements the error interface.
func (e NotExistsError) Error() string {
	key := string(e)
	return fmt.Sprintf("no object matching key %q in local store", key)
}
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
go i.Endpoint.Run(stopCh)
@ -235,7 +244,7 @@ func New(checkOCSP bool,
Component: "nginx-ingress-controller",
})
// k8sStore fulfils resolver.Resolver interface
// k8sStore fulfills resolver.Resolver interface
store.annotations = annotations.NewAnnotationExtractor(store)
store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
@ -479,6 +488,18 @@ func New(checkOCSP bool,
if key == configmap {
store.setConfig(cm)
}
ings := store.listers.IngressAnnotation.List()
for _, ingKey := range ings {
key := k8s.MetaNamespaceKey(ingKey)
ing, err := store.GetIngress(key)
if err != nil {
glog.Errorf("could not find Ingress %v in local store: %v", key, err)
continue
}
store.extractAnnotations(ing)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
@ -494,6 +515,14 @@ func New(checkOCSP bool,
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
// do not wait for informers to read the configmap configuration
ns, name, _ := k8s.ParseNameNS(configmap)
cm, err := client.CoreV1().ConfigMaps(ns).Get(name, metav1.GetOptions{})
if err != nil {
glog.Warningf("Unexpected error reading configuration configmap: %v", err)
}
store.setConfig(cm)
return store
}
@ -581,7 +610,7 @@ func (s k8sStore) syncSecrets(ing *extensions.Ingress) {
}
}
// GetSecret returns a Secret using the namespace and name as key
// GetSecret returns the Secret matching key.
func (s k8sStore) GetSecret(key string) (*corev1.Secret, error) {
return s.listers.Secret.ByKey(key)
}
@ -598,12 +627,12 @@ func (s k8sStore) ListLocalSSLCerts() []*ingress.SSLCert {
return certs
}
// GetService returns a Service using the namespace and name as key
// GetService returns the Service matching key.
func (s k8sStore) GetService(key string) (*corev1.Service, error) {
return s.listers.Service.ByKey(key)
}
// GetIngress returns an Ingress using the namespace and name as key
// GetIngress returns the Ingress matching key.
func (s k8sStore) GetIngress(key string) (*extensions.Ingress, error) {
return s.listers.Ingress.ByKey(key)
}
@ -636,17 +665,14 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress {
return ingresses
}
// GetIngressAnnotations returns the annotations associated to an Ingress
func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) {
key := k8s.MetaNamespaceKey(ing)
item, exists, err := s.listers.IngressAnnotation.GetByKey(key)
// GetIngressAnnotations returns the parsed annotations of an Ingress matching key.
func (s k8sStore) GetIngressAnnotations(key string) (*annotations.Ingress, error) {
ia, err := s.listers.IngressAnnotation.ByKey(key)
if err != nil {
return &annotations.Ingress{}, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
return &annotations.Ingress{}, err
}
if !exists {
return &annotations.Ingress{}, fmt.Errorf("ingress annotations %v was not found", key)
}
return item.(*annotations.Ingress), nil
return ia, nil
}
// GetLocalSSLCert returns the local copy of a SSLCert
@ -654,12 +680,14 @@ func (s k8sStore) GetLocalSSLCert(key string) (*ingress.SSLCert, error) {
return s.sslStore.ByKey(key)
}
// GetConfigMap returns the ConfigMap matching key.
func (s k8sStore) GetConfigMap(key string) (*corev1.ConfigMap, error) {
return s.listers.ConfigMap.ByKey(key)
}
func (s k8sStore) GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error) {
return s.listers.Endpoint.GetServiceEndpoints(svc)
// GetServiceEndpoints returns the Endpoints of a Service matching key.
func (s k8sStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) {
return s.listers.Endpoint.ByKey(key)
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
@ -680,6 +708,34 @@ func (s k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error)
}, nil
}
// writeSSLSessionTicketKey decodes the base64-encoded ssl-session-ticket-key
// from the given ConfigMap and writes it to fileName. The configured key is
// cleared first and only restored after a successful decode and write, so a
// bad key never remains referenced by the backend configuration.
//
// NOTE(review): the value receiver means the assignments to s.backendConfig
// mutate a copy unless that state is shared via pointer/reference inside
// k8sStore — confirm against the struct definition.
func (s k8sStore) writeSSLSessionTicketKey(cmap *corev1.ConfigMap, fileName string) {
	ticketString := ngx_template.ReadConfig(cmap.Data).SSLSessionTicketKey
	s.backendConfig.SSLSessionTicketKey = ""

	if ticketString == "" {
		return
	}

	// StdEncoding already uses standard padding, so the original
	// WithPadding(base64.StdPadding) call was a no-op and is dropped.
	// DecodedLen yields the maximum decoded size: 48 for a 48-byte key,
	// 81 (not 80, because of padding) for an 80-byte key.
	ticketBytes := base64.StdEncoding.DecodedLen(len(ticketString))
	if ticketBytes != 48 && ticketBytes != 81 {
		glog.Warningf("ssl-session-ticket-key must contain either 48 or 80 bytes")
	}

	decodedTicket, err := base64.StdEncoding.DecodeString(ticketString)
	if err != nil {
		glog.Errorf("unexpected error decoding ssl-session-ticket-key: %v", err)
		return
	}

	err = ioutil.WriteFile(fileName, decodedTicket, file.ReadWriteByUser)
	if err != nil {
		glog.Errorf("unexpected error writing ssl-session-ticket-key to %s: %v", fileName, err)
		return
	}

	s.backendConfig.SSLSessionTicketKey = ticketString
}
// GetDefaultBackend returns the default backend
func (s k8sStore) GetDefaultBackend() defaults.Backend {
return s.backendConfig.Backend
@ -691,16 +747,7 @@ func (s k8sStore) GetBackendConfiguration() ngx_config.Configuration {
func (s *k8sStore) setConfig(cmap *corev1.ConfigMap) {
s.backendConfig = ngx_template.ReadConfig(cmap.Data)
// TODO: this should not be done here
if s.backendConfig.SSLSessionTicketKey != "" {
d, err := base64.StdEncoding.DecodeString(s.backendConfig.SSLSessionTicketKey)
if err != nil {
glog.Warningf("unexpected error decoding key ssl-session-ticket-key: %v", err)
s.backendConfig.SSLSessionTicketKey = ""
}
ioutil.WriteFile("/etc/nginx/tickets.key", d, 0644)
}
s.writeSSLSessionTicketKey(cmap, "/etc/nginx/tickets.key")
}
// Run initiates the synchronization of the informers and the initial

View file

@ -18,45 +18,36 @@ package store
import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/eapache/channels"
"k8s.io/api/extensions/v1beta1"
extensions "k8s.io/api/extensions/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"encoding/base64"
"io/ioutil"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/test/e2e/framework"
)
func TestStore(t *testing.T) {
// TODO: find a way to avoid the need to use a real api server
home := os.Getenv("HOME")
kubeConfigFile := fmt.Sprintf("%v/.kube/config", home)
kubeContext := ""
kubeConfig, err := framework.LoadConfig(kubeConfigFile, kubeContext)
if err != nil {
t.Errorf("unexpected error loading kubeconfig file: %v", err)
}
clientSet, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
clientSet := fake.NewSimpleClientset()
t.Run("should return an error searching for non existing objects", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
cm := createConfigMap(clientSet, ns, t)
defer deleteConfigMap(cm, ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
@ -118,6 +109,8 @@ func TestStore(t *testing.T) {
t.Run("should return one event for add, update and delete of ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
cm := createConfigMap(clientSet, ns, t)
defer deleteConfigMap(cm, ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
@ -138,8 +131,9 @@ func TestStore(t *testing.T) {
continue
}
if _, ok := e.Obj.(*extensions.Ingress); !ok {
t.Errorf("expected an Ingress type but %T returned", e.Obj)
continue
}
switch e.Type {
case CreateEvent:
atomic.AddUint64(&add, 1)
@ -165,21 +159,22 @@ func TestStore(t *testing.T) {
storer.Run(stopCh)
ing, err := ensureIngress(&v1beta1.Ingress{
ing := ensureIngress(&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
Namespace: ns,
SelfLink: fmt.Sprintf("/apis/extensions/v1beta1/namespaces/%s/ingresses/dummy", ns),
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "dummy",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
Backend: extensions.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
@ -190,30 +185,34 @@ func TestStore(t *testing.T) {
},
},
},
}, clientSet)
}, clientSet, t)
err := framework.WaitForIngressInNamespace(clientSet, ns, ing.Name)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
t.Errorf("error waiting for secret: %v", err)
}
time.Sleep(1 * time.Second)
// create an invalid ingress (different class)
_, err = ensureIngress(&v1beta1.Ingress{
invalidIngress := ensureIngress(&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "custom-class",
SelfLink: fmt.Sprintf("/apis/extensions/v1beta1/namespaces/%s/ingresses/custom-class", ns),
Namespace: ns,
Annotations: map[string]string{
"kubernetes.io/ingress.class": "something",
},
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "dummy",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
Backend: extensions.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
@ -224,26 +223,28 @@ func TestStore(t *testing.T) {
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
}, clientSet, t)
defer deleteIngress(invalidIngress, clientSet, t)
ni := ing.DeepCopy()
ni.Spec.Rules[0].Host = "update-dummy"
_, err = ensureIngress(ni, clientSet)
_ = ensureIngress(ni, clientSet, t)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
t.Errorf("error creating ingress: %v", err)
}
// Secret takes a bit to update
time.Sleep(3 * time.Second)
err = clientSet.Extensions().Ingresses(ni.Namespace).Delete(ni.Name, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("error creating ingress: %v", err)
}
err = clientSet.ExtensionsV1beta1().
Ingresses(ni.Namespace).
Delete(ni.Name, &metav1.DeleteOptions{})
err = framework.WaitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
t.Errorf("error waiting for secret: %v", err)
}
framework.WaitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name)
time.Sleep(1 * time.Second)
if atomic.LoadUint64(&add) != 1 {
t.Errorf("expected 1 event of type Create but %v occurred", add)
@ -259,6 +260,8 @@ func TestStore(t *testing.T) {
t.Run("should not receive events from secret not referenced from ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
cm := createConfigMap(clientSet, ns, t)
defer deleteConfigMap(cm, ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
@ -304,14 +307,14 @@ func TestStore(t *testing.T) {
storer.Run(stopCh)
secretName := "not-referenced"
_, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns)
_, err := framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
t.Errorf("error creating secret: %v", err)
}
err = framework.WaitForSecretInNamespace(clientSet, ns, secretName)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
t.Errorf("error waiting for secret: %v", err)
}
if atomic.LoadUint64(&add) != 0 {
@ -326,7 +329,7 @@ func TestStore(t *testing.T) {
err = clientSet.CoreV1().Secrets(ns).Delete(secretName, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error deleting secret: %v", err)
t.Errorf("error deleting secret: %v", err)
}
time.Sleep(1 * time.Second)
@ -345,6 +348,8 @@ func TestStore(t *testing.T) {
t.Run("should receive events from secret referenced from ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
cm := createConfigMap(clientSet, ns, t)
defer deleteConfigMap(cm, ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
@ -392,40 +397,39 @@ func TestStore(t *testing.T) {
ingressName := "ingress-with-secret"
secretName := "referenced"
_, err := ensureIngress(&v1beta1.Ingress{
ing := ensureIngress(&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: ingressName,
Namespace: ns,
SelfLink: fmt.Sprintf("/apis/extensions/v1beta1/namespaces/%s/ingresses/%s", ns, ingressName),
},
Spec: v1beta1.IngressSpec{
TLS: []v1beta1.IngressTLS{
Spec: extensions.IngressSpec{
TLS: []extensions.IngressTLS{
{
SecretName: secretName,
},
},
Backend: &v1beta1.IngressBackend{
Backend: &extensions.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
}, clientSet, t)
defer deleteIngress(ing, clientSet, t)
err = framework.WaitForIngressInNamespace(clientSet, ns, ingressName)
err := framework.WaitForIngressInNamespace(clientSet, ns, ingressName)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
t.Errorf("error waiting for secret: %v", err)
}
_, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
t.Errorf("error creating secret: %v", err)
}
err = framework.WaitForSecretInNamespace(clientSet, ns, secretName)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
t.Errorf("error waiting for secret: %v", err)
}
// take into account secret sync
@ -441,7 +445,7 @@ func TestStore(t *testing.T) {
err = clientSet.CoreV1().Secrets(ns).Delete(secretName, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error deleting secret: %v", err)
t.Errorf("error deleting secret: %v", err)
}
time.Sleep(1 * time.Second)
@ -449,11 +453,14 @@ func TestStore(t *testing.T) {
if atomic.LoadUint64(&del) != 1 {
t.Errorf("expected 1 events of type Delete but %v occurred", del)
}
})
t.Run("should create an ingress with a secret which does not exist", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
cm := createConfigMap(clientSet, ns, t)
defer deleteConfigMap(cm, ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
@ -501,27 +508,28 @@ func TestStore(t *testing.T) {
name := "ingress-with-secret"
secretHosts := []string{name}
_, err := ensureIngress(&v1beta1.Ingress{
ing := ensureIngress(&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
SelfLink: fmt.Sprintf("/apis/extensions/v1beta1/namespaces/%s/ingresses/%s", ns, name),
},
Spec: v1beta1.IngressSpec{
TLS: []v1beta1.IngressTLS{
Spec: extensions.IngressSpec{
TLS: []extensions.IngressTLS{
{
Hosts: secretHosts,
SecretName: name,
},
},
Rules: []v1beta1.IngressRule{
Rules: []extensions.IngressRule{
{
Host: name,
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
Backend: extensions.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
@ -532,14 +540,12 @@ func TestStore(t *testing.T) {
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
}, clientSet, t)
defer deleteIngress(ing, clientSet, t)
err = framework.WaitForIngressInNamespace(clientSet, ns, name)
err := framework.WaitForIngressInNamespace(clientSet, ns, name)
if err != nil {
t.Errorf("unexpected error waiting for ingress: %v", err)
t.Errorf("error waiting for ingress: %v", err)
}
// take into account delay caused by:
@ -560,13 +566,13 @@ func TestStore(t *testing.T) {
_, err = framework.CreateIngressTLSSecret(clientSet, secretHosts, name, ns)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
t.Errorf("error creating secret: %v", err)
}
t.Run("should exists a secret in the local store and filesystem", func(t *testing.T) {
err := framework.WaitForSecretInNamespace(clientSet, ns, name)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
t.Errorf("error waiting for secret: %v", err)
}
time.Sleep(5 * time.Second)
@ -574,13 +580,13 @@ func TestStore(t *testing.T) {
pemFile := fmt.Sprintf("%v/%v-%v.pem", file.DefaultSSLDirectory, ns, name)
err = framework.WaitForFileInFS(pemFile, fs)
if err != nil {
t.Errorf("unexpected error waiting for file to exist on the file system: %v", err)
t.Errorf("error waiting for file to exist on the file system: %v", err)
}
secretName := fmt.Sprintf("%v/%v", ns, name)
sslCert, err := storer.GetLocalSSLCert(secretName)
if err != nil {
t.Errorf("unexpected error reading local secret %v: %v", secretName, err)
t.Errorf("error reading local secret %v: %v", secretName, err)
}
if sslCert == nil {
@ -602,41 +608,107 @@ func TestStore(t *testing.T) {
// check invalid secret (missing ca)
}
func createNamespace(clientSet *kubernetes.Clientset, t *testing.T) string {
t.Log("creating temporal namespace")
ns, err := framework.CreateKubeNamespace("store-test", clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Logf("temporal namespace %v created", ns)
func createNamespace(clientSet kubernetes.Interface, t *testing.T) string {
t.Helper()
t.Log("Creating temporal namespace")
return ns
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "store-test",
},
}
ns, err := clientSet.CoreV1().Namespaces().Create(namespace)
if err != nil {
t.Errorf("error creating the namespace: %v", err)
}
t.Logf("Temporal namespace %v created", ns)
return ns.Name
}
func deleteNamespace(ns string, clientSet *kubernetes.Clientset, t *testing.T) {
t.Logf("deleting temporal namespace %v created", ns)
err := framework.DeleteKubeNamespace(clientSet, ns)
func deleteNamespace(ns string, clientSet kubernetes.Interface, t *testing.T) {
t.Helper()
t.Logf("Deleting temporal namespace %v", ns)
err := clientSet.CoreV1().Namespaces().Delete(ns, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
t.Errorf("error deleting the namespace: %v", err)
}
t.Logf("temporal namespace %v deleted", ns)
t.Logf("Temporal namespace %v deleted", ns)
}
func ensureIngress(ingress *extensions.Ingress, clientSet *kubernetes.Clientset) (*extensions.Ingress, error) {
s, err := clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
func createConfigMap(clientSet kubernetes.Interface, ns string, t *testing.T) string {
t.Helper()
t.Log("Creating temporal config map")
configMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config",
SelfLink: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/config", ns),
},
}
cm, err := clientSet.CoreV1().ConfigMaps(ns).Create(configMap)
if err != nil {
t.Errorf("error creating the configuration map: %v", err)
}
t.Logf("Temporal configmap %v created", cm)
return cm.Name
}
func deleteConfigMap(cm, ns string, clientSet kubernetes.Interface, t *testing.T) {
t.Helper()
t.Logf("Deleting temporal configmap %v", cm)
err := clientSet.CoreV1().ConfigMaps(ns).Delete(cm, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("error deleting the configmap: %v", err)
}
t.Logf("Temporal configmap %v deleted", cm)
}
func ensureIngress(ingress *extensions.Ingress, clientSet kubernetes.Interface, t *testing.T) *extensions.Ingress {
t.Helper()
ing, err := clientSet.Extensions().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
if k8sErrors.IsNotFound(err) {
return clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress)
t.Logf("Ingress %v not found, creating", ingress)
ing, err = clientSet.Extensions().Ingresses(ingress.Namespace).Create(ingress)
if err != nil {
t.Fatalf("error creating ingress %+v: %v", ingress, err)
}
t.Logf("Ingress %+v created", ingress)
return ing
}
return nil, err
t.Fatalf("error updating ingress %+v: %v", ingress, err)
}
return s, nil
t.Logf("Ingress %+v updated", ingress)
return ing
}
func deleteIngress(ingress *extensions.Ingress, clientSet kubernetes.Interface, t *testing.T) {
t.Helper()
err := clientSet.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("failed to delete ingress %+v: %v", ingress, err)
}
t.Logf("Ingress %+v deleted", ingress)
}
func newFS(t *testing.T) file.Filesystem {
fs, err := file.NewFakeFS()
if err != nil {
t.Fatalf("unexpected error creating filesystem: %v", err)
t.Fatalf("error creating filesystem: %v", err)
}
return fs
}
@ -646,7 +718,7 @@ func newFS(t *testing.T) file.Filesystem {
func newStore(t *testing.T) *k8sStore {
fs, err := file.NewFakeFS()
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatalf("error: %v", err)
}
return &k8sStore{
@ -665,7 +737,7 @@ func newStore(t *testing.T) *k8sStore {
func TestUpdateSecretIngressMap(t *testing.T) {
s := newStore(t)
ingTpl := &v1beta1.Ingress{
ingTpl := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "testns",
@ -675,8 +747,8 @@ func TestUpdateSecretIngressMap(t *testing.T) {
t.Run("with TLS secret", func(t *testing.T) {
ing := ingTpl.DeepCopy()
ing.Spec = v1beta1.IngressSpec{
TLS: []v1beta1.IngressTLS{{SecretName: "tls"}},
ing.Spec = extensions.IngressSpec{
TLS: []extensions.IngressTLS{{SecretName: "tls"}},
}
s.listers.Ingress.Update(ing)
s.updateSecretIngressMap(ing)
@ -729,17 +801,17 @@ func TestUpdateSecretIngressMap(t *testing.T) {
func TestListIngresses(t *testing.T) {
s := newStore(t)
ingEmptyClass := &v1beta1.Ingress{
ingEmptyClass := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
Namespace: "testns",
},
Spec: v1beta1.IngressSpec{
Backend: &v1beta1.IngressBackend{
Spec: extensions.IngressSpec{
Backend: &extensions.IngressBackend{
ServiceName: "demo",
ServicePort: intstr.FromInt(80),
},
Rules: []v1beta1.IngressRule{
Rules: []extensions.IngressRule{
{
Host: "foo.bar",
},
@ -748,7 +820,7 @@ func TestListIngresses(t *testing.T) {
}
s.listers.Ingress.Add(ingEmptyClass)
ingressToIgnore := &v1beta1.Ingress{
ingressToIgnore := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test-2",
Namespace: "testns",
@ -756,8 +828,8 @@ func TestListIngresses(t *testing.T) {
"kubernetes.io/ingress.class": "something",
},
},
Spec: v1beta1.IngressSpec{
Backend: &v1beta1.IngressBackend{
Spec: extensions.IngressSpec{
Backend: &extensions.IngressBackend{
ServiceName: "demo",
ServicePort: intstr.FromInt(80),
},
@ -765,20 +837,20 @@ func TestListIngresses(t *testing.T) {
}
s.listers.Ingress.Add(ingressToIgnore)
ingressWithoutPath := &v1beta1.Ingress{
ingressWithoutPath := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test-3",
Namespace: "testns",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "foo.bar",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Backend: v1beta1.IngressBackend{
Backend: extensions.IngressBackend{
ServiceName: "demo",
ServicePort: intstr.FromInt(80),
},
@ -792,7 +864,7 @@ func TestListIngresses(t *testing.T) {
}
s.listers.Ingress.Add(ingressWithoutPath)
ingressWithNginxClass := &v1beta1.Ingress{
ingressWithNginxClass := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test-4",
Namespace: "testns",
@ -800,16 +872,16 @@ func TestListIngresses(t *testing.T) {
"kubernetes.io/ingress.class": "nginx",
},
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "foo.bar",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/demo",
Backend: v1beta1.IngressBackend{
Backend: extensions.IngressBackend{
ServiceName: "demo",
ServicePort: intstr.FromInt(80),
},
@ -828,3 +900,39 @@ func TestListIngresses(t *testing.T) {
t.Errorf("Expected 3 Ingresses but got %v", s)
}
}
func TestWriteSSLSessionTicketKey(t *testing.T) {
tests := []string{
"9DyULjtYWz520d1rnTLbc4BOmN2nLAVfd3MES/P3IxWuwXkz9Fby0lnOZZUdNEMV",
"9SvN1C9AB5DvNde5fMKoJwAwICpqdjiMyxR+cv6NpAWv22rFd3gKt4wMyGxCm7l9Wh6BQPG0+csyBZSHHr2NOWj52Wx8xCegXf4NsSMBUqA=",
}
for _, test := range tests {
s := newStore(t)
cmap := &v1.ConfigMap{
Data: map[string]string{
"ssl-session-ticket-key": test,
},
}
f, err := ioutil.TempFile("", "ssl-session-ticket-test")
if err != nil {
t.Fatal(err)
}
s.writeSSLSessionTicketKey(cmap, f.Name())
content, err := ioutil.ReadFile(f.Name())
if err != nil {
t.Fatal(err)
}
encodedContent := base64.StdEncoding.EncodeToString(content)
f.Close()
if test != encodedContent {
t.Fatalf("expected %v but returned %s", test, encodedContent)
}
}
}

View file

@ -26,7 +26,7 @@ import (
"github.com/paultag/sniff/parser"
)
// TCPServer describes a server that works in passthrough mode
// TCPServer describes a server that works in passthrough mode.
type TCPServer struct {
Hostname string
IP string
@ -34,13 +34,13 @@ type TCPServer struct {
ProxyProtocol bool
}
// TCPProxy describes the passthrough servers and a default as catch all
// TCPProxy describes the passthrough servers and a default as catch all.
type TCPProxy struct {
ServerList []*TCPServer
Default *TCPServer
}
// Get returns the TCPServer to use
// Get returns the TCPServer to use for a given host.
func (p *TCPProxy) Get(host string) *TCPServer {
if p.ServerList == nil {
return p.Default
@ -63,19 +63,19 @@ func (p *TCPProxy) Handle(conn net.Conn) {
length, err := conn.Read(data)
if err != nil {
glog.V(4).Infof("error reading the first 4k of the connection: %s", err)
glog.V(4).Infof("Error reading the first 4k of the connection: %s", err)
return
}
proxy := p.Default
hostname, err := parser.GetHostname(data[:])
if err == nil {
glog.V(4).Infof("parsed hostname from TLS Client Hello: %s", hostname)
glog.V(4).Infof("Parsed hostname from TLS Client Hello: %s", hostname)
proxy = p.Get(hostname)
}
if proxy == nil {
glog.V(4).Infof("there is no configured proxy for SSL connections")
glog.V(4).Infof("There is no configured proxy for SSL connections.")
return
}
@ -86,7 +86,7 @@ func (p *TCPProxy) Handle(conn net.Conn) {
defer clientConn.Close()
if proxy.ProxyProtocol {
//Write out the proxy-protocol header
// write out the Proxy Protocol header
localAddr := conn.LocalAddr().(*net.TCPAddr)
remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
protocol := "UNKNOWN"
@ -96,16 +96,16 @@ func (p *TCPProxy) Handle(conn net.Conn) {
protocol = "TCP6"
}
proxyProtocolHeader := fmt.Sprintf("PROXY %s %s %s %d %d\r\n", protocol, remoteAddr.IP.String(), localAddr.IP.String(), remoteAddr.Port, localAddr.Port)
glog.V(4).Infof("Writing proxy protocol header - %s", proxyProtocolHeader)
glog.V(4).Infof("Writing Proxy Protocol header: %s", proxyProtocolHeader)
_, err = fmt.Fprintf(clientConn, proxyProtocolHeader)
}
if err != nil {
glog.Errorf("unexpected error writing proxy-protocol header: %s", err)
glog.Errorf("Error writing Proxy Protocol header: %s", err)
clientConn.Close()
} else {
_, err = clientConn.Write(data[:length])
if err != nil {
glog.Errorf("unexpected error writing first 4k of proxy data: %s", err)
glog.Errorf("Error writing the first 4k of proxy data: %s", err)
clientConn.Close()
}
}

View file

@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"github.com/mitchellh/hashstructure"
"github.com/mitchellh/mapstructure"
"k8s.io/apimachinery/pkg/util/sets"
@ -132,7 +133,7 @@ func ReadConfig(src map[string]string) config.Configuration {
delete(conf, proxyHeaderTimeout)
duration, err := time.ParseDuration(val)
if err != nil {
glog.Warningf("proxy-protocol-header-timeout of %v encounted an error while being parsed %v. Switching to use default value instead.", val, err)
glog.Warningf("proxy-protocol-header-timeout of %v encountered an error while being parsed %v. Switching to use default value instead.", val, err)
} else {
to.ProxyProtocolHeaderTimeout = duration
}
@ -191,6 +192,15 @@ func ReadConfig(src map[string]string) config.Configuration {
glog.Warningf("unexpected error merging defaults: %v", err)
}
hash, err := hashstructure.Hash(to, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
glog.Warningf("unexpected error obtaining hash: %v", err)
}
to.Checksum = fmt.Sprintf("%v", hash)
return to
}

View file

@ -17,11 +17,13 @@ limitations under the License.
package template
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
"github.com/mitchellh/hashstructure"
"k8s.io/ingress-nginx/internal/ingress/controller/config"
)
@ -61,6 +63,7 @@ func TestMergeConfigMapToStruct(t *testing.T) {
"error-log-path": "/var/log/test/error.log",
"use-gzip": "true",
"enable-dynamic-tls-records": "false",
"gzip-level": "9",
"gzip-types": "text/html",
"proxy-real-ip-cidr": "1.1.1.1/8,2.2.2.2/24",
"bind-address": "1.1.1.1,2.2.2.2,3.3.3,2001:db8:a0b:12f0::1,3731:54:65fe:2::a7,33:33:33::33::33",
@ -79,6 +82,7 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def.ProxySendTimeout = 2
def.EnableDynamicTLSRecords = false
def.UseProxyProtocol = true
def.GzipLevel = 9
def.GzipTypes = "text/html"
def.ProxyRealIPCIDR = []string{"1.1.1.1/8", "2.2.2.2/24"}
def.BindAddressIpv4 = []string{"1.1.1.1", "2.2.2.2"}
@ -88,6 +92,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def.NginxStatusIpv6Whitelist = []string{"::1", "2001::/16"}
def.ProxyAddOriginalUriHeader = false
hash, err := hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)
to := ReadConfig(conf)
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
@ -107,6 +119,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
}
def = config.NewDefault()
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)
to = ReadConfig(map[string]string{})
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
@ -114,6 +134,15 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def = config.NewDefault()
def.WhitelistSourceRange = []string{"1.1.1.1/32"}
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)
to = ReadConfig(map[string]string{
"whitelist-source-range": "1.1.1.1/32",
})
@ -126,7 +155,7 @@ func TestMergeConfigMapToStruct(t *testing.T) {
func TestDefaultLoadBalance(t *testing.T) {
conf := map[string]string{}
to := ReadConfig(conf)
if to.LoadBalanceAlgorithm != "least_conn" {
if to.LoadBalanceAlgorithm != "" {
t.Errorf("default load balance algorithm wrong")
}
}

View file

@ -130,6 +130,7 @@ var (
"filterRateLimits": filterRateLimits,
"buildRateLimitZones": buildRateLimitZones,
"buildRateLimit": buildRateLimit,
"buildResolversForLua": buildResolversForLua,
"buildResolvers": buildResolvers,
"buildUpstreamName": buildUpstreamName,
"isLocationInLocationList": isLocationInLocationList,
@ -151,7 +152,6 @@ var (
"isValidClientBodyBufferSize": isValidClientBodyBufferSize,
"buildForwardedFor": buildForwardedFor,
"buildAuthSignURL": buildAuthSignURL,
"buildOpentracingLoad": buildOpentracingLoad,
"buildOpentracing": buildOpentracing,
"proxySetHeader": proxySetHeader,
"buildInfluxDB": buildInfluxDB,
@ -192,6 +192,7 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
if dynamicConfigurationEnabled {
out = append(out,
"lua_shared_dict configuration_data 5M",
"lua_shared_dict certificate_data 16M",
"lua_shared_dict locks 512k",
"lua_shared_dict balancer_ewma 1M",
"lua_shared_dict balancer_ewma_last_touched_at 1M",
@ -221,6 +222,33 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
return strings.Join(out, ";\n\r") + ";"
}
func buildResolversForLua(res interface{}, disableIpv6 interface{}) string {
nss, ok := res.([]net.IP)
if !ok {
glog.Errorf("expected a '[]net.IP' type but %T was returned", res)
return ""
}
no6, ok := disableIpv6.(bool)
if !ok {
glog.Errorf("expected a 'bool' type but %T was returned", disableIpv6)
return ""
}
if len(nss) == 0 {
return ""
}
r := []string{}
for _, ns := range nss {
if ing_net.IsIPV6(ns) && no6 {
continue
}
r = append(r, fmt.Sprintf("\"%v\"", ns))
}
return strings.Join(r, ", ")
}
// buildResolvers returns the resolvers reading the /etc/resolv.conf file
func buildResolvers(res interface{}, disableIpv6 interface{}) string {
// NGINX need IPV6 addresses to be surrounded by brackets
@ -260,7 +288,7 @@ func buildResolvers(res interface{}, disableIpv6 interface{}) string {
}
// buildLocation produces the location string, if the ingress has redirects
// (specified through the nginx.ingress.kubernetes.io/rewrite-to annotation)
// (specified through the nginx.ingress.kubernetes.io/rewrite-target annotation)
func buildLocation(input interface{}) string {
location, ok := input.(*ingress.Location)
if !ok {
@ -351,7 +379,7 @@ func buildLoadBalancingConfig(b interface{}, fallbackLoadBalancing string) strin
return fmt.Sprintf("%s;", backend.LoadBalancing)
}
if fallbackLoadBalancing == "round_robin" {
if fallbackLoadBalancing == "round_robin" || fallbackLoadBalancing == "" {
return ""
}
@ -359,7 +387,7 @@ func buildLoadBalancingConfig(b interface{}, fallbackLoadBalancing string) strin
}
// buildProxyPass produces the proxy pass string, if the ingress has redirects
// (specified through the nginx.ingress.kubernetes.io/rewrite-to annotation)
// (specified through the nginx.ingress.kubernetes.io/rewrite-target annotation)
// If the annotation nginx.ingress.kubernetes.io/add-base-url:"true" is specified it will
// add a base tag in the head of the response from the service
func buildProxyPass(host string, b interface{}, loc interface{}, dynamicConfigurationEnabled bool) string {
@ -376,12 +404,28 @@ func buildProxyPass(host string, b interface{}, loc interface{}, dynamicConfigur
}
path := location.Path
proto := "http"
proto := "http://"
proxyPass := "proxy_pass"
switch location.BackendProtocol {
case "HTTPS":
proto = "https://"
case "GRPC":
proto = "grpc://"
proxyPass = "grpc_pass"
case "GRPCS":
proto = "grpcs://"
proxyPass = "grpc_pass"
case "AJP":
proto = ""
proxyPass = "ajp_pass"
}
// TODO: Remove after the deprecation of grpc-backend annotation
if location.GRPC {
proxyPass = "grpc_pass"
proto = "grpc"
proto = "grpc://"
}
upstreamName := "upstream_balancer"
@ -393,9 +437,11 @@ func buildProxyPass(host string, b interface{}, loc interface{}, dynamicConfigur
for _, backend := range backends {
if backend.Name == location.Backend {
if backend.Secure || backend.SSLPassthrough {
proto = "https"
// TODO: Remove after the deprecation of secure-backend annotation
proto = "https://"
// TODO: Remove after the deprecation of grpc-backend annotation
if location.GRPC {
proto = "grpcs"
proto = "grpcs://"
}
}
@ -408,7 +454,7 @@ func buildProxyPass(host string, b interface{}, loc interface{}, dynamicConfigur
}
// defProxyPass returns the default proxy_pass, just the name of the upstream
defProxyPass := fmt.Sprintf("%v %s://%s;", proxyPass, proto, upstreamName)
defProxyPass := fmt.Sprintf("%v %s%s;", proxyPass, proto, upstreamName)
// if the path in the ingress rule is equals to the target: no special rewrite
if path == location.Rewrite.Target {
@ -447,15 +493,15 @@ subs_filter '%v' '$1<base href="%v://$http_host%v">' ro;
// special case redirect to /
// ie /something to /
return fmt.Sprintf(`
rewrite %s(.*) /$1 break;
rewrite %s / break;
%v%v %s://%s;
rewrite (?i)%s(.*) /$1 break;
rewrite (?i)%s / break;
%v%v %s%s;
%v`, path, location.Path, xForwardedPrefix, proxyPass, proto, upstreamName, abu)
}
return fmt.Sprintf(`
rewrite %s(.*) %s/$1 break;
%v%v %s://%s;
rewrite (?i)%s(.*) %s/$1 break;
%v%v %s%s;
%v`, path, location.Rewrite.Target, xForwardedPrefix, proxyPass, proto, upstreamName, abu)
}
@ -725,8 +771,8 @@ func isValidClientBodyBufferSize(input interface{}) bool {
if err != nil {
sLowercase := strings.ToLower(s)
kCheck := strings.TrimSuffix(sLowercase, "k")
_, err := strconv.Atoi(kCheck)
check := strings.TrimSuffix(sLowercase, "k")
_, err := strconv.Atoi(check)
if err == nil {
return true
}
@ -816,14 +862,14 @@ func buildAuthSignURL(input interface{}) string {
u, _ := url.Parse(s)
q := u.Query()
if len(q) == 0 {
return fmt.Sprintf("%v?rd=$pass_access_scheme://$http_host$request_uri", s)
return fmt.Sprintf("%v?rd=$pass_access_scheme://$http_host$escaped_request_uri", s)
}
if q.Get("rd") != "" {
return s
}
return fmt.Sprintf("%v&rd=$pass_access_scheme://$http_host$request_uri", s)
return fmt.Sprintf("%v&rd=$pass_access_scheme://$http_host$escaped_request_uri", s)
}
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
@ -841,31 +887,6 @@ func randomString() string {
return string(b)
}
func buildOpentracingLoad(input interface{}) string {
cfg, ok := input.(config.Configuration)
if !ok {
glog.Errorf("expected a 'config.Configuration' type but %T was returned", input)
return ""
}
if !cfg.EnableOpentracing {
return ""
}
buf := bytes.NewBufferString("load_module /etc/nginx/modules/ngx_http_opentracing_module.so;")
buf.WriteString("\r\n")
if cfg.ZipkinCollectorHost != "" {
buf.WriteString("load_module /etc/nginx/modules/ngx_http_zipkin_module.so;")
} else if cfg.JaegerCollectorHost != "" {
buf.WriteString("load_module /etc/nginx/modules/ngx_http_jaeger_module.so;")
}
buf.WriteString("\r\n")
return buf.String()
}
func buildOpentracing(input interface{}) string {
cfg, ok := input.(config.Configuration)
if !ok {
@ -878,24 +899,14 @@ func buildOpentracing(input interface{}) string {
}
buf := bytes.NewBufferString("")
if cfg.ZipkinCollectorHost != "" {
buf.WriteString(fmt.Sprintf("zipkin_collector_host %v;", cfg.ZipkinCollectorHost))
buf.WriteString("\r\n")
buf.WriteString(fmt.Sprintf("zipkin_collector_port %v;", cfg.ZipkinCollectorPort))
buf.WriteString("\r\n")
buf.WriteString(fmt.Sprintf("zipkin_service_name %v;", cfg.ZipkinServiceName))
buf.WriteString("opentracing_load_tracer /usr/local/lib/libzipkin_opentracing.so /etc/nginx/opentracing.json;")
} else if cfg.JaegerCollectorHost != "" {
buf.WriteString(fmt.Sprintf("jaeger_reporter_local_agent_host_port %v:%v;", cfg.JaegerCollectorHost, cfg.JaegerCollectorPort))
buf.WriteString("\r\n")
buf.WriteString(fmt.Sprintf("jaeger_service_name %v;", cfg.JaegerServiceName))
buf.WriteString("\r\n")
buf.WriteString(fmt.Sprintf("jaeger_sampler_type %v;", cfg.JaegerSamplerType))
buf.WriteString("\r\n")
buf.WriteString(fmt.Sprintf("jaeger_sampler_param %v;", cfg.JaegerSamplerParam))
buf.WriteString("opentracing_load_tracer /usr/local/lib/libjaegertracing_plugin.so /etc/nginx/opentracing.json;")
}
buf.WriteString("\r\n")
return buf.String()
}
@ -929,7 +940,7 @@ func proxySetHeader(loc interface{}) string {
return "proxy_set_header"
}
if location.GRPC {
if location.GRPC || location.BackendProtocol == "GRPC" || location.BackendProtocol == "GRPCS" {
return "grpc_set_header"
}

View file

@ -122,7 +122,7 @@ var (
"/jenkins",
"~* /",
`
rewrite /(.*) /jenkins/$1 break;
rewrite (?i)/(.*) /jenkins/$1 break;
proxy_pass http://upstream-name;
`,
false,
@ -136,8 +136,8 @@ proxy_pass http://upstream-name;
"/",
`~* ^/something\/?(?<baseuri>.*)`,
`
rewrite /something/(.*) /$1 break;
rewrite /something / break;
rewrite (?i)/something/(.*) /$1 break;
rewrite (?i)/something / break;
proxy_pass http://upstream-name;
`,
false,
@ -151,7 +151,7 @@ proxy_pass http://upstream-name;
"/not-root",
"~* ^/end-with-slash/(?<baseuri>.*)",
`
rewrite /end-with-slash/(.*) /not-root/$1 break;
rewrite (?i)/end-with-slash/(.*) /not-root/$1 break;
proxy_pass http://upstream-name;
`,
false,
@ -165,7 +165,7 @@ proxy_pass http://upstream-name;
"/not-root",
`~* ^/something-complex\/?(?<baseuri>.*)`,
`
rewrite /something-complex/(.*) /not-root/$1 break;
rewrite (?i)/something-complex/(.*) /not-root/$1 break;
proxy_pass http://upstream-name;
`,
false,
@ -179,7 +179,7 @@ proxy_pass http://upstream-name;
"/jenkins",
"~* /",
`
rewrite /(.*) /jenkins/$1 break;
rewrite (?i)/(.*) /jenkins/$1 break;
proxy_pass http://upstream-name;
set_escape_uri $escaped_base_uri $baseuri;
@ -196,8 +196,8 @@ subs_filter '(<(?:H|h)(?:E|e)(?:A|a)(?:D|d)(?:[^">]|"[^"]*")*>)' '$1<base href="
"/",
`~* ^/something\/?(?<baseuri>.*)`,
`
rewrite /something/(.*) /$1 break;
rewrite /something / break;
rewrite (?i)/something/(.*) /$1 break;
rewrite (?i)/something / break;
proxy_pass http://upstream-name;
set_escape_uri $escaped_base_uri $baseuri;
@ -214,7 +214,7 @@ subs_filter '(<(?:H|h)(?:E|e)(?:A|a)(?:D|d)(?:[^">]|"[^"]*")*>)' '$1<base href="
"/not-root",
`~* ^/end-with-slash/(?<baseuri>.*)`,
`
rewrite /end-with-slash/(.*) /not-root/$1 break;
rewrite (?i)/end-with-slash/(.*) /not-root/$1 break;
proxy_pass http://upstream-name;
set_escape_uri $escaped_base_uri $baseuri;
@ -231,7 +231,7 @@ subs_filter '(<(?:H|h)(?:E|e)(?:A|a)(?:D|d)(?:[^">]|"[^"]*")*>)' '$1<base href="
"/not-root",
`~* ^/something-complex\/?(?<baseuri>.*)`,
`
rewrite /something-complex/(.*) /not-root/$1 break;
rewrite (?i)/something-complex/(.*) /not-root/$1 break;
proxy_pass http://upstream-name;
set_escape_uri $escaped_base_uri $baseuri;
@ -248,8 +248,8 @@ subs_filter '(<(?:H|h)(?:E|e)(?:A|a)(?:D|d)(?:[^">]|"[^"]*")*>)' '$1<base href="
"/",
`~* ^/something\/?(?<baseuri>.*)`,
`
rewrite /something/(.*) /$1 break;
rewrite /something / break;
rewrite (?i)/something/(.*) /$1 break;
rewrite (?i)/something / break;
proxy_pass http://upstream-name;
set_escape_uri $escaped_base_uri $baseuri;
@ -266,7 +266,7 @@ subs_filter '(<(?:H|h)(?:E|e)(?:A|a)(?:D|d)(?:[^">]|"[^"]*")*>)' '$1<base href="
"/something",
`~* /`,
`
rewrite /(.*) /something/$1 break;
rewrite (?i)/(.*) /something/$1 break;
proxy_pass http://sticky-upstream-name;
`,
false,
@ -280,7 +280,7 @@ proxy_pass http://sticky-upstream-name;
"/something",
`~* /`,
`
rewrite /(.*) /something/$1 break;
rewrite (?i)/(.*) /something/$1 break;
proxy_pass http://upstream_balancer;
`,
false,
@ -294,7 +294,7 @@ proxy_pass http://upstream_balancer;
"/something",
`~* ^/there\/?(?<baseuri>.*)`,
`
rewrite /there/(.*) /something/$1 break;
rewrite (?i)/there/(.*) /something/$1 break;
proxy_set_header X-Forwarded-Prefix "/there/";
proxy_pass http://sticky-upstream-name;
`,
@ -601,6 +601,26 @@ func TestBuildForwardedFor(t *testing.T) {
}
}
// TestBuildResolversForLua checks the Lua resolver list rendering, both with
// IPv6 resolvers included and with IPv6 disabled.
func TestBuildResolversForLua(t *testing.T) {
	resolvers := []net.IP{
		net.ParseIP("192.0.0.1"),
		net.ParseIP("2001:db8:1234:0000:0000:0000:0000:0000"),
	}

	cases := []struct {
		disableIPv6 bool
		want        string
	}{
		{false, "\"192.0.0.1\", \"2001:db8:1234::\""},
		{true, "\"192.0.0.1\""},
	}

	for _, tc := range cases {
		got := buildResolversForLua(resolvers, tc.disableIPv6)
		if tc.want != got {
			t.Errorf("Expected '%v' but returned '%v'", tc.want, got)
		}
	}
}
func TestBuildResolvers(t *testing.T) {
ipOne := net.ParseIP("192.0.0.1")
ipTwo := net.ParseIP("2001:db8:1234:0000:0000:0000:0000:0000")
@ -697,8 +717,8 @@ func TestBuildAuthSignURL(t *testing.T) {
cases := map[string]struct {
Input, Output string
}{
"default url": {"http://google.com", "http://google.com?rd=$pass_access_scheme://$http_host$request_uri"},
"with random field": {"http://google.com?cat=0", "http://google.com?cat=0&rd=$pass_access_scheme://$http_host$request_uri"},
"default url": {"http://google.com", "http://google.com?rd=$pass_access_scheme://$http_host$escaped_request_uri"},
"with random field": {"http://google.com?cat=0", "http://google.com?cat=0&rd=$pass_access_scheme://$http_host$escaped_request_uri"},
"with rd field": {"http://google.com?cat&rd=$request", "http://google.com?cat&rd=$request"},
}
for k, tc := range cases {

View file

@ -17,6 +17,8 @@ limitations under the License.
package controller
import (
"os"
"os/exec"
"syscall"
"github.com/golang/glog"
@ -41,29 +43,53 @@ func newUpstream(name string) *ingress.Backend {
}
}
// sysctlSomaxconn returns the maximum number of connections that can be queued
// for acceptance (value of net.core.somaxconn)
// http://nginx.org/en/docs/http/ngx_http_core_module.html#listen
func sysctlSomaxconn() int {
	maxConns, err := sysctl.New().GetSysctl("net/core/somaxconn")
	if err != nil || maxConns < 512 {
		// 511 is the NGINX default listen backlog; anything smaller (or an
		// unreadable sysctl) is not worth propagating into the config.
		glog.V(3).Infof("net.core.somaxconn=%v (using system default)", maxConns)
		return 511
	}

	return maxConns
}
// sysctlFSFileMax returns the maximum number of open file descriptors (value
// of fs.file-max) or 0 in case of error.
func sysctlFSFileMax() int {
	var rLimit syscall.Rlimit
	err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
	if err != nil {
		// returning 0 means don't render the value
		glog.Errorf("Error reading system maximum number of open file descriptors (RLIMIT_NOFILE): %v", err)
		return 0
	}
	glog.V(2).Infof("rlimit.max=%v", rLimit.Max)
	return int(rLimit.Max)
}
const (
	// defBinary is the default path of the NGINX binary inside the container.
	defBinary = "/usr/sbin/nginx"
	// cfgPath is the path of the main NGINX configuration file.
	cfgPath = "/etc/nginx/nginx.conf"
)

// nginxBinary returns the path of the NGINX binary to execute, honoring the
// NGINX_BINARY environment variable override (useful for tests and local
// development) and falling back to the default container path.
func nginxBinary() string {
	if ngx := os.Getenv("NGINX_BINARY"); ngx != "" {
		return ngx
	}
	return defBinary
}

// nginxExecCommand builds the command that runs NGINX with the default
// configuration file plus the given extra arguments. The process is wrapped
// in authbind (--deep so children inherit it) to allow binding privileged
// ports without running as root.
func nginxExecCommand(args ...string) *exec.Cmd {
	cmdArgs := []string{"--deep", nginxBinary(), "-c", cfgPath}
	cmdArgs = append(cmdArgs, args...)
	return exec.Command("authbind", cmdArgs...)
}

// nginxTestCommand builds the command that syntax-checks the given
// configuration file (nginx -t) without starting the server, also wrapped
// in authbind for consistency with the exec command.
func nginxTestCommand(cfg string) *exec.Cmd {
	return exec.Command("authbind", "--deep", nginxBinary(), "-c", cfg, "-t")
}