Update go dependencies

This commit is contained in:
Manuel de Brito Fontes 2018-07-12 13:19:04 -04:00 committed by Manuel Alejandro de Brito Fontes
parent d5cf22c129
commit 063cc68d1c
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
1321 changed files with 52830 additions and 31081 deletions

View file

@ -46,6 +46,7 @@ go_library(
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/configmap:go_default_library",
@ -55,8 +56,6 @@ go_library(
"//pkg/kubelet/envvars:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/eviction:go_default_library",
"//pkg/kubelet/gpu:go_default_library",
"//pkg/kubelet/gpu/nvidia:go_default_library",
"//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/kubeletconfig:go_default_library",
"//pkg/kubelet/kuberuntime:go_default_library",
@ -65,7 +64,6 @@ go_library(
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/metrics/collectors:go_default_library",
"//pkg/kubelet/mountpod:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
@ -73,7 +71,6 @@ go_library(
"//pkg/kubelet/prober:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/remote:go_default_library",
"//pkg/kubelet/rkt:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
@ -83,15 +80,19 @@ go_library(
"//pkg/kubelet/stats:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/sysctl:go_default_library",
"//pkg/kubelet/token:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/manager:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/util/queue:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/kubelet/volumemanager:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/security/podsecuritypolicy/sysctl:go_default_library",
"//pkg/securitycontext:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/file:go_default_library",
@ -103,6 +104,7 @@ go_library(
"//pkg/util/removeall:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",
@ -113,6 +115,7 @@ go_library(
"//vendor/github.com/google/cadvisor/events:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v2:go_default_library",
"//vendor/k8s.io/api/authentication/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -133,7 +136,6 @@ go_library(
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/util/certificate:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
@ -165,13 +167,11 @@ go_test(
}),
embed = [":go_default_library"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor/testing:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
@ -179,12 +179,9 @@ go_test(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/eviction:go_default_library",
"//pkg/kubelet/gpu:go_default_library",
"//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/logs:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/testing:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
@ -197,14 +194,18 @@ go_test(
"//pkg/kubelet/stats:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/status/testing:go_default_library",
"//pkg/kubelet/token:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/queue:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/kubelet/volumemanager:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/aws_ebs:go_default_library",
"//pkg/volume/azure_dd:go_default_library",
"//pkg/volume/gce_pd:go_default_library",
"//pkg/volume/host_path:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
@ -254,6 +255,7 @@ filegroup(
"//pkg/kubelet/cadvisor:all-srcs",
"//pkg/kubelet/certificate:all-srcs",
"//pkg/kubelet/checkpoint:all-srcs",
"//pkg/kubelet/checkpointmanager:all-srcs",
"//pkg/kubelet/client:all-srcs",
"//pkg/kubelet/cm:all-srcs",
"//pkg/kubelet/config:all-srcs",
@ -264,7 +266,6 @@ filegroup(
"//pkg/kubelet/envvars:all-srcs",
"//pkg/kubelet/events:all-srcs",
"//pkg/kubelet/eviction:all-srcs",
"//pkg/kubelet/gpu:all-srcs",
"//pkg/kubelet/images:all-srcs",
"//pkg/kubelet/kubeletconfig:all-srcs",
"//pkg/kubelet/kuberuntime:all-srcs",
@ -280,12 +281,12 @@ filegroup(
"//pkg/kubelet/prober:all-srcs",
"//pkg/kubelet/qos:all-srcs",
"//pkg/kubelet/remote:all-srcs",
"//pkg/kubelet/rkt:all-srcs",
"//pkg/kubelet/secret:all-srcs",
"//pkg/kubelet/server:all-srcs",
"//pkg/kubelet/stats:all-srcs",
"//pkg/kubelet/status:all-srcs",
"//pkg/kubelet/sysctl:all-srcs",
"//pkg/kubelet/token:all-srcs",
"//pkg/kubelet/types:all-srcs",
"//pkg/kubelet/util:all-srcs",
"//pkg/kubelet/volumemanager:all-srcs",

View file

@ -41,6 +41,7 @@ filegroup(
"//pkg/kubelet/apis/deviceplugin/v1alpha:all-srcs",
"//pkg/kubelet/apis/deviceplugin/v1beta1:all-srcs",
"//pkg/kubelet/apis/kubeletconfig:all-srcs",
"//pkg/kubelet/apis/pluginregistration/v1alpha1:all-srcs",
"//pkg/kubelet/apis/stats/v1alpha1:all-srcs",
],
tags = ["automanaged"],

File diff suppressed because it is too large Load diff

View file

@ -171,7 +171,9 @@ enum MountPropagation {
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
// Path of the mount on the host. If the hostPath doesn't exist, then runtimes
// should report error. If the hostpath is a symbolic link, runtimes should
// follow the symlink and mount the real destination to container.
string host_path = 2;
// If set, the mount is read-only.
bool readonly = 3;
@ -235,7 +237,8 @@ message LinuxSandboxSecurityContext {
SELinuxOption selinux_options = 2;
// UID to run sandbox processes as, when applicable.
Int64Value run_as_user = 3;
// GID to run sandbox processes as, when applicable.
// GID to run sandbox processes as, when applicable. run_as_group should only
// be specified when run_as_user is specified; otherwise, the runtime MUST error.
Int64Value run_as_group = 8;
// If set, the root filesystem of the sandbox is read-only.
bool readonly_rootfs = 4;
@ -249,7 +252,7 @@ message LinuxSandboxSecurityContext {
// privileged containers are expected to be run.
bool privileged = 6;
// Seccomp profile for the sandbox, candidate values are:
// * docker/default: the default profile for the docker container runtime
// * runtime/default: the default profile for the container runtime
// * unconfined: unconfined profile, ie, no seccomp sandboxing
// * localhost/<full-path-to-profile>: the profile installed on the node.
// <full-path-to-profile> is the full path of the profile.
@ -304,7 +307,7 @@ message PodSandboxConfig {
// structured logs, systemd-journald journal files, gRPC trace files, etc.
// E.g.,
// PodSandboxConfig.LogDirectory = `/var/log/pods/<podUID>/`
// ContainerConfig.LogPath = `containerName_Instance#.log`
// ContainerConfig.LogPath = `containerName/Instance#.log`
//
// WARNING: Log management and how kubelet should interface with the
// container logs are under active discussion in
@ -553,8 +556,9 @@ message LinuxContainerSecurityContext {
// UID to run the container process as. Only one of run_as_user and
// run_as_username can be specified at a time.
Int64Value run_as_user = 5;
// GID to run the container process as. Only one of run_as_group and
// run_as_groupname can be specified at a time.
// GID to run the container process as. run_as_group should only be specified
// when run_as_user or run_as_username is specified; otherwise, the runtime
// MUST error.
Int64Value run_as_group = 12;
// User name to run the container process as. If specified, the user MUST
// exist in the container image (i.e. in the /etc/passwd inside the image),
@ -573,7 +577,7 @@ message LinuxContainerSecurityContext {
// http://wiki.apparmor.net/index.php/AppArmor_Core_Policy_Reference
string apparmor_profile = 9;
// Seccomp profile for the container, candidate values are:
// * docker/default: the default profile for the docker container runtime
// * runtime/default: the default profile for the container runtime
// * unconfined: unconfined profile, ie, no seccomp sandboxing
// * localhost/<full-path-to-profile>: the profile installed on the node.
// <full-path-to-profile> is the full path of the profile.
@ -593,11 +597,21 @@ message LinuxContainerConfig {
LinuxContainerSecurityContext security_context = 2;
}
// WindowsContainerSecurityContext holds windows security configuration that will be applied to a container.
message WindowsContainerSecurityContext {
// User name to run the container process as. If specified, the user MUST
// exist in the container image and be resolved there by the runtime;
// otherwise, the runtime MUST return error.
string run_as_username = 1;
}
// WindowsContainerConfig contains platform-specific configuration for
// Windows-based containers.
message WindowsContainerConfig {
// Resources specification for the container.
WindowsContainerResources resources = 1;
// WindowsContainerSecurityContext configuration for the container.
WindowsContainerSecurityContext security_context = 2;
}
// WindowsContainerResources specifies Windows specific configuration for
@ -682,7 +696,7 @@ message ContainerConfig {
// the log (STDOUT and STDERR) on the host.
// E.g.,
// PodSandboxConfig.LogDirectory = `/var/log/pods/<podUID>/`
// ContainerConfig.LogPath = `containerName_Instance#.log`
// ContainerConfig.LogPath = `containerName/Instance#.log`
//
// WARNING: Log management and how kubelet should interface with the
// container logs are under active discussion in
@ -1043,7 +1057,8 @@ message RemoveImageRequest {
message RemoveImageResponse {}
message NetworkConfig {
// CIDR to use for pod IP addresses.
// CIDR to use for pod IP addresses. If the CIDR is empty, runtimes
// should omit it.
string pod_cidr = 1;
}

View file

@ -19,7 +19,7 @@ package apis
const (
// When kubelet is started with the "external" cloud provider, then
// it sets this annotation on the node to denote an ip address set from the
// cmd line flag. This ip is verified with the cloudprovider as valid by
// cmd line flag (--node-ip). This ip is verified with the cloudprovider as valid by
// the cloud-controller-manager
AnnotationProvidedIPAddr = "alpha.kubernetes.io/provided-node-ip"
)

View file

@ -14,49 +14,13 @@ go_library(
"runtime_cache.go",
"runtime_cache_fake.go",
"sync_result.go",
] + select({
"@io_bazel_rules_go//go/platform:android": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:darwin": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:linux": [
"pty_linux.go",
],
"@io_bazel_rules_go//go/platform:nacl": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:netbsd": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:openbsd": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:plan9": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:solaris": [
"pty_unsupported.go",
],
"@io_bazel_rules_go//go/platform:windows": [
"pty_unsupported.go",
],
"//conditions:default": [],
}),
],
importpath = "k8s.io/kubernetes/pkg/kubelet/container",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/volume:go_default_library",
"//third_party/forked/golang/expansion:go_default_library",
@ -71,12 +35,7 @@ go_library(
"//vendor/k8s.io/client-go/tools/reference:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/kr/pty:go_default_library",
],
"//conditions:default": [],
}),
],
)
go_test(
@ -89,7 +48,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",

View file

@ -17,12 +17,9 @@ limitations under the License.
package container
import (
"bytes"
"fmt"
"hash/adler32"
"hash/fnv"
"strings"
"time"
"github.com/golang/glog"
@ -33,7 +30,6 @@ import (
"k8s.io/client-go/tools/record"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/third_party/forked/golang/expansion"
)
@ -100,17 +96,6 @@ func HashContainer(container *v1.Container) uint64 {
return uint64(hash.Sum32())
}
// HashContainerLegacy returns the hash of the container. It is used to compare
// the running container with its desired spec.
// This is used by rktnetes and dockershim (for handling <=1.5 containers).
// TODO: Remove this function when kubernetes version is >=1.8 AND rktnetes
// update its hash function.
func HashContainerLegacy(container *v1.Container) uint64 {
hash := adler32.New()
hashutil.DeepHashObject(hash, *container)
return uint64(hash.Sum32())
}
// EnvVarsToMap constructs a map of environment name to value from a slice
// of env vars.
func EnvVarsToMap(envs []EnvVar) map[string]string {
@ -145,6 +130,11 @@ func ExpandContainerCommandOnlyStatic(containerCommand []string, envs []v1.EnvVa
return command
}
func ExpandContainerVolumeMounts(mount v1.VolumeMount, envs []EnvVar) (expandedSubpath string) {
mapping := expansion.MappingFuncFor(EnvVarsToMap(envs))
return expansion.Expand(mount.SubPath, mapping)
}
func ExpandContainerCommandAndArgs(container *v1.Container, envs []EnvVar) (command []string, args []string) {
mapping := expansion.MappingFuncFor(EnvVarsToMap(envs))
@ -205,6 +195,13 @@ func (irecorder *innerEventRecorder) PastEventf(object runtime.Object, timestamp
}
}
func (irecorder *innerEventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
if ref, ok := irecorder.shouldRecordEvent(object); ok {
irecorder.recorder.AnnotatedEventf(ref, annotations, eventtype, reason, messageFmt, args...)
}
}
// Pod must not be nil.
func IsHostNetworkPod(pod *v1.Pod) bool {
return pod.Spec.HostNetwork
@ -265,26 +262,6 @@ func FormatPod(pod *Pod) string {
return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.ID)
}
type containerCommandRunnerWrapper struct {
DirectStreamingRuntime
}
var _ ContainerCommandRunner = &containerCommandRunnerWrapper{}
func DirectStreamingRunner(runtime DirectStreamingRuntime) ContainerCommandRunner {
return &containerCommandRunnerWrapper{runtime}
}
func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := r.ExecInContainer(id, cmd, nil, output, output, false, nil, timeout)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
}
// GetContainerSpec gets the container spec by containerName.
func GetContainerSpec(pod *v1.Pod, containerName string) *v1.Container {
for i, c := range pod.Spec.Containers {
@ -312,21 +289,6 @@ func HasPrivilegedContainer(pod *v1.Pod) bool {
return false
}
// MakeCapabilities creates string slices from Capability slices
func MakeCapabilities(capAdd []v1.Capability, capDrop []v1.Capability) ([]string, []string) {
var (
addCaps []string
dropCaps []string
)
for _, cap := range capAdd {
addCaps = append(addCaps, string(cap))
}
for _, cap := range capDrop {
dropCaps = append(dropCaps, string(cap))
}
return addCaps, dropCaps
}
// MakePortMappings creates internal port mapping from api port mapping.
func MakePortMappings(container *v1.Container) (ports []PortMapping) {
names := make(map[string]struct{})

View file

@ -43,7 +43,7 @@ type OSInterface interface {
// RealOS is used to dispatch the real system level operations.
type RealOS struct{}
// MkDir will will call os.Mkdir to create a directory.
// MkdirAll will call os.MkdirAll to create a directory.
func (RealOS) MkdirAll(path string, perm os.FileMode) error {
return os.MkdirAll(path, perm)
}

View file

@ -1,30 +0,0 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"os"
"os/exec"
"github.com/kr/pty"
)
func StartPty(c *exec.Cmd) (*os.File, error) {
return pty.Start(c)
}

View file

@ -1,28 +0,0 @@
// +build !linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"os"
"os/exec"
)
func StartPty(c *exec.Cmd) (pty *os.File, err error) {
return nil, nil
}

View file

@ -32,11 +32,7 @@ func HandleResizing(resize <-chan remotecommand.TerminalSize, resizeFunc func(si
go func() {
defer runtime.HandleCrash()
for {
size, ok := <-resize
if !ok {
return
}
for size := range resize {
if size.Height < 1 || size.Width < 1 {
continue
}

View file

@ -95,7 +95,7 @@ type Runtime interface {
// it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
KillPod(pod *v1.Pod, runningPod Pod, gracePeriodOverride *int64) error
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visble in Runtime.
// information of all containers in the pod that are visible in Runtime.
GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
// Returns the filesystem path of the pod's network namespace; if the
// runtime does not handle namespace creation itself, or cannot return
@ -124,22 +124,10 @@ type Runtime interface {
UpdatePodCIDR(podCIDR string) error
}
// DirectStreamingRuntime is the interface implemented by runtimes for which the streaming calls
// (exec/attach/port-forward) should be served directly by the Kubelet.
type DirectStreamingRuntime interface {
// Runs the command in the container of the specified pod. Attaches
// the processes stdin, stdout, and stderr. Optionally uses a tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
}
// IndirectStreamingRuntime is the interface implemented by runtimes that handle the serving of the
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
// streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to
// the runtime server.
type IndirectStreamingRuntime interface {
type StreamingRuntime interface {
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
@ -198,7 +186,7 @@ type PodPair struct {
// ContainerID is a type that identifies a container.
type ContainerID struct {
// The type of the container runtime. e.g. 'docker', 'rkt'.
// The type of the container runtime. e.g. 'docker'.
Type string
// The identification of the container, this is comsumable by
// the underlying container runtime. (Note that the container
@ -444,8 +432,6 @@ type RunContainerOptions struct {
// this directory will be used to create and mount the log file to
// container.TerminationMessagePath
PodContainerDir string
// The parent cgroup to pass to Docker
CgroupParent string
// The type of container rootfs
ReadOnly bool
// hostname for pod containers

View file

@ -27,7 +27,6 @@ import (
"os"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -61,6 +60,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
@ -69,8 +69,6 @@ import (
dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/gpu"
"k8s.io/kubernetes/pkg/kubelet/gpu/nvidia"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
@ -78,7 +76,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
@ -86,7 +83,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server"
serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
@ -94,13 +90,17 @@ import (
"k8s.io/kubernetes/pkg/kubelet/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/token"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/security/apparmor"
sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
kubeio "k8s.io/kubernetes/pkg/util/io"
utilipt "k8s.io/kubernetes/pkg/util/iptables"
@ -108,6 +108,7 @@ import (
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
utilexec "k8s.io/utils/exec"
)
@ -221,7 +222,8 @@ type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string) (Bootstrap, error)
bootstrapCheckpointPath string,
nodeStatusMaxImages int32) (Bootstrap, error)
// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
@ -241,7 +243,6 @@ type Dependencies struct {
KubeClient clientset.Interface
ExternalKubeClient clientset.Interface
Mounter mount.Interface
NetworkPlugins []network.NetworkPlugin
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
@ -347,7 +348,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string) (*Kubelet, error) {
bootstrapCheckpointPath string,
nodeStatusMaxImages int32) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -444,6 +446,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: experimentalKernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
@ -514,20 +517,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
containerRuntimeName: containerRuntime,
nodeIP: parsedNodeIP,
clock: clock.RealClock{},
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
containerRuntimeName: containerRuntime,
redirectContainerStreaming: crOptions.RedirectContainerStreaming,
nodeIP: parsedNodeIP,
nodeIPValidator: validateNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
@ -535,33 +540,29 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
nodeStatusMaxImages: nodeStatusMaxImages,
enablePluginsWatcher: utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher),
}
if klet.cloud != nil {
klet.cloudproviderRequestParallelism = make(chan int, 1)
klet.cloudproviderRequestSync = make(chan int)
// TODO(jchaloup): Make it configurable via --cloud-provider-request-timeout
klet.cloudproviderRequestTimeout = 10 * time.Second
}
secretManager := secret.NewCachingSecretManager(
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
klet.secretManager = secretManager
configMapManager := configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode))
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {
glog.Infof("Experimental host user namespace defaulting is enabled.")
}
hairpinMode, err := effectiveHairpinMode(kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode), containerRuntime, crOptions.NetworkPluginName)
if err != nil {
// This is a non-recoverable error. Returning it up the callstack will just
// lead to retries of the same failure, so just fail hard.
glog.Fatalf("Invalid hairpin mode: %v", err)
}
glog.Infof("Hairpin mode set to %q", hairpinMode)
plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, crOptions.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, hairpinMode, nonMasqueradeCIDR, int(crOptions.NetworkPluginMTU))
if err != nil {
return nil, err
}
klet.networkPlugin = plug
machineInfo, err := klet.cadvisor.MachineInfo()
if err != nil {
return nil, err
@ -573,8 +574,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
@ -585,161 +593,110 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// TODO: These need to become arguments to a standalone docker shim.
pluginSettings := dockershim.NetworkPluginSettings{
HairpinMode: hairpinMode,
NonMasqueradeCIDR: nonMasqueradeCIDR,
PluginName: crOptions.NetworkPluginName,
PluginConfDir: crOptions.CNIConfDir,
PluginBinDir: crOptions.CNIBinDir,
MTU: int(crOptions.NetworkPluginMTU),
HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
NonMasqueradeCIDR: nonMasqueradeCIDR,
PluginName: crOptions.NetworkPluginName,
PluginConfDir: crOptions.CNIConfDir,
PluginBinDirString: crOptions.CNIBinDir,
MTU: int(crOptions.NetworkPluginMTU),
}
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
// Remote runtime shim just cannot talk back to kubelet, so it doesn't
// support bandwidth shaping or hostports till #35457. To enable legacy
// features, replace with networkHost.
var nl *NoOpLegacyHost
pluginSettings.LegacyRuntimeHost = nl
if containerRuntime == kubetypes.RktContainerRuntime {
glog.Warningln("rktnetes has been deprecated in favor of rktlet. Please see https://github.com/kubernetes-incubator/rktlet for more information.")
if containerRuntime == "rkt" {
glog.Fatalln("rktnetes has been deprecated in favor of rktlet. Please see https://github.com/kubernetes-incubator/rktlet for more information.")
}
// rktnetes cannot be run with CRI.
if containerRuntime != kubetypes.RktContainerRuntime {
// kubelet defers to the runtime shim to setup networking. Setting
// this to nil will prevent it from trying to invoke the plugin.
// It's easier to always probe and initialize plugins till cri
// becomes the default.
klet.networkPlugin = nil
// if left at nil, that means it is unneeded
var legacyLogProvider kuberuntime.LegacyLogProvider
// if left at nil, that means it is unneeded
var legacyLogProvider kuberuntime.LegacyLogProvider
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory,
crOptions.DockerDisableSharedPID)
if err != nil {
return nil, err
}
// For now, the CRI shim redirects the streaming requests to the
// kubelet, which handles the requests using DockerService..
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory,
crOptions.DockerDisableSharedPID, !crOptions.RedirectContainerStreaming)
if err != nil {
return nil, err
}
if crOptions.RedirectContainerStreaming {
klet.criHandler = ds
// The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}
// Create dockerLegacyService when the logging driver is not supported.
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
if !supported {
klet.dockerLegacyService = ds
legacyLogProvider = ds
}
case kubetypes.RemoteContainerRuntime:
// No-op.
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
// The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}
// Create dockerLegacyService when the logging driver is not supported.
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
seccompProfileRoot,
containerRefManager,
machineInfo,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.CPUCFSQuota,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider,
)
if err != nil {
return nil, err
if !supported {
klet.dockerLegacyService = ds
legacyLogProvider = ds
}
klet.containerRuntime = runtime
klet.runner = runtime
case kubetypes.RemoteContainerRuntime:
// No-op.
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
seccompProfileRoot,
containerRefManager,
machineInfo,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.CPUCFSQuota,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
klet.containerRuntime)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
runtimeService,
imageService,
stats.NewLogMetricsService())
}
} else {
// rkt uses the legacy, non-CRI, integration. Configure it the old way.
// TODO: Include hairpin mode settings in rkt?
conf := &rkt.Config{
Path: crOptions.RktPath,
Stage1Image: crOptions.RktStage1Image,
InsecureOptions: "image,ondisk",
}
runtime, err := rkt.New(
crOptions.RktAPIEndpoint,
conf,
klet,
kubeDeps.Recorder,
containerRefManager,
klet,
klet.livenessManager,
httpClient,
klet.networkPlugin,
hairpinMode == kubeletconfiginternal.HairpinVeth,
utilexec.New(),
kubecontainer.RealOS{},
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.RuntimeRequestTimeout.Duration,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.runner = kubecontainer.DirectStreamingRunner(runtime)
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
klet.containerRuntime)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
runtimeService,
imageService,
stats.NewLogMetricsService())
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
@ -779,21 +736,29 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kubeDeps.TLSOptions != nil {
var ips []net.IP
cfgAddress := net.ParseIP(kubeCfg.Address)
if cfgAddress == nil || cfgAddress.IsUnspecified() {
localIPs, err := allLocalIPsWithoutLoopback()
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
var (
ips []net.IP
names []string
)
// If the address was explicitly configured, use that. Otherwise, try to
// discover addresses from the cloudprovider. Otherwise, make a best guess.
if cfgAddress := net.ParseIP(kubeCfg.Address); cfgAddress != nil && !cfgAddress.IsUnspecified() {
ips = []net.IP{cfgAddress}
names = []string{klet.GetHostname(), hostnameOverride}
} else if len(cloudIPs) != 0 || len(cloudNames) != 0 {
ips = cloudIPs
names = cloudNames
} else {
localIPs, err := allGlobalUnicastIPs()
if err != nil {
return nil, err
}
ips = localIPs
} else {
ips = []net.IP{cfgAddress}
names = []string{klet.GetHostname(), hostnameOverride}
}
ips = append(ips, cloudIPs...)
names := append([]string{klet.GetHostname(), hostnameOverride}, cloudNames...)
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, ips, names, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
@ -814,11 +779,16 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
containerRefManager,
kubeDeps.Recorder)
tokenManager := token.NewManager(kubeDeps.KubeClient)
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {
return nil, err
}
if klet.enablePluginsWatcher {
klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsDir())
}
// If the experimentalMounterPathFlag is set, we do not want to
// check node capabilities since the mount path is not the default
@ -863,25 +833,23 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
// add sysctl admission
runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
if err != nil {
return nil, err
if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
// add sysctl admission
runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
if err != nil {
return nil, err
}
// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
}
safeWhitelist, err := sysctl.NewWhitelist(sysctl.SafeSysctlWhitelist(), v1.SysctlsPodAnnotationKey)
if err != nil {
return nil, err
}
// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, v1.UnsafeSysctlsPodAnnotationKey)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
klet.admitHandlers.AddPodAdmitHandler(safeWhitelist)
klet.admitHandlers.AddPodAdmitHandler(unsafeWhitelist)
// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
@ -901,20 +869,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
if utilfeature.DefaultFeatureGate.Enabled(features.Accelerators) {
if containerRuntime == kubetypes.DockerContainerRuntime {
glog.Warningln("Accelerators feature is deprecated and will be removed in v1.11. Please use device plugins instead. They can be enabled using the DevicePlugins feature gate.")
if klet.gpuManager, err = nvidia.NewNvidiaGPUManager(klet, kubeDeps.DockerClientConfig); err != nil {
return nil, err
}
} else {
glog.Errorf("Accelerators feature is supported with docker runtime only. Disabling this feature internally.")
}
}
// Set GPU manager to a stub implementation if it is not enabled or cannot be supported.
if klet.gpuManager == nil {
klet.gpuManager = gpu.NewGPUManagerStub()
}
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
@ -994,9 +948,6 @@ type Kubelet struct {
// Volume plugins.
volumePluginMgr *volume.VolumePluginMgr
// Network plugin.
networkPlugin network.NetworkPlugin
// Handles container probing.
probeManager prober.Manager
// Manages container health check results.
@ -1043,6 +994,14 @@ type Kubelet struct {
// Cloud provider interface.
cloud cloudprovider.Interface
// To keep exclusive access to the cloudproviderRequestParallelism
cloudproviderRequestMux sync.Mutex
// Keep the count of requests processed in parallel (expected to be 1 at most at a given time)
cloudproviderRequestParallelism chan int
// Sync with finished requests
cloudproviderRequestSync chan int
// Request timeout
cloudproviderRequestTimeout time.Duration
// Indicates that the node initialization happens in an external cloud controller
externalCloudProvider bool
@ -1052,9 +1011,15 @@ type Kubelet struct {
// The name of the container runtime
containerRuntimeName string
// redirectContainerStreaming enables container streaming redirect.
redirectContainerStreaming bool
// Container runtime.
containerRuntime kubecontainer.Runtime
// Streaming runtime handles container streaming.
streamingRuntime kubecontainer.StreamingRuntime
// Container runtime service (needed by container runtime Start()).
// TODO(CD): try to make this available without holding a reference in this
// struct. For example, by adding a getter to generic runtime.
@ -1131,6 +1096,9 @@ type Kubelet struct {
// If non-nil, use this IP address for the node
nodeIP net.IP
// use this function to validate the kubelet nodeIP
nodeIPValidator func(net.IP) error
// If non-nil, this is a unique identifier for the node in an external database, eg. cloudprovider
providerID string
@ -1190,9 +1158,6 @@ type Kubelet struct {
// experimental behavior is desired.
experimentalHostUserNamespaceDefaulting bool
// GPU Manager
gpuManager gpu.GPUManager
// dockerLegacyService contains some legacy methods for backward compatibility.
// It should be set only when docker is using non json-file logging driver.
dockerLegacyService dockershim.DockerLegacyService
@ -1200,15 +1165,23 @@ type Kubelet struct {
// StatsProvider provides the node and the container stats.
*stats.StatsProvider
// containerized should be set to true if the kubelet is running in a container
containerized bool
// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
// This can be useful for debugging volume related issues.
keepTerminatedPodVolumes bool // DEPRECATED
// pluginwatcher is a utility for Kubelet to register different types of node-level plugins
// such as device plugins or CSI plugins. It discovers plugins by monitoring inotify events under the
// directory returned by kubelet.getPluginsDir()
pluginWatcher pluginwatcher.Watcher
// This flag sets a maximum number of images to report in the node status.
nodeStatusMaxImages int32
// This flag indicates that kubelet should start plugin watcher utility server for discovering Kubelet plugins
enablePluginsWatcher bool
}
func allLocalIPsWithoutLoopback() ([]net.IP, error) {
func allGlobalUnicastIPs() ([]net.IP, error) {
interfaces, err := net.Interfaces()
if err != nil {
return nil, fmt.Errorf("could not list network interfaces: %v", err)
@ -1222,7 +1195,7 @@ func allLocalIPsWithoutLoopback() ([]net.IP, error) {
for _, address := range addresses {
switch v := address.(type) {
case *net.IPNet:
if !v.IP.IsLoopback() {
if v.IP.IsGlobalUnicast() {
ips = append(ips, v.IP)
}
}
@ -1271,6 +1244,14 @@ func (kl *Kubelet) StartGarbageCollection() {
}
}, ContainerGCPeriod, wait.NeverStop)
stopChan := make(chan struct{})
defer close(stopChan)
// when the high threshold is set to 100, stub the image GC manager
if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
glog.V(2).Infof("ImageGCHighThresholdPercent is set 100, Disable image GC")
go func() { stopChan <- struct{}{} }()
}
prevImageGCFailed := false
go wait.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
@ -1291,7 +1272,7 @@ func (kl *Kubelet) StartGarbageCollection() {
glog.V(vLevel).Infof("Image garbage collection succeeded")
}
}, ImageGCPeriod, wait.NeverStop)
}, ImageGCPeriod, stopChan)
}
// initializeModules will initialize internal modules that do not require the container runtime to be up.
@ -1315,8 +1296,8 @@ func (kl *Kubelet) initializeModules() error {
// Start the image manager.
kl.imageManager.Start()
// Start the certificate manager.
if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
// Start the certificate manager if it was enabled.
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}
@ -1325,11 +1306,6 @@ func (kl *Kubelet) initializeModules() error {
return fmt.Errorf("Failed to start OOM watcher %v", err)
}
// Initialize GPUs
if err := kl.gpuManager.Start(); err != nil {
glog.Errorf("Failed to start gpuManager %v", err)
}
// Start resource analyzer
kl.resourceAnalyzer.Start()
@ -1364,6 +1340,16 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
kl.containerLogManager.Start()
if kl.enablePluginsWatcher {
// Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback)
// Start the plugin watcher
glog.V(4).Infof("starting watcher")
if err := kl.pluginWatcher.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
glog.Fatalf("failed to start Plugin Watcher. err: %v", err)
}
}
}
// Run starts the kubelet reacting to config updates
@ -1387,7 +1373,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
}
go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
@ -1413,13 +1398,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.syncLoop(updates, kl)
}
// GetKubeClient returns the Kubernetes client.
// TODO: This is currently only required by network plugins. Replace
// with more specific methods.
func (kl *Kubelet) GetKubeClient() clientset.Interface {
return kl.kubeClient
}
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
@ -2071,11 +2049,18 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
// Update the pod in pod manager, status manager will do periodically reconcile according
// to the pod manager.
kl.podManager.UpdatePod(pod)
// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
if status.NeedToReconcilePodReadiness(pod) {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}
// After an evicted pod is synced, all dead containers in the pod can be removed.
if eviction.PodIsEvicted(pod.Status) {
if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
@ -2114,53 +2099,35 @@ func (kl *Kubelet) updateRuntimeUp() {
glog.Errorf("Container runtime sanity check failed: %v", err)
return
}
// rkt uses the legacy, non-CRI integration. Don't check the runtime
// conditions for it.
if kl.containerRuntimeName != kubetypes.RktContainerRuntime {
if s == nil {
glog.Errorf("Container runtime status is nil")
return
}
// Periodically log the whole runtime status for debugging.
// TODO(random-liu): Consider to send node event when optional
// condition is unmet.
glog.V(4).Infof("Container runtime status: %v", s)
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
glog.Errorf("Container runtime network not ready: %v", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
}
// TODO(random-liu): Add runtime error in runtimeState, and update it
// when runtime is not ready, so that the information in RuntimeReady
// condition will be propagated to NodeReady condition.
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
glog.Errorf("Container runtime not ready: %v", runtimeReady)
return
}
if s == nil {
glog.Errorf("Container runtime status is nil")
return
}
// Periodically log the whole runtime status for debugging.
// TODO(random-liu): Consider to send node event when optional
// condition is unmet.
glog.V(4).Infof("Container runtime status: %v", s)
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
glog.Errorf("Container runtime network not ready: %v", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
}
// TODO(random-liu): Add runtime error in runtimeState, and update it
// when runtime is not ready, so that the information in RuntimeReady
// condition will be propagated to NodeReady condition.
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
glog.Errorf("Container runtime not ready: %v", runtimeReady)
return
}
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
// updateCloudProviderFromMachineInfo updates the node's provider ID field
// from the given cadvisor machine info.
func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *v1.Node, info *cadvisorapi.MachineInfo) {
if info.CloudProvider != cadvisorapi.UnknownProvider &&
info.CloudProvider != cadvisorapi.Baremetal {
// The cloud providers from pkg/cloudprovider/providers/* that update ProviderID
// will use the format of cloudprovider://project/availability_zone/instance_name
// here we only have the cloudprovider and the instance name so we leave project
// and availability zone empty for compatibility.
node.Spec.ProviderID = strings.ToLower(string(info.CloudProvider)) +
":////" + string(info.InstanceID)
}
}
// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
return kl.kubeletConfiguration
@ -2172,11 +2139,6 @@ func (kl *Kubelet) BirthCry() {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}
// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
@ -2184,12 +2146,12 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.containerRuntime, kl.criHandler)
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler)
}
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
@ -2213,19 +2175,23 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
}
// Gets the streaming server configuration to use with in-process CRI shims.
func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) *streaming.Config {
func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
config := &streaming.Config{
// Use a relative redirect (no scheme or host).
BaseURL: &url.URL{
Path: "/cri/",
},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
if !crOptions.RedirectContainerStreaming {
config.Addr = net.JoinHostPort("localhost", "0")
} else {
// Use a relative redirect (no scheme or host).
config.BaseURL = &url.URL{
Path: "/cri/",
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
}
}
return config
}

View file

@ -174,6 +174,16 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*v1.Pod, bool) {
return kl.podManager.GetPodByName(namespace, name)
}
// GetPodByCgroupfs provides the pod that maps to the specified cgroup, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) {
pcm := kl.containerManager.NewPodContainerManager()
if result, podUID := pcm.IsPodCgroup(cgroupfs); result {
return kl.podManager.GetPodByUID(podUID)
}
return nil, false
}
// GetHostname Returns the hostname as the kubelet sees it.
func (kl *Kubelet) GetHostname() string {
return kl.hostname

View file

@ -21,12 +21,7 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
@ -45,106 +40,6 @@ const (
KubeFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
)
// This just exports required functions from kubelet proper, for use by network
// plugins.
// TODO(#35457): get rid of this backchannel to the kubelet. The scope of
// the back channel is restricted to host-ports/testing, and restricted
// to kubenet. No other network plugin wrapper needs it. Other plugins
// only require a way to access namespace information, which they can do
// directly through the methods implemented by criNetworkHost.
type networkHost struct {
kubelet *Kubelet
}
func (nh *networkHost) GetPodByName(name, namespace string) (*v1.Pod, bool) {
return nh.kubelet.GetPodByName(name, namespace)
}
func (nh *networkHost) GetKubeClient() clientset.Interface {
return nh.kubelet.kubeClient
}
func (nh *networkHost) GetRuntime() kubecontainer.Runtime {
return nh.kubelet.getRuntime()
}
func (nh *networkHost) SupportsLegacyFeatures() bool {
return true
}
// criNetworkHost implements the part of network.Host required by the
// cri (NamespaceGetter). It leechs off networkHost for all other
// methods, because networkHost is slated for deletion.
type criNetworkHost struct {
*networkHost
// criNetworkHost currently support legacy features. Hence no need to support PortMappingGetter
*network.NoopPortMappingGetter
}
// GetNetNS returns the network namespace of the given containerID.
// This method satisfies the network.NamespaceGetter interface for
// networkHost. It's only meant to be used from network plugins
// that are directly invoked by the kubelet (aka: legacy, pre-cri).
// Any network plugin invoked by a cri must implement NamespaceGetter
// to talk directly to the runtime instead.
func (c *criNetworkHost) GetNetNS(containerID string) (string, error) {
return c.kubelet.getRuntime().GetNetNS(kubecontainer.ContainerID{Type: "", ID: containerID})
}
// NoOpLegacyHost implements the network.LegacyHost interface for the remote
// runtime shim by just returning empties. It doesn't support legacy features
// like host port and bandwidth shaping.
type NoOpLegacyHost struct{}
// GetPodByName always returns "nil, true" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetPodByName(namespace, name string) (*v1.Pod, bool) {
return nil, true
}
// GetKubeClient always returns "nil" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetKubeClient() clientset.Interface {
return nil
}
// getRuntime always returns "nil" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) GetRuntime() kubecontainer.Runtime {
return nil
}
// SupportsLegacyFeatures always returns "false" for 'NoOpLegacyHost'
func (n *NoOpLegacyHost) SupportsLegacyFeatures() bool {
return false
}
// effectiveHairpinMode determines the effective hairpin mode given the
// configured mode, container runtime, and whether cbr0 should be configured.
func effectiveHairpinMode(hairpinMode kubeletconfig.HairpinMode, containerRuntime string, networkPlugin string) (kubeletconfig.HairpinMode, error) {
// The hairpin mode setting doesn't matter if:
// - We're not using a bridge network. This is hard to check because we might
// be using a plugin.
// - It's set to hairpin-veth for a container runtime that doesn't know how
// to set the hairpin flag on the veth's of containers. Currently the
// docker runtime is the only one that understands this.
// - It's set to "none".
if hairpinMode == kubeletconfig.PromiscuousBridge || hairpinMode == kubeletconfig.HairpinVeth {
// Only on docker.
if containerRuntime != kubetypes.DockerContainerRuntime {
glog.Warningf("Hairpin mode set to %q but container runtime is %q, ignoring", hairpinMode, containerRuntime)
return kubeletconfig.HairpinNone, nil
}
if hairpinMode == kubeletconfig.PromiscuousBridge && networkPlugin != "kubenet" {
// This is not a valid combination, since promiscuous-bridge only works on kubenet. Users might be using the
// default values (from before the hairpin-mode flag existed) and we
// should keep the old behavior.
glog.Warningf("Hairpin mode set to %q but kubenet is not enabled, falling back to %q", hairpinMode, kubeletconfig.HairpinVeth)
return kubeletconfig.HairpinVeth, nil
}
} else if hairpinMode != kubeletconfig.HairpinNone {
return "", fmt.Errorf("unknown value: %q", hairpinMode)
}
return hairpinMode, nil
}
// providerRequiresNetworkingConfiguration returns whether the cloud provider
// requires special networking configuration.
func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
@ -159,16 +54,6 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
return supported
}
// syncNetworkStatus updates the network state
func (kl *Kubelet) syncNetworkStatus() {
// For cri integration, network state will be updated in updateRuntimeUp,
// we'll get runtime network status through cri directly.
// TODO: Remove this once we completely switch to cri integration.
if kl.networkPlugin != nil {
kl.runtimeState.setNetworkState(kl.networkPlugin.Status())
}
}
// updatePodCIDR updates the pod CIDR in the runtime state if it is different
// from the current CIDR.
func (kl *Kubelet) updatePodCIDR(cidr string) {
@ -178,16 +63,8 @@ func (kl *Kubelet) updatePodCIDR(cidr string) {
return
}
// kubelet -> network plugin
// cri runtime shims are responsible for their own network plugins
if kl.networkPlugin != nil {
details := make(map[string]interface{})
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = cidr
kl.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
}
// kubelet -> generic runtime -> runtime shim -> network plugin
// docker/rkt non-cri implementations have a passthrough UpdatePodCIDR
// docker/non-cri implementations have a passthrough UpdatePodCIDR
if err := kl.getRuntime().UpdatePodCIDR(cidr); err != nil {
glog.Errorf("Failed to update pod CIDR: %v", err)
return

View file

@ -49,9 +49,6 @@ import (
)
const (
// maxImagesInNodeStatus is the number of max images we store in image status.
maxImagesInNodeStatus = 50
// maxNamesPerImageInNodeStatus is max number of names per image stored in
// the node status.
maxNamesPerImageInNodeStatus = 5
@ -123,34 +120,35 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return false
}
if existingNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", kl.nodeName)
glog.Infof("Node %s was previously registered", kl.nodeName)
// Edge case: the node was previously registered; reconcile
// the value of the controller-managed attach-detach
// annotation.
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false
}
// Edge case: the node was previously registered; reconcile
// the value of the controller-managed attach-detach
// annotation.
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false
}
return true
}
glog.Errorf("Previously node %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID,
)
if err := kl.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to register node %q with API server: error deleting old node: %v", kl.nodeName, err)
} else {
glog.Infof("Deleted old node object %q", kl.nodeName)
}
return true
}
return false
// Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
requiresUpdate := false
for k := range node.Status.Capacity {
if v1helper.IsExtendedResourceName(k) {
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
requiresUpdate = true
}
}
return requiresUpdate
}
// updateDefaultLabels will set the default labels on the node
@ -300,18 +298,10 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(context.TODO(), kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
var err error
if node.Spec.ProviderID == "" {
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName)
if err != nil {
@ -343,14 +333,37 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
node.ObjectMeta.Labels[kubeletapis.LabelZoneRegion] = zone.Region
}
}
} else {
node.Spec.ExternalID = kl.hostname
}
kl.setNodeStatus(node)
return node, nil
}
// setVolumeLimits updates volume limits on the node
func (kl *Kubelet) setVolumeLimits(node *v1.Node) {
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
pluginWithLimits := kl.volumePluginMgr.ListVolumePluginWithLimits()
for _, volumePlugin := range pluginWithLimits {
attachLimits, err := volumePlugin.GetVolumeLimits()
if err != nil {
glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName())
continue
}
for limitKey, value := range attachLimits {
node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
}
}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
@ -369,6 +382,7 @@ func (kl *Kubelet) syncNodeStatus() {
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
glog.V(5).Infof("Updating node status")
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(i); err != nil {
if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
@ -405,7 +419,9 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
return fmt.Errorf("nil %q node object", kl.nodeName)
}
kl.updatePodCIDR(node.Spec.PodCIDR)
if node.Spec.PodCIDR != "" {
kl.updatePodCIDR(node.Spec.PodCIDR)
}
kl.setNodeStatus(node)
// Patch the current status on the API server
@ -431,7 +447,7 @@ func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
// Set IP and hostname addresses for the node.
func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
if kl.nodeIP != nil {
if err := validateNodeIP(kl.nodeIP); err != nil {
if err := kl.nodeIPValidator(kl.nodeIP); err != nil {
return fmt.Errorf("failed to validate nodeIP: %v", err)
}
glog.V(2).Infof("Using node IP: %q", kl.nodeIP.String())
@ -456,18 +472,57 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
nodeAddresses, err := instances.NodeAddresses(context.TODO(), kl.nodeName)
var nodeAddresses []v1.NodeAddress
var err error
// Make sure the instances.NodeAddresses returns even if the cloud provider API hangs for a long time
func() {
kl.cloudproviderRequestMux.Lock()
if len(kl.cloudproviderRequestParallelism) > 0 {
kl.cloudproviderRequestMux.Unlock()
return
}
kl.cloudproviderRequestParallelism <- 0
kl.cloudproviderRequestMux.Unlock()
go func() {
nodeAddresses, err = instances.NodeAddresses(context.TODO(), kl.nodeName)
kl.cloudproviderRequestMux.Lock()
<-kl.cloudproviderRequestParallelism
kl.cloudproviderRequestMux.Unlock()
kl.cloudproviderRequestSync <- 0
}()
}()
select {
case <-kl.cloudproviderRequestSync:
case <-time.After(kl.cloudproviderRequestTimeout):
err = fmt.Errorf("Timeout after %v", kl.cloudproviderRequestTimeout)
}
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
if kl.nodeIP != nil {
enforcedNodeAddresses := []v1.NodeAddress{}
var nodeIPType v1.NodeAddressType
for _, nodeAddress := range nodeAddresses {
if nodeAddress.Address == kl.nodeIP.String() {
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
nodeIPType = nodeAddress.Type
break
}
}
if len(enforcedNodeAddresses) > 0 {
for _, nodeAddress := range nodeAddresses {
if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName {
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
}
}
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: kl.GetHostname()})
node.Status.Addresses = enforcedNodeAddresses
return nil
@ -508,7 +563,7 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
var addrs []net.IP
addrs, _ = net.LookupIP(node.Name)
for _, addr := range addrs {
if err = validateNodeIP(addr); err == nil {
if err = kl.nodeIPValidator(addr); err == nil {
if addr.To4() != nil {
ipAddr = addr
break
@ -543,14 +598,6 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
node.Status.Capacity = v1.ResourceList{}
}
// populate GPU capacity.
gpuCapacity := kl.gpuManager.Capacity()
if gpuCapacity != nil {
for k, v := range gpuCapacity {
node.Status.Capacity[k] = v
}
}
var devicePluginAllocatable v1.ResourceList
var devicePluginCapacity v1.ResourceList
var removedDevicePlugins []string
@ -602,7 +649,9 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
}
node.Status.Capacity[k] = v
}
}
@ -645,9 +694,12 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
node.Status.Allocatable[k] = value
}
if devicePluginAllocatable != nil {
for k, v := range devicePluginAllocatable {
glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
}
node.Status.Allocatable[k] = v
}
}
@ -704,8 +756,9 @@ func (kl *Kubelet) setNodeStatusImages(node *v1.Node) {
return
}
// sort the images from max to min, and only set top N images into the node status.
if maxImagesInNodeStatus < len(containerImages) {
containerImages = containerImages[0:maxImagesInNodeStatus]
if int(kl.nodeStatusMaxImages) > -1 &&
int(kl.nodeStatusMaxImages) < len(containerImages) {
containerImages = containerImages[0:kl.nodeStatusMaxImages]
}
for _, image := range containerImages {
@ -736,6 +789,9 @@ func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) {
kl.setNodeStatusDaemonEndpoints(node)
kl.setNodeStatusImages(node)
kl.setNodeStatusGoRuntime(node)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
kl.setVolumeLimits(node)
}
}
// Set Ready condition for the node.
@ -1052,7 +1108,8 @@ func (kl *Kubelet) setNodeVolumesInUseStatus(node *v1.Node) {
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condition code out to a different file.
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
for _, f := range kl.setNodeStatusFuncs {
for i, f := range kl.setNodeStatusFuncs {
glog.V(5).Infof("Setting node status at position %v", i)
if err := f(node); err != nil {
glog.Warningf("Failed to set some node status fields: %s", err)
}

View file

@ -30,7 +30,6 @@ import (
"sort"
"strings"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
@ -41,7 +40,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/remotecommand"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
@ -60,7 +58,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
utilfile "k8s.io/kubernetes/pkg/util/file"
mountutil "k8s.io/kubernetes/pkg/util/mount"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
@ -68,6 +65,11 @@ import (
"k8s.io/kubernetes/third_party/forked/golang/expansion"
)
const (
managedHostsHeader = "# Kubernetes-managed hosts file.\n"
managedHostsHeaderWithHostNetwork = "# Kubernetes-managed hosts file (host network).\n"
)
// Get a list of pods that have data directories.
func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
podInfos, err := ioutil.ReadDir(kl.getPodsDir())
@ -90,26 +92,6 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod {
return activePods
}
// makeGPUDevices determines the devices for the given container.
// Experimental.
func (kl *Kubelet) makeGPUDevices(pod *v1.Pod, container *v1.Container) ([]kubecontainer.DeviceInfo, error) {
if container.Resources.Limits.NvidiaGPU().IsZero() {
return nil, nil
}
nvidiaGPUPaths, err := kl.gpuManager.AllocateGPU(pod, container)
if err != nil {
return nil, err
}
var devices []kubecontainer.DeviceInfo
for _, path := range nvidiaGPUPaths {
// Devices have to be mapped one to one because of nvidia CUDA library requirements.
devices = append(devices, kubecontainer.DeviceInfo{PathOnHost: path, PathInContainer: path, Permissions: "mrw"})
}
return devices, nil
}
// makeBlockVolumes maps the raw block devices specified in the path of the container
// Experimental
func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumepathhandler.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) {
@ -144,7 +126,8 @@ func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVol
}
// makeMounts determines the mount points for the given container.
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap, mounter mountutil.Interface) ([]kubecontainer.Mount, func(), error) {
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap, mounter mountutil.Interface, expandEnvs []kubecontainer.EnvVar) ([]kubecontainer.Mount, func(), error) {
// Kubernetes only mounts on /etc/hosts if:
// - container is not an infrastructure (pause) container
// - container is not already mounting on /etc/hosts
@ -181,6 +164,11 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
return nil, cleanupAction, fmt.Errorf("volume subpaths are disabled")
}
// Expand subpath variables
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeSubpathEnvExpansion) {
mount.SubPath = kubecontainer.ExpandContainerVolumeMounts(mount, expandEnvs)
}
if filepath.IsAbs(mount.SubPath) {
return nil, cleanupAction, fmt.Errorf("error SubPath `%s` must not be an absolute path", mount.SubPath)
}
@ -190,19 +178,10 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
return nil, cleanupAction, fmt.Errorf("unable to provision SubPath `%s`: %v", mount.SubPath, err)
}
fileinfo, err := os.Lstat(hostPath)
if err != nil {
return nil, cleanupAction, err
}
perm := fileinfo.Mode()
volumePath, err := filepath.EvalSymlinks(hostPath)
if err != nil {
return nil, cleanupAction, err
}
volumePath := hostPath
hostPath = filepath.Join(volumePath, mount.SubPath)
if subPathExists, err := utilfile.FileOrSymlinkExists(hostPath); err != nil {
if subPathExists, err := mounter.ExistsPath(hostPath); err != nil {
glog.Errorf("Could not determine if subPath %s exists; will not attempt to change its permissions", hostPath)
} else if !subPathExists {
// Create the sub path now because if it's auto-created later when referenced, it may have an
@ -210,10 +189,15 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
// when the pod specifies an fsGroup, and if the directory is not created here, Docker will
// later auto-create it with the incorrect mode 0750
// Make extra care not to escape the volume!
if err := mounter.SafeMakeDir(hostPath, volumePath, perm); err != nil {
glog.Errorf("failed to mkdir %q: %v", hostPath, err)
perm, err := mounter.GetMode(volumePath)
if err != nil {
return nil, cleanupAction, err
}
if err := mounter.SafeMakeDir(mount.SubPath, volumePath, perm); err != nil {
// Don't pass detailed error back to the user because it could give information about host filesystem
glog.Errorf("failed to create subPath directory for volumeMount %q of container %q: %v", mount.Name, container.Name, err)
return nil, cleanupAction, fmt.Errorf("failed to create subPath directory for volumeMount %q of container %q", mount.Name, container.Name)
}
}
hostPath, cleanupAction, err = mounter.PrepareSafeSubpath(mountutil.Subpath{
VolumeMountIndex: i,
@ -222,7 +206,6 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
VolumePath: volumePath,
PodDir: podDir,
ContainerName: container.Name,
ReadOnly: mount.ReadOnly || vol.Mounter.GetAttributes().ReadOnly,
})
if err != nil {
// Don't pass detailed error back to the user because it could give information about host filesystem
@ -342,15 +325,18 @@ func nodeHostsFileContent(hostsFilePath string, hostAliases []v1.HostAlias) ([]b
if err != nil {
return nil, err
}
hostsFileContent = append(hostsFileContent, hostsEntriesFromHostAliases(hostAliases)...)
return hostsFileContent, nil
var buffer bytes.Buffer
buffer.WriteString(managedHostsHeaderWithHostNetwork)
buffer.Write(hostsFileContent)
buffer.Write(hostsEntriesFromHostAliases(hostAliases))
return buffer.Bytes(), nil
}
// managedHostsFileContent generates the content of the managed etc hosts based on Pod IP and other
// information.
func managedHostsFileContent(hostIP, hostName, hostDomainName string, hostAliases []v1.HostAlias) []byte {
var buffer bytes.Buffer
buffer.WriteString("# Kubernetes-managed hosts file.\n")
buffer.WriteString(managedHostsHeader)
buffer.WriteString("127.0.0.1\tlocalhost\n") // ipv4 localhost
buffer.WriteString("::1\tlocalhost ip6-localhost ip6-loopback\n") // ipv6 localhost
buffer.WriteString("fe00::0\tip6-localnet\n")
@ -362,9 +348,8 @@ func managedHostsFileContent(hostIP, hostName, hostDomainName string, hostAliase
} else {
buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostIP, hostName))
}
hostsFileContent := buffer.Bytes()
hostsFileContent = append(hostsFileContent, hostsEntriesFromHostAliases(hostAliases)...)
return hostsFileContent
buffer.Write(hostsEntriesFromHostAliases(hostAliases))
return buffer.Bytes()
}
func hostsEntriesFromHostAliases(hostAliases []v1.HostAlias) []byte {
@ -447,8 +432,6 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai
return nil, nil, err
}
cgroupParent := kl.GetPodCgroupParent(pod)
opts.CgroupParent = cgroupParent
hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
if err != nil {
return nil, nil, err
@ -458,12 +441,6 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai
volumes := kl.volumeManager.GetMountedVolumesForPod(podName)
opts.PortMappings = kubecontainer.MakePortMappings(container)
// TODO(random-liu): Move following convert functions into pkg/kubelet/container
devices, err := kl.makeGPUDevices(pod, container)
if err != nil {
return nil, nil, err
}
opts.Devices = append(opts.Devices, devices...)
// TODO: remove feature gate check after no longer needed
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
@ -475,18 +452,18 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai
opts.Devices = append(opts.Devices, blkVolumes...)
}
mounts, cleanupAction, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes, kl.mounter)
envs, err := kl.makeEnvironmentVariables(pod, container, podIP)
if err != nil {
return nil, nil, err
}
opts.Envs = append(opts.Envs, envs...)
mounts, cleanupAction, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes, kl.mounter, opts.Envs)
if err != nil {
return nil, cleanupAction, err
}
opts.Mounts = append(opts.Mounts, mounts...)
envs, err := kl.makeEnvironmentVariables(pod, container, podIP)
if err != nil {
return nil, cleanupAction, err
}
opts.Envs = append(opts.Envs, envs...)
// Disabling adding TerminationMessagePath on Windows as these files would be mounted as docker volume and
// Docker for Windows has a bug where only directories can be mounted
if len(container.TerminationMessagePath) != 0 && runtime.GOOS != "windows" {
@ -925,7 +902,11 @@ func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bo
return false
}
if len(runtimeStatus.ContainerStatuses) > 0 {
glog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %+v", format.Pod(pod), runtimeStatus.ContainerStatuses)
var statusStr string
for _, status := range runtimeStatus.ContainerStatuses {
statusStr += fmt.Sprintf("%+v ", *status)
}
glog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
@ -1085,35 +1066,28 @@ func (kl *Kubelet) podKiller() {
killing := sets.NewString()
// guard for the killing set
lock := sync.Mutex{}
for {
select {
case podPair, ok := <-kl.podKillingCh:
if !ok {
return
}
for podPair := range kl.podKillingCh {
runningPod := podPair.RunningPod
apiPod := podPair.APIPod
runningPod := podPair.RunningPod
apiPod := podPair.APIPod
lock.Lock()
exists := killing.Has(string(runningPod.ID))
if !exists {
killing.Insert(string(runningPod.ID))
}
lock.Unlock()
lock.Lock()
exists := killing.Has(string(runningPod.ID))
if !exists {
killing.Insert(string(runningPod.ID))
}
lock.Unlock()
if !exists {
go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
err := kl.killPod(apiPod, runningPod, nil, nil)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
}
lock.Lock()
killing.Delete(string(runningPod.ID))
lock.Unlock()
}(apiPod, runningPod)
}
if !exists {
go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
err := kl.killPod(apiPod, runningPod, nil, nil)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
}
lock.Lock()
killing.Delete(string(runningPod.ID))
lock.Unlock()
}(apiPod, runningPod)
}
}
}
@ -1379,7 +1353,8 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
}
kl.probeManager.UpdatePodStatus(pod.UID, s)
s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.Conditions, s.ContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GenerateContainersReadyCondition(spec, s.ContainerStatuses, s.Phase))
// Status manager will take care of the LastTransitionTimestamp, either preserve
// the timestamp from apiserver, or set a new one. When kubelet sees the pod,
// `PodScheduled` condition must be true.
@ -1432,6 +1407,12 @@ func (kl *Kubelet) convertStatusToAPIStatus(pod *v1.Pod, podStatus *kubecontaine
true,
)
// Preserves conditions not controlled by kubelet
for _, c := range pod.Status.Conditions {
if !kubetypes.PodConditionByKubelet(c.Type) {
apiPodStatus.Conditions = append(apiPodStatus.Conditions, c)
}
}
return &apiPodStatus
}
@ -1613,142 +1594,60 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
return kl.runner.RunInContainer(container.ID, cmd, 0)
}
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize, timeout)
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.PortForward(&pod, port, stream)
}
// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the exec directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
default:
return nil, fmt.Errorf("container runtime does not support exec")
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
}
// GetAttach gets the URL the attach will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
// The TTY setting for attach must match the TTY setting in the initial container configuration,
// since whether the process is running in a TTY cannot be changed after it has started. We
// need the api.Pod to get the TTY status.
pod, found := kl.GetPodByFullName(podFullName)
if !found || (string(podUID) != "" && pod.UID != podUID) {
return nil, fmt.Errorf("pod %s not found", podFullName)
}
containerSpec := kubecontainer.GetContainerSpec(pod, containerName)
if containerSpec == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
tty := containerSpec.TTY
return streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty)
default:
return nil, fmt.Errorf("container runtime does not support attach")
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
// The TTY setting for attach must match the TTY setting in the initial container configuration,
// since whether the process is running in a TTY cannot be changed after it has started. We
// need the api.Pod to get the TTY status.
pod, found := kl.GetPodByFullName(podFullName)
if !found || (string(podUID) != "" && pod.UID != podUID) {
return nil, fmt.Errorf("pod %s not found", podFullName)
}
containerSpec := kubecontainer.GetContainerSpec(pod, containerName)
if containerSpec == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
tty := containerSpec.TTY
return kl.streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty)
}
// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return nil, err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
default:
return nil, fmt.Errorf("container runtime does not support port-forward")
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return nil, err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}
return kl.streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
}
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.

View file

@ -45,10 +45,8 @@ func newPodContainerDeletor(runtime kubecontainer.Runtime, containersToKeep int)
buffer := make(chan kubecontainer.ContainerID, containerDeletorBufferLimit)
go wait.Until(func() {
for {
select {
case id := <-buffer:
runtime.DeleteContainer(id)
}
id := <-buffer
runtime.DeleteContainer(id)
}
}, 0, wait.NeverStop)
@ -101,6 +99,7 @@ func (p *podContainerDeletor) deleteContainersInPod(filterContainerID string, po
containersToKeep := p.containersToKeep
if removeAll {
containersToKeep = 0
filterContainerID = ""
}
for _, candidate := range getContainersToDeleteInPod(filterContainerID, podStatus, containersToKeep) {

View file

@ -235,7 +235,7 @@ func (p *podWorkers) removeWorker(uid types.UID) {
delete(p.podUpdates, uid)
// If there is an undelivered work update for this pod we need to remove it
// since per-pod goroutine won't be able to put it to the already closed
// channel when it finish processing the current work update.
// channel when it finishes processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
delete(p.lastUndeliveredWorkUpdate, uid)
}

View file

@ -91,8 +91,12 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results []
res := <-ch
results = append(results, res)
if res.Err != nil {
// TODO(proppy): report which containers failed the pod.
glog.Infof("failed to start pod %q: %v", format.Pod(res.Pod), res.Err)
faliedContainerName, err := kl.getFailedContainers(res.Pod)
if err != nil {
glog.Infof("unable to get failed containers' names for pod %q, error:%v", format.Pod(res.Pod), err)
} else {
glog.Infof("unable to start pod %q because container:%v failed", format.Pod(res.Pod), faliedContainerName)
}
failedPods = append(failedPods, format.Pod(res.Pod))
} else {
glog.Infof("started pod %q", format.Pod(res.Pod))
@ -156,3 +160,18 @@ func (kl *Kubelet) isPodRunning(pod *v1.Pod, status *kubecontainer.PodStatus) bo
}
return true
}
// getFailedContainer returns failed container name for pod.
func (kl *Kubelet) getFailedContainers(pod *v1.Pod) ([]string, error) {
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return nil, fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
}
var containerNames []string
for _, cs := range status.ContainerStatuses {
if cs.State != kubecontainer.ContainerStateRunning && cs.ExitCode != 0 {
containerNames = append(containerNames, cs.Name)
}
}
return containerNames, nil
}

View file

@ -12,13 +12,14 @@ go_library(
"constants.go",
"doc.go",
"labels.go",
"pod_status.go",
"pod_update.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/types",
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/apis/scheduling:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
@ -29,6 +30,7 @@ go_test(
name = "go_default_test",
srcs = [
"labels_test.go",
"pod_status_test.go",
"pod_update_test.go",
"types_test.go",
],

View file

@ -22,7 +22,6 @@ const (
// different container runtimes
DockerContainerRuntime = "docker"
RktContainerRuntime = "rkt"
RemoteContainerRuntime = "remote"
// User visible keys for managing node allocatable enforcement on the node.

View file

@ -0,0 +1,40 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"k8s.io/api/core/v1"
)
// PodConditionsByKubelet is the list of pod conditions owned by kubelet
var PodConditionsByKubelet = []v1.PodConditionType{
v1.PodScheduled,
v1.PodReady,
v1.PodInitialized,
v1.PodReasonUnschedulable,
v1.ContainersReady,
}
// PodConditionByKubelet returns if the pod condition type is owned by kubelet
func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
for _, c := range PodConditionsByKubelet {
if c == conditionType {
return true
}
}
return false
}

View file

@ -22,7 +22,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeapi "k8s.io/kubernetes/pkg/apis/core"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/apis/scheduling"
)
const (
@ -144,7 +144,7 @@ func (sp SyncPodType) String() string {
// or equal to SystemCriticalPriority. Both the rescheduler(deprecated in 1.10) and the kubelet use this function
// to make admission and scheduling decisions.
func IsCriticalPod(pod *v1.Pod) bool {
return IsCritical(pod.Namespace, pod.Annotations) || (pod.Spec.Priority != nil && IsCriticalPodBasedOnPriority(pod.Namespace, *pod.Spec.Priority))
return IsCritical(pod.Namespace, pod.Annotations) || (pod.Spec.Priority != nil && IsCriticalPodBasedOnPriority(*pod.Spec.Priority))
}
// IsCritical returns true if parameters bear the critical pod annotation
@ -163,12 +163,8 @@ func IsCritical(ns string, annotations map[string]string) bool {
}
// IsCriticalPodBasedOnPriority checks if the given pod is a critical pod based on priority resolved from pod Spec.
func IsCriticalPodBasedOnPriority(ns string, priority int32) bool {
// Critical pods are restricted to "kube-system" namespace as of now.
if ns != kubeapi.NamespaceSystem {
return false
}
if priority >= schedulerapi.SystemCriticalPriority {
func IsCriticalPodBasedOnPriority(priority int32) bool {
if priority >= scheduling.SystemCriticalPriority {
return true
}
return false

View file

@ -92,9 +92,12 @@ filegroup(
"//pkg/kubelet/util/cache:all-srcs",
"//pkg/kubelet/util/format:all-srcs",
"//pkg/kubelet/util/ioutils:all-srcs",
"//pkg/kubelet/util/manager:all-srcs",
"//pkg/kubelet/util/pluginwatcher:all-srcs",
"//pkg/kubelet/util/queue:all-srcs",
"//pkg/kubelet/util/sliceutils:all-srcs",
"//pkg/kubelet/util/store:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View file

@ -1,25 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["ioutils.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/ioutils",
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View file

@ -1,37 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutils
import "io"
// writeCloserWrapper represents a WriteCloser whose closer operation is noop.
type writeCloserWrapper struct {
Writer io.Writer
}
func (w *writeCloserWrapper) Write(buf []byte) (int, error) {
return w.Writer.Write(buf)
}
func (w *writeCloserWrapper) Close() error {
return nil
}
// WriteCloserWrapper returns a writeCloserWrapper.
func WriteCloserWrapper(w io.Writer) io.WriteCloser {
return &writeCloserWrapper{w}
}

View file

@ -23,16 +23,19 @@ import (
"github.com/golang/glog"
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/configmap"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/mountpod"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/token"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
@ -49,6 +52,7 @@ func NewInitializedVolumePluginMgr(
kubelet *Kubelet,
secretManager secret.Manager,
configMapManager configmap.Manager,
tokenManager *token.Manager,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
@ -61,6 +65,7 @@ func NewInitializedVolumePluginMgr(
volumePluginMgr: volume.VolumePluginMgr{},
secretManager: secretManager,
configMapManager: configMapManager,
tokenManager: tokenManager,
mountPodManager: mountPodManager,
}
@ -84,6 +89,7 @@ type kubeletVolumeHost struct {
kubelet *Kubelet
volumePluginMgr volume.VolumePluginMgr
secretManager secret.Manager
tokenManager *token.Manager
configMapManager configmap.Manager
mountPodManager mountpod.Manager
}
@ -92,6 +98,10 @@ func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string
return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
}
func (kvh *kubeletVolumeHost) GetPodsDir() string {
return kvh.kubelet.getPodsDir()
}
func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
if runtime.GOOS == "windows" {
@ -148,7 +158,7 @@ func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
exec, err := kvh.getMountExec(pluginName)
if err != nil {
glog.V(2).Info("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
glog.V(2).Infof("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
// Use the default mounter
exec = nil
}
@ -186,6 +196,10 @@ func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*
return kvh.configMapManager.GetConfigMap
}
func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return kvh.tokenManager.GetServiceAccountToken
}
func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
node, err := kvh.kubelet.GetNode()
if err != nil {
@ -198,10 +212,14 @@ func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
return kvh.kubelet.nodeName
}
func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
return kvh.kubelet.recorder
}
func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec {
exec, err := kvh.getMountExec(pluginName)
if err != nil {
glog.V(2).Info("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
glog.V(2).Infof("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
// Use the default exec
exec = nil
}