Refactor status update
This commit is contained in:
parent
8e7480e0f1
commit
0a39425e8f
3 changed files with 180 additions and 122 deletions
|
|
@ -17,10 +17,8 @@ limitations under the License.
|
|||
package status
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
|
@ -34,10 +32,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
|
|
@ -49,9 +43,10 @@ const (
|
|||
updateInterval = 60 * time.Second
|
||||
)
|
||||
|
||||
// Sync ...
|
||||
type Sync interface {
|
||||
Run()
|
||||
// Syncer ...
|
||||
type Syncer interface {
|
||||
Run(chan struct{})
|
||||
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
|
|
@ -68,16 +63,11 @@ type Config struct {
|
|||
|
||||
PublishStatusAddress string
|
||||
|
||||
ElectionID string
|
||||
|
||||
UpdateStatusOnShutdown bool
|
||||
|
||||
UseNodeInternalIP bool
|
||||
|
||||
IngressLister ingressLister
|
||||
|
||||
DefaultIngressClass string
|
||||
IngressClass string
|
||||
}
|
||||
|
||||
// statusSync keeps the status IP in each Ingress rule updated executing a periodic check
|
||||
|
|
@ -89,109 +79,35 @@ type Config struct {
|
|||
// two flags are set, the source is the IP/s of the node/s
|
||||
type statusSync struct {
|
||||
Config
|
||||
|
||||
// pod contains runtime information about this pod
|
||||
pod *k8s.PodInfo
|
||||
|
||||
elector *leaderelection.LeaderElector
|
||||
|
||||
// workqueue used to keep in sync the status IP/s
|
||||
// in the Ingress rules
|
||||
syncQueue *task.Queue
|
||||
}
|
||||
|
||||
// Run starts the loop to keep the status in sync
|
||||
func (s statusSync) Run() {
|
||||
// we need to use the defined ingress class to allow multiple leaders
|
||||
// in order to update information about ingress status
|
||||
electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass)
|
||||
if s.Config.IngressClass != "" {
|
||||
electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass)
|
||||
}
|
||||
// Start starts the loop to keep the status in sync
|
||||
func (s statusSync) Run(stopCh chan struct{}) {
|
||||
go s.syncQueue.Run(time.Second, stopCh)
|
||||
|
||||
// start a new context
|
||||
ctx := context.Background()
|
||||
// trigger initial sync
|
||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
|
||||
var cancelContext context.CancelFunc
|
||||
|
||||
var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
|
||||
// allow to cancel the context in case we stop being the leader
|
||||
leaderCtx, cancel := context.WithCancel(ctx)
|
||||
go s.elector.Run(leaderCtx)
|
||||
return cancel
|
||||
}
|
||||
|
||||
var stopCh chan struct{}
|
||||
callbacks := leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(ctx context.Context) {
|
||||
klog.V(2).Infof("I am the new status update leader")
|
||||
stopCh = make(chan struct{})
|
||||
go s.syncQueue.Run(time.Second, stopCh)
|
||||
// trigger initial sync
|
||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
// when this instance is the leader we need to enqueue
|
||||
// an item to trigger the update of the Ingress status.
|
||||
wait.PollUntil(updateInterval, func() (bool, error) {
|
||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
klog.V(2).Info("I am not status update leader anymore")
|
||||
close(stopCh)
|
||||
|
||||
// cancel the context
|
||||
cancelContext()
|
||||
|
||||
cancelContext = newLeaderCtx(ctx)
|
||||
},
|
||||
OnNewLeader: func(identity string) {
|
||||
klog.Infof("new leader elected: %v", identity)
|
||||
},
|
||||
}
|
||||
|
||||
broadcaster := record.NewBroadcaster()
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
|
||||
Component: "ingress-leader-elector",
|
||||
Host: hostname,
|
||||
})
|
||||
|
||||
lock := resourcelock.ConfigMapLock{
|
||||
ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID},
|
||||
Client: s.Config.Client.CoreV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: s.pod.Name,
|
||||
EventRecorder: recorder,
|
||||
},
|
||||
}
|
||||
|
||||
ttl := 30 * time.Second
|
||||
var err error
|
||||
s.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
||||
Lock: &lock,
|
||||
LeaseDuration: ttl,
|
||||
RenewDeadline: ttl / 2,
|
||||
RetryPeriod: ttl / 4,
|
||||
Callbacks: callbacks,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Fatalf("unexpected error starting leader election: %v", err)
|
||||
}
|
||||
|
||||
cancelContext = newLeaderCtx(ctx)
|
||||
// when this instance is the leader we need to enqueue
|
||||
// an item to trigger the update of the Ingress status.
|
||||
wait.PollUntil(updateInterval, func() (bool, error) {
|
||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
}
|
||||
|
||||
// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
|
||||
// Shutdown stops the sync. In case the instance is the leader it will remove the current IP
|
||||
// if there is no other instances running.
|
||||
func (s statusSync) Shutdown() {
|
||||
go s.syncQueue.Shutdown()
|
||||
|
||||
// remove IP from Ingress
|
||||
if s.elector != nil && !s.elector.IsLeader() {
|
||||
return
|
||||
}
|
||||
|
||||
if !s.UpdateStatusOnShutdown {
|
||||
klog.Warningf("skipping update of status of Ingress rules")
|
||||
return
|
||||
|
|
@ -226,10 +142,6 @@ func (s *statusSync) sync(key interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if s.elector != nil && !s.elector.IsLeader() {
|
||||
return fmt.Errorf("i am not the current leader. Skiping status update")
|
||||
}
|
||||
|
||||
addrs, err := s.runningAddresses()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -243,15 +155,10 @@ func (s statusSync) keyfunc(input interface{}) (interface{}, error) {
|
|||
return input, nil
|
||||
}
|
||||
|
||||
// NewStatusSyncer returns a new Sync instance
|
||||
func NewStatusSyncer(config Config) Sync {
|
||||
pod, err := k8s.GetPodDetails(config.Client)
|
||||
if err != nil {
|
||||
klog.Fatalf("unexpected error obtaining pod information: %v", err)
|
||||
}
|
||||
|
||||
// NewStatusSyncer returns a new Syncer instance
|
||||
func NewStatusSyncer(podInfo *k8s.PodInfo, config Config) Syncer {
|
||||
st := statusSync{
|
||||
pod: pod,
|
||||
pod: podInfo,
|
||||
|
||||
Config: config,
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue