feat: switch from endpoints to endpointslices (#8890)
* endpointslices Signed-off-by: tombokombo <tombo@sysart.tech> * cleanup Signed-off-by: tombokombo <tombo@sysart.tech> * fix rbac Signed-off-by: tombokombo <tombo@sysart.tech> * fix comments Signed-off-by: tombokombo <tombo@sysart.tech> * cleanup store, add store tests Signed-off-by: tombokombo <tombo@sysart.tech> * fix copyright date Signed-off-by: tombokombo <tombo@sysart.tech> Signed-off-by: tombokombo <tombo@sysart.tech>
This commit is contained in:
parent
0f5bf530ae
commit
3579ed0487
11 changed files with 520 additions and 301 deletions
|
|
@ -1,39 +0,0 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// EndpointLister makes a Store that lists Endpoints.
|
||||
type EndpointLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// ByKey returns the Endpoints of the Service matching key in the local Endpoint Store.
|
||||
func (s *EndpointLister) ByKey(key string) (*apiv1.Endpoints, error) {
|
||||
eps, exists, err := s.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, NotExistsError(key)
|
||||
}
|
||||
return eps.(*apiv1.Endpoints), nil
|
||||
}
|
||||
|
|
@ -1,66 +0,0 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func newEndpointLister(t *testing.T) *EndpointLister {
|
||||
t.Helper()
|
||||
|
||||
return &EndpointLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
|
||||
}
|
||||
|
||||
func TestEndpointLister(t *testing.T) {
|
||||
t.Run("the key does not exist", func(t *testing.T) {
|
||||
el := newEndpointLister(t)
|
||||
|
||||
key := "namespace/endpoint"
|
||||
_, err := el.ByKey(key)
|
||||
|
||||
if err == nil {
|
||||
t.Error("expected an error but nothing has been returned")
|
||||
}
|
||||
|
||||
if _, ok := err.(NotExistsError); !ok {
|
||||
t.Errorf("expected NotExistsError, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("the key exists", func(t *testing.T) {
|
||||
el := newEndpointLister(t)
|
||||
|
||||
key := "namespace/endpoint"
|
||||
endpoint := &apiv1.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "namespace", Name: "endpoint"}}
|
||||
|
||||
el.Add(endpoint)
|
||||
|
||||
e, err := el.ByKey(key)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpeted error %v", err)
|
||||
}
|
||||
|
||||
if e != endpoint {
|
||||
t.Errorf("expected %v, error, got %v", e, endpoint)
|
||||
}
|
||||
})
|
||||
}
|
||||
55
internal/ingress/controller/store/endpointslice.go
Normal file
55
internal/ingress/controller/store/endpointslice.go
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// EndpointSliceLister makes a Store that lists Endpoints.
|
||||
type EndpointSliceLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// MatchByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store.
|
||||
func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSlice, error) {
|
||||
var eps []*discoveryv1.EndpointSlice
|
||||
// filter endpointSlices owned by svc
|
||||
for _, listKey := range s.ListKeys() {
|
||||
if !strings.HasPrefix(listKey, key) {
|
||||
continue
|
||||
}
|
||||
epss, exists, err := s.GetByKey(listKey)
|
||||
if exists && err == nil {
|
||||
// check for svc owner label
|
||||
if svcName, ok := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetLabels()[discoveryv1.LabelServiceName]; ok {
|
||||
namespace := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetNamespace()
|
||||
if key == fmt.Sprintf("%s/%s", namespace, svcName) {
|
||||
eps = append(eps, epss.(*discoveryv1.EndpointSlice))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(eps) == 0 {
|
||||
return nil, NotExistsError(key)
|
||||
}
|
||||
return eps, nil
|
||||
}
|
||||
94
internal/ingress/controller/store/endpointslice_test.go
Normal file
94
internal/ingress/controller/store/endpointslice_test.go
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func newEndpointSliceLister(t *testing.T) *EndpointSliceLister {
|
||||
t.Helper()
|
||||
|
||||
return &EndpointSliceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
|
||||
}
|
||||
|
||||
func TestEndpointSliceLister(t *testing.T) {
|
||||
t.Run("the key does not exist", func(t *testing.T) {
|
||||
el := newEndpointSliceLister(t)
|
||||
|
||||
key := "namespace/svcname"
|
||||
_, err := el.MatchByKey(key)
|
||||
|
||||
if err == nil {
|
||||
t.Error("expected an error but nothing has been returned")
|
||||
}
|
||||
|
||||
if _, ok := err.(NotExistsError); !ok {
|
||||
t.Errorf("expected NotExistsError, got %v", err)
|
||||
}
|
||||
})
|
||||
t.Run("the key exists", func(t *testing.T) {
|
||||
el := newEndpointSliceLister(t)
|
||||
|
||||
key := "namespace/svcname"
|
||||
endpointSlice := &discoveryv1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "namespace",
|
||||
Name: "anothername-foo",
|
||||
Labels: map[string]string{
|
||||
discoveryv1.LabelServiceName: "svcname",
|
||||
},
|
||||
},
|
||||
}
|
||||
el.Add(endpointSlice)
|
||||
endpointSlice = &discoveryv1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "namespace",
|
||||
Name: "svcname-bar",
|
||||
Labels: map[string]string{
|
||||
discoveryv1.LabelServiceName: "othersvc",
|
||||
},
|
||||
},
|
||||
}
|
||||
el.Add(endpointSlice)
|
||||
endpointSlice = &discoveryv1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "namespace",
|
||||
Name: "svcname-buz",
|
||||
Labels: map[string]string{
|
||||
discoveryv1.LabelServiceName: "svcname",
|
||||
},
|
||||
},
|
||||
}
|
||||
el.Add(endpointSlice)
|
||||
eps, err := el.MatchByKey(key)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpeted error %v", err)
|
||||
}
|
||||
if err == nil && len(eps) != 1 {
|
||||
t.Errorf("expected one slice %v, error, got %d slices", endpointSlice, len(eps))
|
||||
}
|
||||
if len(eps) > 0 && eps[0].GetName() != endpointSlice.GetName() {
|
||||
t.Errorf("expected %v, error, got %v", endpointSlice.GetName(), eps[0].GetName())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -29,6 +29,7 @@ import (
|
|||
|
||||
"github.com/eapache/channels"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
networkingv1 "k8s.io/api/networking/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
|
|
@ -78,8 +79,8 @@ type Storer interface {
|
|||
// GetService returns the Service matching key.
|
||||
GetService(key string) (*corev1.Service, error)
|
||||
|
||||
// GetServiceEndpoints returns the Endpoints of a Service matching key.
|
||||
GetServiceEndpoints(key string) (*corev1.Endpoints, error)
|
||||
// GetServiceEndpointsSlices returns the EndpointSlices of a Service matching key.
|
||||
GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error)
|
||||
|
||||
// ListIngresses returns a list of all Ingresses in the store.
|
||||
ListIngresses() []*ingress.Ingress
|
||||
|
|
@ -127,13 +128,13 @@ type Event struct {
|
|||
|
||||
// Informer defines the required SharedIndexInformers that interact with the API server.
|
||||
type Informer struct {
|
||||
Ingress cache.SharedIndexInformer
|
||||
IngressClass cache.SharedIndexInformer
|
||||
Endpoint cache.SharedIndexInformer
|
||||
Service cache.SharedIndexInformer
|
||||
Secret cache.SharedIndexInformer
|
||||
ConfigMap cache.SharedIndexInformer
|
||||
Namespace cache.SharedIndexInformer
|
||||
Ingress cache.SharedIndexInformer
|
||||
IngressClass cache.SharedIndexInformer
|
||||
EndpointSlice cache.SharedIndexInformer
|
||||
Service cache.SharedIndexInformer
|
||||
Secret cache.SharedIndexInformer
|
||||
ConfigMap cache.SharedIndexInformer
|
||||
Namespace cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
// Lister contains object listers (stores).
|
||||
|
|
@ -141,7 +142,7 @@ type Lister struct {
|
|||
Ingress IngressLister
|
||||
IngressClass IngressClassLister
|
||||
Service ServiceLister
|
||||
Endpoint EndpointLister
|
||||
EndpointSlice EndpointSliceLister
|
||||
Secret SecretLister
|
||||
ConfigMap ConfigMapLister
|
||||
Namespace NamespaceLister
|
||||
|
|
@ -159,7 +160,7 @@ func (e NotExistsError) Error() string {
|
|||
// Run initiates the synchronization of the informers against the API server.
|
||||
func (i *Informer) Run(stopCh chan struct{}) {
|
||||
go i.Secret.Run(stopCh)
|
||||
go i.Endpoint.Run(stopCh)
|
||||
go i.EndpointSlice.Run(stopCh)
|
||||
if i.IngressClass != nil {
|
||||
go i.IngressClass.Run(stopCh)
|
||||
}
|
||||
|
|
@ -169,7 +170,6 @@ func (i *Informer) Run(stopCh chan struct{}) {
|
|||
// wait for all involved caches to be synced before processing items
|
||||
// from the queue
|
||||
if !cache.WaitForCacheSync(stopCh,
|
||||
i.Endpoint.HasSynced,
|
||||
i.Service.HasSynced,
|
||||
i.Secret.HasSynced,
|
||||
i.ConfigMap.HasSynced,
|
||||
|
|
@ -330,8 +330,8 @@ func New(
|
|||
store.listers.IngressClass.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
}
|
||||
|
||||
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
|
||||
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
|
||||
store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer()
|
||||
store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore()
|
||||
|
||||
store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
|
||||
store.listers.Secret.Store = store.informers.Secret.GetStore()
|
||||
|
|
@ -673,7 +673,7 @@ func New(
|
|||
},
|
||||
}
|
||||
|
||||
epEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
epsEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
updateCh.In() <- Event{
|
||||
Type: CreateEvent,
|
||||
|
|
@ -687,9 +687,9 @@ func New(
|
|||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oep := old.(*corev1.Endpoints)
|
||||
cep := cur.(*corev1.Endpoints)
|
||||
if !reflect.DeepEqual(cep.Subsets, oep.Subsets) {
|
||||
oeps := old.(*discoveryv1.EndpointSlice)
|
||||
ceps := cur.(*discoveryv1.EndpointSlice)
|
||||
if !reflect.DeepEqual(ceps.Endpoints, oeps.Endpoints) {
|
||||
updateCh.In() <- Event{
|
||||
Type: UpdateEvent,
|
||||
Obj: cur,
|
||||
|
|
@ -796,7 +796,7 @@ func New(
|
|||
if !icConfig.IgnoreIngressClass {
|
||||
store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
|
||||
}
|
||||
store.informers.Endpoint.AddEventHandler(epEventHandler)
|
||||
store.informers.EndpointSlice.AddEventHandler(epsEventHandler)
|
||||
store.informers.Secret.AddEventHandler(secrEventHandler)
|
||||
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
|
||||
store.informers.Service.AddEventHandler(serviceHandler)
|
||||
|
|
@ -1044,9 +1044,8 @@ func (s *k8sStore) GetConfigMap(key string) (*corev1.ConfigMap, error) {
|
|||
return s.listers.ConfigMap.ByKey(key)
|
||||
}
|
||||
|
||||
// GetServiceEndpoints returns the Endpoints of a Service matching key.
|
||||
func (s *k8sStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) {
|
||||
return s.listers.Endpoint.ByKey(key)
|
||||
func (s *k8sStore) GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) {
|
||||
return s.listers.EndpointSlice.MatchByKey(key)
|
||||
}
|
||||
|
||||
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue