Use a ring channel to avoid blocking write of events (#2082)

* Use a ring channel to avoid blocking write of events

* Add eapache/channels dependency
This commit is contained in:
Manuel Alejandro de Brito Fontes 2018-02-13 17:46:18 -08:00 committed by GitHub
parent 33475b7184
commit 9bcb5b08ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 2833 additions and 78 deletions

View file

@ -226,7 +226,7 @@ func (s k8sStore) ReadSecrets(ing *extensions.Ingress) {
// sendDummyEvent sends a dummy event to trigger an update
// This is used in when a secret change
func (s *k8sStore) sendDummyEvent() {
s.updateCh <- Event{
s.updateCh.In() <- Event{
Type: UpdateEvent,
Obj: &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{

View file

@ -24,6 +24,7 @@ import (
"sync"
"time"
"github.com/eapache/channels"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
@ -194,7 +195,7 @@ type k8sStore struct {
filesystem file.Filesystem
// updateCh
updateCh chan Event
updateCh *channels.RingChannel
// mu mutex used to avoid simultaneous incovations to syncSecret
mu *sync.Mutex
@ -208,7 +209,7 @@ func New(checkOCSP bool,
resyncPeriod time.Duration,
client clientset.Interface,
fs file.Filesystem,
updateCh chan Event) Storer {
updateCh *channels.RingChannel) Storer {
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
@ -246,7 +247,7 @@ func New(checkOCSP bool,
store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
updateCh <- Event{
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
@ -272,7 +273,7 @@ func New(checkOCSP bool,
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
@ -293,7 +294,7 @@ func New(checkOCSP bool,
}
store.extractAnnotations(curIng)
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
@ -312,7 +313,7 @@ func New(checkOCSP bool,
_, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec))
if err == nil {
store.syncSecret(key)
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
@ -323,7 +324,7 @@ func New(checkOCSP bool,
store.extractAnnotations(ing)
}
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
@ -346,7 +347,7 @@ func New(checkOCSP bool,
}
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
@ -362,7 +363,7 @@ func New(checkOCSP bool,
}
}
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: sec,
}
@ -372,13 +373,13 @@ func New(checkOCSP bool,
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh <- Event{
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
@ -387,7 +388,7 @@ func New(checkOCSP bool,
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
@ -402,7 +403,7 @@ func New(checkOCSP bool,
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
store.setConfig(m)
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: obj,
}
@ -415,7 +416,7 @@ func New(checkOCSP bool,
if mapKey == configmap {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
store.setConfig(m)
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
@ -423,7 +424,7 @@ func New(checkOCSP bool,
// updates to configuration configmaps can trigger an update
if mapKey == tcp || mapKey == udp {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}

View file

@ -23,6 +23,7 @@ import (
"testing"
"time"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
extensions "k8s.io/api/extensions/v1beta1"
@ -56,11 +57,11 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)
go func(ch chan Event) {
go func(ch *channels.RingChannel) {
for {
<-ch
<-ch.Out()
}
}(updateCh)
@ -111,7 +112,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected an Ingres but none returned")
}
close(updateCh)
updateCh.Close()
close(stopCh)
})
@ -120,19 +121,20 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)
var add uint64
var upd uint64
var del uint64
go func(ch chan Event) {
go func(ch *channels.RingChannel) {
for {
e, ok := <-ch
evt, ok := <-ch.Out()
if !ok {
return
}
e := evt.(Event)
if e.Obj == nil {
continue
}
@ -254,7 +256,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected 1 event of type Delete but %v occurred", del)
}
close(updateCh)
updateCh.Close()
close(stopCh)
})
@ -263,19 +265,20 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)
var add uint64
var upd uint64
var del uint64
go func(ch chan Event) {
go func(ch *channels.RingChannel) {
for {
e, ok := <-ch
evt, ok := <-ch.Out()
if !ok {
return
}
e := evt.(Event)
if e.Obj == nil {
continue
}
@ -339,7 +342,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected 1 events of type Delete but %v occurred", del)
}
close(updateCh)
updateCh.Close()
close(stopCh)
})
@ -348,19 +351,20 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)
var add uint64
var upd uint64
var del uint64
go func(ch <-chan Event) {
go func(ch *channels.RingChannel) {
for {
e, ok := <-ch
evt, ok := <-ch.Out()
if !ok {
return
}
e := evt.(Event)
if e.Obj == nil {
continue
}
@ -478,7 +482,7 @@ func TestStore(t *testing.T) {
}
})
close(updateCh)
updateCh.Close()
close(stopCh)
})