Replace Status port using a socket

This commit is contained in:
Manuel Alejandro de Brito Fontes 2019-01-21 11:29:36 -03:00
parent bd74dce19c
commit 34b0580225
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
15 changed files with 357 additions and 309 deletions

View file

@ -21,13 +21,13 @@ import (
"net/http"
"strconv"
"strings"
"time"
"github.com/ncabatoff/process-exporter/proc"
"github.com/pkg/errors"
)
"k8s.io/klog"
const nginxPID = "/tmp/nginx.pid"
"k8s.io/ingress-nginx/internal/nginx"
)
// Name returns the healthcheck name
func (n NGINXController) Name() string {
@ -36,25 +36,25 @@ func (n NGINXController) Name() string {
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (n *NGINXController) Check(_ *http.Request) error {
url := fmt.Sprintf("http://127.0.0.1:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath)
timeout := n.cfg.HealthCheckTimeout
statusCode, err := simpleGet(url, timeout)
statusCode, _, err := nginx.NewGetStatusRequest(nginx.HealthPath)
if err != nil {
klog.Errorf("healthcheck error: %v", err)
return err
}
if statusCode != 200 {
klog.Errorf("healthcheck error: %v", statusCode)
return fmt.Errorf("ingress controller is not healthy")
}
url = fmt.Sprintf("http://127.0.0.1:%v/is-dynamic-lb-initialized", n.cfg.ListenPorts.Status)
statusCode, err = simpleGet(url, timeout)
statusCode, _, err = nginx.NewGetStatusRequest("/is-dynamic-lb-initialized")
if err != nil {
klog.Errorf("healthcheck error: %v", err)
return err
}
if statusCode != 200 {
klog.Errorf("healthcheck error: %v", statusCode)
return fmt.Errorf("dynamic load balancer not started")
}
@ -63,35 +63,14 @@ func (n *NGINXController) Check(_ *http.Request) error {
if err != nil {
return errors.Wrap(err, "unexpected error reading /proc directory")
}
f, err := n.fileSystem.ReadFile(nginxPID)
f, err := n.fileSystem.ReadFile(nginx.PID)
if err != nil {
return errors.Wrapf(err, "unexpected error reading %v", nginxPID)
return errors.Wrapf(err, "unexpected error reading %v", nginx.PID)
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginxPID)
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginx.PID)
}
_, err = fs.NewProc(pid)
return err
}
func simpleGet(url string, timeout time.Duration) (int, error) {
client := &http.Client{
Timeout: timeout * time.Second,
Transport: &http.Transport{DisableKeepAlives: true},
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return -1, err
}
res, err := client.Do(req)
if err != nil {
return -1, err
}
defer res.Body.Close()
return res.StatusCode, nil
}

View file

@ -21,6 +21,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"testing"
@ -29,27 +30,37 @@ import (
"k8s.io/ingress-nginx/internal/file"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/nginx"
)
func TestNginxCheck(t *testing.T) {
mux := http.NewServeMux()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
}))
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
}),
},
}
defer server.Close()
// port to be used in the check
p := server.Listener.Addr().(*net.TCPAddr).Port
server.Start()
// mock filesystem
fs := filesystem.NewFakeFs()
fs := filesystem.DefaultFs{}
n := &NGINXController{
cfg: &Configuration{
ListenPorts: &ngx_config.ListenPorts{
Status: p,
},
ListenPorts: &ngx_config.ListenPorts{},
},
fileSystem: fs,
}
@ -62,7 +73,7 @@ func TestNginxCheck(t *testing.T) {
// create pid file
fs.MkdirAll("/tmp", file.ReadWriteByUser)
pidFile, err := fs.Create(nginxPID)
pidFile, err := fs.Create(nginx.PID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -102,20 +113,14 @@ func TestNginxCheck(t *testing.T) {
t.Error("expected an error but none returned")
}
})
t.Run("invalid port", func(t *testing.T) {
n.cfg.ListenPorts.Status = 9000
if err := callHealthz(true, mux); err == nil {
t.Error("expected an error but none returned")
}
})
}
func callHealthz(expErr bool, mux *http.ServeMux) error {
req, err := http.NewRequest("GET", "http://localhost:8080/healthz", nil)
req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
return err
return fmt.Errorf("healthz error: %v", err)
}
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)

View file

