From 50dcd10e6d3575763f3dc8bf2d5a20c8a67bc84c Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Mon, 4 Aug 2025 11:36:22 -0500 Subject: [PATCH 1/4] feat(controller): Add progress update when caching images --- api/kuik/v1alpha1/cachedimage_types.go | 10 ++ .../crds/cachedimage-crd.yaml | 14 +- .../controller/kuik/cachedimage_controller.go | 47 +++++- internal/registry/registry.go | 147 +++++++++++++++++- internal/registry/registry_test.go | 13 +- 5 files changed, 225 insertions(+), 6 deletions(-) diff --git a/api/kuik/v1alpha1/cachedimage_types.go b/api/kuik/v1alpha1/cachedimage_types.go index 85d4bdbc..37a19c29 100644 --- a/api/kuik/v1alpha1/cachedimage_types.go +++ b/api/kuik/v1alpha1/cachedimage_types.go @@ -33,6 +33,13 @@ type UsedBy struct { Count int `json:"count,omitempty"` } +type Progress struct { + // Total is the total size of all compressed layer blobs + Total int64 `json:"total,omitempty"` + // Available is current size of all compressed layer blobs already written into the cache + Available int64 `json:"available,omitempty"` +} + // CachedImageStatus defines the observed state of CachedImage. 
type CachedImageStatus struct { // IsCached indicate whether the image is already cached or not @@ -42,6 +49,9 @@ type CachedImageStatus struct { // UsedBy is the list of pods using this image UsedBy UsedBy `json:"usedBy,omitempty"` + // Progress is the current available / total size of compressed layer blobs + Progress Progress `json:"progress,omitempty"` + // Digest is the digest of the cached image Digest string `json:"digest,omitempty"` // UpstreamDigest is the upstream image digest diff --git a/helm/kube-image-keeper/crds/cachedimage-crd.yaml b/helm/kube-image-keeper/crds/cachedimage-crd.yaml index 017a505a..b06febd3 100644 --- a/helm/kube-image-keeper/crds/cachedimage-crd.yaml +++ b/helm/kube-image-keeper/crds/cachedimage-crd.yaml @@ -34,6 +34,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.progress.available + name: Downloaded + type: integer name: v1alpha1 schema: openAPIV3Schema: @@ -113,7 +116,16 @@ spec: upstreamDigest: description: UpstreamDigest is the upstream image digest type: string - usedBy: + progress: + type: object + properties: + total: + type: integer + description: Total size of the compressed blob in bytes, including all layers, excluding all manifests. + available: + type: integer + description: Total downloaded / available size of the compressed blob in bytes, including all layers, excluding all manifests. 
+ usedBy: description: UsedBy is the list of pods using this image properties: count: diff --git a/internal/controller/kuik/cachedimage_controller.go b/internal/controller/kuik/cachedimage_controller.go index 96c64c48..fa08418b 100644 --- a/internal/controller/kuik/cachedimage_controller.go +++ b/internal/controller/kuik/cachedimage_controller.go @@ -6,10 +6,12 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/distribution/reference" "github.com/go-logr/logr" + v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -350,7 +352,50 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage return err } - err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures) + // Prepare callbacks to update progress during caching + var statusLock sync.Mutex + + lastUpdateTime := time.Now() + lastWriteComplete := int64(0) + totalSizeAvailable := false + onUpdated := func(update v1.Update) { + + needUpdate := false + if lastWriteComplete != update.Complete && update.Complete == update.Total { + // Update is needed whenever the writing complmetes. + needUpdate = true + } + + if time.Since(lastUpdateTime).Seconds() >= 5 { + // Update is needed if last update is more than 5 seconds ago + needUpdate = true + } + + statusLock.Lock() + defer statusLock.Unlock() + if needUpdate && !totalSizeAvailable { + updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + cachedImage.Status.Progress.Total = update.Total + cachedImage.Status.Progress.Available = update.Complete + }) + + lastUpdateTime = time.Now() + } + lastWriteComplete = update.Complete + } + + onUpdateFinalSize := func(totalSize int64) { + statusLock.Lock() + totalSizeAvailable = true // Disable future progress update. 
+ statusLock.Unlock() + + updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + cachedImage.Status.Progress.Total = totalSize + cachedImage.Status.Progress.Available = totalSize + }) + } + + err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated, onUpdateFinalSize) statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { if err == nil { diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 3c187601..f8c7460e 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -15,6 +15,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/mutate" + "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/remote/transport" "github.com/google/go-containerregistry/pkg/v1/types" @@ -123,16 +124,32 @@ func DeleteImage(imageName string) error { return remote.Delete(digest) } -func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error { +// Perform an image caching, and update the caching progress +// callback: Local cache registry write progress update call back. The total size written to the cache registry may be less then the total size of the image, if there are duplicated or existing layer already. +// onUpdateTotalSize: Total image size callback at the end of the caching. Size of all layers will be included, regardless whether they are already in cache registry. 
+func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, callback func(v1.Update), onUpdateTotalSize func(int64)) error { + destRef, err := parseLocalReference(imageName) if err != nil { return err } + progressUpdate := make(chan v1.Update, 100) + // The channel will be closed by remote.Write / remote.WriteIndex call with remote.WithProgress option. + + go func() { + for update := range progressUpdate { + if callback != nil { + callback(update) + } + } + }() + switch desc.MediaType { case types.OCIImageIndex, types.DockerManifestList: index, err := desc.ImageIndex() if err != nil { + close(progressUpdate) return err } @@ -145,17 +162,65 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin return true }) - if err := remote.WriteIndex(destRef, filteredIndex); err != nil { + if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil { return err } + + if onUpdateTotalSize != nil { + // Calculate total compressed size for image blobs + totalSize, err := getImageSizeByManifestIndex(filteredIndex) + if err != nil { + return nil + } + + onUpdateTotalSize(totalSize) + } default: image, err := desc.Image() if err != nil { + close(progressUpdate) return err } - if err := remote.Write(destRef, image); err != nil { + if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil { return err } + + if onUpdateTotalSize != nil { + var totalSize int64 + + // We will ignore the size of the manifest, as well as the size of config file. + // Only blob size is calculated. + // The code snippet to include config and manifest file size is being kept here for future reference. 
+ /* + + manifestSize, err := image.Size() + if err != nil { + return nil + } + totalSize += manifestSize + config, err := image.Manifest() + if err != nil { + return nil + } + totalSize += config.Config.Size + */ + + // Get layers and calculate total size + layers, err := image.Layers() + if err != nil { + return nil // Ignore + } + + for _, layer := range layers { + size, err := layer.Size() + if err != nil { + return nil + } + totalSize += size + } + + onUpdateTotalSize(totalSize) + } } return nil @@ -228,3 +293,79 @@ func ContainerAnnotationKey(containerName string, initContainer bool) string { return fmt.Sprintf(template, containerName) } + +// Calculate total compressed layer blob size by given Image manifest +func getImageSizeByImageManifest(im v1.Image) (int64, error) { + var totalSize int64 + totalSize = 0 + + // We will ignore the size of manifest file here. The code snippet is list below for information. + /* + manifestSize, err := im.Size() + if err != nil { + return 0, err + } + totalSize += manifestSize + */ + + layers, err := im.Layers() + if err != nil { + return 0, err + } + for _, layer := range layers { + layerSize, err := layer.Size() + if err != nil { + return 0, err + } + totalSize += layerSize + } + return totalSize, nil +} + +// Calculate total compressed layer blob size by given ImageIndex +// Reference: https://github.com/google/go-containerregistry/blob/59a4b85930392a30c39462519adc8a2026d47181/pkg/v1/remote/pusher.go#L380 +func getImageSizeByManifestIndex(tt v1.ImageIndex) (int64, error) { + var totalSize int64 + totalSize = 0 + + // We will ignore the size of manifest file here. The code snippet is list below for information. 
+ /* + manifestSize, err := tt.Size() + if err != nil { + return 0, err + } + totalSize += manifestSize + */ + children, err := partial.Manifests(tt) + if err != nil { + return 0, err + } + + for _, child := range children { + child := child + switch child.(type) { + case v1.ImageIndex: + size, err := getImageSizeByManifestIndex(child.(v1.ImageIndex)) + if err != nil { + return 0, err + } + totalSize += size + + case v1.Image: + imageSize, err := getImageSizeByImageManifest(child.(v1.Image)) + if err != nil { + return 0, err + } + totalSize += imageSize + + case v1.Layer: + layerSize, err := child.(v1.Layer).Size() + if err != nil { + return 0, err + } + totalSize += layerSize + } + } + + return totalSize, nil +} diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index af135502..c1453e21 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/google/go-containerregistry/pkg/name" + v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/remote/transport" . 
"github.com/onsi/gomega" @@ -320,12 +321,22 @@ func Test_CacheImage(t *testing.T) { desc, err := remote.Get(sourceRef) g.Expect(err).To(BeNil()) - err = CacheImage(imageName, desc, []string{"amd64"}) + // Prepare progress update callbacks + onUpdated := func(update v1.Update) { + } + + var finalReportedSize int64 + onFinal := func(totalSize int64) { + finalReportedSize = totalSize + } + + err = CacheImage(imageName, desc, []string{"amd64"}, onUpdated, onFinal) if tt.wantErr != "" { g.Expect(err).To(BeAssignableToTypeOf(tt.errType)) g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr))) } else { g.Expect(err).ToNot(HaveOccurred()) + g.Expect(finalReportedSize).To(Equal(int64(2107098))) } }) } From 52c10133e6b1ca3b31ae2fc8686d0155abf17a7f Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Mon, 4 Aug 2025 12:25:03 -0500 Subject: [PATCH 2/4] fix: Static code check error --- .../controller/kuik/cachedimage_controller.go | 16 ++++------------ internal/registry/registry.go | 9 ++++----- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/internal/controller/kuik/cachedimage_controller.go b/internal/controller/kuik/cachedimage_controller.go index fa08418b..4237fa52 100644 --- a/internal/controller/kuik/cachedimage_controller.go +++ b/internal/controller/kuik/cachedimage_controller.go @@ -360,21 +360,13 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage totalSizeAvailable := false onUpdated := func(update v1.Update) { - needUpdate := false - if lastWriteComplete != update.Complete && update.Complete == update.Total { - // Update is needed whenever the writing complmetes. 
- needUpdate = true - } - - if time.Since(lastUpdateTime).Seconds() >= 5 { - // Update is needed if last update is more than 5 seconds ago - needUpdate = true - } + isCompleted := lastWriteComplete != update.Complete && update.Complete == update.Total + needUpdate := time.Since(lastUpdateTime).Seconds() >= 5 || isCompleted statusLock.Lock() defer statusLock.Unlock() if needUpdate && !totalSizeAvailable { - updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + _ = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { cachedImage.Status.Progress.Total = update.Total cachedImage.Status.Progress.Available = update.Complete }) @@ -389,7 +381,7 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage totalSizeAvailable = true // Disable future progress update. statusLock.Unlock() - updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + _ = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { cachedImage.Status.Progress.Total = totalSize cachedImage.Status.Progress.Available = totalSize }) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index f8c7460e..b88c2754 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -342,24 +342,23 @@ func getImageSizeByManifestIndex(tt v1.ImageIndex) (int64, error) { } for _, child := range children { - child := child - switch child.(type) { + switch typedChild := child.(type) { case v1.ImageIndex: - size, err := getImageSizeByManifestIndex(child.(v1.ImageIndex)) + size, err := getImageSizeByManifestIndex(typedChild) if err != nil { return 0, err } totalSize += size case v1.Image: - imageSize, err := getImageSizeByImageManifest(child.(v1.Image)) + imageSize, err := getImageSizeByImageManifest(typedChild) if err != nil { return 0, err } totalSize += imageSize case v1.Layer: - layerSize, err := child.(v1.Layer).Size() + 
layerSize, err := typedChild.Size() if err != nil { return 0, err } From 63964911ba856ef91ccb260decf40aa54c0395a5 Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Mon, 4 Aug 2025 13:23:34 -0500 Subject: [PATCH 3/4] fix(crd): Fix crd manifest --- helm/kube-image-keeper/crds/cachedimage-crd.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/kube-image-keeper/crds/cachedimage-crd.yaml b/helm/kube-image-keeper/crds/cachedimage-crd.yaml index b06febd3..73deedca 100644 --- a/helm/kube-image-keeper/crds/cachedimage-crd.yaml +++ b/helm/kube-image-keeper/crds/cachedimage-crd.yaml @@ -125,7 +125,7 @@ spec: available: type: integer description: Total downloaded / available size of the compressed blob in bytes, including all layers, excluding all manifests. - usedBy: + usedBy: description: UsedBy is the list of pods using this image properties: count: From 0142b8830df38b422f2fbf2715c7da1536cd1f9b Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Mon, 4 Aug 2025 13:26:31 -0500 Subject: [PATCH 4/4] fix: code refactor --- internal/registry/registry.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index b88c2754..18cf2bdd 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -125,9 +125,9 @@ func DeleteImage(imageName string) error { } // Perform an image caching, and update the caching progress -// callback: Local cache registry write progress update call back. The total size written to the cache registry may be less then the total size of the image, if there are duplicated or existing layer already. +// onProgressUpdate: Local cache registry write progress update callback. The total size written to the cache registry may be less than the total size of the image if some layers are duplicated or already exist. // onUpdateTotalSize: Total image size callback at the end of the caching.
Size of all layers will be included, regardless whether they are already in cache registry. -func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, callback func(v1.Update), onUpdateTotalSize func(int64)) error { +func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, onProgressUpdate func(v1.Update), onUpdateTotalSize func(int64)) error { destRef, err := parseLocalReference(imageName) if err != nil { @@ -139,8 +139,8 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin go func() { for update := range progressUpdate { - if callback != nil { - callback(update) + if onProgressUpdate != nil { + onProgressUpdate(update) } } }()