Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (h *eventHandlerImpl) sendNginxConfig(ctx context.Context, logger logr.Logg
panic("expected deployment, got nil")
}

cfg := dataplane.BuildConfiguration(ctx, gr, gw, h.cfg.serviceResolver, h.cfg.plus)
cfg := dataplane.BuildConfiguration(ctx, logger, gr, gw, h.cfg.serviceResolver, h.cfg.plus)
depCtx, getErr := h.getDeploymentContext(ctx)
if getErr != nil {
logger.Error(getErr, "error getting deployment context for usage reporting")
Expand Down
29 changes: 24 additions & 5 deletions internal/controller/state/dataplane/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"slices"
"sort"

"github.com/go-logr/logr"
discoveryV1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -32,6 +33,7 @@ const (
// BuildConfiguration builds the Configuration from the Graph.
func BuildConfiguration(
ctx context.Context,
logger logr.Logger,
g *graph.Graph,
gateway *graph.Gateway,
serviceResolver resolver.ServiceResolver,
Expand All @@ -55,6 +57,7 @@ func BuildConfiguration(
backendGroups := buildBackendGroups(append(httpServers, sslServers...))
upstreams := buildUpstreams(
ctx,
logger,
gateway,
serviceResolver,
g.ReferencedServices,
Expand All @@ -71,9 +74,14 @@ func BuildConfiguration(
SSLServers: sslServers,
TLSPassthroughServers: buildPassthroughServers(gateway),
Upstreams: upstreams,
StreamUpstreams: buildStreamUpstreams(ctx, gateway, serviceResolver, baseHTTPConfig.IPFamily),
BackendGroups: backendGroups,
SSLKeyPairs: buildSSLKeyPairs(g.ReferencedSecrets, gateway.Listeners),
StreamUpstreams: buildStreamUpstreams(
ctx,
logger,
gateway,
serviceResolver,
baseHTTPConfig.IPFamily),
BackendGroups: backendGroups,
SSLKeyPairs: buildSSLKeyPairs(g.ReferencedSecrets, gateway.Listeners),
CertBundles: buildCertBundles(
buildRefCertificateBundles(g.ReferencedSecrets, g.ReferencedCaCertConfigMaps),
backendGroups,
Expand Down Expand Up @@ -163,6 +171,7 @@ func buildPassthroughServers(gateway *graph.Gateway) []Layer4VirtualServer {
// buildStreamUpstreams builds all stream upstreams.
func buildStreamUpstreams(
ctx context.Context,
logger logr.Logger,
gateway *graph.Gateway,
serviceResolver resolver.ServiceResolver,
ipFamily IPFamilyType,
Expand Down Expand Up @@ -202,7 +211,13 @@ func buildStreamUpstreams(

allowedAddressType := getAllowedAddressType(ipFamily)

eps, err := serviceResolver.Resolve(ctx, br.SvcNsName, br.ServicePort, allowedAddressType)
eps, err := serviceResolver.Resolve(
ctx,
logger,
br.SvcNsName,
br.ServicePort,
allowedAddressType,
)
if err != nil {
errMsg = err.Error()
}
Expand Down Expand Up @@ -670,6 +685,7 @@ func (hpr *hostPathRules) maxServerCount() int {

func buildUpstreams(
ctx context.Context,
logger logr.Logger,
gateway *graph.Gateway,
svcResolver resolver.ServiceResolver,
referencedServices map[types.NamespacedName]*graph.ReferencedService,
Expand Down Expand Up @@ -701,6 +717,7 @@ func buildUpstreams(
for _, br := range rule.BackendRefs {
if upstream := buildUpstream(
ctx,
logger,
br,
gateway,
svcResolver,
Expand Down Expand Up @@ -735,6 +752,7 @@ func buildUpstreams(

func buildUpstream(
ctx context.Context,
logger logr.Logger,
br graph.BackendRef,
gateway *graph.Gateway,
svcResolver resolver.ServiceResolver,
Expand All @@ -760,9 +778,10 @@ func buildUpstream(

var errMsg string

eps, err := svcResolver.Resolve(ctx, br.SvcNsName, br.ServicePort, allowedAddressType)
eps, err := svcResolver.Resolve(ctx, logger, br.SvcNsName, br.ServicePort, allowedAddressType)
if err != nil {
errMsg = err.Error()
logger.Error(err, "failed to resolve endpoints", "service", br.SvcNsName)
}

var upstreamPolicies []policies.Policy
Expand Down
21 changes: 17 additions & 4 deletions internal/controller/state/dataplane/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/gomega"
apiv1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -714,6 +715,7 @@ func TestBuildConfiguration(t *testing.T) {

fakeResolver.ResolveStub = func(
_ context.Context,
_ logr.Logger,
nsName types.NamespacedName,
_ apiv1.ServicePort,
_ []discoveryV1.AddressType,
Expand Down Expand Up @@ -2533,7 +2535,8 @@ func TestBuildConfiguration(t *testing.T) {
g := NewWithT(t)

result := BuildConfiguration(
context.TODO(),
t.Context(),
logr.Discard(),
test.graph,
test.graph.Gateways[gatewayNsName],
fakeResolver,
Expand Down Expand Up @@ -2648,7 +2651,8 @@ func TestBuildConfiguration_Plus(t *testing.T) {
g := NewWithT(t)

result := BuildConfiguration(
context.TODO(),
t.Context(),
logr.Discard(),
test.graph,
test.graph.Gateways[gatewayNsName],
fakeResolver,
Expand Down Expand Up @@ -3372,6 +3376,7 @@ func TestBuildUpstreams(t *testing.T) {
fakeResolver := &resolverfakes.FakeServiceResolver{}
fakeResolver.ResolveCalls(func(
_ context.Context,
_ logr.Logger,
svcNsName types.NamespacedName,
_ apiv1.ServicePort,
_ []discoveryV1.AddressType,
Expand Down Expand Up @@ -3404,7 +3409,14 @@ func TestBuildUpstreams(t *testing.T) {

g := NewWithT(t)

upstreams := buildUpstreams(context.TODO(), gateway, fakeResolver, referencedServices, Dual)
upstreams := buildUpstreams(
t.Context(),
logr.Discard(),
gateway,
fakeResolver,
referencedServices,
Dual,
)
g.Expect(upstreams).To(ConsistOf(expUpstreams))
}

Expand Down Expand Up @@ -4357,6 +4369,7 @@ func TestBuildStreamUpstreams(t *testing.T) {

fakeResolver.ResolveStub = func(
_ context.Context,
_ logr.Logger,
nsName types.NamespacedName,
_ apiv1.ServicePort,
_ []discoveryV1.AddressType,
Expand All @@ -4367,7 +4380,7 @@ func TestBuildStreamUpstreams(t *testing.T) {
return fakeEndpoints, nil
}

streamUpstreams := buildStreamUpstreams(context.Background(), gateway, &fakeResolver, Dual)
streamUpstreams := buildStreamUpstreams(t.Context(), logr.Discard(), gateway, &fakeResolver, Dual)

expectedStreamUpstreams := []Upstream{
{
Expand Down
20 changes: 15 additions & 5 deletions internal/controller/state/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"slices"

"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -22,6 +23,7 @@ import (
type ServiceResolver interface {
Resolve(
ctx context.Context,
logger logr.Logger,
svcNsName types.NamespacedName,
svcPort v1.ServicePort,
allowedAddressType []discoveryV1.AddressType,
Expand Down Expand Up @@ -52,6 +54,7 @@ func NewServiceResolverImpl(c client.Client) *ServiceResolverImpl {
// Returns an error if the Service or ServicePort cannot be resolved.
func (e *ServiceResolverImpl) Resolve(
ctx context.Context,
logger logr.Logger,
svcNsName types.NamespacedName,
svcPort v1.ServicePort,
allowedAddressType []discoveryV1.AddressType,
Expand All @@ -76,6 +79,7 @@ func (e *ServiceResolverImpl) Resolve(
}

return resolveEndpoints(
logger,
svcNsName,
svcPort,
endpointSliceList,
Expand All @@ -84,19 +88,23 @@ func (e *ServiceResolverImpl) Resolve(
)
}

type initEndpointSetFunc func([]discoveryV1.EndpointSlice) map[Endpoint]struct{}
type initEndpointSetFunc func(logr.Logger, []discoveryV1.EndpointSlice) map[Endpoint]struct{}

func initEndpointSetWithCalculatedSize(endpointSlices []discoveryV1.EndpointSlice) map[Endpoint]struct{} {
func initEndpointSetWithCalculatedSize(
logger logr.Logger,
endpointSlices []discoveryV1.EndpointSlice,
) map[Endpoint]struct{} {
// performance optimization to reduce the cost of growing the map. See the benchamarks for performance comparison.
return make(map[Endpoint]struct{}, calculateReadyEndpoints(endpointSlices))
return make(map[Endpoint]struct{}, calculateReadyEndpoints(logger, endpointSlices))
}

func calculateReadyEndpoints(endpointSlices []discoveryV1.EndpointSlice) int {
func calculateReadyEndpoints(logger logr.Logger, endpointSlices []discoveryV1.EndpointSlice) int {
total := 0

for _, eps := range endpointSlices {
for _, endpoint := range eps.Endpoints {
if !endpointReady(endpoint) {
logger.V(1).Info("ignoring endpoint that is not ready", "endpoint", endpoint)
continue
}

Expand All @@ -108,6 +116,7 @@ func calculateReadyEndpoints(endpointSlices []discoveryV1.EndpointSlice) int {
}

func resolveEndpoints(
logger logr.Logger,
svcNsName types.NamespacedName,
svcPort v1.ServicePort,
endpointSliceList discoveryV1.EndpointSliceList,
Expand All @@ -122,12 +131,13 @@ func resolveEndpoints(

// Endpoints may be duplicated across multiple EndpointSlices.
// Using a set to prevent returning duplicate endpoints.
endpointSet := initEndpointsSet(filteredSlices)
endpointSet := initEndpointsSet(logger, filteredSlices)

for _, eps := range filteredSlices {
ipv6 := eps.AddressType == discoveryV1.AddressTypeIPv6
for _, endpoint := range eps.Endpoints {
if !endpointReady(endpoint) {
logger.V(1).Info("ignoring endpoint that is not ready", "endpoint", endpoint)
continue
}

Expand Down
16 changes: 12 additions & 4 deletions internal/controller/state/resolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestCalculateReadyEndpoints(t *testing.T) {
},
}

result := calculateReadyEndpoints(slices)
result := calculateReadyEndpoints(logr.Discard(), slices)

g.Expect(result).To(Equal(4))
}
Expand Down Expand Up @@ -605,7 +606,7 @@ func BenchmarkResolve(b *testing.B) {
Name: "default-name",
}

initEndpointSet := func([]discoveryV1.EndpointSlice) map[Endpoint]struct{} {
initEndpointSet := func(logr.Logger, []discoveryV1.EndpointSlice) map[Endpoint]struct{} {
return make(map[Endpoint]struct{})
}

Expand All @@ -625,8 +626,15 @@ func bench(b *testing.B, svcNsName types.NamespacedName,
list discoveryV1.EndpointSliceList, initSet initEndpointSetFunc, n int,
) {
b.Helper()
for range b.N {
res, err := resolveEndpoints(svcNsName, v1.ServicePort{Port: 80}, list, initSet, dualAddressType)
for b.Loop() {
res, err := resolveEndpoints(
logr.Discard(),
svcNsName,
v1.ServicePort{Port: 80},
list,
initSet,
dualAddressType,
)
if len(res) != n {
b.Fatalf("expected %d endpoints, got %d", n, len(res))
}
Expand Down
Loading
Loading