Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions pkg/kubelet/container/testing/fake_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ type FakeRuntime struct {
// from container runtime.
BlockImagePulls bool
imagePullTokenBucket chan bool
SwapBehavior map[string]kubetypes.SwapBehavior
T TB
// imagePullErrBucket sends an error to a PullImage() call
// blocked by BlockImagePulls. This is used to simulate
// a failure in some of the parallel pull image calls.
imagePullErrBucket chan error
SwapBehavior map[string]kubetypes.SwapBehavior
T TB
}

const FakeHost = "localhost:12345"
Expand Down Expand Up @@ -318,6 +322,30 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container
func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, creds []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error) {
f.Lock()
f.CalledFunctions = append(f.CalledFunctions, "PullImage")

if f.imagePullTokenBucket == nil {
f.imagePullTokenBucket = make(chan bool, 1)
}
if f.imagePullErrBucket == nil {
f.imagePullErrBucket = make(chan error, 1)
}

blockImagePulls := f.BlockImagePulls
f.Unlock()

if blockImagePulls {
// Block the function before adding the image to f.ImageList
select {
case <-ctx.Done():
case <-f.imagePullTokenBucket:
case pullImageErr := <-f.imagePullErrBucket:
return "", nil, pullImageErr
}
}

f.Lock()
defer f.Unlock()

if f.Err == nil {
i := kubecontainer.Image{
ID: image.Image,
Expand All @@ -332,23 +360,7 @@ func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSp
retCreds = &creds[0]
}

if !f.BlockImagePulls {
f.Unlock()
return image.Image, retCreds, f.Err
}

retErr := f.Err
if f.imagePullTokenBucket == nil {
f.imagePullTokenBucket = make(chan bool, 1)
}
// Unlock before waiting for UnblockImagePulls calls, to avoid deadlock.
f.Unlock()
select {
case <-ctx.Done():
case <-f.imagePullTokenBucket:
}

return image.Image, retCreds, retErr
return image.Image, retCreds, f.Err
}

// UnblockImagePulls unblocks a certain number of image pulls, if BlockImagePulls is true.
Expand All @@ -363,6 +375,17 @@ func (f *FakeRuntime) UnblockImagePulls(count int) {
}
}

// SendImagePullError sends an error to a PullImage() call blocked by BlockImagePulls.
// PullImage() immediately returns after receiving the error.
func (f *FakeRuntime) SendImagePullError(err error) {
if f.imagePullErrBucket != nil {
select {
case f.imagePullErrBucket <- err:
default:
}
}
}

func (f *FakeRuntime) GetImageRef(_ context.Context, image kubecontainer.ImageSpec) (string, error) {
f.Lock()
defer f.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions pkg/kubelet/images/image_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
msg := fmt.Sprintf("Container image %q already present on machine", requestedImage)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)

// we need to stop recording if it is still in progress, as the image
// has already been pulled by another pod.
m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID)

return imageRef, msg, nil
}

Expand All @@ -231,6 +236,11 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
if !pullRequired {
msg := fmt.Sprintf("Container image %q already present on machine and can be accessed by the pod", requestedImage)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)

// we need to stop recording if it is still in progress, as the image
// has already been pulled by another pod.
m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID)

return imageRef, msg, nil
}
}
Expand Down
126 changes: 111 additions & 15 deletions pkg/kubelet/images/image_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,27 +490,33 @@ func ensureSecretImagesTestCases() []pullerTestCase {

// mockPodPullingTimeRecorder records, per pod UID, whether started/finished
// image-pull events were observed. The embedded mutex guards both maps,
// since the recorder is called from concurrent pullers in tests.
type mockPodPullingTimeRecorder struct {
	sync.Mutex
	// startedPullingRecorded is true for a pod UID once RecordImageStartedPulling ran for it.
	startedPullingRecorded map[types.UID]bool
	// finishedPullingRecorded is true for a pod UID once a finish event followed a start event.
	finishedPullingRecorded map[types.UID]bool
}

// RecordImageStartedPulling marks that an image pull started for the given pod.
func (m *mockPodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID) {
	m.Lock()
	defer m.Unlock()

	// Unconditional set is equivalent to the previous "if not already set"
	// guard: the only value ever stored is true.
	m.startedPullingRecorded[podUID] = true
}

// RecordImageFinishedPulling marks a pull as finished for the given pod.
// A finish event is only recorded when a matching start event was seen,
// so spurious finishes (e.g. image already present) are ignored.
func (m *mockPodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {
	m.Lock()
	defer m.Unlock()

	if !m.startedPullingRecorded[podUID] {
		return
	}
	m.finishedPullingRecorded[podUID] = true
}

// reset forgets every recorded pull event so the mock can be reused
// across test cases; the map storage itself is retained.
func (m *mockPodPullingTimeRecorder) reset() {
	m.Lock()
	defer m.Unlock()

	clear(m.finishedPullingRecorded)
	clear(m.startedPullingRecorded)
}

type mockImagePullManager struct {
Expand Down Expand Up @@ -570,7 +576,10 @@ func pullerTestEnv(
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr

fakePodPullingTimeRecorder = &mockPodPullingTimeRecorder{}
fakePodPullingTimeRecorder = &mockPodPullingTimeRecorder{
startedPullingRecorded: make(map[types.UID]bool),
finishedPullingRecorded: make(map[types.UID]bool),
}

pullManager := &mockImagePullManager{allowAll: true}
if c.allowedCredentials != nil {
Expand Down Expand Up @@ -612,8 +621,8 @@ func TestParallelPuller(t *testing.T) {
_, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls)
assert.Equal(t, expected.err, err)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID])
assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID])
assert.Contains(t, msg, expected.msg)
fakePodPullingTimeRecorder.reset()
}
Expand Down Expand Up @@ -645,8 +654,8 @@ func TestSerializedPuller(t *testing.T) {
_, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls)
assert.Equal(t, expected.err, err)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID])
assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID])
assert.Contains(t, msg, expected.msg)
fakePodPullingTimeRecorder.reset()
}
Expand Down Expand Up @@ -709,8 +718,8 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) {
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(c.expected[0].calls)
assert.Equal(t, c.expected[0].err, err, "tick=%d", 0)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID])
assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID])

images, _ := fakeRuntime.ListImages(ctx)
assert.Len(t, images, 1, "ListImages() count")
Expand Down Expand Up @@ -766,8 +775,8 @@ func TestPullAndListImageWithRuntimeHandlerInImageCriAPIFeatureGate(t *testing.T
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, runtimeHandler, container.ImagePullPolicy)
fakeRuntime.AssertCalls(c.expected[0].calls)
assert.Equal(t, c.expected[0].err, err, "tick=%d", 0)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID])
assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID])

images, _ := fakeRuntime.ListImages(ctx)
assert.Len(t, images, 1, "ListImages() count")
Expand Down Expand Up @@ -854,6 +863,93 @@ func TestMaxParallelImagePullsLimit(t *testing.T) {
fakeRuntime.AssertCallCounts("PullImage", 7)
}

// TestParallelPodPullingTimeRecorderWithErr verifies that the pod pulling-time
// recorder is finished for every pod even when one parallel image pull fails:
// two pods pull the same missing image concurrently; one pull is unblocked
// normally, the other is failed via SendImagePullError; a second round of
// EnsureImageExists then finds the image already present and must close out
// the recorder for the errored pod as well.
func TestParallelPodPullingTimeRecorderWithErr(t *testing.T) {
	ctx := context.Background()
	pod1 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "test_pod1",
			Namespace:       "test-ns",
			UID:             "bar1",
			ResourceVersion: "42",
		}}

	pod2 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "test_pod2",
			Namespace:       "test-ns",
			UID:             "bar2",
			ResourceVersion: "42",
		}}

	pods := [2]*v1.Pod{pod1, pod2}

	testCase := &pullerTestCase{
		containerImage: "missing_image",
		testName:       "missing image, pull if not present",
		policy:         v1.PullIfNotPresent,
		inspectErr:     nil,
		pullerErr:      nil,
		qps:            0.0,
		burst:          0,
	}

	// Parallel puller with room for both pulls to be in flight at once.
	useSerializedEnv := false
	maxParallelImagePulls := 2
	var wg sync.WaitGroup

	puller, fakeClock, fakeRuntime, container, fakePodPullingTimeRecorder, _ := pullerTestEnv(t, *testCase, useSerializedEnv, ptr.To(int32(maxParallelImagePulls)))
	fakeRuntime.BlockImagePulls = true
	fakeRuntime.CalledFunctions = nil
	fakeRuntime.T = t
	fakeClock.Step(time.Second)

	// First, each pod's puller calls EnsureImageExists
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			_, _, _ = puller.EnsureImageExists(ctx, nil, pods[i], container.Image, testCase.pullSecrets, nil, "", container.ImagePullPolicy)
			wg.Done()
		}(i)
	}
	// NOTE(review): sleep-based synchronization; gives both goroutines time to
	// reach the blocked PullImage call but can be flaky on a loaded machine —
	// TODO consider polling on the call count instead.
	time.Sleep(1 * time.Second)

	// Assert the number of PullImage calls is 2
	fakeRuntime.AssertCallCounts("PullImage", 2)

	// Recording for both of the pods should be started but not finished
	assert.True(t, fakePodPullingTimeRecorder.startedPullingRecorded[pods[0].UID])
	assert.True(t, fakePodPullingTimeRecorder.startedPullingRecorded[pods[1].UID])
	assert.False(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[0].UID])
	assert.False(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[1].UID])

	// Unblock one of the pods to pull the image
	fakeRuntime.UnblockImagePulls(1)
	time.Sleep(1 * time.Second)

	// Introduce a pull error for the second pod and unblock it
	fakeRuntime.SendImagePullError(errors.New("pull image error"))

	wg.Wait()

	// This time EnsureImageExists will return without pulling
	// (the image was added to the fake runtime by the successful pull),
	// which is expected to mark the errored pod's recording finished too.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			_, _, err := puller.EnsureImageExists(ctx, nil, pods[i], container.Image, testCase.pullSecrets, nil, "", container.ImagePullPolicy)
			assert.NoError(t, err)
			wg.Done()
		}(i)
	}
	wg.Wait()

	// Assert the number of PullImage calls is still 2
	fakeRuntime.AssertCallCounts("PullImage", 2)

	// Both recorders should be finished
	assert.True(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[0].UID])
	assert.True(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[1].UID])
}

func TestEvalCRIPullErr(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
Expand Down
4 changes: 3 additions & 1 deletion pkg/kubelet/util/pod_startup_latency_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ func (p *basicPodStartupLatencyTracker) RecordImageFinishedPulling(podUID types.
return
}

state.lastFinishedPulling = p.clock.Now() // Now is always grater than values from the past.
if !state.firstStartedPulling.IsZero() {
state.lastFinishedPulling = p.clock.Now() // Now is always greater than values from the past.
}
}

func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) {
Expand Down