From 3b52e436f1492b3b3304e230992808ebb3ddff20 Mon Sep 17 00:00:00 2001 From: Adam Rozman Date: Fri, 4 Jul 2025 11:44:14 +0300 Subject: [PATCH] Limit registry connections to prevent 503 errors during push Add HTTP client configuration with connection limits to prevent registry overload when pushing images. This addresses 503 errors that occur when too many parallel requests are made to registries. Changes: - Add MaxConnsPerHost (default: 5) and MaxIdleConns (default: 50) options - Add RequestTimeout configuration for large uploads - Add MaxRetries (default: 3) and RetryInitialDelay (default: 1000 ms) options to retry 503 responses and transient network errors with exponential backoff - Apply limits to both HTTPS and HTTP fallback scenarios - Maintain backward compatibility with existing configurations Signed-off-by: Adam Rozman --- cmd/nerdctl/image/image_push.go | 36 +++ pkg/api/types/image_types.go | 10 + pkg/cmd/image/push.go | 46 ++- .../dockerconfigresolver.go | 300 +++++++++++++++++- 4 files changed, 374 insertions(+), 18 deletions(-) diff --git a/cmd/nerdctl/image/image_push.go b/cmd/nerdctl/image/image_push.go index 47104a4b7e5..e0cb4cc8143 100644 --- a/cmd/nerdctl/image/image_push.go +++ b/cmd/nerdctl/image/image_push.go @@ -69,6 +69,17 @@ func PushCommand() *cobra.Command { cmd.Flags().Bool(allowNonDistFlag, false, "Allow pushing images with non-distributable blobs") + // #region connection limit flags + cmd.Flags().Int("max-conns-per-host", 5, "Maximum number of connections per registry host") + cmd.Flags().Int("max-idle-conns", 50, "Maximum number of idle connections") + cmd.Flags().Int("request-timeout", 300, "Request timeout in seconds") + // #endregion + + // #region retry flags + cmd.Flags().Int("max-retries", 3, "Maximum number of retry attempts for 503 errors") + cmd.Flags().Int("retry-initial-delay", 1000, "Initial delay before first retry in milliseconds") + // #endregion + return cmd } @@ -113,6 +124,26 @@ func pushOptions(cmd *cobra.Command) (types.ImagePushOptions, error) { if err != nil { return types.ImagePushOptions{}, err } + maxConnsPerHost, err := cmd.Flags().GetInt("max-conns-per-host") +
if err != nil { + return types.ImagePushOptions{}, err + } + maxIdleConns, err := cmd.Flags().GetInt("max-idle-conns") + if err != nil { + return types.ImagePushOptions{}, err + } + requestTimeout, err := cmd.Flags().GetInt("request-timeout") + if err != nil { + return types.ImagePushOptions{}, err + } + maxRetries, err := cmd.Flags().GetInt("max-retries") + if err != nil { + return types.ImagePushOptions{}, err + } + retryInitialDelay, err := cmd.Flags().GetInt("retry-initial-delay") + if err != nil { + return types.ImagePushOptions{}, err + } return types.ImagePushOptions{ GOptions: globalOptions, SignOptions: signOptions, @@ -124,6 +155,11 @@ func pushOptions(cmd *cobra.Command) (types.ImagePushOptions, error) { IpfsAddress: ipfsAddress, Quiet: quiet, AllowNondistributableArtifacts: allowNonDist, + MaxConnsPerHost: maxConnsPerHost, + MaxIdleConns: maxIdleConns, + RequestTimeout: requestTimeout, + MaxRetries: maxRetries, + RetryInitialDelay: retryInitialDelay, Stdout: cmd.OutOrStdout(), }, nil } diff --git a/pkg/api/types/image_types.go b/pkg/api/types/image_types.go index 5ff507ccc7c..38b22051f9a 100644 --- a/pkg/api/types/image_types.go +++ b/pkg/api/types/image_types.go @@ -200,6 +200,16 @@ type ImagePushOptions struct { Quiet bool // AllowNondistributableArtifacts allow pushing non-distributable artifacts AllowNondistributableArtifacts bool + // MaxConnsPerHost maximum number of connections per registry host (default: 5) + MaxConnsPerHost int + // MaxIdleConns maximum number of idle connections (default: 50) + MaxIdleConns int + // RequestTimeout timeout for registry requests in seconds (default: 300) + RequestTimeout int + // MaxRetries maximum number of retry attempts for 503 errors (default: 3) + MaxRetries int + // RetryInitialDelay initial delay before first retry in milliseconds (default: 1000) + RetryInitialDelay int } // RemoteSnapshotterFlags are used for pulling with remote snapshotters diff --git a/pkg/cmd/image/push.go b/pkg/cmd/image/push.go 
index 8731b0cfc94..95cca5fb4b1 100644 --- a/pkg/cmd/image/push.go +++ b/pkg/cmd/image/push.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "path/filepath" + "time" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -34,7 +35,6 @@ import ( "github.com/containerd/containerd/v2/core/images/converter" "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" - dockerconfig "github.com/containerd/containerd/v2/core/remotes/docker/config" "github.com/containerd/containerd/v2/pkg/reference" "github.com/containerd/log" "github.com/containerd/stargz-snapshotter/estargz" @@ -165,17 +165,29 @@ func Push(ctx context.Context, client *containerd.Client, rawRef string, options } dOpts = append(dOpts, dockerconfigresolver.WithHostsDirs(options.GOptions.HostsDir)) - ho, err := dockerconfigresolver.NewHostOptions(ctx, refDomain, dOpts...) - if err != nil { - return err + // Configure connection limits to prevent registry overload (503 errors) + if options.MaxConnsPerHost > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxConnsPerHost(options.MaxConnsPerHost)) } - - resolverOpts := docker.ResolverOptions{ - Tracker: pushTracker, - Hosts: dockerconfig.ConfigureHosts(ctx, *ho), + if options.MaxIdleConns > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxIdleConns(options.MaxIdleConns)) + } + if options.RequestTimeout > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRequestTimeout(time.Duration(options.RequestTimeout)*time.Second)) } + if options.MaxRetries > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxRetries(options.MaxRetries)) + } + if options.RetryInitialDelay > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRetryInitialDelay(time.Duration(options.RetryInitialDelay)*time.Millisecond)) + } + // Use the local push tracker for this operation + dOpts = append(dOpts, dockerconfigresolver.WithTracker(pushTracker)) - resolver := 
docker.NewResolver(resolverOpts) + resolver, err := dockerconfigresolver.New(ctx, refDomain, dOpts...) + if err != nil { + return err + } if err = pushFunc(resolver); err != nil { // In some circumstance (e.g. people just use 80 port to support pure http), the error will contain message like "dial tcp : connection refused" if !errors.Is(err, http.ErrSchemeMismatch) && !errutil.IsErrConnectionRefused(err) { @@ -184,6 +196,22 @@ func Push(ctx context.Context, client *containerd.Client, rawRef string, options if options.GOptions.InsecureRegistry { log.G(ctx).WithError(err).Warnf("server %q does not seem to support HTTPS, falling back to plain HTTP", refDomain) dOpts = append(dOpts, dockerconfigresolver.WithPlainHTTP(true)) + // Apply same connection limits for HTTP fallback + if options.MaxConnsPerHost > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxConnsPerHost(options.MaxConnsPerHost)) + } + if options.MaxIdleConns > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxIdleConns(options.MaxIdleConns)) + } + if options.RequestTimeout > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRequestTimeout(time.Duration(options.RequestTimeout)*time.Second)) + } + if options.MaxRetries > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxRetries(options.MaxRetries)) + } + if options.RetryInitialDelay > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRetryInitialDelay(time.Duration(options.RetryInitialDelay)*time.Millisecond)) + } resolver, err = dockerconfigresolver.New(ctx, refDomain, dOpts...) 
if err != nil { return err diff --git a/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go b/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go index 8577b8e2bc6..0c1ca331533 100644 --- a/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go +++ b/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go @@ -20,6 +20,14 @@ import ( "context" "crypto/tls" "errors" + "fmt" + "io" + "math" + "net" + "net/http" + "strings" + "sync" + "time" "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" @@ -30,11 +38,156 @@ import ( var PushTracker = docker.NewInMemoryTracker() +// Global semaphores per registry host to enforce true concurrency limits +var ( + semaphoresMutex sync.RWMutex + semaphores = make(map[string]chan struct{}) +) + +// getSemaphore returns or creates a semaphore for a given host with the specified limit +func getSemaphore(host string, limit int) chan struct{} { + semaphoresMutex.Lock() + defer semaphoresMutex.Unlock() + + key := fmt.Sprintf("%s:%d", host, limit) + if sem, exists := semaphores[key]; exists { + return sem + } + + // Create a new semaphore with the specified limit + sem := make(chan struct{}, limit) + semaphores[key] = sem + log.L.Debugf("Created semaphore for %s with limit %d", host, limit) + return sem +} + +// semaphoreTransport wraps an http.RoundTripper to enforce true concurrency limits using semaphores +type semaphoreTransport struct { + transport http.RoundTripper + limit int +} + +// RoundTrip implements http.RoundTripper with semaphore-based concurrency limiting +func (st *semaphoreTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if st.limit <= 0 { + // No limit, use underlying transport directly + return st.transport.RoundTrip(req) + } + + host := req.URL.Host + sem := getSemaphore(host, st.limit) + + // Acquire semaphore (blocks if limit reached) + log.L.Debugf("Acquiring semaphore for %s (limit %d)", host, st.limit) + sem <- struct{}{} + 
defer func() { + <-sem // Release semaphore + log.L.Debugf("Released semaphore for %s", host) + }() + + log.L.Debugf("Acquired semaphore for %s, making request", host) + return st.transport.RoundTrip(req) +} + +// retryTransport wraps an http.RoundTripper to add retry logic for 503 errors +type retryTransport struct { + transport http.RoundTripper + maxRetries int + initialDelay time.Duration +} + +// RoundTripErrorClassifier classifies whether the response or error should trigger a retry and returns true or false +// depending on the result +func RoundTripErrorClassifier(resp *http.Response, err error, rt *retryTransport, attempt int) bool { + if resp != nil && resp.StatusCode == http.StatusServiceUnavailable { + log.L.Infof("retryTransport.RoundTrip: Retrying due to 503 Service Unavailable error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if err != nil { + // Check for specific network errors that warrant a retry + if errors.Is(err, io.EOF) { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to io.EOF error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if strings.Contains(err.Error(), "connection reset by peer") { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to 'connection reset by peer' error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to timeout network error: %v (attempt %d/%d)", netErr, attempt+1, rt.maxRetries) + return true + } else if errors.Is(err, context.DeadlineExceeded) { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to context deadline exceeded error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if errors.Is(err, context.Canceled) { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to context canceled error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } + log.L.Debugf("retryTransport.RoundTrip: Not retrying for non-retryable error: %T %v", err, 
err) + } + return false +} + +// RoundTrip implements http.RoundTripper with retry logic for 503 Service Unavailable errors +func (rt *retryTransport) RoundTrip(req *http.Request) (*http.Response, error) { + log.L.Infof("retryTransport.RoundTrip: Starting request to %s (maxRetries=%d)", req.URL.Host, rt.maxRetries) + + for attempt := 0; attempt <= rt.maxRetries; attempt++ { + // Clone the request for potential retries + reqClone := req.Clone(req.Context()) + + resp, err := rt.transport.RoundTrip(reqClone) + + statusCode := 0 + if resp != nil { + statusCode = resp.StatusCode + } + log.L.Infof("retryTransport.RoundTrip: attempt %d, err=%v, status=%d", attempt, err, statusCode) + + // Retry logic: retry on 503, EOF, connection reset, or temporary network errors. + // These errors are often transient and can be resolved by a retry. + log.L.Infof("retryTransport.RoundTrip: Evaluating retry conditions - resp=%v, statusCode=%d, StatusServiceUnavailable=%d", resp != nil, statusCode, http.StatusServiceUnavailable) + shouldRetry := RoundTripErrorClassifier(resp, err, rt, attempt) + log.L.Infof("retryTransport.RoundTrip: shouldRetry=%v for attempt %d", shouldRetry, attempt) + if shouldRetry { + // We have a condition that warrants a retry. + if attempt == rt.maxRetries { + log.L.Debugf("Max retries (%d) exceeded for request to %s", rt.maxRetries, req.URL.Host) + return resp, err // Return the last response and error + } + + // Close the response body before retrying. + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + + // Calculate exponential backoff delay: initialDelay * 2^attempt + delay := time.Duration(float64(rt.initialDelay) * math.Pow(2, float64(attempt))) + log.L.Debugf("Request to %s failed, retrying in %v (attempt %d/%d)", + req.URL.Host, delay, attempt+1, rt.maxRetries) + + // Wait before retrying. + time.Sleep(delay) + continue // Continue to the next retry attempt + } + + // If we are here, it means we are not retrying. 
+ log.L.Infof("retryTransport.RoundTrip: Not retrying, returning response (status=%d, err=%v)", statusCode, err) + return resp, err + } + + // This should never be reached, but return error just in case + return nil, fmt.Errorf("unexpected retry logic error") +} + type opts struct { - plainHTTP bool - skipVerifyCerts bool - hostsDirs []string - authCreds AuthCreds + plainHTTP bool + skipVerifyCerts bool + hostsDirs []string + AuthCreds AuthCreds + maxConnsPerHost int + maxIdleConns int + requestTimeout time.Duration + maxRetries int + retryInitialDelay time.Duration + tracker docker.StatusTrackLocker } // Opt for New @@ -64,7 +217,49 @@ func WithHostsDirs(orig []string) Opt { func WithAuthCreds(ac AuthCreds) Opt { return func(o *opts) { - o.authCreds = ac + o.AuthCreds = ac + } +} + +// WithMaxConnsPerHost sets the maximum number of connections per host +func WithMaxConnsPerHost(n int) Opt { + return func(o *opts) { + o.maxConnsPerHost = n + } +} + +// WithMaxIdleConns sets the maximum number of idle connections +func WithMaxIdleConns(n int) Opt { + return func(o *opts) { + o.maxIdleConns = n + } +} + +// WithRequestTimeout sets the request timeout +func WithRequestTimeout(d time.Duration) Opt { + return func(o *opts) { + o.requestTimeout = d + } +} + +// WithMaxRetries sets the maximum number of retry attempts for 503 errors +func WithMaxRetries(n int) Opt { + return func(o *opts) { + o.maxRetries = n + } +} + +// WithRetryInitialDelay sets the initial delay before first retry +func WithRetryInitialDelay(d time.Duration) Opt { + return func(o *opts) { + o.retryInitialDelay = d + } +} + +// WithTracker sets a custom status tracker +func WithTracker(tracker docker.StatusTrackLocker) Opt { + return func(o *opts) { + o.tracker = tracker } } @@ -102,8 +297,8 @@ func NewHostOptions(ctx context.Context, refHostname string, optFuncs ...Opt) (* return dir, nil } - if o.authCreds != nil { - ho.Credentials = o.authCreds + if o.AuthCreds != nil { + ho.Credentials = 
o.AuthCreds } else { authCreds, err := NewAuthCreds(refHostname) if err != nil { @@ -140,19 +335,106 @@ func NewHostOptions(ctx context.Context, refHostname string, optFuncs ...Opt) (* // $DOCKER_CONFIG defaults to "~/.docker". // // refHostname is like "docker.io". +type customResolver struct { + remotes.Resolver + client *http.Client +} + +func (r *customResolver) Client(ctx context.Context, host string) (*http.Client, error) { + return r.client, nil +} + func New(ctx context.Context, refHostname string, optFuncs ...Opt) (remotes.Resolver, error) { ho, err := NewHostOptions(ctx, refHostname, optFuncs...) if err != nil { return nil, err } + // Configure HTTP client with connection limits to prevent registry overload + var o opts + for _, of := range optFuncs { + of(&o) + } + + // Use custom tracker if provided, otherwise use global PushTracker + tracker := PushTracker + if o.tracker != nil { + tracker = o.tracker + } + + // Build the custom transport chain first + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + // Apply connection limits + if o.maxConnsPerHost > 0 { + transport.MaxConnsPerHost = o.maxConnsPerHost + } + if o.maxIdleConns > 0 { + transport.MaxIdleConns = o.maxIdleConns + } + + finalTransport := http.RoundTripper(transport) + + // Wrap transport with semaphore-based concurrency limiting first + if o.maxConnsPerHost > 0 { + finalTransport = &semaphoreTransport{ + transport: finalTransport, + limit: o.maxConnsPerHost, + } + log.L.Debugf("Enabled semaphore-based concurrency limiting: limit=%d", o.maxConnsPerHost) + } + + // Then, wrap with retry logic if retries are configured + if o.maxRetries > 0 { + retryDelay := o.retryInitialDelay + if retryDelay == 0 { + retryDelay = 1000 * time.Millisecond // 
Default to 1 second + } + finalTransport = &retryTransport{ + transport: finalTransport, + maxRetries: o.maxRetries, + initialDelay: retryDelay, + } + log.L.Infof("Enabled retry logic: maxRetries=%d, initialDelay=%v for %s", o.maxRetries, retryDelay, refHostname) + } + + client := &http.Client{ + Transport: finalTransport, + } + + if o.requestTimeout > 0 { + client.Timeout = o.requestTimeout + } + + // Set up the host options with our custom client via UpdateClient + ho.UpdateClient = func(defaultClient *http.Client) error { + // Replace the default client's transport with our custom retry transport + defaultClient.Transport = finalTransport + if o.requestTimeout > 0 { + defaultClient.Timeout = o.requestTimeout + } + return nil + } + resolverOpts := docker.ResolverOptions{ - Tracker: PushTracker, + Tracker: tracker, Hosts: dockerconfig.ConfigureHosts(ctx, *ho), } resolver := docker.NewResolver(resolverOpts) - return resolver, nil + return &customResolver{ + Resolver: resolver, + client: client, + }, nil } // AuthCreds is for docker.WithAuthCreds