Skip to content

Commit 240e3f4

Browse files
[apiserver]: merge http utils (timeout) of apiserver v1/v2 (#3946)
* fix: missing timeout deadline in context Signed-off-by: Cheng-Yeh Chung <[email protected]> * feat: merge v1/v2 RetryConfig Signed-off-by: Cheng-Yeh Chung <[email protected]> * feat: merge v1/v2 backoff logic and timeout machenism Signed-off-by: Cheng-Yeh Chung <[email protected]> * update util function Signed-off-by: Cheng-Yeh Chung <[email protected]> * naming consistency Signed-off-by: Cheng-Yeh Chung <[email protected]> * update naming in util func Signed-off-by: Cheng-Yeh Chung <[email protected]> * Update apiserversdk/proxy.go Co-authored-by: Nary Yeh <[email protected]> Signed-off-by: Cheng-Yeh Chung <[email protected]> * simplify http util Signed-off-by: Cheng-Yeh Chung <[email protected]> --------- Signed-off-by: Cheng-Yeh Chung <[email protected]> Co-authored-by: Nary Yeh <[email protected]>
1 parent c85646f commit 240e3f4

File tree

7 files changed

+87
-73
lines changed

7 files changed

+87
-73
lines changed

apiserver/deploy/local/e2e/kustomization.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ images:
77
- name: kuberay/apiserver
88
newName: quay.io/kuberay/apiserver
99
newTag: latest
10+
# Replace NodePort with ClusterIP as we do not need to receive requests from outside the Kubernetes cluster
1011
patches:
1112
- patch: |-
1213
- op: replace
@@ -28,7 +29,6 @@ patches:
2829
kind: Deployment
2930
name: kuberay-apiserver
3031
version: v1
31-
# Replace NodePort with ClusterIP as we do not need to receive requests from outside the Kubernetes cluster
3232
- patch: |-
3333
- op: replace
3434
path: /spec/type

apiserver/pkg/http/client.go

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,14 @@ import (
88
"io"
99
"net/http"
1010
"strconv"
11-
"time"
1211

1312
rpcStatus "google.golang.org/genproto/googleapis/rpc/status"
1413
"google.golang.org/protobuf/encoding/protojson"
1514

16-
apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
15+
apiserversdkutil "github.com/ray-project/kuberay/apiserversdk/util"
1716
api "github.com/ray-project/kuberay/proto/go_client"
1817
)
1918

20-
type RetryConfig struct {
21-
MaxRetry int
22-
BackoffFactor float64
23-
InitBackoff time.Duration
24-
MaxBackoff time.Duration
25-
OverallTimeout time.Duration
26-
}
27-
2819
type KuberayAPIServerClient struct {
2920
httpClient *http.Client
3021
marshaler *protojson.MarshalOptions
@@ -36,7 +27,7 @@ type KuberayAPIServerClient struct {
3627
// Store http request handling function for unit test purpose.
3728
executeHttpRequest func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)
3829
baseURL string
39-
retryCfg RetryConfig
30+
retryCfg apiserversdkutil.RetryConfig
4031
}
4132

4233
type KuberayAPIServerClientError struct {
@@ -57,7 +48,7 @@ func IsNotFoundError(err error) bool {
5748
return false
5849
}
5950

60-
func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client, retryCfg RetryConfig) *KuberayAPIServerClient {
51+
func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client, retryCfg apiserversdkutil.RetryConfig) *KuberayAPIServerClient {
6152
client := &KuberayAPIServerClient{
6253
httpClient: httpClient,
6354
baseURL: baseURL,
@@ -704,7 +695,7 @@ func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL
704695
break
705696
}
706697

707-
if apiserverutil.IsSuccessfulStatusCode(statusCode) {
698+
if apiserversdkutil.IsSuccessfulStatusCode(statusCode) {
708699
return bodyBytes, nil, nil
709700
}
710701

@@ -721,21 +712,22 @@ func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL
721712
HTTPStatusCode: statusCode,
722713
}
723714

724-
if !apiserverutil.IsRetryableHTTPStatusCodes(statusCode) {
715+
if !apiserversdkutil.IsRetryableHTTPStatusCodes(statusCode) {
725716
break
726717
}
727718

728719
// Backoff before retry
729-
sleep := apiserverutil.GetRetryBackoff(attempt,
720+
sleep := apiserversdkutil.GetRetryBackoff(attempt,
730721
krc.retryCfg.InitBackoff,
731722
krc.retryCfg.BackoffFactor,
732723
krc.retryCfg.MaxBackoff)
733724

734-
select {
735-
case <-time.After(sleep):
736-
// continue to the next retry after backoff
737-
case <-ctx.Done():
738-
return nil, lastStatus, fmt.Errorf("overall timeout reached: %w", ctx.Err())
725+
if ok := apiserversdkutil.CheckContextDeadline(ctx, sleep); !ok {
726+
return nil, lastStatus, fmt.Errorf("retry timeout exceeded context deadline")
727+
}
728+
729+
if err = apiserversdkutil.Sleep(ctx, sleep); err != nil {
730+
return nil, lastStatus, fmt.Errorf("retry canceled during backoff: %w", err)
739731
}
740732

741733
}

apiserver/pkg/http/client_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ func (m *mockTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
5353
}
5454

5555
func TestUnmarshalHttpResponseOK(t *testing.T) {
56-
retryCfg := RetryConfig{
56+
retryCfg := util.RetryConfig{
5757
MaxRetry: util.HTTPClientDefaultMaxRetry,
58-
BackoffFactor: util.HTTPClientDefaultBackoffBase,
58+
BackoffFactor: util.HTTPClientDefaultBackoffFactor,
5959
InitBackoff: util.HTTPClientDefaultInitBackoff,
6060
MaxBackoff: util.HTTPClientDefaultMaxBackoff,
6161
OverallTimeout: util.HTTPClientDefaultOverallTimeout,
@@ -89,9 +89,9 @@ func TestUnmarshalHttpResponseOK(t *testing.T) {
8989

9090
// Unmarshal response fails and check error returned.
9191
func TestUnmarshalHttpResponseFails(t *testing.T) {
92-
retryCfg := RetryConfig{
92+
retryCfg := util.RetryConfig{
9393
MaxRetry: util.HTTPClientDefaultMaxRetry,
94-
BackoffFactor: util.HTTPClientDefaultBackoffBase,
94+
BackoffFactor: util.HTTPClientDefaultBackoffFactor,
9595
InitBackoff: util.HTTPClientDefaultInitBackoff,
9696
MaxBackoff: util.HTTPClientDefaultMaxBackoff,
9797
OverallTimeout: util.HTTPClientDefaultOverallTimeout,
@@ -207,9 +207,9 @@ func TestAPIServerClientRetry(t *testing.T) {
207207
t.Run(tt.name, func(t *testing.T) {
208208
mockClient := &http.Client{Transport: tt.transport}
209209

210-
retryCfg := RetryConfig{
210+
retryCfg := util.RetryConfig{
211211
MaxRetry: tt.maxRetry,
212-
BackoffFactor: util.HTTPClientDefaultBackoffBase,
212+
BackoffFactor: util.HTTPClientDefaultBackoffFactor,
213213
InitBackoff: util.HTTPClientDefaultInitBackoff,
214214
MaxBackoff: util.HTTPClientDefaultMaxBackoff,
215215
OverallTimeout: util.HTTPClientDefaultOverallTimeout,
@@ -261,9 +261,9 @@ func TestAPIServerClientBackoff(t *testing.T) {
261261

262262
mockClient := &http.Client{Transport: mockTransport}
263263

264-
retryCfg := RetryConfig{
264+
retryCfg := util.RetryConfig{
265265
MaxRetry: util.HTTPClientDefaultMaxRetry,
266-
BackoffFactor: util.HTTPClientDefaultBackoffBase,
266+
BackoffFactor: util.HTTPClientDefaultBackoffFactor,
267267
// Set short backoff time
268268
InitBackoff: 1 * time.Millisecond,
269269
MaxBackoff: 50 * time.Millisecond,
@@ -303,9 +303,9 @@ func TestAPIServerClientOverallTimeout(t *testing.T) {
303303

304304
mockClient := &http.Client{Transport: mockTransport}
305305

306-
retryCfg := RetryConfig{
306+
retryCfg := util.RetryConfig{
307307
MaxRetry: util.HTTPClientDefaultMaxRetry,
308-
BackoffFactor: util.HTTPClientDefaultBackoffBase,
308+
BackoffFactor: util.HTTPClientDefaultBackoffFactor,
309309
InitBackoff: 1 * time.Millisecond,
310310
MaxBackoff: 50 * time.Millisecond,
311311
// Set short overall timeout so that the timeout error will be raised
@@ -322,5 +322,5 @@ func TestAPIServerClientOverallTimeout(t *testing.T) {
322322

323323
// Expect a timeout error
324324
require.Error(t, err)
325-
require.Contains(t, err.Error(), "timeout")
325+
require.Contains(t, err.Error(), "retry timeout exceeded context deadline")
326326
}

apiserver/test/e2e/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ func withHttpClient() contextOption {
9595
return func(_ *testing.T, testingContext *End2EndTestingContext) error {
9696
testingContext.apiServerHttpClient = &http.Client{Timeout: time.Duration(10) * time.Second}
9797

98-
retryCfg := kuberayHTTP.RetryConfig{
98+
retryCfg := util.RetryConfig{
9999
MaxRetry: util.HTTPClientDefaultMaxRetry,
100-
BackoffFactor: util.HTTPClientDefaultBackoffBase,
100+
BackoffFactor: util.HTTPClientDefaultBackoffFactor,
101101
InitBackoff: util.HTTPClientDefaultInitBackoff,
102102
MaxBackoff: util.HTTPClientDefaultMaxBackoff,
103103
OverallTimeout: util.HTTPClientDefaultOverallTimeout,

apiserversdk/proxy.go

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,20 @@ package apiserversdk
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io"
78
"net/http"
89
"net/http/httputil"
910
"net/url"
1011
"strings"
11-
"time"
1212

1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414
"k8s.io/apimachinery/pkg/util/net"
1515
"k8s.io/client-go/kubernetes"
1616
"k8s.io/client-go/rest"
1717

18-
apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
18+
apiserversdkutil "github.com/ray-project/kuberay/apiserversdk/util"
1919
rayutil "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2020
)
2121

@@ -95,30 +95,33 @@ func requireKubeRayService(handler http.Handler, k8sClient *kubernetes.Clientset
9595
// retryRoundTripper is a custom implementation of http.RoundTripper that retries HTTP requests.
9696
// It verifies retryable HTTP status codes and retries using exponential backoff.
9797
type retryRoundTripper struct {
98-
base http.RoundTripper
99-
100-
// Num of retries after the initial attempt
101-
maxRetries int
102-
103-
// Retry backoff settings
104-
initBackoff time.Duration
105-
backoffBase float64
106-
maxBackoff time.Duration
98+
base http.RoundTripper
99+
retryCfg apiserversdkutil.RetryConfig
107100
}
108101

109102
func newRetryRoundTripper(base http.RoundTripper) http.RoundTripper {
103+
retryCfg := apiserversdkutil.RetryConfig{
104+
MaxRetry: apiserversdkutil.HTTPClientDefaultMaxRetry,
105+
BackoffFactor: apiserversdkutil.HTTPClientDefaultBackoffFactor,
106+
InitBackoff: apiserversdkutil.HTTPClientDefaultInitBackoff,
107+
MaxBackoff: apiserversdkutil.HTTPClientDefaultMaxBackoff,
108+
OverallTimeout: apiserversdkutil.HTTPClientDefaultOverallTimeout,
109+
}
110+
110111
return &retryRoundTripper{
111-
base: base,
112-
maxRetries: apiserverutil.HTTPClientDefaultMaxRetry,
113-
initBackoff: apiserverutil.HTTPClientDefaultInitBackoff,
114-
backoffBase: apiserverutil.HTTPClientDefaultBackoffBase,
115-
maxBackoff: apiserverutil.HTTPClientDefaultMaxBackoff,
112+
base: base,
113+
retryCfg: retryCfg,
116114
}
117115
}
118116

119117
func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
120118
ctx := req.Context()
121119

120+
ctx, cancel := context.WithTimeout(ctx, rrt.retryCfg.OverallTimeout)
121+
defer cancel()
122+
123+
req = req.WithContext(ctx)
124+
122125
var bodyBytes []byte
123126
var resp *http.Response
124127
var err error
@@ -135,8 +138,8 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
135138
}
136139
}
137140

138-
for attempt := 0; attempt <= rrt.maxRetries; attempt++ {
139-
/* Try up to (rrt.maxRetries + 1) times: initial attempt + retries */
141+
for attempt := 0; attempt <= rrt.retryCfg.MaxRetry; attempt++ {
142+
/* Try up to (rrt.retryCfg.MaxRetry + 1) times: initial attempt + retries */
140143

141144
if bodyBytes != nil {
142145
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
@@ -147,15 +150,15 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
147150
return resp, fmt.Errorf("request to %s %s failed with error: %w", req.Method, req.URL.String(), err)
148151
}
149152

150-
if apiserverutil.IsSuccessfulStatusCode(resp.StatusCode) {
153+
if apiserversdkutil.IsSuccessfulStatusCode(resp.StatusCode) {
151154
return resp, nil
152155
}
153156

154-
if !apiserverutil.IsRetryableHTTPStatusCodes(resp.StatusCode) {
157+
if !apiserversdkutil.IsRetryableHTTPStatusCodes(resp.StatusCode) {
155158
return resp, nil
156159
}
157160

158-
if attempt == rrt.maxRetries {
161+
if attempt == rrt.retryCfg.MaxRetry {
159162
return resp, nil
160163
}
161164

@@ -169,20 +172,14 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
169172
}
170173
}
171174

172-
sleepDuration := apiserverutil.GetRetryBackoff(attempt, rrt.initBackoff, rrt.backoffBase, rrt.maxBackoff)
175+
sleepDuration := apiserversdkutil.GetRetryBackoff(attempt, rrt.retryCfg.InitBackoff, rrt.retryCfg.BackoffFactor, rrt.retryCfg.MaxBackoff)
173176

174-
// TODO: merge common utils for apiserver v1 and v2
175-
if deadline, ok := ctx.Deadline(); ok {
176-
remaining := time.Until(deadline)
177-
if sleepDuration > remaining {
178-
return resp, fmt.Errorf("retry timeout exceeded context deadline")
179-
}
177+
if ok := apiserversdkutil.CheckContextDeadline(ctx, sleepDuration); !ok {
178+
return resp, fmt.Errorf("retry timeout exceeded context deadline")
180179
}
181180

182-
select {
183-
case <-time.After(sleepDuration):
184-
case <-ctx.Done():
185-
return resp, fmt.Errorf("retry canceled during backoff: %w", ctx.Err())
181+
if err = apiserversdkutil.Sleep(ctx, sleepDuration); err != nil {
182+
return resp, fmt.Errorf("retry canceled during backoff: %w", err)
186183
}
187184
}
188185
return resp, err

apiserversdk/util/config.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,18 @@ const (
77
HTTPClientDefaultMaxRetry = 3
88

99
// Retry backoff settings
10-
HTTPClientDefaultBackoffBase = float64(2)
11-
HTTPClientDefaultInitBackoff = 500 * time.Millisecond
12-
HTTPClientDefaultMaxBackoff = 10 * time.Second
10+
HTTPClientDefaultBackoffFactor = float64(2)
11+
HTTPClientDefaultInitBackoff = 500 * time.Millisecond
12+
HTTPClientDefaultMaxBackoff = 10 * time.Second
1313

1414
// Overall timeout for retries
1515
HTTPClientDefaultOverallTimeout = 30 * time.Second
1616
)
17+
18+
type RetryConfig struct {
19+
MaxRetry int
20+
BackoffFactor float64
21+
InitBackoff time.Duration
22+
MaxBackoff time.Duration
23+
OverallTimeout time.Duration
24+
}

apiserversdk/util/http.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,33 @@
11
package util
22

33
import (
4+
"context"
45
"math"
56
"net/http"
67
"time"
78
)
89

9-
func GetRetryBackoff(attempt int, initBackoff time.Duration, backoffBase float64, maxBackoff time.Duration) time.Duration {
10-
sleepDuration := initBackoff * time.Duration(math.Pow(backoffBase, float64(attempt)))
11-
if sleepDuration > maxBackoff {
12-
sleepDuration = maxBackoff
10+
func Sleep(ctx context.Context, sleepDuration time.Duration) error {
11+
select {
12+
case <-time.After(sleepDuration):
13+
case <-ctx.Done():
14+
return ctx.Err()
1315
}
16+
return nil
17+
}
18+
19+
func CheckContextDeadline(ctx context.Context, sleepDuration time.Duration) bool {
20+
if deadline, ok := ctx.Deadline(); ok {
21+
remaining := time.Until(deadline)
22+
if sleepDuration > remaining {
23+
return false
24+
}
25+
}
26+
return true
27+
}
28+
29+
func GetRetryBackoff(attempt int, initBackoff time.Duration, backoffFactor float64, maxBackoff time.Duration) time.Duration {
30+
sleepDuration := min(initBackoff*time.Duration(math.Pow(backoffFactor, float64(attempt))), maxBackoff)
1431
return sleepDuration
1532
}
1633

0 commit comments

Comments
 (0)