support watch namespaces matched namespace selector (#7472)

skip caching namespaces at cluster scope if only watching single namespace

add --watch-namespace-selector in user guide

add e2e test
This commit is contained in:
zryfish 2021-11-13 03:46:28 +08:00 committed by GitHub
parent 67e13bf692
commit 7203a0b8bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 461 additions and 19 deletions

View file

@ -27,6 +27,7 @@ import (
apiv1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -67,6 +68,8 @@ type Configuration struct {
Namespace string
WatchNamespaceSelector labels.Selector
// +optional
TCPConfigMapName string
// +optional

View file

@ -36,6 +36,7 @@ import (
v1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-nginx/internal/file"
@ -2378,6 +2379,7 @@ func newNGINXController(t *testing.T) *NGINXController {
storer := store.New(
ns,
labels.Nothing(),
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -2441,6 +2443,7 @@ func newDynamicNginxController(t *testing.T, setConfigMap func(string) *v1.Confi
storer := store.New(
ns,
labels.Nothing(),
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),

View file

@ -122,6 +122,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro
n.store = store.New(
config.Namespace,
config.WatchNamespaceSelector,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,

View file

@ -0,0 +1,39 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// NamespaceLister makes a Store that lists Namespaces.
type NamespaceLister struct {
cache.Store
}
// ByKey returns the Namespace matching key in the local Namespace Store.
func (cml *NamespaceLister) ByKey(key string) (*apiv1.Namespace, error) {
s, exists, err := cml.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, NotExistsError(key)
}
return s.(*apiv1.Namespace), nil
}

View file

@ -32,6 +32,7 @@ import (
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -127,6 +128,7 @@ type Informer struct {
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
Namespace cache.SharedIndexInformer
}
// Lister contains object listers (stores).
@ -137,6 +139,7 @@ type Lister struct {
Endpoint EndpointLister
Secret SecretLister
ConfigMap ConfigMapLister
Namespace NamespaceLister
IngressWithAnnotation IngressWithAnnotationsLister
}
@ -172,6 +175,15 @@ func (i *Informer) Run(stopCh chan struct{}) {
runtime.HandleError(fmt.Errorf("timed out waiting for ingress classcaches to sync"))
}
// when limit controller scope to one namespace, skip sync namespaces at cluster scope
if i.Namespace != nil {
go i.Namespace.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, i.Namespace.HasSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
}
}
// in big clusters, deltas can keep arriving even after HasSynced
// functions have returned 'true'
time.Sleep(1 * time.Second)
@ -225,7 +237,9 @@ type k8sStore struct {
// New creates a new object store to be used in the ingress controller
func New(
namespace, configmap, tcp, udp, defaultSSLCertificate string,
namespace string,
namespaceSelector labels.Selector,
configmap, tcp, udp, defaultSSLCertificate string,
resyncPeriod time.Duration,
client clientset.Interface,
updateCh *channels.RingChannel,
@ -322,6 +336,35 @@ func New(
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
// avoid caching namespaces at cluster scope when watching single namespace
if namespaceSelector != nil && !namespaceSelector.Empty() {
// cache informers factory for namespaces
infFactoryNamespaces := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
informers.WithTweakListOptions(labelsTweakListOptionsFunc),
)
store.informers.Namespace = infFactoryNamespaces.Core().V1().Namespaces().Informer()
store.listers.Namespace.Store = store.informers.Namespace.GetStore()
}
watchedNamespace := func(namespace string) bool {
if namespaceSelector == nil || namespaceSelector.Empty() {
return true
}
item, ok, err := store.listers.Namespace.GetByKey(namespace)
if !ok {
klog.Errorf("Namespace %s not existed: %v.", namespace, err)
return false
}
ns, ok := item.(*corev1.Namespace)
if !ok {
return false
}
return namespaceSelector.Matches(labels.Set(ns.Labels))
}
ingDeleteHandler := func(obj interface{}) {
ing, ok := toIngress(obj)
if !ok {
@ -338,6 +381,10 @@ func New(
}
}
if !watchedNamespace(ing.Namespace) {
return
}
_, err := store.GetIngressClass(ing, icConfig)
if err != nil {
klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
@ -363,6 +410,11 @@ func New(
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing, _ := toIngress(obj)
if !watchedNamespace(ing.Namespace) {
return
}
ic, err := store.GetIngressClass(ing, icConfig)
if err != nil {
klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
@ -392,6 +444,10 @@ func New(
oldIng, _ := toIngress(old)
curIng, _ := toIngress(cur)
if !watchedNamespace(oldIng.Namespace) {
return
}
var errOld, errCur error
var classCur string
if !icConfig.IgnoreIngressClass {
@ -528,6 +584,10 @@ func New(
sec := cur.(*corev1.Secret)
key := k8s.MetaNamespaceKey(sec)
if !watchedNamespace(sec.Namespace) {
return
}
if store.defaultSSLCertificate == key {
store.syncSecret(store.defaultSSLCertificate)
}
@ -566,6 +626,10 @@ func New(
}
}
if !watchedNamespace(sec.Namespace) {
return
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
key := k8s.MetaNamespaceKey(sec)

View file

@ -31,6 +31,7 @@ import (
networking "k8s.io/api/networking/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/envtest"
@ -89,6 +90,8 @@ func TestStore(t *testing.T) {
t.Fatalf("error: %v", err)
}
emptySelector, _ := labels.Parse("")
defer te.Stop()
clientSet, err := kubernetes.NewForConfig(cfg)
@ -112,6 +115,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -191,6 +195,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -293,6 +298,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -407,6 +413,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -535,6 +542,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -633,6 +641,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -725,6 +734,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -809,6 +819,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -903,6 +914,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -1025,6 +1037,7 @@ func TestStore(t *testing.T) {
storer := New(
ns,
emptySelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
@ -1107,6 +1120,102 @@ func TestStore(t *testing.T) {
}
})
t.Run("should not receive events whose namespace doesn't match watch namespace selector", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
createConfigMap(clientSet, ns, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
var add uint64
var upd uint64
var del uint64
go func(ch *channels.RingChannel) {
for {
evt, ok := <-ch.Out()
if !ok {
return
}
e := evt.(Event)
if e.Obj == nil {
continue
}
switch e.Type {
case CreateEvent:
atomic.AddUint64(&add, 1)
case UpdateEvent:
atomic.AddUint64(&upd, 1)
case DeleteEvent:
atomic.AddUint64(&del, 1)
}
}
}(updateCh)
namesapceSelector, _ := labels.Parse("foo=bar")
storer := New(
ns,
namesapceSelector,
fmt.Sprintf("%v/config", ns),
fmt.Sprintf("%v/tcp", ns),
fmt.Sprintf("%v/udp", ns),
"",
10*time.Minute,
clientSet,
updateCh,
false,
DefaultClassConfig)
storer.Run(stopCh)
ing := ensureIngress(&networking.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
Namespace: ns,
},
Spec: networking.IngressSpec{
Rules: []networking.IngressRule{
{
Host: "dummy",
IngressRuleValue: networking.IngressRuleValue{
HTTP: &networking.HTTPIngressRuleValue{
Paths: []networking.HTTPIngressPath{
{
Path: "/",
PathType: &pathPrefix,
Backend: networking.IngressBackend{
Service: &networking.IngressServiceBackend{
Name: "http-svc",
Port: networking.ServiceBackendPort{
Number: 80,
},
},
},
},
},
},
},
},
},
},
}, clientSet, t)
defer deleteIngress(ing, clientSet, t)
time.Sleep(1 * time.Second)
if atomic.LoadUint64(&add) != 0 {
t.Errorf("expected 0 events of type Create but %v occurred", add)
}
if atomic.LoadUint64(&upd) != 0 {
t.Errorf("expected 0 events of type Update but %v occurred", upd)
}
if atomic.LoadUint64(&del) != 0 {
t.Errorf("expected 0 events of type Delete but %v occurred", del)
}
})
// test add ingress with secret it doesn't exists and then add secret
// check secret is generated on fs
// check ocsp