Update dependencies to K8s 1.8
This commit is contained in:
parent
ba6c89672d
commit
6a59f4c9a2
1114 changed files with 160955 additions and 262845 deletions
24
vendor/k8s.io/client-go/tools/cache/BUILD
generated
vendored
24
vendor/k8s.io/client-go/tools/cache/BUILD
generated
vendored
|
|
@ -1,7 +1,5 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
|
|
@ -15,6 +13,7 @@ go_test(
|
|||
"delta_fifo_test.go",
|
||||
"expiration_cache_test.go",
|
||||
"fifo_test.go",
|
||||
"heap_test.go",
|
||||
"index_test.go",
|
||||
"mutation_detector_test.go",
|
||||
"processor_listener_test.go",
|
||||
|
|
@ -24,7 +23,6 @@ go_test(
|
|||
"undelta_store_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/google/gofuzz:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
|
|
@ -50,6 +48,7 @@ go_library(
|
|||
"expiration_cache_fakes.go",
|
||||
"fake_custom_store.go",
|
||||
"fifo.go",
|
||||
"heap.go",
|
||||
"index.go",
|
||||
"listers.go",
|
||||
"listwatch.go",
|
||||
|
|
@ -62,9 +61,9 @@ go_library(
|
|||
"thread_safe_store.go",
|
||||
"undelta_store.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/golang.org/x/net/context:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
|
@ -81,5 +80,22 @@ go_library(
|
|||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/pager:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//staging/src/k8s.io/client-go/tools/cache/testing:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
|
|
|||
10
vendor/k8s.io/client-go/tools/cache/OWNERS
generated
vendored
10
vendor/k8s.io/client-go/tools/cache/OWNERS
generated
vendored
|
|
@ -1,3 +1,12 @@
|
|||
approvers:
|
||||
- thockin
|
||||
- lavalamp
|
||||
- smarterclayton
|
||||
- wojtek-t
|
||||
- deads2k
|
||||
- caesarxuchao
|
||||
- liggitt
|
||||
- ncdc
|
||||
reviewers:
|
||||
- thockin
|
||||
- lavalamp
|
||||
|
|
@ -38,3 +47,4 @@ reviewers:
|
|||
- mqliang
|
||||
- feihujiang
|
||||
- sdminonne
|
||||
- ncdc
|
||||
|
|
|
|||
323
vendor/k8s.io/client-go/tools/cache/heap.go
generated
vendored
Normal file
323
vendor/k8s.io/client-go/tools/cache/heap.go
generated
vendored
Normal file
|
|
@ -0,0 +1,323 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// This file implements a heap data structure.
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
closedMsg = "heap is closed"
|
||||
)
|
||||
|
||||
type LessFunc func(interface{}, interface{}) bool
|
||||
type heapItem struct {
|
||||
obj interface{} // The object which is stored in the heap.
|
||||
index int // The index of the object's key in the Heap.queue.
|
||||
}
|
||||
|
||||
type itemKeyValue struct {
|
||||
key string
|
||||
obj interface{}
|
||||
}
|
||||
|
||||
// heapData is an internal struct that implements the standard heap interface
|
||||
// and keeps the data stored in the heap.
|
||||
type heapData struct {
|
||||
// items is a map from key of the objects to the objects and their index.
|
||||
// We depend on the property that items in the map are in the queue and vice versa.
|
||||
items map[string]*heapItem
|
||||
// queue implements a heap data structure and keeps the order of elements
|
||||
// according to the heap invariant. The queue keeps the keys of objects stored
|
||||
// in "items".
|
||||
queue []string
|
||||
|
||||
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
||||
// should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
// lessFunc is used to compare two objects in the heap.
|
||||
lessFunc LessFunc
|
||||
}
|
||||
|
||||
var (
|
||||
_ = heap.Interface(&heapData{}) // heapData is a standard heap
|
||||
)
|
||||
|
||||
// Less compares two objects and returns true if the first one should go
|
||||
// in front of the second one in the heap.
|
||||
func (h *heapData) Less(i, j int) bool {
|
||||
if i > len(h.queue) || j > len(h.queue) {
|
||||
return false
|
||||
}
|
||||
itemi, ok := h.items[h.queue[i]]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
itemj, ok := h.items[h.queue[j]]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return h.lessFunc(itemi.obj, itemj.obj)
|
||||
}
|
||||
|
||||
// Len returns the number of items in the Heap.
|
||||
func (h *heapData) Len() int { return len(h.queue) }
|
||||
|
||||
// Swap implements swapping of two elements in the heap. This is a part of standard
|
||||
// heap interface and should never be called directly.
|
||||
func (h *heapData) Swap(i, j int) {
|
||||
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
|
||||
item := h.items[h.queue[i]]
|
||||
item.index = i
|
||||
item = h.items[h.queue[j]]
|
||||
item.index = j
|
||||
}
|
||||
|
||||
// Push is supposed to be called by heap.Push only.
|
||||
func (h *heapData) Push(kv interface{}) {
|
||||
keyValue := kv.(*itemKeyValue)
|
||||
n := len(h.queue)
|
||||
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
|
||||
h.queue = append(h.queue, keyValue.key)
|
||||
}
|
||||
|
||||
// Pop is supposed to be called by heap.Pop only.
|
||||
func (h *heapData) Pop() interface{} {
|
||||
key := h.queue[len(h.queue)-1]
|
||||
h.queue = h.queue[0 : len(h.queue)-1]
|
||||
item, ok := h.items[key]
|
||||
if !ok {
|
||||
// This is an error
|
||||
return nil
|
||||
}
|
||||
delete(h.items, key)
|
||||
return item.obj
|
||||
}
|
||||
|
||||
// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
|
||||
// It can be used to implement priority queues and similar data structures.
|
||||
type Heap struct {
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
// data stores objects and has a queue that keeps their ordering according
|
||||
// to the heap invariant.
|
||||
data *heapData
|
||||
|
||||
// closed indicates that the queue is closed.
|
||||
// It is mainly used to let Pop() exit its control loop while waiting for an item.
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Close the Heap and signals condition variables that may be waiting to pop
|
||||
// items from the heap.
|
||||
func (h *Heap) Close() {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
h.closed = true
|
||||
h.cond.Broadcast()
|
||||
}
|
||||
|
||||
// Add inserts an item, and puts it in the queue. The item is updated if it
|
||||
// already exists.
|
||||
func (h *Heap) Add(obj interface{}) error {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if h.closed {
|
||||
return fmt.Errorf(closedMsg)
|
||||
}
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
h.data.items[key].obj = obj
|
||||
heap.Fix(h.data, h.data.items[key].index)
|
||||
} else {
|
||||
h.addIfNotPresentLocked(key, obj)
|
||||
}
|
||||
h.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Adds all the items in the list to the queue and then signals the condition
|
||||
// variable. It is useful when the caller would like to add all of the items
|
||||
// to the queue before consumer starts processing them.
|
||||
func (h *Heap) BulkAdd(list []interface{}) error {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if h.closed {
|
||||
return fmt.Errorf(closedMsg)
|
||||
}
|
||||
for _, obj := range list {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
h.data.items[key].obj = obj
|
||||
heap.Fix(h.data, h.data.items[key].index)
|
||||
} else {
|
||||
h.addIfNotPresentLocked(key, obj)
|
||||
}
|
||||
}
|
||||
h.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
|
||||
// the key is present in the map, no changes is made to the item.
|
||||
//
|
||||
// This is useful in a single producer/consumer scenario so that the consumer can
|
||||
// safely retry items without contending with the producer and potentially enqueueing
|
||||
// stale items.
|
||||
func (h *Heap) AddIfNotPresent(obj interface{}) error {
|
||||
id, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if h.closed {
|
||||
return fmt.Errorf(closedMsg)
|
||||
}
|
||||
h.addIfNotPresentLocked(id, obj)
|
||||
h.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// addIfNotPresentLocked assumes the lock is already held and adds the the provided
|
||||
// item to the queue if it does not already exist.
|
||||
func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
return
|
||||
}
|
||||
heap.Push(h.data, &itemKeyValue{key, obj})
|
||||
}
|
||||
|
||||
// Update is the same as Add in this implementation. When the item does not
|
||||
// exist, it is added.
|
||||
func (h *Heap) Update(obj interface{}) error {
|
||||
return h.Add(obj)
|
||||
}
|
||||
|
||||
// Delete removes an item.
|
||||
func (h *Heap) Delete(obj interface{}) error {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if item, ok := h.data.items[key]; ok {
|
||||
heap.Remove(h.data, item.index)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("object not found")
|
||||
}
|
||||
|
||||
// Pop waits until an item is ready. If multiple items are
|
||||
// ready, they are returned in the order given by Heap.data.lessFunc.
|
||||
func (h *Heap) Pop() (interface{}, error) {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
for len(h.data.queue) == 0 {
|
||||
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
||||
// When Close() is called, the h.closed is set and the condition is broadcast,
|
||||
// which causes this loop to continue and return from the Pop().
|
||||
if h.closed {
|
||||
return nil, fmt.Errorf("heap is closed")
|
||||
}
|
||||
h.cond.Wait()
|
||||
}
|
||||
obj := heap.Pop(h.data)
|
||||
if obj != nil {
|
||||
return obj, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("object was removed from heap data")
|
||||
}
|
||||
}
|
||||
|
||||
// List returns a list of all the items.
|
||||
func (h *Heap) List() []interface{} {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
list := make([]interface{}, 0, len(h.data.items))
|
||||
for _, item := range h.data.items {
|
||||
list = append(list, item.obj)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
// ListKeys returns a list of all the keys of the objects currently in the Heap.
|
||||
func (h *Heap) ListKeys() []string {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
list := make([]string, 0, len(h.data.items))
|
||||
for key := range h.data.items {
|
||||
list = append(list, key)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
// Get returns the requested item, or sets exists=false.
|
||||
func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return nil, false, KeyError{obj, err}
|
||||
}
|
||||
return h.GetByKey(key)
|
||||
}
|
||||
|
||||
// GetByKey returns the requested item, or sets exists=false.
|
||||
func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
item, exists := h.data.items[key]
|
||||
if !exists {
|
||||
return nil, false, nil
|
||||
}
|
||||
return item.obj, true, nil
|
||||
}
|
||||
|
||||
// IsClosed returns true if the queue is closed.
|
||||
func (h *Heap) IsClosed() bool {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
if h.closed {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NewHeap returns a Heap which can be used to queue up items to process.
|
||||
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
|
||||
h := &Heap{
|
||||
data: &heapData{
|
||||
items: map[string]*heapItem{},
|
||||
queue: []string{},
|
||||
keyFunc: keyFn,
|
||||
lessFunc: lessFn,
|
||||
},
|
||||
}
|
||||
h.cond.L = &h.lock
|
||||
return h
|
||||
}
|
||||
11
vendor/k8s.io/client-go/tools/cache/listwatch.go
generated
vendored
11
vendor/k8s.io/client-go/tools/cache/listwatch.go
generated
vendored
|
|
@ -19,12 +19,15 @@ package cache
|
|||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/pager"
|
||||
)
|
||||
|
||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||
|
|
@ -48,6 +51,9 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
|
|||
type ListWatch struct {
|
||||
ListFunc ListFunc
|
||||
WatchFunc WatchFunc
|
||||
// DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in
|
||||
// 1.9 will allow a controller to opt out of chunking.
|
||||
DisableChunking bool
|
||||
}
|
||||
|
||||
// Getter interface knows how to access Get method from RESTClient.
|
||||
|
|
@ -87,6 +93,11 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
|
|||
|
||||
// List a set of apiserver resources
|
||||
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
|
||||
// chunking will become the default for list watchers starting in Kubernetes 1.9, unless
|
||||
// otherwise disabled.
|
||||
if false && !lw.DisableChunking {
|
||||
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
|
||||
}
|
||||
return lw.ListFunc(options)
|
||||
}
|
||||
|
||||
|
|
|
|||
14
vendor/k8s.io/client-go/tools/cache/reflector.go
generated
vendored
14
vendor/k8s.io/client-go/tools/cache/reflector.go
generated
vendored
|
|
@ -30,6 +30,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
|
@ -98,12 +99,17 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
|
|||
return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
|
||||
}
|
||||
|
||||
// reflectorDisambiguator is used to disambiguate started reflectors.
|
||||
// initialized to an unstable value to ensure meaning isn't attributed to the suffix.
|
||||
var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
|
||||
|
||||
// NewNamedReflector same as NewReflector, but with a specified name for logging
|
||||
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
|
||||
r := &Reflector{
|
||||
name: name,
|
||||
// we need this to be unique per process (some names are still the same)but obvious who it belongs to
|
||||
metrics: newReflectorMetrics(makeValidPromethusMetricName(fmt.Sprintf("reflector_"+name+"_%07d", rand.Intn(1000000)))),
|
||||
metrics: newReflectorMetrics(makeValidPromethusMetricName(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
|
||||
listerWatcher: lw,
|
||||
store: store,
|
||||
expectedType: reflect.TypeOf(expectedType),
|
||||
|
|
@ -233,8 +239,6 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
|||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
|
||||
var resourceVersion string
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer cleanup()
|
||||
|
||||
// Explicitly set "0" as resource version - it's fine for the List()
|
||||
// to be served from cache and potentially be delayed relative to
|
||||
|
|
@ -266,6 +270,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||
cancelCh := make(chan struct{})
|
||||
defer close(cancelCh)
|
||||
go func() {
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer func() {
|
||||
cleanup() // Call the last one written into cleanup
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-resyncCh:
|
||||
|
|
|
|||
170
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
170
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
|
|
@ -138,16 +138,12 @@ type sharedIndexInformer struct {
|
|||
// clock allows for testability
|
||||
clock clock.Clock
|
||||
|
||||
started bool
|
||||
startedLock sync.Mutex
|
||||
started, stopped bool
|
||||
startedLock sync.Mutex
|
||||
|
||||
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
||||
// can safely join the shared informer.
|
||||
blockDeltas sync.Mutex
|
||||
// stopCh is the channel used to stop the main Run process. We have to track it so that
|
||||
// late joiners can have a proper stop
|
||||
stopCh <-chan struct{}
|
||||
wg wait.Group
|
||||
}
|
||||
|
||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
||||
|
|
@ -205,23 +201,25 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
|||
|
||||
s.controller = New(cfg)
|
||||
s.controller.(*controller).clock = s.clock
|
||||
s.stopCh = stopCh
|
||||
s.started = true
|
||||
}()
|
||||
|
||||
defer s.wg.Wait()
|
||||
// Separate stop channel because Processor should be stopped strictly after controller
|
||||
processorStopCh := make(chan struct{})
|
||||
var wg wait.Group
|
||||
defer wg.Wait() // Wait for Processor to stop
|
||||
defer close(processorStopCh) // Tell Processor to stop
|
||||
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
|
||||
wg.StartWithChannel(processorStopCh, s.processor.run)
|
||||
|
||||
s.wg.StartWithChannel(stopCh, s.cacheMutationDetector.Run)
|
||||
s.wg.StartWithChannel(stopCh, s.processor.run)
|
||||
defer func() {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
s.stopped = true // Don't want any new listeners
|
||||
}()
|
||||
s.controller.Run(stopCh)
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) isStarted() bool {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
return s.started
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) HasSynced() bool {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
|
@ -290,6 +288,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
|||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.stopped {
|
||||
glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
|
||||
return
|
||||
}
|
||||
|
||||
if resyncPeriod > 0 {
|
||||
if resyncPeriod < minimumResyncPeriod {
|
||||
glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
|
||||
|
|
@ -325,14 +328,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
|||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
s.processor.addListener(listener)
|
||||
|
||||
s.wg.StartWithChannel(s.stopCh, listener.run)
|
||||
s.wg.StartWithChannel(s.stopCh, listener.pop)
|
||||
|
||||
items := s.indexer.List()
|
||||
for i := range items {
|
||||
listener.add(addNotification{newObj: items[i]})
|
||||
s.processor.addAndStartListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -372,12 +370,26 @@ type sharedProcessor struct {
|
|||
listeners []*processorListener
|
||||
syncingListeners []*processorListener
|
||||
clock clock.Clock
|
||||
wg wait.Group
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
|
||||
p.listeners = append(p.listeners, listener)
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
}
|
||||
|
|
@ -398,16 +410,21 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
|||
}
|
||||
|
||||
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
||||
var wg wait.Group
|
||||
func() {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
wg.StartWithChannel(stopCh, listener.run)
|
||||
wg.StartWithChannel(stopCh, listener.pop)
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
<-stopCh
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
|
||||
}
|
||||
p.wg.Wait() // Wait for all .pop() and .run() to stop
|
||||
}
|
||||
|
||||
// shouldResync queries every listener to determine if any of them need a resync, based on each
|
||||
|
|
@ -443,18 +460,8 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
|
|||
}
|
||||
|
||||
type processorListener struct {
|
||||
// lock/cond protects access to 'pendingNotifications'.
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
|
||||
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
||||
// added until we OOM.
|
||||
// TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
|
||||
// we should try to do something better
|
||||
pendingNotifications []interface{}
|
||||
|
||||
nextCh chan interface{}
|
||||
addCh chan interface{}
|
||||
|
||||
handler ResourceEventHandler
|
||||
|
||||
|
|
@ -472,80 +479,65 @@ type processorListener struct {
|
|||
|
||||
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
|
||||
ret := &processorListener{
|
||||
pendingNotifications: []interface{}{},
|
||||
nextCh: make(chan interface{}),
|
||||
addCh: make(chan interface{}),
|
||||
handler: handler,
|
||||
requestedResyncPeriod: requestedResyncPeriod,
|
||||
resyncPeriod: resyncPeriod,
|
||||
}
|
||||
|
||||
ret.cond.L = &ret.lock
|
||||
|
||||
ret.determineNextResync(now)
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (p *processorListener) add(notification interface{}) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
p.pendingNotifications = append(p.pendingNotifications, notification)
|
||||
p.cond.Broadcast()
|
||||
p.addCh <- notification
|
||||
}
|
||||
|
||||
func (p *processorListener) pop(stopCh <-chan struct{}) {
|
||||
func (p *processorListener) pop() {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer close(p.nextCh) // Tell .run() to stop
|
||||
|
||||
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
|
||||
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
||||
// added until we OOM.
|
||||
// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
|
||||
// we should try to do something better
|
||||
var pendingNotifications []interface{}
|
||||
var nextCh chan<- interface{}
|
||||
var notification interface{}
|
||||
for {
|
||||
blockingGet := func() (interface{}, bool) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
for len(p.pendingNotifications) == 0 {
|
||||
// check if we're shutdown
|
||||
select {
|
||||
case <-stopCh:
|
||||
return nil, true
|
||||
default:
|
||||
}
|
||||
p.cond.Wait()
|
||||
}
|
||||
|
||||
nt := p.pendingNotifications[0]
|
||||
p.pendingNotifications = p.pendingNotifications[1:]
|
||||
return nt, false
|
||||
}
|
||||
|
||||
notification, stopped := blockingGet()
|
||||
if stopped {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case p.nextCh <- notification:
|
||||
case nextCh <- notification:
|
||||
// Notification dispatched
|
||||
if len(pendingNotifications) == 0 { // Nothing to pop
|
||||
nextCh = nil // Disable this select case
|
||||
notification = nil
|
||||
} else {
|
||||
notification = pendingNotifications[0]
|
||||
pendingNotifications[0] = nil
|
||||
pendingNotifications = pendingNotifications[1:]
|
||||
}
|
||||
case notificationToAdd, ok := <-p.addCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if notification == nil { // No notification to pop (and pendingNotifications is empty)
|
||||
// Optimize the case - skip adding to pendingNotifications
|
||||
notification = notificationToAdd
|
||||
nextCh = p.nextCh
|
||||
} else { // There is already a notification waiting to be dispatched
|
||||
pendingNotifications = append(pendingNotifications, notificationToAdd)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *processorListener) run(stopCh <-chan struct{}) {
|
||||
func (p *processorListener) run() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for {
|
||||
var next interface{}
|
||||
select {
|
||||
case <-stopCh:
|
||||
func() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.cond.Broadcast()
|
||||
}()
|
||||
return
|
||||
case next = <-p.nextCh:
|
||||
}
|
||||
|
||||
for next := range p.nextCh {
|
||||
switch notification := next.(type) {
|
||||
case updateNotification:
|
||||
p.handler.OnUpdate(notification.oldObj, notification.newObj)
|
||||
|
|
|
|||
17
vendor/k8s.io/client-go/tools/cache/testing/BUILD
generated
vendored
17
vendor/k8s.io/client-go/tools/cache/testing/BUILD
generated
vendored
|
|
@ -1,7 +1,5 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
|
|
@ -12,7 +10,6 @@ go_test(
|
|||
name = "go_default_test",
|
||||
srcs = ["fake_controller_source_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
|
@ -23,7 +20,6 @@ go_test(
|
|||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["fake_controller_source.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
|
|
@ -34,3 +30,16 @@ go_library(
|
|||
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue