Update go dependencies

This commit is contained in:
Manuel de Brito Fontes 2017-04-01 11:42:02 -03:00
parent e0561ddeb9
commit 88a2751234
1970 changed files with 413928 additions and 222867 deletions

215
vendor/k8s.io/client-go/util/cert/cert.go generated vendored Normal file
View file

@ -0,0 +1,215 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cert
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math"
"math/big"
"net"
"time"
)
const (
rsaKeySize = 2048
duration365d = time.Hour * 24 * 365
)
// Config containes the basic fields required for creating a certificate
type Config struct {
CommonName string
Organization []string
AltNames AltNames
Usages []x509.ExtKeyUsage
}
// AltNames contains the domain names and IP addresses that will be added
// to the API Server's x509 certificate SubAltNames field. The values will
// be passed directly to the x509.Certificate object.
type AltNames struct {
DNSNames []string
IPs []net.IP
}
// NewPrivateKey creates an RSA private key
func NewPrivateKey() (*rsa.PrivateKey, error) {
return rsa.GenerateKey(cryptorand.Reader, rsaKeySize)
}
// NewSelfSignedCACert creates a CA certificate
func NewSelfSignedCACert(cfg Config, key *rsa.PrivateKey) (*x509.Certificate, error) {
now := time.Now()
tmpl := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
NotBefore: now.UTC(),
NotAfter: now.Add(duration365d * 10).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
IsCA: true,
}
certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &tmpl, &tmpl, key.Public(), key)
if err != nil {
return nil, err
}
return x509.ParseCertificate(certDERBytes)
}
// NewSignedCert creates a signed certificate using the given CA certificate and key
func NewSignedCert(cfg Config, key *rsa.PrivateKey, caCert *x509.Certificate, caKey *rsa.PrivateKey) (*x509.Certificate, error) {
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64))
if err != nil {
return nil, err
}
if len(cfg.CommonName) == 0 {
return nil, errors.New("must specify a CommonName")
}
if len(cfg.Usages) == 0 {
return nil, errors.New("must specify at least one ExtKeyUsage")
}
certTmpl := x509.Certificate{
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
DNSNames: cfg.AltNames.DNSNames,
IPAddresses: cfg.AltNames.IPs,
SerialNumber: serial,
NotBefore: caCert.NotBefore,
NotAfter: time.Now().Add(duration365d).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: cfg.Usages,
}
certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &certTmpl, caCert, key.Public(), caKey)
if err != nil {
return nil, err
}
return x509.ParseCertificate(certDERBytes)
}
// MakeEllipticPrivateKeyPEM creates an ECDSA private key
func MakeEllipticPrivateKeyPEM() ([]byte, error) {
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
if err != nil {
return nil, err
}
derBytes, err := x509.MarshalECPrivateKey(privateKey)
if err != nil {
return nil, err
}
privateKeyPemBlock := &pem.Block{
Type: ECPrivateKeyBlockType,
Bytes: derBytes,
}
return pem.EncodeToMemory(privateKeyPemBlock), nil
}
// GenerateSelfSignedCertKey creates a self-signed certificate and key for the given host.
// Host may be an IP or a DNS name
// You may also specify additional subject alt names (either ip or dns names) for the certificate
func GenerateSelfSignedCertKey(host string, alternateIPs []net.IP, alternateDNS []string) ([]byte, []byte, error) {
priv, err := rsa.GenerateKey(cryptorand.Reader, 2048)
if err != nil {
return nil, nil, err
}
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
IsCA: true,
}
if ip := net.ParseIP(host); ip != nil {
template.IPAddresses = append(template.IPAddresses, ip)
} else {
template.DNSNames = append(template.DNSNames, host)
}
template.IPAddresses = append(template.IPAddresses, alternateIPs...)
template.DNSNames = append(template.DNSNames, alternateDNS...)
derBytes, err := x509.CreateCertificate(cryptorand.Reader, &template, &template, &priv.PublicKey, priv)
if err != nil {
return nil, nil, err
}
// Generate cert
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: CertificateBlockType, Bytes: derBytes}); err != nil {
return nil, nil, err
}
// Generate key
keyBuffer := bytes.Buffer{}
if err := pem.Encode(&keyBuffer, &pem.Block{Type: RSAPrivateKeyBlockType, Bytes: x509.MarshalPKCS1PrivateKey(priv)}); err != nil {
return nil, nil, err
}
return certBuffer.Bytes(), keyBuffer.Bytes(), nil
}
// FormatBytesCert receives byte array certificate and formats in human-readable format
func FormatBytesCert(cert []byte) (string, error) {
block, _ := pem.Decode(cert)
c, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return "", fmt.Errorf("failed to parse certificate [%v]", err)
}
return FormatCert(c), nil
}
// FormatCert receives certificate and formats in human-readable format
func FormatCert(c *x509.Certificate) string {
var ips []string
for _, ip := range c.IPAddresses {
ips = append(ips, ip.String())
}
altNames := append(ips, c.DNSNames...)
res := fmt.Sprintf(
"Issuer: CN=%s | Subject: CN=%s | CA: %t\n",
c.Issuer.CommonName, c.Subject.CommonName, c.IsCA,
)
res += fmt.Sprintf("Not before: %s Not After: %s", c.NotBefore, c.NotAfter)
if len(altNames) > 0 {
res += fmt.Sprintf("\nAlternate Names: %v", altNames)
}
return res
}

75
vendor/k8s.io/client-go/util/cert/csr.go generated vendored Normal file
View file

@ -0,0 +1,75 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cert
import (
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"net"
)
// MakeCSR generates a PEM-encoded CSR using the supplied private key, subject, and SANs.
// All key types that are implemented via crypto.Signer are supported (This includes *rsa.PrivateKey and *ecdsa.PrivateKey.)
func MakeCSR(privateKey interface{}, subject *pkix.Name, dnsSANs []string, ipSANs []net.IP) (csr []byte, err error) {
template := &x509.CertificateRequest{
Subject: *subject,
DNSNames: dnsSANs,
IPAddresses: ipSANs,
}
return MakeCSRFromTemplate(privateKey, template)
}
// MakeCSRFromTemplate generates a PEM-encoded CSR using the supplied private
// key and certificate request as a template. All key types that are
// implemented via crypto.Signer are supported (This includes *rsa.PrivateKey
// and *ecdsa.PrivateKey.)
func MakeCSRFromTemplate(privateKey interface{}, template *x509.CertificateRequest) ([]byte, error) {
t := *template
t.SignatureAlgorithm = sigType(privateKey)
csrDER, err := x509.CreateCertificateRequest(cryptorand.Reader, &t, privateKey)
if err != nil {
return nil, err
}
csrPemBlock := &pem.Block{
Type: "CERTIFICATE REQUEST",
Bytes: csrDER,
}
return pem.EncodeToMemory(csrPemBlock), nil
}
func sigType(privateKey interface{}) x509.SignatureAlgorithm {
// Customize the signature for RSA keys, depending on the key size
if privateKey, ok := privateKey.(*rsa.PrivateKey); ok {
keySize := privateKey.N.BitLen()
switch {
case keySize >= 4096:
return x509.SHA512WithRSA
case keySize >= 3072:
return x509.SHA384WithRSA
default:
return x509.SHA256WithRSA
}
}
return x509.UnknownSignatureAlgorithm
}

150
vendor/k8s.io/client-go/util/cert/io.go generated vendored Normal file
View file

@ -0,0 +1,150 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cert
import (
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
// CanReadCertAndKey returns true if the certificate and key files already exists,
// otherwise returns false. If lost one of cert and key, returns error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
certReadable := canReadFile(certPath)
keyReadable := canReadFile(keyPath)
if certReadable == false && keyReadable == false {
return false, nil
}
if certReadable == false {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", certPath)
}
if keyReadable == false {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", keyPath)
}
return true, nil
}
// If the file represented by path exists and
// readable, returns true otherwise returns false.
func canReadFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
}
defer f.Close()
return true
}
// WriteCert writes the pem-encoded certificate data to certPath.
// The certificate file will be created with file mode 0644.
// If the certificate file already exists, it will be overwritten.
// The parent directory of the certPath will be created as needed with file mode 0755.
func WriteCert(certPath string, data []byte) error {
if err := os.MkdirAll(filepath.Dir(certPath), os.FileMode(0755)); err != nil {
return err
}
if err := ioutil.WriteFile(certPath, data, os.FileMode(0644)); err != nil {
return err
}
return nil
}
// WriteKey writes the pem-encoded key data to keyPath.
// The key file will be created with file mode 0600.
// If the key file already exists, it will be overwritten.
// The parent directory of the keyPath will be created as needed with file mode 0755.
func WriteKey(keyPath string, data []byte) error {
if err := os.MkdirAll(filepath.Dir(keyPath), os.FileMode(0755)); err != nil {
return err
}
if err := ioutil.WriteFile(keyPath, data, os.FileMode(0600)); err != nil {
return err
}
return nil
}
// LoadOrGenerateKeyFile looks for a key in the file at the given path. If it
// can't find one, it will generate a new key and store it there.
func LoadOrGenerateKeyFile(keyPath string) (data []byte, wasGenerated bool, err error) {
loadedData, err := ioutil.ReadFile(keyPath)
if err == nil {
return loadedData, false, err
}
if !os.IsNotExist(err) {
return nil, false, fmt.Errorf("error loading key from %s: %v", keyPath, err)
}
generatedData, err := MakeEllipticPrivateKeyPEM()
if err != nil {
return nil, false, fmt.Errorf("error generating key: %v", err)
}
if err := WriteKey(keyPath, generatedData); err != nil {
return nil, false, fmt.Errorf("error writing key to %s: %v", keyPath, err)
}
return generatedData, true, nil
}
// NewPool returns an x509.CertPool containing the certificates in the given PEM-encoded file.
// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates
func NewPool(filename string) (*x509.CertPool, error) {
certs, err := CertsFromFile(filename)
if err != nil {
return nil, err
}
pool := x509.NewCertPool()
for _, cert := range certs {
pool.AddCert(cert)
}
return pool, nil
}
// CertsFromFile returns the x509.Certificates contained in the given PEM-encoded file.
// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates
func CertsFromFile(file string) ([]*x509.Certificate, error) {
pemBlock, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
certs, err := ParseCertsPEM(pemBlock)
if err != nil {
return nil, fmt.Errorf("error reading %s: %s", file, err)
}
return certs, nil
}
// PrivateKeyFromFile returns the private key in rsa.PrivateKey or ecdsa.PrivateKey format from a given PEM-encoded file.
// Returns an error if the file could not be read or if the private key could not be parsed.
func PrivateKeyFromFile(file string) (interface{}, error) {
pemBlock, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
key, err := ParsePrivateKeyPEM(pemBlock)
if err != nil {
return nil, fmt.Errorf("error reading %s: %v", file, err)
}
return key, nil
}

138
vendor/k8s.io/client-go/util/cert/pem.go generated vendored Normal file
View file

@ -0,0 +1,138 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cert
import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
)
const (
// ECPrivateKeyBlockType is a possible value for pem.Block.Type.
ECPrivateKeyBlockType = "EC PRIVATE KEY"
// RSAPrivateKeyBlockType is a possible value for pem.Block.Type.
RSAPrivateKeyBlockType = "RSA PRIVATE KEY"
// CertificateBlockType is a possible value for pem.Block.Type.
CertificateBlockType = "CERTIFICATE"
// CertificateRequestBlockType is a possible value for pem.Block.Type.
CertificateRequestBlockType = "CERTIFICATE REQUEST"
// PrivateKeyBlockType is a possible value for pem.Block.Type.
PrivateKeyBlockType = "PRIVATE KEY"
// PublicKeyBlockType is a possible value for pem.Block.Type.
PublicKeyBlockType = "PUBLIC KEY"
)
// EncodePublicKeyPEM returns PEM-endcode public data
func EncodePublicKeyPEM(key *rsa.PublicKey) ([]byte, error) {
der, err := x509.MarshalPKIXPublicKey(key)
if err != nil {
return []byte{}, err
}
block := pem.Block{
Type: PublicKeyBlockType,
Bytes: der,
}
return pem.EncodeToMemory(&block), nil
}
// EncodePrivateKeyPEM returns PEM-encoded private key data
func EncodePrivateKeyPEM(key *rsa.PrivateKey) []byte {
block := pem.Block{
Type: RSAPrivateKeyBlockType,
Bytes: x509.MarshalPKCS1PrivateKey(key),
}
return pem.EncodeToMemory(&block)
}
// EncodeCertPEM returns PEM-endcoded certificate data
func EncodeCertPEM(cert *x509.Certificate) []byte {
block := pem.Block{
Type: CertificateBlockType,
Bytes: cert.Raw,
}
return pem.EncodeToMemory(&block)
}
// ParsePrivateKeyPEM returns a private key parsed from a PEM block in the supplied data.
// Recognizes PEM blocks for "EC PRIVATE KEY", "RSA PRIVATE KEY", or "PRIVATE KEY"
func ParsePrivateKeyPEM(keyData []byte) (interface{}, error) {
var privateKeyPemBlock *pem.Block
for {
privateKeyPemBlock, keyData = pem.Decode(keyData)
if privateKeyPemBlock == nil {
break
}
switch privateKeyPemBlock.Type {
case ECPrivateKeyBlockType:
// ECDSA Private Key in ASN.1 format
if key, err := x509.ParseECPrivateKey(privateKeyPemBlock.Bytes); err == nil {
return key, nil
}
case RSAPrivateKeyBlockType:
// RSA Private Key in PKCS#1 format
if key, err := x509.ParsePKCS1PrivateKey(privateKeyPemBlock.Bytes); err == nil {
return key, nil
}
case PrivateKeyBlockType:
// RSA or ECDSA Private Key in unencrypted PKCS#8 format
if key, err := x509.ParsePKCS8PrivateKey(privateKeyPemBlock.Bytes); err == nil {
return key, nil
}
}
// tolerate non-key PEM blocks for compatibility with things like "EC PARAMETERS" blocks
// originally, only the first PEM block was parsed and expected to be a key block
}
// we read all the PEM blocks and didn't recognize one
return nil, fmt.Errorf("data does not contain a valid RSA or ECDSA private key")
}
// ParseCertsPEM returns the x509.Certificates contained in the given PEM-encoded byte array
// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates
func ParseCertsPEM(pemCerts []byte) ([]*x509.Certificate, error) {
ok := false
certs := []*x509.Certificate{}
for len(pemCerts) > 0 {
var block *pem.Block
block, pemCerts = pem.Decode(pemCerts)
if block == nil {
break
}
// Only use PEM "CERTIFICATE" blocks without extra headers
if block.Type != CertificateBlockType || len(block.Headers) != 0 {
continue
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return certs, err
}
certs = append(certs, cert)
ok = true
}
if !ok {
return certs, errors.New("could not read any certificates")
}
return certs, nil
}

327
vendor/k8s.io/client-go/util/clock/clock.go generated vendored Normal file
View file

@ -0,0 +1,327 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"sync"
"time"
)
// Clock allows for injecting fake or real clocks into code that
// needs to do arbitrary things based on time.
type Clock interface {
Now() time.Time
Since(time.Time) time.Duration
After(d time.Duration) <-chan time.Time
NewTimer(d time.Duration) Timer
Sleep(d time.Duration)
Tick(d time.Duration) <-chan time.Time
}
var (
_ = Clock(RealClock{})
_ = Clock(&FakeClock{})
_ = Clock(&IntervalClock{})
)
// RealClock really calls time.Now()
type RealClock struct{}
// Now returns the current time.
func (RealClock) Now() time.Time {
return time.Now()
}
// Since returns time since the specified timestamp.
func (RealClock) Since(ts time.Time) time.Duration {
return time.Since(ts)
}
// Same as time.After(d).
func (RealClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
func (RealClock) NewTimer(d time.Duration) Timer {
return &realTimer{
timer: time.NewTimer(d),
}
}
func (RealClock) Tick(d time.Duration) <-chan time.Time {
return time.Tick(d)
}
func (RealClock) Sleep(d time.Duration) {
time.Sleep(d)
}
// FakeClock implements Clock, but returns an arbitrary time.
type FakeClock struct {
lock sync.RWMutex
time time.Time
// waiters are waiting for the fake time to pass their specified time
waiters []fakeClockWaiter
}
type fakeClockWaiter struct {
targetTime time.Time
stepInterval time.Duration
skipIfBlocked bool
destChan chan time.Time
fired bool
}
func NewFakeClock(t time.Time) *FakeClock {
return &FakeClock{
time: t,
}
}
// Now returns f's time.
func (f *FakeClock) Now() time.Time {
f.lock.RLock()
defer f.lock.RUnlock()
return f.time
}
// Since returns time since the time in f.
func (f *FakeClock) Since(ts time.Time) time.Duration {
f.lock.RLock()
defer f.lock.RUnlock()
return f.time.Sub(ts)
}
// Fake version of time.After(d).
func (f *FakeClock) After(d time.Duration) <-chan time.Time {
f.lock.Lock()
defer f.lock.Unlock()
stopTime := f.time.Add(d)
ch := make(chan time.Time, 1) // Don't block!
f.waiters = append(f.waiters, fakeClockWaiter{
targetTime: stopTime,
destChan: ch,
})
return ch
}
// Fake version of time.NewTimer(d).
func (f *FakeClock) NewTimer(d time.Duration) Timer {
f.lock.Lock()
defer f.lock.Unlock()
stopTime := f.time.Add(d)
ch := make(chan time.Time, 1) // Don't block!
timer := &fakeTimer{
fakeClock: f,
waiter: fakeClockWaiter{
targetTime: stopTime,
destChan: ch,
},
}
f.waiters = append(f.waiters, timer.waiter)
return timer
}
func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
f.lock.Lock()
defer f.lock.Unlock()
tickTime := f.time.Add(d)
ch := make(chan time.Time, 1) // hold one tick
f.waiters = append(f.waiters, fakeClockWaiter{
targetTime: tickTime,
stepInterval: d,
skipIfBlocked: true,
destChan: ch,
})
return ch
}
// Move clock by Duration, notify anyone that's called After, Tick, or NewTimer
func (f *FakeClock) Step(d time.Duration) {
f.lock.Lock()
defer f.lock.Unlock()
f.setTimeLocked(f.time.Add(d))
}
// Sets the time.
func (f *FakeClock) SetTime(t time.Time) {
f.lock.Lock()
defer f.lock.Unlock()
f.setTimeLocked(t)
}
// Actually changes the time and checks any waiters. f must be write-locked.
func (f *FakeClock) setTimeLocked(t time.Time) {
f.time = t
newWaiters := make([]fakeClockWaiter, 0, len(f.waiters))
for i := range f.waiters {
w := &f.waiters[i]
if !w.targetTime.After(t) {
if w.skipIfBlocked {
select {
case w.destChan <- t:
w.fired = true
default:
}
} else {
w.destChan <- t
w.fired = true
}
if w.stepInterval > 0 {
for !w.targetTime.After(t) {
w.targetTime = w.targetTime.Add(w.stepInterval)
}
newWaiters = append(newWaiters, *w)
}
} else {
newWaiters = append(newWaiters, f.waiters[i])
}
}
f.waiters = newWaiters
}
// Returns true if After has been called on f but not yet satisfied (so you can
// write race-free tests).
func (f *FakeClock) HasWaiters() bool {
f.lock.RLock()
defer f.lock.RUnlock()
return len(f.waiters) > 0
}
func (f *FakeClock) Sleep(d time.Duration) {
f.Step(d)
}
// IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration
type IntervalClock struct {
Time time.Time
Duration time.Duration
}
// Now returns i's time.
func (i *IntervalClock) Now() time.Time {
i.Time = i.Time.Add(i.Duration)
return i.Time
}
// Since returns time since the time in i.
func (i *IntervalClock) Since(ts time.Time) time.Duration {
return i.Time.Sub(ts)
}
// Unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) After(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement After")
}
// Unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) NewTimer(d time.Duration) Timer {
panic("IntervalClock doesn't implement NewTimer")
}
// Unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement Tick")
}
func (*IntervalClock) Sleep(d time.Duration) {
panic("IntervalClock doesn't implement Sleep")
}
// Timer allows for injecting fake or real timers into code that
// needs to do arbitrary things based on time.
type Timer interface {
C() <-chan time.Time
Stop() bool
Reset(d time.Duration) bool
}
var (
_ = Timer(&realTimer{})
_ = Timer(&fakeTimer{})
)
// realTimer is backed by an actual time.Timer.
type realTimer struct {
timer *time.Timer
}
// C returns the underlying timer's channel.
func (r *realTimer) C() <-chan time.Time {
return r.timer.C
}
// Stop calls Stop() on the underlying timer.
func (r *realTimer) Stop() bool {
return r.timer.Stop()
}
// Reset calls Reset() on the underlying timer.
func (r *realTimer) Reset(d time.Duration) bool {
return r.timer.Reset(d)
}
// fakeTimer implements Timer based on a FakeClock.
type fakeTimer struct {
fakeClock *FakeClock
waiter fakeClockWaiter
}
// C returns the channel that notifies when this timer has fired.
func (f *fakeTimer) C() <-chan time.Time {
return f.waiter.destChan
}
// Stop stops the timer and returns true if the timer has not yet fired, or false otherwise.
func (f *fakeTimer) Stop() bool {
f.fakeClock.lock.Lock()
defer f.fakeClock.lock.Unlock()
newWaiters := make([]fakeClockWaiter, 0, len(f.fakeClock.waiters))
for i := range f.fakeClock.waiters {
w := &f.fakeClock.waiters[i]
if w != &f.waiter {
newWaiters = append(newWaiters, *w)
}
}
f.fakeClock.waiters = newWaiters
return !f.waiter.fired
}
// Reset resets the timer to the fake clock's "now" + d. It returns true if the timer has not yet
// fired, or false otherwise.
func (f *fakeTimer) Reset(d time.Duration) bool {
f.fakeClock.lock.Lock()
defer f.fakeClock.lock.Unlock()
active := !f.waiter.fired
f.waiter.fired = false
f.waiter.targetTime = f.fakeClock.time.Add(d)
return active
}

