Merge remote-tracking branch 'origin' into refactor-cert

This commit is contained in:
Henry Tran 2018-06-21 11:40:49 -04:00
commit 86def984a3
89 changed files with 4420 additions and 1800 deletions

View file

@ -25,6 +25,12 @@ import (
"k8s.io/kubernetes/pkg/util/filesystem"
)
// ReadWriteByUser defines linux permission to read and write files for the owner user
const ReadWriteByUser = 0660
// ReadByUserGroup defines linux permission to read files by the user and group owner/s
const ReadByUserGroup = 0640
// Filesystem is an interface that we can use to mock various filesystem operations
type Filesystem interface {
filesystem.Filesystem
@ -35,7 +41,7 @@ func NewLocalFS() (Filesystem, error) {
fs := filesystem.DefaultFs{}
for _, directory := range directories {
err := fs.MkdirAll(directory, 0655)
err := fs.MkdirAll(directory, ReadWriteByUser)
if err != nil {
return nil, err
}
@ -97,12 +103,5 @@ func NewFakeFS() (Filesystem, error) {
}
}
fakeFs.MkdirAll("/run", 0655)
fakeFs.MkdirAll("/proc", 0655)
fakeFs.MkdirAll("/etc/nginx/template", 0655)
fakeFs.MkdirAll(DefaultSSLDirectory, 0655)
fakeFs.MkdirAll(AuthDirectory, 0655)
return fakeFs, nil
}

View file

@ -54,7 +54,6 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/sslpassthrough"
"k8s.io/ingress-nginx/internal/ingress/annotations/upstreamhashby"
"k8s.io/ingress-nginx/internal/ingress/annotations/upstreamvhost"
"k8s.io/ingress-nginx/internal/ingress/annotations/vtsfilterkey"
"k8s.io/ingress-nginx/internal/ingress/annotations/xforwardedprefix"
"k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/resolver"
@ -90,7 +89,6 @@ type Ingress struct {
UpstreamHashBy string
LoadBalancing string
UpstreamVhost string
VtsFilterKey string
Whitelist ipwhitelist.SourceRange
XForwardedPrefix bool
SSLCiphers string
@ -132,7 +130,6 @@ func NewAnnotationExtractor(cfg resolver.Resolver) Extractor {
"UpstreamHashBy": upstreamhashby.NewParser(cfg),
"LoadBalancing": loadbalancing.NewParser(cfg),
"UpstreamVhost": upstreamvhost.NewParser(cfg),
"VtsFilterKey": vtsfilterkey.NewParser(cfg),
"Whitelist": ipwhitelist.NewParser(cfg),
"XForwardedPrefix": xforwardedprefix.NewParser(cfg),
"SSLCiphers": sslcipher.NewParser(cfg),

View file

@ -19,8 +19,6 @@ package auth
import (
"fmt"
"io/ioutil"
"os"
"path"
"regexp"
"github.com/pkg/errors"
@ -86,17 +84,6 @@ type auth struct {
// NewParser creates a new authentication annotation parser
func NewParser(authDirectory string, r resolver.Resolver) parser.IngressAnnotation {
os.MkdirAll(authDirectory, 0755)
currPath := authDirectory
for currPath != "/" {
currPath = path.Dir(currPath)
err := os.Chmod(currPath, 0755)
if err != nil {
break
}
}
return auth{r, authDirectory}
}
@ -157,8 +144,7 @@ func dumpSecret(filename string, secret *api.Secret) error {
}
}
// TODO: check permissions required
err := ioutil.WriteFile(filename, val, 0777)
err := ioutil.WriteFile(filename, val, file.ReadWriteByUser)
if err != nil {
return ing_errors.LocationDenied{
Reason: errors.Wrap(err, "unexpected error creating password file"),

View file

@ -1,40 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtsfilterkey
import (
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
type vtsFilterKey struct {
r resolver.Resolver
}
// NewParser creates a new vts filter key annotation parser
func NewParser(r resolver.Resolver) parser.IngressAnnotation {
return vtsFilterKey{r}
}
// Parse parses the annotations contained in the ingress rule
// used to indicate if the location/s contains a fragment of
// configuration to be included inside the paths of the rules
func (a vtsFilterKey) Parse(ing *extensions.Ingress) (interface{}, error) {
return parser.GetStringAnnotation("vts-filter-key", ing)
}

View file

@ -26,6 +26,8 @@ import (
"github.com/pkg/errors"
)
const nginxPID = "/tmp/nginx.pid"
// Name returns the healthcheck name
func (n NGINXController) Name() string {
return "nginx-ingress-controller"
@ -58,13 +60,13 @@ func (n *NGINXController) Check(_ *http.Request) error {
if err != nil {
return errors.Wrap(err, "unexpected error reading /proc directory")
}
f, err := n.fileSystem.ReadFile("/run/nginx.pid")
f, err := n.fileSystem.ReadFile(nginxPID)
if err != nil {
return errors.Wrap(err, "unexpected error reading /run/nginx.pid")
return errors.Wrapf(err, "unexpected error reading %v", nginxPID)
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return errors.Wrap(err, "unexpected error reading the PID from /run/nginx.pid")
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginxPID)
}
_, err = fs.NewProc(pid)

View file

@ -27,6 +27,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/ingress-nginx/internal/file"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
)
@ -60,8 +61,8 @@ func TestNginxCheck(t *testing.T) {
})
// create pid file
fs.MkdirAll("/run", 0655)
pidFile, err := fs.Create("/run/nginx.pid")
fs.MkdirAll("/tmp", file.ReadWriteByUser)
pidFile, err := fs.Create(nginxPID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View file

@ -161,31 +161,6 @@ type Configuration struct {
// By default this is enabled
IgnoreInvalidHeaders bool `json:"ignore-invalid-headers"`
// EnableVtsStatus allows the replacement of the default status page with a third party module named
// nginx-module-vts - https://github.com/vozlt/nginx-module-vts
// By default this is disabled
EnableVtsStatus bool `json:"enable-vts-status,omitempty"`
// Vts config on http level
// Description: Sets parameters for a shared memory zone that will keep states for various keys. The cache is shared between all worker processe
// https://github.com/vozlt/nginx-module-vts#vhost_traffic_status_zone
// Default value is 10m
VtsStatusZoneSize string `json:"vts-status-zone-size,omitempty"`
// Vts config on http level
// Description: Enables the keys by user defined variable. The key is a key string to calculate traffic.
// The name is a group string to calculate traffic. The key and name can contain variables such as $host,
// $server_name. The name's group belongs to filterZones if specified. The key's group belongs to serverZones
// if not specified second argument name. The example with geoip module is as follows:
// https://github.com/vozlt/nginx-module-vts#vhost_traffic_status_filter_by_set_key
// Default value is $geoip_country_code country::*
VtsDefaultFilterKey string `json:"vts-default-filter-key,omitempty"`
// Description: Sets sum key used by vts json output, and the sum label in prometheus output.
// These indicate metrics values for all server zones combined, rather than for a specific one.
// Default value is *
VtsSumKey string `json:"vts-sum-key,omitempty"`
// RetryNonIdempotent since 1.9.13 NGINX will not retry non-idempotent requests (POST, LOCK, PATCH)
// in case of an error. The previous behavior can be restored using the value true
RetryNonIdempotent bool `json:"retry-non-idempotent"`
@ -531,6 +506,9 @@ type Configuration struct {
// http://github.com/influxdata/nginx-influxdb-module/
// By default this is disabled
EnableInfluxDB bool `json:"enable-influxdb"`
// Checksum contains a checksum of the configmap configuration
Checksum string `json:"-"`
}
// NewDefault returns the default nginx configuration
@ -603,9 +581,6 @@ func NewDefault() Configuration {
WorkerProcesses: strconv.Itoa(runtime.NumCPU()),
WorkerShutdownTimeout: "10s",
LoadBalanceAlgorithm: defaultLoadBalancerAlgorithm,
VtsStatusZoneSize: "10m",
VtsDefaultFilterKey: "$geoip_country_code country::*",
VtsSumKey: "*",
VariablesHashBucketSize: 128,
VariablesHashMaxSize: 2048,
UseHTTP2: true,

View file

@ -22,7 +22,6 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/golang/glog"
@ -61,15 +60,15 @@ type Configuration struct {
ForceNamespaceIsolation bool
// optional
// +optional
TCPConfigMapName string
// optional
// +optional
UDPConfigMapName string
DefaultHealthzURL string
DefaultSSLCertificate string
// optional
// +optional
PublishService string
PublishStatusAddress string
@ -98,7 +97,7 @@ type Configuration struct {
DisableLua bool
}
// GetPublishService returns the configured service used to set ingress status
// GetPublishService returns the Service used to set the load-balancer status of Ingresses.
func (n NGINXController) GetPublishService() *apiv1.Service {
s, err := n.store.GetService(n.cfg.PublishService)
if err != nil {
@ -108,9 +107,9 @@ func (n NGINXController) GetPublishService() *apiv1.Service {
return s
}
// sync collects all the pieces required to assemble the configuration file and
// then sends the content to the backend (OnUpdate) receiving the populated
// template as response reloading the backend if is required.
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
n.syncRateLimiter.Accept()
@ -118,7 +117,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
return nil
}
// Sort ingress rules using the ResourceVersion field
// sort Ingresses using the ResourceVersion field
ings := n.store.ListIngresses()
sort.SliceStable(ings, func(i, j int) bool {
ir := ings[i].ResourceVersion
@ -136,7 +135,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
for _, loc := range server.Locations {
if loc.Path != rootLocation {
glog.Warningf("ignoring path %v of ssl passthrough host %v", loc.Path, server.Hostname)
glog.Warningf("Ignoring SSL Passthrough for location %q in server %q", loc.Path, server.Hostname)
continue
}
passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
@ -155,27 +154,29 @@ func (n *NGINXController) syncIngress(interface{}) error {
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
ConfigurationChecksum: n.store.GetBackendConfiguration().Checksum,
}
if !n.isForceReload() && n.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("skipping backend reload (no changes detected)")
if n.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("No configuration change detected, skipping backend reload.")
return nil
}
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) && !n.isForceReload() {
glog.Infof("skipping reload")
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) {
glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.")
} else {
glog.Infof("backend reload required")
glog.Infof("Configuration changes detected, backend reload required.")
err := n.OnUpdate(pcfg)
if err != nil {
IncReloadErrorCount()
ConfigSuccess(false)
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
glog.Errorf("Unexpected failure reloading the backend:\n%v", err)
return err
}
glog.Infof("ingress backend successfully reloaded...")
glog.Infof("Backend successfully reloaded.")
ConfigSuccess(true)
IncReloadCount()
setSSLExpireTime(servers)
@ -185,49 +186,45 @@ func (n *NGINXController) syncIngress(interface{}) error {
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
go func(isFirstSync bool) {
if isFirstSync {
glog.Infof("first sync of Nginx configuration")
glog.Infof("Initial synchronization of the NGINX configuration.")
// it takes time for Nginx to start listening on the port
// it takes time for NGINX to start listening on the configured ports
time.Sleep(1 * time.Second)
}
err := configureDynamically(&pcfg, n.cfg.ListenPorts.Status)
if err == nil {
glog.Infof("dynamic reconfiguration succeeded")
glog.Infof("Dynamic reconfiguration succeeded.")
} else {
glog.Warningf("could not dynamically reconfigure: %v", err)
glog.Warningf("Dynamic reconfiguration failed: %v", err)
}
}(isFirstSync)
}
n.runningConfig = &pcfg
n.SetForceReload(false)
return nil
}
func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
glog.V(3).Infof("obtaining information about stream services of type %v located in configmap %v", proto, configmapName)
glog.V(3).Infof("Obtaining information about %v stream services from ConfigMap %q", proto, configmapName)
if configmapName == "" {
// no configmap configured
return []ingress.L4Service{}
}
_, _, err := k8s.ParseNameNS(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
glog.Errorf("Error parsing ConfigMap reference %q: %v", configmapName, err)
return []ingress.L4Service{}
}
configmap, err := n.store.GetConfigMap(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
glog.Errorf("Error reading ConfigMap %q: %v", configmapName, err)
return []ingress.L4Service{}
}
var svcs []ingress.L4Service
var svcProxyProtocol ingress.ProxyProtocol
// k -> port to expose
// v -> <namespace>/<service name>:<port from service to be used>
rp := []int{
n.cfg.ListenPorts.HTTP,
@ -239,21 +236,22 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
}
reserverdPorts := sets.NewInt(rp...)
for k, v := range configmap.Data {
externalPort, err := strconv.Atoi(k)
// svcRef format: <(str)namespace>/<(str)service>:<(intstr)port>[:<(bool)decode>:<(bool)encode>]
for port, svcRef := range configmap.Data {
externalPort, err := strconv.Atoi(port)
if err != nil {
glog.Warningf("%v is not valid as a TCP/UDP port", k)
glog.Warningf("%q is not a valid %v port number", port, proto)
continue
}
if reserverdPorts.Has(externalPort) {
glog.Warningf("port %v cannot be used for TCP or UDP services. It is reserved for the Ingress controller", k)
glog.Warningf("Port %d cannot be used for %v stream services. It is reserved for the Ingress controller.", externalPort, proto)
continue
}
nsSvcPort := strings.Split(v, ":")
nsSvcPort := strings.Split(svcRef, ":")
if len(nsSvcPort) < 2 {
glog.Warningf("invalid format (namespace/name:port:[PROXY]:[PROXY]) '%v'", k)
glog.Warningf("Invalid Service reference %q for %v port %d", svcRef, proto, externalPort)
continue
}
@ -262,7 +260,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
svcProxyProtocol.Decode = false
svcProxyProtocol.Encode = false
// Proxy protocol is possible if the service is TCP
// Proxy Protocol is only compatible with TCP Services
if len(nsSvcPort) >= 3 && proto == apiv1.ProtocolTCP {
if len(nsSvcPort) >= 3 && strings.ToUpper(nsSvcPort[2]) == "PROXY" {
svcProxyProtocol.Decode = true
@ -280,14 +278,15 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
svc, err := n.store.GetService(nsName)
if err != nil {
glog.Warningf("error getting service %v: %v", nsName, err)
glog.Warningf("Error getting Service %q from local store: %v", nsName, err)
continue
}
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
glog.V(3).Infof("searching service %v endpoints using the name '%v'", svcNs, svcName, svcPort)
// not a port number, fall back to using port name
glog.V(3).Infof("Searching Endpoints with %v port name %q for Service %q", proto, svcPort, nsName)
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
@ -297,8 +296,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
}
}
} else {
// we need to use the TargetPort (where the endpoints are running)
glog.V(3).Infof("searching service %v/%v endpoints using the target port '%v'", svcNs, svcName, targetPort)
glog.V(3).Infof("Searching Endpoints with %v port number %d for Service %q", proto, targetPort, nsName)
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
@ -309,10 +307,10 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
}
}
// stream services cannot contain empty upstreams and there is no
// default backend equivalent
// stream services cannot contain empty upstreams and there is
// no default backend equivalent
if len(endps) == 0 {
glog.Warningf("service %v/%v does not have any active endpoints for port %v and protocol %v", svcNs, svcName, svcPort, proto)
glog.Warningf("Service %q does not have any active Endpoint for %v port %v", nsName, proto, svcPort)
continue
}
@ -332,9 +330,8 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
return svcs
}
// getDefaultUpstream returns an upstream associated with the
// default backend service. In case of error retrieving information
// configure the upstream to return http code 503.
// getDefaultUpstream returns the upstream associated with the default backend.
// Configures the upstream to return HTTP code 503 in case of error.
func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
upstream := &ingress.Backend{
Name: defUpstreamName,
@ -342,14 +339,14 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
svcKey := n.cfg.DefaultService
svc, err := n.store.GetService(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err)
glog.Warningf("Unexpected error getting default backend %q from local store: %v", n.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
endps := getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{}, n.store.GetServiceEndpoints)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
glog.Warningf("Service %q does not have any active Endpoint", svcKey)
endps = []ingress.Endpoint{n.DefaultEndpoint()}
}
@ -358,8 +355,9 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
return upstream
}
// getBackendServers returns a list of Upstream and Server to be used by the backend
// An upstream can be used in multiple servers if the namespace, service name and port are the same
// getBackendServers returns a list of Upstream and Server to be used by the
// backend. An upstream can be used in multiple servers if the namespace,
// service name and port are the same.
func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]*ingress.Backend, []*ingress.Server) {
du := n.getDefaultUpstream()
upstreams := n.createUpstreams(ingresses, du)
@ -368,7 +366,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
for _, ing := range ingresses {
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
glog.Errorf("Unexpected error reading annotations for Ingress %q from local store: %v", ing.Name, err)
}
for _, rule := range ing.Spec.Rules {
@ -383,7 +381,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
if rule.HTTP == nil &&
host != defServerName {
glog.V(3).Infof("ingress rule %v/%v does not contain HTTP rules, using default backend", ing.Namespace, ing.Name)
glog.V(3).Infof("Ingress \"%v/%v\" does not contain any HTTP rule, using default backend.", ing.Namespace, ing.Name)
continue
}
@ -393,23 +391,21 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
if server.CertificateAuth.CAFileName == "" {
server.CertificateAuth = anns.CertificateAuth
// It is possible that no CAFileName is found in the secret
if server.CertificateAuth.CAFileName == "" {
glog.V(3).Infof("secret %v does not contain 'ca.crt', mutual authentication not enabled - ingress rule %v/%v.", server.CertificateAuth.Secret, ing.Namespace, ing.Name)
if server.CertificateAuth.Secret != "" && server.CertificateAuth.CAFileName == "" {
glog.V(3).Infof("Secret %q does not contain 'ca.crt' key, mutual authentication disabled for Ingress \"%v/%v\"", server.CertificateAuth.Secret, ing.Namespace, ing.Name)
}
} else {
glog.V(3).Infof("server %v already contains a mutual authentication configuration - ingress rule %v/%v", server.Hostname, ing.Namespace, ing.Name)
glog.V(3).Infof("Server %v is already configured for mutual authentication (Ingress \"%v/%v\")", server.Hostname, ing.Namespace, ing.Name)
}
for _, path := range rule.HTTP.Paths {
upsName := fmt.Sprintf("%v-%v-%v",
ing.GetNamespace(),
ing.Namespace,
path.Backend.ServiceName,
path.Backend.ServicePort.String())
ups := upstreams[upsName]
// if there's no path defined we assume /
nginxPath := rootLocation
if path.Path != "" {
nginxPath = path.Path
@ -421,11 +417,11 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
addLoc = false
if !loc.IsDefBackend {
glog.V(3).Infof("avoiding replacement of ingress rule %v/%v location %v upstream %v (%v)", ing.Namespace, ing.Name, loc.Path, ups.Name, loc.Backend)
glog.V(3).Infof("Location %q already configured for server %q with upstream %q (Ingress \"%v/%v\")", loc.Path, server.Hostname, loc.Backend, ing.Namespace, ing.Name)
break
}
glog.V(3).Infof("replacing ingress rule %v/%v location %v upstream %v (%v)", ing.Namespace, ing.Name, loc.Path, ups.Name, loc.Backend)
glog.V(3).Infof("Replacing location %q for server %q with upstream %q to use upstream %q (Ingress \"%v/%v\")", loc.Path, server.Hostname, loc.Backend, ups.Name, ing.Namespace, ing.Name)
loc.Backend = ups.Name
loc.IsDefBackend = false
loc.Port = ups.Port
@ -441,7 +437,6 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
loc.Redirect = anns.Redirect
loc.Rewrite = anns.Rewrite
loc.UpstreamVhost = anns.UpstreamVhost
loc.VtsFilterKey = anns.VtsFilterKey
loc.Whitelist = anns.Whitelist
loc.Denied = anns.Denied
loc.XForwardedPrefix = anns.XForwardedPrefix
@ -459,9 +454,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
break
}
}
// is a new location
// new location
if addLoc {
glog.V(3).Infof("adding location %v in ingress rule %v/%v upstream %v", nginxPath, ing.Namespace, ing.Name, ups.Name)
glog.V(3).Infof("Adding location %q for server %q with upstream %q (Ingress \"%v/%v\")", nginxPath, server.Hostname, ups.Name, ing.Namespace, ing.Name)
loc := &ingress.Location{
Path: nginxPath,
Backend: ups.Name,
@ -479,7 +475,6 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
Redirect: anns.Redirect,
Rewrite: anns.Rewrite,
UpstreamVhost: anns.UpstreamVhost,
VtsFilterKey: anns.VtsFilterKey,
Whitelist: anns.Whitelist,
Denied: anns.Denied,
XForwardedPrefix: anns.XForwardedPrefix,
@ -525,15 +520,16 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
for _, location := range server.Locations {
if upstream.Name == location.Backend {
if len(upstream.Endpoints) == 0 {
glog.V(3).Infof("upstream %v does not have any active endpoints.", upstream.Name)
glog.V(3).Infof("Upstream %q does not have any active endpoints.", upstream.Name)
location.Backend = "" // for nginx.tmpl checking
// check if the location contains endpoints and a custom default backend
if location.DefaultBackend != nil {
sp := location.DefaultBackend.Spec.Ports[0]
endps := getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Config{}, n.store.GetServiceEndpoints)
if len(endps) > 0 {
glog.V(3).Infof("using custom default backend in server %v location %v (service %v/%v)",
server.Hostname, location.Path, location.DefaultBackend.Namespace, location.DefaultBackend.Name)
glog.V(3).Infof("Using custom default backend for location %q in server %q (Service \"%v/%v\")",
location.Path, server.Hostname, location.DefaultBackend.Namespace, location.DefaultBackend.Name)
nb := upstream.DeepCopy()
name := fmt.Sprintf("custom-default-backend-%v", upstream.Name)
nb.Name = name
@ -544,14 +540,12 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
}
}
// Configure Backends[].SSLPassthrough
if server.SSLPassthrough {
if location.Path == rootLocation {
if location.Backend == defUpstreamName {
glog.Warningf("ignoring ssl passthrough of %v as it doesn't have a default backend (root context)", server.Hostname)
glog.Warningf("Server %q has no default backend, ignoring SSL Passthrough.", server.Hostname)
continue
}
isHTTPSfrom = append(isHTTPSfrom, server)
}
}
@ -564,7 +558,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
}
}
// create the list of upstreams and skip those without endpoints
// create the list of upstreams and skip those without Endpoints
for _, upstream := range upstreams {
if len(upstream.Endpoints) == 0 {
continue
@ -591,8 +585,8 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
return aUpstreams, aServers
}
// createUpstreams creates the NGINX upstreams for each service referenced in
// Ingress rules. The servers inside the upstream are endpoints.
// createUpstreams creates the NGINX upstreams (Endpoints) for each Service
// referenced in Ingress rules.
func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
upstreams := make(map[string]*ingress.Backend)
upstreams[defUpstreamName] = du
@ -600,17 +594,17 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
for _, ing := range data {
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
glog.Errorf("Error reading Ingress annotations: %v", err)
}
var defBackend string
if ing.Spec.Backend != nil {
defBackend = fmt.Sprintf("%v-%v-%v",
ing.GetNamespace(),
ing.Namespace,
ing.Spec.Backend.ServiceName,
ing.Spec.Backend.ServicePort.String())
glog.V(3).Infof("creating upstream %v", defBackend)
glog.V(3).Infof("Creating upstream %q", defBackend)
upstreams[defBackend] = newUpstream(defBackend)
if !upstreams[defBackend].Secure {
upstreams[defBackend].Secure = anns.SecureUpstream.Secure
@ -625,14 +619,13 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[defBackend].LoadBalancing = anns.LoadBalancing
}
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
// add the service ClusterIP as a single Endpoint instead of individual Endpoints
if anns.ServiceUpstream {
endpoint, err := n.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
glog.Errorf("Failed to determine a suitable ClusterIP Endpoint for Service %q: %v", svcKey, err)
} else {
upstreams[defBackend].Endpoints = []ingress.Endpoint{endpoint}
}
@ -642,7 +635,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
endps, err := n.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), &anns.HealthCheck)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
glog.Warningf("Error creating upstream %q: %v", defBackend, err)
}
}
@ -655,7 +648,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
for _, path := range rule.HTTP.Paths {
name := fmt.Sprintf("%v-%v-%v",
ing.GetNamespace(),
ing.Namespace,
path.Backend.ServiceName,
path.Backend.ServicePort.String())
@ -663,7 +656,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
continue
}
glog.V(3).Infof("creating upstream %v", name)
glog.V(3).Infof("Creating upstream %q", name)
upstreams[name] = newUpstream(name)
upstreams[name].Port = path.Backend.ServicePort
@ -683,14 +676,13 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[name].LoadBalancing = anns.LoadBalancing
}
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, path.Backend.ServiceName)
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
// add the service ClusterIP as a single Endpoint instead of individual Endpoints
if anns.ServiceUpstream {
endpoint, err := n.getServiceClusterEndpoint(svcKey, &path.Backend)
if err != nil {
glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err)
glog.Errorf("Failed to determine a suitable ClusterIP Endpoint for Service %q: %v", svcKey, err)
} else {
upstreams[name].Endpoints = []ingress.Endpoint{endpoint}
}
@ -699,7 +691,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
if len(upstreams[name].Endpoints) == 0 {
endp, err := n.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), &anns.HealthCheck)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
glog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
continue
}
upstreams[name].Endpoints = endp
@ -707,7 +699,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
s, err := n.store.GetService(svcKey)
if err != nil {
glog.Warningf("error obtaining service: %v", err)
glog.Warningf("Error obtaining Service %q: %v", svcKey, err)
continue
}
@ -719,20 +711,22 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
return upstreams
}
// getServiceClusterEndpoint returns an Endpoint corresponding to the ClusterIP
// field of a Service.
func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svc, err := n.store.GetService(svcKey)
if err != nil {
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
return endpoint, fmt.Errorf("service %q does not exist", svcKey)
}
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
return endpoint, fmt.Errorf("no ClusterIP found for Service %q", svcKey)
}
endpoint.Address = svc.Spec.ClusterIP
// If the service port in the ingress uses a name, lookup
// the actual port in the service spec
// if the Service port is referenced by name in the Ingress, lookup the
// actual port in the service spec
if backend.ServicePort.Type == intstr.String {
var port int32 = -1
for _, svcPort := range svc.Spec.Ports {
@ -742,7 +736,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
}
}
if port == -1 {
return endpoint, fmt.Errorf("no port mapped for service %s and port name %s", svc.Name, backend.ServicePort.String())
return endpoint, fmt.Errorf("service %q does not have a port named %q", svc.Name, backend.ServicePort)
}
endpoint.Port = fmt.Sprintf("%d", port)
} else {
@ -752,27 +746,27 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
return endpoint, err
}
// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
// serviceEndpoints returns the upstream servers (Endpoints) associated with a
// Service.
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Config) ([]ingress.Endpoint, error) {
svc, err := n.store.GetService(svcKey)
var upstreams []ingress.Endpoint
if err != nil {
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
return upstreams, fmt.Errorf("error getting Service %q from local store: %v", svcKey, err)
}
glog.V(3).Infof("obtaining port information for service %v", svcKey)
glog.V(3).Infof("Obtaining ports information for Service %q", svcKey)
for _, servicePort := range svc.Spec.Ports {
// targetPort could be a string, use the name or the port (int)
// targetPort could be a string, use either the port name or number (int)
if strconv.Itoa(int(servicePort.Port)) == backendPort ||
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {
endps := getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz, n.store.GetServiceEndpoints)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
glog.Warningf("Service %q does not have any active Endpoint.", svcKey)
}
if n.cfg.SortBackends {
@ -791,11 +785,11 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
}
}
// Ingress with an ExternalName service and no port defined in the service.
// Ingress with an ExternalName Service and no port defined for that Service
if len(svc.Spec.Ports) == 0 && svc.Spec.Type == apiv1.ServiceTypeExternalName {
externalPort, err := strconv.Atoi(backendPort)
if err != nil {
glog.Warningf("only numeric ports are allowed in ExternalName services: %v is not valid as a TCP/UDP port", backendPort)
glog.Warningf("Only numeric ports are allowed in ExternalName Services: %q is not a valid port number.", backendPort)
return upstreams, nil
}
@ -806,7 +800,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
}
endps := getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz, n.store.GetServiceEndpoints)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
glog.Warningf("Service %q does not have any active Endpoint.", svcKey)
return upstreams, nil
}
@ -825,17 +819,14 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
return upstreams, nil
}
// createServers initializes a map that contains information about the list of
// FDQN referenced by ingress rules and the common name field in the referenced
// SSL certificates. Each server is configured with location / using a default
// backend specified by the user or the one inside the ingress spec.
// createServers builds a map of host name to Server structs from a map of
// already computed Upstream structs. Each Server is configured with at least
// one root location, which uses a default backend if left unspecified.
func (n *NGINXController) createServers(data []*extensions.Ingress,
upstreams map[string]*ingress.Backend,
du *ingress.Backend) map[string]*ingress.Server {
servers := make(map[string]*ingress.Server, len(data))
// If a server has a hostname equivalent to a pre-existing alias, then we
// remove the alias to avoid conflicts.
aliases := make(map[string]string, len(data))
bdef := n.store.GetDefaultBackend()
@ -858,15 +849,14 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
defaultPemFileName := n.cfg.FakeCertificatePath
defaultPemSHA := n.cfg.FakeCertificateSHA
// Tries to fetch the default Certificate from nginx configuration.
// If it does not exists, use the ones generated on Start()
// read custom default SSL certificate, fall back to generated default certificate
defaultCertificate, err := n.store.GetLocalSSLCert(n.cfg.DefaultSSLCertificate)
if err == nil {
defaultPemFileName = defaultCertificate.PemFileName
defaultPemSHA = defaultCertificate.PemSHA
}
// initialize the default server
// initialize default server and root location
servers[defServerName] = &ingress.Server{
Hostname: defServerName,
SSLCert: ingress.SSLCert{
@ -883,33 +873,34 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
},
}}
// initialize all the servers
// initialize all other servers
for _, ing := range data {
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
glog.Errorf("Error reading Ingress %q annotations from local store: %v", ing.Name, err)
}
// default upstream server
// default upstream name
un := du.Name
if ing.Spec.Backend != nil {
// replace default backend
defUpstream := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
defUpstream := fmt.Sprintf("%v-%v-%v", ing.Namespace, ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
if backendUpstream, ok := upstreams[defUpstream]; ok {
// use backend specified in Ingress as the default backend for all its rules
un = backendUpstream.Name
// Special case:
// ingress only with a backend and no rules
// this case defines a "catch all" server
// special "catch all" case, Ingress with a backend but no rule
defLoc := servers[defServerName].Locations[0]
if defLoc.IsDefBackend && len(ing.Spec.Rules) == 0 {
glog.Infof("Ingress \"%v/%v\" defines a backend but no rule. Using it to configure the catch-all server %q", ing.Namespace, ing.Name, defServerName)
defLoc.IsDefBackend = false
defLoc.Backend = backendUpstream.Name
defLoc.Service = backendUpstream.Service
defLoc.Ingress = ing
// we need to use the ingress annotations
// customize using Ingress annotations
defLoc.Logs = anns.Logs
defLoc.BasicDigestAuth = anns.BasicDigestAuth
defLoc.ClientBodyBufferSize = anns.ClientBodyBufferSize
@ -918,16 +909,17 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
defLoc.ExternalAuth = anns.ExternalAuth
defLoc.Proxy = anns.Proxy
defLoc.RateLimit = anns.RateLimit
// TODO: Redirect and rewrite can affect the catch all behavior. Don't use this annotations for now
// TODO: Redirect and rewrite can affect the catch all behavior, skip for now
// defLoc.Redirect = anns.Redirect
// defLoc.Rewrite = anns.Rewrite
defLoc.UpstreamVhost = anns.UpstreamVhost
defLoc.VtsFilterKey = anns.VtsFilterKey
defLoc.Whitelist = anns.Whitelist
defLoc.Denied = anns.Denied
defLoc.GRPC = anns.GRPC
defLoc.LuaRestyWAF = anns.LuaRestyWAF
defLoc.InfluxDB = anns.InfluxDB
} else {
glog.V(3).Infof("Ingress \"%v/%v\" defines both a backend and rules. Using its backend as default upstream for all its rules.", ing.Namespace, ing.Name)
}
}
}
@ -963,7 +955,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
for _, ing := range data {
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
glog.Errorf("Error reading Ingress %q annotations from local store: %v", ing.Name, err)
}
for _, rule := range ing.Spec.Rules {
@ -972,7 +964,6 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
host = defServerName
}
// setup server aliases
if anns.Alias != "" {
if servers[host].Alias == "" {
servers[host].Alias = anns.Alias
@ -980,23 +971,21 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
aliases["Alias"] = host
}
} else {
glog.Warningf("ingress %v/%v for host %v contains an Alias but one has already been configured.",
ing.Namespace, ing.Name, host)
glog.Warningf("Aliases already configured for server %q, skipping (Ingress \"%v/%v\")",
host, ing.Namespace, ing.Name)
}
}
//notifying the user that it has already been configured.
if servers[host].ServerSnippet != "" && anns.ServerSnippet != "" {
glog.Warningf("ingress %v/%v for host %v contains a Server Snippet section that it has already been configured.",
ing.Namespace, ing.Name, host)
if anns.ServerSnippet != "" {
if servers[host].ServerSnippet == "" {
servers[host].ServerSnippet = anns.ServerSnippet
} else {
glog.Warningf("Server snippet already configured for server %q, skipping (Ingress \"%v/%v\")",
host, ing.Namespace, ing.Name)
}
}
// only add a server snippet if the server does not have one previously configured
if servers[host].ServerSnippet == "" && anns.ServerSnippet != "" {
servers[host].ServerSnippet = anns.ServerSnippet
}
// only add ssl ciphers if the server does not have one previously configured
// only add SSL ciphers if the server does not have them previously configured
if servers[host].SSLCiphers == "" && anns.SSLCiphers != "" {
servers[host].SSLCiphers = anns.SSLCiphers
}
@ -1007,14 +996,14 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
}
if len(ing.Spec.TLS) == 0 {
glog.V(3).Infof("ingress %v/%v for host %v does not contains a TLS section", ing.Namespace, ing.Name, host)
glog.V(3).Infof("Ingress \"%v/%v\" does not contains a TLS section.", ing.Namespace, ing.Name)
continue
}
tlsSecretName := extractTLSSecretName(host, ing, n.store.GetLocalSSLCert)
if tlsSecretName == "" {
glog.V(3).Infof("host %v is listed on tls section but secretName is empty. Using default cert", host)
glog.V(3).Infof("Host %q is listed in the TLS section but secretName is empty. Using default certificate.", host)
servers[host].SSLCert.PemFileName = defaultPemFileName
servers[host].SSLCert.PemSHA = defaultPemSHA
continue
@ -1023,19 +1012,19 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
cert, err := n.store.GetLocalSSLCert(key)
if err != nil {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
glog.Warningf("SSL certificate %q does not exist in local store.", key)
continue
}
err = cert.Certificate.VerifyHostname(host)
if err != nil {
glog.Warningf("unexpected error validating SSL certificate %v for host %v. Reason: %v", key, host, err)
glog.Warningf("Unexpected error validating SSL certificate %q for server %q: %v", key, host, err)
glog.Warningf("Validating certificate against DNS names. This will be deprecated in a future version.")
// check the common name field
// check the Common Name field
// https://github.com/golang/go/issues/22922
err := verifyHostname(host, cert.Certificate)
if err != nil {
glog.Warningf("ssl certificate %v does not contain a Common Name or Subject Alternative Name for host %v. Reason: %v", key, host, err)
glog.Warningf("SSL certificate %q does not contain a Common Name or Subject Alternative Name for server %q: %v", key, host, err)
continue
}
}
@ -1043,14 +1032,14 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
servers[host].SSLCert = *cert
if cert.ExpireTime.Before(time.Now().Add(240 * time.Hour)) {
glog.Warningf("ssl certificate for host %v is about to expire in 10 days", host)
glog.Warningf("SSL certificate for server %q is about to expire (%v)", cert.ExpireTime)
}
}
}
for alias, host := range aliases {
if _, ok := servers[alias]; ok {
glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.", alias, host)
glog.Warningf("Conflicting hostname (%v) and alias (%v) in server %q. Removing alias to avoid conflicts.", alias, host)
servers[host].Alias = ""
}
}
@ -1058,43 +1047,28 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
return servers
}
func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}
// SetForceReload sets if the ingress controller should be reloaded or not
func (n *NGINXController) SetForceReload(shouldReload bool) {
if shouldReload {
atomic.StoreInt32(&n.forceReload, 1)
n.syncQueue.Enqueue(&extensions.Ingress{})
} else {
atomic.StoreInt32(&n.forceReload, 0)
}
}
// extractTLSSecretName returns the name of the secret that
// contains a SSL certificate for a particular hostname.
// In case there is no match, an empty string is returned.
// extractTLSSecretName returns the name of the Secret containing a SSL
// certificate for the given host name, or an empty string.
func extractTLSSecretName(host string, ing *extensions.Ingress,
getLocalSSLCert func(string) (*ingress.SSLCert, error)) string {
if ing == nil {
return ""
}
// naively return Secret name from TLS spec if host name matches
for _, tls := range ing.Spec.TLS {
if sets.NewString(tls.Hosts...).Has(host) {
return tls.SecretName
}
}
// contains a TLS section but none of the host match or there
// is no hosts in the TLS section. As last resort we valide
// the host against the certificate and we use it if is valid
// no TLS host matching host name, try each TLS host for matching CN
for _, tls := range ing.Spec.TLS {
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
cert, err := getLocalSSLCert(key)
if err != nil {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
glog.Warningf("SSL certificate %q does not exist in local store.", key)
continue
}

View file

@ -29,14 +29,9 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
)
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func getEndpoints(
s *corev1.Service,
port *corev1.ServicePort,
proto corev1.Protocol,
hz *healthcheck.Config,
getServiceEndpoints func(*corev1.Service) (*corev1.Endpoints, error),
) []ingress.Endpoint {
// getEndpoints returns a list of Endpoint structs for a given service/target port combination.
func getEndpoints(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, hz *healthcheck.Config,
getServiceEndpoints func(*corev1.Service) (*corev1.Endpoints, error)) []ingress.Endpoint {
upsServers := []ingress.Endpoint{}
@ -44,26 +39,24 @@ func getEndpoints(
return upsServers
}
// avoid duplicated upstream servers when the service
// contains multiple port definitions sharing the same
// targetport.
adus := make(map[string]bool)
// using a map avoids duplicated upstream servers when the service
// contains multiple port definitions sharing the same targetport
processedUpstreamServers := make(map[string]struct{})
// ExternalName services
if s.Spec.Type == corev1.ServiceTypeExternalName {
glog.V(3).Infof("Ingress using a service %v of type=ExternalName : %v", s.Name)
glog.V(3).Infof("Ingress using Service %q of type ExternalName.", s.Name)
targetPort := port.TargetPort.IntValue()
// check for invalid port value
if targetPort <= 0 {
glog.Errorf("ExternalName service with an invalid port: %v", targetPort)
glog.Errorf("ExternalName Service %q has an invalid port (%v)", s.Name, targetPort)
return upsServers
}
if net.ParseIP(s.Spec.ExternalName) == nil {
_, err := net.LookupHost(s.Spec.ExternalName)
if err != nil {
glog.Errorf("unexpected error resolving host %v: %v", s.Spec.ExternalName, err)
glog.Errorf("Error resolving host %q: %v", s.Spec.ExternalName, err)
return upsServers
}
}
@ -76,10 +69,10 @@ func getEndpoints(
})
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, port.String())
glog.V(3).Infof("Getting Endpoints for Service \"%v/%v\" and port %v", s.Namespace, s.Name, port.String())
ep, err := getServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
glog.Warningf("Error obtaining Endpoints for Service \"%v/%v\": %v", s.Namespace, s.Name, err)
return upsServers
}
@ -99,14 +92,13 @@ func getEndpoints(
targetPort = epPort.Port
}
// check for invalid port value
if targetPort <= 0 {
continue
}
for _, epAddress := range ss.Addresses {
ep := fmt.Sprintf("%v:%v", epAddress.IP, targetPort)
if _, exists := adus[ep]; exists {
if _, exists := processedUpstreamServers[ep]; exists {
continue
}
ups := ingress.Endpoint{
@ -117,11 +109,11 @@ func getEndpoints(
Target: epAddress.TargetRef,
}
upsServers = append(upsServers, ups)
adus[ep] = true
processedUpstreamServers[ep] = struct{}{}
}
}
}
glog.V(3).Infof("endpoints found: %v", upsServers)
glog.V(3).Infof("Endpoints found for Service \"%v/%v\": %v", s.Namespace, s.Name, upsServers)
return upsServers
}

View file

@ -1,30 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import "github.com/prometheus/client_golang/prometheus"
// Stopable defines a prometheus collector that can be stopped
type Stopable interface {
prometheus.Collector
Stop()
}
type scrapeRequest struct {
results chan<- prometheus.Metric
done chan struct{}
}

View file

@ -1,225 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"github.com/golang/glog"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type basicStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
// https://github.com/vozlt/nginx-module-vts
type vts struct {
NginxVersion string `json:"nginxVersion"`
LoadMsec int `json:"loadMsec"`
NowMsec int `json:"nowMsec"`
// Total connections and requests(same as stub_status_module in NGINX)
Connections connections `json:"connections"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone
ServerZones map[string]serverZone `json:"serverZones"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone filtered through
// the vhost_traffic_status_filter_by_set_key directive
FilterZones map[string]map[string]filterZone `json:"filterZones"`
// Traffic(in/out) and request and response counts per server in each upstream group
UpstreamZones map[string][]upstreamZone `json:"upstreamZones"`
}
type serverZone struct {
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
Responses response `json:"responses"`
Cache cache `json:"cache"`
}
type filterZone struct {
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
Cache cache `json:"cache"`
Responses response `json:"responses"`
}
type upstreamZone struct {
Responses response `json:"responses"`
Server string `json:"server"`
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
ResponseMsec float64 `json:"responseMsec"`
Weight float64 `json:"weight"`
MaxFails float64 `json:"maxFails"`
FailTimeout float64 `json:"failTimeout"`
Backup BoolToFloat64 `json:"backup"`
Down BoolToFloat64 `json:"down"`
}
type cache struct {
Miss float64 `json:"miss"`
Bypass float64 `json:"bypass"`
Expired float64 `json:"expired"`
Stale float64 `json:"stale"`
Updating float64 `json:"updating"`
Revalidated float64 `json:"revalidated"`
Hit float64 `json:"hit"`
Scarce float64 `json:"scarce"`
}
type response struct {
OneXx float64 `json:"1xx"`
TwoXx float64 `json:"2xx"`
TheeXx float64 `json:"3xx"`
FourXx float64 `json:"4xx"`
FiveXx float64 `json:"5xx"`
}
type connections struct {
Active float64 `json:"active"`
Reading float64 `json:"reading"`
Writing float64 `json:"writing"`
Waiting float64 `json:"waiting"`
Accepted float64 `json:"accepted"`
Handled float64 `json:"handled"`
Requests float64 `json:"requests"`
}
// BoolToFloat64 ...
type BoolToFloat64 float64
// UnmarshalJSON ...
func (bit BoolToFloat64) UnmarshalJSON(data []byte) error {
asString := string(data)
if asString == "1" || asString == "true" {
bit = 1
} else if asString == "0" || asString == "false" {
bit = 0
} else {
return fmt.Errorf(fmt.Sprintf("boolean unmarshal error: invalid input %s", asString))
}
return nil
}
func getNginxStatus(port int, path string) (*basicStatus, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
glog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
return parse(string(data)), nil
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
func getNginxVtsMetrics(port int, path string) (*vts, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
glog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx vts (%v)", err)
}
var vts *vts
err = json.Unmarshal(data, &vts)
if err != nil {
return nil, fmt.Errorf("unexpected error json unmarshal (%v)", err)
}
glog.V(3).Infof("scrape returned : %v", vts)
return vts, nil
}
func parse(data string) *basicStatus {
acr := ac.FindStringSubmatch(data)
sahrr := sahr.FindStringSubmatch(data)
readingr := reading.FindStringSubmatch(data)
writingr := writing.FindStringSubmatch(data)
waitingr := waiting.FindStringSubmatch(data)
return &basicStatus{
toInt(acr, 1),
toInt(sahrr, 1),
toInt(sahrr, 2),
toInt(sahrr, 3),
toInt(readingr, 1),
toInt(writingr, 1),
toInt(waitingr, 1),
}
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
}
if pos > len(data) {
return 0
}
if v, err := strconv.Atoi(data[pos]); err == nil {
return v
}
return 0
}

View file

@ -1,72 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"testing"
"github.com/kylelemons/godebug/pretty"
)
func TestParseStatus(t *testing.T) {
tests := []struct {
in string
out *basicStatus
}{
{`Active connections: 43
server accepts handled requests
7368 7368 10993
Reading: 0 Writing: 5 Waiting: 38`,
&basicStatus{43, 7368, 7368, 10993, 0, 5, 38},
},
{`Active connections: 0
server accepts handled requests
1 7 0
Reading: A Writing: B Waiting: 38`,
&basicStatus{0, 1, 7, 0, 0, 0, 38},
},
}
for _, test := range tests {
r := parse(test.in)
if diff := pretty.Compare(r, test.out); diff != "" {
t.Logf("%v", diff)
t.Fatalf("expected %v but returned %v", test.out, r)
}
}
}
func TestToint(t *testing.T) {
tests := []struct {
in []string
pos int
exp int
}{
{[]string{}, 0, 0},
{[]string{}, 1, 0},
{[]string{"A"}, 0, 0},
{[]string{"1"}, 0, 1},
{[]string{"a", "2"}, 1, 2},
}
for _, test := range tests {
v := toInt(test.in, test.pos)
if v != test.exp {
t.Fatalf("expected %v but returned %v", test.exp, v)
}
}
}

View file

@ -1,273 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"reflect"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
const ns = "nginx"
type (
vtsCollector struct {
scrapeChan chan scrapeRequest
port int
path string
data *vtsData
watchNamespace string
ingressClass string
}
vtsData struct {
bytes *prometheus.Desc
cache *prometheus.Desc
connections *prometheus.Desc
responses *prometheus.Desc
requests *prometheus.Desc
filterZoneBytes *prometheus.Desc
filterZoneResponses *prometheus.Desc
filterZoneCache *prometheus.Desc
upstreamBackup *prometheus.Desc
upstreamBytes *prometheus.Desc
upstreamDown *prometheus.Desc
upstreamFailTimeout *prometheus.Desc
upstreamMaxFails *prometheus.Desc
upstreamResponses *prometheus.Desc
upstreamRequests *prometheus.Desc
upstreamResponseMsec *prometheus.Desc
upstreamWeight *prometheus.Desc
}
)
// NewNGINXVTSCollector returns a new prometheus collector for the VTS module
func NewNGINXVTSCollector(watchNamespace, ingressClass string, port int, path string) Stopable {
p := vtsCollector{
scrapeChan: make(chan scrapeRequest),
port: port,
path: path,
watchNamespace: watchNamespace,
ingressClass: ingressClass,
}
p.data = &vtsData{
bytes: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "bytes_total"),
"Nginx bytes count",
[]string{"ingress_class", "namespace", "server_zone", "direction"}, nil),
cache: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "cache_total"),
"Nginx cache count",
[]string{"ingress_class", "namespace", "server_zone", "type"}, nil),
connections: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connections_total"),
"Nginx connections count",
[]string{"ingress_class", "namespace", "type"}, nil),
responses: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "responses_total"),
"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"ingress_class", "namespace", "server_zone", "status_code"}, nil),
requests: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "requests_total"),
"The total number of requested client connections.",
[]string{"ingress_class", "namespace", "server_zone"}, nil),
filterZoneBytes: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "filterzone_bytes_total"),
"Nginx bytes count",
[]string{"ingress_class", "namespace", "server_zone", "key", "direction"}, nil),
filterZoneResponses: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "filterzone_responses_total"),
"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"ingress_class", "namespace", "server_zone", "key", "status_code"}, nil),
filterZoneCache: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "filterzone_cache_total"),
"Nginx cache count",
[]string{"ingress_class", "namespace", "server_zone", "key", "type"}, nil),
upstreamBackup: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_backup"),
"Current backup setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamBytes: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_bytes_total"),
"The total number of bytes sent to this server.",
[]string{"ingress_class", "namespace", "upstream", "server", "direction"}, nil),
upstreamDown: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "vts_upstream_down_total"),
"Current down setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamFailTimeout: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_fail_timeout"),
"Current fail_timeout setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamMaxFails: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_maxfails"),
"Current max_fails setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamResponses: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_responses_total"),
"The number of upstream responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"ingress_class", "namespace", "upstream", "server", "status_code"}, nil),
upstreamRequests: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_requests_total"),
"The total number of client connections forwarded to this server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamResponseMsec: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_response_msecs_avg"),
"The average of only upstream response processing times in milliseconds.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamWeight: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_weight"),
"Current upstream weight setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
}
go p.start()
return p
}
// Describe implements prometheus.Collector.
func (p vtsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- p.data.bytes
ch <- p.data.cache
ch <- p.data.connections
ch <- p.data.requests
ch <- p.data.responses
ch <- p.data.upstreamBackup
ch <- p.data.upstreamBytes
ch <- p.data.upstreamDown
ch <- p.data.upstreamFailTimeout
ch <- p.data.upstreamMaxFails
ch <- p.data.upstreamRequests
ch <- p.data.upstreamResponseMsec
ch <- p.data.upstreamResponses
ch <- p.data.upstreamWeight
ch <- p.data.filterZoneBytes
ch <- p.data.filterZoneCache
ch <- p.data.filterZoneResponses
}
// Collect implements prometheus.Collector.
func (p vtsCollector) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
func (p vtsCollector) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrapeVts(ch)
req.done <- struct{}{}
}
}
func (p vtsCollector) Stop() {
close(p.scrapeChan)
}
// scrapeVts scrape nginx vts metrics
func (p vtsCollector) scrapeVts(ch chan<- prometheus.Metric) {
nginxMetrics, err := getNginxVtsMetrics(p.port, p.path)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
reflectMetrics(&nginxMetrics.Connections, p.data.connections, ch, p.ingressClass, p.watchNamespace)
for name, zones := range nginxMetrics.UpstreamZones {
for pos, value := range zones {
reflectMetrics(&zones[pos].Responses, p.data.upstreamResponses, ch, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamRequests,
prometheus.CounterValue, zones[pos].RequestCounter, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamDown,
prometheus.CounterValue, float64(zones[pos].Down), p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamWeight,
prometheus.CounterValue, zones[pos].Weight, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamResponseMsec,
prometheus.CounterValue, zones[pos].ResponseMsec, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamBackup,
prometheus.CounterValue, float64(zones[pos].Backup), p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamFailTimeout,
prometheus.CounterValue, zones[pos].FailTimeout, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamMaxFails,
prometheus.CounterValue, zones[pos].MaxFails, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
prometheus.CounterValue, zones[pos].InBytes, p.ingressClass, p.watchNamespace, name, value.Server, "in")
ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
prometheus.CounterValue, zones[pos].OutBytes, p.ingressClass, p.watchNamespace, name, value.Server, "out")
}
}
for name, zone := range nginxMetrics.ServerZones {
reflectMetrics(&zone.Responses, p.data.responses, ch, p.ingressClass, p.watchNamespace, name)
reflectMetrics(&zone.Cache, p.data.cache, ch, p.ingressClass, p.watchNamespace, name)
ch <- prometheus.MustNewConstMetric(p.data.requests,
prometheus.CounterValue, zone.RequestCounter, p.ingressClass, p.watchNamespace, name)
ch <- prometheus.MustNewConstMetric(p.data.bytes,
prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, name, "in")
ch <- prometheus.MustNewConstMetric(p.data.bytes,
prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, name, "out")
}
for serverZone, keys := range nginxMetrics.FilterZones {
for name, zone := range keys {
reflectMetrics(&zone.Responses, p.data.filterZoneResponses, ch, p.ingressClass, p.watchNamespace, serverZone, name)
reflectMetrics(&zone.Cache, p.data.filterZoneCache, ch, p.ingressClass, p.watchNamespace, serverZone, name)
ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, serverZone, name, "in")
ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, serverZone, name, "out")
}
}
}
func reflectMetrics(value interface{}, desc *prometheus.Desc, ch chan<- prometheus.Metric, labels ...string) {
val := reflect.ValueOf(value).Elem()
for i := 0; i < val.NumField(); i++ {
tag := val.Type().Field(i).Tag
l := append(labels, tag.Get("json"))
ch <- prometheus.MustNewConstMetric(desc,
prometheus.CounterValue, val.Field(i).Interface().(float64),
l...)
}
}

View file

@ -38,7 +38,6 @@ import (
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
@ -65,26 +64,14 @@ type statusModule string
const (
ngxHealthPath = "/healthz"
defaultStatusModule statusModule = "default"
vtsStatusModule statusModule = "vts"
)
var (
tmplPath = "/etc/nginx/template/nginx.tmpl"
cfgPath = "/etc/nginx/nginx.conf"
nginxBinary = "/usr/sbin/nginx"
tmplPath = "/etc/nginx/template/nginx.tmpl"
)
// NewNGINXController creates a new NGINX Ingress controller.
// If the environment variable NGINX_BINARY exists it will be used
// as source for nginx commands
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = nginxBinary
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
@ -93,12 +80,10 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
h, err := dns.GetSystemNameServers()
if err != nil {
glog.Warningf("unexpected error reading system nameservers: %v", err)
glog.Warningf("Error reading system nameservers: %v", err)
}
n := &NGINXController{
binary: ngx,
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
@ -116,8 +101,7 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
fileSystem: fs,
// create an empty configuration.
runningConfig: &ingress.Configuration{},
runningConfig: new(ingress.Configuration),
Proxy: &TCPProxy{},
}
@ -134,8 +118,6 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
fs,
n.updateCh)
n.stats = newStatsCollector(config.Namespace, class.IngressClass, n.binary, n.cfg.ListenPorts.Status)
n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.annotations = annotations.NewAnnotationExtractor(n.store)
@ -153,7 +135,7 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
UseNodeInternalIP: config.UseNodeInternalIP,
})
} else {
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
glog.Warning("Update of Ingress status is disabled (flag --update-status)")
}
onTemplateChange := func() {
@ -162,68 +144,66 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
// this error is different from the rest because it must be clear why nginx is not working
glog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template : %v
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)
return
}
n.t = template
glog.Info("new NGINX template loaded")
n.SetForceReload(true)
glog.Info("New NGINX configuration template loaded.")
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}
ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
if err != nil {
glog.Fatalf("invalid NGINX template: %v", err)
glog.Fatalf("Invalid NGINX configuration template: %v", err)
}
n.t = ngxTpl
// TODO: refactor
if _, ok := fs.(filesystem.DefaultFs); !ok {
watch.NewDummyFileWatcher(tmplPath, onTemplateChange)
} else {
// do not setup watchers on tests
return n
}
_, err = watch.NewFileWatcher(tmplPath, onTemplateChange)
_, err = watch.NewFileWatcher(tmplPath, onTemplateChange)
if err != nil {
glog.Fatalf("Error creating file watcher for %v: %v", tmplPath, err)
}
filesToWatch := []string{}
err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
if err != nil {
glog.Fatalf("unexpected error creating file watcher: %v", err)
return err
}
filesToWatch := []string{}
err := filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
filesToWatch = append(filesToWatch, path)
if info.IsDir() {
return nil
}
filesToWatch = append(filesToWatch, path)
return nil
})
if err != nil {
glog.Fatalf("Error creating file watchers: %v", err)
}
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
glog.Info("File %v changed. Reloading NGINX", f)
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
if err != nil {
glog.Fatalf("unexpected error creating file watcher: %v", err)
glog.Fatalf("Error creating file watcher for %v: %v", f, err)
}
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
glog.Info("file %v changed. Reloading NGINX", f)
n.SetForceReload(true)
})
if err != nil {
glog.Fatalf("unexpected error creating file watcher: %v", err)
}
}
}
return n
}
// NGINXController ...
// NGINXController describes a NGINX Ingress controller.
type NGINXController struct {
cfg *Configuration
@ -237,30 +217,24 @@ type NGINXController struct {
syncRateLimiter flowcontrol.RateLimiter
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// stopLock is used to enforce that only a single call to Stop send at
// a given time. We allow stopping through an HTTP endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock *sync.Mutex
stopCh chan struct{}
updateCh *channels.RingChannel
// ngxErrCh channel used to detect errors with the nginx processes
// ngxErrCh is used to detect errors with the NGINX processes
ngxErrCh chan error
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration
forceReload int32
t *ngx_template.Template
binary string
resolver []net.IP
stats *statsCollector
statusModule statusModule
// returns true if IPV6 is enabled in the pod
isIPV6Enabled bool
@ -273,9 +247,9 @@ type NGINXController struct {
fileSystem filesystem.Filesystem
}
// Start start a new NGINX master process running in foreground.
// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
glog.Infof("starting Ingress controller")
glog.Infof("Starting NGINX Ingress controller")
n.store.Run(n.stopCh)
@ -283,9 +257,9 @@ func (n *NGINXController) Start() {
go n.syncStatus.Run()
}
cmd := exec.Command(n.binary, "-c", cfgPath)
cmd := nginxExecCommand()
// put nginx in another process group to prevent it
// put NGINX in another process group to prevent it
// to receive signals meant for the controller
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
@ -296,12 +270,12 @@ func (n *NGINXController) Start() {
n.setupSSLProxy()
}
glog.Info("starting NGINX process...")
glog.Info("Starting NGINX process")
n.start(cmd)
go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.Enqueue(&extensions.Ingress{})
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
for {
select {
@ -320,7 +294,7 @@ func (n *NGINXController) Start() {
// release command resources
cmd.Process.Release()
// start a new nginx master process if the controller is not being stopped
cmd = exec.Command(n.binary, "-c", cfgPath)
cmd = nginxExecCommand()
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
@ -334,12 +308,14 @@ func (n *NGINXController) Start() {
if evt, ok := event.(store.Event); ok {
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
n.syncQueue.Enqueue(evt.Obj)
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
glog.Warningf("unexpected event type received %T", event)
glog.Warningf("Unexpected event type received %T", event)
}
case <-n.stopCh:
break
@ -354,12 +330,11 @@ func (n *NGINXController) Stop() error {
n.stopLock.Lock()
defer n.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if n.syncQueue.IsShuttingDown() {
return fmt.Errorf("shutdown already in progress")
}
glog.Infof("shutting down controller queues")
glog.Infof("Shutting down controller queues")
close(n.stopCh)
go n.syncQueue.Shutdown()
if n.syncStatus != nil {
@ -368,7 +343,7 @@ func (n *NGINXController) Stop() error {
// Send stop signal to Nginx
glog.Info("stopping NGINX process...")
cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit")
cmd := nginxExecCommand("-s", "quit")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@ -376,7 +351,7 @@ func (n *NGINXController) Stop() error {
return err
}
// Wait for the Nginx process disappear
// wait for the NGINX process to terminate
timer := time.NewTicker(time.Second * 1)
for range timer.C {
if !process.IsNginxRunning() {
@ -393,7 +368,7 @@ func (n *NGINXController) start(cmd *exec.Cmd) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
glog.Fatalf("nginx error: %v", err)
glog.Fatalf("NGINX error: %v", err)
n.ngxErrCh <- err
return
}
@ -416,18 +391,18 @@ func (n NGINXController) DefaultEndpoint() ingress.Endpoint {
// running the command "nginx -t" using a temporal file.
func (n NGINXController) testTemplate(cfg []byte) error {
if len(cfg) == 0 {
return fmt.Errorf("invalid nginx configuration (empty)")
return fmt.Errorf("Invalid NGINX configuration (empty)")
}
tmpfile, err := ioutil.TempFile("", "nginx-cfg")
if err != nil {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), cfg, 0644)
err = ioutil.WriteFile(tmpfile.Name(), cfg, file.ReadWriteByUser)
if err != nil {
return err
}
out, err := exec.Command(n.binary, "-t", "-c", tmpfile.Name()).CombinedOutput()
out, err := nginxTestCommand(tmpfile.Name()).CombinedOutput()
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
oe := fmt.Sprintf(`
@ -443,14 +418,10 @@ Error: %v
return nil
}
// OnUpdate is called periodically by syncQueue to keep the configuration in sync.
//
// 1. converts configmap configuration to custom configuration object
// 2. write the custom template (the complexity depends on the implementation)
// 3. write the configuration file
//
// returning nil implies the backend will be reloaded.
// if an error is returned means requeue the update
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
@ -460,7 +431,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
glog.Warningf("missing service for PassthroughBackends %v", pb.Backend)
glog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
continue
}
port, err := strconv.Atoi(pb.Port.String())
@ -480,7 +451,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
}
//TODO: Allow PassthroughBackends to specify they support proxy-protocol
// TODO: Allow PassthroughBackends to specify they support proxy-protocol
servers = append(servers, &TCPServer{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
@ -492,13 +463,6 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
n.Proxy.ServerList = servers
}
// we need to check if the status module configuration changed
if cfg.EnableVtsStatus {
n.setupMonitor(vtsStatusModule)
} else {
n.setupMonitor(defaultStatusModule)
}
// NGINX cannot resize the hash tables used to store server names.
// For this reason we check if the defined size defined is correct
// for the FQDN defined in the ingress rules adjusting the value
@ -520,7 +484,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
} else {
n = fmt.Sprintf("www.%v", srv.Hostname)
}
glog.V(3).Infof("creating redirect from %v to %v", srv.Hostname, n)
glog.V(3).Infof("Creating redirect from %q to %q", srv.Hostname, n)
if _, ok := redirectServers[n]; !ok {
found := false
for _, esrv := range ingressCfg.Servers {
@ -537,24 +501,24 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
if cfg.ServerNameHashBucketSize == 0 {
nameHashBucketSize := nginxHashBucketSize(longestName)
glog.V(3).Infof("adjusting ServerNameHashBucketSize variable to %v", nameHashBucketSize)
glog.V(3).Infof("Adjusting ServerNameHashBucketSize variable to %q", nameHashBucketSize)
cfg.ServerNameHashBucketSize = nameHashBucketSize
}
serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
glog.V(3).Infof("adjusting ServerNameHashMaxSize variable to %v", serverNameHashMaxSize)
glog.V(3).Infof("Adjusting ServerNameHashMaxSize variable to %q", serverNameHashMaxSize)
cfg.ServerNameHashMaxSize = serverNameHashMaxSize
}
// the limit of open files is per worker process
// and we leave some room to avoid consuming all the FDs available
wp, err := strconv.Atoi(cfg.WorkerProcesses)
glog.V(3).Infof("number of worker processes: %v", wp)
glog.V(3).Infof("Number of worker processes: %d", wp)
if err != nil {
wp = 1
}
maxOpenFiles := (sysctlFSFileMax() / wp) - 1024
glog.V(2).Infof("maximum number of open file descriptors : %v", maxOpenFiles)
glog.V(2).Infof("Maximum number of open file descriptors: %d", maxOpenFiles)
if maxOpenFiles < 1024 {
// this means the value of RLIMIT_NOFILE is too low.
maxOpenFiles = 1024
@ -564,7 +528,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
if cfg.ProxySetHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
if err != nil {
glog.Warningf("unexpected error reading configmap %v: %v", cfg.ProxySetHeaders, err)
glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
}
setHeaders = cmap.Data
@ -574,7 +538,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
if cfg.AddHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
if err != nil {
glog.Warningf("unexpected error reading configmap %v: %v", cfg.AddHeaders, err)
glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
}
addHeaders = cmap.Data
@ -586,7 +550,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
secret, err := n.store.GetSecret(secretName)
if err != nil {
glog.Warningf("unexpected error reading secret %v: %v", secretName, err)
glog.Warningf("Error reading Secret %q from local store: %v", secretName, err)
}
nsSecName := strings.Replace(secretName, "/", "-", -1)
@ -595,7 +559,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
if ok {
pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh, n.fileSystem)
if err != nil {
glog.Warningf("unexpected error adding or updating dhparam %v file: %v", nsSecName, err)
glog.Warningf("Error adding or updating dhparam file %v: %v", nsSecName, err)
} else {
sslDHParam = pemFileName
}
@ -647,31 +611,28 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), content, 0644)
err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
if err != nil {
return err
}
// executing diff can return exit code != 0
// TODO: executing diff can return exit code != 0
diffOutput, _ := exec.Command("diff", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
glog.Infof("NGINX configuration diff\n")
glog.Infof("%v\n", string(diffOutput))
glog.Infof("NGINX configuration diff:\n%v", string(diffOutput))
// Do not use defer to remove the temporal file.
// This is helpful when there is an error in the
// temporal configuration (we can manually inspect the file).
// Only remove the file when no error occurred.
// we do not defer the deletion of temp files in order
// to keep them around for inspection in case of error
os.Remove(tmpfile.Name())
}
}
err = ioutil.WriteFile(cfgPath, content, 0644)
err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
if err != nil {
return err
}
o, err := exec.Command(n.binary, "-s", "reload", "-c", cfgPath).CombinedOutput()
o, err := nginxExecCommand("-s", "reload").CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}
@ -679,9 +640,10 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return nil
}
// nginxHashBucketSize computes the correct nginx hash_bucket_size for a hash with the given longest key
// nginxHashBucketSize computes the correct NGINX hash_bucket_size for a hash
// with the given longest key.
func nginxHashBucketSize(longestString int) int {
// See https://github.com/kubernetes/ingress-nginxs/issues/623 for an explanation
// see https://github.com/kubernetes/ingress-nginxs/issues/623 for an explanation
wordSize := 8 // Assume 64 bit CPU
n := longestString + 2
aligned := (n + wordSize - 1) & ^(wordSize - 1)
@ -708,7 +670,7 @@ func (n *NGINXController) setupSSLProxy() {
sslPort := n.cfg.ListenPorts.HTTPS
proxyPort := n.cfg.ListenPorts.SSLProxy
glog.Info("starting TLS proxy for SSL passthrough")
glog.Info("Starting TLS proxy for SSL Passthrough")
n.Proxy = &TCPProxy{
Default: &TCPServer{
Hostname: "localhost",
@ -725,32 +687,33 @@ func (n *NGINXController) setupSSLProxy() {
proxyList := &proxyproto.Listener{Listener: listener, ProxyHeaderTimeout: cfg.ProxyProtocolHeaderTimeout}
// start goroutine that accepts tcp connections in port 443
// accept TCP connections on the configured HTTPS port
go func() {
for {
var conn net.Conn
var err error
if n.store.GetBackendConfiguration().UseProxyProtocol {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
// wrap the listener in order to decode Proxy
// Protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
glog.Warningf("Error accepting TCP connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
glog.V(3).Infof("Handling connection from remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.Proxy.Handle(conn)
}
}()
}
// IsDynamicConfigurationEnough decides if the new configuration changes can be dynamically applied without reloading
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
copyOfRunningConfig := *n.runningConfig
copyOfPcfg := *pcfg
@ -761,8 +724,8 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
return copyOfRunningConfig.Equal(&copyOfPcfg)
}
// configureDynamically JSON encodes new Backends and POSTs it to an internal HTTP endpoint
// that is handled by Lua
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration, port int) error {
backends := make([]*ingress.Backend, len(pcfg.Backends))
@ -796,7 +759,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
return err
}
glog.V(2).Infof("posting backends configuration: %s", buf)
glog.V(2).Infof("Posting backends configuration: %s", buf)
url := fmt.Sprintf("http://localhost:%d/configuration/backends", port)
resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
@ -806,7 +769,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
defer func() {
if err := resp.Body.Close(); err != nil {
glog.Warningf("error while closing response body: \n%v", err)
glog.Warningf("Error while closing response body:\n%v", err)
}
}()

View file

@ -1,97 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress/controller/metric/collector"
)
const (
ngxStatusPath = "/nginx_status"
ngxVtsPath = "/nginx_status/format/json"
)
func (n *NGINXController) setupMonitor(sm statusModule) {
csm := n.statusModule
if csm != sm {
glog.Infof("changing prometheus collector from %v to %v", csm, sm)
n.stats.stop(csm)
n.stats.start(sm)
n.statusModule = sm
}
}
type statsCollector struct {
process prometheus.Collector
basic collector.Stopable
vts collector.Stopable
namespace string
watchClass string
port int
}
func (s *statsCollector) stop(sm statusModule) {
switch sm {
case defaultStatusModule:
s.basic.Stop()
prometheus.Unregister(s.basic)
case vtsStatusModule:
s.vts.Stop()
prometheus.Unregister(s.vts)
}
}
func (s *statsCollector) start(sm statusModule) {
switch sm {
case defaultStatusModule:
s.basic = collector.NewNginxStatus(s.namespace, s.watchClass, s.port, ngxStatusPath)
prometheus.Register(s.basic)
break
case vtsStatusModule:
s.vts = collector.NewNGINXVTSCollector(s.namespace, s.watchClass, s.port, ngxVtsPath)
prometheus.Register(s.vts)
break
}
}
func newStatsCollector(ns, class, binary string, port int) *statsCollector {
glog.Infof("starting new nginx stats collector for Ingress controller running in namespace %v (class %v)", ns, class)
glog.Infof("collector extracting information from port %v", port)
pc, err := collector.NewNamedProcess(true, collector.BinaryNameMatcher{
Name: "nginx",
Binary: binary,
})
if err != nil {
glog.Fatalf("unexpected error registering nginx collector: %v", err)
}
err = prometheus.Register(pc)
if err != nil {
glog.Fatalf("unexpected error registering nginx collector: %v", err)
}
return &statsCollector{
namespace: ns,
watchClass: class,
process: pc,
port: port,
}
}

View file

@ -479,6 +479,18 @@ func New(checkOCSP bool,
if key == configmap {
store.setConfig(cm)
}
ings := store.listers.IngressAnnotation.List()
for _, ingKey := range ings {
key := k8s.MetaNamespaceKey(ingKey)
ing, err := store.GetIngress(key)
if err != nil {
glog.Errorf("could not find Ingress %v in local store: %v", key, err)
continue
}
store.extractAnnotations(ing)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
@ -494,6 +506,13 @@ func New(checkOCSP bool,
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
// do not wait for informers to read the configmap configuration
cm, err := client.CoreV1().ConfigMaps(namespace).Get(configmap, metav1.GetOptions{})
if err != nil {
glog.Warningf("Unexpected error reading configuration configmap: %v", err)
}
store.setConfig(cm)
return store
}
@ -699,7 +718,7 @@ func (s *k8sStore) setConfig(cmap *corev1.ConfigMap) {
glog.Warningf("unexpected error decoding key ssl-session-ticket-key: %v", err)
s.backendConfig.SSLSessionTicketKey = ""
}
ioutil.WriteFile("/etc/nginx/tickets.key", d, 0644)
ioutil.WriteFile("/etc/nginx/tickets.key", d, file.ReadWriteByUser)
}
}

View file

@ -26,7 +26,7 @@ import (
"github.com/paultag/sniff/parser"
)
// TCPServer describes a server that works in passthrough mode
// TCPServer describes a server that works in passthrough mode.
type TCPServer struct {
Hostname string
IP string
@ -34,13 +34,13 @@ type TCPServer struct {
ProxyProtocol bool
}
// TCPProxy describes the passthrough servers and a default as catch all
// TCPProxy describes the passthrough servers and a default as catch all.
type TCPProxy struct {
ServerList []*TCPServer
Default *TCPServer
}
// Get returns the TCPServer to use
// Get returns the TCPServer to use for a given host.
func (p *TCPProxy) Get(host string) *TCPServer {
if p.ServerList == nil {
return p.Default
@ -63,19 +63,19 @@ func (p *TCPProxy) Handle(conn net.Conn) {
length, err := conn.Read(data)
if err != nil {
glog.V(4).Infof("error reading the first 4k of the connection: %s", err)
glog.V(4).Infof("Error reading the first 4k of the connection: %s", err)
return
}
proxy := p.Default
hostname, err := parser.GetHostname(data[:])
if err == nil {
glog.V(4).Infof("parsed hostname from TLS Client Hello: %s", hostname)
glog.V(4).Infof("Parsed hostname from TLS Client Hello: %s", hostname)
proxy = p.Get(hostname)
}
if proxy == nil {
glog.V(4).Infof("there is no configured proxy for SSL connections")
glog.V(4).Infof("There is no configured proxy for SSL connections.")
return
}
@ -86,7 +86,7 @@ func (p *TCPProxy) Handle(conn net.Conn) {
defer clientConn.Close()
if proxy.ProxyProtocol {
//Write out the proxy-protocol header
// write out the Proxy Protocol header
localAddr := conn.LocalAddr().(*net.TCPAddr)
remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
protocol := "UNKNOWN"
@ -96,16 +96,16 @@ func (p *TCPProxy) Handle(conn net.Conn) {
protocol = "TCP6"
}
proxyProtocolHeader := fmt.Sprintf("PROXY %s %s %s %d %d\r\n", protocol, remoteAddr.IP.String(), localAddr.IP.String(), remoteAddr.Port, localAddr.Port)
glog.V(4).Infof("Writing proxy protocol header - %s", proxyProtocolHeader)
glog.V(4).Infof("Writing Proxy Protocol header: %s", proxyProtocolHeader)
_, err = fmt.Fprintf(clientConn, proxyProtocolHeader)
}
if err != nil {
glog.Errorf("unexpected error writing proxy-protocol header: %s", err)
glog.Errorf("Error writing Proxy Protocol header: %s", err)
clientConn.Close()
} else {
_, err = clientConn.Write(data[:length])
if err != nil {
glog.Errorf("unexpected error writing first 4k of proxy data: %s", err)
glog.Errorf("Error writing the first 4k of proxy data: %s", err)
clientConn.Close()
}
}

View file

@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"github.com/mitchellh/hashstructure"
"github.com/mitchellh/mapstructure"
"k8s.io/apimachinery/pkg/util/sets"
@ -191,6 +192,15 @@ func ReadConfig(src map[string]string) config.Configuration {
glog.Warningf("unexpected error merging defaults: %v", err)
}
hash, err := hashstructure.Hash(to, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
glog.Warningf("unexpected error obtaining hash: %v", err)
}
to.Checksum = fmt.Sprintf("%v", hash)
return to
}

View file

@ -17,11 +17,13 @@ limitations under the License.
package template
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
"github.com/mitchellh/hashstructure"
"k8s.io/ingress-nginx/internal/ingress/controller/config"
)
@ -88,6 +90,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def.NginxStatusIpv6Whitelist = []string{"::1", "2001::/16"}
def.ProxyAddOriginalUriHeader = false
hash, err := hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)
to := ReadConfig(conf)
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
@ -107,6 +117,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
}
def = config.NewDefault()
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)
to = ReadConfig(map[string]string{})
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
@ -114,6 +132,15 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def = config.NewDefault()
def.WhitelistSourceRange = []string{"1.1.1.1/32"}
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)
to = ReadConfig(map[string]string{
"whitelist-source-range": "1.1.1.1/32",
})

View file

@ -17,6 +17,8 @@ limitations under the License.
package controller
import (
"os"
"os/exec"
"syscall"
"github.com/golang/glog"
@ -41,29 +43,53 @@ func newUpstream(name string) *ingress.Backend {
}
}
// sysctlSomaxconn returns the value of net.core.somaxconn, i.e.
// maximum number of connections that can be queued for acceptance
// sysctlSomaxconn returns the maximum number of connections that can be queued
// for acceptance (value of net.core.somaxconn)
// http://nginx.org/en/docs/http/ngx_http_core_module.html#listen
func sysctlSomaxconn() int {
maxConns, err := sysctl.New().GetSysctl("net/core/somaxconn")
if err != nil || maxConns < 512 {
glog.V(3).Infof("system net.core.somaxconn=%v (using system default)", maxConns)
glog.V(3).Infof("net.core.somaxconn=%v (using system default)", maxConns)
return 511
}
return maxConns
}
// sysctlFSFileMax returns the value of fs.file-max, i.e.
// maximum number of open file descriptors
// sysctlFSFileMax returns the maximum number of open file descriptors (value
// of fs.file-max) or 0 in case of error.
func sysctlFSFileMax() int {
var rLimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
glog.Errorf("unexpected error reading system maximum number of open file descriptors (RLIMIT_NOFILE): %v", err)
// returning 0 means don't render the value
glog.Errorf("Error reading system maximum number of open file descriptors (RLIMIT_NOFILE): %v", err)
return 0
}
glog.V(2).Infof("rlimit.max=%v", rLimit.Max)
return int(rLimit.Max)
}
const (
defBinary = "/usr/sbin/nginx"
cfgPath = "/etc/nginx/nginx.conf"
)
func nginxExecCommand(args ...string) *exec.Cmd {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = defBinary
}
cmdArgs := []string{"-c", cfgPath}
cmdArgs = append(cmdArgs, args...)
return exec.Command(ngx, cmdArgs...)
}
func nginxTestCommand(cfg string) *exec.Cmd {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = defBinary
}
return exec.Command(ngx, "-c", cfg, "-t")
}

View file

@ -0,0 +1,296 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"net"
"strings"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
type socketData struct {
Host string `json:"host"` // Label
Status string `json:"status"` // Label
RealIPAddress string `json:"realIpAddr"` // Label
RemoteAddress string `json:"remoteAddr"` // Label
RemoteUser string `json:"remoteUser"` // Label
BytesSent float64 `json:"bytesSent"` // Metric
Protocol string `json:"protocol"` // Label
Method string `json:"method"` // Label
URI string `json:"uri"` // Label
RequestLength float64 `json:"requestLength"` // Metric
RequestTime float64 `json:"requestTime"` // Metric
UpstreamName string `json:"upstreamName"` // Label
UpstreamIP string `json:"upstreamIP"` // Label
UpstreamResponseTime float64 `json:"upstreamResponseTime"` // Metric
UpstreamStatus string `json:"upstreamStatus"` // Label
Namespace string `json:"namespace"` // Label
Ingress string `json:"ingress"` // Label
Service string `json:"service"` // Label
}
// SocketCollector stores prometheus metrics and ingress meta-data
type SocketCollector struct {
upstreamResponseTime *prometheus.HistogramVec
requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec
bytesSent *prometheus.HistogramVec
collectorSuccess *prometheus.GaugeVec
collectorSuccessTime *prometheus.GaugeVec
requests *prometheus.CounterVec
listener net.Listener
ns string
ingressClass string
}
// NewInstance creates a new SocketCollector instance
func NewInstance(ns string, class string) error {
sc := SocketCollector{}
ns = strings.Replace(ns, "-", "_", -1)
listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil {
return err
}
sc.listener = listener
sc.ns = ns
sc.ingressClass = class
requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
collectorTags := []string{"namespace", "ingress_class"}
sc.upstreamResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "upstream_response_time_seconds",
Help: "The time spent on receiving the response from the upstream server",
Namespace: ns,
},
requestTags,
)
sc.requestTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_seconds",
Help: "The request processing time in seconds",
Namespace: ns,
},
requestTags,
)
sc.requestLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_length_bytes",
Help: "The request length (including request line, header, and request body)",
Namespace: ns,
Buckets: prometheus.LinearBuckets(10, 10, 10), // 10 buckets, each 10 bytes wide.
},
requestTags,
)
sc.requests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "requests",
Help: "The total number of client requests.",
Namespace: ns,
},
collectorTags,
)
sc.bytesSent = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "bytes_sent",
Help: "The the number of bytes sent to a client",
Namespace: ns,
Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10.
},
requestTags,
)
sc.collectorSuccess = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "collector_last_run_successful",
Help: "Whether the last collector run was successful (success = 1, failure = 0).",
Namespace: ns,
},
collectorTags,
)
sc.collectorSuccessTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "collector_last_run_successful_timestamp_seconds",
Help: "Timestamp of the last successful collector run",
Namespace: ns,
},
collectorTags,
)
prometheus.MustRegister(sc.upstreamResponseTime)
prometheus.MustRegister(sc.requestTime)
prometheus.MustRegister(sc.requestLength)
prometheus.MustRegister(sc.requests)
prometheus.MustRegister(sc.bytesSent)
prometheus.MustRegister(sc.collectorSuccess)
prometheus.MustRegister(sc.collectorSuccessTime)
go sc.Run()
return nil
}
func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))
collectorSuccess := true
// Unmarshall bytes
var stats socketData
err := json.Unmarshal(msg, &stats)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
collectorSuccess = false
return
}
// Create Request Labels Map
requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"remote_address": stats.RemoteAddress,
"real_ip_address": stats.RealIPAddress,
"remote_user": stats.RemoteUser,
"protocol": stats.Protocol,
"method": stats.Method,
"uri": stats.URI,
"upstream_name": stats.UpstreamName,
"upstream_ip": stats.UpstreamIP,
"upstream_status": stats.UpstreamStatus,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
// Create Collector Labels Map
collectorLabels := prometheus.Labels{
"namespace": sc.ns,
"ingress_class": sc.ingressClass,
}
// Emit metrics
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching requests metric: %v", err)
collectorSuccess = false
} else {
requestsMetric.Inc()
}
if stats.UpstreamResponseTime != -1 {
upstreamResponseTimeMetric, err := sc.upstreamResponseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
collectorSuccess = false
} else {
upstreamResponseTimeMetric.Observe(stats.UpstreamResponseTime)
}
}
if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
collectorSuccess = false
} else {
requestTimeMetric.Observe(stats.RequestTime)
}
}
if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
collectorSuccess = false
} else {
requestLengthMetric.Observe(stats.RequestLength)
}
}
if stats.BytesSent != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
collectorSuccess = false
} else {
bytesSentMetric.Observe(stats.BytesSent)
}
}
collectorSuccessMetric, err := sc.collectorSuccess.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching collector success metric: %v", err)
} else {
if collectorSuccess {
collectorSuccessMetric.Set(1)
collectorSuccessTimeMetric, err := sc.collectorSuccessTime.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching collector success time metric: %v", err)
} else {
collectorSuccessTimeMetric.Set(float64(time.Now().Unix()))
}
} else {
collectorSuccessMetric.Set(0)
}
}
}
// Run listen for connections in the unix socket and spawns a goroutine to process the content
func (sc *SocketCollector) Run() {
for {
conn, err := sc.listener.Accept()
if err != nil {
continue
}
go handleMessages(conn, sc.handleMessage)
}
}
const packetSize = 1024 * 65
// handleMessages process the content received in a network connection
func handleMessages(conn net.Conn, fn func([]byte)) {
defer conn.Close()
msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
if err != nil {
return
}
fn(msg[0:s])
}

View file

@ -0,0 +1,66 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
)
func TestNewUDPLogListener(t *testing.T) {
var count uint64
fn := func(message []byte) {
t.Logf("message: %v", string(message))
atomic.AddUint64(&count, 1)
}
tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond())
l, err := net.Listen("unix", tmpFile)
if err != nil {
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Fatalf("expected a listener but none returned")
}
defer l.Close()
go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
go handleMessages(conn, fn)
}
}()
conn, _ := net.Dial("unix", tmpFile)
conn.Write([]byte("message"))
conn.Close()
time.Sleep(1 * time.Millisecond)
if count != 1 {
t.Errorf("expected only one message from the UDP listern but %v returned", count)
}
}

View file

@ -17,16 +17,30 @@ limitations under the License.
package collector
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type (
nginxStatusCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxVtsPath string
ngxStatusPath string
data *nginxStatusData
watchNamespace string
ingressClass string
@ -37,15 +51,33 @@ type (
requestsTotal *prometheus.Desc
connections *prometheus.Desc
}
basicStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
)
// NewNginxStatus returns a new prometheus collector the default nginx status module
func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxVtsPath string) Stopable {
// InitNGINXStatusCollector returns a new prometheus collector the default nginx status module
func InitNGINXStatusCollector(watchNamespace, ingressClass string, ngxHealthPort int) error {
const ns string = "nginx"
const ngxStatusPath = "/nginx_status"
p := nginxStatusCollector{
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxVtsPath: ngxVtsPath,
ngxStatusPath: ngxStatusPath,
watchNamespace: watchNamespace,
ingressClass: ingressClass,
}
@ -62,14 +94,20 @@ func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxV
[]string{"ingress_class", "namespace"}, nil),
connections: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connnections"),
prometheus.BuildFQName(ns, "", "connections"),
"current number of client connections with state {reading, writing, waiting}",
[]string{"ingress_class", "namespace", "state"}, nil),
}
go p.start()
err := prometheus.Register(p)
return p
if err != nil {
return fmt.Errorf("error while registering nginx status collector : %v", err)
}
go p.Run()
return nil
}
// Describe implements prometheus.Collector.
@ -86,7 +124,7 @@ func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) {
<-req.done
}
func (p nginxStatusCollector) start() {
func (p nginxStatusCollector) Run() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
@ -98,9 +136,71 @@ func (p nginxStatusCollector) Stop() {
close(p.scrapeChan)
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
}
if pos > len(data) {
return 0
}
if v, err := strconv.Atoi(data[pos]); err == nil {
return v
}
return 0
}
func parse(data string) *basicStatus {
acr := ac.FindStringSubmatch(data)
sahrr := sahr.FindStringSubmatch(data)
readingr := reading.FindStringSubmatch(data)
writingr := writing.FindStringSubmatch(data)
waitingr := waiting.FindStringSubmatch(data)
return &basicStatus{
toInt(acr, 1),
toInt(sahrr, 1),
toInt(sahrr, 2),
toInt(sahrr, 3),
toInt(readingr, 1),
toInt(writingr, 1),
toInt(waitingr, 1),
}
}
func getNginxStatus(port int, path string) (*basicStatus, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
glog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
return parse(string(data)), nil
}
// nginxStatusCollector scrape the nginx status
func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
s, err := getNginxStatus(p.ngxHealthPort, p.ngxVtsPath)
s, err := getNginxStatus(p.ngxHealthPort, p.ngxStatusPath)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return

View file

@ -26,6 +26,17 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
type scrapeRequest struct {
results chan<- prometheus.Metric
done chan struct{}
}
// Stopable defines a prometheus collector that can be stopped
type Stopable interface {
prometheus.Collector
Stop()
}
// BinaryNameMatcher ...
type BinaryNameMatcher struct {
Name string
@ -60,8 +71,8 @@ type namedProcess struct {
data namedProcessData
}
// NewNamedProcess returns a new prometheus collector for the nginx process
func NewNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
// newNamedProcess returns a new prometheus collector for the nginx process
func newNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
fs, err := proc.NewFS("/proc")
if err != nil {
return nil, err

View file

@ -32,7 +32,6 @@ import (
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection"
@ -183,11 +182,6 @@ func NewStatusSyncer(config Config) Sync {
OnStartedLeading: func(stop <-chan struct{}) {
glog.V(2).Infof("I am the new status update leader")
go st.syncQueue.Run(time.Second, stop)
wait.PollUntil(updateInterval, func() (bool, error) {
// send a dummy object to the queue to force a sync
st.syncQueue.Enqueue("sync status")
return false, nil
}, stop)
},
OnStoppedLeading: func() {
glog.V(2).Infof("I am not status update leader anymore")

View file

@ -63,6 +63,9 @@ type Configuration struct {
// It contains information about the associated Server Name Indication (SNI).
// +optional
PassthroughBackends []*SSLPassthroughBackend `json:"passthroughBackends,omitempty"`
// ConfigurationChecksum contains the particular checksum of a Configuration object
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
}
// Backend describes one or more remote server/s (endpoints) associated with a service
@ -230,10 +233,6 @@ type Location struct {
// UsePortInRedirects indicates if redirects must specify the port
// +optional
UsePortInRedirects bool `json:"usePortInRedirects"`
// VtsFilterKey contains the vts filter key on the location level
// https://github.com/vozlt/nginx-module-vts#vhost_traffic_status_filter_by_set_key
// +optional
VtsFilterKey string `json:"vtsFilterKey,omitempty"`
// ConfigurationSnippet contains additional configuration for the backend
// to be considered in the configuration of the location
ConfigurationSnippet string `json:"configurationSnippet"`

View file

@ -104,6 +104,10 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
}
}
if c1.ConfigurationChecksum != c2.ConfigurationChecksum {
return false
}
return true
}
@ -256,28 +260,34 @@ func (s1 *Server) Equal(s2 *Server) bool {
if s1.Hostname != s2.Hostname {
return false
}
if s1.Alias != s2.Alias {
return false
}
if s1.SSLPassthrough != s2.SSLPassthrough {
return false
}
if !(&s1.SSLCert).Equal(&s2.SSLCert) {
return false
}
if !(&s1.CertificateAuth).Equal(&s2.CertificateAuth) {
if s1.Alias != s2.Alias {
return false
}
if s1.RedirectFromToWWW != s2.RedirectFromToWWW {
return false
}
if len(s1.Locations) != len(s2.Locations) {
if !(&s1.CertificateAuth).Equal(&s2.CertificateAuth) {
return false
}
if s1.ServerSnippet != s2.ServerSnippet {
return false
}
if s1.SSLCiphers != s2.SSLCiphers {
return false
}
if s1.AuthTLSError != s2.AuthTLSError {
return false
}
if len(s1.Locations) != len(s2.Locations) {
return false
}
// Location are sorted
for idx, s1l := range s1.Locations {

View file

@ -21,6 +21,8 @@ import (
"net"
"os"
"testing"
"k8s.io/ingress-nginx/internal/file"
)
func TestGetDNSServers(t *testing.T) {
@ -32,22 +34,22 @@ func TestGetDNSServers(t *testing.T) {
t.Error("expected at least 1 nameserver in /etc/resolv.conf")
}
file, err := ioutil.TempFile("", "fw")
f, err := ioutil.TempFile("", "fw")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer file.Close()
defer os.Remove(file.Name())
defer f.Close()
defer os.Remove(f.Name())
ioutil.WriteFile(file.Name(), []byte(`
ioutil.WriteFile(f.Name(), []byte(`
# comment
; comment
nameserver 2001:4860:4860::8844
nameserver 2001:4860:4860::8888
nameserver 8.8.8.8
`), 0644)
`), file.ReadWriteByUser)
defResolvConf = file.Name()
defResolvConf = f.Name()
s, err = GetSystemNameServers()
if err != nil {
t.Fatalf("unexpected error reading /etc/resolv.conf file: %v", err)

View file

@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -50,23 +51,39 @@ type Queue struct {
// Element represents one item of the queue
type Element struct {
Key interface{}
Timestamp int64
Key interface{}
Timestamp int64
IsSkippable bool
}
// Run ...
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}
// Enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) Enqueue(obj interface{}) {
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
t.enqueue(obj, false)
}
// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
t.enqueue(obj, true)
}
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
glog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
return
}
ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
glog.V(3).Infof("queuing item %v", obj)
key, err := t.fn(obj)
if err != nil {
@ -166,3 +183,10 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in
return q
}
// GetDummyObject returns a valid object that can be used in the Queue
func GetDummyObject(name string) *metav1.ObjectMeta {
return &metav1.ObjectMeta{
Name: name,
}
}

View file

@ -71,7 +71,7 @@ func TestEnqueueSuccess(t *testing.T) {
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
@ -99,7 +99,7 @@ func TestEnqueueFailed(t *testing.T) {
q.Shutdown()
// wait for shutdown
time.Sleep(time.Millisecond * 10)
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
// queue is shutdown, so mockSynFn should not be executed, so the result should be 0
@ -121,7 +121,7 @@ func TestEnqueueKeyError(t *testing.T) {
v: "testValue",
}
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
// key error, so the result should be 0
@ -142,16 +142,16 @@ func TestSkipEnqueue(t *testing.T) {
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
q.EnqueueSkippableTask(mo)
q.EnqueueTask(mo)
q.EnqueueSkippableTask(mo)
// run queue
go q.Run(time.Second, stopCh)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
t.Errorf("sr should be 1, but is %d", sr)
if atomic.LoadUint32(&sr) != 2 {
t.Errorf("sr should be 2, but is %d", sr)
}
// shutdown queue before exit

View file

@ -1,30 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
// DummyFileWatcher noop implementation of a file watcher
type DummyFileWatcher struct{}
// NewDummyFileWatcher creates a FileWatcher using the DummyFileWatcher
func NewDummyFileWatcher(file string, onEvent func()) FileWatcher {
return DummyFileWatcher{}
}
// Close ends the watch
func (f DummyFileWatcher) Close() error {
return nil
}

View file

@ -21,6 +21,8 @@ import (
"os"
"testing"
"time"
"k8s.io/ingress-nginx/internal/file"
)
func prepareTimeout() chan bool {
@ -33,15 +35,15 @@ func prepareTimeout() chan bool {
}
func TestFileWatcher(t *testing.T) {
file, err := ioutil.TempFile("", "fw")
f, err := ioutil.TempFile("", "fw")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer file.Close()
defer os.Remove(file.Name())
defer f.Close()
defer os.Remove(f.Name())
count := 0
events := make(chan bool, 10)
fw, err := NewFileWatcher(file.Name(), func() {
fw, err := NewFileWatcher(f.Name(), func() {
count++
if count != 1 {
t.Fatalf("expected 1 but returned %v", count)
@ -58,7 +60,7 @@ func TestFileWatcher(t *testing.T) {
t.Fatalf("expected no events before writing a file")
case <-timeoutChan:
}
ioutil.WriteFile(file.Name(), []byte{}, 0644)
ioutil.WriteFile(f.Name(), []byte{}, file.ReadWriteByUser)
select {
case <-events:
case <-timeoutChan: