Update godeps
This commit is contained in:
parent
1c8773fc98
commit
1bc383f9c5
1723 changed files with 287976 additions and 411028 deletions
240
vendor/k8s.io/kubernetes/pkg/storage/cacher.go
generated
vendored
240
vendor/k8s.io/kubernetes/pkg/storage/cacher.go
generated
vendored
|
|
@ -31,6 +31,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
|
@ -113,7 +114,10 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool
|
|||
}
|
||||
}
|
||||
|
||||
func (i *indexedWatchers) terminateAll() {
|
||||
func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
|
||||
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
|
||||
glog.Warningf("Terminating all watchers from cacher %v", objectType)
|
||||
}
|
||||
i.allWatchers.terminateAll()
|
||||
for index, watchers := range i.valueWatchers {
|
||||
watchers.terminateAll()
|
||||
|
|
@ -121,12 +125,21 @@ func (i *indexedWatchers) terminateAll() {
|
|||
}
|
||||
}
|
||||
|
||||
type filterObjectFunc func(string, runtime.Object) bool
|
||||
|
||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||
// resource from its internal cache and updating its cache in the background
|
||||
// based on the underlying storage contents.
|
||||
// Cacher implements storage.Interface (although most of the calls are just
|
||||
// delegated to the underlying storage).
|
||||
type Cacher struct {
|
||||
// HighWaterMarks for performance debugging.
|
||||
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
|
||||
// See: https://golang.org/pkg/sync/atomic/ for more information
|
||||
incomingHWM HighWaterMark
|
||||
// Incoming events that should be dispatched to watchers.
|
||||
incoming chan watchCacheEvent
|
||||
|
||||
sync.RWMutex
|
||||
|
||||
// Before accessing the cacher's cache, wait for the ready to be ok.
|
||||
|
|
@ -150,9 +163,6 @@ type Cacher struct {
|
|||
// Versioner is used to handle resource versions.
|
||||
versioner Versioner
|
||||
|
||||
// keyFunc is used to get a key in the underyling storage for a given object.
|
||||
keyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// triggerFunc is used for optimizing amount of watchers that needs to process
|
||||
// an incoming event.
|
||||
triggerFunc TriggerPublisherFunc
|
||||
|
|
@ -161,9 +171,6 @@ type Cacher struct {
|
|||
watcherIdx int
|
||||
watchers indexedWatchers
|
||||
|
||||
// Incoming events that should be dispatched to watchers.
|
||||
incoming chan watchCacheEvent
|
||||
|
||||
// Handling graceful termination.
|
||||
stopLock sync.RWMutex
|
||||
stopped bool
|
||||
|
|
@ -175,7 +182,7 @@ type Cacher struct {
|
|||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||
watchCache := newWatchCache(config.CacheCapacity)
|
||||
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
|
||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||
|
||||
// Give this error when it is constructed rather than when you get the
|
||||
|
|
@ -193,7 +200,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
|
|||
watchCache: watchCache,
|
||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||
versioner: config.Versioner,
|
||||
keyFunc: config.KeyFunc,
|
||||
triggerFunc: config.TriggerPublisherFunc,
|
||||
watcherIdx: 0,
|
||||
watchers: indexedWatchers{
|
||||
|
|
@ -271,7 +277,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
|
|||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
|
||||
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
|
||||
watchRV, err := ParseWatchResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -295,17 +301,32 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||
}
|
||||
|
||||
triggerValue, triggerSupported := "", false
|
||||
// TODO: Currently we assume that in a given Cacher object, any <filter> that is
|
||||
// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
|
||||
// passed here is aware of exactly the same trigger (at most one).
|
||||
// Thus, either 0 or 1 values will be returned.
|
||||
if matchValues := filter.Trigger(); len(matchValues) > 0 {
|
||||
if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
|
||||
triggerValue, triggerSupported = matchValues[0].Value, true
|
||||
}
|
||||
|
||||
// If there is triggerFunc defined, but triggerSupported is false,
|
||||
// we can't narrow the amount of events significantly at this point.
|
||||
//
|
||||
// That said, currently triggerFunc is defined only for Pods and Nodes,
|
||||
// and there is only constant number of watchers for which triggerSupported
|
||||
// is false (excluding those issues explicitly by users).
|
||||
// Thus, to reduce the risk of those watchers blocking all watchers of a
|
||||
// given resource in the system, we increase the sizes of buffers for them.
|
||||
chanSize := 10
|
||||
if c.triggerFunc != nil && !triggerSupported {
|
||||
// TODO: We should tune this value and ideally make it dependent on the
|
||||
// number of objects of a given type and/or their churn.
|
||||
chanSize = 1000
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
||||
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forget)
|
||||
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget)
|
||||
|
||||
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
||||
c.watcherIdx++
|
||||
|
|
@ -313,8 +334,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
|
||||
return c.Watch(ctx, key, resourceVersion, filter)
|
||||
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
|
||||
return c.Watch(ctx, key, resourceVersion, pred)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
|
|
@ -323,30 +344,28 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign
|
|||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error {
|
||||
return c.storage.GetToList(ctx, key, filter, listObj)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error {
|
||||
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
|
||||
if resourceVersion == "" {
|
||||
// If resourceVersion is not specified, serve it from underlying
|
||||
// storage (for backward compatibility).
|
||||
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
|
||||
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
|
||||
}
|
||||
|
||||
// If resourceVersion is specified, serve it from cache.
|
||||
// It's guaranteed that the returned value is at least that
|
||||
// fresh as the given resourceVersion.
|
||||
|
||||
listRV, err := ParseListResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ready.wait()
|
||||
trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String()))
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
|
||||
// List elements from cache, with at least 'listRV'.
|
||||
c.ready.wait()
|
||||
trace.Step("Ready")
|
||||
|
||||
// List elements with at least 'listRV' from cache.
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -355,19 +374,21 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
|
|||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filterFunc := filterFunction(key, c.keyFunc, filter)
|
||||
filter := filterFunction(key, pred)
|
||||
|
||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
|
||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
||||
}
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
trace.Step("Got from cache")
|
||||
|
||||
if exists {
|
||||
elem, ok := obj.(*storeElement)
|
||||
if !ok {
|
||||
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filterFunc.Filter(object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||
if filter(elem.Key, elem.Object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
if c.versioner != nil {
|
||||
|
|
@ -379,7 +400,83 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
|
|||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error {
|
||||
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
|
||||
if resourceVersion == "" {
|
||||
// If resourceVersion is not specified, serve it from underlying
|
||||
// storage (for backward compatibility).
|
||||
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
|
||||
}
|
||||
|
||||
// If resourceVersion is specified, serve it from cache.
|
||||
// It's guaranteed that the returned value is at least that
|
||||
// fresh as the given resourceVersion.
|
||||
listRV, err := ParseListResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String()))
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
|
||||
c.ready.wait()
|
||||
trace.Step("Ready")
|
||||
|
||||
// List elements with at least 'listRV' from cache.
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listVal, err := conversion.EnforcePtr(listPtr)
|
||||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filter := filterFunction(key, pred)
|
||||
|
||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
||||
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
|
||||
// Resize the slice appropriately, since we already know that none
|
||||
// of the elements will be filtered out.
|
||||
listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
|
||||
trace.Step("Resized result")
|
||||
}
|
||||
for _, obj := range objs {
|
||||
elem, ok := obj.(*storeElement)
|
||||
if !ok {
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(elem.Key, elem.Object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) GuaranteedUpdate(
|
||||
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
|
||||
preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
|
||||
// Ignore the suggestion and try to pass down the current version of the object
|
||||
// read from cache.
|
||||
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
|
||||
glog.Errorf("GetByKey returned error: %v", err)
|
||||
} else if exists {
|
||||
currObj, copyErr := api.Scheme.Copy(elem.(*storeElement).Object)
|
||||
if copyErr == nil {
|
||||
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
|
||||
}
|
||||
glog.Errorf("couldn't copy object: %v", copyErr)
|
||||
}
|
||||
// If we couldn't get the object, fallback to no-suggestion.
|
||||
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
|
||||
}
|
||||
|
||||
|
|
@ -409,13 +506,11 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
|
|||
return result, len(result) > 0
|
||||
}
|
||||
|
||||
// TODO: Most probably splitting this method to a separate thread will visibily
|
||||
// improve throughput of our watch machinery. So what we should do is to:
|
||||
// - OnEvent handler simply put an element to channel
|
||||
// - processEvent be another goroutine processing events from that channel
|
||||
// Additionally, if we make this channel buffered, cacher will be more resistant
|
||||
// to single watchers being slow - see cacheWatcher::add method.
|
||||
func (c *Cacher) processEvent(event watchCacheEvent) {
|
||||
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
|
||||
// Monitor if this gets backed up, and how much.
|
||||
glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
|
||||
}
|
||||
c.incoming <- event
|
||||
}
|
||||
|
||||
|
|
@ -466,10 +561,9 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
|||
}
|
||||
|
||||
func (c *Cacher) terminateAllWatchers() {
|
||||
glog.Warningf("Terminating all watchers from cacher %v", c.objectType)
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.watchers.terminateAll()
|
||||
c.watchers.terminateAll(c.objectType)
|
||||
}
|
||||
|
||||
func (c *Cacher) isStopped() bool {
|
||||
|
|
@ -498,19 +592,15 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
|||
}
|
||||
}
|
||||
|
||||
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter Filter) Filter {
|
||||
filterFunc := func(obj runtime.Object) bool {
|
||||
objKey, err := keyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("invalid object for filter: %v", obj)
|
||||
return false
|
||||
}
|
||||
func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
|
||||
f := SimpleFilter(p)
|
||||
filterFunc := func(objKey string, obj runtime.Object) bool {
|
||||
if !hasPathPrefix(objKey, key) {
|
||||
return false
|
||||
}
|
||||
return filter.Filter(obj)
|
||||
return f(obj)
|
||||
}
|
||||
return NewSimpleFilter(filterFunc, filter.Trigger)
|
||||
return filterFunc
|
||||
}
|
||||
|
||||
// Returns resource version to which the underlying cache is synced.
|
||||
|
|
@ -599,15 +689,15 @@ type cacheWatcher struct {
|
|||
sync.Mutex
|
||||
input chan watchCacheEvent
|
||||
result chan watch.Event
|
||||
filter Filter
|
||||
filter filterObjectFunc
|
||||
stopped bool
|
||||
forget func(bool)
|
||||
}
|
||||
|
||||
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter Filter, forget func(bool)) *cacheWatcher {
|
||||
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
|
||||
watcher := &cacheWatcher{
|
||||
input: make(chan watchCacheEvent, 10),
|
||||
result: make(chan watch.Event, 10),
|
||||
input: make(chan watchCacheEvent, chanSize),
|
||||
result: make(chan watch.Event, chanSize),
|
||||
filter: filter,
|
||||
stopped: false,
|
||||
forget: forget,
|
||||
|
|
@ -649,7 +739,11 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
|
|||
// OK, block sending, but only for up to 5 seconds.
|
||||
// cacheWatcher.add is called very often, so arrange
|
||||
// to reuse timers instead of constantly allocating.
|
||||
startTime := time.Now()
|
||||
trace := util.NewTrace(
|
||||
fmt.Sprintf("cacheWatcher %v: waiting for add (initial result size %v)",
|
||||
reflect.TypeOf(event.Object).String(), len(c.result)))
|
||||
defer trace.LogIfLong(50 * time.Millisecond)
|
||||
|
||||
const timeout = 5 * time.Second
|
||||
t, ok := timerPool.Get().(*time.Timer)
|
||||
if ok {
|
||||
|
|
@ -674,14 +768,14 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
|
|||
c.forget(false)
|
||||
c.stop()
|
||||
}
|
||||
glog.V(2).Infof("cacheWatcher add function blocked processing for %v", time.Since(startTime))
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
||||
curObjPasses := event.Type != watch.Deleted && c.filter.Filter(event.Object)
|
||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object)
|
||||
oldObjPasses := false
|
||||
if event.PrevObject != nil {
|
||||
oldObjPasses = c.filter.Filter(event.PrevObject)
|
||||
oldObjPasses = c.filter(event.Key, event.PrevObject)
|
||||
}
|
||||
if !curObjPasses && !oldObjPasses {
|
||||
// Watcher is not interested in that object.
|
||||
|
|
@ -706,9 +800,33 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
|||
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
// Check how long we are processing initEvents.
|
||||
// As long as these are not processed, we are not processing
|
||||
// any incoming events, so if it takes long, we may actually
|
||||
// block all watchers for some time.
|
||||
// TODO: From the logs it seems that there happens processing
|
||||
// times even up to 1s which is very long. However, this doesn't
|
||||
// depend that much on the number of initEvents. E.g. from the
|
||||
// 2000-node Kubemark run we have logs like this, e.g.:
|
||||
// ... processing 13862 initEvents took 66.808689ms
|
||||
// ... processing 14040 initEvents took 993.532539ms
|
||||
// We should understand what is blocking us in those cases (e.g.
|
||||
// is it lack of CPU, network, or sth else) and potentially
|
||||
// consider increase size of result buffer in those cases.
|
||||
const initProcessThreshold = 500 * time.Millisecond
|
||||
startTime := time.Now()
|
||||
for _, event := range initEvents {
|
||||
c.sendWatchCacheEvent(event)
|
||||
c.sendWatchCacheEvent(&event)
|
||||
}
|
||||
processingTime := time.Since(startTime)
|
||||
if processingTime > initProcessThreshold {
|
||||
objType := "<null>"
|
||||
if len(initEvents) > 0 {
|
||||
objType = reflect.TypeOf(initEvents[0].Object).String()
|
||||
}
|
||||
glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
|
||||
}
|
||||
|
||||
defer close(c.result)
|
||||
defer c.Stop()
|
||||
for {
|
||||
|
|
@ -718,7 +836,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
|
|||
}
|
||||
// only send events newer than resourceVersion
|
||||
if event.ResourceVersion > resourceVersion {
|
||||
c.sendWatchCacheEvent(event)
|
||||
c.sendWatchCacheEvent(&event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue