Update godeps

This commit is contained in:
Prashanth Balasubramanian 2016-06-21 11:58:43 -07:00
parent 423433bc5f
commit 701c5a0e30
482 changed files with 86915 additions and 19741 deletions

View file

@ -84,7 +84,7 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyL
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// objet will be included in the DeleteFinalStateUnknown markers. These objects
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
@ -189,12 +189,20 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
// Don't provide a second report of the same deletion.
return nil
}
} else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
// TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects.
return nil
} else {
// We only want to skip the "deletion" action if the object doesn't
// exist in knownObjects and it doesn't have corresponding item in items.
// Note that even if there is a "deletion" action in items, we can ignore it,
// because it will be deduped automatically in "queueActionLocked"
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
// TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects.
return nil
}
}
return f.queueActionLocked(Deleted, obj)
@ -270,6 +278,13 @@ func isDeletionDup(a, b *Delta) *Delta {
return b
}
// willObjectBeDeletedLocked returns true only if the last delta for the
// given object is Delete. Caller must lock first.
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
deltas := f.items[id]
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}
// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
@ -277,6 +292,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
// If object is supposed to be deleted (last event is Deleted),
// then we should ignore Sync events, because it would result in
// recreation of this object.
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if f.deltaCompressor != nil {
@ -306,6 +329,10 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
// Copy item's slice so operations on this slice (delta
@ -359,10 +386,12 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop() interface{} {
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
@ -382,7 +411,7 @@ func (f *DeltaFIFO) Pop() interface{} {
delete(f.items, id)
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item
return item, process(item)
}
}
@ -452,6 +481,27 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
f.lock.RLock()
defer f.lock.RUnlock()
for _, k := range f.knownObjects.ListKeys() {
obj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k)
continue
} else if !exists {
glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k)
continue
}
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
}
return nil
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister

View file

@ -146,6 +146,7 @@ func (c *ExpirationCache) ListKeys() []string {
func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
@ -191,6 +192,11 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
return nil
}
// Resync will touch all objects to put them into the processing queue
func (c *ExpirationCache) Resync() error {
return c.cacheStorage.Resync()
}
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
return &ExpirationCache{

View file

@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
// FakeStore lets you define custom functions for store operations
type FakeCustomStore struct {
AddFunc func(obj interface{}) error
UpdateFunc func(obj interface{}) error
DeleteFunc func(obj interface{}) error
ListFunc func() []interface{}
ListKeysFunc func() []string
GetFunc func(obj interface{}) (item interface{}, exists bool, err error)
GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
ReplaceFunc func(list []interface{}, resourceVerion string) error
ResyncFunc func() error
}
// Add calls the custom Add function if defined
func (f *FakeCustomStore) Add(obj interface{}) error {
if f.AddFunc != nil {
return f.AddFunc(obj)
}
return nil
}
// Update calls the custom Update function if defined
func (f *FakeCustomStore) Update(obj interface{}) error {
if f.UpdateFunc != nil {
return f.Update(obj)
}
return nil
}
// Delete calls the custom Delete function if defined
func (f *FakeCustomStore) Delete(obj interface{}) error {
if f.DeleteFunc != nil {
return f.DeleteFunc(obj)
}
return nil
}
// List calls the custom List function if defined
func (f *FakeCustomStore) List() []interface{} {
if f.ListFunc != nil {
return f.ListFunc()
}
return nil
}
// ListKeys calls the custom ListKeys function if defined
func (f *FakeCustomStore) ListKeys() []string {
if f.ListKeysFunc != nil {
return f.ListKeysFunc()
}
return nil
}
// Get calls the custom Get function if defined
func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
if f.GetFunc != nil {
return f.GetFunc(obj)
}
return nil, false, nil
}
// GetByKey calls the custom GetByKey function if defined
func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) {
if f.GetByKeyFunc != nil {
return f.GetByKeyFunc(key)
}
return nil, false, nil
}
// Replace calls the custom Replace function if defined
func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error {
if f.ReplaceFunc != nil {
return f.ReplaceFunc(list, resourceVersion)
}
return nil
}
// Resync calls the custom Resync function if defined
func (f *FakeCustomStore) Resync() error {
if f.ResyncFunc != nil {
return f.ResyncFunc()
}
return nil
}

View file

@ -18,14 +18,21 @@ package cache
import (
"sync"
"k8s.io/kubernetes/pkg/util/sets"
)
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the element popped from the queue.
type PopProcessFunc func(interface{}) error
// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
Store
// Pop blocks until it has something to return.
Pop() interface{}
// Pop blocks until it has something to process.
// It returns the object that was process and the result of processing.
Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent adds a value previously
// returned by Pop back into the queue as long
@ -37,6 +44,18 @@ type Queue interface {
HasSynced() bool
}
// Helper function for popping from Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}) error {
result = obj
return nil
})
return result
}
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
// FIFO order processing. If multiple adds/updates of a single item happen while
// an item is in the queue before it has been processed, it will only be
@ -181,12 +200,13 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
return item, exists, nil
}
// Pop waits until an item is ready and returns it. If multiple items are
// Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is returned,
// so if you don't successfully process it, you need to add it back with
// AddIfNotPresent().
func (f *FIFO) Pop() interface{} {
// The item is removed from the queue (and the store) before it is processed,
// so if you don't successfully process it, it should be added back with
// AddIfNotPresent(). process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
@ -204,7 +224,7 @@ func (f *FIFO) Pop() interface{} {
continue
}
delete(f.items, id)
return item
return item, process(item)
}
}
@ -241,6 +261,26 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync will touch all objects to put them into the processing queue
func (f *FIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
inQueue := sets.NewString()
for _, id := range f.queue {
inQueue.Insert(id)
}
for id := range f.items {
if !inQueue.Has(id) {
f.queue = append(f.queue, id)
}
}
if len(f.queue) > 0 {
f.cond.Broadcast()
}
return nil
}
// NewFIFO returns a Store which can be used to queue up items to
// process.
func NewFIFO(keyFunc KeyFunc) *FIFO {

View file

@ -34,6 +34,10 @@ type Indexer interface {
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
// IndexFunc knows how to provide an indexed value for an object.

View file

@ -76,38 +76,39 @@ type storePodsNamespacer struct {
// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, err error) {
list := api.PodList{}
func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) {
pods := api.PodList{}
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
pods.Items = append(pods.Items, *pod)
}
}
return list, nil
return pods, nil
}
key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
pods.Items = append(pods.Items, *pod)
}
}
return list, err
return pods, nil
}
for _, m := range items {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
pods.Items = append(pods.Items, *pod)
}
}
return list, nil
return pods, nil
}
// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
@ -194,7 +195,9 @@ type storeReplicationControllersNamespacer struct {
namespace string
}
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (controllers []api.ReplicationController, err error) {
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) {
controllers := []api.ReplicationController{}
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
@ -202,12 +205,13 @@ func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (c
controllers = append(controllers, rc)
}
}
return
return controllers, nil
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
@ -215,7 +219,7 @@ func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (c
controllers = append(controllers, rc)
}
}
return
return controllers, nil
}
for _, m := range items {
rc := *(m.(*api.ReplicationController))
@ -223,7 +227,7 @@ func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (c
controllers = append(controllers, rc)
}
}
return
return controllers, nil
}
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.

View file

@ -246,35 +246,6 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
// We want to avoid situations when periodic resyncing is breaking the TCP
// connection.
// If response`s body is not read to completion before calling body.Close(),
// that TCP connection will not be reused in the future - see #15664 issue
// for more details.
// Thus, we set timeout for watch requests to be smaller than the remaining
// time until next periodic resync and force resyncing ourself to avoid
// breaking TCP connection.
//
// TODO: This should be parametrizable based on server load.
func (r *Reflector) timeoutForWatch() *int64 {
randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
timeout := r.nextResync.Sub(r.now()) - timeoutThreshold
if timeout < 0 || randTimeout < timeout {
timeout = randTimeout
}
timeoutSeconds := int64(timeout.Seconds())
return &timeoutSeconds
}
// Returns true if we are close enough to next planned periodic resync
// and we can force resyncing ourself now.
func (r *Reflector) canForceResyncNow() bool {
if r.nextResync.IsZero() {
return false
}
return r.now().Add(forceResyncThreshold).After(r.nextResync)
}
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
@ -292,11 +263,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
metaInterface, err := meta.Accessor(list)
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = metaInterface.GetResourceVersion()
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
@ -306,13 +277,33 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
r.setLastSyncResourceVersion(resourceVersion)
for {
options := api.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations when resyncing is breaking the TCP connection
// - see comment for 'timeoutForWatch()' for more details.
TimeoutSeconds: r.timeoutForWatch(),
resyncerrc := make(chan error, 1)
go func() {
for {
select {
case <-resyncCh:
case <-stopCh:
return
}
glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = api.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timemoutseconds,
}
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
@ -337,16 +328,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
return nil
}
if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
if err != errorResyncRequested && err != errorStopRequested {
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
if r.canForceResyncNow() {
glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
return nil
}
}
}
@ -360,7 +348,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
}
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := time.Now()
eventCount := 0
@ -373,8 +361,8 @@ loop:
select {
case <-stopCh:
return errorStopRequested
case <-resyncCh:
return errorResyncRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop

View file

@ -44,6 +44,7 @@ type Store interface {
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
Resync() error
}
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
@ -180,6 +181,10 @@ func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}
func (c *cache) AddIndexers(newIndexers Indexers) error {
return c.cacheStorage.AddIndexers(newIndexers)
}
// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
@ -213,6 +218,11 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync touches all items in the store to force processing
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
}
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{

View file

@ -46,6 +46,11 @@ type ThreadSafeStore interface {
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
Resync() error
}
// threadSafeMap implements ThreadSafeStore
@ -195,6 +200,27 @@ func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
}
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
c.lock.Lock()
defer c.lock.Unlock()
if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
}
oldKeys := sets.StringKeySet(c.indexers)
newKeys := sets.StringKeySet(newIndexers)
if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}
for k, v := range newIndexers {
c.indexers[k] = v
}
return nil
}
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
@ -247,6 +273,11 @@ func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
return nil
}
func (c *threadSafeMap) Resync() error {
// Nothing to do
return nil
}
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},