Fix status update in case of connection errors
parent 468872b7e9
commit fed013ab6f
11 changed files with 326 additions and 107 deletions
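Summary (the page carries no commit body; this is inferred from the diff below): leader-election setup moves out of NewStatusSyncer and into statusSync.Run, so it happens when the syncer starts rather than when it is built; OnStoppedLeading now cancels the old context and re-runs the elector instead of stopping for good; sync and Shutdown guard against a nil elector; and updateStatus skips Ingresses whose status already matches before queueing an update.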
@@ -18,7 +18,6 @@ package controller
 import (
     "bytes"
-    "context"
     "encoding/json"
     "errors"
     "fmt"

@@ -257,7 +256,7 @@ func (n *NGINXController) Start() {
     n.store.Run(n.stopCh)

     if n.syncStatus != nil {
-        go n.syncStatus.Run(context.Background())
+        go n.syncStatus.Run()
     }

     cmd := nginxExecCommand()

@@ -51,7 +51,7 @@ const (

 // Sync ...
 type Sync interface {
-    Run(ctx context.Context)
+    Run()
     Shutdown()
 }

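Since Run lost its context parameter, every caller changes shape as well. A minimal sketch of the caller side under the new interface; noopSync is hypothetical, standing in for the real statusSync:

package main

import (
    "fmt"
    "time"
)

// Sync mirrors the interface after this commit: Run takes no context.
type Sync interface {
    Run()
    Shutdown()
}

// noopSync is a hypothetical stand-in for statusSync.
type noopSync struct{}

func (noopSync) Run()      { fmt.Println("status sync running") }
func (noopSync) Shutdown() { fmt.Println("status sync shut down") }

func main() {
    var syncStatus Sync = noopSync{}
    // Mirrors NGINXController.Start: launch the syncer in a goroutine...
    if syncStatus != nil {
        go syncStatus.Run()
    }
    time.Sleep(10 * time.Millisecond)
    // ...and let Shutdown handle leader-aware cleanup on exit.
    syncStatus.Shutdown()
}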
@@ -93,22 +93,97 @@ type statusSync struct {
     pod *k8s.PodInfo

     elector *leaderelection.LeaderElector

     // workqueue used to keep in sync the status IP/s
     // in the Ingress rules
     syncQueue *task.Queue
 }

 // Run starts the loop to keep the status in sync
-func (s statusSync) Run(ctx context.Context) {
-    s.elector.Run(ctx)
+func (s statusSync) Run() {
+    // we need to use the defined ingress class to allow multiple leaders
+    // in order to update information about ingress status
+    electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass)
+    if s.Config.IngressClass != "" {
+        electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass)
+    }
+
+    // start a new context
+    ctx := context.Background()
+    // allow to cancel the context in case we stop being the leader
+    leaderCtx, cancel := context.WithCancel(ctx)
+
+    var stopCh chan struct{}
+    callbacks := leaderelection.LeaderCallbacks{
+        OnStartedLeading: func(ctx context.Context) {
+            glog.V(2).Infof("I am the new status update leader")
+            stopCh = make(chan struct{})
+            go s.syncQueue.Run(time.Second, stopCh)
+            // trigger initial sync
+            s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
+            // when this instance is the leader we need to enqueue
+            // an item to trigger the update of the Ingress status.
+            wait.PollUntil(updateInterval, func() (bool, error) {
+                s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
+                return false, nil
+            }, stopCh)
+        },
+        OnStoppedLeading: func() {
+            glog.V(2).Infof("I am not status update leader anymore")
+            close(stopCh)
+
+            // cancel the context
+            cancel()
+
+            // start a new context and run the elector
+            leaderCtx, cancel = context.WithCancel(ctx)
+            go s.elector.Run(leaderCtx)
+        },
+        OnNewLeader: func(identity string) {
+            glog.Infof("new leader elected: %v", identity)
+        },
+    }
+
+    broadcaster := record.NewBroadcaster()
+    hostname, _ := os.Hostname()
+
+    recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
+        Component: "ingress-leader-elector",
+        Host:      hostname,
+    })
+
+    lock := resourcelock.ConfigMapLock{
+        ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID},
+        Client:        s.Config.Client.CoreV1(),
+        LockConfig: resourcelock.ResourceLockConfig{
+            Identity:      s.pod.Name,
+            EventRecorder: recorder,
+        },
+    }
+
+    ttl := 30 * time.Second
+    le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
+        Lock:          &lock,
+        LeaseDuration: ttl,
+        RenewDeadline: ttl / 2,
+        RetryPeriod:   ttl / 4,
+        Callbacks:     callbacks,
+    })
+    if err != nil {
+        glog.Fatalf("unexpected error starting leader election: %v", err)
+    }
+    s.elector = le
+
+    go le.Run(leaderCtx)
 }

 // Shutdown stop the sync. In case the instance is the leader it will remove the current IP
 // if there is no other instances running.
 func (s statusSync) Shutdown() {
     go s.syncQueue.Shutdown()

     // remove IP from Ingress
-    if !s.elector.IsLeader() {
+    if s.elector != nil && !s.elector.IsLeader() {
         return
     }

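The heart of the fix is visible in OnStoppedLeading above: client-go's elector returns once the lease is lost (for instance after a connection error to the API server), so the callback cancels the old context and re-runs the elector rather than letting status updates stop for good. A self-contained sketch of that cancel-and-recampaign pattern, with a hypothetical fakeElector standing in for leaderelection.LeaderElector:

package main

import (
    "context"
    "fmt"
    "time"
)

// fakeElector is a hypothetical stand-in for leaderelection.LeaderElector:
// Run blocks until its context ends (standing in for a lease lost after a
// connection error) and then fires the OnStoppedLeading callback.
type fakeElector struct {
    onStoppedLeading func()
}

func (e *fakeElector) Run(ctx context.Context) {
    <-ctx.Done()
    e.onStoppedLeading()
}

func main() {
    ctx := context.Background()
    recampaigned := make(chan struct{})

    var e *fakeElector
    leaderCtx, cancel := context.WithCancel(ctx)
    e = &fakeElector{onStoppedLeading: func() {
        // Mirrors OnStoppedLeading above: throw away the dead context and
        // start a fresh campaign instead of leaving the updater stopped.
        cancel()
        leaderCtx, cancel = context.WithCancel(ctx)
        go e.Run(leaderCtx)
        close(recampaigned)
    }}

    go e.Run(leaderCtx)
    cancel() // simulate losing the lease

    select {
    case <-recampaigned:
        fmt.Println("elector restarted after losing the lease")
    case <-time.After(time.Second):
        fmt.Println("elector never restarted")
    }
}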
@@ -146,6 +221,10 @@ func (s *statusSync) sync(key interface{}) error {
         return nil
     }

+    if s.elector != nil && !s.elector.IsLeader() {
+        return fmt.Errorf("i am not the current leader. Skipping status update")
+    }
+
     addrs, err := s.runningAddresses()
     if err != nil {
         return err

@@ -173,66 +252,6 @@ func NewStatusSyncer(config Config) Sync {
     }
     st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)

-    // we need to use the defined ingress class to allow multiple leaders
-    // in order to update information about ingress status
-    electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
-    if config.IngressClass != "" {
-        electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
-    }
-
-    var stopCh chan struct{}
-    callbacks := leaderelection.LeaderCallbacks{
-        OnStartedLeading: func(ctx context.Context) {
-            glog.V(2).Infof("I am the new status update leader")
-            stopCh = make(chan struct{})
-            go st.syncQueue.Run(time.Second, stopCh)
-            // when this instance is the leader we need to enqueue
-            // an item to trigger the update of the Ingress status.
-            wait.PollUntil(updateInterval, func() (bool, error) {
-                st.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
-                return false, nil
-            }, stopCh)
-        },
-        OnStoppedLeading: func() {
-            glog.V(2).Infof("I am not status update leader anymore")
-            close(stopCh)
-        },
-        OnNewLeader: func(identity string) {
-            glog.Infof("new leader elected: %v", identity)
-        },
-    }
-
-    broadcaster := record.NewBroadcaster()
-    hostname, _ := os.Hostname()
-
-    recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
-        Component: "ingress-leader-elector",
-        Host:      hostname,
-    })
-
-    lock := resourcelock.ConfigMapLock{
-        ConfigMapMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: electionID},
-        Client:        config.Client.CoreV1(),
-        LockConfig: resourcelock.ResourceLockConfig{
-            Identity:      pod.Name,
-            EventRecorder: recorder,
-        },
-    }
-
-    ttl := 30 * time.Second
-    le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
-        Lock:          &lock,
-        LeaseDuration: ttl,
-        RenewDeadline: ttl / 2,
-        RetryPeriod:   ttl / 4,
-        Callbacks:     callbacks,
-    })
-
-    if err != nil {
-        glog.Fatalf("unexpected error starting leader election: %v", err)
-    }
-
-    st.elector = le
     return st
 }

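Note that this removed block is essentially the election setup that now lives in statusSync.Run (with config/pod replaced by the receiver's fields), plus the new restart logic in OnStoppedLeading. Moving it out of the constructor means NewStatusSyncer can no longer block or fail on election setup; given the commit title, the point is presumably that election trouble caused by connection errors now surfaces, and recovers, at run time.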
@@ -333,6 +352,13 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
     sort.SliceStable(newIngressPoint, lessLoadBalancerIngress(newIngressPoint))

     for _, ing := range ings {
+        curIPs := ing.Status.LoadBalancer.Ingress
+        sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
+        if ingressSliceEqual(curIPs, newIngressPoint) {
+            glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
+            continue
+        }
+
         batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
     }

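The no-change check moves from runUpdate (next hunk) up into updateStatus, so unchanged Ingresses are filtered out before an update is queued at all. A reduced sketch of the sort-then-compare idea; the struct and the IP-then-hostname ordering are simplified, hypothetical versions of apiv1.LoadBalancerIngress and lessLoadBalancerIngress, not the exact originals:

package main

import (
    "fmt"
    "sort"
)

// lbIngress is a reduced, hypothetical stand-in for apiv1.LoadBalancerIngress.
type lbIngress struct{ IP, Hostname string }

// less orders entries by IP, then Hostname (an assumed ordering; the real
// lessLoadBalancerIngress may prioritize fields differently).
func less(addrs []lbIngress) func(i, j int) bool {
    return func(i, j int) bool {
        if addrs[i].IP != addrs[j].IP {
            return addrs[i].IP < addrs[j].IP
        }
        return addrs[i].Hostname < addrs[j].Hostname
    }
}

// equal mirrors ingressSliceEqual: equal length and pairwise-equal entries.
func equal(lhs, rhs []lbIngress) bool {
    if len(lhs) != len(rhs) {
        return false
    }
    for i := range lhs {
        if lhs[i] != rhs[i] {
            return false
        }
    }
    return true
}

func main() {
    cur := []lbIngress{{IP: "10.0.0.2"}, {IP: "10.0.0.1"}}
    next := []lbIngress{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}}

    // Both sides are sorted before comparing, as updateStatus now does, so a
    // mere ordering difference never triggers a needless status update.
    sort.SliceStable(cur, less(cur))
    sort.SliceStable(next, less(next))
    fmt.Println("skip update:", equal(cur, next)) // skip update: true
}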
@@ -347,14 +373,6 @@ func runUpdate(ing *extensions.Ingress, status []apiv1.LoadBalancerIngress,
         return nil, nil
     }

-    curIPs := ing.Status.LoadBalancer.Ingress
-    sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
-
-    if ingressSliceEqual(status, curIPs) {
-        glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
-        return true, nil
-    }
-
     ingClient := client.ExtensionsV1beta1().Ingresses(ing.Namespace)

     currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})

@@ -398,5 +416,6 @@ func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool {
             return false
         }
     }
+
     return true
 }

@@ -17,7 +17,6 @@ limitations under the License.
 package status

 import (
-    "context"
     "os"
     "testing"
     "time"

@@ -298,7 +297,7 @@ func TestStatusActions(t *testing.T) {
     fk := fkSync.(statusSync)

     // start it and wait for the election and sync actions
-    go fk.Run(context.Background())
+    go fk.Run()
     // wait for the election
     time.Sleep(100 * time.Millisecond)
     // execute sync