149
vendor/k8s.io/client-go/util/flowcontrol/backoff.go generated vendored Normal file
View file

@ -0,0 +1,149 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"time"
"k8s.io/client-go/util/clock"
"k8s.io/client-go/util/integer"
)
type backoffEntry struct {
backoff time.Duration
lastUpdate time.Time
}
type Backoff struct {
sync.Mutex
Clock clock.Clock
defaultDuration time.Duration
maxDuration time.Duration
perItemBackoff map[string]*backoffEntry
}
func NewFakeBackOff(initial, max time.Duration, tc *clock.FakeClock) *Backoff {
return &Backoff{
perItemBackoff: map[string]*backoffEntry{},
Clock: tc,
defaultDuration: initial,
maxDuration: max,
}
}
func NewBackOff(initial, max time.Duration) *Backoff {
return &Backoff{
perItemBackoff: map[string]*backoffEntry{},
Clock: clock.RealClock{},
defaultDuration: initial,
maxDuration: max,
}
}
// Get the current backoff Duration
func (p *Backoff) Get(id string) time.Duration {
p.Lock()
defer p.Unlock()
var delay time.Duration
entry, ok := p.perItemBackoff[id]
if ok {
delay = entry.backoff
}
return delay
}
// move backoff to the next mark, capping at maxDuration
func (p *Backoff) Next(id string, eventTime time.Time) {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
entry = p.initEntryUnsafe(id)
} else {
delay := entry.backoff * 2 // exponential
entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
}
entry.lastUpdate = p.Clock.Now()
}
// Reset forces clearing of all backoff data for a given key.
func (p *Backoff) Reset(id string) {
p.Lock()
defer p.Unlock()
delete(p.perItemBackoff, id)
}
// Returns True if the elapsed time since eventTime is smaller than the current backoff window
func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return p.Clock.Now().Sub(eventTime) < entry.backoff
}
// Returns True if time since lastupdate is less than the current backoff window.
func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return eventTime.Sub(entry.lastUpdate) < entry.backoff
}
// Garbage collect records that have aged past maxDuration. Backoff users are expected
// to invoke this periodically.
func (p *Backoff) GC() {
p.Lock()
defer p.Unlock()
now := p.Clock.Now()
for id, entry := range p.perItemBackoff {
if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
// GC when entry has not been updated for 2*maxDuration
delete(p.perItemBackoff, id)
}
}
}
func (p *Backoff) DeleteEntry(id string) {
p.Lock()
defer p.Unlock()
delete(p.perItemBackoff, id)
}
// Take a lock on *Backoff, before calling initEntryUnsafe
func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
entry := &backoffEntry{backoff: p.defaultDuration}
p.perItemBackoff[id] = entry
return entry
}
// After 2*maxDuration we restart the backoff factor to the beginning
func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
}

132
vendor/k8s.io/client-go/util/flowcontrol/throttle.go generated vendored Normal file
View file

@ -0,0 +1,132 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"github.com/juju/ratelimit"
)
type RateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Accept returns once a token becomes available.
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// Saturation returns a percentage number which describes how saturated
// this rate limiter is.
// Usually we use token bucket rate limiter. In that case,
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
Saturation() float64
// QPS returns QPS of this rate limiter
QPS() float32
}
type tokenBucketRateLimiter struct {
limiter *ratelimit.Bucket
qps float32
}
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
// smoothed qps rate of 'qps'.
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
return &tokenBucketRateLimiter{
limiter: limiter,
qps: qps,
}
}
func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.TakeAvailable(1) == 1
}
func (t *tokenBucketRateLimiter) Saturation() float64 {
capacity := t.limiter.Capacity()
avail := t.limiter.Available()
return float64(capacity-avail) / float64(capacity)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
t.limiter.Wait(1)
}
func (t *tokenBucketRateLimiter) Stop() {
}
func (t *tokenBucketRateLimiter) QPS() float32 {
return t.qps
}
type fakeAlwaysRateLimiter struct{}
func NewFakeAlwaysRateLimiter() RateLimiter {
return &fakeAlwaysRateLimiter{}
}
func (t *fakeAlwaysRateLimiter) TryAccept() bool {
return true
}
func (t *fakeAlwaysRateLimiter) Saturation() float64 {
return 0
}
func (t *fakeAlwaysRateLimiter) Stop() {}
func (t *fakeAlwaysRateLimiter) Accept() {}
func (t *fakeAlwaysRateLimiter) QPS() float32 {
return 1
}
type fakeNeverRateLimiter struct {
wg sync.WaitGroup
}
func NewFakeNeverRateLimiter() RateLimiter {
rl := fakeNeverRateLimiter{}
rl.wg.Add(1)
return &rl
}
func (t *fakeNeverRateLimiter) TryAccept() bool {
return false
}
func (t *fakeNeverRateLimiter) Saturation() float64 {
return 1
}
func (t *fakeNeverRateLimiter) Stop() {
t.wg.Done()
}
func (t *fakeNeverRateLimiter) Accept() {
t.wg.Wait()
}
func (t *fakeNeverRateLimiter) QPS() float32 {
return 1
}

47
vendor/k8s.io/client-go/util/homedir/homedir.go generated vendored Normal file
View file

@ -0,0 +1,47 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package homedir
import (
"os"
"runtime"
)
// HomeDir returns the home directory for the current user
func HomeDir() string {
if runtime.GOOS == "windows" {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); len(home) > 0 {
if _, err := os.Stat(home); err == nil {
return home
}
}
if homeDrive, homePath := os.Getenv("HOMEDRIVE"), os.Getenv("HOMEPATH"); len(homeDrive) > 0 && len(homePath) > 0 {
homeDir := homeDrive + homePath
if _, err := os.Stat(homeDir); err == nil {
return homeDir
}
}
if userProfile := os.Getenv("USERPROFILE"); len(userProfile) > 0 {
if _, err := os.Stat(userProfile); err == nil {
return userProfile
}
}
}
return os.Getenv("HOME")
}

67
vendor/k8s.io/client-go/util/integer/integer.go generated vendored Normal file
View file

@ -0,0 +1,67 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integer
func IntMax(a, b int) int {
if b > a {
return b
}
return a
}
func IntMin(a, b int) int {
if b < a {
return b
}
return a
}
func Int32Max(a, b int32) int32 {
if b > a {
return b
}
return a
}
func Int32Min(a, b int32) int32 {
if b < a {
return b
}
return a
}
func Int64Max(a, b int64) int64 {
if b > a {
return b
}
return a
}
func Int64Min(a, b int64) int64 {
if b < a {
return b
}
return a
}
// RoundToInt32 rounds floats into integer numbers.
func RoundToInt32(a float64) int32 {
if a < 0 {
return int32(a - 0.5)
}
return int32(a + 0.5)
}

View file

@ -0,0 +1,211 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"math"
"sync"
"time"
"github.com/juju/ratelimit"
)
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
)
}
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
*ratelimit.Bucket
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Bucket.Take(1)
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
}
// ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
func DefaultItemBasedRateLimiter() RateLimiter {
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}
var _ RateLimiter = &ItemFastSlowRateLimiter{}
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}

View file

@ -0,0 +1,246 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sort"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/clock"
)
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueue() DelayingInterface {
return newDelayingQueue(clock.RealClock{}, "")
}
func NewNamedDelayingQueue(name string) DelayingInterface {
return newDelayingQueue(clock.RealClock{}, name)
}
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
return ret
}
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
//
// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
// clock.Tick will leak.
heartbeat <-chan time.Time
// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
// metrics counts the number of retries
metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
data t
readyAt time.Time
}
// ShutDown gives a way to shut off this queue
func (q *delayingType) ShutDown() {
q.Interface.ShutDown()
close(q.stopCh)
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
const maxWait = 10 * time.Second
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
for {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return
}
now := q.clock.Now()
// Add ready entries
readyEntries := 0
for _, entry := range q.waitingForAdd {
if entry.readyAt.After(now) {
break
}
q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data)
readyEntries++
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if len(q.waitingForAdd) > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
}
select {
case <-q.stopCh:
return
case <-q.heartbeat:
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
//
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}
// if the entry exists and is scheduled for later, go ahead and remove the entry
if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
}
}
insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry
knownEntries[entry.data] = entry.readyAt
return entries
}
// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})
// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
}
return index
}

26
vendor/k8s.io/client-go/util/workqueue/doc.go generated vendored Normal file
View file

@ -0,0 +1,26 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package workqueue provides a simple queue that supports the following
// features:
// * Fair: items processed in the order in which they are added.
// * Stingy: a single item will not be processed multiple times concurrently,
// and if an item is added multiple times before it can be processed, it
// will only be processed once.
// * Multiple consumers and producers. In particular, it is allowed for an
// item to be reenqueued while it is being processed.
// * Shutdown notifications.
package workqueue

195
vendor/k8s.io/client-go/util/workqueue/metrics.go generated vendored Normal file
View file

@ -0,0 +1,195 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
"time"
)
// This file provides abstractions for setting the provider (e.g., prometheus)
// of metrics.
type queueMetrics interface {
add(item t)
get(item t)
done(item t)
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
// and down.
type GaugeMetric interface {
Inc()
Dec()
}
// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
Inc()
}
// SummaryMetric captures individual observations.
type SummaryMetric interface {
Observe(float64)
}
type noopMetric struct{}
func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Observe(float64) {}
type defaultQueueMetrics struct {
// current depth of a workqueue
depth GaugeMetric
// total number of adds handled by a workqueue
adds CounterMetric
// how long an item stays in a workqueue
latency SummaryMetric
// how long processing an item from a workqueue takes
workDuration SummaryMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
}
func (m *defaultQueueMetrics) add(item t) {
if m == nil {
return
}
m.adds.Inc()
m.depth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now()
}
}
func (m *defaultQueueMetrics) get(item t) {
if m == nil {
return
}
m.depth.Dec()
m.processingStartTimes[item] = time.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
func (m *defaultQueueMetrics) done(item t) {
if m == nil {
return
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
type retryMetrics interface {
retry()
}
type defaultRetryMetrics struct {
retries CounterMetric
}
func (m *defaultRetryMetrics) retry() {
if m == nil {
return
}
m.retries.Inc()
}
// MetricsProvider generates various metrics used by the queue.
type MetricsProvider interface {
NewDepthMetric(name string) GaugeMetric
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewRetriesMetric(name string) CounterMetric
}
type noopMetricsProvider struct{}
func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var metricsFactory = struct {
metricsProvider MetricsProvider
setProviders sync.Once
}{
metricsProvider: noopMetricsProvider{},
}
func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
}
return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}
func newRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
}
}
// SetProvider sets the metrics provider of the metricsFactory.
func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() {
metricsFactory.metricsProvider = metricsProvider
})
}

52
vendor/k8s.io/client-go/util/workqueue/parallelizer.go generated vendored Normal file
View file

@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
type DoWorkPieceFunc func(piece int)
// Parallelize is a very simple framework that allow for parallelizing
// N independent pieces of work.
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
toProcess <- i
}
close(toProcess)
if pieces < workers {
workers = pieces
}
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
doWorkPiece(piece)
}
}()
}
wg.Wait()
}

172
vendor/k8s.io/client-go/util/workqueue/queue.go generated vendored Normal file
View file

@ -0,0 +1,172 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
)
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
// New constructs a new workqueue (see the package comment).
func New() *Type {
return NewNamed("")
}
func NewNamed(name string) *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: newQueueMetrics(name),
}
}
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
}
type empty struct{}
type t interface{}
type set map[t]empty
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
delete(s, item)
}
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *Type) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
}
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
func (q *Type) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}

View file

@ -0,0 +1,69 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewDelayingQueue(),
rateLimiter: rateLimiter,
}
}
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewNamedDelayingQueue(name),
rateLimiter: rateLimiter,
}
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says its ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}