merge from master
This commit is contained in:
commit
ce6e564f82
86 changed files with 1448 additions and 3530 deletions
|
|
@ -27,6 +27,9 @@ import (
|
|||
"k8s.io/ingress-nginx/internal/ingress/resolver"
|
||||
)
|
||||
|
||||
// HTTP protocol
|
||||
const HTTP = "HTTP"
|
||||
|
||||
var (
|
||||
validProtocols = regexp.MustCompile(`^(HTTP|HTTPS|AJP|GRPC|GRPCS)$`)
|
||||
)
|
||||
|
|
@ -44,18 +47,18 @@ func NewParser(r resolver.Resolver) parser.IngressAnnotation {
|
|||
// rule used to indicate the backend protocol.
|
||||
func (a backendProtocol) Parse(ing *extensions.Ingress) (interface{}, error) {
|
||||
if ing.GetAnnotations() == nil {
|
||||
return "HTTP", nil
|
||||
return HTTP, nil
|
||||
}
|
||||
|
||||
proto, err := parser.GetStringAnnotation("backend-protocol", ing)
|
||||
if err != nil {
|
||||
return "HTTP", nil
|
||||
return HTTP, nil
|
||||
}
|
||||
|
||||
proto = strings.TrimSpace(strings.ToUpper(proto))
|
||||
if !validProtocols.MatchString(proto) {
|
||||
glog.Warningf("Protocol %v is not a valid value for the backend-protocol annotation. Using HTTP as protocol", proto)
|
||||
return "HTTP", nil
|
||||
return HTTP, nil
|
||||
}
|
||||
|
||||
return proto, nil
|
||||
|
|
|
|||
|
|
@ -31,10 +31,13 @@ var luaRestyWAFModes = map[string]bool{"ACTIVE": true, "INACTIVE": true, "SIMULA
|
|||
|
||||
// Config returns lua-resty-waf configuration for an Ingress rule
|
||||
type Config struct {
|
||||
Mode string `json:"mode"`
|
||||
Debug bool `json:"debug"`
|
||||
IgnoredRuleSets []string `json:"ignored-rulesets"`
|
||||
ExtraRulesetString string `json:"extra-ruleset-string"`
|
||||
Mode string `json:"mode"`
|
||||
Debug bool `json:"debug"`
|
||||
IgnoredRuleSets []string `json:"ignored-rulesets"`
|
||||
ExtraRulesetString string `json:"extra-ruleset-string"`
|
||||
ScoreThreshold int `json:"score-threshold"`
|
||||
AllowUnknownContentTypes bool `json:"allow-unknown-content-types"`
|
||||
ProcessMultipartBody bool `json:"process-multipart-body"`
|
||||
}
|
||||
|
||||
// Equal tests for equality between two Config types
|
||||
|
|
@ -57,6 +60,15 @@ func (e1 *Config) Equal(e2 *Config) bool {
|
|||
if e1.ExtraRulesetString != e2.ExtraRulesetString {
|
||||
return false
|
||||
}
|
||||
if e1.ScoreThreshold != e2.ScoreThreshold {
|
||||
return false
|
||||
}
|
||||
if e1.AllowUnknownContentTypes != e2.AllowUnknownContentTypes {
|
||||
return false
|
||||
}
|
||||
if e1.ProcessMultipartBody != e2.ProcessMultipartBody {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
@ -95,10 +107,22 @@ func (a luarestywaf) Parse(ing *extensions.Ingress) (interface{}, error) {
|
|||
// TODO(elvinefendi) maybe validate the ruleset string here
|
||||
extraRulesetString, _ := parser.GetStringAnnotation("lua-resty-waf-extra-rules", ing)
|
||||
|
||||
scoreThreshold, _ := parser.GetIntAnnotation("lua-resty-waf-score-threshold", ing)
|
||||
|
||||
allowUnknownContentTypes, _ := parser.GetBoolAnnotation("lua-resty-waf-allow-unknown-content-types", ing)
|
||||
|
||||
processMultipartBody, err := parser.GetBoolAnnotation("lua-resty-waf-process-multipart-body", ing)
|
||||
if err != nil {
|
||||
processMultipartBody = true
|
||||
}
|
||||
|
||||
return &Config{
|
||||
Mode: mode,
|
||||
Debug: debug,
|
||||
IgnoredRuleSets: ignoredRuleSets,
|
||||
ExtraRulesetString: extraRulesetString,
|
||||
Mode: mode,
|
||||
Debug: debug,
|
||||
IgnoredRuleSets: ignoredRuleSets,
|
||||
ExtraRulesetString: extraRulesetString,
|
||||
ScoreThreshold: scoreThreshold,
|
||||
AllowUnknownContentTypes: allowUnknownContentTypes,
|
||||
ProcessMultipartBody: processMultipartBody,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,9 @@ func TestParse(t *testing.T) {
|
|||
luaRestyWAFAnnotation := parser.GetAnnotationWithPrefix("lua-resty-waf")
|
||||
luaRestyWAFDebugAnnotation := parser.GetAnnotationWithPrefix("lua-resty-waf-debug")
|
||||
luaRestyWAFIgnoredRuleSetsAnnotation := parser.GetAnnotationWithPrefix("lua-resty-waf-ignore-rulesets")
|
||||
luaRestyWAFScoreThresholdAnnotation := parser.GetAnnotationWithPrefix("lua-resty-waf-score-threshold")
|
||||
luaRestyWAFAllowUnknownContentTypesAnnotation := parser.GetAnnotationWithPrefix("lua-resty-waf-allow-unknown-content-types")
|
||||
luaRestyWAFProcessMultipartBody := parser.GetAnnotationWithPrefix("lua-resty-waf-process-multipart-body")
|
||||
|
||||
ap := NewParser(&resolver.Mock{})
|
||||
if ap == nil {
|
||||
|
|
@ -43,21 +46,25 @@ func TestParse(t *testing.T) {
|
|||
{nil, &Config{}},
|
||||
{map[string]string{}, &Config{}},
|
||||
|
||||
{map[string]string{luaRestyWAFAnnotation: "active"}, &Config{Mode: "ACTIVE", Debug: false, IgnoredRuleSets: []string{}}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "active"}, &Config{Mode: "ACTIVE", Debug: false, IgnoredRuleSets: []string{}, ProcessMultipartBody: true}},
|
||||
{map[string]string{luaRestyWAFDebugAnnotation: "true"}, &Config{Debug: false}},
|
||||
|
||||
{map[string]string{luaRestyWAFAnnotation: "active", luaRestyWAFDebugAnnotation: "true"}, &Config{Mode: "ACTIVE", Debug: true, IgnoredRuleSets: []string{}}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "active", luaRestyWAFDebugAnnotation: "false"}, &Config{Mode: "ACTIVE", Debug: false, IgnoredRuleSets: []string{}}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "inactive", luaRestyWAFDebugAnnotation: "true"}, &Config{Mode: "INACTIVE", Debug: true, IgnoredRuleSets: []string{}}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "active", luaRestyWAFDebugAnnotation: "true"}, &Config{Mode: "ACTIVE", Debug: true, IgnoredRuleSets: []string{}, ProcessMultipartBody: true}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "active", luaRestyWAFDebugAnnotation: "false"}, &Config{Mode: "ACTIVE", Debug: false, IgnoredRuleSets: []string{}, ProcessMultipartBody: true}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "inactive", luaRestyWAFDebugAnnotation: "true"}, &Config{Mode: "INACTIVE", Debug: true, IgnoredRuleSets: []string{}, ProcessMultipartBody: true}},
|
||||
|
||||
{map[string]string{
|
||||
luaRestyWAFAnnotation: "active",
|
||||
luaRestyWAFDebugAnnotation: "true",
|
||||
luaRestyWAFIgnoredRuleSetsAnnotation: "ruleset1, ruleset2 ruleset3, another.ruleset"},
|
||||
&Config{Mode: "ACTIVE", Debug: true, IgnoredRuleSets: []string{"ruleset1", "ruleset2", "ruleset3", "another.ruleset"}}},
|
||||
luaRestyWAFAnnotation: "active",
|
||||
luaRestyWAFDebugAnnotation: "true",
|
||||
luaRestyWAFIgnoredRuleSetsAnnotation: "ruleset1, ruleset2 ruleset3, another.ruleset",
|
||||
luaRestyWAFScoreThresholdAnnotation: "10",
|
||||
luaRestyWAFAllowUnknownContentTypesAnnotation: "true"},
|
||||
&Config{Mode: "ACTIVE", Debug: true, IgnoredRuleSets: []string{"ruleset1", "ruleset2", "ruleset3", "another.ruleset"}, ScoreThreshold: 10, AllowUnknownContentTypes: true, ProcessMultipartBody: true}},
|
||||
|
||||
{map[string]string{luaRestyWAFAnnotation: "siMulate", luaRestyWAFDebugAnnotation: "true"}, &Config{Mode: "SIMULATE", Debug: true, IgnoredRuleSets: []string{}}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "siMulate", luaRestyWAFDebugAnnotation: "true"}, &Config{Mode: "SIMULATE", Debug: true, IgnoredRuleSets: []string{}, ProcessMultipartBody: true}},
|
||||
{map[string]string{luaRestyWAFAnnotation: "siMulateX", luaRestyWAFDebugAnnotation: "true"}, &Config{Debug: false}},
|
||||
|
||||
{map[string]string{luaRestyWAFAnnotation: "active", luaRestyWAFProcessMultipartBody: "false"}, &Config{Mode: "ACTIVE", ProcessMultipartBody: false, IgnoredRuleSets: []string{}}},
|
||||
}
|
||||
|
||||
ing := &extensions.Ingress{
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ type Config struct {
|
|||
// AppRoot defines the Application Root that the Controller must redirect if it's in '/' context
|
||||
AppRoot string `json:"appRoot"`
|
||||
// UseRegex indicates whether or not the locations use regex paths
|
||||
UseRegex bool `json:useRegex`
|
||||
UseRegex bool `json:"useRegex"`
|
||||
}
|
||||
|
||||
// Equal tests for equality between two Redirect types
|
||||
|
|
|
|||
|
|
@ -191,7 +191,7 @@ func TestUseRegex(t *testing.T) {
|
|||
if !ok {
|
||||
t.Errorf("expected a App Context")
|
||||
}
|
||||
if redirect.UseRegex != true {
|
||||
if !redirect.UseRegex {
|
||||
t.Errorf("Unexpected value got in UseRegex")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,8 @@ func (n NGINXController) Name() string {
|
|||
func (n *NGINXController) Check(_ *http.Request) error {
|
||||
|
||||
url := fmt.Sprintf("http://127.0.0.1:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath)
|
||||
statusCode, err := simpleGet(url)
|
||||
timeout := n.cfg.HealthCheckTimeout
|
||||
statusCode, err := simpleGet(url, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -48,7 +49,7 @@ func (n *NGINXController) Check(_ *http.Request) error {
|
|||
}
|
||||
|
||||
url = fmt.Sprintf("http://127.0.0.1:%v/is-dynamic-lb-initialized", n.cfg.ListenPorts.Status)
|
||||
statusCode, err = simpleGet(url)
|
||||
statusCode, err = simpleGet(url, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -75,9 +76,9 @@ func (n *NGINXController) Check(_ *http.Request) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func simpleGet(url string) (int, error) {
|
||||
func simpleGet(url string, timeout time.Duration) (int, error) {
|
||||
client := &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
Timeout: timeout * time.Second,
|
||||
Transport: &http.Transport{DisableKeepAlives: true},
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -102,10 +102,10 @@ type Configuration struct {
|
|||
// By default access logs go to /var/log/nginx/access.log
|
||||
AccessLogPath string `json:"access-log-path,omitempty"`
|
||||
|
||||
// WorkerCpuAffinity bind nginx worker processes to CPUs this will improve response latency
|
||||
// WorkerCPUAffinity bind nginx worker processes to CPUs this will improve response latency
|
||||
// http://nginx.org/en/docs/ngx_core_module.html#worker_cpu_affinity
|
||||
// By default this is disabled
|
||||
WorkerCpuAffinity string `json:"worker-cpu-affinity,omitempty"`
|
||||
WorkerCPUAffinity string `json:"worker-cpu-affinity,omitempty"`
|
||||
// ErrorLogPath sets the path of the error logs
|
||||
// http://nginx.org/en/docs/ngx_core_module.html#error_log
|
||||
// By default error logs go to /var/log/nginx/error.log
|
||||
|
|
@ -347,6 +347,10 @@ type Configuration struct {
|
|||
// http://nginx.org/en/docs/http/ngx_http_geoip_module.html
|
||||
UseGeoIP bool `json:"use-geoip,omitempty"`
|
||||
|
||||
// UseGeoIP2 enables the geoip2 module for NGINX
|
||||
// By default this is disabled
|
||||
UseGeoIP2 bool `json:"use-geoip2,omitempty"`
|
||||
|
||||
// Enables or disables the use of the NGINX Brotli Module for compression
|
||||
// https://github.com/google/ngx_brotli
|
||||
EnableBrotli bool `json:"enable-brotli,omitempty"`
|
||||
|
|
@ -438,11 +442,11 @@ type Configuration struct {
|
|||
|
||||
// If the request does not have a request-id, should we generate a random value?
|
||||
// Default: true
|
||||
GenerateRequestId bool `json:"generate-request-id,omitempty"`
|
||||
GenerateRequestID bool `json:"generate-request-id,omitempty"`
|
||||
|
||||
// Adds an X-Original-Uri header with the original request URI to the backend request
|
||||
// Default: true
|
||||
ProxyAddOriginalUriHeader bool `json:"proxy-add-original-uri-header"`
|
||||
ProxyAddOriginalURIHeader bool `json:"proxy-add-original-uri-header"`
|
||||
|
||||
// EnableOpentracing enables the nginx Opentracing extension
|
||||
// https://github.com/opentracing-contrib/nginx-opentracing
|
||||
|
|
@ -570,7 +574,7 @@ func NewDefault() Configuration {
|
|||
cfg := Configuration{
|
||||
AllowBackendServerHeader: false,
|
||||
AccessLogPath: "/var/log/nginx/access.log",
|
||||
WorkerCpuAffinity: "",
|
||||
WorkerCPUAffinity: "",
|
||||
ErrorLogPath: "/var/log/nginx/error.log",
|
||||
BlockCIDRs: defBlockEntity,
|
||||
BlockUserAgents: defBlockEntity,
|
||||
|
|
@ -587,8 +591,8 @@ func NewDefault() Configuration {
|
|||
UseForwardedHeaders: true,
|
||||
ForwardedForHeader: "X-Forwarded-For",
|
||||
ComputeFullForwardedFor: false,
|
||||
ProxyAddOriginalUriHeader: true,
|
||||
GenerateRequestId: true,
|
||||
ProxyAddOriginalURIHeader: true,
|
||||
GenerateRequestID: true,
|
||||
HTTP2MaxFieldSize: "4k",
|
||||
HTTP2MaxHeaderSize: "16k",
|
||||
HTTP2MaxRequests: 1000,
|
||||
|
|
@ -630,6 +634,7 @@ func NewDefault() Configuration {
|
|||
EnableBrotli: false,
|
||||
UseGzip: true,
|
||||
UseGeoIP: true,
|
||||
UseGeoIP2: false,
|
||||
WorkerProcesses: strconv.Itoa(runtime.NumCPU()),
|
||||
WorkerShutdownTimeout: "10s",
|
||||
LoadBalanceAlgorithm: defaultLoadBalancerAlgorithm,
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ type Configuration struct {
|
|||
ForceNamespaceIsolation bool
|
||||
|
||||
DefaultHealthzURL string
|
||||
HealthCheckTimeout time.Duration
|
||||
DefaultSSLCertificate string
|
||||
|
||||
// +optional
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package controller
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
|
@ -257,7 +256,7 @@ func (n *NGINXController) Start() {
|
|||
n.store.Run(n.stopCh)
|
||||
|
||||
if n.syncStatus != nil {
|
||||
go n.syncStatus.Run(context.Background())
|
||||
go n.syncStatus.Run()
|
||||
}
|
||||
|
||||
cmd := nginxExecCommand()
|
||||
|
|
@ -812,12 +811,7 @@ func configureCertificates(pcfg *ingress.Configuration, port int) error {
|
|||
}
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/configuration/servers", port)
|
||||
err := post(url, servers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return post(url, servers)
|
||||
}
|
||||
|
||||
func post(url string, data interface{}) error {
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ func TestMergeConfigMapToStruct(t *testing.T) {
|
|||
def.WorkerShutdownTimeout = "99s"
|
||||
def.NginxStatusIpv4Whitelist = []string{"127.0.0.1", "10.0.0.0/24"}
|
||||
def.NginxStatusIpv6Whitelist = []string{"::1", "2001::/16"}
|
||||
def.ProxyAddOriginalUriHeader = false
|
||||
def.ProxyAddOriginalURIHeader = false
|
||||
|
||||
hash, err := hashstructure.Hash(def, &hashstructure.HashOptions{
|
||||
TagName: "json",
|
||||
|
|
|
|||
|
|
@ -209,8 +209,6 @@ func buildLuaSharedDictionaries(s interface{}, disableLuaRestyWAF bool) string {
|
|||
"lua_shared_dict configuration_data 5M",
|
||||
"lua_shared_dict certificate_data 16M",
|
||||
"lua_shared_dict locks 512k",
|
||||
"lua_shared_dict balancer_ewma 1M",
|
||||
"lua_shared_dict balancer_ewma_last_touched_at 1M",
|
||||
"lua_shared_dict sticky_sessions 1M",
|
||||
}
|
||||
|
||||
|
|
@ -724,19 +722,6 @@ func buildUpstreamName(loc interface{}) string {
|
|||
return upstreamName
|
||||
}
|
||||
|
||||
// TODO: Needs Unit Tests
|
||||
func isSticky(host string, loc *ingress.Location, stickyLocations map[string][]string) bool {
|
||||
if _, ok := stickyLocations[host]; ok {
|
||||
for _, sl := range stickyLocations[host] {
|
||||
if sl == loc.Path {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func buildNextUpstream(i, r interface{}) string {
|
||||
nextUpstream, ok := i.(string)
|
||||
if !ok {
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ func NewSocketCollector(pod, namespace, class string) (*SocketCollector, error)
|
|||
bytesSent: prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "bytes_sent",
|
||||
Help: "The the number of bytes sent to a client",
|
||||
Help: "The number of bytes sent to a client",
|
||||
Namespace: PrometheusNamespace,
|
||||
Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10.
|
||||
ConstLabels: constLabels,
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ const (
|
|||
|
||||
// Sync ...
|
||||
type Sync interface {
|
||||
Run(ctx context.Context)
|
||||
Run()
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
|
|
@ -93,22 +93,102 @@ type statusSync struct {
|
|||
pod *k8s.PodInfo
|
||||
|
||||
elector *leaderelection.LeaderElector
|
||||
|
||||
// workqueue used to keep in sync the status IP/s
|
||||
// in the Ingress rules
|
||||
syncQueue *task.Queue
|
||||
}
|
||||
|
||||
// Run starts the loop to keep the status in sync
|
||||
func (s statusSync) Run(ctx context.Context) {
|
||||
s.elector.Run(ctx)
|
||||
func (s statusSync) Run() {
|
||||
// we need to use the defined ingress class to allow multiple leaders
|
||||
// in order to update information about ingress status
|
||||
electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass)
|
||||
if s.Config.IngressClass != "" {
|
||||
electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass)
|
||||
}
|
||||
|
||||
// start a new context
|
||||
ctx := context.Background()
|
||||
|
||||
var cancelContext context.CancelFunc
|
||||
|
||||
var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
|
||||
// allow to cancel the context in case we stop being the leader
|
||||
leaderCtx, cancel := context.WithCancel(ctx)
|
||||
go s.elector.Run(leaderCtx)
|
||||
return cancel
|
||||
}
|
||||
|
||||
var stopCh chan struct{}
|
||||
callbacks := leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(ctx context.Context) {
|
||||
glog.V(2).Infof("I am the new status update leader")
|
||||
stopCh = make(chan struct{})
|
||||
go s.syncQueue.Run(time.Second, stopCh)
|
||||
// trigger initial sync
|
||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
// when this instance is the leader we need to enqueue
|
||||
// an item to trigger the update of the Ingress status.
|
||||
wait.PollUntil(updateInterval, func() (bool, error) {
|
||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
glog.V(2).Infof("I am not status update leader anymore")
|
||||
close(stopCh)
|
||||
|
||||
// cancel the context
|
||||
cancelContext()
|
||||
|
||||
cancelContext = newLeaderCtx(ctx)
|
||||
},
|
||||
OnNewLeader: func(identity string) {
|
||||
glog.Infof("new leader elected: %v", identity)
|
||||
},
|
||||
}
|
||||
|
||||
broadcaster := record.NewBroadcaster()
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
|
||||
Component: "ingress-leader-elector",
|
||||
Host: hostname,
|
||||
})
|
||||
|
||||
lock := resourcelock.ConfigMapLock{
|
||||
ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID},
|
||||
Client: s.Config.Client.CoreV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: s.pod.Name,
|
||||
EventRecorder: recorder,
|
||||
},
|
||||
}
|
||||
|
||||
ttl := 30 * time.Second
|
||||
var err error
|
||||
s.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
||||
Lock: &lock,
|
||||
LeaseDuration: ttl,
|
||||
RenewDeadline: ttl / 2,
|
||||
RetryPeriod: ttl / 4,
|
||||
Callbacks: callbacks,
|
||||
})
|
||||
if err != nil {
|
||||
glog.Fatalf("unexpected error starting leader election: %v", err)
|
||||
}
|
||||
|
||||
cancelContext = newLeaderCtx(ctx)
|
||||
}
|
||||
|
||||
// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
|
||||
// if there is no other instances running.
|
||||
func (s statusSync) Shutdown() {
|
||||
go s.syncQueue.Shutdown()
|
||||
|
||||
// remove IP from Ingress
|
||||
if !s.elector.IsLeader() {
|
||||
if s.elector != nil && !s.elector.IsLeader() {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -146,6 +226,10 @@ func (s *statusSync) sync(key interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if s.elector != nil && !s.elector.IsLeader() {
|
||||
return fmt.Errorf("i am not the current leader. Skiping status update")
|
||||
}
|
||||
|
||||
addrs, err := s.runningAddresses()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -173,66 +257,6 @@ func NewStatusSyncer(config Config) Sync {
|
|||
}
|
||||
st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)
|
||||
|
||||
// we need to use the defined ingress class to allow multiple leaders
|
||||
// in order to update information about ingress status
|
||||
electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
|
||||
if config.IngressClass != "" {
|
||||
electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
|
||||
}
|
||||
|
||||
var stopCh chan struct{}
|
||||
callbacks := leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(ctx context.Context) {
|
||||
glog.V(2).Infof("I am the new status update leader")
|
||||
stopCh = make(chan struct{})
|
||||
go st.syncQueue.Run(time.Second, stopCh)
|
||||
// when this instance is the leader we need to enqueue
|
||||
// an item to trigger the update of the Ingress status.
|
||||
wait.PollUntil(updateInterval, func() (bool, error) {
|
||||
st.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
glog.V(2).Infof("I am not status update leader anymore")
|
||||
close(stopCh)
|
||||
},
|
||||
OnNewLeader: func(identity string) {
|
||||
glog.Infof("new leader elected: %v", identity)
|
||||
},
|
||||
}
|
||||
|
||||
broadcaster := record.NewBroadcaster()
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
|
||||
Component: "ingress-leader-elector",
|
||||
Host: hostname,
|
||||
})
|
||||
|
||||
lock := resourcelock.ConfigMapLock{
|
||||
ConfigMapMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: electionID},
|
||||
Client: config.Client.CoreV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: pod.Name,
|
||||
EventRecorder: recorder,
|
||||
},
|
||||
}
|
||||
|
||||
ttl := 30 * time.Second
|
||||
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
||||
Lock: &lock,
|
||||
LeaseDuration: ttl,
|
||||
RenewDeadline: ttl / 2,
|
||||
RetryPeriod: ttl / 4,
|
||||
Callbacks: callbacks,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Fatalf("unexpected error starting leader election: %v", err)
|
||||
}
|
||||
|
||||
st.elector = le
|
||||
return st
|
||||
}
|
||||
|
||||
|
|
@ -333,6 +357,13 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
|
|||
sort.SliceStable(newIngressPoint, lessLoadBalancerIngress(newIngressPoint))
|
||||
|
||||
for _, ing := range ings {
|
||||
curIPs := ing.Status.LoadBalancer.Ingress
|
||||
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
|
||||
if ingressSliceEqual(curIPs, newIngressPoint) {
|
||||
glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
|
||||
}
|
||||
|
||||
|
|
@ -347,14 +378,6 @@ func runUpdate(ing *extensions.Ingress, status []apiv1.LoadBalancerIngress,
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
curIPs := ing.Status.LoadBalancer.Ingress
|
||||
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
|
||||
|
||||
if ingressSliceEqual(status, curIPs) {
|
||||
glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
ingClient := client.ExtensionsV1beta1().Ingresses(ing.Namespace)
|
||||
|
||||
currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
|
||||
|
|
@ -398,5 +421,6 @@ func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool {
|
|||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||
package status
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -298,7 +297,7 @@ func TestStatusActions(t *testing.T) {
|
|||
fk := fkSync.(statusSync)
|
||||
|
||||
// start it and wait for the election and syn actions
|
||||
go fk.Run(context.Background())
|
||||
go fk.Run()
|
||||
// wait for the election
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// execute sync
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue