diff --git a/internal/adc/translator/annotations.go b/internal/adc/translator/annotations.go index 509a093dc4..28319b650c 100644 --- a/internal/adc/translator/annotations.go +++ b/internal/adc/translator/annotations.go @@ -22,16 +22,24 @@ import ( "github.com/imdario/mergo" "github.com/apache/apisix-ingress-controller/internal/adc/translator/annotations" + "github.com/apache/apisix-ingress-controller/internal/adc/translator/annotations/upstream" ) // Structure extracted by Ingress Resource -type Ingress struct{} +type IngressConfig struct { + Upstream upstream.Upstream +} // parsers registered for ingress annotations -var ingressAnnotationParsers = map[string]annotations.IngressAnnotationsParser{} +var ingressAnnotationParsers = map[string]annotations.IngressAnnotationsParser{ + "upstream": upstream.NewParser(), +} -func (t *Translator) TranslateIngressAnnotations(anno map[string]string) *Ingress { - ing := &Ingress{} +func (t *Translator) TranslateIngressAnnotations(anno map[string]string) *IngressConfig { + if len(anno) == 0 { + return nil + } + ing := &IngressConfig{} if err := translateAnnotations(anno, ing); err != nil { t.Log.Error(err, "failed to translate ingress annotations", "annotations", anno) } diff --git a/internal/adc/translator/annotations/upstream/upstream.go b/internal/adc/translator/annotations/upstream/upstream.go new file mode 100644 index 0000000000..881e02b9ec --- /dev/null +++ b/internal/adc/translator/annotations/upstream/upstream.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package upstream + +import ( + "fmt" + "strconv" + "strings" + + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/adc/translator/annotations" +) + +func NewParser() annotations.IngressAnnotationsParser { + return &Upstream{} +} + +type Upstream struct { + Scheme string + Retries int + TimeoutRead int + TimeoutConnect int + TimeoutSend int +} + +var validSchemes = map[string]struct{}{ + apiv2.SchemeHTTP: {}, + apiv2.SchemeHTTPS: {}, + apiv2.SchemeGRPC: {}, + apiv2.SchemeGRPCS: {}, +} + +func (u Upstream) Parse(e annotations.Extractor) (any, error) { + if scheme := strings.ToLower(e.GetStringAnnotation(annotations.AnnotationsUpstreamScheme)); scheme != "" { + if _, ok := validSchemes[scheme]; ok { + u.Scheme = scheme + } else { + return nil, fmt.Errorf("invalid upstream scheme: %s", scheme) + } + } + + if retry := e.GetStringAnnotation(annotations.AnnotationsUpstreamRetry); retry != "" { + t, err := strconv.Atoi(retry) + if err != nil { + return nil, fmt.Errorf("could not parse retry as an integer: %s", err.Error()) + } + u.Retries = t + } + + if timeoutConnect := strings.TrimSuffix(e.GetStringAnnotation(annotations.AnnotationsUpstreamTimeoutConnect), "s"); timeoutConnect != "" { + t, err := strconv.Atoi(timeoutConnect) + if err != nil { + return nil, fmt.Errorf("could not parse timeout as an integer: %s", err.Error()) + } + u.TimeoutConnect = t + } + + if timeoutRead := strings.TrimSuffix(e.GetStringAnnotation(annotations.AnnotationsUpstreamTimeoutRead), "s"); timeoutRead != "" { + t, err := strconv.Atoi(timeoutRead) + if err != nil { + return nil, fmt.Errorf("could not parse timeout as an integer: %s", err.Error()) + } + u.TimeoutRead = t + } + + if timeoutSend := strings.TrimSuffix(e.GetStringAnnotation(annotations.AnnotationsUpstreamTimeoutSend), "s"); timeoutSend != "" { + t, err := strconv.Atoi(timeoutSend) + if err != nil { + return nil, fmt.Errorf("could not parse timeout as an integer: %s", err.Error()) + } + u.TimeoutSend = t + } + + return u, nil +} diff --git a/internal/adc/translator/annotations/upstream/upstream_test.go b/internal/adc/translator/annotations/upstream/upstream_test.go new file mode 100644 index 0000000000..55d7459d98 --- /dev/null +++ b/internal/adc/translator/annotations/upstream/upstream_test.go @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package upstream + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/apisix-ingress-controller/internal/adc/translator/annotations" +) + +func TestIPRestrictionHandler(t *testing.T) { + anno := map[string]string{ + annotations.AnnotationsUpstreamScheme: "grpcs", + } + u := NewParser() + + out, err := u.Parse(annotations.NewExtractor(anno)) + assert.Nil(t, err, "checking given error") + + ups, ok := out.(Upstream) + if !ok { + t.Fatalf("could not parse upstream") + } + assert.Equal(t, "grpcs", ups.Scheme) + + anno[annotations.AnnotationsUpstreamScheme] = "gRPC" + out, err = u.Parse(annotations.NewExtractor(anno)) + ups, ok = out.(Upstream) + if !ok { + t.Fatalf("could not parse upstream") + } + assert.Nil(t, err, "checking given error") + assert.Equal(t, "grpc", ups.Scheme) + + anno[annotations.AnnotationsUpstreamScheme] = "nothing" + out, err = u.Parse(annotations.NewExtractor(anno)) + assert.NotNil(t, err, "checking given error") + assert.Nil(t, out, "checking given output") +} + +func TestRetryParsing(t *testing.T) { + anno := map[string]string{ + annotations.AnnotationsUpstreamRetry: "2", + } + u := NewParser() + out, err := u.Parse(annotations.NewExtractor(anno)) + assert.Nil(t, err, "checking given error") + ups, ok := out.(Upstream) + if !ok { + t.Fatalf("could not parse upstream") + } + assert.Nil(t, err, "checking given error") + assert.Equal(t, 2, ups.Retries) + + anno[annotations.AnnotationsUpstreamRetry] = "asdf" + out, err = u.Parse(annotations.NewExtractor(anno)) + assert.NotNil(t, err, "checking given error") + assert.Nil(t, out, "checking given output") +} + +func TestTimeoutParsing(t *testing.T) { + anno := map[string]string{ + annotations.AnnotationsUpstreamTimeoutConnect: "2s", + annotations.AnnotationsUpstreamTimeoutRead: "3s", + annotations.AnnotationsUpstreamTimeoutSend: "4s", + } + u := NewParser() + out, err := u.Parse(annotations.NewExtractor(anno)) + assert.Nil(t, err, "checking given error") + + ups, ok := out.(Upstream) + if !ok { + t.Fatalf("could not parse upstream") + } + assert.Nil(t, err, "checking given error") + assert.Equal(t, 2, ups.TimeoutConnect) + assert.Equal(t, 3, ups.TimeoutRead) + assert.Equal(t, 4, ups.TimeoutSend) + anno[annotations.AnnotationsUpstreamRetry] = "asdf" + out, err = u.Parse(annotations.NewExtractor(anno)) + assert.NotNil(t, err, "checking given error") + assert.Nil(t, out, "checking given output") +} diff --git a/internal/adc/translator/annotations_test.go b/internal/adc/translator/annotations_test.go index d23a2474be..8216be3f72 100644 --- a/internal/adc/translator/annotations_test.go +++ b/internal/adc/translator/annotations_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/apache/apisix-ingress-controller/internal/adc/translator/annotations" + "github.com/apache/apisix-ingress-controller/internal/adc/translator/annotations/upstream" ) type mockParser struct { @@ -63,7 +64,10 @@ func TestTranslateAnnotations(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Set up mock parsers + orig := ingressAnnotationParsers + defer func() { ingressAnnotationParsers = orig }() + + ingressAnnotationParsers = make(map[string]annotations.IngressAnnotationsParser) for key, parser := range tt.parsers { ingressAnnotationParsers[key] = parser } @@ -77,11 +81,94 @@ func TestTranslateAnnotations(t *testing.T) { assert.NoError(t, err) } assert.Equal(t, tt.expected, dst) + }) + } +} - // Clean up mock parsers - for key := range tt.parsers { - delete(ingressAnnotationParsers, key) - } +func TestTranslateIngressAnnotations(t *testing.T) { + tests := []struct { + name string + anno map[string]string + expected *IngressConfig + }{ + { + name: "no matching annotations", + anno: map[string]string{"upstream": "value1"}, + expected: &IngressConfig{}, + }, + { + name: "invalid scheme", + anno: map[string]string{annotations.AnnotationsUpstreamScheme: "invalid"}, + expected: &IngressConfig{}, + }, + { + name: "http scheme", + anno: map[string]string{annotations.AnnotationsUpstreamScheme: "https"}, + expected: &IngressConfig{ + Upstream: upstream.Upstream{ + Scheme: "https", + }, + }, + }, + { + name: "retries", + anno: map[string]string{annotations.AnnotationsUpstreamRetry: "3"}, + expected: &IngressConfig{ + Upstream: upstream.Upstream{ + Retries: 3, + }, + }, + }, + { + name: "read timeout", + anno: map[string]string{ + annotations.AnnotationsUpstreamTimeoutRead: "5s", + }, + expected: &IngressConfig{ + Upstream: upstream.Upstream{ + TimeoutRead: 5, + }, + }, + }, + { + name: "timeouts", + anno: map[string]string{ + annotations.AnnotationsUpstreamTimeoutRead: "5s", + annotations.AnnotationsUpstreamTimeoutSend: "6s", + annotations.AnnotationsUpstreamTimeoutConnect: "7s", + }, + expected: &IngressConfig{ + Upstream: upstream.Upstream{ + TimeoutRead: 5, + TimeoutSend: 6, + TimeoutConnect: 7, + }, + }, + }, + { + name: "timeout/scheme/retries", + anno: map[string]string{ + annotations.AnnotationsUpstreamTimeoutRead: "5s", + annotations.AnnotationsUpstreamScheme: "http", + annotations.AnnotationsUpstreamRetry: "2", + }, + expected: &IngressConfig{ + Upstream: upstream.Upstream{ + TimeoutRead: 5, + Scheme: "http", + Retries: 2, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + translator := &Translator{} + result := translator.TranslateIngressAnnotations(tt.anno) + + assert.NotNil(t, result) + assert.Equal(t, tt.expected, result) }) } } diff --git a/internal/adc/translator/ingress.go b/internal/adc/translator/ingress.go index 894224b22b..da3c5d0ec4 100644 --- a/internal/adc/translator/ingress.go +++ b/internal/adc/translator/ingress.go @@ -18,6 +18,7 @@ package translator import ( + "cmp" "fmt" "strings" @@ -80,6 +81,10 @@ func (t *Translator) TranslateIngress( labels := label.GenLabel(obj) + config := t.TranslateIngressAnnotations(obj.Annotations) + + t.Log.V(1).Info("translating Ingress Annotations", "config", config) + // handle TLS configuration, convert to SSL objects if err := t.translateIngressTLSSection(tctx, obj, result, labels); err != nil { return nil, err @@ -97,7 +102,8 @@ func (t *Translator) TranslateIngress( } for j, path := range rule.HTTP.Paths { - if svc := t.buildServiceFromIngressPath(tctx, obj, &path, i, j, hosts, labels); svc != nil { + index := fmt.Sprintf("%d-%d", i, j) + if svc := t.buildServiceFromIngressPath(tctx, obj, config, &path, index, hosts, labels); svc != nil { result.Services = append(result.Services, svc) } } @@ -135,8 +141,9 @@ func (t *Translator) translateIngressTLSSection( func (t *Translator) buildServiceFromIngressPath( tctx *provider.TranslateContext, obj *networkingv1.Ingress, + config *IngressConfig, path *networkingv1.HTTPIngressPath, - ruleIndex, pathIndex int, + index string, hosts []string, labels map[string]string, ) *adctypes.Service { @@ -146,15 +153,15 @@ func (t *Translator) buildServiceFromIngressPath( service := adctypes.NewDefaultService() service.Labels = labels - service.Name = adctypes.ComposeServiceNameWithRule(obj.Namespace, obj.Name, fmt.Sprintf("%d-%d", ruleIndex, pathIndex)) + service.Name = adctypes.ComposeServiceNameWithRule(obj.Namespace, obj.Name, index) service.ID = id.GenID(service.Name) service.Hosts = hosts upstream := adctypes.NewDefaultUpstream() - protocol := t.resolveIngressUpstream(tctx, obj, path.Backend.Service, upstream) + protocol := t.resolveIngressUpstream(tctx, obj, config, path.Backend.Service, upstream) service.Upstream = upstream - route := buildRouteFromIngressPath(obj, path, ruleIndex, pathIndex, labels) + route := buildRouteFromIngressPath(obj, path, index, labels) if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS { route.EnableWebsocket = ptr.To(true) } @@ -167,11 +174,28 @@ func (t *Translator) buildServiceFromIngressPath( func (t *Translator) resolveIngressUpstream( tctx *provider.TranslateContext, obj *networkingv1.Ingress, + config *IngressConfig, backendService *networkingv1.IngressServiceBackend, upstream *adctypes.Upstream, ) string { backendRef := convertBackendRef(obj.Namespace, backendService.Name, internaltypes.KindService) t.AttachBackendTrafficPolicyToUpstream(backendRef, tctx.BackendTrafficPolicies, upstream) + if config != nil { + upConfig := config.Upstream + if upConfig.Scheme != "" { + upstream.Scheme = upConfig.Scheme + } + if upConfig.Retries > 0 { + upstream.Retries = ptr.To(int64(upConfig.Retries)) + } + if upConfig.TimeoutConnect > 0 || upConfig.TimeoutRead > 0 || upConfig.TimeoutSend > 0 { + upstream.Timeout = &adctypes.Timeout{ + Connect: cmp.Or(upConfig.TimeoutConnect, 60), + Read: cmp.Or(upConfig.TimeoutRead, 60), + Send: cmp.Or(upConfig.TimeoutSend, 60), + } + } + } // determine service port/port name var protocol string var port intstr.IntOrString @@ -224,11 +248,11 @@ func (t *Translator) resolveIngressUpstream( func buildRouteFromIngressPath( obj *networkingv1.Ingress, path *networkingv1.HTTPIngressPath, - ruleIndex, pathIndex int, + index string, labels map[string]string, ) *adctypes.Route { route := adctypes.NewDefaultRoute() - route.Name = adctypes.ComposeRouteName(obj.Namespace, obj.Name, fmt.Sprintf("%d-%d", ruleIndex, pathIndex)) + route.Name = adctypes.ComposeRouteName(obj.Namespace, obj.Name, index) route.ID = id.GenID(route.Name) route.Labels = labels diff --git a/internal/webhook/v1/ingress_webhook.go b/internal/webhook/v1/ingress_webhook.go index 03ee8f5b48..4941a8210b 100644 --- a/internal/webhook/v1/ingress_webhook.go +++ b/internal/webhook/v1/ingress_webhook.go @@ -42,11 +42,6 @@ var unsupportedAnnotations = []string{ "k8s.apisix.apache.org/use-regex", "k8s.apisix.apache.org/enable-websocket", "k8s.apisix.apache.org/plugin-config-name", - "k8s.apisix.apache.org/upstream-scheme", - "k8s.apisix.apache.org/upstream-retries", - "k8s.apisix.apache.org/upstream-connect-timeout", - "k8s.apisix.apache.org/upstream-read-timeout", - "k8s.apisix.apache.org/upstream-send-timeout", "k8s.apisix.apache.org/enable-cors", "k8s.apisix.apache.org/cors-allow-origin", "k8s.apisix.apache.org/cors-allow-headers", diff --git a/test/e2e/framework/manifests/nginx.yaml b/test/e2e/framework/manifests/nginx.yaml index a01d554904..e75c1dae38 100644 --- a/test/e2e/framework/manifests/nginx.yaml +++ b/test/e2e/framework/manifests/nginx.yaml @@ -43,6 +43,14 @@ data: return 200 'Hello, World!'; } + location /delay { + content_by_lua_block { + local delay = tonumber(ngx.var.arg_delay) or 0 + ngx.sleep(delay) + ngx.say("Slept for ", delay, " seconds") + } + } + location /ws { content_by_lua_block { local server = require "resty.websocket.server" @@ -144,6 +152,10 @@ spec: protocol: TCP targetPort: 443 appProtocol: https + - name: https-v2 + port: 7443 + protocol: TCP + targetPort: 443 - name: ws port: 8080 protocol: TCP diff --git a/test/e2e/ingress/annotations.go b/test/e2e/ingress/annotations.go new file mode 100644 index 0000000000..3043d5b138 --- /dev/null +++ b/test/e2e/ingress/annotations.go @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ingress + +import ( + "context" + "fmt" + "net/http" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/utils/ptr" + + "github.com/apache/apisix-ingress-controller/test/e2e/framework" + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("Test Ingress With Annotations", Label("networking.k8s.io", "ingress"), func() { + s := scaffold.NewDefaultScaffold() + + Context("Upstream", func() { + var ( + ingressRetries = ` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: retries + annotations: + k8s.apisix.apache.org/upstream-retries: "3" +spec: + ingressClassName: %s + rules: + - host: nginx.example + http: + paths: + - path: /get + pathType: Exact + backend: + service: + name: nginx + port: + number: 80 +` + ingressSchemeHTTPS = ` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: https-backend + annotations: + k8s.apisix.apache.org/upstream-scheme: https +spec: + ingressClassName: %s + rules: + - host: nginx.example + http: + paths: + - path: /get + pathType: Exact + backend: + service: + name: nginx + port: + number: 7443 +` + + ingressTimeouts = ` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: timeouts + annotations: + k8s.apisix.apache.org/upstream-read-timeout: "2s" + k8s.apisix.apache.org/upstream-send-timeout: "3s" + k8s.apisix.apache.org/upstream-connect-timeout: "4s" +spec: + ingressClassName: %s + rules: + - host: nginx.example + http: + paths: + - path: /delay + pathType: Exact + backend: + service: + name: nginx + port: + number: 443 +` + ) + BeforeEach(func() { + s.DeployNginx(framework.NginxOptions{ + Namespace: s.Namespace(), + Replicas: ptr.To(int32(1)), + }) + By("create GatewayProxy") + Expect(s.CreateResourceFromString(s.GetGatewayProxySpec())).NotTo(HaveOccurred(), "creating GatewayProxy") + + By("create IngressClass") + err := s.CreateResourceFromStringWithNamespace(s.GetIngressClassYaml(), "") + Expect(err).NotTo(HaveOccurred(), "creating IngressClass") + time.Sleep(5 * time.Second) + }) + It("retries", func() { + Expect(s.CreateResourceFromString(fmt.Sprintf(ingressRetries, s.Namespace()))).ShouldNot(HaveOccurred(), "creating Ingress") + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "nginx.example", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background()) + Expect(err).NotTo(HaveOccurred(), "listing Upstream") + Expect(upstreams).To(HaveLen(1), "checking Upstream length") + Expect(upstreams[0].Retries).To(Equal(ptr.To(int64(3))), "checking Upstream retries") + }) + It("scheme", func() { + Expect(s.CreateResourceFromString(fmt.Sprintf(ingressSchemeHTTPS, s.Namespace()))).ShouldNot(HaveOccurred(), "creating Ingress") + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "nginx.example", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background()) + Expect(err).NotTo(HaveOccurred(), "listing Upstream") + Expect(upstreams).To(HaveLen(1), "checking Upstream length") + Expect(upstreams[0].Scheme).To(Equal("https"), "checking Upstream scheme") + }) + It("timeouts", func() { + Expect(s.CreateResourceFromString(fmt.Sprintf(ingressTimeouts, s.Namespace()))).ShouldNot(HaveOccurred(), "creating Ingress") + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/delay", + Host: "nginx.example", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + + _ = s.NewAPISIXClient().GET("/delay").WithQuery("delay", "10"). + WithHost("nginx.example").Expect().Status(http.StatusGatewayTimeout) + + _ = s.NewAPISIXClient().GET("/delay").WithHost("nginx.example").Expect().Status(http.StatusOK) + + upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background()) + Expect(err).NotTo(HaveOccurred(), "listing Upstream") + Expect(upstreams).To(HaveLen(1), "checking Upstream length") + Expect(upstreams[0].Timeout).ToNot(BeNil(), "checking Upstream timeout") + Expect(upstreams[0].Timeout.Read).To(Equal(2), "checking Upstream read timeout") + Expect(upstreams[0].Timeout.Send).To(Equal(3), "checking Upstream send timeout") + Expect(upstreams[0].Timeout.Connect).To(Equal(4), "checking Upstream connect timeout") + }) + }) +})