Skip to content

Commit bb0b17c

Browse files
committed
[WIP] bundle: Parallel download and decompression
This commit does the following: - Return a reader from the bundle Download function. - Use the reader to stream the bytes to Extract function. This commit replaces grab client with the net/http client to ensure that the bytes are streamed come in correct order to the Extract func. Currently, only zst decompression is being used in the UncompressWithReader function as it is the primary compression algorithm being used in crc.
1 parent 8a1d173 commit bb0b17c

File tree

10 files changed

+113
-81
lines changed

10 files changed

+113
-81
lines changed

cmd/crc-embedder/cmd/embed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func downloadDataFiles(goos string, components []string, destDir string) ([]stri
163163
if !shouldDownload(components, componentName) {
164164
continue
165165
}
166-
filename, err := download.Download(context.TODO(), dl.url, destDir, dl.permissions, nil)
166+
_, filename, err := download.Download(context.TODO(), dl.url, destDir, dl.permissions, nil)
167167
if err != nil {
168168
return nil, err
169169
}

pkg/crc/cache/cache.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ func (c *Cache) getExecutable(destDir string) (string, error) {
154154
destPath := filepath.Join(destDir, archiveName)
155155
err := embed.Extract(archiveName, destPath)
156156
if err != nil {
157-
return download.Download(context.TODO(), c.archiveURL, destDir, 0600, nil)
157+
_, filename, err := download.Download(context.TODO(), c.archiveURL, destDir, 0600, nil)
158+
return filename, err
158159
}
159160

160161
return destPath, err

pkg/crc/image/image.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,43 +116,43 @@ func GetPresetName(imageName string) crcpreset.Preset {
116116
return preset
117117
}
118118

119-
func PullBundle(ctx context.Context, imageURI string) (string, error) {
119+
func PullBundle(ctx context.Context, imageURI string) (io.Reader, string, error) {
120120
imgHandler := imageHandler{
121121
imageURI: strings.TrimPrefix(imageURI, "docker:"),
122122
}
123123
destDir, err := os.MkdirTemp(constants.MachineCacheDir, "tmpBundleImage")
124124
if err != nil {
125-
return "", err
125+
return nil, "", err
126126
}
127127
defer os.RemoveAll(destDir)
128128
imgManifest, err := imgHandler.copyImage(ctx, destDir, os.Stdout)
129129
if err != nil {
130-
return "", err
130+
return nil, "", err
131131
}
132132

133133
logging.Info("Extracting the image bundle layer...")
134134
imgLayer, err := getLayerPath(imgManifest, 0, "application/vnd.oci.image.layer.v1.tar+gzip")
135135
if err != nil {
136-
return "", err
136+
return nil, "", err
137137
}
138138
fileList, err := extract.Uncompress(ctx, filepath.Join(destDir, imgLayer), constants.MachineCacheDir)
139139
if err != nil {
140-
return "", err
140+
return nil, "", err
141141
}
142142
logging.Debugf("Bundle and sign path: %v", fileList)
143143

144144
logging.Info("Verifying the bundle signature...")
145145
if len(fileList) != 2 {
146-
return "", fmt.Errorf("image layer contains more files than expected: %v", fileList)
146+
return nil, "", fmt.Errorf("image layer contains more files than expected: %v", fileList)
147147
}
148148
bundleFilePath, sigFilePath := fileList[0], fileList[1]
149149
if !strings.HasSuffix(sigFilePath, ".crcbundle.sig") {
150150
sigFilePath, bundleFilePath = fileList[0], fileList[1]
151151
}
152152

153153
if err := gpg.Verify(bundleFilePath, sigFilePath); err != nil {
154-
return "", err
154+
return nil, "", err
155155
}
156156

157-
return bundleFilePath, nil
157+
return nil, bundleFilePath, nil
158158
}

pkg/crc/machine/bundle/metadata.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -344,15 +344,16 @@ func getVerifiedHash(url string, file string) (string, error) {
344344
return "", fmt.Errorf("%s hash is missing or shasums are malformed", file)
345345
}
346346

347-
func downloadDefault(ctx context.Context, preset crcPreset.Preset) (string, error) {
347+
func downloadDefault(ctx context.Context, preset crcPreset.Preset) (io.Reader, string, error) {
348348
downloadInfo, err := getBundleDownloadInfo(preset)
349349
if err != nil {
350-
return "", err
350+
return nil, "", err
351351
}
352352
return downloadInfo.Download(ctx, constants.GetDefaultBundlePath(preset), 0664)
353353
}
354354

355-
func Download(ctx context.Context, preset crcPreset.Preset, bundleURI string, enableBundleQuayFallback bool) (string, error) {
355+
func Download(ctx context.Context, preset crcPreset.Preset, bundleURI string, enableBundleQuayFallback bool) (io.Reader, string, error) {
356+
var reader io.Reader
356357
// If we are asked to download
357358
// ~/.crc/cache/crc_podman_libvirt_4.1.1.crcbundle, this means we want
358359
// are downloading the default bundle for this release. This uses a
@@ -361,12 +362,14 @@ func Download(ctx context.Context, preset crcPreset.Preset, bundleURI string, en
361362
if bundleURI == constants.GetDefaultBundlePath(preset) {
362363
switch preset {
363364
case crcPreset.OpenShift, crcPreset.Microshift:
364-
downloadedBundlePath, err := downloadDefault(ctx, preset)
365+
var err error
366+
var downloadedBundlePath string
367+
reader, downloadedBundlePath, err = downloadDefault(ctx, preset)
365368
if err != nil && enableBundleQuayFallback {
366369
logging.Info("Unable to download bundle from mirror, falling back to quay")
367370
return image.PullBundle(ctx, constants.GetDefaultBundleImageRegistry(preset))
368371
}
369-
return downloadedBundlePath, err
372+
return reader, downloadedBundlePath, err
370373
case crcPreset.OKD:
371374
fallthrough
372375
default:
@@ -380,7 +383,7 @@ func Download(ctx context.Context, preset crcPreset.Preset, bundleURI string, en
380383
return image.PullBundle(ctx, bundleURI)
381384
}
382385
// the `bundleURI` parameter turned out to be a local path
383-
return bundleURI, nil
386+
return reader, bundleURI, nil
384387
}
385388

386389
type Version struct {

pkg/crc/machine/bundle/repository.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"io"
78
"os"
89
"path/filepath"
910
"runtime"
@@ -124,6 +125,36 @@ func (bundle *CrcBundleInfo) createSymlinkOrCopyPodmanRemote(binDir string) erro
124125
return bundle.copyExecutableFromBundle(binDir, PodmanExecutable, constants.PodmanRemoteExecutableName)
125126
}
126127

128+
func (repo *Repository) ExtractWithReader(ctx context.Context, reader io.Reader, path string) error {
129+
logging.Debugf("Extracting bundle from reader")
130+
bundleName := filepath.Base(path)
131+
132+
tmpDir := filepath.Join(repo.CacheDir, "tmp-extract")
133+
_ = os.RemoveAll(tmpDir) // clean up before using it
134+
defer func() {
135+
_ = os.RemoveAll(tmpDir) // clean up after using it
136+
}()
137+
138+
if _, err := extract.UncompressWithReader(ctx, reader, tmpDir); err != nil {
139+
return err
140+
}
141+
142+
bundleBaseDir := GetBundleNameWithoutExtension(bundleName)
143+
bundleDir := filepath.Join(repo.CacheDir, bundleBaseDir)
144+
_ = os.RemoveAll(bundleDir)
145+
err := crcerrors.Retry(context.Background(), time.Minute, func() error {
146+
if err := os.Rename(filepath.Join(tmpDir, bundleBaseDir), bundleDir); err != nil {
147+
return &crcerrors.RetriableError{Err: err}
148+
}
149+
return nil
150+
}, 5*time.Second)
151+
if err != nil {
152+
return err
153+
}
154+
155+
return os.Chmod(bundleDir, 0755)
156+
}
157+
127158
func (repo *Repository) Extract(ctx context.Context, path string) error {
128159
bundleName := filepath.Base(path)
129160

@@ -198,8 +229,14 @@ func Use(bundleName string) (*CrcBundleInfo, error) {
198229
return defaultRepo.Use(bundleName)
199230
}
200231

201-
func Extract(ctx context.Context, path string) (*CrcBundleInfo, error) {
202-
if err := defaultRepo.Extract(ctx, path); err != nil {
232+
func Extract(ctx context.Context, reader io.Reader, path string) (*CrcBundleInfo, error) {
233+
if reader == nil {
234+
if err := defaultRepo.Extract(ctx, path); err != nil {
235+
return nil, err
236+
}
237+
return defaultRepo.Get(filepath.Base(path))
238+
}
239+
if err := defaultRepo.ExtractWithReader(ctx, reader, path); err != nil {
203240
return nil, err
204241
}
205242
return defaultRepo.Get(filepath.Base(path))

pkg/crc/machine/start.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,15 @@ func getCrcBundleInfo(ctx context.Context, preset crcPreset.Preset, bundleName,
4848
return bundleInfo, nil
4949
}
5050
logging.Debugf("Failed to load bundle %s: %v", bundleName, err)
51+
5152
logging.Infof("Downloading bundle: %s...", bundleName)
52-
bundlePath, err = bundle.Download(ctx, preset, bundlePath, enableBundleQuayFallback)
53+
reader, bundlePath, err := bundle.Download(ctx, preset, bundlePath, enableBundleQuayFallback)
5354
if err != nil {
5455
return nil, err
5556
}
57+
5658
logging.Infof("Extracting bundle: %s...", bundleName)
57-
if _, err := bundle.Extract(ctx, bundlePath); err != nil {
59+
if _, err := bundle.Extract(ctx, reader, bundlePath); err != nil {
5860
return nil, err
5961
}
6062
return bundle.Use(bundleName)

pkg/crc/preflight/preflight_checks_common.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package preflight
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"os"
78
"path/filepath"
89

@@ -116,13 +117,14 @@ func fixBundleExtracted(bundlePath string, preset crcpreset.Preset, enableBundle
116117
return fmt.Errorf("Cannot create directory %s: %v", bundleDir, err)
117118
}
118119
var err error
120+
var reader io.Reader
119121
logging.Infof("Downloading bundle: %s...", bundlePath)
120-
if bundlePath, err = bundle.Download(context.TODO(), preset, bundlePath, enableBundleQuayFallback); err != nil {
122+
if reader, bundlePath, err = bundle.Download(context.TODO(), preset, bundlePath, enableBundleQuayFallback); err != nil {
121123
return err
122124
}
123125

124126
logging.Infof("Uncompressing %s", bundlePath)
125-
if _, err := bundle.Extract(context.TODO(), bundlePath); err != nil {
127+
if _, err := bundle.Extract(context.TODO(), reader, bundlePath); err != nil {
126128
if errors.Is(err, os.ErrNotExist) {
127129
return errors.Wrap(err, "Use `crc setup -b <bundle-path>`")
128130
}

pkg/download/download.go

Lines changed: 27 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,97 +2,66 @@ package download
22

33
import (
44
"context"
5-
"crypto/sha256"
65
"encoding/hex"
76
"fmt"
87
"io"
8+
"mime"
99
"net/http"
1010
"net/url"
1111
"os"
1212
"path/filepath"
13-
"time"
1413

14+
"github.com/cavaliergopher/grab/v3"
1515
"github.com/crc-org/crc/v2/pkg/crc/logging"
1616
"github.com/crc-org/crc/v2/pkg/crc/network/httpproxy"
1717
"github.com/crc-org/crc/v2/pkg/crc/version"
18-
"github.com/crc-org/crc/v2/pkg/os/terminal"
19-
20-
"github.com/cavaliergopher/grab/v3"
21-
"github.com/cheggaaa/pb/v3"
2218
"github.com/pkg/errors"
2319
)
2420

25-
func doRequest(client *grab.Client, req *grab.Request) (string, error) {
26-
const minSizeForProgressBar = 100_000_000
27-
28-
resp := client.Do(req)
29-
if resp.Size() < minSizeForProgressBar {
30-
<-resp.Done
31-
return resp.Filename, resp.Err()
32-
}
33-
34-
t := time.NewTicker(500 * time.Millisecond)
35-
defer t.Stop()
36-
var bar *pb.ProgressBar
37-
if terminal.IsShowTerminalOutput() {
38-
bar = pb.Start64(resp.Size())
39-
bar.Set(pb.Bytes, true)
40-
// This is the same as the 'Default' template https://github.com/cheggaaa/pb/blob/224e0746e1e7b9c5309d6e2637264bfeb746d043/v3/preset.go#L8-L10
41-
// except that the 'per second' suffix is changed to '/s' (by default it is ' p/s' which is unexpected)
42-
progressBarTemplate := `{{with string . "prefix"}}{{.}} {{end}}{{counters . }} {{bar . }} {{percent . }} {{speed . "%s/s" "??/s"}}{{with string . "suffix"}} {{.}}{{end}}`
43-
bar.SetTemplateString(progressBarTemplate)
44-
defer bar.Finish()
45-
}
46-
47-
loop:
48-
for {
49-
select {
50-
case <-t.C:
51-
if terminal.IsShowTerminalOutput() {
52-
bar.SetCurrent(resp.BytesComplete())
53-
}
54-
case <-resp.Done:
55-
break loop
56-
}
57-
}
58-
59-
return resp.Filename, resp.Err()
60-
}
61-
6221
// Download function takes sha256sum as hex decoded byte
6322
// something like hex.DecodeString("33daf4c03f86120fdfdc66bddf6bfff4661c7ca11c5d")
64-
func Download(ctx context.Context, uri, destination string, mode os.FileMode, sha256sum []byte) (string, error) {
23+
func Download(ctx context.Context, uri, destination string, mode os.FileMode, _ []byte) (io.Reader, string, error) {
6524
logging.Debugf("Downloading %s to %s", uri, destination)
6625

67-
client := grab.NewClient()
68-
client.UserAgent = version.UserAgent()
69-
client.HTTPClient = &http.Client{Transport: httpproxy.HTTPTransport()}
70-
req, err := grab.NewRequest(destination, uri)
26+
req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
27+
7128
if err != nil {
72-
return "", errors.Wrapf(err, "unable to get request from %s", uri)
29+
return nil, "", errors.Wrapf(err, "unable to get request from %s", uri)
7330
}
31+
client := http.Client{Transport: &http.Transport{}}
7432

7533
if ctx == nil {
7634
panic("ctx is nil, this should not happen")
7735
}
7836
req = req.WithContext(ctx)
7937

80-
if sha256sum != nil {
81-
req.SetChecksum(sha256.New(), sha256sum, true)
38+
resp, err := client.Do(req)
39+
if err != nil {
40+
return nil, "", err
8241
}
8342

84-
filename, err := doRequest(client, req)
43+
var filename, dir string
44+
if filepath.Ext(destination) == "crcbundle" {
45+
dir = filepath.Dir(destination)
46+
} else {
47+
dir = destination
48+
}
49+
if disposition, params, _ := mime.ParseMediaType(resp.Header.Get("Content-Disposition")); disposition == "attachment" {
50+
filename = filepath.Join(dir, params["filename"])
51+
} else {
52+
filename = filepath.Join(dir, filepath.Base(resp.Request.URL.Path))
53+
}
54+
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
8555
if err != nil {
86-
return "", err
56+
return nil, "", err
8757
}
8858

8959
if err := os.Chmod(filename, mode); err != nil {
9060
_ = os.Remove(filename)
91-
return "", err
61+
return nil, "", err
9262
}
9363

94-
logging.Debugf("Download saved to %v", filename)
95-
return filename, nil
64+
return io.TeeReader(resp.Body, file), filename, nil
9665
}
9766

9867
// InMemory takes a URL and returns a ReadCloser object to the downloaded file
@@ -138,10 +107,10 @@ func NewRemoteFile(uri, sha256sum string) *RemoteFile {
138107

139108
}
140109

141-
func (r *RemoteFile) Download(ctx context.Context, bundlePath string, mode os.FileMode) (string, error) {
110+
func (r *RemoteFile) Download(ctx context.Context, bundlePath string, mode os.FileMode) (io.Reader, string, error) {
142111
sha256bytes, err := hex.DecodeString(r.sha256sum)
143112
if err != nil {
144-
return "", err
113+
return nil, "", err
145114
}
146115
return Download(ctx, r.URI, bundlePath, mode, sha256bytes)
147116
}

pkg/extract/extract.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@ func Uncompress(ctx context.Context, tarball, targetDir string) ([]string, error
3232
return uncompress(ctx, tarball, targetDir, nil, terminal.IsShowTerminalOutput())
3333
}
3434

35+
func UncompressWithReader(ctx context.Context, reader io.Reader, targetDir string) ([]string, error) {
36+
return uncompressWithReader(ctx, reader, targetDir, nil, terminal.IsShowTerminalOutput())
37+
}
38+
39+
func uncompressWithReader(ctx context.Context, reader io.Reader, targetDir string, fileFilter func(string) bool, showProgress bool) ([]string, error) {
40+
logging.Debugf("Uncompressing from reader to %s", targetDir)
41+
42+
reader, err := zstd.NewReader(reader)
43+
if err != nil {
44+
return nil, err
45+
}
46+
return untar(ctx, reader, targetDir, fileFilter, showProgress)
47+
48+
}
49+
3550
func uncompress(ctx context.Context, tarball, targetDir string, fileFilter func(string) bool, showProgress bool) ([]string, error) {
3651
logging.Debugf("Uncompressing %s to %s", tarball, targetDir)
3752

@@ -86,6 +101,9 @@ func uncompress(ctx context.Context, tarball, targetDir string, fileFilter func(
86101
}
87102
}
88103

104+
func Untar(ctx context.Context, reader io.Reader, targetDir string, fileFilter func(string) bool, showProgress bool) ([]string, error) {
105+
return untar(ctx, reader, targetDir, fileFilter, showProgress)
106+
}
89107
func untar(ctx context.Context, reader io.Reader, targetDir string, fileFilter func(string) bool, showProgress bool) ([]string, error) {
90108
var extractedFiles []string
91109
tarReader := tar.NewReader(reader)

test/extended/util/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func DownloadBundle(bundleLocation string, bundleDestination string, bundleName
125125
return bundleDestination, err
126126
}
127127

128-
filename, err := download.Download(context.TODO(), bundleLocation, bundleDestination, 0644, nil)
128+
_, filename, err := download.Download(context.TODO(), bundleLocation, bundleDestination, 0644, nil)
129129
fmt.Printf("Downloading bundle from %s to %s.\n", bundleLocation, bundleDestination)
130130
if err != nil {
131131
return "", err

0 commit comments

Comments
 (0)