@ -726,6 +726,11 @@ type TemplateConfig struct {
PublishService *apiv1.Service
DynamicCertificatesEnabled bool
EnableMetrics bool
PID string
StatusSocket string
StatusPath string
StreamSocket string
}
// ListenPorts describe the ports required to run the
@ -733,7 +738,6 @@ type TemplateConfig struct {
type ListenPorts struct {
HTTP int
HTTPS int
Status int
Health int
Default int
SSLProxy int

View file

@ -65,7 +65,6 @@ type Configuration struct {
// +optional
UDPConfigMapName string
DefaultHealthzURL string
HealthCheckTimeout time.Duration
DefaultSSLCertificate string
@ -195,10 +194,8 @@ func (n *NGINXController) syncIngress(interface{}) error {
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
// For the initial sync it always takes some time for NGINX to
// start listening on the configured port (default 18080)
// For large configurations it might take a while so we loop
// and back off
// For the initial sync it always takes some time for NGINX to start listening
// For large configurations it might take a while so we loop and back off
klog.Info("Initial sync, sleeping for 1 second.")
time.Sleep(1 * time.Second)
}
@ -211,7 +208,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
}
err := wait.ExponentialBackoff(retry, func() (bool, error) {
err := configureDynamically(pcfg, n.cfg.ListenPorts.Status, n.cfg.DynamicCertificatesEnabled)
err := configureDynamically(pcfg, n.cfg.DynamicCertificatesEnabled)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil
@ -255,7 +252,6 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
n.cfg.ListenPorts.HTTP,
n.cfg.ListenPorts.HTTPS,
n.cfg.ListenPorts.SSLProxy,
n.cfg.ListenPorts.Status,
n.cfg.ListenPorts.Health,
n.cfg.ListenPorts.Default,
}

View file

@ -60,14 +60,13 @@ import (
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/internal/task"
"k8s.io/ingress-nginx/internal/watch"
)
const (
ngxHealthPath = "/healthz"
nginxStreamSocket = "/tmp/ingress-stream.sock"
tempNginxPattern = "nginx-cfg"
tempNginxPattern = "nginx-cfg"
)
var (
@ -595,7 +594,6 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
Servers: ingressCfg.Servers,
TCPBackends: ingressCfg.TCPEndpoints,
UDPBackends: ingressCfg.UDPEndpoints,
HealthzURI: ngxHealthPath,
CustomErrors: len(cfg.CustomHTTPErrors) > 0,
Cfg: cfg,
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
@ -607,6 +605,12 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
PublishService: n.GetPublishService(),
DynamicCertificatesEnabled: n.cfg.DynamicCertificatesEnabled,
EnableMetrics: n.cfg.EnableMetrics,
HealthzURI: nginx.HealthPath,
PID: nginx.PID,
StatusSocket: nginx.StatusSocket,
StatusPath: nginx.StatusPath,
StreamSocket: nginx.StreamSocket,
}
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
@ -772,7 +776,7 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertificatesEnabled bool) error {
func configureDynamically(pcfg *ingress.Configuration, isDynamicCertificatesEnabled bool) error {
backends := make([]*ingress.Backend, len(pcfg.Backends))
for i, backend := range pcfg.Backends {
@ -805,12 +809,15 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
backends[i] = luaBackend
}
url := fmt.Sprintf("http://localhost:%d/configuration/backends", port)
err := post(url, backends)
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
streams := make([]ingress.Backend, 0)
for _, ep := range pcfg.TCPEndpoints {
var service *apiv1.Service
@ -846,16 +853,19 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
return err
}
url = fmt.Sprintf("http://localhost:%d/configuration/general", port)
err = post(url, &ingress.GeneralConfig{
statusCode, _, err = nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
if isDynamicCertificatesEnabled {
err = configureCertificates(pcfg, port)
err = configureCertificates(pcfg)
if err != nil {
return err
}
@ -865,7 +875,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
}
func updateStreamConfiguration(streams []ingress.Backend) error {
conn, err := net.Dial("unix", nginxStreamSocket)
conn, err := net.Dial("unix", nginx.StreamSocket)
if err != nil {
return err
}
@ -890,7 +900,7 @@ func updateStreamConfiguration(streams []ingress.Backend) error {
// configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint
// that is handled by Lua
func configureCertificates(pcfg *ingress.Configuration, port int) error {
func configureCertificates(pcfg *ingress.Configuration) error {
var servers []*ingress.Server
for _, server := range pcfg.Servers {
@ -902,30 +912,13 @@ func configureCertificates(pcfg *ingress.Configuration, port int) error {
})
}
url := fmt.Sprintf("http://localhost:%d/configuration/servers", port)
return post(url, servers)
}
func post(url string, data interface{}) error {
buf, err := json.Marshal(data)
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/servers", "application/json", servers)
if err != nil {
return err
}
klog.V(2).Infof("Posting to %s", url)
resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
if err != nil {
return err
}
defer func() {
if err := resp.Body.Close(); err != nil {
klog.Warningf("Error while closing response body:\n%v", err)
}
}()
if resp.StatusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", resp.StatusCode)
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
return nil

View file

@ -32,6 +32,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/nginx"
)
func TestIsDynamicConfigurationEnough(t *testing.T) {
@ -149,32 +150,62 @@ func TestIsDynamicConfigurationEnough(t *testing.T) {
}
}
func mockUnixSocket(t *testing.T) net.Listener {
l, err := net.Listen("unix", nginxStreamSocket)
if err != nil {
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Fatalf("expected a listener but none returned")
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
time.Sleep(100 * time.Millisecond)
defer conn.Close()
}
}()
return l
}
func TestConfigureDynamically(t *testing.T) {
l := mockUnixSocket(t)
defer l.Close()
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
streamListener, err := net.Listen("unix", nginx.StreamSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer streamListener.Close()
defer os.Remove(nginx.StreamSocket)
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
body := string(b)
switch r.URL.Path {
case "/configuration/backends":
{
if strings.Contains(body, "target") {
t.Errorf("unexpected target reference in JSON content: %v", body)
}
if !strings.Contains(body, "service") {
t.Errorf("service reference should be present in JSON content: %v", body)
}
}
case "/configuration/general":
{
if !strings.Contains(body, "controllerPodsCount") {
t.Errorf("controllerPodsCount should be present in JSON content: %v", body)
}
}
default:
t.Errorf("unknown request to %s", r.URL.Path)
}
}),
},
}
defer server.Close()
server.Start()
target := &apiv1.ObjectReference{}
@ -212,46 +243,7 @@ func TestConfigureDynamically(t *testing.T) {
ControllerPodsCount: 2,
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
body := string(b)
switch r.URL.Path {
case "/configuration/backends":
{
if strings.Contains(body, "target") {
t.Errorf("unexpected target reference in JSON content: %v", body)
}
if !strings.Contains(body, "service") {
t.Errorf("service reference should be present in JSON content: %v", body)
}
}
case "/configuration/general":
{
if !strings.Contains(body, "controllerPodsCount") {
t.Errorf("controllerPodsCount should be present in JSON content: %v", body)
}
}
default:
t.Errorf("unknown request to %s", r.URL.Path)
}
}))
port := ts.Listener.Addr().(*net.TCPAddr).Port
defer ts.Close()
err := configureDynamically(commonConfig, port, false)
err = configureDynamically(commonConfig, false)
if err != nil {
t.Errorf("unexpected error posting dynamic configuration: %v", err)
}
@ -262,6 +254,19 @@ func TestConfigureDynamically(t *testing.T) {
}
func TestConfigureCertificates(t *testing.T) {
listener, err := net.Listen("unix", nginx.StatusSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
streamListener, err := net.Listen("unix", nginx.StreamSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
}
defer streamListener.Close()
defer os.Remove(nginx.StreamSocket)
servers := []*ingress.Server{{
Hostname: "myapp.fake",
@ -270,42 +275,46 @@ func TestConfigureCertificates(t *testing.T) {
},
}}
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
var postedServers []ingress.Server
err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &postedServers)
if err != nil {
t.Fatal(err)
}
if len(servers) != len(postedServers) {
t.Errorf("Expected servers to be the same length as the posted servers")
}
for i, server := range servers {
if !server.Equal(&postedServers[i]) {
t.Errorf("Expected servers and posted servers to be equal")
}
}
}),
},
}
defer server.Close()
server.Start()
commonConfig := &ingress.Configuration{
Servers: servers,
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
if r.Method != "POST" {
t.Errorf("expected a 'POST' request, got '%s'", r.Method)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF {
t.Fatal(err)
}
var postedServers []ingress.Server
err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &postedServers)
if err != nil {
t.Fatal(err)
}
if len(servers) != len(postedServers) {
t.Errorf("Expected servers to be the same length as the posted servers")
}
for i, server := range servers {
if !server.Equal(&postedServers[i]) {
t.Errorf("Expected servers and posted servers to be equal")
}
}
}))
port := ts.Listener.Addr().(*net.TCPAddr).Port
defer ts.Close()
err := configureCertificates(commonConfig, port)
err = configureCertificates(commonConfig)
if err != nil {
t.Errorf("unexpected error posting dynamic certificate configuration: %v", err)
}