Improve configuration change detection (#2656)
* Use information about the configuration configmap to determine changes * Add hashstructure dependency * Rename queue functions * Add test for configmap checksum
This commit is contained in:
parent
a6978a873b
commit
aec40c171f
15 changed files with 564 additions and 42 deletions
|
|
@ -506,6 +506,9 @@ type Configuration struct {
|
|||
// http://github.com/influxdata/nginx-influxdb-module/
|
||||
// By default this is disabled
|
||||
EnableInfluxDB bool `json:"enable-influxdb"`
|
||||
|
||||
// Checksum contains a checksum of the configmap configuration
|
||||
Checksum string `json:"-"`
|
||||
}
|
||||
|
||||
// NewDefault returns the default nginx configuration
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
|
@ -155,14 +154,16 @@ func (n *NGINXController) syncIngress(interface{}) error {
|
|||
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
|
||||
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
|
||||
PassthroughBackends: passUpstreams,
|
||||
|
||||
ConfigurationChecksum: n.store.GetBackendConfiguration().Checksum,
|
||||
}
|
||||
|
||||
if !n.isForceReload() && n.runningConfig.Equal(&pcfg) {
|
||||
if n.runningConfig.Equal(&pcfg) {
|
||||
glog.V(3).Infof("No configuration change detected, skipping backend reload.")
|
||||
return nil
|
||||
}
|
||||
|
||||
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) && !n.isForceReload() {
|
||||
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) {
|
||||
glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.")
|
||||
} else {
|
||||
glog.Infof("Configuration changes detected, backend reload required.")
|
||||
|
|
@ -200,7 +201,6 @@ func (n *NGINXController) syncIngress(interface{}) error {
|
|||
}
|
||||
|
||||
n.runningConfig = &pcfg
|
||||
n.SetForceReload(false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1048,21 +1048,6 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
|
|||
return servers
|
||||
}
|
||||
|
||||
func (n *NGINXController) isForceReload() bool {
|
||||
return atomic.LoadInt32(&n.forceReload) != 0
|
||||
}
|
||||
|
||||
// SetForceReload sets whether the backend should be reloaded regardless of
|
||||
// configuration changes.
|
||||
func (n *NGINXController) SetForceReload(shouldReload bool) {
|
||||
if shouldReload {
|
||||
atomic.StoreInt32(&n.forceReload, 1)
|
||||
n.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
} else {
|
||||
atomic.StoreInt32(&n.forceReload, 0)
|
||||
}
|
||||
}
|
||||
|
||||
// extractTLSSecretName returns the name of the Secret containing a SSL
|
||||
// certificate for the given host name, or an empty string.
|
||||
func extractTLSSecretName(host string, ing *extensions.Ingress,
|
||||
|
|
|
|||
|
|
@ -38,7 +38,6 @@ import (
|
|||
proxyproto "github.com/armon/go-proxyproto"
|
||||
"github.com/eapache/channels"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
|
@ -153,7 +152,7 @@ Error loading new template: %v
|
|||
|
||||
n.t = template
|
||||
glog.Info("New NGINX configuration template loaded.")
|
||||
n.SetForceReload(true)
|
||||
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
|
||||
}
|
||||
|
||||
ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
|
||||
|
|
@ -194,7 +193,7 @@ Error loading new template: %v
|
|||
for _, f := range filesToWatch {
|
||||
_, err = watch.NewFileWatcher(f, func() {
|
||||
glog.Info("File %v changed. Reloading NGINX", f)
|
||||
n.SetForceReload(true)
|
||||
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
|
||||
})
|
||||
if err != nil {
|
||||
glog.Fatalf("Error creating file watcher for %v: %v", f, err)
|
||||
|
|
@ -232,8 +231,6 @@ type NGINXController struct {
|
|||
// runningConfig contains the running configuration in the Backend
|
||||
runningConfig *ingress.Configuration
|
||||
|
||||
forceReload int32
|
||||
|
||||
t *ngx_template.Template
|
||||
|
||||
resolver []net.IP
|
||||
|
|
@ -278,7 +275,7 @@ func (n *NGINXController) Start() {
|
|||
|
||||
go n.syncQueue.Run(time.Second, n.stopCh)
|
||||
// force initial sync
|
||||
n.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
|
||||
|
||||
for {
|
||||
select {
|
||||
|
|
@ -311,10 +308,12 @@ func (n *NGINXController) Start() {
|
|||
if evt, ok := event.(store.Event); ok {
|
||||
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
|
||||
if evt.Type == store.ConfigurationEvent {
|
||||
n.SetForceReload(true)
|
||||
// TODO: is this necessary? Consider removing this special case
|
||||
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
|
||||
continue
|
||||
}
|
||||
|
||||
n.syncQueue.Enqueue(evt.Obj)
|
||||
n.syncQueue.EnqueueSkippableTask(evt.Obj)
|
||||
} else {
|
||||
glog.Warningf("Unexpected event type received %T", event)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"github.com/mitchellh/hashstructure"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
|
@ -191,6 +192,15 @@ func ReadConfig(src map[string]string) config.Configuration {
|
|||
glog.Warningf("unexpected error merging defaults: %v", err)
|
||||
}
|
||||
|
||||
hash, err := hashstructure.Hash(to, &hashstructure.HashOptions{
|
||||
TagName: "json",
|
||||
})
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error obtaining hash: %v", err)
|
||||
}
|
||||
|
||||
to.Checksum = fmt.Sprintf("%v", hash)
|
||||
|
||||
return to
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,11 +17,13 @@ limitations under the License.
|
|||
package template
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kylelemons/godebug/pretty"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress/controller/config"
|
||||
)
|
||||
|
|
@ -88,6 +90,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
|
|||
def.NginxStatusIpv6Whitelist = []string{"::1", "2001::/16"}
|
||||
def.ProxyAddOriginalUriHeader = false
|
||||
|
||||
hash, err := hashstructure.Hash(def, &hashstructure.HashOptions{
|
||||
TagName: "json",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error obtaining hash: %v", err)
|
||||
}
|
||||
def.Checksum = fmt.Sprintf("%v", hash)
|
||||
|
||||
to := ReadConfig(conf)
|
||||
if diff := pretty.Compare(to, def); diff != "" {
|
||||
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
|
||||
|
|
@ -107,6 +117,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
|
|||
}
|
||||
|
||||
def = config.NewDefault()
|
||||
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
|
||||
TagName: "json",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error obtaining hash: %v", err)
|
||||
}
|
||||
def.Checksum = fmt.Sprintf("%v", hash)
|
||||
|
||||
to = ReadConfig(map[string]string{})
|
||||
if diff := pretty.Compare(to, def); diff != "" {
|
||||
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
|
||||
|
|
@ -114,6 +132,15 @@ func TestMergeConfigMapToStruct(t *testing.T) {
|
|||
|
||||
def = config.NewDefault()
|
||||
def.WhitelistSourceRange = []string{"1.1.1.1/32"}
|
||||
|
||||
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
|
||||
TagName: "json",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error obtaining hash: %v", err)
|
||||
}
|
||||
def.Checksum = fmt.Sprintf("%v", hash)
|
||||
|
||||
to = ReadConfig(map[string]string{
|
||||
"whitelist-source-range": "1.1.1.1/32",
|
||||
})
|
||||
|
|
|
|||
|
|
@ -65,6 +65,9 @@ type Configuration struct {
|
|||
// It contains information about the associated Server Name Indication (SNI).
|
||||
// +optional
|
||||
PassthroughBackends []*SSLPassthroughBackend `json:"passthroughBackends,omitempty"`
|
||||
|
||||
// ConfigurationChecksum contains the particular checksum of a Configuration object
|
||||
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
|
||||
}
|
||||
|
||||
// Backend describes one or more remote server/s (endpoints) associated with a service
|
||||
|
|
|
|||
|
|
@ -104,6 +104,10 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
|
|||
}
|
||||
}
|
||||
|
||||
if c1.ConfigurationChecksum != c2.ConfigurationChecksum {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
|
@ -50,23 +51,39 @@ type Queue struct {
|
|||
|
||||
// Element represents one item of the queue
|
||||
type Element struct {
|
||||
Key interface{}
|
||||
Timestamp int64
|
||||
Key interface{}
|
||||
Timestamp int64
|
||||
IsSkippable bool
|
||||
}
|
||||
|
||||
// Run ...
|
||||
// Run starts processing elements in the queue
|
||||
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
|
||||
wait.Until(t.worker, period, stopCh)
|
||||
}
|
||||
|
||||
// Enqueue enqueues ns/name of the given api object in the task queue.
|
||||
func (t *Queue) Enqueue(obj interface{}) {
|
||||
// EnqueueTask enqueues ns/name of the given api object in the task queue.
|
||||
func (t *Queue) EnqueueTask(obj interface{}) {
|
||||
t.enqueue(obj, false)
|
||||
}
|
||||
|
||||
// EnqueueSkippableTask enqueues ns/name of the given api object in
|
||||
// the task queue that can be skipped
|
||||
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
|
||||
t.enqueue(obj, true)
|
||||
}
|
||||
|
||||
// enqueue enqueues ns/name of the given api object in the task queue.
|
||||
func (t *Queue) enqueue(obj interface{}, skippable bool) {
|
||||
if t.IsShuttingDown() {
|
||||
glog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
|
||||
return
|
||||
}
|
||||
|
||||
ts := time.Now().UnixNano()
|
||||
if !skippable {
|
||||
// make sure the timestamp is bigger than lastSync
|
||||
ts = time.Now().Add(24 * time.Hour).UnixNano()
|
||||
}
|
||||
glog.V(3).Infof("queuing item %v", obj)
|
||||
key, err := t.fn(obj)
|
||||
if err != nil {
|
||||
|
|
@ -166,3 +183,10 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in
|
|||
|
||||
return q
|
||||
}
|
||||
|
||||
// GetDummyObject returns a valid object that can be used in the Queue
|
||||
func GetDummyObject(name string) *metav1.ObjectMeta {
|
||||
return &metav1.ObjectMeta{
|
||||
Name: name,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ func TestEnqueueSuccess(t *testing.T) {
|
|||
k: "testKey",
|
||||
v: "testValue",
|
||||
}
|
||||
q.Enqueue(mo)
|
||||
q.EnqueueSkippableTask(mo)
|
||||
// wait for 'mockSynFn'
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
if atomic.LoadUint32(&sr) != 1 {
|
||||
|
|
@ -99,7 +99,7 @@ func TestEnqueueFailed(t *testing.T) {
|
|||
q.Shutdown()
|
||||
// wait for shutdown
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
q.Enqueue(mo)
|
||||
q.EnqueueSkippableTask(mo)
|
||||
// wait for 'mockSynFn'
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
// queue is shutdown, so mockSynFn should not be executed, so the result should be 0
|
||||
|
|
@ -121,7 +121,7 @@ func TestEnqueueKeyError(t *testing.T) {
|
|||
v: "testValue",
|
||||
}
|
||||
|
||||
q.Enqueue(mo)
|
||||
q.EnqueueSkippableTask(mo)
|
||||
// wait for 'mockSynFn'
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
// key error, so the result should be 0
|
||||
|
|
@ -142,16 +142,16 @@ func TestSkipEnqueue(t *testing.T) {
|
|||
k: "testKey",
|
||||
v: "testValue",
|
||||
}
|
||||
q.Enqueue(mo)
|
||||
q.Enqueue(mo)
|
||||
q.Enqueue(mo)
|
||||
q.Enqueue(mo)
|
||||
q.EnqueueSkippableTask(mo)
|
||||
q.EnqueueSkippableTask(mo)
|
||||
q.EnqueueTask(mo)
|
||||
q.EnqueueSkippableTask(mo)
|
||||
// run queue
|
||||
go q.Run(time.Second, stopCh)
|
||||
// wait for 'mockSynFn'
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
if atomic.LoadUint32(&sr) != 1 {
|
||||
t.Errorf("sr should be 1, but is %d", sr)
|
||||
if atomic.LoadUint32(&sr) != 2 {
|
||||
t.Errorf("sr should be 2, but is %d", sr)
|
||||
}
|
||||
|
||||
// shutdown queue before exit
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue