Update golang dependencies

This commit is contained in:
Manuel de Brito Fontes 2017-05-20 20:11:38 -04:00
parent c5e30973e5
commit 9ddf98769a
1009 changed files with 175867 additions and 50378 deletions

View file

@ -17,8 +17,9 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/types",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

View file

@ -23,10 +23,13 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/controller"
)
// Interface is an abstract, pluggable interface for cloud providers.
type Interface interface {
// Initialize provides the cloud with a kubernetes client builder
Initialize(clientBuilder controller.ControllerClientBuilder)
// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
LoadBalancer() (LoadBalancer, bool)
// Instances returns an instances interface. Also returns true if the interface is supported, false otherwise.

View file

@ -30,6 +30,7 @@ go_library(
"gce_urlmap.go",
"gce_util.go",
"gce_zones.go",
"metrics.go",
"token_source.go",
],
tags = ["automanaged"],
@ -37,23 +38,25 @@ go_library(
"//pkg/api/v1:go_default_library",
"//pkg/api/v1/service:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/net/sets:go_default_library",
"//pkg/volume:go_default_library",
"//vendor:cloud.google.com/go/compute/metadata",
"//vendor:github.com/golang/glog",
"//vendor:github.com/prometheus/client_golang/prometheus",
"//vendor:golang.org/x/oauth2",
"//vendor:golang.org/x/oauth2/google",
"//vendor:google.golang.org/api/compute/v1",
"//vendor:google.golang.org/api/container/v1",
"//vendor:google.golang.org/api/googleapi",
"//vendor:gopkg.in/gcfg.v1",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/util/flowcontrol",
"//vendor/cloud.google.com/go/compute/metadata:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
"//vendor/golang.org/x/oauth2/google:go_default_library",
"//vendor/google.golang.org/api/compute/v0.beta:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/google.golang.org/api/container/v1:go_default_library",
"//vendor/google.golang.org/api/googleapi:go_default_library",
"//vendor/gopkg.in/gcfg.v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)

View file

@ -17,9 +17,9 @@ limitations under the License.
package gce
import (
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"time"
@ -30,12 +30,13 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
computebeta "google.golang.org/api/compute/v0.beta"
compute "google.golang.org/api/compute/v1"
container "google.golang.org/api/container/v1"
)
@ -76,6 +77,7 @@ const (
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
service *compute.Service
serviceBeta *computebeta.Service
containerService *container.Service
projectID string
region string
@ -175,43 +177,29 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
func CreateGCECloud(projectID, region, zone string, managedZones []string, networkURL string, nodeTags []string,
nodeInstancePrefix string, tokenSource oauth2.TokenSource, useMetadataServer bool) (*GCECloud, error) {
if tokenSource == nil {
var err error
tokenSource, err = google.DefaultTokenSource(
oauth2.NoContext,
compute.CloudPlatformScope,
compute.ComputeScope)
glog.Infof("Using DefaultTokenSource %#v", tokenSource)
if err != nil {
return nil, err
}
} else {
glog.Infof("Using existing Token Source %#v", tokenSource)
}
if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
if _, err := tokenSource.Token(); err != nil {
glog.Errorf("error fetching initial token: %v", err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
client := oauth2.NewClient(oauth2.NoContext, tokenSource)
svc, err := compute.New(client)
client, err := newOauthClient(tokenSource)
if err != nil {
return nil, err
}
containerSvc, err := container.New(client)
service, err := compute.New(client)
if err != nil {
return nil, err
}
client, err = newOauthClient(tokenSource)
serviceBeta, err := computebeta.New(client)
if err != nil {
return nil, err
}
containerService, err := container.New(client)
if err != nil {
return nil, err
}
if networkURL == "" {
networkName, err := getNetworkNameViaAPICall(svc, projectID)
networkName, err := getNetworkNameViaAPICall(service, projectID)
if err != nil {
return nil, err
}
@ -219,7 +207,7 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
}
if len(managedZones) == 0 {
managedZones, err = getZonesForRegion(svc, projectID, region)
managedZones, err = getZonesForRegion(service, projectID, region)
if err != nil {
return nil, err
}
@ -231,8 +219,9 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
return &GCECloud{
service: svc,
containerService: containerSvc,
service: service,
serviceBeta: serviceBeta,
containerService: containerService,
projectID: projectID,
region: region,
localZone: zone,
@ -245,6 +234,9 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
}, nil
}
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) {}
// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.
func (gce *GCECloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return gce, true
@ -315,7 +307,7 @@ func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, e
}
if networkList == nil || len(networkList.Items) <= 0 {
return "", fmt.Errorf("GCE Network List call returned no networks for project %q.", projectID)
return "", fmt.Errorf("GCE Network List call returned no networks for project %q", projectID)
}
return networkList.Items[0].Name, nil
@ -343,16 +335,30 @@ func getZonesForRegion(svc *compute.Service, projectID, region string) ([]string
return zones, nil
}
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (gce *GCECloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, errors.New("unimplemented")
}
func newOauthClient(tokenSource oauth2.TokenSource) (*http.Client, error) {
if tokenSource == nil {
var err error
tokenSource, err = google.DefaultTokenSource(
oauth2.NoContext,
compute.CloudPlatformScope,
compute.ComputeScope)
glog.Infof("Using DefaultTokenSource %#v", tokenSource)
if err != nil {
return nil, err
}
} else {
glog.Infof("Using existing Token Source %#v", tokenSource)
}
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (gce *GCECloud) InstanceTypeByProviderID(providerID string) (string, error) {
return "", errors.New("unimplemented")
if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
if _, err := tokenSource.Token(); err != nil {
glog.Errorf("error fetching initial token: %v", err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
return oauth2.NewClient(oauth2.NoContext, tokenSource), nil
}

View file

@ -18,45 +18,59 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// BackendService Management
func newBackendServiceMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"backendservice_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// GetBackendService retrieves a backend by name.
func (gce *GCECloud) GetBackendService(name string) (*compute.BackendService, error) {
return gce.service.BackendServices.Get(gce.projectID, name).Do()
mc := newBackendServiceMetricContext("get")
v, err := gce.service.BackendServices.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// UpdateBackendService applies the given BackendService as an update to an existing service.
func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("update")
op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteBackendService deletes the given BackendService by name.
func (gce *GCECloud) DeleteBackendService(name string) error {
mc := newBackendServiceMetricContext("delete")
op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateBackendService creates the given BackendService.
func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("create")
op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListBackendServices lists all backend services in the project.

View file

@ -18,43 +18,61 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// SSL Certificate management
func newCertMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"cert_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// GetSslCertificate returns the SslCertificate by name.
func (gce *GCECloud) GetSslCertificate(name string) (*compute.SslCertificate, error) {
return gce.service.SslCertificates.Get(gce.projectID, name).Do()
mc := newCertMetricContext("get")
v, err := gce.service.SslCertificates.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// CreateSslCertificate creates and returns a SslCertificate.
func (gce *GCECloud) CreateSslCertificate(sslCerts *compute.SslCertificate) (*compute.SslCertificate, error) {
mc := newCertMetricContext("create")
op, err := gce.service.SslCertificates.Insert(gce.projectID, sslCerts).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
return nil, err
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, mc.Observe(err)
}
return gce.GetSslCertificate(sslCerts.Name)
}
// DeleteSslCertificate deletes the SslCertificate by name.
func (gce *GCECloud) DeleteSslCertificate(name string) error {
mc := newCertMetricContext("delete")
op, err := gce.service.SslCertificates.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListSslCertificates lists all SslCertificates in the project.
func (gce *GCECloud) ListSslCertificates() (*compute.SslCertificateList, error) {
mc := newCertMetricContext("list")
// TODO: use PageToken to list all not just the first 500
return gce.service.SslCertificates.List(gce.projectID).Do()
v, err := gce.service.SslCertificates.List(gce.projectID).Do()
return v, mc.Observe(err)
}

View file

@ -16,6 +16,15 @@ limitations under the License.
package gce
import "time"
func newClustersMetricContext(request, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"clusters_" + request, unusedMetricLabel, zone},
}
}
func (gce *GCECloud) ListClusters() ([]string, error) {
allClusters := []string{}
@ -36,14 +45,16 @@ func (gce *GCECloud) Master(clusterName string) (string, error) {
}
func (gce *GCECloud) listClustersInZone(zone string) ([]string, error) {
mc := newClustersMetricContext("list_zone", zone)
// TODO: use PageToken to list all not just the first 500
list, err := gce.containerService.Projects.Zones.Clusters.List(gce.projectID, zone).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
result := []string{}
for _, cluster := range list.Clusters {
result = append(result, cluster.Name)
}
return result, nil
return result, mc.Observe(nil)
}

View file

@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -77,10 +78,18 @@ type Disks interface {
// GCECloud implements Disks.
var _ Disks = (*GCECloud)(nil)
type gceDisk struct {
type GCEDisk struct {
Zone string
Name string
Kind string
Type string
}
func newDiskMetricContext(request, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"disk_" + request, unusedMetricLabel, zone},
}
}
func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool) error {
@ -99,12 +108,15 @@ func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOn
}
attachedDisk := gce.convertDiskToAttachedDisk(disk, readWrite)
attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, disk.Zone, instance.Name, attachedDisk).Do()
mc := newDiskMetricContext("attach", instance.Zone)
attachOp, err := gce.service.Instances.AttachDisk(
gce.projectID, disk.Zone, instance.Name, attachedDisk).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForZoneOp(attachOp, disk.Zone)
return gce.waitForZoneOp(attachOp, disk.Zone, mc)
}
func (gce *GCECloud) DetachDisk(devicePath string, nodeName types.NodeName) error {
@ -123,12 +135,13 @@ func (gce *GCECloud) DetachDisk(devicePath string, nodeName types.NodeName) erro
return fmt.Errorf("error getting instance %q", instanceName)
}
mc := newDiskMetricContext("detach", inst.Zone)
detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, inst.Zone, inst.Name, devicePath).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForZoneOp(detachOp, inst.Zone)
return gce.waitForZoneOp(detachOp, inst.Zone, mc)
}
func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) {
@ -192,7 +205,9 @@ func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeNam
// CreateDisk creates a new Persistent Disk, with the specified name &
// size, in the specified zone. It stores specified tags encoded in
// JSON in Description field.
func (gce *GCECloud) CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {
func (gce *GCECloud) CreateDisk(
name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {
// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
isManaged := false
@ -228,12 +243,13 @@ func (gce *GCECloud) CreateDisk(name string, diskType string, zone string, sizeG
Type: diskTypeUri,
}
mc := newDiskMetricContext("create", zone)
createOp, err := gce.service.Disks.Insert(gce.projectID, zone, diskToCreate).Do()
if err != nil {
return err
return mc.Observe(err)
}
err = gce.waitForZoneOp(createOp, zone)
err = gce.waitForZoneOp(createOp, zone, mc)
if isGCEError(err, "alreadyExists") {
glog.Warningf("GCE PD %q already exists, reusing", name)
return nil
@ -259,7 +275,7 @@ func (gce *GCECloud) DeleteDisk(diskToDelete string) error {
// If zone is specified, the volume will only be found in the specified zone,
// otherwise all managed zones will be searched.
func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
var disk *gceDisk
var disk *GCEDisk
var err error
if zone == "" {
// We would like as far as possible to avoid this case,
@ -268,7 +284,7 @@ func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]st
// by name, so we have to continue to support that.
// However, wherever possible the zone should be passed (and it is passed
// for most cases that we can control, e.g. dynamic volume provisioning)
disk, err = gce.getDiskByNameUnknownZone(name)
disk, err = gce.GetDiskByNameUnknownZone(name)
if err != nil {
return nil, err
}
@ -300,26 +316,28 @@ func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]st
return labels, nil
}
// Returns a gceDisk for the disk, if it is found in the specified zone.
// Returns a GCEDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) {
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*GCEDisk, error) {
mc := newDiskMetricContext("get", zone)
disk, err := gce.service.Disks.Get(gce.projectID, zone, diskName).Do()
if err == nil {
d := &gceDisk{
d := &GCEDisk{
Zone: lastComponent(disk.Zone),
Name: disk.Name,
Kind: disk.Kind,
Type: disk.Type,
}
return d, nil
return d, mc.Observe(nil)
}
if !isHTTPErrorCode(err, http.StatusNotFound) {
return nil, err
return nil, mc.Observe(err)
}
return nil, nil
return nil, mc.Observe(nil)
}
// Like findDiskByName, but returns an error if the disk is not found
func (gce *GCECloud) getDiskByName(diskName string, zone string) (*gceDisk, error) {
func (gce *GCECloud) getDiskByName(diskName string, zone string) (*GCEDisk, error) {
disk, err := gce.findDiskByName(diskName, zone)
if disk == nil && err == nil {
return nil, fmt.Errorf("GCE persistent disk not found: diskName=%q zone=%q", diskName, zone)
@ -330,7 +348,7 @@ func (gce *GCECloud) getDiskByName(diskName string, zone string) (*gceDisk, erro
// Scans all managed zones to return the GCE PD
// Prefer getDiskByName, if the zone can be established
// Return cloudprovider.DiskNotFound if the given disk cannot be found in any zone
func (gce *GCECloud) getDiskByNameUnknownZone(diskName string) (*gceDisk, error) {
func (gce *GCECloud) GetDiskByNameUnknownZone(diskName string) (*GCEDisk, error) {
// Note: this is the gotcha right now with GCE PD support:
// disk names are not unique per-region.
// (I can create two volumes with name "myvol" in e.g. us-central1-b & us-central1-f)
@ -341,7 +359,7 @@ func (gce *GCECloud) getDiskByNameUnknownZone(diskName string) (*gceDisk, error)
// admission control, but that might be a little weird (values changing
// on create)
var found *gceDisk
var found *GCEDisk
for _, zone := range gce.managedZones {
disk, err := gce.findDiskByName(diskName, zone)
if err != nil {
@ -382,21 +400,23 @@ func (gce *GCECloud) encodeDiskTags(tags map[string]string) (string, error) {
}
func (gce *GCECloud) doDeleteDisk(diskToDelete string) error {
disk, err := gce.getDiskByNameUnknownZone(diskToDelete)
disk, err := gce.GetDiskByNameUnknownZone(diskToDelete)
if err != nil {
return err
}
mc := newDiskMetricContext("delete", disk.Zone)
deleteOp, err := gce.service.Disks.Delete(gce.projectID, disk.Zone, disk.Name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForZoneOp(deleteOp, disk.Zone)
return gce.waitForZoneOp(deleteOp, disk.Zone, mc)
}
// Converts a Disk resource to an AttachedDisk resource.
func (gce *GCECloud) convertDiskToAttachedDisk(disk *gceDisk, readWrite string) *compute.AttachedDisk {
func (gce *GCECloud) convertDiskToAttachedDisk(disk *GCEDisk, readWrite string) *compute.AttachedDisk {
return &compute.AttachedDisk{
DeviceName: disk.Name,
Kind: disk.Kind,

View file

@ -17,18 +17,26 @@ limitations under the License.
package gce
import (
"time"
"k8s.io/kubernetes/pkg/api/v1"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
compute "google.golang.org/api/compute/v1"
)
// Firewall management: These methods are just passthrough to the existing
// internal firewall creation methods used to manage TCPLoadBalancer.
func newFirewallMetricContext(request string, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"firewall_" + request, region, unusedMetricLabel},
}
}
// GetFirewall returns the Firewall by name.
func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
return gce.service.Firewalls.Get(gce.projectID, name).Do()
mc := newFirewallMetricContext("get", "")
v, err := gce.service.Firewalls.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// CreateFirewall creates the given firewall rule.
@ -37,22 +45,28 @@ func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNe
if err != nil {
return err
}
// TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take v1.ServicePorts.
mc := newFirewallMetricContext("create", region)
// TODO: This completely breaks modularity in the cloudprovider but
// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
svcPorts := []v1.ServicePort{}
// TODO: Currently the only consumer of this method is the GCE L7
// loadbalancer controller, which never needs a protocol other than TCP.
// We should pipe through a mapping of port:protocol and default to TCP
// if UDP ports are required. This means the method signature will change
// forcing downstream clients to refactor interfaces.
// loadbalancer controller, which never needs a protocol other than
// TCP. We should pipe through a mapping of port:protocol and
// default to TCP if UDP ports are required. This means the method
// signature will change forcing downstream clients to refactor
// interfaces.
for _, p := range ports {
svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
mc.Observe(err)
return err
}
return gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
return mc.Observe(gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
}
// DeleteFirewall deletes the given firewall rule.
@ -61,30 +75,40 @@ func (gce *GCECloud) DeleteFirewall(name string) error {
if err != nil {
return err
}
return gce.deleteFirewall(name, region)
mc := newFirewallMetricContext("delete", region)
return mc.Observe(gce.deleteFirewall(name, region))
}
// UpdateFirewall applies the given firewall rule as an update to an existing
// firewall rule with the same name.
// UpdateFirewall applies the given firewall rule as an update to an
// existing firewall rule with the same name.
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
}
// TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take v1.ServicePorts.
mc := newFirewallMetricContext("update", region)
// TODO: This completely breaks modularity in the cloudprovider but
// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
svcPorts := []v1.ServicePort{}
// TODO: Currently the only consumer of this method is the GCE L7
// loadbalancer controller, which never needs a protocol other than TCP.
// We should pipe through a mapping of port:protocol and default to TCP
// if UDP ports are required. This means the method signature will change,
// forcing downstream clients to refactor interfaces.
// loadbalancer controller, which never needs a protocol other than
// TCP. We should pipe through a mapping of port:protocol and
// default to TCP if UDP ports are required. This means the method
// signature will change, forcing downstream clients to refactor
// interfaces.
for _, p := range ports {
svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
mc.Observe(err)
return err
}
return gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
return mc.Observe(gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
}

View file

@ -18,16 +18,23 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// GlobalForwardingRule management
func newForwardingRuleMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"forwardingrule_" + request, region, unusedMetricLabel},
}
}
// CreateGlobalForwardingRule creates and returns a
// GlobalForwardingRule that points to the given TargetHttp(s)Proxy.
// targetProxyLink is the SelfLink of a TargetHttp(s)Proxy.
func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portRange string) (*compute.ForwardingRule, error) {
mc := newForwardingRuleMetricContext("create", "")
rule := &compute.ForwardingRule{
Name: name,
IPAddress: ip,
@ -37,43 +44,58 @@ func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portR
}
op, err := gce.service.GlobalForwardingRules.Insert(gce.projectID, rule).Do()
if err != nil {
mc.Observe(err)
return nil, err
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetGlobalForwardingRule(name)
}
// SetProxyForGlobalForwardingRule links the given TargetHttp(s)Proxy with the given GlobalForwardingRule.
// targetProxyLink is the SelfLink of a TargetHttp(s)Proxy.
func (gce *GCECloud) SetProxyForGlobalForwardingRule(fw *compute.ForwardingRule, targetProxyLink string) error {
op, err := gce.service.GlobalForwardingRules.SetTarget(gce.projectID, fw.Name, &compute.TargetReference{Target: targetProxyLink}).Do()
mc := newForwardingRuleMetricContext("set_proxy", "")
op, err := gce.service.GlobalForwardingRules.SetTarget(
gce.projectID, fw.Name, &compute.TargetReference{Target: targetProxyLink}).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteGlobalForwardingRule deletes the GlobalForwardingRule by name.
func (gce *GCECloud) DeleteGlobalForwardingRule(name string) error {
mc := newForwardingRuleMetricContext("delete", "")
op, err := gce.service.GlobalForwardingRules.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
mc.Observe(nil)
return nil
}
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// GetGlobalForwardingRule returns the GlobalForwardingRule by name.
func (gce *GCECloud) GetGlobalForwardingRule(name string) (*compute.ForwardingRule, error) {
return gce.service.GlobalForwardingRules.Get(gce.projectID, name).Do()
mc := newForwardingRuleMetricContext("get", "")
v, err := gce.service.GlobalForwardingRules.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// ListGlobalForwardingRules lists all GlobalForwardingRules in the project.
func (gce *GCECloud) ListGlobalForwardingRules() (*compute.ForwardingRuleList, error) {
mc := newForwardingRuleMetricContext("list", "")
// TODO: use PageToken to list all not just the first 500
return gce.service.GlobalForwardingRules.List(gce.projectID).Do()
v, err := gce.service.GlobalForwardingRules.List(gce.projectID).Do()
return v, mc.Observe(err)
}

View file

@ -16,124 +16,165 @@ limitations under the License.
package gce
import compute "google.golang.org/api/compute/v1"
import (
"time"
// Legacy HTTP Health Checks
compute "google.golang.org/api/compute/v1"
)
func newHealthcheckMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"healthcheck_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// GetHttpHealthCheck returns the given HttpHealthCheck by name.
func (gce *GCECloud) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
return gce.service.HttpHealthChecks.Get(gce.projectID, name).Do()
mc := newHealthcheckMetricContext("get_legacy")
v, err := gce.service.HttpHealthChecks.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// UpdateHttpHealthCheck applies the given HttpHealthCheck as an update.
func (gce *GCECloud) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
mc := newHealthcheckMetricContext("update_legacy")
op, err := gce.service.HttpHealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteHttpHealthCheck deletes the given HttpHealthCheck by name.
func (gce *GCECloud) DeleteHttpHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete_legacy")
op, err := gce.service.HttpHealthChecks.Delete(gce.projectID, name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateHttpHealthCheck creates the given HttpHealthCheck.
func (gce *GCECloud) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
mc := newHealthcheckMetricContext("create_legacy")
op, err := gce.service.HttpHealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListHttpHealthChecks lists all HttpHealthChecks in the project.
func (gce *GCECloud) ListHttpHealthChecks() (*compute.HttpHealthCheckList, error) {
mc := newHealthcheckMetricContext("list_legacy")
// TODO: use PageToken to list all not just the first 500
return gce.service.HttpHealthChecks.List(gce.projectID).Do()
v, err := gce.service.HttpHealthChecks.List(gce.projectID).Do()
return v, mc.Observe(err)
}
// Legacy HTTPS Health Checks
// GetHttpsHealthCheck returns the given HttpsHealthCheck by name.
func (gce *GCECloud) GetHttpsHealthCheck(name string) (*compute.HttpsHealthCheck, error) {
return gce.service.HttpsHealthChecks.Get(gce.projectID, name).Do()
mc := newHealthcheckMetricContext("get_legacy")
v, err := gce.service.HttpsHealthChecks.Get(gce.projectID, name).Do()
mc.Observe(err)
return v, err
}
// UpdateHttpsHealthCheck applies the given HttpsHealthCheck as an update.
func (gce *GCECloud) UpdateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
mc := newHealthcheckMetricContext("update_legacy")
op, err := gce.service.HttpsHealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteHttpsHealthCheck deletes the given HttpsHealthCheck by name.
func (gce *GCECloud) DeleteHttpsHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete_legacy")
op, err := gce.service.HttpsHealthChecks.Delete(gce.projectID, name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateHttpsHealthCheck creates the given HttpsHealthCheck.
func (gce *GCECloud) CreateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
mc := newHealthcheckMetricContext("create_legacy")
op, err := gce.service.HttpsHealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListHttpsHealthChecks lists all HttpsHealthChecks in the project.
func (gce *GCECloud) ListHttpsHealthChecks() (*compute.HttpsHealthCheckList, error) {
mc := newHealthcheckMetricContext("list_legacy")
// TODO: use PageToken to list all not just the first 500
return gce.service.HttpsHealthChecks.List(gce.projectID).Do()
v, err := gce.service.HttpsHealthChecks.List(gce.projectID).Do()
return v, mc.Observe(err)
}
// Generic HealthCheck
// GetHealthCheck returns the given HealthCheck by name.
func (gce *GCECloud) GetHealthCheck(name string) (*compute.HealthCheck, error) {
return gce.service.HealthChecks.Get(gce.projectID, name).Do()
mc := newHealthcheckMetricContext("get")
v, err := gce.service.HealthChecks.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// UpdateHealthCheck applies the given HealthCheck as an update.
func (gce *GCECloud) UpdateHealthCheck(hc *compute.HealthCheck) error {
mc := newHealthcheckMetricContext("update")
op, err := gce.service.HealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteHealthCheck deletes the given HealthCheck by name.
func (gce *GCECloud) DeleteHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete")
op, err := gce.service.HealthChecks.Delete(gce.projectID, name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateHealthCheck creates the given HealthCheck.
func (gce *GCECloud) CreateHealthCheck(hc *compute.HealthCheck) error {
mc := newHealthcheckMetricContext("create")
op, err := gce.service.HealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListHealthChecks lists all HealthCheck in the project.
func (gce *GCECloud) ListHealthChecks() (*compute.HealthCheckList, error) {
mc := newHealthcheckMetricContext("list")
// TODO: use PageToken to list all not just the first 500
return gce.service.HealthChecks.List(gce.projectID).Do()
v, err := gce.service.HealthChecks.List(gce.projectID).Do()
return v, mc.Observe(err)
}

View file

@ -20,52 +20,76 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
)
// InstanceGroup Management
func newInstanceGroupMetricContext(request string, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"instancegroup_" + request, unusedMetricLabel, zone},
}
}
// CreateInstanceGroup creates an instance group with the given instances. It is the callers responsibility to add named ports.
// CreateInstanceGroup creates an instance group with the given
// instances. It is the callers responsibility to add named ports.
func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
mc := newInstanceGroupMetricContext("create", zone)
op, err := gce.service.InstanceGroups.Insert(
gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do()
if err != nil {
mc.Observe(err)
return nil, err
}
if err = gce.waitForZoneOp(op, zone); err != nil {
if err = gce.waitForZoneOp(op, zone, mc); err != nil {
return nil, err
}
return gce.GetInstanceGroup(name, zone)
}
// DeleteInstanceGroup deletes an instance group.
func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
mc := newInstanceGroupMetricContext("delete", zone)
op, err := gce.service.InstanceGroups.Delete(
gce.projectID, zone, name).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForZoneOp(op, zone)
return gce.waitForZoneOp(op, zone, mc)
}
// ListInstanceGroups lists all InstanceGroups in the project and zone.
// ListInstanceGroups lists all InstanceGroups in the project and
// zone.
func (gce *GCECloud) ListInstanceGroups(zone string) (*compute.InstanceGroupList, error) {
mc := newInstanceGroupMetricContext("list", zone)
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.List(gce.projectID, zone).Do()
v, err := gce.service.InstanceGroups.List(gce.projectID, zone).Do()
return v, mc.Observe(err)
}
// ListInstancesInInstanceGroup lists all the instances in a given instance group and state.
// ListInstancesInInstanceGroup lists all the instances in a given
// instance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) (*compute.InstanceGroupsListInstances, error) {
mc := newInstanceGroupMetricContext("list_instances", zone)
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.ListInstances(
v, err := gce.service.InstanceGroups.ListInstances(
gce.projectID, zone, name,
&compute.InstanceGroupsListInstancesRequest{InstanceState: state}).Do()
return v, mc.Observe(err)
}
// AddInstancesToInstanceGroup adds the given instances to the given instance group.
// AddInstancesToInstanceGroup adds the given instances to the given
// instance group.
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceNames []string) error {
mc := newInstanceGroupMetricContext("add_instances", zone)
if len(instanceNames) == 0 {
return nil
}
@ -81,16 +105,21 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta
}).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForZoneOp(op, zone)
return gce.waitForZoneOp(op, zone, mc)
}
// RemoveInstancesFromInstanceGroup removes the given instances from the instance group.
// RemoveInstancesFromInstanceGroup removes the given instances from
// the instance group.
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceNames []string) error {
mc := newInstanceGroupMetricContext("remove_instances", zone)
if len(instanceNames) == 0 {
return nil
}
instances := []*compute.InstanceReference{}
for _, ins := range instanceNames {
instanceLink := makeHostURL(gce.projectID, zone, ins)
@ -104,21 +133,27 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string,
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
mc.Observe(nil)
return nil
}
mc.Observe(err)
return err
}
return gce.waitForZoneOp(op, zone)
return gce.waitForZoneOp(op, zone, mc)
}
// AddPortToInstanceGroup adds a port to the given instance group.
func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int64) (*compute.NamedPort, error) {
mc := newInstanceGroupMetricContext("add_port", ig.Zone)
for _, np := range ig.NamedPorts {
if np.Port == port {
glog.V(3).Infof("Instance group %v already has named port %+v", ig.Name, np)
return np, nil
}
}
glog.Infof("Adding port %v to instance group %v with %d ports", port, ig.Name, len(ig.NamedPorts))
namedPort := compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
ig.NamedPorts = append(ig.NamedPorts, &namedPort)
@ -133,16 +168,22 @@ func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int6
gce.projectID, zone, ig.Name,
&compute.InstanceGroupsSetNamedPortsRequest{
NamedPorts: ig.NamedPorts}).Do()
if err != nil {
mc.Observe(err)
return nil, err
}
if err = gce.waitForZoneOp(op, zone); err != nil {
if err = gce.waitForZoneOp(op, zone, mc); err != nil {
return nil, err
}
return &namedPort, nil
}
// GetInstanceGroup returns an instance group by name.
func (gce *GCECloud) GetInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
return gce.service.InstanceGroups.Get(gce.projectID, zone, name).Do()
mc := newInstanceGroupMetricContext("get", zone)
v, err := gce.service.InstanceGroups.Get(gce.projectID, zone, name).Do()
return v, mc.Observe(err)
}

View file

@ -25,6 +25,7 @@ import (
"cloud.google.com/go/compute/metadata"
"github.com/golang/glog"
computealpha "google.golang.org/api/compute/v0.beta"
compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
@ -34,6 +35,13 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
func newInstancesMetricContext(request, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"instances_" + request, unusedMetricLabel, zone},
}
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) {
internalIP, err := metadata.Get("instance/network-interfaces/0/ip")
@ -50,6 +58,48 @@ func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) {
}, nil
}
// This method will not be called from the node that is requesting this ID.
// i.e. metadata service and other local methods cannot be used here
func (gce *GCECloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
project, zone, name, err := splitProviderID(providerID)
if err != nil {
return []v1.NodeAddress{}, err
}
instance, err := gce.service.Instances.Get(project, zone, canonicalizeInstanceName(name)).Do()
if err != nil {
return []v1.NodeAddress{}, fmt.Errorf("error while querying for providerID %q: %v", providerID, err)
}
if len(instance.NetworkInterfaces) < 1 {
return []v1.NodeAddress{}, fmt.Errorf("could not find network interfaces for providerID %q", providerID)
}
networkInterface := instance.NetworkInterfaces[0]
nodeAddresses := []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: networkInterface.NetworkIP}}
for _, config := range networkInterface.AccessConfigs {
nodeAddresses = append(nodeAddresses, v1.NodeAddress{Type: v1.NodeExternalIP, Address: config.NatIP})
}
return nodeAddresses, nil
}
// InstanceTypeByProviderID returns the cloudprovider instance type of the node
// with the specified unique providerID This method will not be called from the
// node that is requesting this ID. i.e. metadata service and other local
// methods cannot be used here
func (gce *GCECloud) InstanceTypeByProviderID(providerID string) (string, error) {
project, zone, name, err := splitProviderID(providerID)
if err != nil {
return "", err
}
instance, err := gce.getInstanceFromProjectInZoneByName(project, zone, name)
if err != nil {
return "", err
}
return instance.Type, nil
}
// ExternalID returns the cloud provider ID of the node with the specified NodeName (deprecated).
func (gce *GCECloud) ExternalID(nodeName types.NodeName) (string, error) {
instanceName := mapNodeNameToInstanceName(nodeName)
@ -140,15 +190,22 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
Value: &keyString,
})
}
op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do()
mc := newInstancesMetricContext("add_ssh_key", "")
op, err := gce.service.Projects.SetCommonInstanceMetadata(
gce.projectID, project.CommonInstanceMetadata).Do()
if err != nil {
glog.Errorf("Could not Set Metadata: %v", err)
mc.Observe(err)
return false, nil
}
if err := gce.waitForGlobalOp(op); err != nil {
if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Errorf("Could not Set Metadata: %v", err)
return false, nil
}
glog.Infof("Successfully added sshKey to project metadata")
return true, nil
})
@ -167,6 +224,7 @@ func (gce *GCECloud) GetAllZones() (sets.String, error) {
// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
for _, zone := range gce.managedZones {
mc := newInstancesMetricContext("list", zone)
// We only retrieve one page in each zone - we only care about existence
listCall := gce.service.Instances.List(gce.projectID, zone)
@ -183,11 +241,12 @@ func (gce *GCECloud) GetAllZones() (sets.String, error) {
// Just a minimal set of fields - we only care about existence
listCall = listCall.Fields("items(name)")
res, err := listCall.Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
mc.Observe(nil)
if len(res.Items) != 0 {
zones.Insert(zone)
}
@ -201,6 +260,31 @@ func (gce *GCECloud) CurrentNodeName(hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
// AliasRanges returns a list of CIDR ranges that are assigned to the
// `node` for allocation to pods. Returns a list of the form
// "<ip>/<netmask>".
func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err error) {
var instance *gceInstance
instance, err = gce.getInstanceByName(mapNodeNameToInstanceName(nodeName))
if err != nil {
return
}
var res *computealpha.Instance
res, err = gce.serviceBeta.Instances.Get(
gce.projectID, instance.Zone, instance.Name).Do()
if err != nil {
return
}
for _, networkInterface := range res.NetworkInterfaces {
for _, aliasIpRange := range networkInterface.AliasIpRanges {
cidrs = append(cidrs, aliasIpRange.IpCidrRange)
}
}
return
}
// Gets the named instances, returning cloudprovider.InstanceNotFound if any instance is not found
func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error) {
instances := make(map[string]*gceInstance)
@ -282,28 +366,38 @@ func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error)
func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) {
// Avoid changing behaviour when not managing multiple zones
for _, zone := range gce.managedZones {
name = canonicalizeInstanceName(name)
res, err := gce.service.Instances.Get(gce.projectID, zone, name).Do()
instance, err := gce.getInstanceFromProjectInZoneByName(gce.projectID, zone, name)
if err != nil {
glog.Errorf("getInstanceByName: failed to get instance %s; err: %v", name, err)
if isHTTPErrorCode(err, http.StatusNotFound) {
continue
}
return nil, err
}
return &gceInstance{
Zone: lastComponent(res.Zone),
Name: res.Name,
ID: res.Id,
Disks: res.Disks,
Type: lastComponent(res.MachineType),
}, nil
return instance, nil
}
return nil, cloudprovider.InstanceNotFound
}
func (gce *GCECloud) getInstanceFromProjectInZoneByName(project, zone, name string) (*gceInstance, error) {
name = canonicalizeInstanceName(name)
mc := newInstancesMetricContext("get", zone)
res, err := gce.service.Instances.Get(project, zone, name).Do()
mc.Observe(err)
if err != nil {
glog.Errorf("getInstanceFromProjectInZoneByName: failed to get instance %s; err: %v", name, err)
return nil, err
}
return &gceInstance{
Zone: lastComponent(res.Zone),
Name: res.Name,
ID: res.Id,
Disks: res.Disks,
Type: lastComponent(res.MachineType),
}, nil
}
func getInstanceIDViaMetadata() (string, error) {
result, err := metadata.Get("instance/hostname")
if err != nil {

View file

@ -17,11 +17,14 @@ limitations under the License.
package gce
import (
"flag"
"fmt"
"net"
"net/http"
"sort"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -35,6 +38,78 @@ import (
compute "google.golang.org/api/compute/v1"
)
type cidrs struct {
ipn netsets.IPNet
isSet bool
}
var (
lbSrcRngsFlag cidrs
)
func newLoadBalancerMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"loadbalancer_" + request, region, unusedMetricLabel},
}
}
func newTargetPoolMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"targetpool_" + request, region, unusedMetricLabel},
}
}
func newAddressMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"address_" + request, region, unusedMetricLabel},
}
}
func init() {
var err error
lbSrcRngsFlag.ipn, err = netsets.ParseIPNets([]string{"130.211.0.0/22", "35.191.0.0/16", "209.85.152.0/22", "209.85.204.0/22"}...)
if err != nil {
panic("Incorrect default GCE L7 source ranges")
}
flag.Var(&lbSrcRngsFlag, "cloud-provider-gce-lb-src-cidrs", "CIDRS opened in GCE firewall for LB traffic proxy & health checks")
}
// String is the method to format the flag's value, part of the flag.Value interface.
func (c *cidrs) String() string {
return strings.Join(c.ipn.StringSlice(), ",")
}
// Set supports a value of CSV or the flag repeated multiple times
func (c *cidrs) Set(value string) error {
// On first Set(), clear the original defaults
if !c.isSet {
c.isSet = true
c.ipn = make(netsets.IPNet)
} else {
return fmt.Errorf("GCE LB CIDRS have already been set")
}
for _, cidr := range strings.Split(value, ",") {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return err
}
c.ipn.Insert(ipnet)
}
return nil
}
// LoadBalancerSrcRanges contains the ranges of ips used by the GCE load balancers (l4 & L7)
// for proxying client requests and performing health checks.
func LoadBalancerSrcRanges() []string {
return lbSrcRngsFlag.ipn.StringSlice()
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
@ -55,6 +130,7 @@ func (gce *GCECloud) GetLoadBalancer(clusterName string, service *v1.Service) (*
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
//
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
@ -80,7 +156,8 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
affinityType := apiService.Spec.SessionAffinity
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations)
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
@ -88,7 +165,8 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
return nil, err
}
if !fwdRuleExists {
glog.Infof("Forwarding rule %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
glog.V(2).Infof("Forwarding rule %v for Service %v/%v doesn't exist",
loadBalancerName, apiService.Namespace, apiService.Name)
}
// Make sure we know which IP address will be used and have properly reserved
@ -233,9 +311,9 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
}
// Ensure health checks are created for this target pool to pass to createTargetPool for health check links
// Alternately, if the annotation on the service was removed, we need to recreate the target pool without
// health checks. This needs to be prior to the forwarding rule deletion below otherwise it is not possible
// to delete just the target pool or http health checks later.
// Alternately, if the service has ExternalTrafficPolicy field set from Local to Global, we need to recreate
// the target pool without health checks. This needs to be prior to the forwarding rule deletion below otherwise
// it is not possible to delete just the target pool or http health checks later.
var hcToCreate *compute.HttpHealthCheck
hcExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
@ -247,7 +325,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
// This logic exists to detect a transition for a pre-existing service and turn on
// the tpNeedsUpdate flag to delete/recreate fwdrule/tpool adding the health check
// to the target pool.
glog.V(2).Infof("Annotation external-traffic=OnlyLocal added to new or pre-existing service")
glog.V(2).Infof("ExternalTrafficPolicy field set to Local on new or pre-existing service")
tpNeedsUpdate = true
}
hcToCreate, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
@ -403,8 +481,6 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
return nil
}
// XXX ----------
func (gce *GCECloud) DeleteForwardingRule(name string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
@ -414,15 +490,18 @@ func (gce *GCECloud) DeleteForwardingRule(name string) error {
}
func (gce *GCECloud) deleteForwardingRule(name, region string) error {
mc := newForwardingRuleMetricContext("delete", region)
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error())
return err
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", name, err.Error())
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.",
name, err.Error())
return err
}
}
@ -439,18 +518,22 @@ func (gce *GCECloud) DeleteTargetPool(name string, hc *compute.HttpHealthCheck)
}
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error {
mc := newTargetPoolMetricContext("delete", region)
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
return err
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", name, err.Error())
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.",
name, err.Error())
return err
}
}
// Deletion of health checks is allowed only after the TargetPool reference is deleted
if hc != nil {
glog.Infof("Deleting health check %v", hc.Name)
@ -495,12 +578,14 @@ func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []
SessionAffinity: translateAffinityType(affinityType),
HealthChecks: hcLinks,
}
mc := newTargetPoolMetricContext("insert", region)
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForRegionOp(op, region)
err = gce.waitForRegionOp(op, region, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
@ -524,22 +609,25 @@ func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.Str
if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
mc := newTargetPoolMetricContext("update", gce.region)
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil {
return err
return mc.Observe(err)
}
if err := gce.waitForRegionOp(op, gce.region); err != nil {
if err := gce.waitForRegionOp(op, gce.region, mc); err != nil {
return err
}
}
if len(toRemove) > 0 {
mc := newTargetPoolMetricContext("delete", gce.region)
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil {
return err
return mc.Observe(err)
}
if err := gce.waitForRegionOp(op, gce.region); err != nil {
if err := gce.waitForRegionOp(op, gce.region, mc); err != nil {
return err
}
}
@ -815,12 +903,13 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
Target: gce.targetPoolURL(name, region),
}
mc := newForwardingRuleMetricContext("create", region)
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForRegionOp(op, region)
err = gce.waitForRegionOp(op, region, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
@ -829,16 +918,17 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
}
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
mc := newFirewallMetricContext("create", region)
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
return mc.Observe(err)
}
op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForGlobalOp(op)
err = gce.waitForGlobalOp(op, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
@ -847,16 +937,17 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets
}
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
mc := newFirewallMetricContext("update", region)
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
return mc.Observe(err)
}
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForGlobalOp(op)
err = gce.waitForGlobalOp(op, mc)
if err != nil {
return err
}
@ -1014,19 +1105,23 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
}
if existingIP != "" {
addressObj.Address = existingIP
}
mc := newAddressMetricContext("create", region)
op, err := gce.service.Addresses.Insert(gce.projectID, region, addressObj).Do()
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error creating gce static IP address: %v", err)
return "", false, fmt.Errorf("error creating gce static IP address: %v",
mc.Observe(err))
}
// StatusConflict == the IP exists already.
existed = true
}
if op != nil {
err := gce.waitForRegionOp(op, region)
err := gce.waitForRegionOp(op, region, mc)
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error waiting for gce static IP address to be created: %v", err)
@ -1045,15 +1140,17 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
}
func (gce *GCECloud) deleteFirewall(name, region string) error {
mc := newFirewallMetricContext("delete", region)
fwName := makeFirewallName(name)
op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err)
return err
return mc.Observe(err)
} else {
if err := gce.waitForGlobalOp(op); err != nil {
if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err)
return err
}
@ -1062,14 +1159,15 @@ func (gce *GCECloud) deleteFirewall(name, region string) error {
}
func (gce *GCECloud) deleteStaticIP(name, region string) error {
mc := newAddressMetricContext("delete", region)
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Static IP address %s is not reserved", name)
} else if err != nil {
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
return err
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err)
return err
}

View file

@ -27,9 +27,9 @@ import (
"google.golang.org/api/googleapi"
)
func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error {
func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error), mc *metricContext) error {
if op == nil {
return fmt.Errorf("operation must not be nil")
return mc.Observe(fmt.Errorf("operation must not be nil"))
}
if opIsDone(op) {
@ -38,18 +38,20 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
opStart := time.Now()
opName := op.Name
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
start := time.Now()
gce.operationPollRateLimiter.Accept()
duration := time.Now().Sub(start)
if duration > 5*time.Second {
glog.Infof("pollOperation: throttled %v for %v", duration, opName)
glog.V(2).Infof("pollOperation: throttled %v for %v", duration, opName)
}
pollOp, err := getOperation(opName)
if err != nil {
glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]",
opName, pollOp, err, getErrorFromOp(pollOp))
}
done := opIsDone(pollOp)
if done {
duration := time.Now().Sub(opStart)
@ -60,12 +62,13 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
glog.Warningf("waitForOperation: long operation (%v): %v (failed to encode to JSON: %v)",
duration, pollOp, err)
} else {
glog.Infof("waitForOperation: long operation (%v): %v",
glog.V(2).Infof("waitForOperation: long operation (%v): %v",
duration, string(enc))
}
}
}
return done, getErrorFromOp(pollOp)
return done, mc.Observe(getErrorFromOp(pollOp))
})
}
@ -86,20 +89,20 @@ func getErrorFromOp(op *compute.Operation) error {
return nil
}
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
})
}, mc)
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
})
}, mc)
}
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error {
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
})
}, mc)
}

View file

@ -21,6 +21,7 @@ import (
"net/http"
"path"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -29,11 +30,19 @@ import (
compute "google.golang.org/api/compute/v1"
)
func newRoutesMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"routes_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
var routes []*cloudprovider.Route
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
mc := newRoutesMetricContext("list_page")
listCall := gce.service.Routes.List(gce.projectID)
prefix := truncateClusterName(clusterName)
@ -42,6 +51,7 @@ func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, err
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do()
mc.Observe(err)
if err != nil {
glog.Errorf("Error getting routes from GCE: %v", err)
return nil, err
@ -80,6 +90,8 @@ func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *clo
if err != nil {
return err
}
mc := newRoutesMetricContext("create")
insertOp, err := gce.service.Routes.Insert(gce.projectID, &compute.Route{
Name: routeName,
DestRange: route.DestinationCIDR,
@ -93,18 +105,19 @@ func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *clo
glog.Info("Route %v already exists.")
return nil
} else {
return err
return mc.Observe(err)
}
}
return gce.waitForGlobalOp(insertOp)
return gce.waitForGlobalOp(insertOp, mc)
}
func (gce *GCECloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
mc := newRoutesMetricContext("delete")
deleteOp, err := gce.service.Routes.Delete(gce.projectID, route.Name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(deleteOp)
return gce.waitForGlobalOp(deleteOp, mc)
}
func truncateClusterName(clusterName string) string {

View file

@ -16,36 +16,51 @@ limitations under the License.
package gce
import compute "google.golang.org/api/compute/v1"
import (
"time"
// Global static IP management
compute "google.golang.org/api/compute/v1"
)
func newStaticIPMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"staticip_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// ReserveGlobalStaticIP creates a global static IP.
// Caller is allocated a random IP if they do not specify an ipAddress. If an
// ipAddress is specified, it must belong to the current project, eg: an
// ephemeral IP associated with a global forwarding rule.
func (gce *GCECloud) ReserveGlobalStaticIP(name, ipAddress string) (address *compute.Address, err error) {
mc := newStaticIPMetricContext("reserve")
op, err := gce.service.GlobalAddresses.Insert(gce.projectID, &compute.Address{Name: name, Address: ipAddress}).Do()
if err != nil {
return nil, mc.Observe(err)
}
if err := gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
if err := gce.waitForGlobalOp(op); err != nil {
return nil, err
}
// We have to get the address to know which IP was allocated for us.
return gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
}
// DeleteGlobalStaticIP deletes a global static IP by name.
func (gce *GCECloud) DeleteGlobalStaticIP(name string) error {
mc := newStaticIPMetricContext("delete")
op, err := gce.service.GlobalAddresses.Delete(gce.projectID, name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// GetGlobalStaticIP returns the global static IP by name.
func (gce *GCECloud) GetGlobalStaticIP(name string) (address *compute.Address, err error) {
return gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
func (gce *GCECloud) GetGlobalStaticIP(name string) (*compute.Address, error) {
mc := newStaticIPMetricContext("get")
v, err := gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}

View file

@ -18,15 +18,23 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// TargetHttpProxy management
func newTargetProxyMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"targetproxy_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// GetTargetHttpProxy returns the UrlMap by name.
func (gce *GCECloud) GetTargetHttpProxy(name string) (*compute.TargetHttpProxy, error) {
return gce.service.TargetHttpProxies.Get(gce.projectID, name).Do()
mc := newTargetProxyMetricContext("get")
v, err := gce.service.TargetHttpProxies.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// CreateTargetHttpProxy creates and returns a TargetHttpProxy with the given UrlMap.
@ -35,11 +43,13 @@ func (gce *GCECloud) CreateTargetHttpProxy(urlMap *compute.UrlMap, name string)
Name: name,
UrlMap: urlMap.SelfLink,
}
mc := newTargetProxyMetricContext("create")
op, err := gce.service.TargetHttpProxies.Insert(gce.projectID, proxy).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetTargetHttpProxy(name)
@ -47,40 +57,48 @@ func (gce *GCECloud) CreateTargetHttpProxy(urlMap *compute.UrlMap, name string)
// SetUrlMapForTargetHttpProxy sets the given UrlMap for the given TargetHttpProxy.
func (gce *GCECloud) SetUrlMapForTargetHttpProxy(proxy *compute.TargetHttpProxy, urlMap *compute.UrlMap) error {
op, err := gce.service.TargetHttpProxies.SetUrlMap(gce.projectID, proxy.Name, &compute.UrlMapReference{UrlMap: urlMap.SelfLink}).Do()
mc := newTargetProxyMetricContext("set_url_map")
op, err := gce.service.TargetHttpProxies.SetUrlMap(
gce.projectID, proxy.Name, &compute.UrlMapReference{UrlMap: urlMap.SelfLink}).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteTargetHttpProxy deletes the TargetHttpProxy by name.
func (gce *GCECloud) DeleteTargetHttpProxy(name string) error {
mc := newTargetProxyMetricContext("delete")
op, err := gce.service.TargetHttpProxies.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListTargetHttpProxies lists all TargetHttpProxies in the project.
func (gce *GCECloud) ListTargetHttpProxies() (*compute.TargetHttpProxyList, error) {
mc := newTargetProxyMetricContext("list")
// TODO: use PageToken to list all not just the first 500
return gce.service.TargetHttpProxies.List(gce.projectID).Do()
v, err := gce.service.TargetHttpProxies.List(gce.projectID).Do()
return v, mc.Observe(err)
}
// TargetHttpsProxy management
// GetTargetHttpsProxy returns the UrlMap by name.
func (gce *GCECloud) GetTargetHttpsProxy(name string) (*compute.TargetHttpsProxy, error) {
return gce.service.TargetHttpsProxies.Get(gce.projectID, name).Do()
mc := newTargetProxyMetricContext("get")
v, err := gce.service.TargetHttpsProxies.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// CreateTargetHttpsProxy creates and returns a TargetHttpsProxy with the given UrlMap and SslCertificate.
func (gce *GCECloud) CreateTargetHttpsProxy(urlMap *compute.UrlMap, sslCert *compute.SslCertificate, name string) (*compute.TargetHttpsProxy, error) {
mc := newTargetProxyMetricContext("create")
proxy := &compute.TargetHttpsProxy{
Name: name,
UrlMap: urlMap.SelfLink,
@ -88,9 +106,9 @@ func (gce *GCECloud) CreateTargetHttpsProxy(urlMap *compute.UrlMap, sslCert *com
}
op, err := gce.service.TargetHttpsProxies.Insert(gce.projectID, proxy).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetTargetHttpsProxy(name)
@ -98,36 +116,43 @@ func (gce *GCECloud) CreateTargetHttpsProxy(urlMap *compute.UrlMap, sslCert *com
// SetUrlMapForTargetHttpsProxy sets the given UrlMap for the given TargetHttpsProxy.
func (gce *GCECloud) SetUrlMapForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, urlMap *compute.UrlMap) error {
op, err := gce.service.TargetHttpsProxies.SetUrlMap(gce.projectID, proxy.Name, &compute.UrlMapReference{UrlMap: urlMap.SelfLink}).Do()
mc := newTargetProxyMetricContext("set_url_map")
op, err := gce.service.TargetHttpsProxies.SetUrlMap(
gce.projectID, proxy.Name, &compute.UrlMapReference{UrlMap: urlMap.SelfLink}).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// SetSslCertificateForTargetHttpsProxy sets the given SslCertificate for the given TargetHttpsProxy.
func (gce *GCECloud) SetSslCertificateForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, sslCert *compute.SslCertificate) error {
op, err := gce.service.TargetHttpsProxies.SetSslCertificates(gce.projectID, proxy.Name, &compute.TargetHttpsProxiesSetSslCertificatesRequest{SslCertificates: []string{sslCert.SelfLink}}).Do()
mc := newTargetProxyMetricContext("set_ssl_cert")
op, err := gce.service.TargetHttpsProxies.SetSslCertificates(
gce.projectID, proxy.Name, &compute.TargetHttpsProxiesSetSslCertificatesRequest{SslCertificates: []string{sslCert.SelfLink}}).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteTargetHttpsProxy deletes the TargetHttpsProxy by name.
func (gce *GCECloud) DeleteTargetHttpsProxy(name string) error {
mc := newTargetProxyMetricContext("delete")
op, err := gce.service.TargetHttpsProxies.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListTargetHttpsProxies lists all TargetHttpsProxies in the project.
func (gce *GCECloud) ListTargetHttpsProxies() (*compute.TargetHttpsProxyList, error) {
mc := newTargetProxyMetricContext("list")
// TODO: use PageToken to list all not just the first 500
return gce.service.TargetHttpsProxies.List(gce.projectID).Do()
v, err := gce.service.TargetHttpsProxies.List(gce.projectID).Do()
return v, mc.Observe(err)
}

View file

@ -18,15 +18,23 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// UrlMap management
func newUrlMapMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"urlmap_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// GetUrlMap returns the UrlMap by name.
func (gce *GCECloud) GetUrlMap(name string) (*compute.UrlMap, error) {
return gce.service.UrlMaps.Get(gce.projectID, name).Do()
mc := newUrlMapMetricContext("get")
v, err := gce.service.UrlMaps.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// CreateUrlMap creates an url map, using the given backend service as the default service.
@ -35,11 +43,12 @@ func (gce *GCECloud) CreateUrlMap(backend *compute.BackendService, name string)
Name: name,
DefaultService: backend.SelfLink,
}
mc := newUrlMapMetricContext("create")
op, err := gce.service.UrlMaps.Insert(gce.projectID, urlMap).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetUrlMap(name)
@ -47,11 +56,12 @@ func (gce *GCECloud) CreateUrlMap(backend *compute.BackendService, name string)
// UpdateUrlMap applies the given UrlMap as an update, and returns the new UrlMap.
func (gce *GCECloud) UpdateUrlMap(urlMap *compute.UrlMap) (*compute.UrlMap, error) {
mc := newUrlMapMetricContext("update")
op, err := gce.service.UrlMaps.Update(gce.projectID, urlMap.Name, urlMap).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.service.UrlMaps.Get(gce.projectID, urlMap.Name).Do()
@ -59,18 +69,21 @@ func (gce *GCECloud) UpdateUrlMap(urlMap *compute.UrlMap) (*compute.UrlMap, erro
// DeleteUrlMap deletes a url map by name.
func (gce *GCECloud) DeleteUrlMap(name string) error {
mc := newUrlMapMetricContext("delete")
op, err := gce.service.UrlMaps.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListUrlMaps lists all UrlMaps in the project.
func (gce *GCECloud) ListUrlMaps() (*compute.UrlMapList, error) {
mc := newUrlMapMetricContext("list")
// TODO: use PageToken to list all not just the first 500
return gce.service.UrlMaps.List(gce.projectID).Do()
v, err := gce.service.UrlMaps.List(gce.projectID).Do()
return v, mc.Observe(err)
}

View file

@ -17,7 +17,9 @@ limitations under the License.
package gce
import (
"errors"
"fmt"
"regexp"
"strings"
"k8s.io/apimachinery/pkg/types"
@ -35,6 +37,8 @@ type gceInstance struct {
Type string
}
var providerIdRE = regexp.MustCompile(`^` + ProviderName + `://([^/]+)/([^/]+)/([^/]+)$`)
func getProjectAndZone() (string, string, error) {
result, err := metadata.Get("instance/zone")
if err != nil {
@ -100,3 +104,14 @@ func isHTTPErrorCode(err error, code int) bool {
apiErr, ok := err.(*googleapi.Error)
return ok && apiErr.Code == code
}
// splitProviderID splits a provider's id into core components.
// A providerID is build out of '${ProviderName}://${project-id}/${zone}/${instance-name}'
// See cloudprovider.GetInstanceProviderID.
func splitProviderID(providerID string) (project, zone, instance string, err error) {
matches := providerIdRE.FindStringSubmatch(providerID)
if len(matches) != 4 {
return "", "", "", errors.New("error splitting providerID")
}
return matches[1], matches[2], matches[3], nil
}

View file

@ -0,0 +1,80 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type apiCallMetrics struct {
latency *prometheus.HistogramVec
errors *prometheus.CounterVec
}
var (
apiMetrics = registerAPIMetrics(
"request", // API function that is begin invoked.
"region", // region (optional).
"zone", // zone (optional).
)
)
type metricContext struct {
start time.Time
attributes []string
}
// Value for an unused label in the metric dimension.
const unusedMetricLabel = "<n/a>"
// Observe the result of a API call.
func (mc *metricContext) Observe(err error) error {
apiMetrics.latency.WithLabelValues(mc.attributes...).Observe(
time.Since(mc.start).Seconds())
if err != nil {
apiMetrics.errors.WithLabelValues(mc.attributes...).Inc()
}
return err
}
// registerApiMetrics adds metrics definitions for a category of API calls.
func registerAPIMetrics(attributes ...string) *apiCallMetrics {
metrics := &apiCallMetrics{
latency: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "cloudprovider_gce_api_request_duration_seconds",
Help: "Latency of a GCE API call",
},
attributes,
),
errors: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cloudprovider_gce_api_request_errors",
Help: "Number of errors for an API call",
},
attributes,
),
}
prometheus.MustRegister(metrics.latency)
prometheus.MustRegister(metrics.errors)
return metrics
}