Move Ingress godeps to vendor/
This commit is contained in:
parent
0d4f49e50e
commit
ca620e4074
2059 changed files with 3706 additions and 213845 deletions
583
vendor/k8s.io/kubernetes/pkg/storage/cacher.go
generated
vendored
Normal file
583
vendor/k8s.io/kubernetes/pkg/storage/cacher.go
generated
vendored
Normal file
|
|
@ -0,0 +1,583 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// CacherConfig contains the configuration for a given Cache.
|
||||
type CacherConfig struct {
|
||||
// Maximum size of the history cached in memory.
|
||||
CacheCapacity int
|
||||
|
||||
// An underlying storage.Interface.
|
||||
Storage Interface
|
||||
|
||||
// An underlying storage.Versioner.
|
||||
Versioner Versioner
|
||||
|
||||
// The Cache will be caching objects of a given Type and assumes that they
|
||||
// are all stored under ResourcePrefix directory in the underlying database.
|
||||
Type interface{}
|
||||
ResourcePrefix string
|
||||
|
||||
// KeyFunc is used to get a key in the underyling storage for a given object.
|
||||
KeyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// NewList is a function that creates new empty object storing a list of
|
||||
// objects of type Type.
|
||||
NewListFunc func() runtime.Object
|
||||
}
|
||||
|
||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||
// resource from its internal cache and updating its cache in the background
|
||||
// based on the underlying storage contents.
|
||||
// Cacher implements storage.Interface (although most of the calls are just
|
||||
// delegated to the underlying storage).
|
||||
type Cacher struct {
|
||||
sync.RWMutex
|
||||
|
||||
// Each user-facing method that is not simply redirected to the underlying
|
||||
// storage has to read-lock on this mutex before starting any processing.
|
||||
// This is necessary to prevent users from accessing structures that are
|
||||
// uninitialized or are being repopulated right now.
|
||||
// NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
|
||||
// interactions of Cacher with the underlying WatchCache. Since Cacher is
|
||||
// caling WatchCache directly and WatchCache is calling Cacher methods
|
||||
// via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
|
||||
// of both structures are held, the one from WatchCache is acquired first
|
||||
// to avoid deadlocks. Unfortunately, forcing this rule in startCaching
|
||||
// would be very difficult and introducing one more mutex seems to be much
|
||||
// easier.
|
||||
usable sync.RWMutex
|
||||
|
||||
// Underlying storage.Interface.
|
||||
storage Interface
|
||||
|
||||
// "sliding window" of recent changes of objects and the current state.
|
||||
watchCache *watchCache
|
||||
reflector *cache.Reflector
|
||||
|
||||
// Registered watchers.
|
||||
watcherIdx int
|
||||
watchers map[int]*cacheWatcher
|
||||
|
||||
// Versioner is used to handle resource versions.
|
||||
versioner Versioner
|
||||
|
||||
// keyFunc is used to get a key in the underyling storage for a given object.
|
||||
keyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// Handling graceful termination.
|
||||
stopLock sync.RWMutex
|
||||
stopped bool
|
||||
stopCh chan struct{}
|
||||
stopWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacher(
|
||||
storage Interface,
|
||||
capacity int,
|
||||
versioner Versioner,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
scopeStrategy rest.NamespaceScopedStrategy,
|
||||
newListFunc func() runtime.Object) Interface {
|
||||
config := CacherConfig{
|
||||
CacheCapacity: capacity,
|
||||
Storage: storage,
|
||||
Versioner: versioner,
|
||||
Type: objectType,
|
||||
ResourcePrefix: resourcePrefix,
|
||||
NewListFunc: newListFunc,
|
||||
}
|
||||
if scopeStrategy.NamespaceScoped() {
|
||||
config.KeyFunc = func(obj runtime.Object) (string, error) {
|
||||
return NamespaceKeyFunc(resourcePrefix, obj)
|
||||
}
|
||||
} else {
|
||||
config.KeyFunc = func(obj runtime.Object) (string, error) {
|
||||
return NoNamespaceKeyFunc(resourcePrefix, obj)
|
||||
}
|
||||
}
|
||||
return NewCacherFromConfig(config)
|
||||
}
|
||||
|
||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||
watchCache := newWatchCache(config.CacheCapacity)
|
||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||
|
||||
// Give this error when it is constructed rather than when you get the
|
||||
// first watch item, because it's much easier to track down that way.
|
||||
if obj, ok := config.Type.(runtime.Object); ok {
|
||||
if err := runtime.CheckCodec(config.Storage.Codec(), obj); err != nil {
|
||||
panic("storage codec doesn't seem to match given type: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
cacher := &Cacher{
|
||||
usable: sync.RWMutex{},
|
||||
storage: config.Storage,
|
||||
watchCache: watchCache,
|
||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||
watcherIdx: 0,
|
||||
watchers: make(map[int]*cacheWatcher),
|
||||
versioner: config.Versioner,
|
||||
keyFunc: config.KeyFunc,
|
||||
stopped: false,
|
||||
// We need to (potentially) stop both:
|
||||
// - wait.Until go-routine
|
||||
// - reflector.ListAndWatch
|
||||
// and there are no guarantees on the order that they will stop.
|
||||
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
|
||||
stopCh: make(chan struct{}),
|
||||
stopWg: sync.WaitGroup{},
|
||||
}
|
||||
// See startCaching method for explanation and where this is unlocked.
|
||||
cacher.usable.Lock()
|
||||
watchCache.SetOnEvent(cacher.processEvent)
|
||||
|
||||
stopCh := cacher.stopCh
|
||||
cacher.stopWg.Add(1)
|
||||
go func() {
|
||||
defer cacher.stopWg.Done()
|
||||
wait.Until(
|
||||
func() {
|
||||
if !cacher.isStopped() {
|
||||
cacher.startCaching(stopCh)
|
||||
}
|
||||
}, time.Second, stopCh,
|
||||
)
|
||||
}()
|
||||
return cacher
|
||||
}
|
||||
|
||||
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
||||
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
|
||||
// It is safe to use the cache after a successful list until a disconnection.
|
||||
// We start with usable (write) locked. The below OnReplace function will
|
||||
// unlock it after a successful list. The below defer will then re-lock
|
||||
// it when this function exits (always due to disconnection), only if
|
||||
// we actually got a successful list. This cycle will repeat as needed.
|
||||
successfulList := false
|
||||
c.watchCache.SetOnReplace(func() {
|
||||
successfulList = true
|
||||
c.usable.Unlock()
|
||||
})
|
||||
defer func() {
|
||||
if successfulList {
|
||||
c.usable.Lock()
|
||||
}
|
||||
}()
|
||||
|
||||
c.terminateAllWatchers()
|
||||
// Note that since onReplace may be not called due to errors, we explicitly
|
||||
// need to retry it on errors under lock.
|
||||
// Also note that startCaching is called in a loop, so there's no need
|
||||
// to have another loop here.
|
||||
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
|
||||
glog.Errorf("unexpected ListAndWatch error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Backends(ctx context.Context) []string {
|
||||
return c.storage.Backends(ctx)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Versioner() Versioner {
|
||||
return c.storage.Versioner()
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
||||
return c.storage.Create(ctx, key, obj, out, ttl)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
|
||||
return c.storage.Delete(ctx, key, out, preconditions)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
|
||||
watchRV, err := ParseWatchResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Do NOT allow Watch to start when the underlying structures are not propagated.
|
||||
c.usable.RLock()
|
||||
defer c.usable.RUnlock()
|
||||
|
||||
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
||||
// no new events will be processed in the meantime. The watchCache will be unlocked
|
||||
// on return from this function.
|
||||
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
|
||||
// underlying watchCache is calling processEvent under its lock.
|
||||
c.watchCache.RLock()
|
||||
defer c.watchCache.RUnlock()
|
||||
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
|
||||
c.watchers[c.watcherIdx] = watcher
|
||||
c.watcherIdx++
|
||||
return watcher, nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
|
||||
return c.Watch(ctx, key, resourceVersion, filter)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
||||
return c.storage.Get(ctx, key, objPtr, ignoreNotFound)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error {
|
||||
return c.storage.GetToList(ctx, key, filter, listObj)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {
|
||||
if resourceVersion == "" {
|
||||
// If resourceVersion is not specified, serve it from underlying
|
||||
// storage (for backward compatibility).
|
||||
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
|
||||
}
|
||||
|
||||
// If resourceVersion is specified, serve it from cache.
|
||||
// It's guaranteed that the returned value is at least that
|
||||
// fresh as the given resourceVersion.
|
||||
|
||||
listRV, err := ParseListResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// To avoid situation when List is processed before the underlying
|
||||
// watchCache is propagated for the first time, we acquire and immediately
|
||||
// release the 'usable' lock.
|
||||
// We don't need to hold it all the time, because watchCache is thread-safe
|
||||
// and it would complicate already very difficult locking pattern.
|
||||
c.usable.RLock()
|
||||
c.usable.RUnlock()
|
||||
|
||||
// List elements from cache, with at least 'listRV'.
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listVal, err := conversion.EnforcePtr(listPtr)
|
||||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filterFunc := filterFunction(key, c.keyFunc, filter)
|
||||
|
||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
||||
}
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
|
||||
}
|
||||
if filterFunc(object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||
}
|
||||
}
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error {
|
||||
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) Codec() runtime.Codec {
|
||||
return c.storage.Codec()
|
||||
}
|
||||
|
||||
func (c *Cacher) processEvent(event watchCacheEvent) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for _, watcher := range c.watchers {
|
||||
watcher.add(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cacher) terminateAllWatchers() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for key, watcher := range c.watchers {
|
||||
delete(c.watchers, key)
|
||||
watcher.stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cacher) isStopped() bool {
|
||||
c.stopLock.RLock()
|
||||
defer c.stopLock.RUnlock()
|
||||
return c.stopped
|
||||
}
|
||||
|
||||
func (c *Cacher) Stop() {
|
||||
c.stopLock.Lock()
|
||||
c.stopped = true
|
||||
c.stopLock.Unlock()
|
||||
close(c.stopCh)
|
||||
c.stopWg.Wait()
|
||||
}
|
||||
|
||||
func forgetWatcher(c *Cacher, index int) func(bool) {
|
||||
return func(lock bool) {
|
||||
if lock {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
}
|
||||
// It's possible that the watcher is already not in the map (e.g. in case of
|
||||
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
|
||||
delete(c.watchers, index)
|
||||
}
|
||||
}
|
||||
|
||||
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc {
|
||||
return func(obj runtime.Object) bool {
|
||||
objKey, err := keyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("invalid object for filter: %v", obj)
|
||||
return false
|
||||
}
|
||||
if !strings.HasPrefix(objKey, key) {
|
||||
return false
|
||||
}
|
||||
return filter(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// Returns resource version to which the underlying cache is synced.
|
||||
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
||||
// To avoid situation when LastSyncResourceVersion is processed before the
|
||||
// underlying watchCache is propagated, we acquire 'usable' lock.
|
||||
c.usable.RLock()
|
||||
defer c.usable.RUnlock()
|
||||
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
resourceVersion := c.reflector.LastSyncResourceVersion()
|
||||
if resourceVersion == "" {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||
}
|
||||
|
||||
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
|
||||
type cacherListerWatcher struct {
|
||||
storage Interface
|
||||
resourcePrefix string
|
||||
newListFunc func() runtime.Object
|
||||
}
|
||||
|
||||
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
|
||||
return &cacherListerWatcher{
|
||||
storage: storage,
|
||||
resourcePrefix: resourcePrefix,
|
||||
newListFunc: newListFunc,
|
||||
}
|
||||
}
|
||||
|
||||
// Implements cache.ListerWatcher interface.
|
||||
func (lw *cacherListerWatcher) List(options api.ListOptions) (runtime.Object, error) {
|
||||
list := lw.newListFunc()
|
||||
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
// Implements cache.ListerWatcher interface.
|
||||
func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) {
|
||||
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
|
||||
}
|
||||
|
||||
// cacherWatch implements watch.Interface
|
||||
type cacheWatcher struct {
|
||||
sync.Mutex
|
||||
input chan watchCacheEvent
|
||||
result chan watch.Event
|
||||
filter FilterFunc
|
||||
stopped bool
|
||||
forget func(bool)
|
||||
}
|
||||
|
||||
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
|
||||
watcher := &cacheWatcher{
|
||||
input: make(chan watchCacheEvent, 10),
|
||||
result: make(chan watch.Event, 10),
|
||||
filter: filter,
|
||||
stopped: false,
|
||||
forget: forget,
|
||||
}
|
||||
go watcher.process(initEvents, resourceVersion)
|
||||
return watcher
|
||||
}
|
||||
|
||||
// Implements watch.Interface.
|
||||
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
|
||||
return c.result
|
||||
}
|
||||
|
||||
// Implements watch.Interface.
|
||||
func (c *cacheWatcher) Stop() {
|
||||
c.forget(true)
|
||||
c.stop()
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) stop() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if !c.stopped {
|
||||
c.stopped = true
|
||||
close(c.input)
|
||||
}
|
||||
}
|
||||
|
||||
var timerPool sync.Pool
|
||||
|
||||
func (c *cacheWatcher) add(event watchCacheEvent) {
|
||||
// Try to send the event immediately, without blocking.
|
||||
select {
|
||||
case c.input <- event:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// OK, block sending, but only for up to 5 seconds.
|
||||
// cacheWatcher.add is called very often, so arrange
|
||||
// to reuse timers instead of constantly allocating.
|
||||
const timeout = 5 * time.Second
|
||||
t, ok := timerPool.Get().(*time.Timer)
|
||||
if ok {
|
||||
t.Reset(timeout)
|
||||
} else {
|
||||
t = time.NewTimer(timeout)
|
||||
}
|
||||
defer timerPool.Put(t)
|
||||
|
||||
select {
|
||||
case c.input <- event:
|
||||
stopped := t.Stop()
|
||||
if !stopped {
|
||||
// Consume triggered (but not yet received) timer event
|
||||
// so that future reuse does not get a spurious timeout.
|
||||
<-t.C
|
||||
}
|
||||
case <-t.C:
|
||||
// This means that we couldn't send event to that watcher.
|
||||
// Since we don't want to block on it infinitely,
|
||||
// we simply terminate it.
|
||||
c.forget(false)
|
||||
c.stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
|
||||
oldObjPasses := false
|
||||
if event.PrevObject != nil {
|
||||
oldObjPasses = c.filter(event.PrevObject)
|
||||
}
|
||||
if !curObjPasses && !oldObjPasses {
|
||||
// Watcher is not interested in that object.
|
||||
return
|
||||
}
|
||||
|
||||
object, err := api.Scheme.Copy(event.Object)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected copy error: %v", err)
|
||||
return
|
||||
}
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
c.result <- watch.Event{Type: watch.Added, Object: object}
|
||||
case curObjPasses && oldObjPasses:
|
||||
c.result <- watch.Event{Type: watch.Modified, Object: object}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
c.result <- watch.Event{Type: watch.Deleted, Object: object}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for _, event := range initEvents {
|
||||
c.sendWatchCacheEvent(event)
|
||||
}
|
||||
defer close(c.result)
|
||||
defer c.Stop()
|
||||
for {
|
||||
event, ok := <-c.input
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// only send events newer than resourceVersion
|
||||
if event.ResourceVersion > resourceVersion {
|
||||
c.sendWatchCacheEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
18
vendor/k8s.io/kubernetes/pkg/storage/doc.go
generated
vendored
Normal file
18
vendor/k8s.io/kubernetes/pkg/storage/doc.go
generated
vendored
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Interfaces for database-related operations.
|
||||
package storage
|
||||
174
vendor/k8s.io/kubernetes/pkg/storage/errors.go
generated
vendored
Normal file
174
vendor/k8s.io/kubernetes/pkg/storage/errors.go
generated
vendored
Normal file
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/util/validation/field"
|
||||
)
|
||||
|
||||
const (
|
||||
ErrCodeKeyNotFound int = iota + 1
|
||||
ErrCodeKeyExists
|
||||
ErrCodeResourceVersionConflicts
|
||||
ErrCodeInvalidObj
|
||||
ErrCodeUnreachable
|
||||
)
|
||||
|
||||
var errCodeToMessage = map[int]string{
|
||||
ErrCodeKeyNotFound: "key not found",
|
||||
ErrCodeKeyExists: "key exists",
|
||||
ErrCodeResourceVersionConflicts: "resource version conflicts",
|
||||
ErrCodeInvalidObj: "invalid object",
|
||||
ErrCodeUnreachable: "server unreachable",
|
||||
}
|
||||
|
||||
func NewKeyNotFoundError(key string, rv int64) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeKeyNotFound,
|
||||
Key: key,
|
||||
ResourceVersion: rv,
|
||||
}
|
||||
}
|
||||
|
||||
func NewKeyExistsError(key string, rv int64) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeKeyExists,
|
||||
Key: key,
|
||||
ResourceVersion: rv,
|
||||
}
|
||||
}
|
||||
|
||||
func NewResourceVersionConflictsError(key string, rv int64) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeResourceVersionConflicts,
|
||||
Key: key,
|
||||
ResourceVersion: rv,
|
||||
}
|
||||
}
|
||||
|
||||
func NewUnreachableError(key string, rv int64) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeUnreachable,
|
||||
Key: key,
|
||||
ResourceVersion: rv,
|
||||
}
|
||||
}
|
||||
|
||||
func NewInvalidObjError(key, msg string) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeInvalidObj,
|
||||
Key: key,
|
||||
AdditionalErrorMsg: msg,
|
||||
}
|
||||
}
|
||||
|
||||
type StorageError struct {
|
||||
Code int
|
||||
Key string
|
||||
ResourceVersion int64
|
||||
AdditionalErrorMsg string
|
||||
}
|
||||
|
||||
func (e *StorageError) Error() string {
|
||||
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
|
||||
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
|
||||
}
|
||||
|
||||
// IsNotFound returns true if and only if err is "key" not found error.
|
||||
func IsNotFound(err error) bool {
|
||||
return isErrCode(err, ErrCodeKeyNotFound)
|
||||
}
|
||||
|
||||
// IsNodeExist returns true if and only if err is an node already exist error.
|
||||
func IsNodeExist(err error) bool {
|
||||
return isErrCode(err, ErrCodeKeyExists)
|
||||
}
|
||||
|
||||
// IsUnreachable returns true if and only if err indicates the server could not be reached.
|
||||
func IsUnreachable(err error) bool {
|
||||
return isErrCode(err, ErrCodeUnreachable)
|
||||
}
|
||||
|
||||
// IsTestFailed returns true if and only if err is a write conflict.
|
||||
func IsTestFailed(err error) bool {
|
||||
return isErrCode(err, ErrCodeResourceVersionConflicts, ErrCodeInvalidObj)
|
||||
}
|
||||
|
||||
// IsInvalidUID returns true if and only if err is invalid UID error
|
||||
func IsInvalidObj(err error) bool {
|
||||
return isErrCode(err, ErrCodeInvalidObj)
|
||||
}
|
||||
|
||||
func isErrCode(err error, codes ...int) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if e, ok := err.(*StorageError); ok {
|
||||
for _, code := range codes {
|
||||
if e.Code == code {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// InvalidError is generated when an error caused by invalid API object occurs
|
||||
// in the storage package.
|
||||
type InvalidError struct {
|
||||
Errs field.ErrorList
|
||||
}
|
||||
|
||||
func (e InvalidError) Error() string {
|
||||
return e.Errs.ToAggregate().Error()
|
||||
}
|
||||
|
||||
// IsInvalidError returns true if and only if err is an InvalidError.
|
||||
func IsInvalidError(err error) bool {
|
||||
_, ok := err.(InvalidError)
|
||||
return ok
|
||||
}
|
||||
|
||||
func NewInvalidError(errors field.ErrorList) InvalidError {
|
||||
return InvalidError{errors}
|
||||
}
|
||||
|
||||
// InternalError is generated when an error occurs in the storage package, i.e.,
|
||||
// not from the underlying storage backend (e.g., etcd).
|
||||
type InternalError struct {
|
||||
Reason string
|
||||
}
|
||||
|
||||
func (e InternalError) Error() string {
|
||||
return e.Reason
|
||||
}
|
||||
|
||||
// IsInternalError returns true if and only if err is an InternalError.
|
||||
func IsInternalError(err error) bool {
|
||||
_, ok := err.(InternalError)
|
||||
return ok
|
||||
}
|
||||
|
||||
func NewInternalError(reason string) InternalError {
|
||||
return InternalError{reason}
|
||||
}
|
||||
|
||||
func NewInternalErrorf(format string, a ...interface{}) InternalError {
|
||||
return InternalError{fmt.Sprintf(format, a)}
|
||||
}
|
||||
171
vendor/k8s.io/kubernetes/pkg/storage/interfaces.go
generated
vendored
Normal file
171
vendor/k8s.io/kubernetes/pkg/storage/interfaces.go
generated
vendored
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// Versioner abstracts setting and retrieving metadata fields from database response
|
||||
// onto the object ot list.
|
||||
type Versioner interface {
|
||||
// UpdateObject sets storage metadata into an API object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from database.
|
||||
UpdateObject(obj runtime.Object, resourceVersion uint64) error
|
||||
// UpdateList sets the resource version into an API list object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from database.
|
||||
UpdateList(obj runtime.Object, resourceVersion uint64) error
|
||||
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
|
||||
// Should return an error if the specified object does not have a persistable version.
|
||||
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
||||
}
|
||||
|
||||
// ResponseMeta contains information about the database metadata that is associated with
|
||||
// an object. It abstracts the actual underlying objects to prevent coupling with concrete
|
||||
// database and to improve testability.
|
||||
type ResponseMeta struct {
|
||||
// TTL is the time to live of the node that contained the returned object. It may be
|
||||
// zero or negative in some cases (objects may be expired after the requested
|
||||
// expiration time due to server lag).
|
||||
TTL int64
|
||||
// The resource version of the node that contained the returned object.
|
||||
ResourceVersion uint64
|
||||
}
|
||||
|
||||
// FilterFunc is a predicate which takes an API object and returns true
|
||||
// if and only if the object should remain in the set.
|
||||
type FilterFunc func(obj runtime.Object) bool
|
||||
|
||||
// Everything is a FilterFunc which accepts all objects.
|
||||
func Everything(runtime.Object) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
|
||||
// that is guaranteed to succeed.
|
||||
// See the comment for GuaranteedUpdate for more details.
|
||||
type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
|
||||
|
||||
// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
|
||||
type Preconditions struct {
|
||||
// Specifies the target UID.
|
||||
UID *types.UID `json:"uid,omitempty"`
|
||||
}
|
||||
|
||||
// NewUIDPreconditions returns a Preconditions with UID set.
|
||||
func NewUIDPreconditions(uid string) *Preconditions {
|
||||
u := types.UID(uid)
|
||||
return &Preconditions{UID: &u}
|
||||
}
|
||||
|
||||
// Interface offers a common interface for object marshaling/unmarshling operations and
|
||||
// hides all the storage-related operations behind it.
|
||||
type Interface interface {
|
||||
// Returns list of servers addresses of the underyling database.
|
||||
// TODO: This method is used only in a single place. Consider refactoring and getting rid
|
||||
// of this method from the interface.
|
||||
Backends(ctx context.Context) []string
|
||||
|
||||
// Returns Versioner associated with this interface.
|
||||
Versioner() Versioner
|
||||
|
||||
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
||||
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
||||
// set to the read value from database.
|
||||
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
|
||||
|
||||
// Delete removes the specified key and returns the value that existed at that spot.
|
||||
// If key didn't exist, it will return NotFound storage error.
|
||||
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
|
||||
|
||||
// Watch begins watching the specified key. Events are decoded into API objects,
|
||||
// and any items passing 'filter' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
|
||||
|
||||
// WatchList begins watching the specified key's items. Items are decoded into API
|
||||
// objects and any item passing 'filter' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
|
||||
|
||||
// Get unmarshals json found at key into objPtr. On a not found error, will either
|
||||
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
|
||||
// Treats empty responses and nil response nodes exactly like a not found error.
|
||||
Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error
|
||||
|
||||
// GetToList unmarshals json found at key and opaque it into *List api object
|
||||
// (an object that satisfies the runtime.IsList definition).
|
||||
GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error
|
||||
|
||||
// List unmarshalls jsons found at directory defined by key and opaque them
|
||||
// into *List api object (an object that satisfies runtime.IsList definition).
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// be have at least 'resourceVersion'.
|
||||
List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error
|
||||
|
||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
|
||||
// retrying the update until success if there is index conflict.
|
||||
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
|
||||
// other writers are simultaneously updating it, to tryUpdate() needs to take into account
|
||||
// the current contents of the object when deciding how the update object should look.
|
||||
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
|
||||
// or zero value in 'ptrToType' parameter otherwise.
|
||||
// If the object to update has the same value as previous, it won't do any update
|
||||
// but will return the object in 'ptrToType' parameter.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// s := /* implementation of Interface */
|
||||
// err := s.GuaranteedUpdate(
|
||||
// "myKey", &MyType{}, true,
|
||||
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
// // Before each incovation of the user defined function, "input" is reset to
|
||||
// // current contents for "myKey" in database.
|
||||
// curr := input.(*MyType) // Guaranteed to succeed.
|
||||
//
|
||||
// // Make the modification
|
||||
// curr.Counter++
|
||||
//
|
||||
// // Return the modified object - return an error to stop iterating. Return
|
||||
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
|
||||
// return cur, nil, nil
|
||||
// }
|
||||
// })
|
||||
GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, precondtions *Preconditions, tryUpdate UpdateFunc) error
|
||||
|
||||
// Codec provides access to the underlying codec being used by the implementation.
|
||||
Codec() runtime.Codec
|
||||
}
|
||||
|
||||
// Config interface allows storage tiers to generate the proper storage.interface
|
||||
// and reduce the dependencies to encapsulate storage.
|
||||
type Config interface {
|
||||
// Creates the Interface base on ConfigObject
|
||||
NewStorage() (Interface, error)
|
||||
|
||||
// This function is used to enforce membership, and return the underlying type
|
||||
GetType() string
|
||||
}
|
||||
90
vendor/k8s.io/kubernetes/pkg/storage/util.go
generated
vendored
Normal file
90
vendor/k8s.io/kubernetes/pkg/storage/util.go
generated
vendored
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/validation/field"
|
||||
)
|
||||
|
||||
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
|
||||
|
||||
// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc
|
||||
func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
|
||||
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
out, err := fn(input)
|
||||
return out, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// ParseWatchResourceVersion takes a resource version argument and converts it to
|
||||
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
|
||||
// an opaque value, the default watch behavior for non-zero watch is to watch
|
||||
// the next value (if you pass "1", you will see updates from "2" onwards).
|
||||
func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
|
||||
if resourceVersion == "" || resourceVersion == "0" {
|
||||
return 0, nil
|
||||
}
|
||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||
if err != nil {
|
||||
return 0, NewInvalidError(field.ErrorList{
|
||||
// Validation errors are supposed to return version-specific field
|
||||
// paths, but this is probably close enough.
|
||||
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
|
||||
})
|
||||
}
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// ParseListResourceVersion takes a resource version argument and converts it to
|
||||
// the etcd version.
|
||||
func ParseListResourceVersion(resourceVersion string) (uint64, error) {
|
||||
if resourceVersion == "" {
|
||||
return 0, nil
|
||||
}
|
||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||
return version, err
|
||||
}
|
||||
|
||||
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
|
||||
meta, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
name := meta.GetName()
|
||||
if ok, msg := validation.IsValidPathSegmentName(name); !ok {
|
||||
return "", fmt.Errorf("invalid name: %v", msg)
|
||||
}
|
||||
return prefix + "/" + meta.GetNamespace() + "/" + name, nil
|
||||
}
|
||||
|
||||
func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
|
||||
meta, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
name := meta.GetName()
|
||||
if ok, msg := validation.IsValidPathSegmentName(name); !ok {
|
||||
return "", fmt.Errorf("invalid name: %v", msg)
|
||||
}
|
||||
return prefix + "/" + name, nil
|
||||
}
|
||||
326
vendor/k8s.io/kubernetes/pkg/storage/watch_cache.go
generated
vendored
Normal file
326
vendor/k8s.io/kubernetes/pkg/storage/watch_cache.go
generated
vendored
Normal file
|
|
@ -0,0 +1,326 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
// MaximumListWait determines how long we're willing to wait for a
|
||||
// list if a client specified a resource version in the future.
|
||||
MaximumListWait = 60 * time.Second
|
||||
)
|
||||
|
||||
// watchCacheEvent is a single "watch event" that is send to users of
|
||||
// watchCache. Additionally to a typical "watch.Event" it contains
|
||||
// the previous value of the object to enable proper filtering in the
|
||||
// upper layers.
|
||||
type watchCacheEvent struct {
|
||||
Type watch.EventType
|
||||
Object runtime.Object
|
||||
PrevObject runtime.Object
|
||||
ResourceVersion uint64
|
||||
}
|
||||
|
||||
// watchCacheElement is a single "watch event" stored in a cache.
|
||||
// It contains the resource version of the object and the object
|
||||
// itself.
|
||||
type watchCacheElement struct {
|
||||
resourceVersion uint64
|
||||
watchCacheEvent watchCacheEvent
|
||||
}
|
||||
|
||||
// watchCache implements a Store interface.
|
||||
// However, it depends on the elements implementing runtime.Object interface.
|
||||
//
|
||||
// watchCache is a "sliding window" (with a limited capacity) of objects
|
||||
// observed from a watch.
|
||||
type watchCache struct {
|
||||
sync.RWMutex
|
||||
|
||||
// Condition on which lists are waiting for the fresh enough
|
||||
// resource version.
|
||||
cond *sync.Cond
|
||||
|
||||
// Maximum size of history window.
|
||||
capacity int
|
||||
|
||||
// cache is used a cyclic buffer - its first element (with the smallest
|
||||
// resourceVersion) is defined by startIndex, its last element is defined
|
||||
// by endIndex (if cache is full it will be startIndex + capacity).
|
||||
// Both startIndex and endIndex can be greater than buffer capacity -
|
||||
// you should always apply modulo capacity to get an index in cache array.
|
||||
cache []watchCacheElement
|
||||
startIndex int
|
||||
endIndex int
|
||||
|
||||
// store will effectively support LIST operation from the "end of cache
|
||||
// history" i.e. from the moment just after the newest cached watched event.
|
||||
// It is necessary to effectively allow clients to start watching at now.
|
||||
store cache.Store
|
||||
|
||||
// ResourceVersion up to which the watchCache is propagated.
|
||||
resourceVersion uint64
|
||||
|
||||
// This handler is run at the end of every successful Replace() method.
|
||||
onReplace func()
|
||||
|
||||
// This handler is run at the end of every Add/Update/Delete method
|
||||
// and additionally gets the previous value of the object.
|
||||
onEvent func(watchCacheEvent)
|
||||
|
||||
// for testing timeouts.
|
||||
clock util.Clock
|
||||
}
|
||||
|
||||
func newWatchCache(capacity int) *watchCache {
|
||||
wc := &watchCache{
|
||||
capacity: capacity,
|
||||
cache: make([]watchCacheElement, capacity),
|
||||
startIndex: 0,
|
||||
endIndex: 0,
|
||||
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
resourceVersion: 0,
|
||||
clock: util.RealClock{},
|
||||
}
|
||||
wc.cond = sync.NewCond(wc.RLocker())
|
||||
return wc
|
||||
}
|
||||
|
||||
func (w *watchCache) Add(obj interface{}) error {
|
||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event := watch.Event{Type: watch.Added, Object: object}
|
||||
|
||||
f := func(obj runtime.Object) error { return w.store.Add(obj) }
|
||||
return w.processEvent(event, resourceVersion, f)
|
||||
}
|
||||
|
||||
func (w *watchCache) Update(obj interface{}) error {
|
||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event := watch.Event{Type: watch.Modified, Object: object}
|
||||
|
||||
f := func(obj runtime.Object) error { return w.store.Update(obj) }
|
||||
return w.processEvent(event, resourceVersion, f)
|
||||
}
|
||||
|
||||
func (w *watchCache) Delete(obj interface{}) error {
|
||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event := watch.Event{Type: watch.Deleted, Object: object}
|
||||
|
||||
f := func(obj runtime.Object) error { return w.store.Delete(obj) }
|
||||
return w.processEvent(event, resourceVersion, f)
|
||||
}
|
||||
|
||||
func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
|
||||
}
|
||||
meta, err := meta.Accessor(object)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return object, resourceVersion, nil
|
||||
}
|
||||
|
||||
func parseResourceVersion(resourceVersion string) (uint64, error) {
|
||||
if resourceVersion == "" {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||
}
|
||||
|
||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
previous, exists, err := w.store.Get(event.Object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var prevObject runtime.Object
|
||||
if exists {
|
||||
prevObject = previous.(runtime.Object)
|
||||
}
|
||||
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
|
||||
if w.onEvent != nil {
|
||||
w.onEvent(watchCacheEvent)
|
||||
}
|
||||
w.updateCache(resourceVersion, watchCacheEvent)
|
||||
w.resourceVersion = resourceVersion
|
||||
w.cond.Broadcast()
|
||||
return updateFunc(event.Object)
|
||||
}
|
||||
|
||||
// Assumes that lock is already held for write.
|
||||
func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
|
||||
if w.endIndex == w.startIndex+w.capacity {
|
||||
// Cache is full - remove the oldest element.
|
||||
w.startIndex++
|
||||
}
|
||||
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
|
||||
w.endIndex++
|
||||
}
|
||||
|
||||
func (w *watchCache) List() []interface{} {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.List()
|
||||
}
|
||||
|
||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
|
||||
startTime := w.clock.Now()
|
||||
go func() {
|
||||
// Wake us up when the time limit has expired. The docs
|
||||
// promise that time.After (well, NewTimer, which it calls)
|
||||
// will wait *at least* the duration given. Since this go
|
||||
// routine starts sometime after we record the start time, and
|
||||
// it will wake up the loop below sometime after the broadcast,
|
||||
// we don't need to worry about waking it up before the time
|
||||
// has expired accidentally.
|
||||
<-w.clock.After(MaximumListWait)
|
||||
w.cond.Broadcast()
|
||||
}()
|
||||
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
for w.resourceVersion < resourceVersion {
|
||||
if w.clock.Since(startTime) >= MaximumListWait {
|
||||
return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
|
||||
}
|
||||
w.cond.Wait()
|
||||
}
|
||||
return w.store.List(), w.resourceVersion, nil
|
||||
}
|
||||
|
||||
func (w *watchCache) ListKeys() []string {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.ListKeys()
|
||||
}
|
||||
|
||||
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.Get(obj)
|
||||
}
|
||||
|
||||
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.GetByKey(key)
|
||||
}
|
||||
|
||||
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
||||
version, err := parseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
w.startIndex = 0
|
||||
w.endIndex = 0
|
||||
if err := w.store.Replace(objs, resourceVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
w.resourceVersion = version
|
||||
if w.onReplace != nil {
|
||||
w.onReplace()
|
||||
}
|
||||
w.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *watchCache) SetOnReplace(onReplace func()) {
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
w.onReplace = onReplace
|
||||
}
|
||||
|
||||
func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) {
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
w.onEvent = onEvent
|
||||
}
|
||||
|
||||
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) {
|
||||
size := w.endIndex - w.startIndex
|
||||
oldest := w.resourceVersion
|
||||
if size > 0 {
|
||||
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
|
||||
}
|
||||
if resourceVersion == 0 {
|
||||
// resourceVersion = 0 means that we don't require any specific starting point
|
||||
// and we would like to start watching from ~now.
|
||||
// However, to keep backward compatibility, we additionally need to return the
|
||||
// current state and only then start watching from that point.
|
||||
//
|
||||
// TODO: In v2 api, we should stop returning the current state - #13969.
|
||||
allItems := w.store.List()
|
||||
result := make([]watchCacheEvent, len(allItems))
|
||||
for i, item := range allItems {
|
||||
result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
if resourceVersion < oldest-1 {
|
||||
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
|
||||
}
|
||||
|
||||
// Binary search the smallest index at which resourceVersion is greater than the given one.
|
||||
f := func(i int) bool {
|
||||
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
|
||||
}
|
||||
first := sort.Search(size, f)
|
||||
result := make([]watchCacheEvent, size-first)
|
||||
for i := 0; i < size-first; i++ {
|
||||
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue