Watch controller Pods list

This commit is contained in:
Maxime Ginters 2018-11-20 15:29:20 -05:00
parent b65b85cd99
commit b6b221aebb
4 changed files with 205 additions and 7 deletions

View file

@ -30,8 +30,11 @@ import (
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@ -76,6 +79,9 @@ type Storer interface {
// ListIngresses returns a list of all Ingresses in the store.
ListIngresses() []*ingress.Ingress
// ListControllerPods returns a list of ingress-nginx controller Pods.
ListControllerPods() []*corev1.Pod
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
@ -121,6 +127,7 @@ type Informer struct {
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
Pod cache.SharedIndexInformer
}
// Lister contains object listers (stores).
@ -131,6 +138,7 @@ type Lister struct {
Secret SecretLister
ConfigMap ConfigMapLister
IngressAnnotation IngressAnnotationsLister
Pod PodLister
}
// NotExistsError is returned when an object does not exist in a local store.
@ -147,6 +155,7 @@ func (i *Informer) Run(stopCh chan struct{}) {
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)
go i.Pod.Run(stopCh)
// wait for all involved caches to be synced before processing items
// from the queue
@ -211,6 +220,8 @@ type k8sStore struct {
defaultSSLCertificate string
isDynamicCertificatesEnabled bool
pod *k8s.PodInfo
}
// New creates a new object store to be used in the ingress controller
@ -220,7 +231,8 @@ func New(checkOCSP bool,
client clientset.Interface,
fs file.Filesystem,
updateCh *channels.RingChannel,
isDynamicCertificatesEnabled bool) Storer {
isDynamicCertificatesEnabled bool,
pod *k8s.PodInfo) Storer {
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
@ -234,6 +246,7 @@ func New(checkOCSP bool,
secretIngressMap: NewObjectRefMap(),
defaultSSLCertificate: defaultSSLCertificate,
isDynamicCertificatesEnabled: isDynamicCertificatesEnabled,
pod: pod,
}
eventBroadcaster := record.NewBroadcaster()
@ -270,6 +283,26 @@ func New(checkOCSP bool,
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
labelSelector := labels.SelectorFromSet(store.pod.Labels)
store.informers.Pod = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
options.LabelSelector = labelSelector.String()
return client.CoreV1().Pods(store.pod.Namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector.String()
return client.CoreV1().Pods(store.pod.Namespace).Watch(options)
},
},
&corev1.Pod{},
resyncPeriod,
cache.Indexers{},
)
store.listers.Pod.Store = store.informers.Pod.GetStore()
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
@ -512,11 +545,40 @@ func New(checkOCSP bool,
},
}
podEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oldPod := old.(*corev1.Pod)
curPod := cur.(*corev1.Pod)
if oldPod.Status.Phase == curPod.Status.Phase {
return
}
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
DeleteFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
}
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
store.informers.Pod.AddEventHandler(podEventHandler)
// do not wait for informers to read the configmap configuration
ns, name, _ := k8s.ParseNameNS(configmap)
@ -773,3 +835,20 @@ func (s k8sStore) Run(stopCh chan struct{}) {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}
}
// ListControllerPods returns a list of ingress-nginx controller Pods
func (s k8sStore) ListControllerPods() []*corev1.Pod {
var pods []*corev1.Pod
for _, i := range s.listers.Pod.List() {
pod := i.(*corev1.Pod)
if pod.Status.Phase != corev1.PodRunning {
continue
}
pods = append(pods, pod)
}
return pods
}