Update ingress godeps

This commit is contained in:
Manuel de Brito Fontes 2016-08-10 14:53:55 -04:00
parent d43021b3f1
commit 28db8fb16d
1068 changed files with 461467 additions and 117300 deletions

View file

@ -21,7 +21,6 @@ import (
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"
@ -59,9 +58,67 @@ type CacherConfig struct {
// KeyFunc is used to get a key in the underyling storage for a given object.
KeyFunc func(runtime.Object) (string, error)
// TriggerPublisherFunc is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc TriggerPublisherFunc
// NewList is a function that creates new empty object storing a list of
// objects of type Type.
NewListFunc func() runtime.Object
Codec runtime.Codec
}
type watchersMap map[int]*cacheWatcher
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll() {
for key, watcher := range wm {
delete(wm, key)
watcher.stop()
}
}
type indexedWatchers struct {
allWatchers watchersMap
valueWatchers map[string]watchersMap
}
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
if supported {
if _, ok := i.valueWatchers[value]; !ok {
i.valueWatchers[value] = watchersMap{}
}
i.valueWatchers[value].addWatcher(w, number)
} else {
i.allWatchers.addWatcher(w, number)
}
}
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers.deleteWatcher(number)
}
}
func (i *indexedWatchers) terminateAll() {
i.allWatchers.terminateAll()
for index, watchers := range i.valueWatchers {
watchers.terminateAll()
delete(i.valueWatchers, index)
}
}
// Cacher is responsible for serving WATCH and LIST requests for a given
@ -83,20 +140,27 @@ type Cacher struct {
// Underlying storage.Interface.
storage Interface
// Expected type of objects in the underlying cache.
objectType reflect.Type
// "sliding window" of recent changes of objects and the current state.
watchCache *watchCache
reflector *cache.Reflector
// Registered watchers.
watcherIdx int
watchers map[int]*cacheWatcher
// Versioner is used to handle resource versions.
versioner Versioner
// keyFunc is used to get a key in the underyling storage for a given object.
keyFunc func(runtime.Object) (string, error)
// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc TriggerPublisherFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
watchers indexedWatchers
// Handling graceful termination.
stopLock sync.RWMutex
stopped bool
@ -114,19 +178,25 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if obj, ok := config.Type.(runtime.Object); ok {
if err := runtime.CheckCodec(config.Storage.Codec(), obj); err != nil {
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
}
}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(config.Type),
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
},
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
@ -179,11 +249,6 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
}
}
// Implements storage.Interface.
func (c *Cacher) Backends(ctx context.Context) []string {
return c.storage.Backends(ctx)
}
// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
return c.storage.Versioner()
@ -200,7 +265,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
}
// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
@ -223,16 +288,26 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
return newErrWatcher(err), nil
}
triggerValue, triggerSupported := "", false
// TODO: Currently we assume that in a given Cacher object, any <filter> that is
// passed here is aware of exactly the same trigger (at most one).
// Thus, either 0 or 1 values will be returned.
if matchValues := filter.Trigger(); len(matchValues) > 0 {
triggerValue, triggerSupported = matchValues[0].Value, true
}
c.Lock()
defer c.Unlock()
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forget)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
return watcher, nil
}
// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, filter)
}
@ -242,12 +317,12 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign
}
// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error {
func (c *Cacher) GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error {
return c.storage.GetToList(ctx, key, filter, listObj)
}
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
@ -285,7 +360,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
if !ok {
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
}
if filterFunc(object) {
if filterFunc.Filter(object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
}
}
@ -302,26 +377,69 @@ func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType run
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}
// Implements storage.Interface.
func (c *Cacher) Codec() runtime.Codec {
return c.storage.Codec()
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
// is aware of exactly the same trigger (at most one). Thus calling:
// c.triggerFunc(<some object>)
// can return only 0 or 1 values.
// That means, that triggerValues itself may return up to 2 different values.
if c.triggerFunc == nil {
return nil, false
}
result := make([]string, 0, 2)
matchValues := c.triggerFunc(event.Object)
if len(matchValues) > 0 {
result = append(result, matchValues[0].Value)
}
if event.PrevObject == nil {
return result, len(result) > 0
}
prevMatchValues := c.triggerFunc(event.PrevObject)
if len(prevMatchValues) > 0 {
if len(result) == 0 || result[0] != prevMatchValues[0].Value {
result = append(result, prevMatchValues[0].Value)
}
}
return result, len(result) > 0
}
func (c *Cacher) processEvent(event watchCacheEvent) {
triggerValues, supported := c.triggerValues(&event)
c.Lock()
defer c.Unlock()
for _, watcher := range c.watchers {
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
watcher.add(event)
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues {
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
watcher.add(event)
}
}
} else {
// supported equal to false generally means that trigger function
// is not defined (or not aware of any indexes). In this case,
// watchers filters should generally also don't generate any
// trigger values, but can cause problems in case of some
// misconfiguration. Thus we paranoidly leave this branch.
// Iterate over watchers interested in exact values for all values.
for _, watchers := range c.watchers.valueWatchers {
for _, watcher := range watchers {
watcher.add(event)
}
}
}
}
func (c *Cacher) terminateAllWatchers() {
glog.Warningf("Terminating all watchers from cacher %v", c.objectType)
c.Lock()
defer c.Unlock()
for key, watcher := range c.watchers {
delete(c.watchers, key)
watcher.stop()
}
c.watchers.terminateAll()
}
func (c *Cacher) isStopped() bool {
@ -338,30 +456,31 @@ func (c *Cacher) Stop() {
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, index int) func(bool) {
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
return func(lock bool) {
if lock {
c.Lock()
defer c.Unlock()
}
// It's possible that the watcher is already not in the map (e.g. in case of
// It's possible that the watcher is already not in the structure (e.g. in case of
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
delete(c.watchers, index)
c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
}
}
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc {
return func(obj runtime.Object) bool {
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter Filter) Filter {
filterFunc := func(obj runtime.Object) bool {
objKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("invalid object for filter: %v", obj)
return false
}
if !strings.HasPrefix(objKey, key) {
if !hasPathPrefix(objKey, key) {
return false
}
return filter(obj)
return filter.Filter(obj)
}
return NewSimpleFilter(filterFunc, filter.Trigger)
}
// Returns resource version to which the underlying cache is synced.
@ -450,12 +569,12 @@ type cacheWatcher struct {
sync.Mutex
input chan watchCacheEvent
result chan watch.Event
filter FilterFunc
filter Filter
stopped bool
forget func(bool)
}
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter Filter, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
@ -527,10 +646,10 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
}
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
curObjPasses := event.Type != watch.Deleted && c.filter.Filter(event.Object)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.PrevObject)
oldObjPasses = c.filter.Filter(event.PrevObject)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.

View file

@ -107,24 +107,20 @@ func IsUnreachable(err error) bool {
// IsTestFailed returns true if and only if err is a write conflict.
func IsTestFailed(err error) bool {
return isErrCode(err, ErrCodeResourceVersionConflicts, ErrCodeInvalidObj)
return isErrCode(err, ErrCodeResourceVersionConflicts)
}
// IsInvalidUID returns true if and only if err is invalid UID error
// IsInvalidObj returns true if and only if err is invalid error
func IsInvalidObj(err error) bool {
return isErrCode(err, ErrCodeInvalidObj)
}
func isErrCode(err error, codes ...int) bool {
func isErrCode(err error, code int) bool {
if err == nil {
return false
}
if e, ok := err.(*StorageError); ok {
for _, code := range codes {
if e.Code == code {
return true
}
}
return e.Code == code
}
return false
}

View file

@ -0,0 +1,98 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"strconv"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// APIObjectVersioner implements versioning and extracting etcd node information
// for objects that have an embedded ObjectMeta or ListMeta field.
type APIObjectVersioner struct{}
// UpdateObject implements Versioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
accessor.SetResourceVersion(versionString)
return nil
}
// UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
listMeta, err := api.ListMetaFor(obj)
if err != nil || listMeta == nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
listMeta.ResourceVersion = versionString
return nil
}
// ObjectResourceVersion implements Versioner
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return 0, err
}
version := accessor.GetResourceVersion()
if len(version) == 0 {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
// APIObjectVersioner implements Versioner
var Versioner storage.Versioner = APIObjectVersioner{}
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
if err != nil {
// coder error
panic(err)
}
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
if err != nil {
// coder error
panic(err)
}
if lhsVersion == rhsVersion {
return 0
}
if lhsVersion < rhsVersion {
return -1
}
return 1
}

17
vendor/k8s.io/kubernetes/pkg/storage/etcd/doc.go generated vendored Normal file
View file

@ -0,0 +1,17 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd

View file

@ -0,0 +1,616 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"errors"
"fmt"
"path"
"reflect"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
utilcache "k8s.io/kubernetes/pkg/util/cache"
"k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"golang.org/x/net/context"
)
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
return &etcdHelper{
etcdMembersAPI: etcd.NewMembersAPI(client),
etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec,
versioner: APIObjectVersioner{},
copier: api.Scheme,
pathPrefix: path.Join("/", prefix),
quorum: quorum,
cache: utilcache.NewCache(cacheSize),
}
}
// etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct {
etcdMembersAPI etcd.MembersAPI
etcdKeysAPI etcd.KeysAPI
codec runtime.Codec
copier runtime.ObjectCopier
// Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually.
// optional, has to be set to perform any atomic operations
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
// if true, perform quorum read
quorum bool
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
// This depends on etcd's indexes being globally unique across all objects/types. This will
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
// support multi-object transaction that will result in many objects with the same index.
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
// TODO: Measure how much this cache helps after the conversion code is optimized.
cache utilcache.Cache
}
func init() {
metrics.Register()
}
// Implements storage.Interface.
func (h *etcdHelper) Versioner() storage.Versioner {
return h.versioner
}
// Implements storage.Interface.
func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
trace := util.NewTrace("etcdHelper::Create " + getTypeName(obj))
defer trace.LogIfLong(250 * time.Millisecond)
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
data, err := runtime.Encode(h.codec, obj)
trace.Step("Object encoded")
if err != nil {
return err
}
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
trace.Step("Version checked")
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist,
}
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
trace.Step("Object created")
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return toStorageErr(err, key, 0)
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
if preconditions == nil {
return nil
}
objMeta, err := api.ObjectMetaFor(out)
if err != nil {
return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
}
if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
return storage.NewInvalidObjError(key, errMsg)
}
return nil
}
// Implements storage.Interface.
func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
if preconditions == nil {
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
}
// Check the preconditions match.
obj := reflect.New(v.Type()).Interface().(runtime.Object)
for {
_, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false)
if err != nil {
return toStorageErr(err, key, 0)
}
if err := checkPreconditions(key, preconditions, obj); err != nil {
return toStorageErr(err, key, 0)
}
index := uint64(0)
if node != nil {
index = node.ModifiedIndex
} else if res != nil {
index = res.Index
}
opt := etcd.DeleteOptions{PrevIndex: index}
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if etcdutil.IsEtcdTestFailed(err) {
glog.Infof("deletion of %s failed because of a conflict, going to retry", key)
} else {
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
}
}
}
// Implements storage.Interface.
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
return err
}
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl.
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
startTime := time.Now()
opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !etcdutil.IsEtcdNotFound(err) {
return "", nil, nil, toStorageErr(err, key, 0)
}
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
return body, node, response, toStorageErr(err, key, 0)
}
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
if response != nil {
if prevNode {
node = response.PrevNode
} else {
node = response.Node
}
}
if inErr != nil || node == nil || len(node.Value) == 0 {
if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
return "", nil, err
}
v.Set(reflect.Zero(v.Type()))
return "", nil, nil
} else if inErr != nil {
return "", nil, inErr
}
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body = node.Value
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
if err != nil {
return body, nil, err
}
if out != objPtr {
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
return body, node, err
}
// Implements storage.Interface.
func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := util.NewTrace("GetToList " + getTypeName(listObj))
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
trace.Step("Etcd node read")
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
if etcdutil.IsEtcdNotFound(err) {
return nil
}
return toStorageErr(err, key, 0)
}
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
return err
}
return nil
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(400 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
for _, node := range nodes {
if node.Dir {
trace.Step("Decoding dir " + node.Key + " START")
if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
return err
}
trace.Step("Decoding dir " + node.Key + " END")
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex, filter); found {
// obj != nil iff it matches the filter function.
if obj != nil {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
} else {
obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
if filter.Filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj)
}
}
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
// Implements storage.Interface.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := util.NewTrace("List " + getTypeName(listObj))
defer trace.LogIfLong(400 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(ctx, key)
trace.Step("Etcd node listed")
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
return nil
}
func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
opts := etcd.GetOptions{
Recursive: true,
Sort: true,
Quorum: h.quorum,
}
result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
if err != nil {
var index uint64
if etcdError, ok := err.(etcd.Error); ok {
index = etcdError.Index
}
nodes := make([]*etcd.Node, 0)
if etcdutil.IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, toStorageErr(err, key, 0)
}
}
return result.Node.Nodes, result.Index, nil
}
// Implements storage.Interface.
func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
key = h.prefixEtcdKey(key)
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
if err != nil {
return toStorageErr(err, key, 0)
}
if err := checkPreconditions(key, preconditions, obj); err != nil {
return toStorageErr(err, key, 0)
}
meta := storage.ResponseMeta{}
if node != nil {
meta.TTL = node.TTL
meta.ResourceVersion = node.ModifiedIndex
}
// Get the object to be written by calling tryUpdate.
ret, newTTL, err := tryUpdate(obj, meta)
if err != nil {
return toStorageErr(err, key, 0)
}
index := uint64(0)
ttl := uint64(0)
if node != nil {
index = node.ModifiedIndex
if node.TTL != 0 {
ttl = uint64(node.TTL)
}
if node.Expiration != nil && ttl == 0 {
ttl = 1
}
} else if res != nil {
index = res.Index
}
if newTTL != nil {
if ttl != 0 && *newTTL == 0 {
// TODO: remove this after we have verified this is no longer an issue
glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
}
ttl = *newTTL
}
// Since update object may have a resourceVersion set, we need to clear it here.
if err := h.versioner.UpdateObject(ret, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
data, err := runtime.Encode(h.codec, ret)
if err != nil {
return err
}
// First time this key has been used, try creating new value.
if index == 0 {
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist,
}
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdNodeExist(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, 0)
}
if string(data) == origBody {
// If we don't send an update, we simply return the currently existing
// version of the object.
_, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
return err
}
startTime := time.Now()
// Swap origBody with data, if origBody is the latest etcd data.
opts := etcd.SetOptions{
PrevValue: origBody,
PrevIndex: index,
TTL: time.Duration(ttl) * time.Second,
}
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdTestFailed(err) {
// Try again.
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, int64(index))
}
}
func (h *etcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, h.pathPrefix) {
return key
}
return path.Join(h.pathPrefix, key)
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *etcdHelper) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
if !filter.Filter(obj.(runtime.Object)) {
return nil, true
}
// We should not return the object itself to avoid polluting the cache if someone
// modifies returned values.
objCopy, err := h.copier.Copy(obj.(runtime.Object))
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
// We can't return a copy, thus we report the object as not found.
return nil, false
}
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
metrics.ObserveCacheMiss()
return nil, false
}
func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.copier.Copy(obj)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return
}
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
metrics.ObserveNewEntry()
}
}
func toStorageErr(err error, key string, rv int64) error {
if err == nil {
return nil
}
switch {
case etcdutil.IsEtcdNotFound(err):
return storage.NewKeyNotFoundError(key, rv)
case etcdutil.IsEtcdNodeExist(err):
return storage.NewKeyExistsError(key, rv)
case etcdutil.IsEtcdTestFailed(err):
return storage.NewResourceVersionConflictsError(key, rv)
case etcdutil.IsEtcdUnreachable(err):
return storage.NewUnreachableError(key, rv)
default:
return err
}
}

View file

@ -0,0 +1,499 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"golang.org/x/net/context"
)
// Etcd watch event actions
const (
EtcdCreate = "create"
EtcdGet = "get"
EtcdSet = "set"
EtcdCAS = "compareAndSwap"
EtcdDelete = "delete"
EtcdCAD = "compareAndDelete"
EtcdExpire = "expire"
)
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)
// includeFunc returns true if the given key should be considered part of a watch
type includeFunc func(key string) bool
// exceptKey is an includeFunc that returns false when the provided key matches the watched key
func exceptKey(except string) includeFunc {
return func(key string) bool {
return key != except
}
}
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
// Note that versioner is required for etcdWatcher to work correctly.
// There is no public constructor of it, so be careful when manipulating
// with it manually.
versioner storage.Versioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true
include includeFunc
filter storage.Filter
etcdIncoming chan *etcd.Response
etcdError chan error
ctx context.Context
cancel context.CancelFunc
etcdCallEnded chan struct{}
outgoing chan watch.Event
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// wg is used to avoid calls to etcd after Stop(), and to make sure
// that the translate goroutine is not leaked.
wg sync.WaitGroup
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
cache etcdCache
}
// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.Filter,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
quorum: quorum,
include: include,
filter: filter,
// Buffer this channel, so that the etcd client is not forced
// to context switch with every object it gets, and so that a
// long time spent decoding an object won't block the *next*
// object. Basically, we see a lot of "401 window exceeded"
// errors from etcd, and that's due to the client not streaming
// results but rather getting them one at a time. So we really
// want to never block the etcd client, if possible. The 100 is
// mostly arbitrary--we know it goes as high as 50, though.
// There's a V(2) log message that prints the length so we can
// monitor how much of this buffer is actually used.
etcdIncoming: make(chan *etcd.Response, 100),
etcdError: make(chan error, 1),
// Similarly to etcdIncomming, we don't want to force context
// switch on every new incoming object.
outgoing: make(chan watch.Event, 100),
userStop: make(chan struct{}),
stopped: false,
wg: sync.WaitGroup{},
cache: cache,
ctx: nil,
cancel: nil,
}
w.emit = func(e watch.Event) {
// Give up on user stop, without this we leak a lot of goroutines in tests.
select {
case w.outgoing <- e:
case <-w.userStop:
}
}
// translate will call done. We need to Add() here because otherwise,
// if Stop() gets called before translate gets started, there'd be a
// problem.
w.wg.Add(1)
go w.translate()
return w
}
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
defer utilruntime.HandleCrash()
defer close(w.etcdError)
defer close(w.etcdIncoming)
// All calls to etcd are coming from this function - once it is finished
// no other call to etcd should be generated by this watcher.
done := func() {}
// We need to be prepared, that Stop() can be called at any time.
// It can potentially also be called, even before this function is called.
// If that is the case, we simply skip all the code here.
// See #18928 for more details.
var watcher etcd.Watcher
returned := func() bool {
w.stopLock.Lock()
defer w.stopLock.Unlock()
if w.stopped {
// Watcher has already been stopped - don't event initiate it here.
return true
}
w.wg.Add(1)
done = w.wg.Done
// Perform initialization of watcher under lock - we want to avoid situation when
// Stop() is called in the meantime (which in tests can cause etcd termination and
// strange behavior here).
if resourceVersion == 0 {
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
if err != nil {
w.etcdError <- err
return true
}
resourceVersion = latest
}
opts := etcd.WatcherOptions{
Recursive: w.list,
AfterIndex: resourceVersion,
}
watcher = client.Watcher(key, &opts)
w.ctx, w.cancel = context.WithCancel(ctx)
return false
}()
defer done()
if returned {
return
}
for {
resp, err := watcher.Next(w.ctx)
if err != nil {
w.etcdError <- err
return
}
w.etcdIncoming <- resp
}
}
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
opts := etcd.GetOptions{
Recursive: recursive,
Sort: false,
Quorum: quorum,
}
resp, err := client.Get(ctx, key, &opts)
if err != nil {
if !etcdutil.IsEtcdNotFound(err) {
utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
return resourceVersion, toStorageErr(err, key, 0)
}
if etcdError, ok := err.(etcd.Error); ok {
resourceVersion = etcdError.Index
}
return resourceVersion, nil
}
resourceVersion = resp.Index
convertRecursiveResponse(resp.Node, resp, incoming)
return
}
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Action = "get"
copied.Node = node
incoming <- &copied
}
var (
watchChannelHWM HighWaterMark
)
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer w.wg.Done()
defer close(w.outgoing)
defer utilruntime.HandleCrash()
for {
select {
case err := <-w.etcdError:
if err != nil {
var status *unversioned.Status
switch {
case etcdutil.IsEtcdWatchExpired(err):
status = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusGone, // Gone
Reason: unversioned.StatusReasonExpired,
}
// TODO: need to generate errors using api/errors which has a circular dependency on this package
// no other way to inject errors
// case etcdutil.IsEtcdUnreachable(err):
// status = errors.NewServerTimeout(...)
default:
status = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: unversioned.StatusReasonInternalError,
}
}
w.emit(watch.Event{
Type: watch.Error,
Object: status,
})
}
return
case <-w.userStop:
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
}
w.sendResult(res)
}
// If !ok, don't return here-- must wait for etcdError channel
// to give an error or be closed.
}
}
}
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
return obj, nil
}
obj, err := runtime.Decode(w.encoding, []byte(node.Value))
if err != nil {
return nil, err
}
// ensure resource version is set on the object we load from etcd
if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
}
// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
return nil, err
}
}
if node.ModifiedIndex != 0 {
w.cache.addToCache(node.ModifiedIndex, obj)
}
return obj, nil
}
func (w *etcdWatcher) sendAdd(res *etcd.Response) {
if res.Node == nil {
utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
obj, err := w.decodeObject(res.Node)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter.Filter(obj) {
return
}
action := watch.Added
if res.Node.ModifiedIndex != res.Node.CreatedIndex {
action = watch.Modified
}
w.emit(watch.Event{
Type: action,
Object: obj,
})
}
func (w *etcdWatcher) sendModify(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
curObj, err := w.decodeObject(res.Node)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := w.filter.Filter(curObj)
oldObjPasses := false
var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" {
// Ignore problems reading the old object.
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
}
oldObjPasses = w.filter.Filter(oldObj)
}
}
// Some changes to an object may cause it to start or stop matching a filter.
// We need to report those as adds/deletes. So we have to check both the previous
// and current value of the object.
switch {
case curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Modified,
Object: curObj,
})
case curObjPasses && !oldObjPasses:
w.emit(watch.Event{
Type: watch.Added,
Object: curObj,
})
case !curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Deleted,
Object: oldObj,
})
}
// Do nothing if neither new nor old object passed the filter.
}
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil {
utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
return
}
if w.include != nil && !w.include(res.PrevNode.Key) {
return
}
node := *res.PrevNode
if res.Node != nil {
// Note that this sends the *old* object with the etcd index for the time at
// which it gets deleted. This will allow users to restart the watch at the right
// index.
node.ModifiedIndex = res.Node.ModifiedIndex
}
obj, err := w.decodeObject(&node)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter.Filter(obj) {
return
}
w.emit(watch.Event{
Type: watch.Deleted,
Object: obj,
})
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case EtcdCreate, EtcdGet:
w.sendAdd(res)
case EtcdSet, EtcdCAS:
w.sendModify(res)
case EtcdDelete, EtcdExpire, EtcdCAD:
w.sendDelete(res)
default:
utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
}
}
// ResultChan implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}
// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
w.stopLock.Lock()
if w.cancel != nil {
w.cancel()
w.cancel = nil
}
if !w.stopped {
w.stopped = true
close(w.userStop)
}
w.stopLock.Unlock()
// Wait until all calls to etcd are finished and no other
// will be issued.
w.wg.Wait()
}

View file

@ -0,0 +1,113 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
cacheHitCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_hit_count",
Help: "Counter of etcd helper cache hits.",
},
)
cacheMissCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_miss_count",
Help: "Counter of etcd helper cache miss.",
},
)
cacheEntryCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_entry_count",
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
"because two concurrent threads can miss the cache and generate the same entry twice.",
},
)
cacheGetLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_get_latencies_summary",
Help: "Latency in microseconds of getting an object from etcd cache",
},
)
cacheAddLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_add_latencies_summary",
Help: "Latency in microseconds of adding an object to etcd cache",
},
)
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "etcd_request_latencies_summary",
Help: "Etcd request latency summary in microseconds for each operation and object type.",
},
[]string{"operation", "type"},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(cacheHitCounter)
prometheus.MustRegister(cacheMissCounter)
prometheus.MustRegister(cacheEntryCounter)
prometheus.MustRegister(cacheAddLatency)
prometheus.MustRegister(cacheGetLatency)
prometheus.MustRegister(etcdRequestLatenciesSummary)
})
}
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveGetCache(startTime time.Time) {
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveAddCache(startTime time.Time) {
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveCacheHit() {
cacheHitCounter.Inc()
}
func ObserveCacheMiss() {
cacheMissCounter.Inc()
}
func ObserveNewEntry() {
cacheEntryCounter.Inc()
}
func Reset() {
cacheHitCounter.Set(0)
cacheMissCounter.Set(0)
cacheEntryCounter.Set(0)
// TODO: Reset cacheAddLatency.
// TODO: Reset cacheGetLatency.
etcdRequestLatenciesSummary.Reset()
}

19
vendor/k8s.io/kubernetes/pkg/storage/etcd/util/doc.go generated vendored Normal file
View file

@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package util holds generic etcd-related utility functions that any user of ectd might want to
// use, without pulling in kubernetes-specific code.
package util

View file

@ -0,0 +1,99 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
etcd "github.com/coreos/etcd/client"
)
// IsEtcdNotFound returns true if and only if err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound)
}
// IsEtcdNodeExist returns true if and only if err is an etcd node already exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true if and only if err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed)
}
// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
func IsEtcdWatchExpired(err error) bool {
// NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared
// I'm using the previous matching value
return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared)
}
// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
func IsEtcdUnreachable(err error) bool {
// NOTE: The logic has changed previous error code no longer applies
return err == etcd.ErrClusterUnavailable
}
// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
if err != nil {
if etcdError, ok := err.(etcd.Error); ok {
return etcdError.Code == errorCode
}
// NOTE: There are other error types returned
}
return false
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
}
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:"health"`
}
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
}

161
vendor/k8s.io/kubernetes/pkg/storage/etcd3/compact.go generated vendored Normal file
View file

@ -0,0 +1,161 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"strconv"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/golang/glog"
"golang.org/x/net/context"
)
const (
compactInterval = 10 * time.Minute
compactRevKey = "compact_rev_key"
)
var (
endpointsMapMu sync.Mutex
endpointsMap map[string]struct{}
)
func init() {
endpointsMap = make(map[string]struct{})
}
// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
// It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
func StartCompactor(ctx context.Context, client *clientv3.Client) {
endpointsMapMu.Lock()
defer endpointsMapMu.Unlock()
// In one process, we can have only one compactor for one cluster.
// Currently we rely on endpoints to differentiate clusters.
for _, ep := range client.Endpoints() {
if _, ok := endpointsMap[ep]; ok {
glog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
return
}
}
for _, ep := range client.Endpoints() {
endpointsMap[ep] = struct{}{}
}
go compactor(ctx, client, compactInterval)
}
// compactor periodically compacts historical versions of keys in etcd.
// It will compact keys with versions older than given interval.
// In other words, after compaction, it will only contain keys set during last interval.
// Any API call for the older versions of keys will return error.
// Interval is the time interval between each compaction. The first compaction happens after "interval".
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
// Technical definitions:
// We have a special key in etcd defined as *compactRevKey*.
// compactRevKey's value will be set to the string of last compacted revision.
// compactRevKey's version will be used as logical time for comparison. THe version is referred as compact time.
// Initially, because the key doesn't exist, the compact time (version) is 0.
//
// Algorithm:
// - Compare to see if (local compact_time) = (remote compact_time).
// - If yes, increment both local and remote compact_time, and do a compaction.
// - If not, set local to remote compact_time.
//
// Technical details/insights:
//
// The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
// CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
//
// For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
// at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
// If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
//
// oldRev(t2) curRev(t2)
// +
// oldRev curRev |
// + + |
// | | |
// | | t1' | t2'
// +---v-------------v----^---------v------^---->
// t0 t1 t2
//
// We have the guarantees:
// - in normal cases, the interval is 10 minutes.
// - in failover, the interval is >10m and <20m
//
// FAQ:
// - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
// etcd API.
// - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
// every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
var compactTime int64
var rev int64
var err error
for {
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
compactTime, rev, err = compact(ctx, client, compactTime, rev)
if err != nil {
glog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
continue
}
}
}
// compact compacts etcd store and returns current rev.
// It will return the current compact time and global revision if no error occurred.
// Note that CAS fail will not incur any error.
func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
resp, err := client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
).Then(
clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
).Else(
clientv3.OpGet(compactRevKey),
).Commit()
if err != nil {
return t, rev, err
}
curRev := resp.Header.Revision
if !resp.Succeeded {
curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
return curTime, curRev, nil
}
curTime := t + 1
if rev == 0 {
// We don't compact on bootstrap.
return curTime, curRev, nil
}
if _, err = client.Compact(ctx, rev); err != nil {
return curTime, curRev, err
}
glog.Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
return curTime, curRev, nil
}

50
vendor/k8s.io/kubernetes/pkg/storage/etcd3/event.go generated vendored Normal file
View file

@ -0,0 +1,50 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)
type event struct {
key string
value []byte
rev int64
isDeleted bool
isCreated bool
}
func parseKV(kv *mvccpb.KeyValue) *event {
return &event{
key: string(kv.Key),
value: kv.Value,
rev: kv.ModRevision,
isDeleted: false,
isCreated: kv.ModRevision == kv.CreateRevision,
}
}
func parseEvent(e *clientv3.Event) *event {
return &event{
key: string(e.Kv.Key),
value: e.Kv.Value,
rev: e.Kv.ModRevision,
isDeleted: e.Type == clientv3.EventTypeDelete,
isCreated: e.IsCreate(),
}
}

455
vendor/k8s.io/kubernetes/pkg/storage/etcd3/store.go generated vendored Normal file
View file

@ -0,0 +1,455 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"bytes"
"errors"
"fmt"
"path"
"reflect"
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/watch"
"github.com/coreos/etcd/clientv3"
"github.com/golang/glog"
"golang.org/x/net/context"
)
type store struct {
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
pathPrefix string
watcher *watcher
}
type elemForDecode struct {
data []byte
rev uint64
}
type objState struct {
obj runtime.Object
meta *storage.ResponseMeta
rev int64
data []byte
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, codec, prefix)
}
func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
versioner := etcd.APIObjectVersioner{}
return &store{
client: c,
versioner: versioner,
codec: codec,
pathPrefix: prefix,
watcher: newWatcher(c, codec, versioner),
}
}
// Versioner implements storage.Interface.Versioner.
func (s *store) Versioner() storage.Versioner {
return s.versioner
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
}
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = keyWithPrefix(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(data), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
// Delete implements storage.Interface.Delete.
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, precondtions *storage.Preconditions) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = keyWithPrefix(s.pathPrefix, key)
if precondtions == nil {
return s.unconditionalDelete(ctx, key, out)
}
return s.conditionalDelete(ctx, key, out, v, precondtions)
}
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
// We need to do get and delete in single transaction in order to
// know the value and revision before deleting it.
txnResp, err := s.client.KV.Txn(ctx).If().Then(
clientv3.OpGet(key),
clientv3.OpDelete(key),
).Commit()
if err != nil {
return err
}
getResp := txnResp.Responses[0].GetResponseRange()
if len(getResp.Kvs) == 0 {
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
}
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, false)
if err != nil {
return err
}
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
return err
}
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
return err
}
data, err := runtime.Encode(s.codec, ret)
if err != nil {
return err
}
if bytes.Equal(data, origState.data) {
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(data), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
if len(getResp.Kvs) == 0 {
return nil
}
elems := []*elemForDecode{{
data: getResp.Kvs[0].Value,
rev: uint64(getResp.Kvs[0].ModRevision),
}}
if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = keyWithPrefix(s.pathPrefix, key)
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
key += "/"
}
getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return err
}
elems := make([]*elemForDecode, len(getResp.Kvs))
for i, kv := range getResp.Kvs {
elems[i] = &elemForDecode{
data: kv.Value,
rev: uint64(kv.ModRevision),
}
}
if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
}
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, filter, false)
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, filter, true)
}
func (s *store) watch(ctx context.Context, key string, rv string, filter storage.Filter, recursive bool) (watch.Interface, error) {
rev, err := storage.ParseWatchResourceVersion(rv)
if err != nil {
return nil, err
}
key = keyWithPrefix(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, filter)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
state := &objState{
obj: reflect.New(v.Type()).Interface().(runtime.Object),
meta: &storage.ResponseMeta{},
}
if len(getResp.Kvs) == 0 {
if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0)
}
if err := runtime.SetZeroValue(state.obj); err != nil {
return nil, err
}
} else {
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = getResp.Kvs[0].Value
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err
}
}
return state, nil
}
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
if err != nil {
return nil, 0, err
}
version, err := s.versioner.ObjectResourceVersion(ret)
if err != nil {
return nil, 0, err
}
if version != 0 {
// We cannot store object with resourceVersion in etcd. We need to reset it.
if err := s.versioner.UpdateObject(ret, 0); err != nil {
return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
}
}
var ttl uint64
if ttlPtr != nil {
ttl = *ttlPtr
}
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
// TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
// put keys within into same lease. We shall benchmark this and optimize the performance.
lcr, err := s.client.Lease.Grant(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
}
func keyWithPrefix(prefix, key string) string {
if strings.HasPrefix(key, prefix) {
return key
}
return path.Join(prefix, key)
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err := codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(objPtr, uint64(rev))
return nil
}
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
// On success, ListPtr would be set to the list of objects.
func decodeList(elems []*elemForDecode, filter storage.Filter, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
v, err := conversion.EnforcePtr(ListPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
for _, elem := range elems {
obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, elem.rev)
if filter.Filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
}
return nil
}
func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
if preconditions == nil {
return nil
}
objMeta, err := api.ObjectMetaFor(out)
if err != nil {
return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
}
if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
return storage.NewInvalidObjError(key, errMsg)
}
return nil
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}

347
vendor/k8s.io/kubernetes/pkg/storage/etcd3/watcher.go generated vendored Normal file
View file

@ -0,0 +1,347 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"net/http"
"strings"
"sync"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/watch"
"github.com/coreos/etcd/clientv3"
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
const (
// We have set a buffer in order to reduce times of context switches.
incomingBufSize = 100
outgoingBufSize = 100
)
type watcher struct {
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
}
// watchChan implements watch.Interface.
type watchChan struct {
watcher *watcher
key string
initialRev int64
recursive bool
filter storage.Filter
ctx context.Context
cancel context.CancelFunc
incomingEventChan chan *event
resultChan chan watch.Event
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) *watcher {
return &watcher{
client: client,
codec: codec,
versioner: versioner,
}
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
// If rev is zero, it will return the existing object(s) and then start watching from
// the maximum revision+1 from returned objects.
// If rev is non-zero, it will watch events happened after given revision.
// If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// filter must be non-nil. Only if filter returns true will the changes be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.Filter) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, filter)
go wc.run()
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.Filter) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
initialRev: rev,
recursive: recursive,
filter: filter,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
errChan: make(chan error, 1),
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
return wc
}
func (wc *watchChan) run() {
go wc.startWatching()
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
go wc.processEvent(&resultChanWG)
select {
case err := <-wc.errChan:
errResult := parseError(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
wc.cancel()
case <-wc.ctx.Done():
}
// we need to wait until resultChan wouldn't be sent to anymore
resultChanWG.Wait()
close(wc.resultChan)
}
func (wc *watchChan) Stop() {
wc.cancel()
}
func (wc *watchChan) ResultChan() <-chan watch.Event {
return wc.resultChan
}
// sync tries to retrieve existing data and send them to process.
// The revision to watch will be set to the revision in response.
func (wc *watchChan) sync() error {
opts := []clientv3.OpOption{}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
if err != nil {
return err
}
wc.initialRev = getResp.Header.Revision
for _, kv := range getResp.Kvs {
wc.sendEvent(parseKV(kv))
}
return nil
}
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching() {
if wc.initialRev == 0 {
if err := wc.sync(); err != nil {
wc.sendError(err)
return
}
}
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1)}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch {
if wres.Err() != nil {
// If there is an error on server (e.g. compaction), the channel will return it before closed.
wc.sendError(wres.Err())
return
}
for _, e := range wres.Events {
wc.sendEvent(parseEvent(e))
}
}
}
// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case e := <-wc.incomingEventChan:
res := wc.transform(e)
if res == nil {
continue
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case wc.resultChan <- *res:
case <-wc.ctx.Done():
return
}
case <-wc.ctx.Done():
return
}
}
}
// transform transforms an event into a result for user if not filtered.
// TODO (Optimization):
// - Save remote round-trip.
// Currently, DELETE and PUT event don't contain the previous value.
// We need to do another Get() in order to get previous object and have logic upon it.
// We could potentially do some optimizations:
// - For PUT, we can save current and previous objects into the value.
// - For DELETE, See https://github.com/coreos/etcd/issues/4620
func (wc *watchChan) transform(e *event) (res *watch.Event) {
curObj, oldObj, err := prepareObjs(wc.ctx, e, wc.watcher.client, wc.watcher.codec, wc.watcher.versioner)
if err != nil {
wc.sendError(err)
return nil
}
switch {
case e.isDeleted:
if !wc.filter.Filter(oldObj) {
return nil
}
res = &watch.Event{
Type: watch.Deleted,
Object: oldObj,
}
case e.isCreated:
if !wc.filter.Filter(curObj) {
return nil
}
res = &watch.Event{
Type: watch.Added,
Object: curObj,
}
default:
curObjPasses := wc.filter.Filter(curObj)
oldObjPasses := wc.filter.Filter(oldObj)
switch {
case curObjPasses && oldObjPasses:
res = &watch.Event{
Type: watch.Modified,
Object: curObj,
}
case curObjPasses && !oldObjPasses:
res = &watch.Event{
Type: watch.Added,
Object: curObj,
}
case !curObjPasses && oldObjPasses:
res = &watch.Event{
Type: watch.Deleted,
Object: oldObj,
}
}
}
return res
}
func parseError(err error) *watch.Event {
var status *unversioned.Status
switch {
case err == etcdrpc.ErrCompacted:
status = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusGone,
Reason: unversioned.StatusReasonExpired,
}
default:
status = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: unversioned.StatusReasonInternalError,
}
}
return &watch.Event{
Type: watch.Error,
Object: status,
}
}
func (wc *watchChan) sendError(err error) {
// Context.canceled is an expected behavior.
// We should just stop all goroutines in watchChan without returning error.
// TODO: etcd client should return context.Canceled instead of grpc specific error.
if grpc.Code(err) == codes.Canceled || err == context.Canceled {
return
}
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
}
}
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
glog.V(2).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
incomingBufSize)
}
select {
case wc.incomingEventChan <- e:
case <-wc.ctx.Done():
}
}
func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) (curObj runtime.Object, oldObj runtime.Object, err error) {
if !e.isDeleted {
curObj, err = decodeObj(codec, versioner, e.value, e.rev)
if err != nil {
return nil, nil, err
}
}
if e.isDeleted || !e.isCreated {
getResp, err := client.Get(ctx, e.key, clientv3.WithRev(e.rev-1))
if err != nil {
return nil, nil, err
}
// Note that this sends the *old* object with the etcd revision for the time at
// which it gets deleted.
// We assume old object is returned only in Deleted event. Users (e.g. cacher) need
// to have larger than previous rev to tell the ordering.
oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev)
if err != nil {
return nil, nil, err
}
}
return curObj, oldObj, nil
}
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (runtime.Object, error) {
obj, err := runtime.Decode(codec, []byte(data))
if err != nil {
return nil, err
}
// ensure resource version is set on the object we load from etcd
if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
return nil, fmt.Errorf("failure to version api object (%d) %#v: %v", rev, obj, err)
}
return obj, nil
}

View file

@ -51,15 +51,47 @@ type ResponseMeta struct {
ResourceVersion uint64
}
// FilterFunc is a predicate which takes an API object and returns true
// if and only if the object should remain in the set.
type FilterFunc func(obj runtime.Object) bool
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// Everything is a FilterFunc which accepts all objects.
func Everything(runtime.Object) bool {
// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
// (<index name>, <index value for the given object>) for all indexes known
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// Filter is interface that is used to pass filtering mechanism.
type Filter interface {
// Filter is a predicate which takes an API object and returns true
// if and only if the object should remain in the set.
Filter(obj runtime.Object) bool
// For any triggers known to the Filter, if Filter() can return only
// (a subset of) objects for which indexing function returns <value>,
// (<index name>, <value> pair would be returned.
//
// This is optimization to avoid computing Filter() function (which are
// usually relatively expensive) in case we are sure they will return
// false anyway.
Trigger() []MatchValue
}
// Everything is a Filter which accepts all objects.
var Everything Filter = everything{}
// everything is implementation of Everything.
type everything struct {
}
func (e everything) Filter(runtime.Object) bool {
return true
}
func (e everything) Trigger() []MatchValue {
return nil
}
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.
@ -77,14 +109,9 @@ func NewUIDPreconditions(uid string) *Preconditions {
return &Preconditions{UID: &u}
}
// Interface offers a common interface for object marshaling/unmarshling operations and
// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
// Returns list of servers addresses of the underyling database.
// TODO: This method is used only in a single place. Consider refactoring and getting rid
// of this method from the interface.
Backends(ctx context.Context) []string
// Returns Versioner associated with this interface.
Versioner() Versioner
@ -102,14 +129,14 @@ type Interface interface {
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
@ -118,13 +145,13 @@ type Interface interface {
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error
GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error
List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
@ -155,17 +182,4 @@ type Interface interface {
// }
// })
GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, precondtions *Preconditions, tryUpdate UpdateFunc) error
// Codec provides access to the underlying codec being used by the implementation.
Codec() runtime.Codec
}
// Config interface allows storage tiers to generate the proper storage.interface
// and reduce the dependencies to encapsulate storage.
type Config interface {
// Creates the Interface base on ConfigObject
NewStorage() (Interface, error)
// This function is used to enforce membership, and return the underlying type
GetType() string
}

View file

@ -0,0 +1,47 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import "k8s.io/kubernetes/pkg/runtime"
const (
StorageTypeUnset = ""
StorageTypeETCD2 = "etcd2"
StorageTypeETCD3 = "etcd3"
)
// Config is configuration for creating a storage backend.
type Config struct {
// Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd2".
Type string
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// ServerList is the list of storage servers to connect with.
ServerList []string
// TLS credentials
KeyFile string
CertFile string
CAFile string
// Quorum indicates that whether read operations should be quorum-level consistent.
Quorum bool
// DeserializationCacheSize is the size of cache of deserialized objects.
// Currently this is only supported in etcd2.
// We will drop the cache once using protobuf.
DeserializationCacheSize int
Codec runtime.Codec
}

View file

@ -0,0 +1,80 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"net"
"net/http"
"time"
etcd2client "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
func newETCD2Storage(c storagebackend.Config) (storage.Interface, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
}
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, err
}
return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
}
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {
cli, err := etcd2client.New(etcd2client.Config{
Endpoints: serverList,
Transport: tr,
})
if err != nil {
return nil, err
}
return cli, nil
}
func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
})
return tr, nil
}

View file

@ -0,0 +1,50 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd3"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"golang.org/x/net/context"
)
func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
cfg := clientv3.Config{
Endpoints: c.ServerList,
TLS: tlsConfig,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
etcd3.StartCompactor(context.Background(), client)
return etcd3.New(client, c.Codec, c.Prefix), nil
}

View file

@ -0,0 +1,40 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"fmt"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, error) {
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

View file

@ -19,6 +19,7 @@ package storage
import (
"fmt"
"strconv"
"strings"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/validation"
@ -36,6 +37,41 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
}
}
// SimpleFilter implements Filter interface.
type SimpleFilter struct {
filterFunc func(runtime.Object) bool
triggerFunc func() []MatchValue
}
func (s *SimpleFilter) Filter(obj runtime.Object) bool {
return s.filterFunc(obj)
}
func (s *SimpleFilter) Trigger() []MatchValue {
return s.triggerFunc()
}
func NewSimpleFilter(
filterFunc func(runtime.Object) bool,
triggerFunc func() []MatchValue) Filter {
return &SimpleFilter{
filterFunc: filterFunc,
triggerFunc: triggerFunc,
}
}
func EverythingFunc(runtime.Object) bool {
return true
}
func NoTriggerFunc() []MatchValue {
return nil
}
func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
@ -88,3 +124,28 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
}
return prefix + "/" + name, nil
}
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}

View file

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/clock"
"k8s.io/kubernetes/pkg/watch"
)
@ -96,7 +96,7 @@ type watchCache struct {
onEvent func(watchCacheEvent)
// for testing timeouts.
clock util.Clock
clock clock.Clock
}
func newWatchCache(capacity int) *watchCache {
@ -107,7 +107,7 @@ func newWatchCache(capacity int) *watchCache {
endIndex: 0,
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0,
clock: util.RealClock{},
clock: clock.RealClock{},
}
wc.cond = sync.NewCond(wc.RLocker())
return wc