post data to Lua only if it changes

This commit is contained in:
Elvin Efendi 2019-08-15 14:57:51 -04:00
parent b5fecd0dc8
commit 05c889335d
3 changed files with 167 additions and 89 deletions

View file

@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
@ -849,10 +850,104 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration) error {
backends := make([]*ingress.Backend, len(pcfg.Backends))
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
err := configureBackends(pcfg.Backends)
if err != nil {
return err
}
}
for i, backend := range pcfg.Backends {
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil {
return err
}
}
if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
}
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged {
err := configureCertificates(pcfg.Servers)
if err != nil {
return err
}
}
return nil
}
func updateStreamConfiguration(TCPEndpoints []ingress.L4Service, UDPEndpoints []ingress.L4Service) error {
streams := make([]ingress.Backend, 0)
for _, ep := range TCPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
for _, ep := range UDPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
conn, err := net.Dial("unix", nginx.StreamSocket)
if err != nil {
return err
}
defer conn.Close()
buf, err := json.Marshal(streams)
if err != nil {
return err
}
_, err = conn.Write(buf)
if err != nil {
return err
}
_, err = fmt.Fprintf(conn, "\r\n")
if err != nil {
return err
}
return nil
}
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
for i, backend := range rawBackends {
var service *apiv1.Service
if backend.Service != nil {
service = &apiv1.Service{Spec: backend.Service.Spec}
@ -891,90 +986,15 @@ func configureDynamically(pcfg *ingress.Configuration) error {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
streams := make([]ingress.Backend, 0)
for _, ep := range pcfg.TCPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
for _, ep := range pcfg.UDPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
err = updateStreamConfiguration(streams)
if err != nil {
return err
}
statusCode, _, err = nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
err = configureCertificates(pcfg)
if err != nil {
return err
}
return nil
}
func updateStreamConfiguration(streams []ingress.Backend) error {
conn, err := net.Dial("unix", nginx.StreamSocket)
if err != nil {
return err
}
defer conn.Close()
buf, err := json.Marshal(streams)
if err != nil {
return err
}
_, err = conn.Write(buf)
if err != nil {
return err
}
_, err = fmt.Fprintf(conn, "\r\n")
if err != nil {
return err
}
return nil
}
// configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint
// that is handled by Lua
func configureCertificates(pcfg *ingress.Configuration) error {
func configureCertificates(rawServers []*ingress.Server) error {
servers := make([]*ingress.Server, 0)
for _, server := range pcfg.Servers {
for _, server := range rawServers {
if server.SSLCert == nil {
continue
}
@ -996,7 +1016,7 @@ func configureCertificates(pcfg *ingress.Configuration) error {
}
}
redirects := buildRedirects(pcfg.Servers)
redirects := buildRedirects(rawServers)
for _, redirect := range redirects {
if redirect.SSLCert == nil {
continue