diff --git a/conformance/resources/base.yaml b/conformance/resources/base.yaml index 2e1b378c3..8738c531c 100644 --- a/conformance/resources/base.yaml +++ b/conformance/resources/base.yaml @@ -200,7 +200,7 @@ spec: terminationGracePeriodSeconds: 130 containers: - name: epp - image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v1.0.0 + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v20251023-d788a2c imagePullPolicy: Always args: - --pool-name @@ -298,7 +298,7 @@ spec: terminationGracePeriodSeconds: 130 containers: - name: epp - image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v1.0.0 + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v20251023-d788a2c imagePullPolicy: Always args: - --pool-name @@ -340,6 +340,209 @@ spec: configMap: name: plugins-config --- +# -- Data Parallelism (DP) backend deployment: 3 pods, each listening on three ports to simulate ranks --- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dp-inference-model-server-deployment + namespace: inference-conformance-app-backend + labels: + app: dp-inference-model-server +spec: + replicas: 3 + selector: + matchLabels: + app: dp-inference-model-server + template: + metadata: + labels: + app: dp-inference-model-server + spec: + containers: + - name: echoserver-3000 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + ports: + - containerPort: 3000 + readinessProbe: + httpGet: + path: / + port: 3000 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 2 + env: + - name: HTTP_PORT # Default port for HTTP echo server + value: "3000" + - name: H2C_PORT # Default port for HTC echo server + value: "3001" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: echoserver-3002 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + ports: + - containerPort: 3002 + readinessProbe: + httpGet: + path: / + port: 3002 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 2 + env: + - name: HTTP_PORT + value: "3002" + - name: H2C_PORT + value: "3003" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: echoserver-3004 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + ports: + - containerPort: 3004 + readinessProbe: + httpGet: + path: / + port: 3004 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 2 + env: + - name: HTTP_PORT + value: "3004" + - name: H2C_PORT + value: "3005" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP +--- +# --- Data Parallelism (DP) InferencePool Definition --- +apiVersion: inference.networking.k8s.io/v1 +kind: InferencePool +metadata: + name: dp-inference-pool + namespace: inference-conformance-app-backend +spec: + selector: + matchLabels: + app: dp-inference-model-server + targetPorts: + - number: 3000 + - number: 3002 + - number: 3004 + endpointPickerRef: + name: dp-endpoint-picker-svc + port: + number: 9002 +--- +# --- Data Parallelism (DP) Conformance EPP service Definition --- +apiVersion: v1 +kind: Service +metadata: + name: dp-endpoint-picker-svc + namespace: inference-conformance-app-backend +spec: + selector: + app: dp-app-backend-epp + ports: + - protocol: TCP + port: 9002 + targetPort: 9002 + appProtocol: http2 + type: ClusterIP +--- +# --- Data Parallelism (DP) Conformance EPP Deployment --- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dp-app-endpoint-picker + namespace: inference-conformance-app-backend + labels: + app: dp-app-backend-epp +spec: + replicas: 1 + selector: + matchLabels: + app: dp-app-backend-epp + template: + metadata: + labels: + app: dp-app-backend-epp + spec: + # Conservatively, this timeout should mirror the longest grace period of the pods within the pool + terminationGracePeriodSeconds: 130 + containers: + - name: epp + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v20251023-d788a2c + imagePullPolicy: Always + args: + - --pool-name + - "dp-inference-pool" + - --pool-namespace + - "inference-conformance-app-backend" + - --v + - "4" + - --zap-encoder + - "json" + - --grpc-port + - "9002" + - --grpc-health-port + - "9003" + - "--config-file" + - "/config/conformance-plugins.yaml" + ports: + - containerPort: 9002 + - containerPort: 9003 + - name: metrics + containerPort: 9090 + livenessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + volumeMounts: + - name: plugins-config-volume + mountPath: "/config" + volumes: + - name: plugins-config-volume + configMap: + name: plugins-config +--- apiVersion: v1 kind: ConfigMap metadata: diff --git a/conformance/tests/gateway_following_epp_routing_dp.go b/conformance/tests/gateway_following_epp_routing_dp.go new file mode 100644 index 000000000..50cba26f0 --- /dev/null +++ b/conformance/tests/gateway_following_epp_routing_dp.go @@ -0,0 +1,189 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tests + +import ( + "fmt" + "net/http" + "slices" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/types" + gwhttp "sigs.k8s.io/gateway-api/conformance/utils/http" + "sigs.k8s.io/gateway-api/conformance/utils/suite" + "sigs.k8s.io/gateway-api/pkg/features" + + "sigs.k8s.io/gateway-api-inference-extension/conformance/resources" + k8sutils "sigs.k8s.io/gateway-api-inference-extension/conformance/utils/kubernetes" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test" +) + +func init() { + ConformanceTests = append(ConformanceTests, GatewayFollowingEPPRoutingWithDataParallelism) +} + +// GatewayFollowingEPPRoutingWithDataParallelism verifies that with multiple targetPorts (ranks) +// the gateway still routes *only* to pods returned by EPP. Rank/port fan-out is exercised by load. +var GatewayFollowingEPPRoutingWithDataParallelism = suite.ConformanceTest{ + ShortName: "GatewayFollowingEPPRoutingWithDataParallelism", + Description: "Inference gateway should restrict traffic to EPP-selected pods while EPP balances across multiple targetPorts (DP ranks)", + Manifests: []string{"tests/gateway_following_epp_routing_dp.yaml"}, + Features: []features.FeatureName{ + features.FeatureName("SupportInferencePool"), + features.SupportGateway, + }, + Test: func(t *testing.T, s *suite.ConformanceTestSuite) { + const ( + hostname = "primary.example.com" + path = "/primary-gateway-dp-test" + appPodBackendPrefix = "dp-inference-model-server" // Must match the app label in the backend pods + ) + + httpRouteNN := types.NamespacedName{Name: "httproute-for-primary-gw-dp", Namespace: resources.AppBackendNamespace} + gatewayNN := resources.PrimaryGatewayNN + poolNN := types.NamespacedName{Name: "dp-inference-pool", Namespace: resources.AppBackendNamespace} + backendPodLabels := map[string]string{"app": "dp-inference-model-server"} + + t.Log("Verifying HTTPRoute and InferencePool are accepted and the Gateway has an address.") + k8sutils.HTTPRouteMustBeAcceptedAndResolved(t, s.Client, s.TimeoutConfig, httpRouteNN, gatewayNN) + k8sutils.InferencePoolMustBeAcceptedByParent(t, s.Client, poolNN, gatewayNN) + gwAddr := k8sutils.GetGatewayEndpoint(t, s.Client, s.TimeoutConfig, gatewayNN) + + t.Logf("Fetching DP backend pods with labels: %v", backendPodLabels) + pods, err := k8sutils.GetPodsWithLabel(t, s.Client, resources.AppBackendNamespace, backendPodLabels, s.TimeoutConfig) + require.NoError(t, err, "Failed to get DP backend pods") + require.Len(t, pods, 3, "Expected to find 3 DP backend pods, found %d", len(pods)) + + podIPs := make([]string, len(pods)) + podNames := make([]string, len(pods)) + for i, pod := range pods { + podIPs[i] = pod.Status.PodIP + podNames[i] = pod.Name + } + + requestBody := `{ + "model": "conformance-fake-model", + "prompt": "Write as if you were a critic: San Francisco" + }` + + // Smoke: single-pod pin to ensure header filter works before main cases. + gwhttp.MakeRequestAndExpectEventuallyConsistentResponse( + t, + s.RoundTripper, + s.TimeoutConfig, + gwAddr, + gwhttp.ExpectedResponse{ + Request: gwhttp.Request{ + Host: hostname, + Path: path, + Method: http.MethodPost, + Body: requestBody, + Headers: map[string]string{ + test.HeaderTestEppEndPointSelectionKey: podIPs[0], + }, + }, + Response: gwhttp.Response{StatusCodes: []int{http.StatusOK}}, + Backend: podNames[0], + Namespace: resources.AppBackendNamespace, + }, + ) + + testCases := []struct { + name string + podIPsToBeReturnedByEPP []string + expectAllRequestsRoutedWithinPodNames []string + }{ + { + name: "DP routes only to one designated pod (multiple ranks under the hood)", + podIPsToBeReturnedByEPP: []string{podIPs[1]}, + expectAllRequestsRoutedWithinPodNames: []string{podNames[1]}, + }, + { + name: "DP routes only to two designated pods", + podIPsToBeReturnedByEPP: []string{podIPs[0], podIPs[2]}, + expectAllRequestsRoutedWithinPodNames: []string{podNames[0], podNames[2]}, + }, + { + name: "DP routes only to all pods (EPP returns all; ranks balanced internally)", + podIPsToBeReturnedByEPP: []string{podIPs[0], podIPs[1], podIPs[2]}, + expectAllRequestsRoutedWithinPodNames: []string{podNames[0], podNames[1], podNames[2]}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + eppHeaderValue := strings.Join(tc.podIPsToBeReturnedByEPP, ",") + headers := map[string]string{test.HeaderTestEppEndPointSelectionKey: eppHeaderValue} + + assertTrafficOnlyReachesToExpectedPodsDP( + t, s, gwAddr, + gwhttp.ExpectedResponse{ + Request: gwhttp.Request{ + Host: hostname, + Path: path, + Method: http.MethodPost, + Body: requestBody, + Headers: headers, + }, + Response: gwhttp.Response{StatusCode: http.StatusOK}, + Backend: appPodBackendPrefix, + Namespace: resources.AppBackendNamespace, + }, + tc.expectAllRequestsRoutedWithinPodNames, + ) + }) + } + }, +} + +func assertTrafficOnlyReachesToExpectedPodsDP(t *testing.T, suite *suite.ConformanceTestSuite, gwAddr string, expected gwhttp.ExpectedResponse, expectedPodNames []string) { + t.Helper() + const ( + concurrency = 20 + totalRequests = 200 + ) + var ( + rt = suite.RoundTripper + g errgroup.Group + r = gwhttp.MakeRequest(t, &expected, gwAddr, "HTTP", "http") + ) + g.SetLimit(concurrency) + + for i := 0; i < totalRequests; i++ { + g.Go(func() error { + cReq, cRes, err := rt.CaptureRoundTrip(r) + if err != nil { + return fmt.Errorf("failed roundtrip: %w", err) + } + if err := gwhttp.CompareRoundTrip(t, &r, cReq, cRes, expected); err != nil { + return fmt.Errorf("expectation failed: %w", err) + } + // Enforce no leakage to non-selected pods (ports/ranks are internal). + if !slices.Contains(expectedPodNames, cReq.Pod) { + return fmt.Errorf("unexpected pod %q (expected one of %v)", cReq.Pod, expectedPodNames) + } + return nil + }) + } + if err := g.Wait(); err != nil { + t.Fatalf("Requests were not confined to expected pods (DP), err: %v", err) + } + t.Logf("DP traffic successfully restricted to expected pods: %v", expectedPodNames) +} diff --git a/conformance/tests/gateway_following_epp_routing_dp.yaml b/conformance/tests/gateway_following_epp_routing_dp.yaml new file mode 100644 index 000000000..02f849c2f --- /dev/null +++ b/conformance/tests/gateway_following_epp_routing_dp.yaml @@ -0,0 +1,23 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httproute-for-primary-gw-dp + namespace: inference-conformance-app-backend +spec: + parentRefs: + - group: gateway.networking.k8s.io + kind: Gateway + name: conformance-primary + namespace: inference-conformance-infra + sectionName: http + hostnames: + - "primary.example.com" + rules: + - backendRefs: + - group: inference.networking.k8s.io + kind: InferencePool + name: dp-inference-pool + matches: + - path: + type: PathPrefix + value: /primary-gateway-dp-test diff --git a/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go b/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go index d5615bc90..51011d045 100644 --- a/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go +++ b/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go @@ -36,7 +36,7 @@ func TestFilter(t *testing.T) { }{ { name: "TestHeaderBasedFilter, header endpoint unset in request", - req: &types.LLMRequest{}, // Delieverately unset the header. + req: &types.LLMRequest{}, // Deliberately unset the header. input: []types.Pod{ &types.PodMetrics{ Pod: &backend.Pod{ @@ -109,6 +109,45 @@ func TestFilter(t *testing.T) { }, }, }, + { + name: "TestHeaderBasedFilter, IP:port values match by IP (port ignored)", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.2:3001,10.0.0.1:3000"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.3"}}, + }, + // Output should follow the header order, mapped by IP (ports ignored) + output: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1"}}, + }, + }, + { + name: "TestHeaderBasedFilter, duplicate IP with different ports yields a single match (dedup by IP)", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.2:3001,10.0.0.2:3002"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2"}}, + }, + output: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2"}}, + }, + }, + { + name: "TestHeaderBasedFilter, IPv6 bare and bracketed with port", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "fd00::2,[fd00::1]:3000"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::1"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::2"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::3"}}, + }, + // Should match ::2, then ::1 (header order), trimming brackets and ignoring port + output: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::2"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::1"}}, + }, + }, } for _, test := range tests { diff --git a/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go b/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go index 2836755d4..951acbbd2 100644 --- a/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go +++ b/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go @@ -19,6 +19,7 @@ package filter import ( "context" "encoding/json" + "net" "strings" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" @@ -64,24 +65,44 @@ func (f *HeaderBasedTestingFilter) WithName(name string) *HeaderBasedTestingFilt return f } -// Filter selects pods that match the IP addresses specified in the request header. +// Filter selects pods whose IPs match any value in the "test-epp-endpoint-selection" header. +// Values may be "IP" or "IP:port"; ports (ranks) are ignored here because DP fan-out happens later. func (f *HeaderBasedTestingFilter) Filter(_ context.Context, _ *types.CycleState, request *types.LLMRequest, pods []types.Pod) []types.Pod { headerValue, ok := request.Headers[test.HeaderTestEppEndPointSelectionKey] if !ok || headerValue == "" { return []types.Pod{} } + // Build a map of pod IP -> pod podAddressMap := make(map[string]types.Pod, len(pods)) for _, pod := range pods { podAddressMap[pod.GetPod().GetIPAddress()] = pod } + // Accept comma-separated list of IP or IP:port endpoints := strings.Split(headerValue, ",") filteredPods := make([]types.Pod, 0, len(endpoints)) - for _, endpoint := range endpoints { - trimmedEndpoint := strings.TrimSpace(endpoint) - if pod, found := podAddressMap[trimmedEndpoint]; found { - filteredPods = append(filteredPods, pod) + seen := make(map[string]struct{}, len(endpoints)) // dedupe + + for _, ep := range endpoints { + item := strings.TrimSpace(ep) + if item == "" { + continue + } + // Handle IPv6 with or without ports, e.g. "[fd00::1]:3000" or "fd00::1" + host := item + if h, _, err := net.SplitHostPort(item); err == nil { + host = h + } else { + // Could still be a bare IPv6 in brackets "[fd00::1]" + host = strings.Trim(host, "[]") + } + + if pod, found := podAddressMap[host]; found { + if _, dup := seen[pod.GetPod().GetIPAddress()]; !dup { + seen[pod.GetPod().GetIPAddress()] = struct{}{} + filteredPods = append(filteredPods, pod) + } } } return filteredPods