Implement a validation webhook

In case some ingress have a syntax error in the snippet configuration,
the freshly generated configuration will not be reloaded to prevent tearing down existing rules.
Although, once inserted, this configuration is preventing from any other valid configuration to be inserted as it remains in the ingresses of the cluster.
To solve this problem, implement an optional validation webhook that simulates the addition of the ingress to be added together with the rest of ingresses.
In case the generated configuration is not validated by nginx, deny the insertion of the ingress.

In case certificates are mounted using kubernetes secrets, when those
changes, keys are automatically updated in the container volume, and the
controller reloads it using the filewatcher.

Related changes:

- Update vendors
- Extract useful functions to check configuration with an additional ingress
- Update documentation for validating webhook
- Add validating webhook examples
- Add a metric for each syntax check success and errors
- Add more certificate generation examples
This commit is contained in:
Thibault Jamet 2019-02-21 20:45:21 +01:00
parent 7283a01b9f
commit 1cd17cd12c
No known key found for this signature in database
GPG key ID: 9D28A304A3810C17
30 changed files with 3314 additions and 131 deletions

View file

@ -23,23 +23,21 @@ import (
"strings"
"time"
"k8s.io/ingress-nginx/internal/ingress/annotations/log"
"github.com/mitchellh/hashstructure"
"k8s.io/klog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/log"
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/klog"
)
const (
@ -96,6 +94,10 @@ type Configuration struct {
DynamicCertificatesEnabled bool
DisableCatchAll bool
ValidationWebhook string
ValidationWebhookCertPath string
ValidationWebhookKeyPath string
}
// GetPublishService returns the Service used to set the load-balancer status of Ingresses.
@ -120,47 +122,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
ings := n.store.ListIngresses()
upstreams, servers := n.getBackendServers(ings)
var passUpstreams []*ingress.SSLPassthroughBackend
hosts := sets.NewString()
for _, server := range servers {
if !hosts.Has(server.Hostname) {
hosts.Insert(server.Hostname)
}
if server.Alias != "" && !hosts.Has(server.Alias) {
hosts.Insert(server.Alias)
}
if !server.SSLPassthrough {
continue
}
for _, loc := range server.Locations {
if loc.Path != rootLocation {
klog.Warningf("Ignoring SSL Passthrough for location %q in server %q", loc.Path, server.Hostname)
continue
}
passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
Backend: loc.Backend,
Hostname: server.Hostname,
Service: loc.Service,
Port: loc.Port,
})
break
}
}
pcfg := &ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
ControllerPodsCount: n.store.GetRunningControllerPodsCount(),
}
hosts, servers, pcfg := n.getConfiguration(ings)
if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload.")
@ -235,6 +197,60 @@ func (n *NGINXController) syncIngress(interface{}) error {
return nil
}
// CheckIngress returns an error in case the provided ingress, when added
// to the current configuration, generates an invalid configuration
func (n *NGINXController) CheckIngress(ing *extensions.Ingress) error {
if n == nil {
return fmt.Errorf("cannot check ingress on a nil ingress controller")
}
if ing == nil {
// no ingress to add, no state change
return nil
}
if !class.IsValid(ing) {
klog.Infof("ignoring ingress %v in %v based on annotation %v", ing.Name, ing.ObjectMeta.Namespace, class.IngressKey)
return nil
}
if n.cfg.Namespace != "" && ing.ObjectMeta.Namespace != n.cfg.Namespace {
klog.Infof("ignoring ingress %v in namespace %v different from the namespace watched %s", ing.Name, ing.ObjectMeta.Namespace, n.cfg.Namespace)
return nil
}
ings := n.store.ListIngresses()
newIngress := &ingress.Ingress{
Ingress: *ing,
ParsedAnnotations: annotations.NewAnnotationExtractor(n.store).Extract(ing),
}
for i, ingress := range ings {
if ingress.Ingress.ObjectMeta.Name == ing.ObjectMeta.Name && ingress.Ingress.ObjectMeta.Namespace == ing.ObjectMeta.Namespace {
ings[i] = newIngress
newIngress = nil
}
}
if newIngress != nil {
ings = append(ings, newIngress)
}
_, _, pcfg := n.getConfiguration(ings)
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
content, err := n.generateTemplate(cfg, *pcfg)
if err != nil {
n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name)
return err
}
err = n.testTemplate(content)
if err != nil {
n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name)
} else {
n.metricCollector.IncCheckCount(ing.ObjectMeta.Namespace, ing.Name)
}
return err
}
func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
if configmapName == "" {
return []ingress.L4Service{}
@ -380,6 +396,51 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
return upstream
}
// getConfiguration returns the configuration matching the standard kubernetes ingress
func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.String, []*ingress.Server, *ingress.Configuration) {
upstreams, servers := n.getBackendServers(ingresses)
var passUpstreams []*ingress.SSLPassthroughBackend
hosts := sets.NewString()
for _, server := range servers {
if !hosts.Has(server.Hostname) {
hosts.Insert(server.Hostname)
}
if server.Alias != "" && !hosts.Has(server.Alias) {
hosts.Insert(server.Alias)
}
if !server.SSLPassthrough {
continue
}
for _, loc := range server.Locations {
if loc.Path != rootLocation {
klog.Warningf("Ignoring SSL Passthrough for location %q in server %q", loc.Path, server.Hostname)
continue
}
passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
Backend: loc.Backend,
Hostname: server.Hostname,
Service: loc.Service,
Port: loc.Port,
})
break
}
}
return hosts, servers, &ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
ControllerPodsCount: n.store.GetRunningControllerPodsCount(),
}
}
// getBackendServers returns a list of Upstream and Server to be used by the
// backend. An upstream can be used in multiple servers if the namespace,
// service name and port are the same.

View file

@ -21,12 +21,17 @@ import (
"crypto/x509/pkix"
"encoding/asn1"
"fmt"
"time"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/eapache/channels"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@ -35,14 +40,216 @@ import (
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/canary"
"k8s.io/ingress-nginx/internal/ingress/controller/config"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
)
const fakeCertificateName = "default-fake-certificate"
type fakeIngressStore struct {
ingresses []*ingress.Ingress
}
func (fakeIngressStore) GetBackendConfiguration() ngx_config.Configuration {
return ngx_config.Configuration{}
}
func (fakeIngressStore) GetConfigMap(key string) (*corev1.ConfigMap, error) {
return nil, fmt.Errorf("test error")
}
func (fakeIngressStore) GetSecret(key string) (*corev1.Secret, error) {
return nil, fmt.Errorf("test error")
}
func (fakeIngressStore) GetService(key string) (*corev1.Service, error) {
return nil, fmt.Errorf("test error")
}
func (fakeIngressStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) {
return nil, fmt.Errorf("test error")
}
func (fis fakeIngressStore) ListIngresses() []*ingress.Ingress {
return fis.ingresses
}
func (fakeIngressStore) GetRunningControllerPodsCount() int {
return 0
}
func (fakeIngressStore) GetLocalSSLCert(name string) (*ingress.SSLCert, error) {
return nil, fmt.Errorf("test error")
}
func (fakeIngressStore) ListLocalSSLCerts() []*ingress.SSLCert {
return nil
}
func (fakeIngressStore) GetAuthCertificate(string) (*resolver.AuthSSLCert, error) {
return nil, fmt.Errorf("test error")
}
func (fakeIngressStore) GetDefaultBackend() defaults.Backend {
return defaults.Backend{}
}
func (fakeIngressStore) Run(stopCh chan struct{}) {}
type testNginxTestCommand struct {
t *testing.T
expected string
out []byte
err error
}
func (ntc testNginxTestCommand) ExecCommand(args ...string) *exec.Cmd {
return nil
}
func (ntc testNginxTestCommand) Test(cfg string) ([]byte, error) {
fd, err := os.Open(cfg)
if err != nil {
ntc.t.Errorf("could not read generated nginx configuration: %v", err.Error())
return nil, err
}
defer fd.Close()
bytes, err := ioutil.ReadAll(fd)
if err != nil {
ntc.t.Errorf("could not read generated nginx configuration: %v", err.Error())
}
if string(bytes) != ntc.expected {
ntc.t.Errorf("unexpected generated configuration %v. Expecting %v", string(bytes), ntc.expected)
}
return ntc.out, ntc.err
}
type fakeTemplate struct{}
func (fakeTemplate) Write(conf config.TemplateConfig) ([]byte, error) {
r := []byte{}
for _, s := range conf.Servers {
if len(r) > 0 {
r = append(r, ',')
}
r = append(r, []byte(s.Hostname)...)
}
return r, nil
}
func TestCheckIngress(t *testing.T) {
defer func() {
filepath.Walk(os.TempDir(), func(path string, info os.FileInfo, err error) error {
if info.IsDir() && os.TempDir() != path {
return filepath.SkipDir
}
if strings.HasPrefix(info.Name(), tempNginxPattern) {
os.Remove(path)
}
return nil
})
}()
// Ensure no panic with wrong arguments
var nginx *NGINXController
nginx.CheckIngress(nil)
nginx = newNGINXController(t)
nginx.CheckIngress(nil)
nginx.metricCollector = metric.DummyCollector{}
nginx.t = fakeTemplate{}
nginx.store = fakeIngressStore{
ingresses: []*ingress.Ingress{},
}
ing := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ingress",
Namespace: "user-namespace",
Annotations: map[string]string{},
},
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "example.com",
},
},
},
}
t.Run("When the ingress class differs from nginx", func(t *testing.T) {
ing.ObjectMeta.Annotations["kubernetes.io/ingress.class"] = "different"
nginx.command = testNginxTestCommand{
t: t,
err: fmt.Errorf("test error"),
}
if nginx.CheckIngress(ing) != nil {
t.Errorf("with a different ingress class, no error should be returned")
}
})
t.Run("when the class is the nginx one", func(t *testing.T) {
ing.ObjectMeta.Annotations["kubernetes.io/ingress.class"] = "nginx"
nginx.command = testNginxTestCommand{
t: t,
err: nil,
expected: "_,example.com",
}
if nginx.CheckIngress(ing) != nil {
t.Errorf("with a new ingress without error, no error should be returned")
}
t.Run("When the hostname is updated", func(t *testing.T) {
nginx.store = fakeIngressStore{
ingresses: []*ingress.Ingress{
{
Ingress: *ing,
},
},
}
ing.Spec.Rules[0].Host = "test.example.com"
nginx.command = testNginxTestCommand{
t: t,
err: nil,
expected: "_,test.example.com",
}
if nginx.CheckIngress(ing) != nil {
t.Errorf("with a new ingress without error, no error should be returned")
}
})
t.Run("When nginx test returns an error", func(t *testing.T) {
nginx.command = testNginxTestCommand{
t: t,
err: fmt.Errorf("test error"),
out: []byte("this is the test command output"),
expected: "_,test.example.com",
}
if nginx.CheckIngress(ing) == nil {
t.Errorf("with a new ingress with an error, an error should be returned")
}
})
t.Run("When the ingress is in a different namespace than the watched one", func(t *testing.T) {
nginx.command = testNginxTestCommand{
t: t,
err: fmt.Errorf("test error"),
}
nginx.cfg.Namespace = "other-namespace"
ing.ObjectMeta.Namespace = "test-namespace"
if nginx.CheckIngress(ing) != nil {
t.Errorf("with a new ingress without error, no error should be returned")
}
})
})
}
func TestMergeAlternativeBackends(t *testing.T) {
testCases := map[string]struct {
ingress *ingress.Ingress
@ -930,8 +1137,10 @@ func newNGINXController(t *testing.T) *NGINXController {
}
return &NGINXController{
store: storer,
cfg: config,
store: storer,
cfg: config,
command: NewNginxCommand(),
fileSystem: fs,
}
}

View file

@ -45,9 +45,7 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/filesystem"
adm_controler "k8s.io/ingress-nginx/internal/admission/controller"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
@ -64,6 +62,8 @@ import (
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/internal/task"
"k8s.io/ingress-nginx/internal/watch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/filesystem"
)
const (
@ -110,6 +110,16 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
Proxy: &TCPProxy{},
metricCollector: mc,
command: NewNginxCommand(),
}
if n.cfg.ValidationWebhook != "" {
n.validationWebhookServer = &http.Server{
Addr: config.ValidationWebhook,
Handler: adm_controler.NewAdmissionControllerServer(&adm_controler.IngressAdmission{Checker: n}),
TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
}
}
pod, err := k8s.GetPodDetails(config.Client)
@ -241,7 +251,7 @@ type NGINXController struct {
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration
t *ngx_template.Template
t ngx_template.TemplateWriter
resolver []net.IP
@ -258,6 +268,10 @@ type NGINXController struct {
metricCollector metric.Collector
currentLeader uint32
validationWebhookServer *http.Server
command NginxExecTester
}
// Start starts a new NGINX master process running in the foreground.
@ -295,7 +309,7 @@ func (n *NGINXController) Start() {
PodNamespace: n.podInfo.Namespace,
})
cmd := nginxExecCommand()
cmd := n.command.ExecCommand()
// put NGINX in another process group to prevent it
// to receive signals meant for the controller
@ -327,6 +341,13 @@ func (n *NGINXController) Start() {
}
}()
if n.validationWebhookServer != nil {
klog.Infof("Starting validation webhook on %s with keys %s %s", n.validationWebhookServer.Addr, n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath)
go func() {
klog.Error(n.validationWebhookServer.ListenAndServeTLS("", ""))
}()
}
for {
select {
case err := <-n.ngxErrCh:
@ -344,7 +365,7 @@ func (n *NGINXController) Start() {
// release command resources
cmd.Process.Release()
// start a new nginx master process if the controller is not being stopped
cmd = nginxExecCommand()
cmd = n.command.ExecCommand()
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
@ -391,9 +412,17 @@ func (n *NGINXController) Stop() error {
n.syncStatus.Shutdown()
}
if n.validationWebhookServer != nil {
klog.Info("Stopping admission controller")
err := n.validationWebhookServer.Close()
if err != nil {
return err
}
}
// send stop signal to NGINX
klog.Info("Stopping NGINX process")
cmd := nginxExecCommand("-s", "quit")
cmd := n.command.ExecCommand("-s", "quit")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@ -437,45 +466,8 @@ func (n NGINXController) DefaultEndpoint() ingress.Endpoint {
}
}
// testTemplate checks if the NGINX configuration inside the byte array is valid
// running the command "nginx -t" using a temporal file.
func (n NGINXController) testTemplate(cfg []byte) error {
if len(cfg) == 0 {
return fmt.Errorf("invalid NGINX configuration (empty)")
}
tmpfile, err := ioutil.TempFile("", tempNginxPattern)
if err != nil {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), cfg, file.ReadWriteByUser)
if err != nil {
return err
}
out, err := nginxTestCommand(tmpfile.Name()).CombinedOutput()
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
oe := fmt.Sprintf(`
-------------------------------------------------------------------------------
Error: %v
%v
-------------------------------------------------------------------------------
`, err, string(out))
return errors.New(oe)
}
os.Remove(tmpfile.Name())
return nil
}
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
// generateTemplate returns the nginx configuration file content
func (n NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) {
if n.cfg.EnableSSLPassthrough {
servers := []*TCPServer{}
@ -638,7 +630,50 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
content, err := n.t.Write(tc)
return n.t.Write(tc)
}
// testTemplate checks if the NGINX configuration inside the byte array is valid
// running the command "nginx -t" using a temporal file.
func (n NGINXController) testTemplate(cfg []byte) error {
if len(cfg) == 0 {
return fmt.Errorf("invalid NGINX configuration (empty)")
}
tmpfile, err := ioutil.TempFile("", tempNginxPattern)
if err != nil {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), cfg, file.ReadWriteByUser)
if err != nil {
return err
}
out, err := n.command.Test(tmpfile.Name())
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
oe := fmt.Sprintf(`
-------------------------------------------------------------------------------
Error: %v
%v
-------------------------------------------------------------------------------
`, err, string(out))
return errors.New(oe)
}
os.Remove(tmpfile.Name())
return nil
}
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
content, err := n.generateTemplate(cfg, ingressCfg)
if err != nil {
return err
}
@ -686,7 +721,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return err
}
o, err := nginxExecCommand("-s", "reload").CombinedOutput()
o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}

View file

@ -51,6 +51,11 @@ const (
defBufferSize = 65535
)
// TemplateWriter is the interface to render a template
type TemplateWriter interface {
Write(conf config.TemplateConfig) ([]byte, error)
}
// Template ...
type Template struct {
tmpl *text_template.Template

View file

@ -21,13 +21,11 @@ import (
"os/exec"
"syscall"
"k8s.io/klog"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sysctl"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/sysctl"
)
// newUpstream creates an upstream without servers.
@ -79,14 +77,36 @@ const (
cfgPath = "/etc/nginx/nginx.conf"
)
func nginxExecCommand(args ...string) *exec.Cmd {
// NginxExecTester defines the interface to execute
// command like reload or test configuration
type NginxExecTester interface {
ExecCommand(args ...string) *exec.Cmd
Test(cfg string) ([]byte, error)
}
// NginxCommand stores context around a given nginx executable path
type NginxCommand struct {
Binary string
}
// NewNginxCommand returns a new NginxCommand from which path
// has been detected from environment variable NGINX_BINARY or default
func NewNginxCommand() NginxCommand {
return NginxCommand{
Binary: defBinary,
}
}
// ExecCommand instanciates an exec.Cmd object to call nginx program
func (nc NginxCommand) ExecCommand(args ...string) *exec.Cmd {
cmdArgs := []string{}
cmdArgs = append(cmdArgs, "-c", cfgPath)
cmdArgs = append(cmdArgs, args...)
return exec.Command(defBinary, cmdArgs...)
return exec.Command(nc.Binary, cmdArgs...)
}
func nginxTestCommand(cfg string) *exec.Cmd {
return exec.Command(defBinary, "-c", cfg, "-t")
// Test checks if config file is a syntax valid nginx configuration
func (nc NginxCommand) Test(cfg string) ([]byte, error) {
return exec.Command(nc.Binary, "-c", cfg, "-t").CombinedOutput()
}