Merge pull request #3684 from aledbf/health

Replace Status port using a socket
This commit is contained in:
Kubernetes Prow Robot 2019-02-06 13:49:08 -08:00 committed by GitHub
commit 17e788b8e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 482 additions and 309 deletions

View file

@ -21,13 +21,13 @@ import (
"net/http"
"strconv"
"strings"
"time"
"github.com/ncabatoff/process-exporter/proc"
"github.com/pkg/errors"
)
"k8s.io/klog"
const nginxPID = "/tmp/nginx.pid"
"k8s.io/ingress-nginx/internal/nginx"
)
// Name returns the healthcheck name
func (n NGINXController) Name() string {
@ -36,25 +36,25 @@ func (n NGINXController) Name() string {
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (n *NGINXController) Check(_ *http.Request) error {
url := fmt.Sprintf("http://127.0.0.1:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath)
timeout := n.cfg.HealthCheckTimeout
statusCode, err := simpleGet(url, timeout)
statusCode, _, err := nginx.NewGetStatusRequest(nginx.HealthPath)
if err != nil {
klog.Errorf("healthcheck error: %v", err)
return err
}
if statusCode != 200 {
klog.Errorf("healthcheck error: %v", statusCode)
return fmt.Errorf("ingress controller is not healthy")
}
url = fmt.Sprintf("http://127.0.0.1:%v/is-dynamic-lb-initialized", n.cfg.ListenPorts.Status)
statusCode, err = simpleGet(url, timeout)
statusCode, _, err = nginx.NewGetStatusRequest("/is-dynamic-lb-initialized")
if err != nil {
klog.Errorf("healthcheck error: %v", err)
return err
}
if statusCode != 200 {
klog.Errorf("healthcheck error: %v", statusCode)
return fmt.Errorf("dynamic load balancer not started")
}
@ -63,35 +63,14 @@ func (n *NGINXController) Check(_ *http.Request) error {
if err != nil {
return errors.Wrap(err, "unexpected error reading /proc directory")
}
f, err := n.fileSystem.ReadFile(nginxPID)
f, err := n.fileSystem.ReadFile(nginx.PID)
if err != nil {
return errors.Wrapf(err, "unexpected error reading %v", nginxPID)
return errors.Wrapf(err, "unexpected error reading %v", nginx.PID)
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginxPID)
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginx.PID)
}
_, err = fs.NewProc(pid)
return err
}
func simpleGet(url string, timeout time.Duration) (int, error) {
client := &http.Client{
Timeout: timeout * time.Second,
Transport: &http.Transport{DisableKeepAlives: true},
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return -1, err
}
res, err := client.Do(req)
if err != nil {
return -1, err
}
defer res.Body.Close()
return res.StatusCode, nil
}

View file

@ -21,6 +21,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"testing"
@ -29,27 +30,37 @@ import (
"k8s.io/ingress-nginx/internal/file"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/nginx"
)
func TestNginxCheck(t *testing.T) {
mux := http.NewServeMux()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
}))
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
}),
},
}
defer server.Close()
// port to be used in the check
p := server.Listener.Addr().(*net.TCPAddr).Port
server.Start()
// mock filesystem
fs := filesystem.NewFakeFs()
fs := filesystem.DefaultFs{}
n := &NGINXController{
cfg: &Configuration{
ListenPorts: &ngx_config.ListenPorts{
Status: p,
},
ListenPorts: &ngx_config.ListenPorts{},
},
fileSystem: fs,
}
@ -62,7 +73,7 @@ func TestNginxCheck(t *testing.T) {
// create pid file
fs.MkdirAll("/tmp", file.ReadWriteByUser)
pidFile, err := fs.Create(nginxPID)
pidFile, err := fs.Create(nginx.PID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -102,20 +113,14 @@ func TestNginxCheck(t *testing.T) {
t.Error("expected an error but none returned")
}
})
t.Run("invalid port", func(t *testing.T) {
n.cfg.ListenPorts.Status = 9000
if err := callHealthz(true, mux); err == nil {
t.Error("expected an error but none returned")
}
})
}
func callHealthz(expErr bool, mux *http.ServeMux) error {
req, err := http.NewRequest("GET", "http://localhost:8080/healthz", nil)
req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
return err
return fmt.Errorf("healthz error: %v", err)
}
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)

View file

@ -732,6 +732,11 @@ type TemplateConfig struct {
PublishService *apiv1.Service
DynamicCertificatesEnabled bool
EnableMetrics bool
PID string
StatusSocket string
StatusPath string
StreamSocket string
}
// ListenPorts describe the ports required to run the
@ -739,7 +744,6 @@ type TemplateConfig struct {
type ListenPorts struct {
HTTP int
HTTPS int
Status int
Health int
Default int
SSLProxy int

View file

@ -65,7 +65,6 @@ type Configuration struct {
// +optional
UDPConfigMapName string
DefaultHealthzURL string
HealthCheckTimeout time.Duration
DefaultSSLCertificate string
@ -195,10 +194,8 @@ func (n *NGINXController) syncIngress(interface{}) error {
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
// For the initial sync it always takes some time for NGINX to
// start listening on the configured port (default 18080)
// For large configurations it might take a while so we loop
// and back off
// For the initial sync it always takes some time for NGINX to start listening
// For large configurations it might take a while so we loop and back off
klog.Info("Initial sync, sleeping for 1 second.")
time.Sleep(1 * time.Second)
}
@ -211,7 +208,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
}
err := wait.ExponentialBackoff(retry, func() (bool, error) {
err := configureDynamically(pcfg, n.cfg.ListenPorts.Status, n.cfg.DynamicCertificatesEnabled)
err := configureDynamically(pcfg, n.cfg.DynamicCertificatesEnabled)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil
@ -255,7 +252,6 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
n.cfg.ListenPorts.HTTP,
n.cfg.ListenPorts.HTTPS,
n.cfg.ListenPorts.SSLProxy,
n.cfg.ListenPorts.Status,
n.cfg.ListenPorts.Health,
n.cfg.ListenPorts.Default,
}

View file

@ -60,14 +60,13 @@ import (
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/internal/task"
"k8s.io/ingress-nginx/internal/watch"
)
const (
ngxHealthPath = "/healthz"
nginxStreamSocket = "/tmp/ingress-stream.sock"
tempNginxPattern = "nginx-cfg"
tempNginxPattern = "nginx-cfg"
)
var (
@ -595,7 +594,6 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
Servers: ingressCfg.Servers,
TCPBackends: ingressCfg.TCPEndpoints,
UDPBackends: ingressCfg.UDPEndpoints,
HealthzURI: ngxHealthPath,
CustomErrors: len(cfg.CustomHTTPErrors) > 0,
Cfg: cfg,
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
@ -607,6 +605,12 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
PublishService: n.GetPublishService(),
DynamicCertificatesEnabled: n.cfg.DynamicCertificatesEnabled,
EnableMetrics: n.cfg.EnableMetrics,
HealthzURI: nginx.HealthPath,
PID: nginx.PID,
StatusSocket: nginx.StatusSocket,
StatusPath: nginx.StatusPath,
StreamSocket: nginx.StreamSocket,
}
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
@ -772,7 +776,7 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertificatesEnabled bool) error {
func configureDynamically(pcfg *ingress.Configuration, isDynamicCertificatesEnabled bool) error {
backends := make([]*ingress.Backend, len(pcfg.Backends))
for i, backend := range pcfg.Backends {
@ -805,12 +809,15 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
backends[i] = luaBackend
}
url := fmt.Sprintf("http://localhost:%d/configuration/backends", port)
err := post(url, backends)
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
streams := make([]ingress.Backend, 0)
for _, ep := range pcfg.TCPEndpoints {
var service *apiv1.Service
@ -846,16 +853,19 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
return err
}
url = fmt.Sprintf("http://localhost:%d/configuration/general", port)
err = post(url, &ingress.GeneralConfig{
statusCode, _, err = nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
if isDynamicCertificatesEnabled {
err = configureCertificates(pcfg, port)
err = configureCertificates(pcfg)
if err != nil {
return err
}
@ -865,7 +875,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
}
func updateStreamConfiguration(streams []ingress.Backend) error {
conn, err := net.Dial("unix", nginxStreamSocket)
conn, err := net.Dial("unix", nginx.StreamSocket)
if err != nil {
return err
}
@ -890,7 +900,7 @@ func updateStreamConfiguration(streams []ingress.Backend) error {
// configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint
// that is handled by Lua
func configureCertificates(pcfg *ingress.Configuration, port int) error {
func configureCertificates(pcfg *ingress.Configuration) error {
var servers []*ingress.Server
for _, server := range pcfg.Servers {
@ -902,30 +912,13 @@ func configureCertificates(pcfg *ingress.Configuration, port int) error {
})
}
url := fmt.Sprintf("http://localhost:%d/configuration/servers", port)
return post(url, servers)
}
func post(url string, data interface{}) error {
buf, err := json.Marshal(data)
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/servers", "application/json", servers)
if err != nil {
return err
}
klog.V(2).Infof("Posting to %s", url)
resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
if err != nil {
return err
}
defer func() {
if err := resp.Body.Close(); err != nil {
klog.Warningf("Error while closing response body:\n%v", err)
}
}()
if resp.StatusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", resp.StatusCode)
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
return nil

View file

@ -32,6 +32,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/nginx"
)
func TestIsDynamicConfigurationEnough(t *testing.T) {
@ -149,32 +150,62 @@ func TestIsDynamicConfigurationEnough(t *testing.T) {
}
}
func mockUnixSocket(t *testing.T) net.Listener {
l, err := net.Listen("unix", nginxStreamSocket)
if err != nil {
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Fatalf("expected a listener but none returned")
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
time.Sleep(100 * time.Millisecond)
defer conn.Close()
}
}()
return l
}
func TestConfigureDynamically(t *testing.T) {
l := mockUnixSocket(t)
defer l.Close()
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
streamListener, err := net.Listen("unix", nginx.StreamSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer streamListener.Close()
defer os.Remove(nginx.StreamSocket)
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
body := string(b)
switch r.URL.Path {
case "/configuration/backends":
{
if strings.Contains(body, "target") {
t.Errorf("unexpected target reference in JSON content: %v", body)
}
if !strings.Contains(body, "service") {
t.Errorf("service reference should be present in JSON content: %v", body)
}
}
case "/configuration/general":
{
if !strings.Contains(body, "controllerPodsCount") {
t.Errorf("controllerPodsCount should be present in JSON content: %v", body)
}
}
default:
t.Errorf("unknown request to %s", r.URL.Path)
}
}),
},
}
defer server.Close()
server.Start()
target := &apiv1.ObjectReference{}
@ -212,46 +243,7 @@ func TestConfigureDynamically(t *testing.T) {
ControllerPodsCount: 2,
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
body := string(b)
switch r.URL.Path {
case "/configuration/backends":
{
if strings.Contains(body, "target") {
t.Errorf("unexpected target reference in JSON content: %v", body)
}
if !strings.Contains(body, "service") {
t.Errorf("service reference should be present in JSON content: %v", body)
}
}
case "/configuration/general":
{
if !strings.Contains(body, "controllerPodsCount") {
t.Errorf("controllerPodsCount should be present in JSON content: %v", body)
}
}
default:
t.Errorf("unknown request to %s", r.URL.Path)
}
}))
port := ts.Listener.Addr().(*net.TCPAddr).Port
defer ts.Close()
err := configureDynamically(commonConfig, port, false)
err = configureDynamically(commonConfig, false)
if err != nil {
t.Errorf("unexpected error posting dynamic configuration: %v", err)
}
@ -262,6 +254,19 @@ func TestConfigureDynamically(t *testing.T) {
}
func TestConfigureCertificates(t *testing.T) {
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
streamListener, err := net.Listen("unix", nginx.StreamSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer streamListener.Close()
defer os.Remove(nginx.StreamSocket)
servers := []*ingress.Server{{
Hostname: "myapp.fake",
@ -270,42 +275,46 @@ func TestConfigureCertificates(t *testing.T) {
},
}}
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
var postedServers []ingress.Server
err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &postedServers)
if err != nil {
t.Fatal(err)
}
if len(servers) != len(postedServers) {
t.Errorf("Expected servers to be the same length as the posted servers")
}
for i, server := range servers {
if !server.Equal(&postedServers[i]) {
t.Errorf("Expected servers and posted servers to be equal")
}
}
}),
},
}
defer server.Close()
server.Start()
commonConfig := &ingress.Configuration{
Servers: servers,
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
var postedServers []ingress.Server
err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &postedServers)
if err != nil {
t.Fatal(err)
}
if len(servers) != len(postedServers) {
t.Errorf("Expected servers to be the same length as the posted servers")
}
for i, server := range servers {
if !server.Equal(&postedServers[i]) {
t.Errorf("Expected servers and posted servers to be equal")
}
}
}))
port := ts.Listener.Addr().(*net.TCPAddr).Port
defer ts.Close()
err := configureCertificates(commonConfig, port)
err = configureCertificates(commonConfig)
if err != nil {
t.Errorf("unexpected error posting dynamic certificate configuration: %v", err)
}

View file

@ -17,13 +17,12 @@ limitations under the License.
package collectors
import (
"fmt"
"io/ioutil"
"net/http"
"log"
"regexp"
"strconv"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/klog"
)
@ -39,9 +38,6 @@ type (
nginxStatusCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxStatusPath string
data *nginxStatusData
}
@ -78,12 +74,10 @@ type NGINXStatusCollector interface {
}
// NewNGINXStatus returns a new prometheus collector the default nginx status module
func NewNGINXStatus(podName, namespace, ingressClass string, ngxHealthPort int) (NGINXStatusCollector, error) {
func NewNGINXStatus(podName, namespace, ingressClass string) (NGINXStatusCollector, error) {
p := nginxStatusCollector{
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxStatusPath: "/nginx_status",
scrapeChan: make(chan scrapeRequest),
}
constLabels := prometheus.Labels{
@ -138,24 +132,6 @@ func (p nginxStatusCollector) Stop() {
close(p.scrapeChan)
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
@ -187,27 +163,23 @@ func parse(data string) *basicStatus {
}
}
func getNginxStatus(port int, path string) (*basicStatus, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
klog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
return parse(string(data)), nil
}
// nginxStatusCollector scrape the nginx status
func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
s, err := getNginxStatus(p.ngxHealthPort, p.ngxStatusPath)
klog.V(3).Infof("start scraping socket: %v", nginx.StatusPath)
status, data, err := nginx.NewGetStatusRequest(nginx.StatusPath)
if err != nil {
log.Printf("%v", err)
klog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
if status < 200 || status >= 400 {
klog.Warningf("unexpected error obtaining nginx status info (status %v)", status)
return
}
s := parse(string(data))
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Active), "active")
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,

View file

@ -21,9 +21,12 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/nginx"
)
func TestStatusCollector(t *testing.T) {
@ -96,24 +99,39 @@ func TestStatusCollector(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, c.mock)
}))
p := server.Listener.Addr().(*net.TCPAddr).Port
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Fatalf("crating unix listener: %s", err)
}
cm, err := NewNGINXStatus("pod", "default", "nginx", p)
server := &httptest.Server{
Listener: listener,
Config: &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if r.URL.Path == "/nginx_status" {
_, err := fmt.Fprintf(w, c.mock)
if err != nil {
t.Fatal(err)
}
return
}
fmt.Fprintf(w, "OK")
})},
}
server.Start()
time.Sleep(1 * time.Second)
cm, err := NewNGINXStatus("pod", "default", "nginx")
if err != nil {
t.Errorf("unexpected error creating nginx status collector: %v", err)
}
go cm.Start()
defer func() {
server.Close()
cm.Stop()
}()
reg := prometheus.NewPedanticRegistry()
if err := reg.Register(cm); err != nil {
t.Errorf("registering collector failed: %s", err)
@ -124,6 +142,12 @@ func TestStatusCollector(t *testing.T) {
}
reg.Unregister(cm)
server.Close()
cm.Stop()
listener.Close()
os.Remove(nginx.StatusSocket)
})
}
}

View file

@ -59,7 +59,7 @@ type collector struct {
}
// NewCollector creates a new metric collector the for ingress controller
func NewCollector(statusPort int, metricsPerHost bool, registry *prometheus.Registry) (Collector, error) {
func NewCollector(metricsPerHost bool, registry *prometheus.Registry) (Collector, error) {
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
podNamespace = "default"
@ -67,7 +67,7 @@ func NewCollector(statusPort int, metricsPerHost bool, registry *prometheus.Regi
podName := os.Getenv("POD_NAME")
nc, err := collectors.NewNGINXStatus(podName, podNamespace, class.IngressClass, statusPort)
nc, err := collectors.NewNGINXStatus(podName, podNamespace, class.IngressClass)
if err != nil {
return nil, err
}

101
internal/nginx/main.go Normal file
View file

@ -0,0 +1,101 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nginx
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/tv42/httpunix"
)
// PID defines the location of the pid file used by NGINX
var PID = "/tmp/nginx.pid"
// StatusSocket defines the location of the unix socket used by NGINX for the status server
var StatusSocket = "/tmp/nginx-status-server.sock"
// HealthPath defines the path used to define the health check location in NGINX
var HealthPath = "/healthz"
// StatusPath defines the path used to expose the NGINX status page
// http://nginx.org/en/docs/http/ngx_http_stub_status_module.html
var StatusPath = "/nginx_status"
// StreamSocket defines the location of the unix socket used by NGINX for the NGINX stream configuration socket
var StreamSocket = "/tmp/ingress-stream.sock"
var statusLocation = "nginx-status"
var socketClient = buildUnixSocketClient()
// NewGetStatusRequest creates a new GET request to the internal NGINX status server
func NewGetStatusRequest(path string) (int, []byte, error) {
url := fmt.Sprintf("http+unix://%v%v", statusLocation, path)
res, err := socketClient.Get(url)
if err != nil {
return 0, nil, err
}
defer res.Body.Close()
data, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, nil, err
}
return res.StatusCode, data, nil
}
// NewPostStatusRequest creates a new POST request to the internal NGINX status server
func NewPostStatusRequest(path, contentType string, data interface{}) (int, []byte, error) {
url := fmt.Sprintf("http+unix://%v%v", statusLocation, path)
buf, err := json.Marshal(data)
if err != nil {
return 0, nil, err
}
res, err := socketClient.Post(url, contentType, bytes.NewReader(buf))
if err != nil {
return 0, nil, err
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, nil, err
}
return res.StatusCode, body, nil
}
func buildUnixSocketClient() *http.Client {
u := &httpunix.Transport{
DialTimeout: 1 * time.Second,
RequestTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
}
u.RegisterLocation(statusLocation, StatusSocket)
return &http.Client{
Transport: u,
}
}