Sync secrets (SSL certificates) on events

Remove scheduled check for missing secrets.
This commit is contained in:
Antoine Cotten 2018-04-13 00:26:10 +02:00
parent 8855460817
commit fec3ddc6cc
No known key found for this signature in database
GPG key ID: EA06C9A94E2B3EA0
9 changed files with 395 additions and 209 deletions

View file

@ -27,16 +27,15 @@ import (
"github.com/eapache/channels"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -59,15 +58,15 @@ type Storer interface {
GetBackendConfiguration() ngx_config.Configuration
// GetConfigMap returns a ConfigmMap using the namespace and name as key
GetConfigMap(key string) (*apiv1.ConfigMap, error)
GetConfigMap(key string) (*corev1.ConfigMap, error)
// GetSecret returns a Secret using the namespace and name as key
GetSecret(key string) (*apiv1.Secret, error)
GetSecret(key string) (*corev1.Secret, error)
// GetService returns a Service using the namespace and name as key
GetService(key string) (*apiv1.Service, error)
GetService(key string) (*corev1.Service, error)
GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error)
GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error)
// GetSecret returns an Ingress using the namespace and name as key
GetIngress(key string) (*extensions.Ingress, error)
@ -78,11 +77,11 @@ type Storer interface {
// GetIngressAnnotations returns the annotations associated to an Ingress
GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error)
// GetLocalSecret returns the local copy of a Secret
GetLocalSecret(name string) (*ingress.SSLCert, error)
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
// ListLocalSecrets returns the list of local Secrets
ListLocalSecrets() []*ingress.SSLCert
// ListLocalSSLCerts returns the list of local SSLCerts
ListLocalSSLCerts() []*ingress.SSLCert
// GetAuthCertificate resolves a given secret name into an SSL certificate.
// The secret must contain 3 keys named:
@ -94,9 +93,6 @@ type Storer interface {
// Run initiates the synchronization of the controllers
Run(stopCh chan struct{})
// ReadSecrets extracts information about secrets from an Ingress rule
ReadSecrets(*extensions.Ingress)
}
// EventType type of event associated with an informer
@ -109,9 +105,8 @@ const (
UpdateEvent EventType = "UPDATE"
// DeleteEvent event associated when an object is removed from an informer
DeleteEvent EventType = "DELETE"
// ConfigurationEvent event associated when a configuration object is created or updated
// ConfigurationEvent event associated when a controller configuration object is created or updated
ConfigurationEvent EventType = "CONFIGURATION"
slash = "/"
)
// Event holds the context of an event
@ -196,14 +191,14 @@ type k8sStore struct {
// secretIngressMap contains information about which ingress references a
// secret in the annotations.
secretIngressMap map[string]sets.String
secretIngressMap ObjectRefMap
filesystem file.Filesystem
// updateCh
updateCh *channels.RingChannel
// mu mutex used to avoid simultaneous incovations to syncSecret
// mu protects against simultaneous invocations of syncSecret
mu *sync.Mutex
defaultSSLCertificate string
@ -226,16 +221,16 @@ func New(checkOCSP bool,
updateCh: updateCh,
backendConfig: ngx_config.NewDefault(),
mu: &sync.Mutex{},
secretIngressMap: make(map[string]sets.String),
secretIngressMap: NewObjectRefMap(),
defaultSSLCertificate: defaultSSLCertificate,
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
Interface: client.CoreV1().Events(namespace),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
Component: "nginx-ingress-controller",
})
@ -264,22 +259,25 @@ func New(checkOCSP bool,
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
return
}
store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
store.extractAnnotations(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
ing, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -287,18 +285,23 @@ func New(checkOCSP bool,
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
ing, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
if !class.IsValid(ing) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
return
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
store.listers.IngressAnnotation.Delete(ing)
key := k8s.MetaNamespaceKey(ing)
store.secretIngressMap.Delete(key)
recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
@ -311,15 +314,18 @@ func New(checkOCSP bool,
validCur := class.IsValid(curIng)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
store.extractAnnotations(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
@ -328,39 +334,64 @@ func New(checkOCSP bool,
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sec := obj.(*corev1.Secret)
key := k8s.MetaNamespaceKey(sec)
if store.defaultSSLCertificate == key {
store.syncSecret(store.defaultSSLCertificate)
}
// find references in ingresses and update local ssl certs
if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
glog.Infof("secret %v was added and it is used in ingress annotations. Parsing...", key)
for _, ingKey := range ings {
ing, err := store.GetIngress(ingKey)
if err != nil {
glog.Errorf("could not find Ingress %v in local store", ingKey)
continue
}
store.extractAnnotations(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
}
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: obj,
}
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
sec := cur.(*corev1.Secret)
key := k8s.MetaNamespaceKey(sec)
// parse the ingress annotations (again)
if set, ok := store.secretIngressMap[key]; ok {
glog.Infof("secret %v changed and it is used in ingress annotations. Parsing...", key)
_, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec))
if err == nil {
store.syncSecret(key)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
if store.defaultSSLCertificate == key {
store.syncSecret(store.defaultSSLCertificate)
}
// find references in ingresses and update local ssl certs
if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
glog.Infof("secret %v was updated and it is used in ingress annotations. Parsing...", key)
for _, ingKey := range ings {
ing, err := store.GetIngress(ingKey)
if err != nil {
glog.Errorf("could not find Ingress %v in local store", ingKey)
continue
}
store.extractAnnotations(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
}
for _, name := range set.List() {
ing, _ := store.GetIngress(name)
if ing != nil {
store.extractAnnotations(ing)
}
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Type: UpdateEvent,
Obj: cur,
}
}
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
sec, ok := obj.(*corev1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -368,32 +399,32 @@ func New(checkOCSP bool,
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
sec, ok = tombstone.Obj.(*corev1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
// parse the ingress annotations (again)c
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
if set, ok := store.secretIngressMap[key]; ok {
glog.Infof("secret %v was removed and it is used in ingress annotations. Parsing...", key)
for _, name := range set.List() {
ing, _ := store.GetIngress(name)
if ing != nil {
store.extractAnnotations(ing)
key := k8s.MetaNamespaceKey(sec)
// find references in ingresses
if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
glog.Infof("secret %v was deleted and it is used in ingress annotations. Parsing...", key)
for _, ingKey := range ings {
ing, err := store.GetIngress(ingKey)
if err != nil {
glog.Errorf("could not find Ingress %v in local store", ingKey)
continue
}
store.extractAnnotations(ing)
store.updateSecretIngressMap(ing)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: sec,
Type: DeleteEvent,
Obj: obj,
}
}
},
@ -413,9 +444,9 @@ func New(checkOCSP bool,
}
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
oep := old.(*corev1.Endpoints)
cep := cur.(*corev1.Endpoints)
if !reflect.DeepEqual(cep.Subsets, oep.Subsets) {
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
@ -424,13 +455,17 @@ func New(checkOCSP bool,
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
cmEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
m := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name)
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
store.setConfig(m)
cm := obj.(*corev1.ConfigMap)
key := k8s.MetaNamespaceKey(cm)
// updates to configuration configmaps can trigger an update
if key == configmap || key == tcp || key == udp {
glog.V(2).Infof("adding configmap %v to backend", key)
recorder.Eventf(cm, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("ConfigMap %v", key))
if key == configmap {
store.setConfig(cm)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: obj,
@ -439,19 +474,14 @@ func New(checkOCSP bool,
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
m := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name)
if mapKey == configmap {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
store.setConfig(m)
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
}
cm := cur.(*corev1.ConfigMap)
key := k8s.MetaNamespaceKey(cm)
// updates to configuration configmaps can trigger an update
if mapKey == tcp || mapKey == udp {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
if key == configmap || key == tcp || key == udp {
recorder.Eventf(cm, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", key))
if key == configmap {
store.setConfig(cm)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
@ -464,7 +494,7 @@ func New(checkOCSP bool,
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(mapEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
return store
@ -473,46 +503,74 @@ func New(checkOCSP bool,
// extractAnnotations parses ingress annotations converting the value of the
// annotation to a go struct and also information about the referenced secrets
func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
glog.V(3).Infof("updating annotations information for ingres %v", key)
key := k8s.MetaNamespaceKey(ing)
glog.V(3).Infof("updating annotations information for ingress %v", key)
anns := s.annotations.Extract(ing)
secName := anns.BasicDigestAuth.Secret
if secName != "" {
if _, ok := s.secretIngressMap[secName]; !ok {
s.secretIngressMap[secName] = sets.String{}
}
v := s.secretIngressMap[secName]
if !v.Has(key) {
v.Insert(key)
}
}
secName = anns.CertificateAuth.Secret
if secName != "" {
if _, ok := s.secretIngressMap[secName]; !ok {
s.secretIngressMap[secName] = sets.String{}
}
v := s.secretIngressMap[secName]
if !v.Has(key) {
v.Insert(key)
}
}
err := s.listers.IngressAnnotation.Update(anns)
if err != nil {
glog.Error(err)
}
}
// updateSecretIngressMap takes an Ingress and updates all Secret objects it
// references in secretIngressMap.
func (s *k8sStore) updateSecretIngressMap(ing *extensions.Ingress) {
key := k8s.MetaNamespaceKey(ing)
glog.V(3).Infof("updating references to secrets for ingress %v", key)
// delete all existing references first
s.secretIngressMap.Delete(key)
var refSecrets []string
for _, tls := range ing.Spec.TLS {
secrName := tls.SecretName
if secrName != "" {
secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName)
refSecrets = append(refSecrets, secrKey)
}
}
anns, err := s.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("Error reading Ingress annotations: %v", err)
return
}
secretAnnotations := []string{
anns.BasicDigestAuth.Secret,
anns.CertificateAuth.Secret,
}
for _, secrName := range secretAnnotations {
if secrName != "" {
secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName)
refSecrets = append(refSecrets, secrKey)
}
}
// populate map with all secret references
s.secretIngressMap.Insert(key, refSecrets...)
}
// syncSecrets synchronizes data from all Secrets referenced by the given
// Ingress with the local store and file system.
func (s k8sStore) syncSecrets(ing *extensions.Ingress) {
key := k8s.MetaNamespaceKey(ing)
for _, secrKey := range s.secretIngressMap.ReferencedBy(key) {
s.syncSecret(secrKey)
}
}
// GetSecret returns a Secret using the namespace and name as key
func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) {
func (s k8sStore) GetSecret(key string) (*corev1.Secret, error) {
return s.listers.Secret.ByKey(key)
}
// ListLocalSecrets returns the list of local Secrets
func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
// ListLocalSSLCerts returns the list of local SSLCerts
func (s k8sStore) ListLocalSSLCerts() []*ingress.SSLCert {
var certs []*ingress.SSLCert
for _, item := range s.sslStore.List() {
if s, ok := item.(*ingress.SSLCert); ok {
@ -524,7 +582,7 @@ func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
}
// GetService returns a Service using the namespace and name as key
func (s k8sStore) GetService(key string) (*apiv1.Service, error) {
func (s k8sStore) GetService(key string) (*corev1.Service, error) {
return s.listers.Service.ByKey(key)
}
@ -545,7 +603,7 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress {
for ri, rule := range ing.Spec.Rules {
for pi, path := range rule.HTTP.Paths {
if path.Path == "" {
ing.Spec.Rules[ri].HTTP.Paths[pi].Path = slash
ing.Spec.Rules[ri].HTTP.Paths[pi].Path = "/"
}
}
}
@ -557,7 +615,7 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress {
// GetIngressAnnotations returns the annotations associated to an Ingress
func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
key := k8s.MetaNamespaceKey(ing)
item, exists, err := s.listers.IngressAnnotation.GetByKey(key)
if err != nil {
return &annotations.Ingress{}, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
@ -568,26 +626,26 @@ func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.I
return item.(*annotations.Ingress), nil
}
// GetLocalSecret returns the local copy of a Secret
func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) {
// GetLocalSSLCert returns the local copy of a SSLCert
func (s k8sStore) GetLocalSSLCert(key string) (*ingress.SSLCert, error) {
return s.sslStore.ByKey(key)
}
func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) {
func (s k8sStore) GetConfigMap(key string) (*corev1.ConfigMap, error) {
return s.listers.ConfigMap.ByKey(key)
}
func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
func (s k8sStore) GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error) {
return s.listers.Endpoint.GetServiceEndpoints(svc)
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (s k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, err := s.GetLocalSecret(name); err != nil {
if _, err := s.GetLocalSSLCert(name); err != nil {
s.syncSecret(name)
}
cert, err := s.GetLocalSecret(name)
cert, err := s.GetLocalSSLCert(name)
if err != nil {
return nil, err
}
@ -608,7 +666,7 @@ func (s k8sStore) GetBackendConfiguration() ngx_config.Configuration {
return s.backendConfig
}
func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) {
func (s *k8sStore) setConfig(cmap *corev1.ConfigMap) {
s.backendConfig = ngx_template.ReadConfig(cmap.Data)
// TODO: this should not be done here
@ -628,19 +686,6 @@ func (s k8sStore) Run(stopCh chan struct{}) {
// start informers
s.informers.Run(stopCh)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, ing := range s.ListIngresses() {
s.ReadSecrets(ing)
}
if s.defaultSSLCertificate != "" {
s.syncSecret(s.defaultSSLCertificate)
}
// start goroutine to check for missing local secrets
go wait.Until(s.checkMissingSecrets, 10*time.Second, stopCh)
if s.isOCSPCheckEnabled {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}