From d502cf40d3aa39f1b52c673a9c4c4fc4908415d1 Mon Sep 17 00:00:00 2001 From: Ali Abbasi Alaei Date: Tue, 15 Apr 2025 18:43:15 -0400 Subject: [PATCH 1/3] Fix negative pod startup duration --- pkg/kubelet/images/image_manager.go | 11 +++++++++++ pkg/kubelet/images/image_manager_test.go | 6 +++++- pkg/kubelet/util/pod_startup_latency_tracker.go | 5 ++++- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pkg/kubelet/images/image_manager.go b/pkg/kubelet/images/image_manager.go index b76f87d19aae6..e43e92aff1ca2 100644 --- a/pkg/kubelet/images/image_manager.go +++ b/pkg/kubelet/images/image_manager.go @@ -210,6 +210,11 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) { msg := fmt.Sprintf("Container image %q already present on machine", requestedImage) m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info) + + // we need to stop recording if it is still in progress, as the image + // has already been pulled by another pod. + m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID) + return imageRef, msg, nil } @@ -231,6 +236,11 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR if !pullRequired { msg := fmt.Sprintf("Container image %q already present on machine and can be accessed by the pod", requestedImage) m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info) + + // we need to stop recording if it is still in progress, as the image + // has already been pulled by another pod. + m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID) + return imageRef, msg, nil } } @@ -380,3 +390,4 @@ func trackedToImagePullCreds(trackedCreds *credentialprovider.TrackedAuthConfig) return ret } + diff --git a/pkg/kubelet/images/image_manager_test.go b/pkg/kubelet/images/image_manager_test.go index 2f94534830097..80aeeff79f710 100644 --- a/pkg/kubelet/images/image_manager_test.go +++ b/pkg/kubelet/images/image_manager_test.go @@ -503,7 +503,10 @@ func (m *mockPodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID) func (m *mockPodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) { m.Lock() defer m.Unlock() - m.finishedPullingRecorded = true + + if m.startedPullingRecorded { + m.finishedPullingRecorded = true + } } func (m *mockPodPullingTimeRecorder) reset() { @@ -961,3 +964,4 @@ func makeDockercfgSecretForRepo(sMeta metav1.ObjectMeta, repo string) v1.Secret }, } } + diff --git a/pkg/kubelet/util/pod_startup_latency_tracker.go b/pkg/kubelet/util/pod_startup_latency_tracker.go index 3ab3d2aaae537..2dc3a85d087b3 100644 --- a/pkg/kubelet/util/pod_startup_latency_tracker.go +++ b/pkg/kubelet/util/pod_startup_latency_tracker.go @@ -146,7 +146,9 @@ func (p *basicPodStartupLatencyTracker) RecordImageFinishedPulling(podUID types. return } - state.lastFinishedPulling = p.clock.Now() // Now is always grater than values from the past. + if !state.firstStartedPulling.IsZero() { + state.lastFinishedPulling = p.clock.Now() // Now is always grater than values from the past. + } } func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) { @@ -194,3 +196,4 @@ func (p *basicPodStartupLatencyTracker) DeletePodStartupState(podUID types.UID) delete(p.pods, podUID) } + From cb610fae5ee3fa0c4c9aaadbf4e23a8b08107fe6 Mon Sep 17 00:00:00 2001 From: Ali Abbasi Alaei Date: Wed, 16 Apr 2025 14:31:22 -0400 Subject: [PATCH 2/3] Add TestParallelPodPullingTimeRecorderWithErr test case --- pkg/kubelet/container/testing/fake_runtime.go | 24 +++- pkg/kubelet/images/image_manager.go | 1 - pkg/kubelet/images/image_manager_test.go | 126 +++++++++++++++--- .../util/pod_startup_latency_tracker.go | 1 - 4 files changed, 131 insertions(+), 21 deletions(-) diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 226972a44a7af..fb90b621ce410 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -71,8 +71,12 @@ type FakeRuntime struct { // from container runtime. BlockImagePulls bool imagePullTokenBucket chan bool - SwapBehavior map[string]kubetypes.SwapBehavior - T TB + // imagePullErrBucket sends an error to a PullImage() call + // blocked by BlockImagePulls. This is used to simulate + // a failure in some of the parallel pull image calls. + imagePullErrBucket chan error + SwapBehavior map[string]kubetypes.SwapBehavior + T TB } const FakeHost = "localhost:12345" @@ -341,11 +345,16 @@ func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSp if f.imagePullTokenBucket == nil { f.imagePullTokenBucket = make(chan bool, 1) } + if f.imagePullErrBucket == nil { + f.imagePullErrBucket = make(chan error, 1) + } // Unlock before waiting for UnblockImagePulls calls, to avoid deadlock. f.Unlock() select { case <-ctx.Done(): case <-f.imagePullTokenBucket: + case pullImageErr := <-f.imagePullErrBucket: + return image.Image, retCreds, pullImageErr } return image.Image, retCreds, retErr @@ -363,6 +372,17 @@ func (f *FakeRuntime) UnblockImagePulls(count int) { } } +// SendImagePullError sends an error to a PullImage() call blocked by BlockImagePulls. +// PullImage() immediately returns after receiving the error. +func (f *FakeRuntime) SendImagePullError(err error) { + if f.imagePullErrBucket != nil { + select { + case f.imagePullErrBucket <- err: + default: + } + } +} + func (f *FakeRuntime) GetImageRef(_ context.Context, image kubecontainer.ImageSpec) (string, error) { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/images/image_manager.go b/pkg/kubelet/images/image_manager.go index e43e92aff1ca2..4747f76774990 100644 --- a/pkg/kubelet/images/image_manager.go +++ b/pkg/kubelet/images/image_manager.go @@ -390,4 +390,3 @@ func trackedToImagePullCreds(trackedCreds *credentialprovider.TrackedAuthConfig) return ret } - diff --git a/pkg/kubelet/images/image_manager_test.go b/pkg/kubelet/images/image_manager_test.go index 80aeeff79f710..8fca8547cfa97 100644 --- a/pkg/kubelet/images/image_manager_test.go +++ b/pkg/kubelet/images/image_manager_test.go @@ -490,30 +490,33 @@ func ensureSecretImagesTestCases() []pullerTestCase { type mockPodPullingTimeRecorder struct { sync.Mutex - startedPullingRecorded bool - finishedPullingRecorded bool + startedPullingRecorded map[types.UID]bool + finishedPullingRecorded map[types.UID]bool } func (m *mockPodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID) { m.Lock() defer m.Unlock() - m.startedPullingRecorded = true + + if !m.startedPullingRecorded[podUID] { + m.startedPullingRecorded[podUID] = true + } } func (m *mockPodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) { m.Lock() defer m.Unlock() - if m.startedPullingRecorded { - m.finishedPullingRecorded = true + if m.startedPullingRecorded[podUID] { + m.finishedPullingRecorded[podUID] = true } } func (m *mockPodPullingTimeRecorder) reset() { m.Lock() defer m.Unlock() - m.startedPullingRecorded = false - m.finishedPullingRecorded = false + clear(m.startedPullingRecorded) + clear(m.finishedPullingRecorded) } type mockImagePullManager struct { @@ -573,7 +576,10 @@ func pullerTestEnv( fakeRuntime.Err = c.pullerErr fakeRuntime.InspectErr = c.inspectErr - fakePodPullingTimeRecorder = &mockPodPullingTimeRecorder{} + fakePodPullingTimeRecorder = &mockPodPullingTimeRecorder{ + startedPullingRecorded: make(map[types.UID]bool), + finishedPullingRecorded: make(map[types.UID]bool), + } pullManager := &mockImagePullManager{allowAll: true} if c.allowedCredentials != nil { @@ -615,8 +621,8 @@ func TestParallelPuller(t *testing.T) { _, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy) fakeRuntime.AssertCalls(expected.calls) assert.Equal(t, expected.err, err) - assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) - assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded) + assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID]) + assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID]) assert.Contains(t, msg, expected.msg) fakePodPullingTimeRecorder.reset() } @@ -648,8 +654,8 @@ func TestSerializedPuller(t *testing.T) { _, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy) fakeRuntime.AssertCalls(expected.calls) assert.Equal(t, expected.err, err) - assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) - assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded) + assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID]) + assert.Equal(t, expected.shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID]) assert.Contains(t, msg, expected.msg) fakePodPullingTimeRecorder.reset() } @@ -712,8 +718,8 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) { _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy) fakeRuntime.AssertCalls(c.expected[0].calls) assert.Equal(t, c.expected[0].err, err, "tick=%d", 0) - assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) - assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded) + assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID]) + assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID]) images, _ := fakeRuntime.ListImages(ctx) assert.Len(t, images, 1, "ListImages() count") @@ -769,8 +775,8 @@ func TestPullAndListImageWithRuntimeHandlerInImageCriAPIFeatureGate(t *testing.T _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, runtimeHandler, container.ImagePullPolicy) fakeRuntime.AssertCalls(c.expected[0].calls) assert.Equal(t, c.expected[0].err, err, "tick=%d", 0) - assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) - assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded) + assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded[pod.UID]) + assert.Equal(t, c.expected[0].shouldRecordFinishedPullingTime, fakePodPullingTimeRecorder.finishedPullingRecorded[pod.UID]) images, _ := fakeRuntime.ListImages(ctx) assert.Len(t, images, 1, "ListImages() count") @@ -857,6 +863,93 @@ func TestMaxParallelImagePullsLimit(t *testing.T) { fakeRuntime.AssertCallCounts("PullImage", 7) } +func TestParallelPodPullingTimeRecorderWithErr(t *testing.T) { + ctx := context.Background() + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_pod1", + Namespace: "test-ns", + UID: "bar1", + ResourceVersion: "42", + }} + + pod2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_pod2", + Namespace: "test-ns", + UID: "bar2", + ResourceVersion: "42", + }} + + pods := [2]*v1.Pod{pod1, pod2} + + testCase := &pullerTestCase{ + containerImage: "missing_image", + testName: "missing image, pull if not present", + policy: v1.PullIfNotPresent, + inspectErr: nil, + pullerErr: nil, + qps: 0.0, + burst: 0, + } + + useSerializedEnv := false + maxParallelImagePulls := 2 + var wg sync.WaitGroup + + puller, fakeClock, fakeRuntime, container, fakePodPullingTimeRecorder, _ := pullerTestEnv(t, *testCase, useSerializedEnv, ptr.To(int32(maxParallelImagePulls))) + fakeRuntime.BlockImagePulls = true + fakeRuntime.CalledFunctions = nil + fakeRuntime.T = t + fakeClock.Step(time.Second) + + // First, each pod's puller calls EnsureImageExists + for i := 0; i < 2; i++ { + wg.Add(1) + go func(i int) { + _, _, _ = puller.EnsureImageExists(ctx, nil, pods[i], container.Image, testCase.pullSecrets, nil, "", container.ImagePullPolicy) + wg.Done() + }(i) + } + time.Sleep(1 * time.Second) + + // Assert the number of PullImage calls is 2 + fakeRuntime.AssertCallCounts("PullImage", 2) + + // Recording for both of the pods should be started but not finished + assert.True(t, fakePodPullingTimeRecorder.startedPullingRecorded[pods[0].UID]) + assert.True(t, fakePodPullingTimeRecorder.startedPullingRecorded[pods[1].UID]) + assert.False(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[0].UID]) + assert.False(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[1].UID]) + + // Unblock one of the pods to pull the image + fakeRuntime.UnblockImagePulls(1) + time.Sleep(1 * time.Second) + + // Introduce a pull error for the second pod and unblock it + fakeRuntime.SendImagePullError(errors.New("pull image error")) + + wg.Wait() + + // This time EnsureImageExists will return without pulling + for i := 0; i < 2; i++ { + wg.Add(1) + go func(i int) { + _, _, err := puller.EnsureImageExists(ctx, nil, pods[i], container.Image, testCase.pullSecrets, nil, "", container.ImagePullPolicy) + assert.NoError(t, err) + wg.Done() + }(i) + } + wg.Wait() + + // Assert the number of PullImage calls is still 2 + fakeRuntime.AssertCallCounts("PullImage", 2) + + // Both recorders should be finished + assert.True(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[0].UID]) + assert.True(t, fakePodPullingTimeRecorder.finishedPullingRecorded[pods[1].UID]) +} + func TestEvalCRIPullErr(t *testing.T) { t.Parallel() for _, tc := range []struct { @@ -964,4 +1057,3 @@ func makeDockercfgSecretForRepo(sMeta metav1.ObjectMeta, repo string) v1.Secret }, } } - diff --git a/pkg/kubelet/util/pod_startup_latency_tracker.go b/pkg/kubelet/util/pod_startup_latency_tracker.go index 2dc3a85d087b3..e5610d3d31c8a 100644 --- a/pkg/kubelet/util/pod_startup_latency_tracker.go +++ b/pkg/kubelet/util/pod_startup_latency_tracker.go @@ -196,4 +196,3 @@ func (p *basicPodStartupLatencyTracker) DeletePodStartupState(podUID types.UID) delete(p.pods, podUID) } - From 78e4b114705bb7681881bac198f4d627426543f7 Mon Sep 17 00:00:00 2001 From: Ali Abbasi Alaei Date: Fri, 25 Apr 2025 13:44:13 -0400 Subject: [PATCH 3/3] Fix fake runtime's image pull --- pkg/kubelet/container/testing/fake_runtime.go | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index fb90b621ce410..ef7858d0c5b46 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -322,6 +322,30 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, creds []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error) { f.Lock() f.CalledFunctions = append(f.CalledFunctions, "PullImage") + + if f.imagePullTokenBucket == nil { + f.imagePullTokenBucket = make(chan bool, 1) + } + if f.imagePullErrBucket == nil { + f.imagePullErrBucket = make(chan error, 1) + } + + blockImagePulls := f.BlockImagePulls + f.Unlock() + + if blockImagePulls { + // Block the function before adding the image to f.ImageList + select { + case <-ctx.Done(): + case <-f.imagePullTokenBucket: + case pullImageErr := <-f.imagePullErrBucket: + return "", nil, pullImageErr + } + } + + f.Lock() + defer f.Unlock() + if f.Err == nil { i := kubecontainer.Image{ ID: image.Image, @@ -336,28 +360,7 @@ func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSp retCreds = &creds[0] } - if !f.BlockImagePulls { - f.Unlock() - return image.Image, retCreds, f.Err - } - - retErr := f.Err - if f.imagePullTokenBucket == nil { - f.imagePullTokenBucket = make(chan bool, 1) - } - if f.imagePullErrBucket == nil { - f.imagePullErrBucket = make(chan error, 1) - } - // Unlock before waiting for UnblockImagePulls calls, to avoid deadlock. - f.Unlock() - select { - case <-ctx.Done(): - case <-f.imagePullTokenBucket: - case pullImageErr := <-f.imagePullErrBucket: - return image.Image, retCreds, pullImageErr - } - - return image.Image, retCreds, retErr + return image.Image, retCreds, f.Err } // UnblockImagePulls unblocks a certain number of image pulls, if BlockImagePulls is true.