diff --git a/api/kuik/v1alpha1/cachedimage_types.go b/api/kuik/v1alpha1/cachedimage_types.go index 85d4bdbc..37a19c29 100644 --- a/api/kuik/v1alpha1/cachedimage_types.go +++ b/api/kuik/v1alpha1/cachedimage_types.go @@ -33,6 +33,13 @@ type UsedBy struct { Count int `json:"count,omitempty"` } +type Progress struct { + // Total is the total size of all compressed layer blobs + Total int64 `json:"total,omitempty"` + // Available is current size of all compressed layer blobs already written into the cache + Available int64 `json:"available,omitempty"` +} + // CachedImageStatus defines the observed state of CachedImage. type CachedImageStatus struct { // IsCached indicate whether the image is already cached or not @@ -42,6 +49,9 @@ type CachedImageStatus struct { // UsedBy is the list of pods using this image UsedBy UsedBy `json:"usedBy,omitempty"` + // Progress is the current available / total size of compressed layer blobs + Progress Progress `json:"progress,omitempty"` + // Digest is the digest of the cached image Digest string `json:"digest,omitempty"` // UpstreamDigest is the upstream image digest diff --git a/helm/kube-image-keeper/crds/cachedimage-crd.yaml b/helm/kube-image-keeper/crds/cachedimage-crd.yaml index 017a505a..73deedca 100644 --- a/helm/kube-image-keeper/crds/cachedimage-crd.yaml +++ b/helm/kube-image-keeper/crds/cachedimage-crd.yaml @@ -34,6 +34,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.progress.available + name: Downloaded + type: integer name: v1alpha1 schema: openAPIV3Schema: @@ -113,6 +116,15 @@ spec: upstreamDigest: description: UpstreamDigest is the upstream image digest type: string + progress: + type: object + properties: + total: + type: integer + description: Total size of the compressed blob in bytes, including all layers, excluding all manifests. + available: + type: integer + description: Total downloaded / available size of the compressed blob in bytes, including all layers, excluding all manifests. usedBy: description: UsedBy is the list of pods using this image properties: diff --git a/internal/controller/kuik/cachedimage_controller.go b/internal/controller/kuik/cachedimage_controller.go index 96c64c48..4237fa52 100644 --- a/internal/controller/kuik/cachedimage_controller.go +++ b/internal/controller/kuik/cachedimage_controller.go @@ -6,10 +6,12 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/distribution/reference" "github.com/go-logr/logr" + v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -350,7 +352,42 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage return err } - err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures) + // Prepare callbacks to update progress during caching + var statusLock sync.Mutex + + lastUpdateTime := time.Now() + lastWriteComplete := int64(0) + totalSizeAvailable := false + onUpdated := func(update v1.Update) { + + isCompleted := lastWriteComplete != update.Complete && update.Complete == update.Total + needUpdate := time.Since(lastUpdateTime).Seconds() >= 5 || isCompleted + + statusLock.Lock() + defer statusLock.Unlock() + if needUpdate && !totalSizeAvailable { + _ = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + cachedImage.Status.Progress.Total = update.Total + cachedImage.Status.Progress.Available = update.Complete + }) + + lastUpdateTime = time.Now() + } + lastWriteComplete = update.Complete + } + + onUpdateFinalSize := func(totalSize int64) { + statusLock.Lock() + totalSizeAvailable = true // Disable future progress update. + statusLock.Unlock() + + _ = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + cachedImage.Status.Progress.Total = totalSize + cachedImage.Status.Progress.Available = totalSize + }) + } + + err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated, onUpdateFinalSize) statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { if err == nil { diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 3c187601..18cf2bdd 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -15,6 +15,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/mutate" + "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/remote/transport" "github.com/google/go-containerregistry/pkg/v1/types" @@ -123,16 +124,32 @@ func DeleteImage(imageName string) error { return remote.Delete(digest) } -func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error { +// Perform an image caching, and update the caching progress +// onProgressUpdate: Local cache registry write progress update call back. The total size written to the cache registry may be less then the total size of the image, if there are duplicated or existing layer already. +// onUpdateTotalSize: Total image size callback at the end of the caching. Size of all layers will be included, regardless whether they are already in cache registry. +func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, onProgressUpdate func(v1.Update), onUpdateTotalSize func(int64)) error { + destRef, err := parseLocalReference(imageName) if err != nil { return err } + progressUpdate := make(chan v1.Update, 100) + // The channel will be closed by remote.Write / remote.WriteIndex call with remote.WithProgress option. + + go func() { + for update := range progressUpdate { + if onProgressUpdate != nil { + onProgressUpdate(update) + } + } + }() + switch desc.MediaType { case types.OCIImageIndex, types.DockerManifestList: index, err := desc.ImageIndex() if err != nil { + close(progressUpdate) return err } @@ -145,17 +162,65 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin return true }) - if err := remote.WriteIndex(destRef, filteredIndex); err != nil { + if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil { return err } + + if onUpdateTotalSize != nil { + // Calculate total compressed size for image blobs + totalSize, err := getImageSizeByManifestIndex(filteredIndex) + if err != nil { + return nil + } + + onUpdateTotalSize(totalSize) + } default: image, err := desc.Image() if err != nil { + close(progressUpdate) return err } - if err := remote.Write(destRef, image); err != nil { + if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil { return err } + + if onUpdateTotalSize != nil { + var totalSize int64 + + // We will ignore the size of the manifest, as well as the size of config file. + // Only blob size is calculated. + // The code snippet to include config and manifest file size is being kept here for future reference. + /* + + manifestSize, err := image.Size() + if err != nil { + return nil + } + totalSize += manifestSize + config, err := image.Manifest() + if err != nil { + return nil + } + totalSize += config.Config.Size + */ + + // Get layers and calculate total size + layers, err := image.Layers() + if err != nil { + return nil // Ignore + } + + for _, layer := range layers { + size, err := layer.Size() + if err != nil { + return nil + } + totalSize += size + } + + onUpdateTotalSize(totalSize) + } } return nil @@ -228,3 +293,78 @@ func ContainerAnnotationKey(containerName string, initContainer bool) string { return fmt.Sprintf(template, containerName) } + +// Calculate total compressed layer blob size by given Image manifest +func getImageSizeByImageManifest(im v1.Image) (int64, error) { + var totalSize int64 + totalSize = 0 + + // We will ignore the size of manifest file here. The code snippet is list below for information. + /* + manifestSize, err := im.Size() + if err != nil { + return 0, err + } + totalSize += manifestSize + */ + + layers, err := im.Layers() + if err != nil { + return 0, err + } + for _, layer := range layers { + layerSize, err := layer.Size() + if err != nil { + return 0, err + } + totalSize += layerSize + } + return totalSize, nil +} + +// Calculate total compressed layer blob size by given ImageIndex +// Reference: https://github.com/google/go-containerregistry/blob/59a4b85930392a30c39462519adc8a2026d47181/pkg/v1/remote/pusher.go#L380 +func getImageSizeByManifestIndex(tt v1.ImageIndex) (int64, error) { + var totalSize int64 + totalSize = 0 + + // We will ignore the size of manifest file here. The code snippet is list below for information. + /* + manifestSize, err := tt.Size() + if err != nil { + return 0, err + } + totalSize += manifestSize + */ + children, err := partial.Manifests(tt) + if err != nil { + return 0, err + } + + for _, child := range children { + switch typedChild := child.(type) { + case v1.ImageIndex: + size, err := getImageSizeByManifestIndex(typedChild) + if err != nil { + return 0, err + } + totalSize += size + + case v1.Image: + imageSize, err := getImageSizeByImageManifest(typedChild) + if err != nil { + return 0, err + } + totalSize += imageSize + + case v1.Layer: + layerSize, err := typedChild.Size() + if err != nil { + return 0, err + } + totalSize += layerSize + } + } + + return totalSize, nil +} diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index af135502..c1453e21 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/google/go-containerregistry/pkg/name" + v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/remote/transport" . "github.com/onsi/gomega" @@ -320,12 +321,22 @@ func Test_CacheImage(t *testing.T) { desc, err := remote.Get(sourceRef) g.Expect(err).To(BeNil()) - err = CacheImage(imageName, desc, []string{"amd64"}) + // Prepare progress update callbacks + onUpdated := func(update v1.Update) { + } + + var finalReportedSize int64 + onFinal := func(totalSize int64) { + finalReportedSize = totalSize + } + + err = CacheImage(imageName, desc, []string{"amd64"}, onUpdated, onFinal) if tt.wantErr != "" { g.Expect(err).To(BeAssignableToTypeOf(tt.errType)) g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr))) } else { g.Expect(err).ToNot(HaveOccurred()) + g.Expect(finalReportedSize).To(Equal(int64(2107098))) } }) }