Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/kuik/v1alpha1/cachedimage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ type UsedBy struct {
Count int `json:"count,omitempty"`
}

type Progress struct {
// Total is the total size of all compressed layer blobs
Total int64 `json:"total,omitempty"`
// Available is current size of all compressed layer blobs already written into the cache
Available int64 `json:"available,omitempty"`
}

// CachedImageStatus defines the observed state of CachedImage.
type CachedImageStatus struct {
// IsCached indicate whether the image is already cached or not
Expand All @@ -42,6 +49,9 @@ type CachedImageStatus struct {
// UsedBy is the list of pods using this image
UsedBy UsedBy `json:"usedBy,omitempty"`

// Progress is the current available / total size of compressed layer blobs
Progress Progress `json:"progress,omitempty"`

// Digest is the digest of the cached image
Digest string `json:"digest,omitempty"`
// UpstreamDigest is the upstream image digest
Expand Down
12 changes: 12 additions & 0 deletions helm/kube-image-keeper/crds/cachedimage-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.progress.available
name: Downloaded
type: integer
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -113,6 +116,15 @@ spec:
upstreamDigest:
description: UpstreamDigest is the upstream image digest
type: string
progress:
type: object
properties:
total:
type: integer
description: Total size of the compressed blob in bytes, including all layers, excluding all manifests.
available:
type: integer
description: Total downloaded / available size of the compressed blob in bytes, including all layers, excluding all manifests.
usedBy:
description: UsedBy is the list of pods using this image
properties:
Expand Down
39 changes: 38 additions & 1 deletion internal/controller/kuik/cachedimage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/distribution/reference"
"github.com/go-logr/logr"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/remote"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -350,7 +352,42 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage
return err
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures)
// Prepare callbacks to update progress during caching
var statusLock sync.Mutex

lastUpdateTime := time.Now()
lastWriteComplete := int64(0)
totalSizeAvailable := false
onUpdated := func(update v1.Update) {

isCompleted := lastWriteComplete != update.Complete && update.Complete == update.Total
needUpdate := time.Since(lastUpdateTime).Seconds() >= 5 || isCompleted

statusLock.Lock()
defer statusLock.Unlock()
if needUpdate && !totalSizeAvailable {
_ = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
cachedImage.Status.Progress.Total = update.Total
cachedImage.Status.Progress.Available = update.Complete
})

lastUpdateTime = time.Now()
}
lastWriteComplete = update.Complete
}

onUpdateFinalSize := func(totalSize int64) {
statusLock.Lock()
totalSizeAvailable = true // Disable future progress update.
statusLock.Unlock()

_ = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
cachedImage.Status.Progress.Total = totalSize
cachedImage.Status.Progress.Available = totalSize
})
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated, onUpdateFinalSize)

statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
if err == nil {
Expand Down
146 changes: 143 additions & 3 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/mutate"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/types"
Expand Down Expand Up @@ -123,16 +124,32 @@ func DeleteImage(imageName string) error {
return remote.Delete(digest)
}

func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error {
// Perform an image caching, and update the caching progress
// onProgressUpdate: Local cache registry write progress update call back. The total size written to the cache registry may be less then the total size of the image, if there are duplicated or existing layer already.
// onUpdateTotalSize: Total image size callback at the end of the caching. Size of all layers will be included, regardless whether they are already in cache registry.
func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, onProgressUpdate func(v1.Update), onUpdateTotalSize func(int64)) error {

destRef, err := parseLocalReference(imageName)
if err != nil {
return err
}

progressUpdate := make(chan v1.Update, 100)
// The channel will be closed by remote.Write / remote.WriteIndex call with remote.WithProgress option.

go func() {
for update := range progressUpdate {
if onProgressUpdate != nil {
onProgressUpdate(update)
}
}
}()

switch desc.MediaType {
case types.OCIImageIndex, types.DockerManifestList:
index, err := desc.ImageIndex()
if err != nil {
close(progressUpdate)
return err
}

Expand All @@ -145,17 +162,65 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin
return true
})

if err := remote.WriteIndex(destRef, filteredIndex); err != nil {
if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil {
return err
}

if onUpdateTotalSize != nil {
// Calculate total compressed size for image blobs
totalSize, err := getImageSizeByManifestIndex(filteredIndex)
if err != nil {
return nil
}

onUpdateTotalSize(totalSize)
}
default:
image, err := desc.Image()
if err != nil {
close(progressUpdate)
return err
}
if err := remote.Write(destRef, image); err != nil {
if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil {
return err
}

if onUpdateTotalSize != nil {
var totalSize int64

// We will ignore the size of the manifest, as well as the size of config file.
// Only blob size is calculated.
// The code snippet to include config and manifest file size is being kept here for future reference.
/*

manifestSize, err := image.Size()
if err != nil {
return nil
}
totalSize += manifestSize
config, err := image.Manifest()
if err != nil {
return nil
}
totalSize += config.Config.Size
*/

// Get layers and calculate total size
layers, err := image.Layers()
if err != nil {
return nil // Ignore
}

for _, layer := range layers {
size, err := layer.Size()
if err != nil {
return nil
}
totalSize += size
}

onUpdateTotalSize(totalSize)
}
}

return nil
Expand Down Expand Up @@ -228,3 +293,78 @@ func ContainerAnnotationKey(containerName string, initContainer bool) string {

return fmt.Sprintf(template, containerName)
}

// Calculate total compressed layer blob size by given Image manifest
func getImageSizeByImageManifest(im v1.Image) (int64, error) {
var totalSize int64
totalSize = 0

// We will ignore the size of manifest file here. The code snippet is list below for information.
/*
manifestSize, err := im.Size()
if err != nil {
return 0, err
}
totalSize += manifestSize
*/

layers, err := im.Layers()
if err != nil {
return 0, err
}
for _, layer := range layers {
layerSize, err := layer.Size()
if err != nil {
return 0, err
}
totalSize += layerSize
}
return totalSize, nil
}

// Calculate total compressed layer blob size by given ImageIndex
// Reference: https://github.com/google/go-containerregistry/blob/59a4b85930392a30c39462519adc8a2026d47181/pkg/v1/remote/pusher.go#L380
func getImageSizeByManifestIndex(tt v1.ImageIndex) (int64, error) {
var totalSize int64
totalSize = 0

// We will ignore the size of manifest file here. The code snippet is list below for information.
/*
manifestSize, err := tt.Size()
if err != nil {
return 0, err
}
totalSize += manifestSize
*/
children, err := partial.Manifests(tt)
if err != nil {
return 0, err
}

for _, child := range children {
switch typedChild := child.(type) {
case v1.ImageIndex:
size, err := getImageSizeByManifestIndex(typedChild)
if err != nil {
return 0, err
}
totalSize += size

case v1.Image:
imageSize, err := getImageSizeByImageManifest(typedChild)
if err != nil {
return 0, err
}
totalSize += imageSize

case v1.Layer:
layerSize, err := typedChild.Size()
if err != nil {
return 0, err
}
totalSize += layerSize
}
}

return totalSize, nil
}
13 changes: 12 additions & 1 deletion internal/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -320,12 +321,22 @@ func Test_CacheImage(t *testing.T) {
desc, err := remote.Get(sourceRef)
g.Expect(err).To(BeNil())

err = CacheImage(imageName, desc, []string{"amd64"})
// Prepare progress update callbacks
onUpdated := func(update v1.Update) {
}

var finalReportedSize int64
onFinal := func(totalSize int64) {
finalReportedSize = totalSize
}

err = CacheImage(imageName, desc, []string{"amd64"}, onUpdated, onFinal)
if tt.wantErr != "" {
g.Expect(err).To(BeAssignableToTypeOf(tt.errType))
g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr)))
} else {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(finalReportedSize).To(Equal(int64(2107098)))
}
})
}
Expand Down
Loading