Update go dependencies

Manuel Alejandro de Brito Fontes 2018-09-27 14:20:02 -03:00
parent 3c1a5c5fc2
commit 6c33bee8fd
GPG key ID: 786136016A8BA02A
620 changed files with 29782 additions and 15901 deletions


@ -15,20 +15,21 @@ go_test(
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
],
)
@ -46,18 +47,19 @@ go_library(
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
@ -75,6 +77,7 @@ filegroup(
"//pkg/scheduler/algorithm:all-srcs",
"//pkg/scheduler/algorithmprovider:all-srcs",
"//pkg/scheduler/api:all-srcs",
"//pkg/scheduler/apis/config:all-srcs",
"//pkg/scheduler/cache:all-srcs",
"//pkg/scheduler/core:all-srcs",
"//pkg/scheduler/factory:all-srcs",


@ -19,24 +19,20 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//vendor/k8s.io/api/apps/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"scheduler_interface_test.go",
"types_test.go",
],
srcs = ["types_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/cache:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)


@ -37,14 +37,14 @@ go_library(
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
@ -74,13 +74,12 @@ go_test(
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/apps/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)


@ -32,7 +32,7 @@ var (
// BalancedResourceAllocationMap should **NOT** be used alone, and **MUST** be used together
// with LeastRequestedPriority. It calculates the difference between the cpu and memory fraction
// of capacity, and prioritizes the host based on how close the two metrics are to each other.
// Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by:
// Detail: score = 10 - variance(cpuFraction,memoryFraction,volumeFraction)*10. The algorithm is partly inspired by:
// "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced
// Resource Utilization"
BalancedResourceAllocationMap = balancedResourcePriority.PriorityMap
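For concreteness, a minimal sketch of the variance computation the updated comment describes (illustrative names, not the upstream scorer; the fractions are requested/capacity values in [0,1]):

func balancedScoreSketch(cpuFraction, memoryFraction, volumeFraction float64) int64 {
	mean := (cpuFraction + memoryFraction + volumeFraction) / 3
	variance := ((cpuFraction-mean)*(cpuFraction-mean) +
		(memoryFraction-mean)*(memoryFraction-mean) +
		(volumeFraction-mean)*(volumeFraction-mean)) / 3
	// variance is in [0,1], so score = 10 - variance*10 stays in [0,10].
	return int64((1 - variance) * 10)
}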


@ -26,11 +26,12 @@ import (
"k8s.io/kubernetes/pkg/util/parsers"
)
// This is a reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
const (
mb int64 = 1024 * 1024
minImgSize int64 = 23 * mb
maxImgSize int64 = 1000 * mb
mb int64 = 1024 * 1024
minThreshold int64 = 23 * mb
maxThreshold int64 = 1000 * mb
)
// ImageLocalityPriorityMap is a priority function that favors nodes that already have requested pod container's images.
@ -44,44 +45,55 @@ func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *scheduler
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
sumSize := totalImageSize(nodeInfo, pod.Spec.Containers)
var score int
if priorityMeta, ok := meta.(*priorityMetadata); ok {
score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
} else {
// if we are not able to parse priority meta data, skip this priority
score = 0
}
return schedulerapi.HostPriority{
Host: node.Name,
Score: calculateScoreFromSize(sumSize),
Score: score,
}, nil
}
// calculateScoreFromSize calculates the priority of a node. sumSize is sum size of requested images on this node.
// 1. Split image size range into 10 buckets.
// 2. Decide the priority of a given sumSize based on which bucket it belongs to.
func calculateScoreFromSize(sumSize int64) int {
switch {
case sumSize == 0 || sumSize < minImgSize:
// 0 means none of the images required by this pod are present on this
// node or the total size of the images present is too small to be taken into further consideration.
return 0
case sumSize >= maxImgSize:
// If existing images' total size is larger than max, just make it highest priority.
return schedulerapi.MaxPriority
// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64) int {
if sumScores < minThreshold {
sumScores = minThreshold
} else if sumScores > maxThreshold {
sumScores = maxThreshold
}
return int((int64(schedulerapi.MaxPriority) * (sumSize - minImgSize) / (maxImgSize - minImgSize)) + 1)
return int(int64(schedulerapi.MaxPriority) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}
// totalImageSize returns the total image size of all the containers that are already on the node.
func totalImageSize(nodeInfo *schedulercache.NodeInfo, containers []v1.Container) int64 {
var total int64
// sumImageScores returns the sum of image scores of all the containers that are already on the node.
// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
the final score. Note that init containers are not considered, since it is rare for users to deploy huge init containers.
func sumImageScores(nodeInfo *schedulercache.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
var sum int64
imageStates := nodeInfo.ImageStates()
imageSizes := nodeInfo.ImageSizes()
for _, container := range containers {
if size, ok := imageSizes[normalizedImageName(container.Image)]; ok {
total += size
if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
sum += scaledImageScore(state, totalNumNodes)
}
}
return total
return sum
}
// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor that considers how many nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *schedulercache.ImageStateSummary, totalNumNodes int) int64 {
spread := float64(imageState.NumNodes) / float64(totalNumNodes)
return int64(float64(imageState.Size) * spread)
}
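Taken together, a hedged worked example with illustrative numbers: an image of 500mb present on 3 of 10 nodes receives a raw score of 500mb * 0.3 = 150mb from scaledImageScore; calculatePriority then returns 10 * (150mb - 23mb) / (1000mb - 23mb), which truncates to 1.

sumScores := int64(float64(500*mb) * (3.0 / 10.0)) // scaledImageScore for a single image
score := calculatePriority(sumScores)              // integer math yields 1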
// normalizedImageName returns the CRI compliant name for a given image.


@ -29,7 +29,7 @@ var (
// prioritizes based on the minimum of the average of the fraction of requested to capacity.
//
// Details:
// cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity)/2
// (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
LeastRequestedPriorityMap = leastResourcePriority.PriorityMap
)
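A worked example of the corrected formula, with illustrative numbers (each per-resource score and the final average use integer division): a node with 10000m CPU capacity of which 4000m is requested, and 16GiB memory of which 8GiB is requested:

// cpu:    (10000 - 4000) * 10 / 10000 = 6
// memory: (16GiB - 8GiB) * 10 / 16GiB = 5
// least-requested priority: (6 + 5) / 2 = 5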


@ -21,7 +21,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
@ -52,6 +51,7 @@ type priorityMetadata struct {
podSelectors []labels.Selector
controllerRef *metav1.OwnerReference
podFirstServiceSelector labels.Selector
totalNumNodes int
}
// PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
@ -65,8 +65,9 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo
podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
affinity: pod.Spec.Affinity,
podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
controllerRef: priorityutil.GetControllerRef(pod),
controllerRef: metav1.GetControllerOf(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: len(nodeNameToInfo),
}
}
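metav1.GetControllerOf is the upstream replacement for the scheduler's local GetControllerRef helper (removed further below); it returns the owner reference whose Controller field is true, or nil if there is none. A minimal usage sketch:

package scratch

import (
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// controllerKind returns the kind of the pod's controlling owner, if any.
func controllerKind(pod *v1.Pod) string {
	if ref := metav1.GetControllerOf(pod); ref != nil {
		return ref.Kind // e.g. "ReplicaSet"
	}
	return ""
}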


@ -39,10 +39,10 @@ func mostResourceScorer(requested, allocable *schedulercache.Resource, includeVo
// The used capacity is calculated on a scale of 0-10
// 0 being the lowest priority and 10 being the highest.
// The more resources are used the higher the score is. This function
// is almost a reversed version of least_requested_priority.calculatUnusedScore
// is almost a reversed version of least_requested_priority.calculateUnusedScore
// (10 - calculateUnusedScore). The main difference is in rounding. It was added to
// keep the final formula clean and not to modify the widely used (by users
// in their default scheduling policies) calculateUSedScore.
// in their default scheduling policies) calculateUsedScore.
func mostRequestedScore(requested, capacity int64) int64 {
if capacity == 0 {
return 0
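Reading the comment above as arithmetic, a hedged example with MaxPriority = 10 and illustrative numbers, 6000m requested on a 10000m node:

// calculateUnusedScore: (10000 - 6000) * 10 / 10000 = 4
// mostRequestedScore:    6000 * 10 / 10000          = 6  (== 10 - 4 here)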


@ -22,7 +22,6 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
@ -39,7 +38,7 @@ func CalculateNodePreferAvoidPodsPriorityMap(pod *v1.Pod, meta interface{}, node
controllerRef = priorityMeta.controllerRef
} else {
// We couldn't parse metadata - fallback to the podspec.
controllerRef = priorityutil.GetControllerRef(pod)
controllerRef = metav1.GetControllerOf(pod)
}
if controllerRef != nil {


@ -21,6 +21,8 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
@ -56,20 +58,31 @@ func (r *ResourceAllocationPriority) PriorityMap(
requested.Memory += nodeInfo.NonZeroRequest().Memory
var score int64
// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
if len(pod.Spec.Volumes) >= 0 && nodeInfo.TransientInfo != nil {
if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
score = r.scorer(&requested, &allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
score = r.scorer(&requested, &allocatable, false, 0, 0)
}
if glog.V(10) {
glog.Infof(
"%v -> %v: %v, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
pod.Name, node.Name, r.Name,
allocatable.MilliCPU, allocatable.Memory,
requested.MilliCPU+allocatable.MilliCPU, requested.Memory+allocatable.Memory,
score,
)
if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
glog.Infof(
"%v -> %v: %v, capacity %d millicores %d memory bytes, %d volumes, total request %d millicores %d memory bytes %d volumes, score %d",
pod.Name, node.Name, r.Name,
allocatable.MilliCPU, allocatable.Memory, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,
requested.MilliCPU, requested.Memory,
nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,
score,
)
} else {
glog.Infof(
"%v -> %v: %v, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
pod.Name, node.Name, r.Name,
allocatable.MilliCPU, allocatable.Memory,
requested.MilliCPU, requested.Memory,
score,
)
}
}
return schedulerapi.HostPriority{


@ -70,7 +70,7 @@ func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedule
}, nil
}
// computeScore return 1 if limit value is less than or equal to allocable
// computeScore returns 1 if limit value is less than or equal to allocatable
// value, otherwise it returns 0.
func computeScore(limit, allocatable int64) int64 {
if limit != 0 && allocatable != 0 && limit <= allocatable {


@ -97,16 +97,12 @@ func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{
glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
continue
}
matches := false
for _, selector := range selectors {
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
matches = true
count++
break
}
}
if matches {
count++
}
}
return schedulerapi.HostPriority{
Host: node.Name,


@ -11,17 +11,16 @@ go_test(
srcs = [
"non_zero_test.go",
"topologies_test.go",
"util_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/selection:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
@ -30,14 +29,12 @@ go_library(
srcs = [
"non_zero.go",
"topologies.go",
"util.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util",
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)


@ -1,36 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// GetControllerRef gets pod's owner controller reference from a pod object.
func GetControllerRef(pod *v1.Pod) *metav1.OwnerReference {
if len(pod.OwnerReferences) == 0 {
return nil
}
for i := range pod.OwnerReferences {
ref := &pod.OwnerReferences[i]
if ref.Controller != nil && *ref.Controller {
return ref
}
}
return nil
}


@ -17,9 +17,8 @@ limitations under the License.
package algorithm
import (
apps "k8s.io/api/apps/v1beta1"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/labels"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
@ -120,7 +119,7 @@ type ControllerLister interface {
// ReplicaSetLister interface represents anything that can produce a list of ReplicaSet; the list is consumed by a scheduler.
type ReplicaSetLister interface {
// Gets the replicasets for the given pod
GetPodReplicaSets(*v1.Pod) ([]*extensions.ReplicaSet, error)
GetPodReplicaSets(*v1.Pod) ([]*apps.ReplicaSet, error)
}
var _ ControllerLister = &EmptyControllerLister{}
@ -144,7 +143,7 @@ var _ ReplicaSetLister = &EmptyReplicaSetLister{}
type EmptyReplicaSetLister struct{}
// GetPodReplicaSets returns nil
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) (rss []*extensions.ReplicaSet, err error) {
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) (rss []*apps.ReplicaSet, err error) {
return nil, nil
}


@ -15,12 +15,12 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/scheduler/api",
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
],
)


@ -36,6 +36,9 @@ const (
MaxPriority = 10
// MaxWeight defines the max weight value.
MaxWeight = MaxInt / MaxPriority
// DefaultPercentageOfNodesToScore defines the percentage of all nodes that,
// once found feasible, causes the scheduler to stop looking for more nodes.
DefaultPercentageOfNodesToScore = 50
)
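A hedged sketch of how such a percentage becomes a stopping point during node filtering (the real helper lives in the generic scheduler; names here are illustrative):

// numFeasibleNodesToFindSketch returns how many feasible nodes to find before
// the scheduler stops searching; a percentage outside (0, 100) means all nodes.
func numFeasibleNodesToFindSketch(numAllNodes, percentage int32) int32 {
	if percentage <= 0 || percentage >= 100 {
		return numAllNodes
	}
	return numAllNodes * percentage / 100
}

// With 1000 nodes and DefaultPercentageOfNodesToScore (50), scheduling can stop
// once 500 feasible nodes have been found.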
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object


@ -31,33 +31,21 @@ func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) {
*out = *in
if in.Pod != nil {
in, out := &in.Pod, &out.Pod
if *in == nil {
*out = nil
} else {
*out = new(v1.Pod)
(*in).DeepCopyInto(*out)
}
*out = new(v1.Pod)
(*in).DeepCopyInto(*out)
}
if in.Nodes != nil {
in, out := &in.Nodes, &out.Nodes
if *in == nil {
*out = nil
} else {
*out = new(v1.NodeList)
(*in).DeepCopyInto(*out)
}
*out = new(v1.NodeList)
(*in).DeepCopyInto(*out)
}
if in.NodeNames != nil {
in, out := &in.NodeNames, &out.NodeNames
if *in == nil {
*out = nil
} else {
*out = new([]string)
if **in != nil {
in, out := *in, *out
*out = make([]string, len(*in))
copy(*out, *in)
}
*out = new([]string)
if **in != nil {
in, out := *in, *out
*out = make([]string, len(*in))
copy(*out, *in)
}
}
return
@ -110,12 +98,8 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) {
*out = *in
if in.TLSConfig != nil {
in, out := &in.TLSConfig, &out.TLSConfig
if *in == nil {
*out = nil
} else {
*out = new(rest.TLSClientConfig)
(*in).DeepCopyInto(*out)
}
*out = new(rest.TLSClientConfig)
(*in).DeepCopyInto(*out)
}
if in.ManagedResources != nil {
in, out := &in.ManagedResources, &out.ManagedResources
@ -140,24 +124,16 @@ func (in *ExtenderFilterResult) DeepCopyInto(out *ExtenderFilterResult) {
*out = *in
if in.Nodes != nil {
in, out := &in.Nodes, &out.Nodes
if *in == nil {
*out = nil
} else {
*out = new(v1.NodeList)
(*in).DeepCopyInto(*out)
}
*out = new(v1.NodeList)
(*in).DeepCopyInto(*out)
}
if in.NodeNames != nil {
in, out := &in.NodeNames, &out.NodeNames
if *in == nil {
*out = nil
} else {
*out = new([]string)
if **in != nil {
in, out := *in, *out
*out = make([]string, len(*in))
copy(*out, *in)
}
*out = new([]string)
if **in != nil {
in, out := *in, *out
*out = make([]string, len(*in))
copy(*out, *in)
}
}
if in.FailedNodes != nil {
@ -201,35 +177,37 @@ func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
*out = *in
if in.Pod != nil {
in, out := &in.Pod, &out.Pod
if *in == nil {
*out = nil
} else {
*out = new(v1.Pod)
(*in).DeepCopyInto(*out)
}
*out = new(v1.Pod)
(*in).DeepCopyInto(*out)
}
if in.NodeNameToVictims != nil {
in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
*out = make(map[string]*Victims, len(*in))
for key, val := range *in {
var outVal *Victims
if val == nil {
(*out)[key] = nil
} else {
(*out)[key] = new(Victims)
val.DeepCopyInto((*out)[key])
in, out := &val, &outVal
*out = new(Victims)
(*in).DeepCopyInto(*out)
}
(*out)[key] = outVal
}
}
if in.NodeNameToMetaVictims != nil {
in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims
*out = make(map[string]*MetaVictims, len(*in))
for key, val := range *in {
var outVal *MetaVictims
if val == nil {
(*out)[key] = nil
} else {
(*out)[key] = new(MetaVictims)
val.DeepCopyInto((*out)[key])
in, out := &val, &outVal
*out = new(MetaVictims)
(*in).DeepCopyInto(*out)
}
(*out)[key] = outVal
}
}
return
@ -252,12 +230,15 @@ func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult)
in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims
*out = make(map[string]*MetaVictims, len(*in))
for key, val := range *in {
var outVal *MetaVictims
if val == nil {
(*out)[key] = nil
} else {
(*out)[key] = new(MetaVictims)
val.DeepCopyInto((*out)[key])
in, out := &val, &outVal
*out = new(MetaVictims)
(*in).DeepCopyInto(*out)
}
(*out)[key] = outVal
}
}
return
@ -391,11 +372,10 @@ func (in *MetaVictims) DeepCopyInto(out *MetaVictims) {
in, out := &in.Pods, &out.Pods
*out = make([]*MetaPod, len(*in))
for i := range *in {
if (*in)[i] == nil {
(*out)[i] = nil
} else {
(*out)[i] = new(MetaPod)
(*in)[i].DeepCopyInto((*out)[i])
if (*in)[i] != nil {
in, out := &(*in)[i], &(*out)[i]
*out = new(MetaPod)
**out = **in
}
}
}
@ -463,21 +443,13 @@ func (in *PredicateArgument) DeepCopyInto(out *PredicateArgument) {
*out = *in
if in.ServiceAffinity != nil {
in, out := &in.ServiceAffinity, &out.ServiceAffinity
if *in == nil {
*out = nil
} else {
*out = new(ServiceAffinity)
(*in).DeepCopyInto(*out)
}
*out = new(ServiceAffinity)
(*in).DeepCopyInto(*out)
}
if in.LabelsPresence != nil {
in, out := &in.LabelsPresence, &out.LabelsPresence
if *in == nil {
*out = nil
} else {
*out = new(LabelsPresence)
(*in).DeepCopyInto(*out)
}
*out = new(LabelsPresence)
(*in).DeepCopyInto(*out)
}
return
}
@ -497,12 +469,8 @@ func (in *PredicatePolicy) DeepCopyInto(out *PredicatePolicy) {
*out = *in
if in.Argument != nil {
in, out := &in.Argument, &out.Argument
if *in == nil {
*out = nil
} else {
*out = new(PredicateArgument)
(*in).DeepCopyInto(*out)
}
*out = new(PredicateArgument)
(*in).DeepCopyInto(*out)
}
return
}
@ -522,30 +490,18 @@ func (in *PriorityArgument) DeepCopyInto(out *PriorityArgument) {
*out = *in
if in.ServiceAntiAffinity != nil {
in, out := &in.ServiceAntiAffinity, &out.ServiceAntiAffinity
if *in == nil {
*out = nil
} else {
*out = new(ServiceAntiAffinity)
**out = **in
}
*out = new(ServiceAntiAffinity)
**out = **in
}
if in.LabelPreference != nil {
in, out := &in.LabelPreference, &out.LabelPreference
if *in == nil {
*out = nil
} else {
*out = new(LabelPreference)
**out = **in
}
*out = new(LabelPreference)
**out = **in
}
if in.RequestedToCapacityRatioArguments != nil {
in, out := &in.RequestedToCapacityRatioArguments, &out.RequestedToCapacityRatioArguments
if *in == nil {
*out = nil
} else {
*out = new(RequestedToCapacityRatioArguments)
(*in).DeepCopyInto(*out)
}
*out = new(RequestedToCapacityRatioArguments)
(*in).DeepCopyInto(*out)
}
return
}
@ -565,12 +521,8 @@ func (in *PriorityPolicy) DeepCopyInto(out *PriorityPolicy) {
*out = *in
if in.Argument != nil {
in, out := &in.Argument, &out.Argument
if *in == nil {
*out = nil
} else {
*out = new(PriorityArgument)
(*in).DeepCopyInto(*out)
}
*out = new(PriorityArgument)
(*in).DeepCopyInto(*out)
}
return
}
@ -666,11 +618,10 @@ func (in *Victims) DeepCopyInto(out *Victims) {
in, out := &in.Pods, &out.Pods
*out = make([]*v1.Pod, len(*in))
for i := range *in {
if (*in)[i] == nil {
(*out)[i] = nil
} else {
(*out)[i] = new(v1.Pod)
(*in)[i].DeepCopyInto((*out)[i])
if (*in)[i] != nil {
in, out := &(*in)[i], &(*out)[i]
*out = new(v1.Pod)
(*in).DeepCopyInto(*out)
}
}
}
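The hunks above are regenerated deepcopy-gen output; the new idiom introduces a scoped outVal/in/out pair per element instead of branching on *in == nil. A minimal self-contained sketch of the same pattern for a map of pointers, using a hypothetical type T:

type T struct{ N int }

func (in *T) DeepCopyInto(out *T) { *out = *in }

func deepCopyPtrMap(in map[string]*T) map[string]*T {
	if in == nil {
		return nil
	}
	out := make(map[string]*T, len(in))
	for key, val := range in {
		var outVal *T
		if val != nil {
			outVal = new(T) // copy the element; nil entries stay nil
			val.DeepCopyInto(outVal)
		}
		out[key] = outVal
	}
	return out
}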


@ -6,6 +6,7 @@ go_library(
"cache.go",
"interface.go",
"node_info.go",
"node_tree.go",
"util.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/cache",
@ -15,13 +16,15 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)
@ -30,22 +33,25 @@ go_test(
srcs = [
"cache_test.go",
"node_info_test.go",
"node_tree_test.go",
"util_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/util/parsers:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)


@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
@ -51,14 +52,17 @@ type schedulerCache struct {
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.Mutex
mu sync.RWMutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*NodeInfo
nodeTree *NodeTree
pdbs map[string]*policy.PodDisruptionBudget
// A map from image name to its imageState.
imageStates map[string]*imageState
}
type podState struct {
@ -69,6 +73,29 @@ type podState struct {
bindingFinished bool
}
type imageState struct {
// Size of the image
size int64
// A set of node names for nodes having this image present
nodes sets.String
}
// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct {
// Size of the image
Size int64
// Used to track how many nodes have this image
NumNodes int
}
// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary {
return &ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
}
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
return &schedulerCache{
ttl: ttl,
@ -76,17 +103,19 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
stop: stop,
nodes: make(map[string]*NodeInfo),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
pdbs: make(map[string]*policy.PodDisruptionBudget),
imageStates: make(map[string]*imageState),
}
}
// Snapshot takes a snapshot of the current schedulerCache. The method has a performance impact,
// and should only be used in non-critical paths.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*NodeInfo)
for k, v := range cache.nodes {
@ -113,6 +142,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
// Transient scheduler info is reset here.
@ -136,8 +166,8 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
}
func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
@ -188,8 +218,8 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
glog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
@ -317,6 +347,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
if err := cache.updatePod(oldPod, newPod); err != nil {
return err
}
currState.pod = newPod
default:
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
}
@ -358,8 +389,8 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
return false, err
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
b, found := cache.assumedPods[key]
if !found {
@ -374,8 +405,8 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
return nil, err
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
podState, ok := cache.podStates[key]
if !ok {
@ -393,7 +424,12 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
if !ok {
n = NewNodeInfo()
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.node)
}
cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n)
return n.SetNode(node)
}
@ -405,7 +441,12 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
if !ok {
n = NewNodeInfo()
cache.nodes[newNode.Name] = n
} else {
cache.removeNodeImageStates(n.node)
}
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n)
return n.SetNode(newNode)
}
@ -424,9 +465,63 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
if len(n.pods) == 0 && n.node == nil {
delete(cache.nodes, node.Name)
}
cache.nodeTree.RemoveNode(node)
cache.removeNodeImageStates(node)
return nil
}
// addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the
// imageStates in the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) {
newSum := make(map[string]*ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := cache.imageStates[name]
if !ok {
state = &imageState{
size: image.SizeBytes,
nodes: sets.NewString(node.Name),
}
cache.imageStates[name] = state
} else {
state.nodes.Insert(node.Name)
}
// create the imageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = cache.createImageStateSummary(state)
}
}
}
nodeInfo.imageStates = newSum
}
// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
if node == nil {
return
}
for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := cache.imageStates[name]
if ok {
state.nodes.Delete(node.Name)
if len(state.nodes) == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(cache.imageStates, name)
}
}
}
}
}
func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error {
cache.mu.Lock()
defer cache.mu.Unlock()
@ -449,8 +544,8 @@ func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error {
}
func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
var pdbs []*policy.PodDisruptionBudget
for _, pdb := range cache.pdbs {
if selector.Matches(labels.Set(pdb.Labels)) {
@ -461,8 +556,8 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
}
func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
node, ok := cache.nodes[n.Node().Name]
return ok && n.generation == node.generation
}
@ -508,3 +603,7 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
delete(cache.podStates, key)
return nil
}
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}


@ -125,6 +125,9 @@ type Cache interface {
// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
IsUpToDate(n *NodeInfo) bool
// NodeTree returns a node tree structure
NodeTree() *NodeTree
}
// Snapshot is a snapshot of cache state


@ -58,9 +58,10 @@ type NodeInfo struct {
taints []v1.Taint
taintsErr error
// This is a map from image name to image size, also for checking image existence on the node
// Cache it here to avoid rebuilding the map during scheduling, e.g., in image_locality.go
imageSizes map[string]int64
// imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
imageStates map[string]*ImageStateSummary
// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
// scheduling cycle.
@ -261,7 +262,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
TransientInfo: newTransientSchedulerInfo(),
generation: nextGeneration(),
usedPorts: make(util.HostPortInfo),
imageSizes: make(map[string]int64),
imageStates: make(map[string]*ImageStateSummary),
}
for _, pod := range pods {
ni.AddPod(pod)
@ -293,12 +294,12 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo {
return n.usedPorts
}
// ImageSizes returns the image size information on this node.
func (n *NodeInfo) ImageSizes() map[string]int64 {
// ImageStates returns the state information of all images.
func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
if n == nil {
return nil
}
return n.imageSizes
return n.imageStates
}
// PodsWithAffinity return all pods with (anti)affinity constraints on this node.
@ -392,15 +393,20 @@ func (n *NodeInfo) Clone() *NodeInfo {
diskPressureCondition: n.diskPressureCondition,
pidPressureCondition: n.pidPressureCondition,
usedPorts: make(util.HostPortInfo),
imageSizes: n.imageSizes,
imageStates: n.imageStates,
generation: n.generation,
}
if len(n.pods) > 0 {
clone.pods = append([]*v1.Pod(nil), n.pods...)
}
if len(n.usedPorts) > 0 {
for k, v := range n.usedPorts {
clone.usedPorts[k] = v
// util.HostPortInfo is a map-in-map struct
// make sure it's deep copied
for ip, portMap := range n.usedPorts {
clone.usedPorts[ip] = make(map[util.ProtocolPort]struct{})
for protocolPort, v := range portMap {
clone.usedPorts[ip][protocolPort] = v
}
}
}
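The deep copy above matters because assigning a Go map copies only the map header. A minimal sketch of the aliasing the old code allowed, using plain maps in place of util.HostPortInfo's IP-to-port structure:

orig := map[string]map[string]struct{}{
	"10.0.0.1": {"TCP/80": {}},
}
shallow := map[string]map[string]struct{}{}
for ip, ports := range orig {
	shallow[ip] = ports // inner map still shared with orig
}
shallow["10.0.0.1"]["TCP/443"] = struct{}{} // silently mutates orig as well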
if len(n.podsWithAffinity) > 0 {
@ -547,17 +553,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
}
}
func (n *NodeInfo) updateImageSizes() {
node := n.Node()
imageSizes := make(map[string]int64)
for _, image := range node.Status.Images {
for _, name := range image.Names {
imageSizes[name] = image.SizeBytes
}
}
n.imageSizes = imageSizes
}
// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
n.node = node
@ -579,7 +574,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
}
}
n.TransientInfo = newTransientSchedulerInfo()
n.updateImageSizes()
n.generation = nextGeneration()
return nil
}
@ -596,6 +590,7 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
n.memoryPressureCondition = v1.ConditionUnknown
n.diskPressureCondition = v1.ConditionUnknown
n.pidPressureCondition = v1.ConditionUnknown
n.imageStates = make(map[string]*ImageStateSummary)
n.generation = nextGeneration()
return nil
}


@ -0,0 +1,187 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilnode "k8s.io/kubernetes/pkg/util/node"
"github.com/golang/glog"
)
// NodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
type NodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
exhaustedZones sets.String // set of zones whose nodes have all been returned by next()
NumNodes int
mu sync.RWMutex
}
// nodeArray is a struct that has nodes that are in a zone.
// We use a slice (as opposed to a set/map) to store the nodes because iterating over the nodes is
// a lot more frequent than searching them by name.
type nodeArray struct {
nodes []string
lastIndex int
}
func (na *nodeArray) next() (nodeName string, exhausted bool) {
if len(na.nodes) == 0 {
glog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
return "", false
}
if na.lastIndex >= len(na.nodes) {
return "", true
}
nodeName = na.nodes[na.lastIndex]
na.lastIndex++
return nodeName, false
}
func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{
tree: make(map[string]*nodeArray),
exhaustedZones: sets.NewString(),
}
for _, n := range nodes {
nt.AddNode(n)
}
return nt
}
// AddNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *NodeTree) AddNode(n *v1.Node) {
nt.mu.Lock()
defer nt.mu.Unlock()
nt.addNode(n)
}
func (nt *NodeTree) addNode(n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
if nodeName == n.Name {
glog.Warningf("node %v already exist in the NodeTree", n.Name)
return
}
}
na.nodes = append(na.nodes, n.Name)
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
}
glog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
nt.NumNodes++
}
// RemoveNode removes a node from the NodeTree.
func (nt *NodeTree) RemoveNode(n *v1.Node) error {
nt.mu.Lock()
defer nt.mu.Unlock()
return nt.removeNode(n)
}
func (nt *NodeTree) removeNode(n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
if nodeName == n.Name {
na.nodes = append(na.nodes[:i], na.nodes[i+1:]...)
if len(na.nodes) == 0 {
nt.removeZone(zone)
}
glog.V(5).Infof("Removed node %v in group %v from NodeTree", n.Name, zone)
nt.NumNodes--
return nil
}
}
}
glog.Errorf("Node %v in group %v was not found", n.Name, zone)
return fmt.Errorf("node %v in group %v was not found", n.Name, zone)
}
// removeZone removes a zone from tree.
// This function must be called while writer locks are held.
func (nt *NodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
nt.zones = append(nt.zones[:i], nt.zones[i+1:]...)
}
}
}
// UpdateNode updates a node in the NodeTree.
func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
}
newZone := utilnode.GetZoneKey(new)
// If the zone ID of the node has not changed, we don't need to do anything. Name of the node
// cannot be changed in an update.
if oldZone == newZone {
return
}
nt.mu.Lock()
defer nt.mu.Unlock()
nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(new)
}
func (nt *NodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.exhaustedZones = sets.NewString()
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *NodeTree) Next() string {
nt.mu.Lock()
defer nt.mu.Unlock()
if len(nt.zones) == 0 {
return ""
}
for {
if nt.zoneIndex >= len(nt.zones) {
nt.zoneIndex = 0
}
zone := nt.zones[nt.zoneIndex]
nt.zoneIndex++
// We do not check the set of exhausted zones before calling next() on the zone. This ensures
// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
nodeName, exhausted := nt.tree[zone].next()
if exhausted {
nt.exhaustedZones.Insert(zone)
if len(nt.exhaustedZones) == len(nt.zones) { // all zones are exhausted. we should reset.
nt.resetExhausted()
}
} else {
return nodeName
}
}
}
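A hedged usage sketch of the round-robin iteration (newNodeTree is unexported, so this would live in the cache package's own tests; this sketch assumes the beta failure-domain zone label is what utilnode.GetZoneKey reads):

nt := newNodeTree(nil)
for _, spec := range []struct{ name, zone string }{
	{"node-a", "zone-1"}, {"node-b", "zone-1"},
	{"node-c", "zone-2"}, {"node-d", "zone-2"},
} {
	nt.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   spec.name,
		Labels: map[string]string{"failure-domain.beta.kubernetes.io/zone": spec.zone},
	}})
}
for i := 0; i < 8; i++ {
	fmt.Print(nt.Next(), " ") // node-a node-c node-b node-d node-a node-c node-b node-d
}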


@ -16,7 +16,10 @@ limitations under the License.
package cache
import "k8s.io/api/core/v1"
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)
// CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names
// and the values are the aggregated information for that node.
@ -29,11 +32,47 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeI
}
nodeNameToInfo[nodeName].AddPod(pod)
}
imageExistenceMap := createImageExistenceMap(nodes)
for _, node := range nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {
nodeNameToInfo[node.Name] = NewNodeInfo()
}
nodeNameToInfo[node.Name].SetNode(node)
nodeInfo := nodeNameToInfo[node.Name]
nodeInfo.SetNode(node)
nodeInfo.imageStates = getNodeImageStates(node, imageExistenceMap)
}
return nodeNameToInfo
}
// getNodeImageStates returns the given node's image states based on the given imageExistence map.
func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*ImageStateSummary {
imageStates := make(map[string]*ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
imageStates[name] = &ImageStateSummary{
Size: image.SizeBytes,
NumNodes: len(imageExistenceMap[name]),
}
}
}
return imageStates
}
// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names.
func createImageExistenceMap(nodes []*v1.Node) map[string]sets.String {
imageExistenceMap := make(map[string]sets.String)
for _, node := range nodes {
for _, image := range node.Status.Images {
for _, name := range image.Names {
if _, ok := imageExistenceMap[name]; !ok {
imageExistenceMap[name] = sets.NewString(node.Name)
} else {
imageExistenceMap[name].Insert(node.Name)
}
}
}
}
return imageExistenceMap
}
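A brief usage sketch of the two helpers above; nodeWithImage is a hypothetical test helper:

func nodeWithImage(name, image string, size int64) *v1.Node {
	return &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Status: v1.NodeStatus{Images: []v1.ContainerImage{
			{Names: []string{image}, SizeBytes: size},
		}},
	}
}

// Two nodes carrying the same image:
nodes := []*v1.Node{
	nodeWithImage("n1", "nginx:1.15", 100*1024*1024),
	nodeWithImage("n2", "nginx:1.15", 100*1024*1024),
}
existence := createImageExistenceMap(nodes)       // {"nginx:1.15": {"n1", "n2"}}
states := getNodeImageStates(nodes[0], existence)
// states["nginx:1.15"].Size == 100MiB, states["nginx:1.15"].NumNodes == 2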


@ -17,7 +17,6 @@ limitations under the License.
package scheduler
import (
"fmt"
"time"
"k8s.io/api/core/v1"
@ -34,6 +33,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -73,21 +73,29 @@ func (sched *Scheduler) StopEverything() {
close(sched.config.StopEverything)
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
func (sched *Scheduler) Cache() schedulercache.Cache {
return sched.config.SchedulerCache
}
// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error)
GetPriorityMetadataProducer() (algorithm.PriorityMetadataProducer, error)
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error)
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
GetHardPodAffinitySymmetricWeight() int32
GetSchedulerName() string
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister
Create() (*Config, error)
@ -104,7 +112,7 @@ type Config struct {
SchedulerCache schedulercache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *core.EquivalenceCache
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
@ -175,10 +183,6 @@ func (sched *Scheduler) Run() {
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
@ -191,7 +195,6 @@ func (sched *Scheduler) Config() *Config {
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
if err != nil {
glog.V(1).Infof("Failed to schedule pod: %v/%v", pod.Namespace, pod.Name)
pod = pod.DeepCopy()
sched.config.Error(pod, err)
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
@ -232,7 +235,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
nodeName = node.Name
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
glog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
return "", err
}
for _, victim := range victims {
@ -257,17 +260,12 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
return nodeName, err
}
// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
// assumeVolumes will update the volume cache with the chosen bindings
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil {
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
@ -277,76 +275,38 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
Reason: "SchedulerError",
Message: err.Error(),
})
return err
}
if !allBound {
err = fmt.Errorf("Volume binding started, waiting for completion")
if bindingRequired {
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
sched.config.VolumeBinder.BindQueue.Add(assumed)
} else {
// We are just waiting for PV controller to finish binding, put it back in the
// scheduler queue
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "VolumeBindingWaiting",
})
}
return err
// Invalidate ecache because assumed volumes could have affected the cached
// pvs for other pods
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidatePredicates(invalidPredicates)
}
}
return nil
return
}
// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
// make the API update for volume binding.
// This function runs forever until the volume BindQueue is closed.
func (sched *Scheduler) bindVolumesWorker() {
workFunc := func() bool {
keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
if quit {
return true
}
defer sched.config.VolumeBinder.BindQueue.Done(keyObj)
// bindVolumes will make the API update with the assumed bindings and wait until
// the PV controller has completely finished the binding operation.
//
// If binding errors, times out or gets undone, then an error will be returned to
// retry scheduling.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
var reason string
var eventType string
assumed, ok := keyObj.(*v1.Pod)
if !ok {
glog.V(4).Infof("Object is not a *v1.Pod")
return false
glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
// Unassume the Pod and retry scheduling
if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
glog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// TODO: add metrics
var reason string
var eventType string
glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
// The Pod is always sent back to the scheduler afterwards.
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
} else {
glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
reason = "VolumeBindingWaiting"
eventType = v1.EventTypeNormal
err = fmt.Errorf("Volume binding started, waiting for completion")
}
// Always fail scheduling regardless of binding success.
// The Pod needs to be sent back through the scheduler to:
// * Retry volume binding if it fails.
// * Retry volume binding if dynamic provisioning fails.
// * Bind the Pod to the Node once all volumes are bound.
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
@ -354,15 +314,11 @@ func (sched *Scheduler) bindVolumesWorker() {
Status: v1.ConditionFalse,
Reason: reason,
})
return false
return err
}
for {
if quit := workFunc(); quit {
glog.V(4).Infof("bindVolumesWorker shutting down")
break
}
}
glog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
return nil
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
@ -470,16 +426,12 @@ func (sched *Scheduler) scheduleOne() {
// Assume volumes first before assuming the pod.
//
// If no volumes need binding, then nil is returned, and continue to assume the pod.
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
//
// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
// subsequent passes until all volumes are fully bound.
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
return
}
@ -491,6 +443,14 @@ func (sched *Scheduler) scheduleOne() {
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
// Bind volumes first before Pod
if !allBound {
err = sched.bindVolumes(assumedPod)
if err != nil {
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{

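Condensing the new control flow (a sketch with error handling elided, not the verbatim upstream code): volumes are assumed synchronously against the cache, the pod is assumed, and the blocking API work happens in a single goroutine, volumes first, then the pod binding:

func scheduleOneSketch(sched *Scheduler, assumedPod *v1.Pod, suggestedHost string) {
	allBound, err := sched.assumeVolumes(assumedPod, suggestedHost) // cache-only
	if err != nil {
		return
	}
	// ... assume the pod in the scheduler cache ...
	go func() {
		if !allBound {
			if err := sched.bindVolumes(assumedPod); err != nil {
				return // bindVolumes already forgot the pod and re-queued it
			}
		}
		// ... sched.bind(assumedPod, binding) as in the hunk above ...
	}()
}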

@ -34,16 +34,6 @@ type FakeConfigurator struct {
Config *Config
}
// GetPriorityFunctionConfigs is not implemented yet.
func (fc *FakeConfigurator) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
return nil, fmt.Errorf("not implemented")
}
// GetPriorityMetadataProducer is not implemented yet.
func (fc *FakeConfigurator) GetPriorityMetadataProducer() (algorithm.PriorityMetadataProducer, error) {
return nil, fmt.Errorf("not implemented")
}
// GetPredicateMetadataProducer is not implemented yet.
func (fc *FakeConfigurator) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) {
return nil, fmt.Errorf("not implemented")
@ -59,11 +49,6 @@ func (fc *FakeConfigurator) GetHardPodAffinitySymmetricWeight() int32 {
panic("not implemented")
}
// GetSchedulerName is not implemented yet.
func (fc *FakeConfigurator) GetSchedulerName() string {
panic("not implemented")
}
// MakeDefaultErrorFunc is not implemented yet.
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
return nil


@ -15,8 +15,9 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/scheduling:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
],
)
@ -30,10 +31,10 @@ go_library(
deps = [
"//pkg/apis/scheduling:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)