Update go dependencies and cleanup deprecated packages

This commit is contained in:
Manuel de Brito Fontes 2018-01-07 12:10:25 -03:00
parent 03a1e20fde
commit 44fd79d061
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
1099 changed files with 75691 additions and 31913 deletions

View file

@ -18,7 +18,6 @@ go_library(
"kubelet_pods.go",
"kubelet_resources.go",
"kubelet_volumes.go",
"networks.go",
"oom_watcher.go",
"pod_container_deletor.go",
"pod_workers.go",
@ -28,15 +27,16 @@ go_library(
"util.go",
"volume_host.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet",
deps = [
"//cmd/kubelet/app/options:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/api/v1/resource:go_default_library",
"//pkg/api/v1/validation:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/pods:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/apis/core/v1/validation:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/features:go_default_library",
@ -53,7 +53,6 @@ go_library(
"//pkg/kubelet/configmap:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/dockershim/libdocker:go_default_library",
"//pkg/kubelet/dockershim/remote:go_default_library",
"//pkg/kubelet/envvars:go_default_library",
"//pkg/kubelet/events:go_default_library",
@ -65,7 +64,9 @@ go_library(
"//pkg/kubelet/kuberuntime:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/mountpod:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/preemption:go_default_library",
@ -116,7 +117,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
@ -130,12 +130,12 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/util/certificate:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
@ -153,7 +153,6 @@ go_test(
"kubelet_resources_test.go",
"kubelet_test.go",
"kubelet_volumes_test.go",
"networks_test.go",
"oom_watcher_test.go",
"pod_container_deletor_test.go",
"pod_workers_test.go",
@ -165,10 +164,11 @@ go_test(
],
"//conditions:default": [],
}),
importpath = "k8s.io/kubernetes/pkg/kubelet",
library = ":go_default_library",
deps = [
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/kubelet/apis:go_default_library",
@ -208,6 +208,7 @@ go_test(
"//pkg/volume/host_path:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v2:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
@ -253,13 +254,13 @@ filegroup(
"//pkg/kubelet/apis:all-srcs",
"//pkg/kubelet/cadvisor:all-srcs",
"//pkg/kubelet/certificate:all-srcs",
"//pkg/kubelet/checkpoint:all-srcs",
"//pkg/kubelet/client:all-srcs",
"//pkg/kubelet/cm:all-srcs",
"//pkg/kubelet/config:all-srcs",
"//pkg/kubelet/configmap:all-srcs",
"//pkg/kubelet/container:all-srcs",
"//pkg/kubelet/custommetrics:all-srcs",
"//pkg/kubelet/deviceplugin:all-srcs",
"//pkg/kubelet/dockershim:all-srcs",
"//pkg/kubelet/envvars:all-srcs",
"//pkg/kubelet/events:all-srcs",
@ -271,6 +272,7 @@ filegroup(
"//pkg/kubelet/leaky:all-srcs",
"//pkg/kubelet/lifecycle:all-srcs",
"//pkg/kubelet/metrics:all-srcs",
"//pkg/kubelet/mountpod:all-srcs",
"//pkg/kubelet/network:all-srcs",
"//pkg/kubelet/pleg:all-srcs",
"//pkg/kubelet/pod:all-srcs",
@ -288,6 +290,7 @@ filegroup(
"//pkg/kubelet/types:all-srcs",
"//pkg/kubelet/util:all-srcs",
"//pkg/kubelet/volumemanager:all-srcs",
"//pkg/kubelet/winstats:all-srcs",
],
tags = ["automanaged"],
)

View file

@ -11,6 +11,7 @@ go_library(
"well_known_annotations.go",
"well_known_labels.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis",
)
filegroup(
@ -25,7 +26,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/kubelet/apis/cri:all-srcs",
"//pkg/kubelet/apis/deviceplugin/v1alpha1:all-srcs",
"//pkg/kubelet/apis/deviceplugin/v1alpha:all-srcs",
"//pkg/kubelet/apis/kubeletconfig:all-srcs",
"//pkg/kubelet/apis/stats/v1alpha1:all-srcs",
],

View file

@ -8,6 +8,7 @@ load(
go_library(
name = "go_default_library",
srcs = ["services.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/cri",
deps = ["//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library"],
)

View file

@ -11,6 +11,7 @@ go_library(
"api.pb.go",
"constants.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime",
deps = [
"//vendor/github.com/gogo/protobuf/gogoproto:go_default_library",
"//vendor/github.com/gogo/protobuf/proto:go_default_library",

File diff suppressed because it is too large Load diff

View file

@ -331,6 +331,8 @@ message RemovePodSandboxResponse {}
message PodSandboxStatusRequest {
// ID of the PodSandbox for which to retrieve status.
string pod_sandbox_id = 1;
// Verbose indicates whether to return extra information about the pod sandbox.
bool verbose = 2;
}
// PodSandboxNetworkStatus is the status of the network for a PodSandbox.
@ -382,6 +384,11 @@ message PodSandboxStatus {
message PodSandboxStatusResponse {
// Status of the PodSandbox.
PodSandboxStatus status = 1;
// Info is extra information of the PodSandbox. The key could be abitrary string, and
// value should be in json format. The information could include anything useful for
// debug, e.g. network namespace for linux container based container runtime.
// It should only be returned non-empty when Verbose is true.
map<string, string> info = 2;
}
// PodSandboxStateValue is the wrapper of PodSandboxState.
@ -523,6 +530,7 @@ message LinuxContainerSecurityContext {
repeated int64 supplemental_groups = 8;
// AppArmor profile for the container, candidate values are:
// * runtime/default: equivalent to not specifying a profile.
// * unconfined: no profiles are loaded
// * localhost/<profile_name>: profile loaded on the node
// (localhost) by name. The possible profile names are detailed at
// http://wiki.apparmor.net/index.php/AppArmor_Core_Policy_Reference
@ -746,6 +754,8 @@ message ListContainersResponse {
message ContainerStatusRequest {
// ID of the container for which to retrieve status.
string container_id = 1;
// Verbose indicates whether to return extra information about the container.
bool verbose = 2;
}
// ContainerStatus represents the status of a container.
@ -790,6 +800,11 @@ message ContainerStatus {
message ContainerStatusResponse {
// Status of the container.
ContainerStatus status = 1;
// Info is extra information of the Container. The key could be abitrary string, and
// value should be in json format. The information could include anything useful for
// debug, e.g. pid for linux container based container runtime.
// It should only be returned non-empty when Verbose is true.
map<string, string> info = 2;
}
message UpdateContainerResourcesRequest {
@ -827,7 +842,17 @@ message ExecRequest {
// Whether to exec the command in a TTY.
bool tty = 3;
// Whether to stream stdin.
// One of `stdin`, `stdout`, and `stderr` MUST be true.
bool stdin = 4;
// Whether to stream stdout.
// One of `stdin`, `stdout`, and `stderr` MUST be true.
bool stdout = 5;
// Whether to stream stderr.
// One of `stdin`, `stdout`, and `stderr` MUST be true.
// If `tty` is true, `stderr` MUST be false. Multiplexing is not supported
// in this case. The output of stdout and stderr will be combined to a
// single stream.
bool stderr = 6;
}
message ExecResponse {
@ -839,10 +864,20 @@ message AttachRequest {
// ID of the container to which to attach.
string container_id = 1;
// Whether to stream stdin.
// One of `stdin`, `stdout`, and `stderr` MUST be true.
bool stdin = 2;
// Whether the process being attached is running in a TTY.
// This must match the TTY setting in the ContainerConfig.
bool tty = 3;
// Whether to stream stdout.
// One of `stdin`, `stdout`, and `stderr` MUST be true.
bool stdout = 4;
// Whether to stream stderr.
// One of `stdin`, `stdout`, and `stderr` MUST be true.
// If `tty` is true, `stderr` MUST be false. Multiplexing is not supported
// in this case. The output of stdout and stderr will be combined to a
// single stream.
bool stderr = 5;
}
message AttachResponse {
@ -899,11 +934,18 @@ message ListImagesResponse {
message ImageStatusRequest {
// Spec of the image.
ImageSpec image = 1;
// Verbose indicates whether to return extra information about the image.
bool verbose = 2;
}
message ImageStatusResponse {
// Status of the image.
Image image = 1;
// Info is extra information of the Image. The key could be abitrary string, and
// value should be in json format. The information could include anything useful
// for debug, e.g. image config for oci image based container runtime.
// It should only be returned non-empty when Verbose is true.
map<string, string> info = 2;
}
// AuthConfig contains authorization information for connecting to a registry.
@ -986,11 +1028,19 @@ message RuntimeStatus {
repeated RuntimeCondition conditions = 1;
}
message StatusRequest {}
message StatusRequest {
// Verbose indicates whether to return extra information about the runtime.
bool verbose = 1;
}
message StatusResponse {
// Status of the Runtime.
RuntimeStatus status = 1;
// Info is extra information of the Runtime. The key could be abitrary string, and
// value should be in json format. The information could include anything useful for
// debug, e.g. plugins used by the container runtime.
// It should only be returned non-empty when Verbose is true.
map<string, string> info = 2;
}
message ImageFsInfoRequest {}

View file

@ -25,3 +25,31 @@ const (
// NetworkReady means the runtime network is up and ready to accept containers which require network.
NetworkReady = "NetworkReady"
)
// LogStreamType is the type of the stream in CRI container log.
type LogStreamType string
const (
// Stdout is the stream type for stdout.
Stdout LogStreamType = "stdout"
// Stderr is the stream type for stderr.
Stderr LogStreamType = "stderr"
)
// LogTag is the tag of a log line in CRI container log.
// Currently defined log tags:
// * First tag: Partial/End - P/E.
// The field in the container log format can be extended to include multiple
// tags by using a delimiter, but changes should be rare. If it becomes clear
// that better extensibility is desired, a more extensible format (e.g., json)
// should be adopted as a replacement and/or addition.
type LogTag string
const (
// LogTagPartial means the line is part of multiple lines.
LogTagPartial LogTag = "P"
// LogTagFull means the line is a single full line or the end of multiple lines.
LogTagFull LogTag = "F"
// LogTagDelimiter is the delimiter for different log tags.
LogTagDelimiter = ":"
)

View file

@ -21,9 +21,10 @@ go_library(
],
"//conditions:default": [],
}),
importpath = "k8s.io/kubernetes/pkg/kubelet/container",
visibility = ["//visibility:public"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
@ -57,10 +58,11 @@ go_test(
"ref_test.go",
"sync_result_test.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/container",
library = ":go_default_library",
deps = [
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -71,6 +73,7 @@ go_test(
go_test(
name = "go_default_xtest",
srcs = ["runtime_cache_test.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/container_test",
deps = [
":go_default_library",
"//pkg/kubelet/container/testing:go_default_library",

View file

@ -46,9 +46,9 @@ type HandlerRunner interface {
// RuntimeHelper wraps kubelet to make container runtime
// able to get necessary informations like the RunContainerOptions, DNS settings, Host IP.
type RuntimeHelper interface {
GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (contOpts *RunContainerOptions, useClusterFirstPolicy bool, err error)
GetClusterDNS(pod *v1.Pod) (dnsServers []string, dnsSearches []string, useClusterFirstPolicy bool, err error)
// GetPodCgroupParent returns the the CgroupName identifer, and its literal cgroupfs form on the host
GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (contOpts *RunContainerOptions, err error)
GetPodDNS(pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error)
// GetPodCgroupParent returns the CgroupName identifer, and its literal cgroupfs form on the host
// of a pod.
GetPodCgroupParent(pod *v1.Pod) string
GetPodDir(podUID types.UID) string

View file

@ -21,7 +21,7 @@ import (
"k8s.io/api/core/v1"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
var ImplicitContainerPrefix string = "implicitly required container "
@ -39,7 +39,7 @@ func GenerateContainerRef(pod *v1.Pod, container *v1.Container) (*v1.ObjectRefer
// start (like the pod infra container). This is not a good way, ugh.
fieldPath = ImplicitContainerPrefix + container.Name
}
ref, err := ref.GetPartialReference(api.Scheme, pod, fieldPath)
ref, err := ref.GetPartialReference(legacyscheme.Scheme, pod, fieldPath)
if err != nil {
return nil, err
}

View file

@ -21,8 +21,8 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/core/install"
)
func TestFieldPath(t *testing.T) {
@ -68,14 +68,14 @@ func TestGenerateContainerRef(t *testing.T) {
okPod = v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "ok",
Namespace: "test-ns",
UID: "bar",
ResourceVersion: "42",
SelfLink: "/api/" + api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String() + "/pods/foo",
SelfLink: "/api/" + legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String() + "/pods/foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@ -92,7 +92,7 @@ func TestGenerateContainerRef(t *testing.T) {
noSelfLinkPod.Kind = ""
noSelfLinkPod.APIVersion = ""
noSelfLinkPod.ObjectMeta.SelfLink = ""
defaultedSelfLinkPod.ObjectMeta.SelfLink = "/api/" + api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String() + "/pods/ok"
defaultedSelfLinkPod.ObjectMeta.SelfLink = "/api/" + legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String() + "/pods/ok"
cases := []struct {
name string
@ -109,7 +109,7 @@ func TestGenerateContainerRef(t *testing.T) {
},
expected: &v1.ObjectReference{
Kind: "Pod",
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
Name: "ok",
Namespace: "test-ns",
UID: "bar",
@ -124,7 +124,7 @@ func TestGenerateContainerRef(t *testing.T) {
container: &v1.Container{},
expected: &v1.ObjectReference{
Kind: "Pod",
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
Name: "ok",
Namespace: "test-ns",
UID: "bar",
@ -148,7 +148,7 @@ func TestGenerateContainerRef(t *testing.T) {
},
expected: &v1.ObjectReference{
Kind: "Pod",
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
Name: "ok",
Namespace: "test-ns",
UID: "bar",
@ -165,7 +165,7 @@ func TestGenerateContainerRef(t *testing.T) {
},
expected: &v1.ObjectReference{
Kind: "Pod",
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
Name: "ok",
Namespace: "test-ns",
UID: "bar",

View file

@ -127,9 +127,8 @@ type Runtime interface {
// DirectStreamingRuntime is the interface implemented by runtimes for which the streaming calls
// (exec/attach/port-forward) should be served directly by the Kubelet.
type DirectStreamingRuntime interface {
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
// Runs the command in the container of the specified pod. Attaches
// the processes stdin, stdout, and stderr. Optionally uses a tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error
@ -429,10 +428,6 @@ type RunContainerOptions struct {
// this directory will be used to create and mount the log file to
// container.TerminationMessagePath
PodContainerDir string
// The list of DNS servers for the container to use.
DNS []string
// The list of DNS search domains.
DNSSearch []string
// The parent cgroup to pass to Docker
CgroupParent string
// The type of container rootfs
@ -451,9 +446,14 @@ type RunContainerOptions struct {
type VolumeInfo struct {
// Mounter is the volume's mounter
Mounter volume.Mounter
// BlockVolumeMapper is the Block volume's mapper
BlockVolumeMapper volume.BlockVolumeMapper
// SELinuxLabeled indicates whether this volume has had the
// pod's SELinux label applied to it or not
SELinuxLabeled bool
// Whether the volume permission is set to read-only or not
// This value is passed from volume.spec
ReadOnly bool
}
type VolumeMap map[string]VolumeInfo

View file

@ -19,14 +19,11 @@ package kubelet
import (
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
goruntime "runtime"
"sort"
"strings"
"sync"
@ -35,8 +32,6 @@ import (
"github.com/golang/glog"
clientgoclientset "k8s.io/client-go/kubernetes"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/api/core/v1"
@ -54,23 +49,22 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/certificate"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/integer"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletconfigv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/certificate"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
@ -82,6 +76,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/preemption"
@ -196,13 +191,35 @@ type Bootstrap interface {
// Builder creates and initializes a Kubelet instance
type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *options.ContainerRuntimeOptions,
hostnameOverride,
nodeIP,
providerID,
cloudProvider,
certDirectory,
rootDirectory string) (Bootstrap, error)
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
runtimeCgroups string,
hostnameOverride string,
nodeIP string,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
registerNode bool,
registerWithTaints []api.Taint,
allowedUnsafeSysctls []string,
containerized bool,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
experimentalMounterPath string,
experimentalKernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string) (Bootstrap, error)
// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
@ -235,11 +252,11 @@ type Dependencies struct {
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
DockerClient libdocker.Interface
DockerClientConfig *dockershim.ClientConfig
EventClient v1core.EventsGetter
HeartbeatClient v1core.CoreV1Interface
KubeClient clientset.Interface
ExternalKubeClient clientgoclientset.Interface
ExternalKubeClient clientset.Interface
Mounter mount.Interface
NetworkPlugins []network.NetworkPlugin
OOMAdjuster *oom.OOMAdjuster
@ -255,14 +272,14 @@ type Dependencies struct {
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if kubeCfg.ManifestURLHeader != "" {
pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
if len(kubeCfg.ManifestURLHeader) > 0 {
for k, v := range kubeCfg.ManifestURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
// source of all configuration
@ -270,7 +287,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// define file config source
if kubeCfg.PodManifestPath != "" {
glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
glog.Infof("Adding manifest path: %v", kubeCfg.PodManifestPath)
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
@ -279,19 +296,32 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
// Restore from the checkpoint path
// NOTE: This MUST happen before creating the apiserver source
// below, or the checkpoint would override the source of truth.
updatechannel := cfg.Channel(kubetypes.ApiserverSource)
if bootstrapCheckpointPath != "" {
glog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
if err != nil {
return nil, err
}
}
if kubeDeps.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
return cfg, nil
}
func getRuntimeAndImageServices(config *kubeletconfiginternal.KubeletConfiguration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
rs, err := remote.NewRemoteRuntimeService(config.RemoteRuntimeEndpoint, config.RuntimeRequestTimeout.Duration)
func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
if err != nil {
return nil, nil, err
}
is, err := remote.NewRemoteImageService(config.RemoteImageEndpoint, config.RuntimeRequestTimeout.Duration)
is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
if err != nil {
return nil, nil, err
}
@ -302,13 +332,35 @@ func getRuntimeAndImageServices(config *kubeletconfiginternal.KubeletConfigurati
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *options.ContainerRuntimeOptions,
hostnameOverride,
nodeIP,
providerID,
cloudProvider,
certDirectory,
rootDirectory string) (*Kubelet, error) {
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
runtimeCgroups string,
hostnameOverride string,
nodeIP string,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
registerNode bool,
registerWithTaints []api.Taint,
allowedUnsafeSysctls []string,
containerized bool,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
experimentalMounterPath string,
experimentalKernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -369,16 +421,16 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
containerGCPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kubeCfg.MinimumGCAge.Duration,
MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
MaxContainers: int(kubeCfg.MaxContainerCount),
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
daemonEndpoints := &v1.NodeDaemonEndpoints{
@ -392,7 +444,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if kubeCfg.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
if experimentalNodeAllocatableIgnoreEvictionThreshold {
// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
enforceNodeAllocatable = []string{}
}
@ -404,12 +456,12 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: kubeCfg.ExperimentalKernelMemcgNotification,
KernelMemcgNotification: experimentalKernelMemcgNotification,
}
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
go r.Run(wait.NeverStop)
}
@ -418,7 +470,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
@ -448,6 +500,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
}
httpClient := &http.Client{}
parsedNodeIP := net.ParseIP(nodeIP)
klet := &Kubelet{
hostname: hostname,
@ -457,13 +510,13 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
rootDirectory: rootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: kubeCfg.RegisterNode,
registerSchedulable: kubeCfg.RegisterSchedulable,
clusterDomain: kubeCfg.ClusterDomain,
clusterDNS: clusterDNS,
registerNode: registerNode,
registerWithTaints: registerWithTaints,
registerSchedulable: registerSchedulable,
dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
serviceLister: serviceLister,
nodeInfo: nodeInfo,
masterServiceNamespace: kubeCfg.MasterServiceNamespace,
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
@ -472,28 +525,29 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
providerID: providerID,
nodeRef: nodeRef,
nodeLabels: kubeCfg.NodeLabels,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
resolverConfig: kubeCfg.ResolverConfig,
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
nodeIP: net.ParseIP(nodeIP),
clock: clock.RealClock{},
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
containerRuntimeName: containerRuntime,
nodeIP: parsedNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
}
secretManager := secret.NewCachingSecretManager(
@ -508,7 +562,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
glog.Infof("Experimental host user namespace defaulting is enabled.")
}
hairpinMode, err := effectiveHairpinMode(kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, crOptions.NetworkPluginName)
hairpinMode, err := effectiveHairpinMode(kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode), containerRuntime, crOptions.NetworkPluginName)
if err != nil {
// This is a non-recoverable error. Returning it up the callstack will just
// lead to retries of the same failure, so just fail hard.
@ -516,17 +570,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
glog.Infof("Hairpin mode set to %q", hairpinMode)
// TODO(#36485) Remove this workaround once we fix the init-container issue.
// Touch iptables lock file, which will be shared among all processes accessing
// the iptables.
f, err := os.OpenFile(utilipt.LockfilePath16x, os.O_CREATE, 0600)
if err != nil {
glog.Warningf("Failed to open iptables lock file: %v", err)
} else if err = f.Close(); err != nil {
glog.Warningf("Failed to close iptables lock file: %v", err)
}
plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, crOptions.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, hairpinMode, kubeCfg.NonMasqueradeCIDR, int(crOptions.NetworkPluginMTU))
plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, crOptions.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, hairpinMode, nonMasqueradeCIDR, int(crOptions.NetworkPluginMTU))
if err != nil {
return nil, err
}
@ -545,24 +589,20 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)
if kubeCfg.RemoteRuntimeEndpoint != "" {
// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
if kubeCfg.RemoteImageEndpoint == "" {
kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
}
}
// TODO: These need to become arguments to a standalone docker shim.
binDir := crOptions.CNIBinDir
if binDir == "" {
binDir = crOptions.NetworkPluginDir
}
pluginSettings := dockershim.NetworkPluginSettings{
HairpinMode: hairpinMode,
NonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
NonMasqueradeCIDR: nonMasqueradeCIDR,
PluginName: crOptions.NetworkPluginName,
PluginConfDir: crOptions.CNIConfDir,
PluginBinDir: binDir,
PluginBinDir: crOptions.CNIBinDir,
MTU: int(crOptions.NetworkPluginMTU),
}
@ -575,20 +615,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
pluginSettings.LegacyRuntimeHost = nl
// rktnetes cannot be run with CRI.
if kubeCfg.ContainerRuntime != kubetypes.RktContainerRuntime {
if containerRuntime != kubetypes.RktContainerRuntime {
// kubelet defers to the runtime shim to setup networking. Setting
// this to nil will prevent it from trying to invoke the plugin.
// It's easier to always probe and initialize plugins till cri
// becomes the default.
klet.networkPlugin = nil
// if left at nil, that means it is unneeded
var legacyLogProvider kuberuntime.LegacyLogProvider
switch kubeCfg.ContainerRuntime {
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClient, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, kubeCfg.RuntimeCgroups, kubeCfg.CgroupDriver, crOptions.DockerExecHandlerName,
crOptions.DockershimRootDirectory, crOptions.DockerDisableSharedPID)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory,
crOptions.DockerDisableSharedPID)
if err != nil {
return nil, err
}
@ -601,29 +643,30 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
kubeCfg.RemoteRuntimeEndpoint,
kubeCfg.RemoteImageEndpoint)
remoteRuntimeEndpoint,
remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(kubeCfg.RemoteRuntimeEndpoint, ds)
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}
// Create dockerLegacyService when the logging driver is not supported.
supported, err := dockershim.IsCRISupportedLogDriver(kubeDeps.DockerClient)
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
if !supported {
klet.dockerLegacyService = dockershim.NewDockerLegacyService(kubeDeps.DockerClient)
klet.dockerLegacyService = ds.NewDockerLegacyService()
legacyLogProvider = dockershim.NewLegacyLogProvider(klet.dockerLegacyService)
}
case kubetypes.RemoteContainerRuntime:
// No-op.
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", kubeCfg.ContainerRuntime)
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
runtimeService, imageService, err := getRuntimeAndImageServices(kubeCfg)
runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
if err != nil {
return nil, err
}
@ -631,7 +674,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
kubeCfg.SeccompProfileRoot,
seccompProfileRoot,
containerRefManager,
machineInfo,
klet,
@ -646,6 +689,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider,
)
if err != nil {
return nil, err
@ -653,14 +697,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.containerRuntime = runtime
klet.runner = runtime
// CRI integrations should get container metrics via CRI. Docker
// uses the built-in cadvisor to gather such metrics on Linux for
// historical reasons.
// cri-o relies on cadvisor as a temporary workaround. The code should
// be removed. Related issue:
// https://github.com/kubernetes/kubernetes/issues/51798
if (kubeCfg.ContainerRuntime == kubetypes.DockerContainerRuntime &&
goruntime.GOOS == "linux") || kubeCfg.RemoteRuntimeEndpoint == "/var/run/crio.sock" {
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
@ -753,7 +790,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
ips = append(ips, cloudIPs...)
names := append([]string{klet.GetHostname(), hostnameOverride}, cloudNames...)
klet.serverCertificateManager, err = certificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, ips, names, certDirectory)
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, ips, names, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
}
@ -781,11 +818,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// If the experimentalMounterPathFlag is set, we do not want to
// check node capabilities since the mount path is not the default
if len(kubeCfg.ExperimentalMounterPath) != 0 {
kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount = false
if len(experimentalMounterPath) != 0 {
experimentalCheckNodeCapabilitiesBeforeMount = false
// Replace the nameserver in containerized-mounter's rootfs/etc/resolve.conf with kubelet.ClusterDNS
// so that service name could be resolved
klet.setupDNSinContainerizedMounter(kubeCfg.ExperimentalMounterPath)
klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
}
// setup volumeManager
@ -800,8 +837,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps.Mounter,
klet.getPodsDir(),
kubeDeps.Recorder,
kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeCfg.KeepTerminatedPodVolumes)
experimentalCheckNodeCapabilitiesBeforeMount,
keepTerminatedPodVolumes)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
@ -833,7 +870,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, v1.UnsafeSysctlsPodAnnotationKey)
if err != nil {
return nil, err
@ -851,18 +888,18 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.AddPodSyncHandler(activeDeadlineHandler)
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's
for _, opt := range kubeDeps.Options {
opt(klet)
}
klet.appArmorValidator = apparmor.NewValidator(kubeCfg.ContainerRuntime)
klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
if utilfeature.DefaultFeatureGate.Enabled(features.Accelerators) {
if kubeCfg.ContainerRuntime == kubetypes.DockerContainerRuntime {
if klet.gpuManager, err = nvidia.NewNvidiaGPUManager(klet, kubeDeps.DockerClient); err != nil {
if containerRuntime == kubetypes.DockerContainerRuntime {
if klet.gpuManager, err = nvidia.NewNvidiaGPUManager(klet, kubeDeps.DockerClientConfig); err != nil {
return nil, err
}
} else {
@ -922,16 +959,15 @@ type Kubelet struct {
// Set to true to have the node register itself with the apiserver.
registerNode bool
// List of taints to add to a node object when the kubelet registers itself.
registerWithTaints []api.Taint
// Set to true to have the node register itself as schedulable.
registerSchedulable bool
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
// If non-empty, use this for container DNS search.
clusterDomain string
// If non-nil, use this for container DNS server.
clusterDNS []net.IP
// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
dnsConfigurer *dns.Configurer
// masterServiceNamespace is the namespace that the master service is exposed in.
masterServiceNamespace string
@ -1006,6 +1042,9 @@ type Kubelet struct {
// Reference to this node.
nodeRef *v1.ObjectReference
// The name of the container runtime
containerRuntimeName string
// Container runtime.
containerRuntime kubecontainer.Runtime
@ -1073,11 +1112,6 @@ type Kubelet struct {
// Channel for sending pods to kill.
podKillingCh chan *kubecontainer.PodPair
// The configuration file used as the base to generate the container's
// DNS resolver configuration file. This can be used in conjunction with
// clusterDomain and clusterDNS.
resolverConfig string
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *v1.NodeDaemonEndpoints
@ -1158,6 +1192,13 @@ type Kubelet struct {
// StatsProvider provides the node and the container stats.
*stats.StatsProvider
// containerized should be set to true if the kubelet is running in a container
containerized bool
// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
// This can be useful for debugging volume related issues.
keepTerminatedPodVolumes bool // DEPRECATED
}
func allLocalIPsWithoutLoopback() ([]net.IP, error) {
@ -1278,7 +1319,7 @@ func (kl *Kubelet) initializeModules() error {
return fmt.Errorf("Kubelet failed to get node info: %v", err)
}
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil {
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}
@ -1344,8 +1385,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// Start gorouting responsible for checking limits in resolv.conf
if kl.resolverConfig != "" {
go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
if kl.dnsConfigurer.ResolverConfig != "" {
go wait.Until(func() { kl.dnsConfigurer.CheckLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
}
// Start component sync loops.
@ -1364,64 +1405,6 @@ func (kl *Kubelet) GetKubeClient() clientset.Interface {
return kl.kubeClient
}
// GetClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
func (kl *Kubelet) GetClusterDNS(pod *v1.Pod) ([]string, []string, bool, error) {
var hostDNS, hostSearch []string
// Get host DNS settings
if kl.resolverConfig != "" {
f, err := os.Open(kl.resolverConfig)
if err != nil {
return nil, nil, false, err
}
defer f.Close()
hostDNS, hostSearch, err = kl.parseResolvConf(f)
if err != nil {
return nil, nil, false, err
}
}
useClusterFirstPolicy := ((pod.Spec.DNSPolicy == v1.DNSClusterFirst && !kubecontainer.IsHostNetworkPod(pod)) || pod.Spec.DNSPolicy == v1.DNSClusterFirstWithHostNet)
if useClusterFirstPolicy && len(kl.clusterDNS) == 0 {
// clusterDNS is not known.
// pod with ClusterDNSFirst Policy cannot be created
kl.recorder.Eventf(pod, v1.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, "MissingClusterDNS", log)
// fallback to DNSDefault
useClusterFirstPolicy = false
}
if !useClusterFirstPolicy {
// When the kubelet --resolv-conf flag is set to the empty string, use
// DNS settings that override the docker default (which is to use
// /etc/resolv.conf) and effectively disable DNS lookups. According to
// the bind documentation, the behavior of the DNS client library when
// "nameservers" are not specified is to "use the nameserver on the
// local machine". A nameserver setting of localhost is equivalent to
// this documented behavior.
if kl.resolverConfig == "" {
hostDNS = []string{"127.0.0.1"}
hostSearch = []string{"."}
} else {
hostSearch = kl.formDNSSearchForDNSDefault(hostSearch, pod)
}
return hostDNS, hostSearch, useClusterFirstPolicy, nil
}
// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
// the pod. The cluster DNS server itself will forward queries to other nameservers that is configured to use,
// in case the cluster DNS server cannot resolve the DNS query itself
dns := make([]string, len(kl.clusterDNS))
for i, ip := range kl.clusterDNS {
dns[i] = ip.String()
}
dnsSearch := kl.formDNSSearch(hostSearch, pod)
return dns, dnsSearch, useClusterFirstPolicy, nil
}
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
@ -1445,6 +1428,10 @@ func (kl *Kubelet) GetClusterDNS(pod *v1.Pod) ([]string, []string, bool, error)
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
@ -1462,6 +1449,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we kill the pod with the specified grace period since this is a termination
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
@ -1528,6 +1516,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
var syncErr error
if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
syncErr = fmt.Errorf("error killing pod: %v", err)
utilruntime.HandleError(syncErr)
} else {
@ -1542,6 +1531,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// If the network plugin is not ready, only start the pod if it uses the host network
if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "network is not ready: %v", rs)
return fmt.Errorf("network is not ready: %v", rs)
}
@ -1584,6 +1574,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
@ -1607,15 +1598,21 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
}
}
if mirrorPod == nil || deleted {
glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
node, err := kl.GetNode()
if err != nil || node.DeletionTimestamp != nil {
glog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
} else {
glog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
}
}
}
}
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
return err
}
@ -1637,6 +1634,8 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime so we get better errors
return err
}
@ -1853,17 +1852,28 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
// Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
if u.Op != kubetypes.RESTORE {
// If the update type is RESTORE, it means that the update is from
// the pod checkpoints and may be incomplete. Do not mark the
// source as ready.
// Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
}
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
@ -1888,7 +1898,7 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
break
}
glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
kl.HandlePodSyncs(podsToSync)
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
@ -2081,7 +2091,7 @@ func (kl *Kubelet) updateRuntimeUp() {
}
// rkt uses the legacy, non-CRI integration. Don't check the runtime
// conditions for it.
if kl.kubeletConfiguration.ContainerRuntime != kubetypes.RktContainerRuntime {
if kl.containerRuntimeName != kubetypes.RktContainerRuntime {
if s == nil {
glog.Errorf("Container runtime status is nil")
return
@ -2171,36 +2181,6 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str
}
}
// Replace the nameserver in containerized-mounter's rootfs/etc/resolve.conf with kubelet.ClusterDNS
func (kl *Kubelet) setupDNSinContainerizedMounter(mounterPath string) {
resolvePath := filepath.Join(strings.TrimSuffix(mounterPath, "/mounter"), "rootfs", "etc", "resolv.conf")
dnsString := ""
for _, dns := range kl.clusterDNS {
dnsString = dnsString + fmt.Sprintf("nameserver %s\n", dns)
}
if kl.resolverConfig != "" {
f, err := os.Open(kl.resolverConfig)
defer f.Close()
if err != nil {
glog.Error("Could not open resolverConf file")
} else {
_, hostSearch, err := kl.parseResolvConf(f)
if err != nil {
glog.Errorf("Error for parsing the reslov.conf file: %v", err)
} else {
dnsString = dnsString + "search"
for _, search := range hostSearch {
dnsString = dnsString + fmt.Sprintf(" %s", search)
}
dnsString = dnsString + "\n"
}
}
}
if err := ioutil.WriteFile(resolvePath, []byte(dnsString), 0600); err != nil {
glog.Errorf("Could not write dns nameserver in file %s, with error %v", resolvePath, err)
}
}
// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContatnerRemoved doesn't affect pod state

View file

@ -27,8 +27,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
utilfile "k8s.io/kubernetes/pkg/util/file"
utilnode "k8s.io/kubernetes/pkg/util/node"
@ -46,7 +46,7 @@ func (kl *Kubelet) getRootDir() string {
// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) getPodsDir() string {
return filepath.Join(kl.getRootDir(), options.DefaultKubeletPodsDirName)
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPodsDirName)
}
// getPluginsDir returns the full path to the directory under which plugin
@ -54,7 +54,7 @@ func (kl *Kubelet) getPodsDir() string {
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getPluginsDir() string {
return filepath.Join(kl.getRootDir(), options.DefaultKubeletPluginsDirName)
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsDirName)
}
// getPluginDir returns a data directory name for a given plugin name.
@ -64,6 +64,21 @@ func (kl *Kubelet) getPluginDir(pluginName string) string {
return filepath.Join(kl.getPluginsDir(), pluginName)
}
// getVolumeDevicePluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getVolumeDevicePluginsDir() string {
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsDirName)
}
// getVolumeDevicePluginDir returns a data directory name for a given plugin name.
// Plugins can use these directories to store data that they need to persist.
// For per-pod plugin data, see getVolumeDevicePluginsDir.
func (kl *Kubelet) getVolumeDevicePluginDir(pluginName string) string {
return filepath.Join(kl.getVolumeDevicePluginsDir(), pluginName, config.DefaultKubeletVolumeDevicesDirName)
}
// GetPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) GetPodDir(podUID types.UID) string {
@ -80,7 +95,7 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string {
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
return filepath.Join(kl.getPodDir(podUID), options.DefaultKubeletVolumesDirName)
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletVolumesDirName)
}
// getPodVolumeDir returns the full path to the directory which represents the
@ -90,11 +105,24 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa
return filepath.Join(kl.getPodVolumesDir(podUID), pluginName, volumeName)
}
// getPodVolumeDevicesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDevicesDir(podUID types.UID) string {
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletVolumeDevicesDirName)
}
// getPodVolumeDeviceDir returns the full path to the directory which represents the
// named plugin for specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return filepath.Join(kl.getPodVolumeDevicesDir(podUID), pluginName)
}
// getPodPluginsDir returns the full path to the per-pod data directory under
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
return filepath.Join(kl.getPodDir(podUID), options.DefaultKubeletPluginsDirName)
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletPluginsDirName)
}
// getPodPluginDir returns a data directory name for a given plugin name for a
@ -108,7 +136,7 @@ func (kl *Kubelet) getPodPluginDir(podUID types.UID, pluginName string) string {
// which container data is held for the specified pod. This directory may not
// exist if the pod or container does not exist.
func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
return filepath.Join(kl.getPodDir(podUID), options.DefaultKubeletContainersDirName, ctrName)
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletContainersDirName, ctrName)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
@ -219,7 +247,7 @@ func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, err
if pathExists, pathErr := volumeutil.PathExists(podVolDir); pathErr != nil {
return volumes, fmt.Errorf("Error checking if path %q exists: %v", podVolDir, pathErr)
} else if !pathExists {
glog.Warningf("Warning: path %q does not exist: %q", podVolDir)
glog.Warningf("Path %q does not exist", podVolDir)
return volumes, nil
}

View file

@ -18,14 +18,13 @@ package kubelet
import (
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"github.com/golang/glog"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@ -46,6 +45,77 @@ const (
KubeFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
)
// This just exports required functions from kubelet proper, for use by network
// plugins.
// TODO(#35457): get rid of this backchannel to the kubelet. The scope of
// the back channel is restricted to host-ports/testing, and restricted
// to kubenet. No other network plugin wrapper needs it. Other plugins
// only require a way to access namespace information, which they can do
// directly through the methods implemented by criNetworkHost.
type networkHost struct {
kubelet *Kubelet
}
func (nh *networkHost) GetPodByName(name, namespace string) (*v1.Pod, bool) {
return nh.kubelet.GetPodByName(name, namespace)
}
func (nh *networkHost) GetKubeClient() clientset.Interface {
return nh.kubelet.kubeClient
}
func (nh *networkHost) GetRuntime() kubecontainer.Runtime {
return nh.kubelet.GetRuntime()
}
func (nh *networkHost) SupportsLegacyFeatures() bool {
return true
}
// criNetworkHost implements the part of network.Host required by the
// cri (NamespaceGetter). It leechs off networkHost for all other
// methods, because networkHost is slated for deletion.
type criNetworkHost struct {
*networkHost
// criNetworkHost currently support legacy features. Hence no need to support PortMappingGetter
*network.NoopPortMappingGetter
}
// GetNetNS returns the network namespace of the given containerID.
// This method satisfies the network.NamespaceGetter interface for
// networkHost. It's only meant to be used from network plugins
// that are directly invoked by the kubelet (aka: legacy, pre-cri).
// Any network plugin invoked by a cri must implement NamespaceGetter
// to talk directly to the runtime instead.
func (c *criNetworkHost) GetNetNS(containerID string) (string, error) {
return c.kubelet.GetRuntime().GetNetNS(kubecontainer.ContainerID{Type: "", ID: containerID})
}
// NoOpLegacyHost implements the network.LegacyHost interface for the remote
// runtime shim by just returning empties. It doesn't support legacy features
// like host port and bandwidth shaping.
type NoOpLegacyHost struct{}
// GetPodByName always returns "nil, true" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetPodByName(namespace, name string) (*v1.Pod, bool) {
return nil, true
}
// GetKubeClient always returns "nil" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetKubeClient() clientset.Interface {
return nil
}
// GetRuntime always returns "nil" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetRuntime() kubecontainer.Runtime {
return nil
}
// SupportsLegacyFeatures always returns "false" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) SupportsLegacyFeatures() bool {
return false
}
// effectiveHairpinMode determines the effective hairpin mode given the
// configured mode, container runtime, and whether cbr0 should be configured.
func effectiveHairpinMode(hairpinMode kubeletconfig.HairpinMode, containerRuntime string, networkPlugin string) (kubeletconfig.HairpinMode, error) {
@ -89,156 +159,6 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
return supported
}
func omitDuplicates(kl *Kubelet, pod *v1.Pod, combinedSearch []string) []string {
uniqueDomains := map[string]bool{}
for _, dnsDomain := range combinedSearch {
if _, exists := uniqueDomains[dnsDomain]; !exists {
combinedSearch[len(uniqueDomains)] = dnsDomain
uniqueDomains[dnsDomain] = true
}
}
return combinedSearch[:len(uniqueDomains)]
}
func formDNSSearchFitsLimits(kl *Kubelet, pod *v1.Pod, composedSearch []string) []string {
// resolver file Search line current limitations
resolvSearchLineDNSDomainsLimit := 6
resolvSearchLineLenLimit := 255
limitsExceeded := false
if len(composedSearch) > resolvSearchLineDNSDomainsLimit {
composedSearch = composedSearch[:resolvSearchLineDNSDomainsLimit]
limitsExceeded = true
}
if resolvSearchhLineStrLen := len(strings.Join(composedSearch, " ")); resolvSearchhLineStrLen > resolvSearchLineLenLimit {
cutDomainsNum := 0
cutDoaminsLen := 0
for i := len(composedSearch) - 1; i >= 0; i-- {
cutDoaminsLen += len(composedSearch[i]) + 1
cutDomainsNum++
if (resolvSearchhLineStrLen - cutDoaminsLen) <= resolvSearchLineLenLimit {
break
}
}
composedSearch = composedSearch[:(len(composedSearch) - cutDomainsNum)]
limitsExceeded = true
}
if limitsExceeded {
log := fmt.Sprintf("Search Line limits were exceeded, some dns names have been omitted, the applied search line is: %s", strings.Join(composedSearch, " "))
kl.recorder.Event(pod, v1.EventTypeWarning, "DNSSearchForming", log)
glog.Error(log)
}
return composedSearch
}
func (kl *Kubelet) formDNSSearchForDNSDefault(hostSearch []string, pod *v1.Pod) []string {
return formDNSSearchFitsLimits(kl, pod, hostSearch)
}
func (kl *Kubelet) formDNSSearch(hostSearch []string, pod *v1.Pod) []string {
if kl.clusterDomain == "" {
formDNSSearchFitsLimits(kl, pod, hostSearch)
return hostSearch
}
nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
dnsSearch := []string{nsSvcDomain, svcDomain, kl.clusterDomain}
combinedSearch := append(dnsSearch, hostSearch...)
combinedSearch = omitDuplicates(kl, pod, combinedSearch)
return formDNSSearchFitsLimits(kl, pod, combinedSearch)
}
func (kl *Kubelet) checkLimitsForResolvConf() {
// resolver file Search line current limitations
resolvSearchLineDNSDomainsLimit := 6
resolvSearchLineLenLimit := 255
f, err := os.Open(kl.resolverConfig)
if err != nil {
kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, "checkLimitsForResolvConf", err.Error())
glog.Error("checkLimitsForResolvConf: " + err.Error())
return
}
defer f.Close()
_, hostSearch, err := kl.parseResolvConf(f)
if err != nil {
kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, "checkLimitsForResolvConf", err.Error())
glog.Error("checkLimitsForResolvConf: " + err.Error())
return
}
domainCntLimit := resolvSearchLineDNSDomainsLimit
if kl.clusterDomain != "" {
domainCntLimit -= 3
}
if len(hostSearch) > domainCntLimit {
log := fmt.Sprintf("Resolv.conf file '%s' contains search line consisting of more than %d domains!", kl.resolverConfig, domainCntLimit)
kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, "checkLimitsForResolvConf", log)
glog.Error("checkLimitsForResolvConf: " + log)
return
}
if len(strings.Join(hostSearch, " ")) > resolvSearchLineLenLimit {
log := fmt.Sprintf("Resolv.conf file '%s' contains search line which length is more than allowed %d chars!", kl.resolverConfig, resolvSearchLineLenLimit)
kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, "checkLimitsForResolvConf", log)
glog.Error("checkLimitsForResolvConf: " + log)
return
}
return
}
// parseResolveConf reads a resolv.conf file from the given reader, and parses
// it into nameservers and searches, possibly returning an error.
// TODO: move to utility package
func (kl *Kubelet) parseResolvConf(reader io.Reader) (nameservers []string, searches []string, err error) {
file, err := ioutil.ReadAll(reader)
if err != nil {
return nil, nil, err
}
// Lines of the form "nameserver 1.2.3.4" accumulate.
nameservers = []string{}
// Lines of the form "search example.com" overrule - last one wins.
searches = []string{}
lines := strings.Split(string(file), "\n")
for l := range lines {
trimmed := strings.TrimSpace(lines[l])
if strings.HasPrefix(trimmed, "#") {
continue
}
fields := strings.Fields(trimmed)
if len(fields) == 0 {
continue
}
if fields[0] == "nameserver" && len(fields) >= 2 {
nameservers = append(nameservers, fields[1])
}
if fields[0] == "search" {
searches = fields[1:]
}
}
// There used to be code here to scrub DNS for each cloud, but doesn't
// make sense anymore since cloudproviders are being factored out.
// contact @thockin or @wlan0 for more information
return nameservers, searches, nil
}
// syncNetworkStatus updates the network state
func (kl *Kubelet) syncNetworkStatus() {
// For cri integration, network state will be updated in updateRuntimeUp,
@ -361,3 +281,10 @@ func getIPTablesMark(bit int) string {
value := 1 << uint(bit)
return fmt.Sprintf("%#08x/%#08x", value, value)
}
// GetPodDNS returns DNS setttings for the pod.
// This function is defined in kubecontainer.RuntimeHelper interface so we
// have to implement it.
func (kl *Kubelet) GetPodDNS(pod *v1.Pod) (*runtimeapi.DNSConfig, error) {
return kl.dnsConfigurer.GetPodDNS(pod)
}

View file

@ -19,38 +19,132 @@ package kubelet
import (
"fmt"
"net"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)
func TestNodeIPParam(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
func TestNetworkHostGetsPodNotFound(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
tests := []struct {
nh := networkHost{testKubelet.kubelet}
actualPod, _ := nh.GetPodByName("", "")
if actualPod != nil {
t.Fatalf("Was expected nil, received %v instead", actualPod)
}
}
func TestNetworkHostGetsKubeClient(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
if nh.GetKubeClient() != testKubelet.fakeKubeClient {
t.Fatalf("NetworkHost client does not match testKubelet's client")
}
}
func TestNetworkHostGetsRuntime(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
if nh.GetRuntime() != testKubelet.fakeRuntime {
t.Fatalf("NetworkHost runtime does not match testKubelet's runtime")
}
}
func TestNetworkHostSupportsLegacyFeatures(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
if nh.SupportsLegacyFeatures() == false {
t.Fatalf("SupportsLegacyFeatures should not be false")
}
}
func TestNoOpHostGetsName(t *testing.T) {
nh := NoOpLegacyHost{}
pod, err := nh.GetPodByName("", "")
if pod != nil && err != true {
t.Fatalf("noOpLegacyHost getpodbyname expected to be nil and true")
}
}
func TestNoOpHostGetsKubeClient(t *testing.T) {
nh := NoOpLegacyHost{}
if nh.GetKubeClient() != nil {
t.Fatalf("noOpLegacyHost client expected to be nil")
}
}
func TestNoOpHostGetsRuntime(t *testing.T) {
nh := NoOpLegacyHost{}
if nh.GetRuntime() != nil {
t.Fatalf("noOpLegacyHost runtime expected to be nil")
}
}
func TestNoOpHostSupportsLegacyFeatures(t *testing.T) {
nh := NoOpLegacyHost{}
if nh.SupportsLegacyFeatures() != false {
t.Fatalf("noOpLegacyHost legacy features expected to be false")
}
}
func TestNodeIPParam(t *testing.T) {
type test struct {
nodeIP string
success bool
testName string
}{
}
tests := []test{
{
nodeIP: "",
success: true,
success: false,
testName: "IP not set",
},
{
nodeIP: "127.0.0.1",
success: false,
testName: "loopback address",
testName: "IPv4 loopback address",
},
{
nodeIP: "FE80::0202:B3FF:FE1E:8329",
nodeIP: "::1",
success: false,
testName: "IPv6 address",
testName: "IPv6 loopback address",
},
{
nodeIP: "224.0.0.1",
success: false,
testName: "multicast IPv4 address",
},
{
nodeIP: "ff00::1",
success: false,
testName: "multicast IPv6 address",
},
{
nodeIP: "169.254.0.1",
success: false,
testName: "IPv4 link-local unicast address",
},
{
nodeIP: "fe80::0202:b3ff:fe1e:8329",
success: false,
testName: "IPv6 link-local unicast address",
},
{
nodeIP: "0.0.0.0",
success: false,
testName: "Unspecified IPv4 address",
},
{
nodeIP: "::",
success: false,
testName: "Unspecified IPv6 address",
},
{
nodeIP: "1.2.3.4",
@ -58,9 +152,31 @@ func TestNodeIPParam(t *testing.T) {
testName: "IPv4 address that doesn't belong to host",
},
}
addrs, err := net.InterfaceAddrs()
if err != nil {
assert.Error(t, err, fmt.Sprintf(
"Unable to obtain a list of the node's unicast interface addresses."))
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.IsLoopback() || ip.IsLinkLocalUnicast() {
break
}
successTest := test{
nodeIP: ip.String(),
success: true,
testName: fmt.Sprintf("Success test case for address %s", ip.String()),
}
tests = append(tests, successTest)
}
for _, test := range tests {
kubelet.nodeIP = net.ParseIP(test.nodeIP)
err := kubelet.validateNodeIP()
err := validateNodeIP(net.ParseIP(test.nodeIP))
if test.success {
assert.NoError(t, err, "test %s", test.testName)
} else {
@ -69,115 +185,6 @@ func TestNodeIPParam(t *testing.T) {
}
}
func TestParseResolvConf(t *testing.T) {
testCases := []struct {
data string
nameservers []string
searches []string
}{
{"", []string{}, []string{}},
{" ", []string{}, []string{}},
{"\n", []string{}, []string{}},
{"\t\n\t", []string{}, []string{}},
{"#comment\n", []string{}, []string{}},
{" #comment\n", []string{}, []string{}},
{"#comment\n#comment", []string{}, []string{}},
{"#comment\nnameserver", []string{}, []string{}},
{"#comment\nnameserver\nsearch", []string{}, []string{}},
{"nameserver 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{" nameserver 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"\tnameserver 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"nameserver\t1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"nameserver \t 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"nameserver 1.2.3.4\nnameserver 5.6.7.8", []string{"1.2.3.4", "5.6.7.8"}, []string{}},
{"nameserver 1.2.3.4 #comment", []string{"1.2.3.4"}, []string{}},
{"search foo", []string{}, []string{"foo"}},
{"search foo bar", []string{}, []string{"foo", "bar"}},
{"search foo bar bat\n", []string{}, []string{"foo", "bar", "bat"}},
{"search foo\nsearch bar", []string{}, []string{"bar"}},
{"nameserver 1.2.3.4\nsearch foo bar", []string{"1.2.3.4"}, []string{"foo", "bar"}},
{"nameserver 1.2.3.4\nsearch foo\nnameserver 5.6.7.8\nsearch bar", []string{"1.2.3.4", "5.6.7.8"}, []string{"bar"}},
{"#comment\nnameserver 1.2.3.4\n#comment\nsearch foo\ncomment", []string{"1.2.3.4"}, []string{"foo"}},
}
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
for i, tc := range testCases {
ns, srch, err := kubelet.parseResolvConf(strings.NewReader(tc.data))
require.NoError(t, err)
assert.EqualValues(t, tc.nameservers, ns, "test case [%d]: name servers", i)
assert.EqualValues(t, tc.searches, srch, "test case [%d] searches", i)
}
}
func TestComposeDNSSearch(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
recorder := record.NewFakeRecorder(20)
kubelet.recorder = recorder
pod := podWithUIDNameNs("", "test_pod", "testNS")
kubelet.clusterDomain = "TEST"
testCases := []struct {
dnsNames []string
hostNames []string
resultSearch []string
events []string
}{
{
[]string{"testNS.svc.TEST", "svc.TEST", "TEST"},
[]string{},
[]string{"testNS.svc.TEST", "svc.TEST", "TEST"},
[]string{},
},
{
[]string{"testNS.svc.TEST", "svc.TEST", "TEST"},
[]string{"AAA", "svc.TEST", "BBB", "TEST"},
[]string{"testNS.svc.TEST", "svc.TEST", "TEST", "AAA", "BBB"},
[]string{},
},
{
[]string{"testNS.svc.TEST", "svc.TEST", "TEST"},
[]string{"AAA", strings.Repeat("B", 256), "BBB"},
[]string{"testNS.svc.TEST", "svc.TEST", "TEST", "AAA"},
[]string{"Search Line limits were exceeded, some dns names have been omitted, the applied search line is: testNS.svc.TEST svc.TEST TEST AAA"},
},
{
[]string{"testNS.svc.TEST", "svc.TEST", "TEST"},
[]string{"AAA", "TEST", "BBB", "TEST", "CCC", "DDD"},
[]string{"testNS.svc.TEST", "svc.TEST", "TEST", "AAA", "BBB", "CCC"},
[]string{
"Search Line limits were exceeded, some dns names have been omitted, the applied search line is: testNS.svc.TEST svc.TEST TEST AAA BBB CCC",
},
},
}
fetchEvent := func(recorder *record.FakeRecorder) string {
select {
case event := <-recorder.Events:
return event
default:
return "No more events!"
}
}
for i, tc := range testCases {
dnsSearch := kubelet.formDNSSearch(tc.hostNames, pod)
assert.EqualValues(t, tc.resultSearch, dnsSearch, "test [%d]", i)
for _, expectedEvent := range tc.events {
expected := fmt.Sprintf("%s %s %s", v1.EventTypeWarning, "DNSSearchForming", expectedEvent)
event := fetchEvent(recorder)
assert.Equal(t, expected, event, "test [%d]", i)
}
}
}
func TestGetIPTablesMark(t *testing.T) {
tests := []struct {
bit int

View file

@ -30,12 +30,11 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
k8s_api_v1 "k8s.io/kubernetes/pkg/api/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -98,7 +97,7 @@ func (kl *Kubelet) registerWithAPIServer() {
// a different externalID value, it attempts to delete that node so that a
// later attempt can recreate it.
func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
_, err := kl.kubeClient.Core().Nodes().Create(node)
_, err := kl.kubeClient.CoreV1().Nodes().Create(node)
if err == nil {
return true
}
@ -108,7 +107,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return false
}
existingNode, err := kl.kubeClient.Core().Nodes().Get(string(kl.nodeName), metav1.GetOptions{})
existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(string(kl.nodeName), metav1.GetOptions{})
if err != nil {
glog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err)
return false
@ -118,15 +117,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return false
}
clonedNode, err := conversion.NewCloner().DeepCopy(existingNode)
if err != nil {
glog.Errorf("Unable to clone %q node object %#v: %v", kl.nodeName, existingNode, err)
return false
}
originalNode, ok := clonedNode.(*v1.Node)
if !ok || originalNode == nil {
glog.Errorf("Unable to cast %q node object %#v to v1.Node", kl.nodeName, clonedNode)
originalNode := existingNode.DeepCopy()
if originalNode == nil {
glog.Errorf("Nil %q node object", kl.nodeName)
return false
}
@ -153,7 +146,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
"Previously node %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID,
)
if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
if err := kl.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to register node %q with API server: error deleting old node: %v", kl.nodeName, err)
} else {
glog.Infof("Deleted old node object %q", kl.nodeName)
@ -240,10 +233,10 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
},
}
nodeTaints := make([]v1.Taint, 0)
if len(kl.kubeletConfiguration.RegisterWithTaints) > 0 {
taints := make([]v1.Taint, len(kl.kubeletConfiguration.RegisterWithTaints))
for i := range kl.kubeletConfiguration.RegisterWithTaints {
if err := k8s_api_v1.Convert_api_Taint_To_v1_Taint(&kl.kubeletConfiguration.RegisterWithTaints[i], &taints[i], nil); err != nil {
if len(kl.registerWithTaints) > 0 {
taints := make([]v1.Taint, len(kl.registerWithTaints))
for i := range kl.registerWithTaints {
if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
return nil, err
}
}
@ -283,7 +276,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
glog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
}
if kl.kubeletConfiguration.KeepTerminatedPodVolumes {
if kl.keepTerminatedPodVolumes {
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
@ -413,14 +406,9 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
clonedNode, err := conversion.NewCloner().DeepCopy(node)
if err != nil {
return fmt.Errorf("error clone node %q: %v", kl.nodeName, err)
}
originalNode, ok := clonedNode.(*v1.Node)
if !ok || originalNode == nil {
return fmt.Errorf("failed to cast %q node object %#v to v1.Node", kl.nodeName, clonedNode)
originalNode := node.DeepCopy()
if originalNode == nil {
return fmt.Errorf("nil %q node object", kl.nodeName)
}
kl.updatePodCIDR(node.Spec.PodCIDR)
@ -449,13 +437,19 @@ func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
// Set IP and hostname addresses for the node.
func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
if kl.nodeIP != nil {
if err := kl.validateNodeIP(); err != nil {
if err := validateNodeIP(kl.nodeIP); err != nil {
return fmt.Errorf("failed to validate nodeIP: %v", err)
}
glog.V(2).Infof("Using node IP: %q", kl.nodeIP.String())
}
if kl.externalCloudProvider {
if kl.nodeIP != nil {
if node.ObjectMeta.Annotations == nil {
node.ObjectMeta.Annotations = make(map[string]string)
}
node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = kl.nodeIP.String()
}
// We rely on the external cloud provider to supply the addresses.
return nil
}
@ -509,20 +503,25 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
// 1) Use nodeIP if set
// 2) If the user has specified an IP to HostnameOverride, use it
// 3) Lookup the IP from node name by DNS and use the first non-loopback ipv4 address
// 3) Lookup the IP from node name by DNS and use the first valid IPv4 address.
// If the node does not have a valid IPv4 address, use the first valid IPv6 address.
// 4) Try to get the IP from the network interface used as default gateway
if kl.nodeIP != nil {
ipAddr = kl.nodeIP
node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = kl.nodeIP.String()
} else if addr := net.ParseIP(kl.hostname); addr != nil {
ipAddr = addr
} else {
var addrs []net.IP
addrs, err = net.LookupIP(node.Name)
addrs, _ = net.LookupIP(node.Name)
for _, addr := range addrs {
if !addr.IsLoopback() && addr.To4() != nil {
ipAddr = addr
break
if err = validateNodeIP(addr); err == nil {
if addr.To4() != nil {
ipAddr = addr
break
}
if addr.To16() != nil && ipAddr == nil {
ipAddr = addr
}
}
}
@ -602,15 +601,17 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}
currentCapacity := kl.containerManager.GetCapacity()
if currentCapacity != nil {
for k, v := range currentCapacity {
if v1helper.IsExtendedResourceName(k) {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
node.Status.Capacity[k] = v
}
devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
node.Status.Capacity[k] = v
}
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Remove capacity for %s", removedResource)
delete(node.Status.Capacity, v1.ResourceName(removedResource))
}
}
// Set Allocatable.
@ -998,17 +999,22 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
}
// Validate given node IP belongs to the current host
func (kl *Kubelet) validateNodeIP() error {
if kl.nodeIP == nil {
return nil
}
func validateNodeIP(nodeIP net.IP) error {
// Honor IP limitations set in setNodeStatus()
if kl.nodeIP.IsLoopback() {
if nodeIP.To4() == nil && nodeIP.To16() == nil {
return fmt.Errorf("nodeIP must be a valid IP address")
}
if nodeIP.IsLoopback() {
return fmt.Errorf("nodeIP can't be loopback address")
}
if kl.nodeIP.To4() == nil {
return fmt.Errorf("nodeIP must be IPv4 address")
if nodeIP.IsMulticast() {
return fmt.Errorf("nodeIP can't be a multicast address")
}
if nodeIP.IsLinkLocalUnicast() {
return fmt.Errorf("nodeIP can't be a link-local unicast address")
}
if nodeIP.IsUnspecified() {
return fmt.Errorf("nodeIP can't be an all zeros address")
}
addrs, err := net.InterfaceAddrs()
@ -1023,9 +1029,9 @@ func (kl *Kubelet) validateNodeIP() error {
case *net.IPAddr:
ip = v.IP
}
if ip != nil && ip.Equal(kl.nodeIP) {
if ip != nil && ip.Equal(nodeIP) {
return nil
}
}
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", kl.nodeIP.String())
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", nodeIP.String())
}

View file

@ -144,7 +144,7 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
Spec: v1.NodeSpec{},
}
// TODO : is it possible to mock kubelet.validateNodeIP() to avoid relying on the host interface addresses ?
// TODO : is it possible to mock validateNodeIP() to avoid relying on the host interface addresses ?
addrs, err := net.InterfaceAddrs()
assert.NoError(t, err)
for _, addr := range addrs {

View file

@ -43,12 +43,12 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/api"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/api/v1/validation"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/fieldpath"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@ -64,6 +64,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
volumevalidation "k8s.io/kubernetes/pkg/volume/validation"
"k8s.io/kubernetes/third_party/forked/golang/expansion"
@ -91,9 +92,9 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod {
return activePods
}
// makeDevices determines the devices for the given container.
// makeGPUDevices determines the devices for the given container.
// Experimental.
func (kl *Kubelet) makeDevices(pod *v1.Pod, container *v1.Container) ([]kubecontainer.DeviceInfo, error) {
func (kl *Kubelet) makeGPUDevices(pod *v1.Pod, container *v1.Container) ([]kubecontainer.DeviceInfo, error) {
if container.Resources.Limits.NvidiaGPU().IsZero() {
return nil, nil
}
@ -111,6 +112,56 @@ func (kl *Kubelet) makeDevices(pod *v1.Pod, container *v1.Container) ([]kubecont
return devices, nil
}
func makeAbsolutePath(goos, path string) string {
if goos != "windows" {
return "/" + path
}
// These are all for windows
// If there is a colon, give up.
if strings.Contains(path, ":") {
return path
}
// If there is a slash, but no drive, add 'c:'
if strings.HasPrefix(path, "/") || strings.HasPrefix(path, "\\") {
return "c:" + path
}
// Otherwise, add 'c:\'
return "c:\\" + path
}
// makeBlockVolumes maps the raw block devices specified in the path of the container
// Experimental
func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumeutil.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) {
var devices []kubecontainer.DeviceInfo
for _, device := range container.VolumeDevices {
// check path is absolute
if !filepath.IsAbs(device.DevicePath) {
return nil, fmt.Errorf("error DevicePath `%s` must be an absolute path", device.DevicePath)
}
vol, ok := podVolumes[device.Name]
if !ok || vol.BlockVolumeMapper == nil {
glog.Errorf("Block volume cannot be satisfied for container %q, because the volume is missing or the volume mapper is nil: %+v", container.Name, device)
return nil, fmt.Errorf("cannot find volume %q to pass into container %q", device.Name, container.Name)
}
// Get a symbolic link associated to a block device under pod device path
dirPath, volName := vol.BlockVolumeMapper.GetPodDeviceMapPath()
symlinkPath := path.Join(dirPath, volName)
if islinkExist, checkErr := blkutil.IsSymlinkExist(symlinkPath); checkErr != nil {
return nil, checkErr
} else if islinkExist {
// Check readOnly in PVCVolumeSource and set read only permission if it's true.
permission := "mrw"
if vol.ReadOnly {
permission = "r"
}
glog.V(4).Infof("Device will be attached to container %q. Path on host: %v", container.Name, symlinkPath)
devices = append(devices, kubecontainer.DeviceInfo{PathOnHost: symlinkPath, PathInContainer: device.DevicePath, Permissions: permission})
}
}
return devices, nil
}
// makeMounts determines the mount points for the given container.
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if:
@ -127,8 +178,8 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
vol, ok := podVolumes[mount.Name]
if !ok || vol.Mounter == nil {
glog.Warningf("Mount cannot be satisfied for container %q, because the volume is missing or the volume mounter is nil: %q", container.Name, mount)
continue
glog.Errorf("Mount cannot be satisfied for container %q, because the volume is missing or the volume mounter is nil: %+v", container.Name, mount)
return nil, fmt.Errorf("cannot find volume %q to mount into container %q", mount.Name, container.Name)
}
relabelVolume := false
@ -187,9 +238,9 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
if (strings.HasPrefix(hostPath, "/") || strings.HasPrefix(hostPath, "\\")) && !strings.Contains(hostPath, ":") {
hostPath = "c:" + hostPath
}
if (strings.HasPrefix(containerPath, "/") || strings.HasPrefix(containerPath, "\\")) && !strings.Contains(containerPath, ":") {
containerPath = "c:" + containerPath
}
}
if !filepath.IsAbs(containerPath) {
containerPath = makeAbsolutePath(runtime.GOOS, containerPath)
}
propagation, err := translateMountPropagation(mount.MountPropagation)
@ -346,7 +397,7 @@ func truncatePodHostnameIfNeeded(podName, hostname string) (string, error) {
// given that pod's spec and annotations or returns an error.
func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *v1.Pod) (string, string, error) {
// TODO(vmarmol): Handle better.
clusterDomain := kl.clusterDomain
clusterDomain := kl.dnsConfigurer.ClusterDomain
hostname := pod.Name
if len(pod.Spec.Hostname) > 0 {
@ -381,18 +432,17 @@ func (kl *Kubelet) GetPodCgroupParent(pod *v1.Pod) string {
// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, bool, error) {
useClusterFirstPolicy := false
opts, err := kl.containerManager.GetResources(pod, container, kl.GetActivePods())
func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
opts, err := kl.containerManager.GetResources(pod, container)
if err != nil {
return nil, false, err
return nil, err
}
cgroupParent := kl.GetPodCgroupParent(pod)
opts.CgroupParent = cgroupParent
hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
if err != nil {
return nil, false, err
return nil, err
}
opts.Hostname = hostname
podName := volumehelper.GetUniquePodName(pod)
@ -400,21 +450,31 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai
opts.PortMappings = kubecontainer.MakePortMappings(container)
// TODO(random-liu): Move following convert functions into pkg/kubelet/container
devices, err := kl.makeDevices(pod, container)
devices, err := kl.makeGPUDevices(pod, container)
if err != nil {
return nil, false, err
return nil, err
}
opts.Devices = append(opts.Devices, devices...)
// TODO: remove feature gate check after no longer needed
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
blkutil := volumeutil.NewBlockVolumePathHandler()
blkVolumes, err := kl.makeBlockVolumes(pod, container, volumes, blkutil)
if err != nil {
return nil, err
}
opts.Devices = append(opts.Devices, blkVolumes...)
}
mounts, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
if err != nil {
return nil, false, err
return nil, err
}
opts.Mounts = append(opts.Mounts, mounts...)
envs, err := kl.makeEnvironmentVariables(pod, container, podIP)
if err != nil {
return nil, false, err
return nil, err
}
opts.Envs = append(opts.Envs, envs...)
@ -429,17 +489,12 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai
}
}
opts.DNS, opts.DNSSearch, useClusterFirstPolicy, err = kl.GetClusterDNS(pod)
if err != nil {
return nil, false, err
}
// only do this check if the experimental behavior is enabled, otherwise allow it to default to false
if kl.experimentalHostUserNamespaceDefaulting {
opts.EnableHostUserNamespace = kl.enableHostUserNamespace(pod)
}
return opts, useClusterFirstPolicy, nil
return opts, nil
}
var masterServices = sets.NewString("kubernetes")
@ -722,7 +777,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
// podFieldSelectorRuntimeValue returns the runtime value of the given
// selector for a pod.
func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *v1.ObjectFieldSelector, pod *v1.Pod, podIP string) (string, error) {
internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
internalFieldPath, _, err := podshelper.ConvertDownwardAPIFieldLabel(fs.APIVersion, fs.FieldPath, "")
if err != nil {
return "", err
}
@ -864,7 +919,7 @@ func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bo
glog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %+v", format.Pod(pod), runtimeStatus.ContainerStatuses)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
// We shouldnt delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
return false
@ -1709,7 +1764,7 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupN
// parent croup. If the volumes still exist, reduce the cpu shares for any
// process in the cgroup to the minimum value while we wait. if the kubelet
// is configured to keep terminated volumes, we will delete the cgroup and not block.
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist && !kl.keepTerminatedPodVolumes {
glog.V(3).Infof("Orphaned pod %q found, but volumes not yet removed. Reducing cpu to minimum", uid)
if err := pcm.ReduceCPULimits(val); err != nil {
glog.Warningf("Failed to reduce cpu time for pod %q pending volume cleanup due to %v", uid, err)
@ -1777,13 +1832,13 @@ func hasHostNamespace(pod *v1.Pod) bool {
func (kl *Kubelet) hasHostMountPVC(pod *v1.Pod) bool {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil {
pvc, err := kl.kubeClient.Core().PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
if err != nil {
glog.Warningf("unable to retrieve pvc %s:%s - %v", pod.Namespace, volume.PersistentVolumeClaim.ClaimName, err)
continue
}
if pvc != nil {
referencedVolume, err := kl.kubeClient.Core().PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{})
referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
glog.Warningf("unable to retrieve pv %s - %v", pvc.Spec.VolumeName, err)
continue

View file

@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"sort"
@ -38,18 +37,65 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
// TODO: remove this import if
// api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String() is changed
// to "v1"?
_ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/core/install"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
func TestMakeAbsolutePath(t *testing.T) {
tests := []struct {
goos string
path string
expectedPath string
name string
}{
{
goos: "linux",
path: "non-absolute/path",
expectedPath: "/non-absolute/path",
name: "basic linux",
},
{
goos: "windows",
path: "some\\path",
expectedPath: "c:\\some\\path",
name: "basic windows",
},
{
goos: "windows",
path: "/some/path",
expectedPath: "c:/some/path",
name: "linux path on windows",
},
{
goos: "windows",
path: "\\some\\path",
expectedPath: "c:\\some\\path",
name: "windows path no drive",
},
{
goos: "windows",
path: "\\:\\some\\path",
expectedPath: "\\:\\some\\path",
name: "windows path with colon",
},
}
for _, test := range tests {
path := makeAbsolutePath(test.goos, test.path)
if path != test.expectedPath {
t.Errorf("[%s] Expected %s saw %s", test.name, test.expectedPath, path)
}
}
}
func TestMakeMounts(t *testing.T) {
bTrue := true
propagationHostToContainer := v1.MountPropagationHostToContainer
@ -223,6 +269,36 @@ func TestMakeMounts(t *testing.T) {
expectErr: true,
expectedErrMsg: "unable to provision SubPath `no/backsteps/../allowed`: must not contain '..'",
},
"volume doesn't exist": {
podVolumes: kubecontainer.VolumeMap{},
container: v1.Container{
VolumeMounts: []v1.VolumeMount{
{
MountPath: "/mnt/path3",
Name: "disk",
ReadOnly: true,
},
},
},
expectErr: true,
expectedErrMsg: "cannot find volume \"disk\" to mount into container \"\"",
},
"volume mounter is nil": {
podVolumes: kubecontainer.VolumeMap{
"disk": kubecontainer.VolumeInfo{},
},
container: v1.Container{
VolumeMounts: []v1.VolumeMount{
{
MountPath: "/mnt/path3",
Name: "disk",
ReadOnly: true,
},
},
},
expectErr: true,
expectedErrMsg: "cannot find volume \"disk\" to mount into container \"\"",
},
}
for name, tc := range testCases {
@ -277,6 +353,139 @@ func TestMakeMounts(t *testing.T) {
}
}
func TestMakeBlockVolumes(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
testCases := map[string]struct {
container v1.Container
podVolumes kubecontainer.VolumeMap
expectErr bool
expectedErrMsg string
expectedDevices []kubecontainer.DeviceInfo
}{
"valid volumeDevices in container": {
podVolumes: kubecontainer.VolumeMap{
"disk1": kubecontainer.VolumeInfo{BlockVolumeMapper: &stubBlockVolume{dirPath: "/dev/", volName: "sda"}},
"disk2": kubecontainer.VolumeInfo{BlockVolumeMapper: &stubBlockVolume{dirPath: "/dev/disk/by-path/", volName: "diskPath"}, ReadOnly: true},
"disk3": kubecontainer.VolumeInfo{BlockVolumeMapper: &stubBlockVolume{dirPath: "/dev/disk/by-id/", volName: "diskUuid"}},
"disk4": kubecontainer.VolumeInfo{BlockVolumeMapper: &stubBlockVolume{dirPath: "/var/lib/", volName: "rawdisk"}, ReadOnly: true},
},
container: v1.Container{
Name: "container1",
VolumeDevices: []v1.VolumeDevice{
{
DevicePath: "/dev/sda",
Name: "disk1",
},
{
DevicePath: "/dev/xvda",
Name: "disk2",
},
{
DevicePath: "/dev/xvdb",
Name: "disk3",
},
{
DevicePath: "/mnt/rawdisk",
Name: "disk4",
},
},
},
expectedDevices: []kubecontainer.DeviceInfo{
{
PathInContainer: "/dev/sda",
PathOnHost: "/dev/sda",
Permissions: "mrw",
},
{
PathInContainer: "/dev/xvda",
PathOnHost: "/dev/disk/by-path/diskPath",
Permissions: "r",
},
{
PathInContainer: "/dev/xvdb",
PathOnHost: "/dev/disk/by-id/diskUuid",
Permissions: "mrw",
},
{
PathInContainer: "/mnt/rawdisk",
PathOnHost: "/var/lib/rawdisk",
Permissions: "r",
},
},
expectErr: false,
},
"invalid absolute Path": {
podVolumes: kubecontainer.VolumeMap{
"disk": kubecontainer.VolumeInfo{BlockVolumeMapper: &stubBlockVolume{dirPath: "/dev/", volName: "sda"}},
},
container: v1.Container{
VolumeDevices: []v1.VolumeDevice{
{
DevicePath: "must/be/absolute",
Name: "disk",
},
},
},
expectErr: true,
expectedErrMsg: "error DevicePath `must/be/absolute` must be an absolute path",
},
"volume doesn't exist": {
podVolumes: kubecontainer.VolumeMap{},
container: v1.Container{
VolumeDevices: []v1.VolumeDevice{
{
DevicePath: "/dev/sdaa",
Name: "disk",
},
},
},
expectErr: true,
expectedErrMsg: "cannot find volume \"disk\" to pass into container \"\"",
},
"volume BlockVolumeMapper is nil": {
podVolumes: kubecontainer.VolumeMap{
"disk": kubecontainer.VolumeInfo{},
},
container: v1.Container{
VolumeDevices: []v1.VolumeDevice{
{
DevicePath: "/dev/sdzz",
Name: "disk",
},
},
},
expectErr: true,
expectedErrMsg: "cannot find volume \"disk\" to pass into container \"\"",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
pod := v1.Pod{
Spec: v1.PodSpec{
HostNetwork: true,
},
}
blkutil := volumetest.NewBlockVolumePathHandler()
blkVolumes, err := kubelet.makeBlockVolumes(&pod, &tc.container, tc.podVolumes, blkutil)
// validate only the error if we expect an error
if tc.expectErr {
if err == nil || err.Error() != tc.expectedErrMsg {
t.Fatalf("expected error message `%s` but got `%v`", tc.expectedErrMsg, err)
}
return
}
// otherwise validate the devices
if err != nil {
t.Fatal(err)
}
assert.Equal(t, tc.expectedDevices, blkVolumes, "devices of container %+v", tc.container)
})
}
}
func TestNodeHostsFileContent(t *testing.T) {
testCases := []struct {
hostsFileName string
@ -561,90 +770,6 @@ func TestRunInContainer(t *testing.T) {
}
}
func TestGenerateRunContainerOptions_DNSConfigurationParams(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
clusterNS := "203.0.113.1"
kubelet.clusterDomain = "kubernetes.io"
kubelet.clusterDNS = []net.IP{net.ParseIP(clusterNS)}
pods := newTestPods(4)
pods[0].Spec.DNSPolicy = v1.DNSClusterFirstWithHostNet
pods[1].Spec.DNSPolicy = v1.DNSClusterFirst
pods[2].Spec.DNSPolicy = v1.DNSClusterFirst
pods[2].Spec.HostNetwork = false
pods[3].Spec.DNSPolicy = v1.DNSDefault
options := make([]*kubecontainer.RunContainerOptions, 4)
for i, pod := range pods {
var err error
options[i], _, err = kubelet.GenerateRunContainerOptions(pod, &v1.Container{}, "")
if err != nil {
t.Fatalf("failed to generate container options: %v", err)
}
}
if len(options[0].DNS) != 1 || options[0].DNS[0] != clusterNS {
t.Errorf("expected nameserver %s, got %+v", clusterNS, options[0].DNS)
}
if len(options[0].DNSSearch) == 0 || options[0].DNSSearch[0] != ".svc."+kubelet.clusterDomain {
t.Errorf("expected search %s, got %+v", ".svc."+kubelet.clusterDomain, options[0].DNSSearch)
}
if len(options[1].DNS) != 1 || options[1].DNS[0] != "127.0.0.1" {
t.Errorf("expected nameserver 127.0.0.1, got %+v", options[1].DNS)
}
if len(options[1].DNSSearch) != 1 || options[1].DNSSearch[0] != "." {
t.Errorf("expected search \".\", got %+v", options[1].DNSSearch)
}
if len(options[2].DNS) != 1 || options[2].DNS[0] != clusterNS {
t.Errorf("expected nameserver %s, got %+v", clusterNS, options[2].DNS)
}
if len(options[2].DNSSearch) == 0 || options[2].DNSSearch[0] != ".svc."+kubelet.clusterDomain {
t.Errorf("expected search %s, got %+v", ".svc."+kubelet.clusterDomain, options[2].DNSSearch)
}
if len(options[3].DNS) != 1 || options[3].DNS[0] != "127.0.0.1" {
t.Errorf("expected nameserver 127.0.0.1, got %+v", options[3].DNS)
}
if len(options[3].DNSSearch) != 1 || options[3].DNSSearch[0] != "." {
t.Errorf("expected search \".\", got %+v", options[3].DNSSearch)
}
kubelet.resolverConfig = "/etc/resolv.conf"
for i, pod := range pods {
var err error
options[i], _, err = kubelet.GenerateRunContainerOptions(pod, &v1.Container{}, "")
if err != nil {
t.Fatalf("failed to generate container options: %v", err)
}
}
t.Logf("nameservers %+v", options[1].DNS)
if len(options[0].DNS) != 1 {
t.Errorf("expected cluster nameserver only, got %+v", options[0].DNS)
} else if options[0].DNS[0] != clusterNS {
t.Errorf("expected nameserver %s, got %v", clusterNS, options[0].DNS[0])
}
expLength := len(options[1].DNSSearch) + 3
if expLength > 6 {
expLength = 6
}
if len(options[0].DNSSearch) != expLength {
t.Errorf("expected prepend of cluster domain, got %+v", options[0].DNSSearch)
} else if options[0].DNSSearch[0] != ".svc."+kubelet.clusterDomain {
t.Errorf("expected domain %s, got %s", ".svc."+kubelet.clusterDomain, options[0].DNSSearch)
}
if len(options[2].DNS) != 1 {
t.Errorf("expected cluster nameserver only, got %+v", options[2].DNS)
} else if options[2].DNS[0] != clusterNS {
t.Errorf("expected nameserver %s, got %v", clusterNS, options[2].DNS[0])
}
if len(options[2].DNSSearch) != expLength {
t.Errorf("expected prepend of cluster domain, got %+v", options[2].DNSSearch)
} else if options[2].DNSSearch[0] != ".svc."+kubelet.clusterDomain {
t.Errorf("expected domain %s, got %s", ".svc."+kubelet.clusterDomain, options[0].DNSSearch)
}
}
type testServiceLister struct {
services []*v1.Service
}
@ -854,7 +979,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "metadata.name",
},
},
@ -863,7 +988,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "POD_NAMESPACE",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "metadata.namespace",
},
},
@ -872,7 +997,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "POD_NODE_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "spec.nodeName",
},
},
@ -881,7 +1006,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "POD_SERVICE_ACCOUNT_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "spec.serviceAccountName",
},
},
@ -890,7 +1015,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "POD_IP",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "status.podIP",
},
},
@ -899,7 +1024,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "HOST_IP",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "status.hostIP",
},
},
@ -930,7 +1055,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String(),
FieldPath: "metadata.name",
},
},

View file

@ -22,7 +22,6 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/kubernetes/pkg/api/v1/resource"
)
@ -44,28 +43,14 @@ func (kl *Kubelet) defaultPodLimitsForDownwardAPI(pod *v1.Pod, container *v1.Con
}
allocatable := node.Status.Allocatable
glog.Infof("allocatable: %v", allocatable)
podCopy, err := scheme.Scheme.Copy(pod)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of pod object: %v", err)
}
outputPod, ok := podCopy.(*v1.Pod)
if !ok {
return nil, nil, fmt.Errorf("unexpected type returned from deep copy of pod object")
}
outputPod := pod.DeepCopy()
for idx := range outputPod.Spec.Containers {
resource.MergeContainerResourceLimits(&outputPod.Spec.Containers[idx], allocatable)
}
var outputContainer *v1.Container
if container != nil {
containerCopy, err := scheme.Scheme.DeepCopy(container)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of container object: %v", err)
}
outputContainer, ok = containerCopy.(*v1.Container)
if !ok {
return nil, nil, fmt.Errorf("unexpected type returned from deep copy of container object")
}
outputContainer = container.DeepCopy()
resource.MergeContainerResourceLimits(outputContainer, allocatable)
}
return outputPod, outputContainer, nil

View file

@ -72,6 +72,7 @@ import (
_ "k8s.io/kubernetes/pkg/volume/host_path"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func init() {
@ -284,7 +285,7 @@ func newTestKubeletWithImageList(
kubelet.evictionManager = evictionManager
kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
// Add this as cleanup predicate pod admitter
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub()))
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
@ -573,6 +574,116 @@ func TestHandleMemExceeded(t *testing.T) {
checkPodStatus(t, kl, fittingPod, v1.PodPending)
}
// Tests that we handle result of interface UpdatePluginResources correctly
// by setting corresponding status in status map.
func TestHandlePluginResources(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
testKubelet.chainMock()
kl := testKubelet.kubelet
adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
failedResource := v1.ResourceName("domain2.com/failedResource")
resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI)
nodes := []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
adjustedResource: resourceQuantity1,
unadjustedResouce: resourceQuantity1,
v1.ResourcePods: allowedPodQuantity,
}}},
}
kl.nodeInfo = testNodeInfo{nodes: nodes}
updatePluginResourcesFunc := func(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
// Maps from resourceName to the value we use to set node.allocatableResource[resourceName].
// A resource with invalid value (< 0) causes the function to return an error
// to emulate resource Allocation failure.
// Resources not contained in this map will have their node.allocatableResource
// quantity unchanged.
updateResourceMap := map[v1.ResourceName]resource.Quantity{
adjustedResource: resourceQuantity2,
failedResource: resourceQuantityInvalid,
}
pod := attrs.Pod
allocatableResource := node.AllocatableResource()
newAllocatableResource := allocatableResource.Clone()
for _, container := range pod.Spec.Containers {
for resource := range container.Resources.Requests {
newQuantity, exist := updateResourceMap[resource]
if !exist {
continue
}
if newQuantity.Value() < 0 {
return fmt.Errorf("Allocation failed")
}
newAllocatableResource.ScalarResources[resource] = newQuantity.Value()
}
}
node.SetAllocatableResource(newAllocatableResource)
return nil
}
// add updatePluginResourcesFunc to admission handler, to test it's behavior.
kl.admitHandlers = lifecycle.PodAdmitHandlers{}
kl.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kl.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), updatePluginResourcesFunc))
// pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
// adjusts node.allocatableResource for this resource to a sufficient value.
fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
adjustedResource: resourceQuantity2,
},
Requests: v1.ResourceList{
adjustedResource: resourceQuantity2,
},
}}},
}
// pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
unadjustedResouce: resourceQuantity2,
},
Requests: v1.ResourceList{
unadjustedResouce: resourceQuantity2,
},
}}},
}
// pod requiring failedResource will fail with the resource failed to be allocated.
failedPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
failedResource: resourceQuantity1,
},
Requests: v1.ResourceList{
failedResource: resourceQuantity1,
},
}}},
}
pods := []*v1.Pod{
podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
}
// The latter two pod should be rejected.
fittingPod := pods[0]
exceededPod := pods[1]
failedPod := pods[2]
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, fittingPod, v1.PodPending)
checkPodStatus(t, kl, exceededPod, v1.PodFailed)
checkPodStatus(t, kl, failedPod, v1.PodFailed)
}
// TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)

View file

@ -69,7 +69,6 @@ func (kl *Kubelet) newVolumeMounterFromPlugins(spec *volume.Spec, pod *v1.Pod, o
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err)
}
opts.Containerized = kl.kubeletConfiguration.Containerized
physicalMounter, err := plugin.NewMounter(spec, pod, opts)
if err != nil {
return nil, fmt.Errorf("failed to instantiate mounter for volume: %s using plugin: %s with a root cause: %v", spec.Name(), plugin.GetPluginName(), err)

View file

@ -58,9 +58,7 @@ func TestListVolumesForPod(t *testing.T) {
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
defer close(stopCh)
kubelet.podManager.SetPods([]*v1.Pod{pod})
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
@ -142,9 +140,7 @@ func TestPodVolumesExist(t *testing.T) {
}
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
defer close(stopCh)
kubelet.podManager.SetPods(pods)
for _, pod := range pods {
@ -177,9 +173,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
defer close(stopCh)
kubelet.podManager.SetPods([]*v1.Pod{pod})
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
@ -223,9 +217,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
defer close(stopCh)
// Add pod
kubelet.podManager.SetPods([]*v1.Pod{pod})
@ -312,9 +304,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
defer close(stopCh)
kubelet.podManager.SetPods([]*v1.Pod{pod})
@ -381,9 +371,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
defer close(stopCh)
// Add pod
kubelet.podManager.SetPods([]*v1.Pod{pod})
@ -460,3 +448,23 @@ func (f *stubVolume) SetUp(fsGroup *int64) error {
func (f *stubVolume) SetUpAt(dir string, fsGroup *int64) error {
return nil
}
type stubBlockVolume struct {
dirPath string
volName string
}
func (f *stubBlockVolume) GetGlobalMapPath(spec *volume.Spec) (string, error) {
return "", nil
}
func (f *stubBlockVolume) GetPodDeviceMapPath() (string, string) {
return f.dirPath, f.volName
}
func (f *stubBlockVolume) SetUpDevice() (string, error) {
return "", nil
}
func (f *stubBlockVolume) TearDownDevice(mapPath string, devicePath string) error {
return nil
}

View file

@ -1,95 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
)
// This just exports required functions from kubelet proper, for use by network
// plugins.
// TODO(#35457): get rid of this backchannel to the kubelet. The scope of
// the back channel is restricted to host-ports/testing, and restricted
// to kubenet. No other network plugin wrapper needs it. Other plugins
// only require a way to access namespace information, which they can do
// directly through the methods implemented by criNetworkHost.
type networkHost struct {
kubelet *Kubelet
}
func (nh *networkHost) GetPodByName(name, namespace string) (*v1.Pod, bool) {
return nh.kubelet.GetPodByName(name, namespace)
}
func (nh *networkHost) GetKubeClient() clientset.Interface {
return nh.kubelet.kubeClient
}
func (nh *networkHost) GetRuntime() kubecontainer.Runtime {
return nh.kubelet.GetRuntime()
}
func (nh *networkHost) SupportsLegacyFeatures() bool {
return true
}
// criNetworkHost implements the part of network.Host required by the
// cri (NamespaceGetter). It leechs off networkHost for all other
// methods, because networkHost is slated for deletion.
type criNetworkHost struct {
*networkHost
// criNetworkHost currently support legacy features. Hence no need to support PortMappingGetter
*network.NoopPortMappingGetter
}
// GetNetNS returns the network namespace of the given containerID.
// This method satisfies the network.NamespaceGetter interface for
// networkHost. It's only meant to be used from network plugins
// that are directly invoked by the kubelet (aka: legacy, pre-cri).
// Any network plugin invoked by a cri must implement NamespaceGetter
// to talk directly to the runtime instead.
func (c *criNetworkHost) GetNetNS(containerID string) (string, error) {
return c.kubelet.GetRuntime().GetNetNS(kubecontainer.ContainerID{Type: "", ID: containerID})
}
// NoOpLegacyHost implements the network.LegacyHost interface for the remote
// runtime shim by just returning empties. It doesn't support legacy features
// like host port and bandwidth shaping.
type NoOpLegacyHost struct{}
// GetPodByName always returns "nil, true" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetPodByName(namespace, name string) (*v1.Pod, bool) {
return nil, true
}
// GetKubeClient always returns "nil" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetKubeClient() clientset.Interface {
return nil
}
// GetRuntime always returns "nil" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetRuntime() kubecontainer.Runtime {
return nil
}
// SupportsLegacyFeatures always returns "false" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) SupportsLegacyFeatures() bool {
return false
}

View file

@ -1,91 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
)
func TestNetworkHostGetsPodNotFound(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
actualPod, _ := nh.GetPodByName("", "")
if actualPod != nil {
t.Fatalf("Was expected nil, received %v instead", actualPod)
}
}
func TestNetworkHostGetsKubeClient(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
if nh.GetKubeClient() != testKubelet.fakeKubeClient {
t.Fatalf("NetworkHost client does not match testKubelet's client")
}
}
func TestNetworkHostGetsRuntime(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
if nh.GetRuntime() != testKubelet.fakeRuntime {
t.Fatalf("NetworkHost runtime does not match testKubelet's runtime")
}
}
func TestNetworkHostSupportsLegacyFeatures(t *testing.T) {
testKubelet := newTestKubelet(t, true)
defer testKubelet.Cleanup()
nh := networkHost{testKubelet.kubelet}
if nh.SupportsLegacyFeatures() == false {
t.Fatalf("SupportsLegacyFeatures should not be false")
}
}
func TestNoOpHostGetsName(t *testing.T) {
nh := NoOpLegacyHost{}
pod, err := nh.GetPodByName("", "")
if pod != nil && err != true {
t.Fatalf("noOpLegacyHost getpodbyname expected to be nil and true")
}
}
func TestNoOpHostGetsKubeClient(t *testing.T) {
nh := NoOpLegacyHost{}
if nh.GetKubeClient() != nil {
t.Fatalf("noOpLegacyHost client expected to be nil")
}
}
func TestNoOpHostGetsRuntime(t *testing.T) {
nh := NoOpLegacyHost{}
if nh.GetRuntime() != nil {
t.Fatalf("noOpLegacyHost runtime expected to be nil")
}
}
func TestNoOpHostSupportsLegacyFeatures(t *testing.T) {
nh := NoOpLegacyHost{}
if nh.SupportsLegacyFeatures() != false {
t.Fatalf("noOpLegacyHost legacy features expected to be false")
}
}

View file

@ -162,6 +162,9 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
// This is the legacy event thrown by manage pod loop
// all other events are now dispatched from syncPodFn
p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
return err
}
err = p.syncPodFn(syncPodOptions{
@ -179,14 +182,8 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
update.OnCompleteFunc(err)
}
if err != nil {
// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
// if we failed sync, we throw more specific events for why it happened.
// as a result, i question the value of this event.
// TODO: determine if we can remove this in a future release.
// do not include descriptive text that can vary on why it failed so in a pathological
// scenario, kubelet does not create enough discrete events that miss default aggregation
// window.
p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "Error syncing pod")
}
p.wrapUp(update.Pod.UID, err)
}

View file

@ -15,8 +15,9 @@ go_library(
"pod_update.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/types",
deps = [
"//pkg/api:go_default_library",
"//pkg/apis/core:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
@ -30,6 +31,7 @@ go_test(
"pod_update_test.go",
"types_test.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/types",
library = ":go_default_library",
deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library",

View file

@ -21,7 +21,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeapi "k8s.io/kubernetes/pkg/api"
kubeapi "k8s.io/kubernetes/pkg/apis/core"
)
const (
@ -49,6 +49,8 @@ const (
// Pods with the given ids have unexpected status in this source,
// kubelet should reconcile status with this source
RECONCILE
// Pods with the given ids have been restored from a checkpoint.
RESTORE
// These constants identify the sources of pods
// Updates from a file

View file

@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = ["util_test.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util",
library = ":go_default_library",
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
)
@ -20,6 +21,9 @@ go_library(
"util.go",
"util_unsupported.go",
] + select({
"@io_bazel_rules_go//go/platform:darwin_amd64": [
"util_unix.go",
],
"@io_bazel_rules_go//go/platform:linux_amd64": [
"util_unix.go",
],
@ -28,9 +32,14 @@ go_library(
],
"//conditions:default": [],
}),
importpath = "k8s.io/kubernetes/pkg/kubelet/util",
deps = [
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:darwin_amd64": [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
@ -51,11 +60,11 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/kubelet/util/cache:all-srcs",
"//pkg/kubelet/util/csr:all-srcs",
"//pkg/kubelet/util/format:all-srcs",
"//pkg/kubelet/util/ioutils:all-srcs",
"//pkg/kubelet/util/queue:all-srcs",
"//pkg/kubelet/util/sliceutils:all-srcs",
"//pkg/kubelet/util/store:all-srcs",
],
tags = ["automanaged"],
)

View file

@ -12,6 +12,7 @@ go_library(
"pod.go",
"resources.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/format",
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
@ -21,6 +22,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["resources_test.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/format",
library = ":go_default_library",
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",

View file

@ -8,6 +8,7 @@ load(
go_library(
name = "go_default_library",
srcs = ["ioutils.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/ioutils",
)
filegroup(

View file

@ -9,6 +9,7 @@ load(
go_library(
name = "go_default_library",
srcs = ["sliceutils.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/sliceutils",
deps = [
"//pkg/kubelet/container:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -31,6 +32,7 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["sliceutils_test.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/sliceutils",
library = ":go_default_library",
deps = [
"//pkg/kubelet/container:go_default_library",

View file

@ -1,4 +1,4 @@
// +build freebsd linux
// +build freebsd linux darwin
/*
Copyright 2017 The Kubernetes Authors.

View file

@ -1,4 +1,4 @@
// +build !freebsd,!linux,!windows
// +build !freebsd,!linux,!windows,!darwin
/*
Copyright 2017 The Kubernetes Authors.

View file

@ -19,12 +19,19 @@ package kubelet
import (
"fmt"
"net"
"runtime"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/configmap"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/mountpod"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
@ -43,11 +50,17 @@ func NewInitializedVolumePluginMgr(
configMapManager configmap.Manager,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
if err != nil {
return nil, err
}
kvh := &kubeletVolumeHost{
kubelet: kubelet,
volumePluginMgr: volume.VolumePluginMgr{},
secretManager: secretManager,
configMapManager: configMapManager,
mountPodManager: mountPodManager,
}
if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
@ -71,10 +84,23 @@ type kubeletVolumeHost struct {
volumePluginMgr volume.VolumePluginMgr
secretManager secret.Manager
configMapManager configmap.Manager
mountPodManager mountpod.Manager
}
func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
}
func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
return kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
if runtime.GOOS == "windows" {
dir = volume.GetWindowsPath(dir)
}
return dir
}
func (kvh *kubeletVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return kvh.kubelet.getPodVolumeDeviceDir(podUID, pluginName)
}
func (kvh *kubeletVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
@ -119,7 +145,16 @@ func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
}
func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
return kvh.kubelet.mounter
exec, err := kvh.getMountExec(pluginName)
if err != nil {
glog.V(2).Info("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
// Use the default mounter
exec = nil
}
if exec == nil {
return kvh.kubelet.mounter
}
return mount.NewExecMounter(exec, kvh.kubelet.mounter)
}
func (kvh *kubeletVolumeHost) GetWriter() io.Writer {
@ -158,6 +193,61 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
return node.Labels, nil
}
func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec {
return mount.NewOsExec()
func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
return kvh.kubelet.nodeName
}
func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec {
exec, err := kvh.getMountExec(pluginName)
if err != nil {
glog.V(2).Info("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
// Use the default exec
exec = nil
}
if exec == nil {
return mount.NewOsExec()
}
return exec
}
// getMountExec returns mount.Exec implementation that leads to pod with mount
// utilities. It returns nil,nil when there is no such pod and default mounter /
// os.Exec should be used.
func (kvh *kubeletVolumeHost) getMountExec(pluginName string) (mount.Exec, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.MountContainers) {
glog.V(5).Infof("using default mounter/exec for %s", pluginName)
return nil, nil
}
pod, container, err := kvh.mountPodManager.GetMountPod(pluginName)
if err != nil {
return nil, err
}
if pod == nil {
// Use default mounter/exec for this plugin
glog.V(5).Infof("using default mounter/exec for %s", pluginName)
return nil, nil
}
glog.V(5).Infof("using container %s/%s/%s to execute mount utilites for %s", pod.Namespace, pod.Name, container, pluginName)
return &containerExec{
pod: pod,
containerName: container,
kl: kvh.kubelet,
}, nil
}
// containerExec is implementation of mount.Exec that executes commands in given
// container in given pod.
type containerExec struct {
pod *v1.Pod
containerName string
kl *Kubelet
}
var _ mount.Exec = &containerExec{}
func (e *containerExec) Run(cmd string, args ...string) ([]byte, error) {
cmdline := append([]string{cmd}, args...)
glog.V(5).Infof("Exec mounter running in pod %s/%s/%s: %v", e.pod.Namespace, e.pod.Name, e.containerName, cmdline)
return e.kl.RunInContainer(container.GetPodFullName(e.pod), e.pod.UID, e.containerName, cmdline)
}