Skip to content

Commit 28ea3e0

Browse files
committed
[feat aga] Implement endpoints builder filtering for partially loaded endpoints
1 parent 7a6020b commit 28ea3e0

File tree

4 files changed

+468
-26
lines changed

4 files changed

+468
-26
lines changed

pkg/aga/model_build_endpoint_group.go

Lines changed: 121 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
awssdk "github.com/aws/aws-sdk-go-v2/aws"
7+
"github.com/go-logr/logr"
78
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
89
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
910
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
@@ -12,27 +13,35 @@ import (
1213
// endpointGroupBuilder builds EndpointGroup model resources
1314
type endpointGroupBuilder interface {
1415
// Build builds all endpoint groups for all listeners
15-
Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener, listenerConfigs []agaapi.GlobalAcceleratorListener) ([]*agamodel.EndpointGroup, error)
16+
Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener,
17+
listenerConfigs []agaapi.GlobalAcceleratorListener, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error)
1618

1719
// buildEndpointGroupsForListener builds endpoint groups for a specific listener
18-
buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int) ([]*agamodel.EndpointGroup, error)
20+
buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener,
21+
endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int,
22+
loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error)
1923
}
2024

2125
// NewEndpointGroupBuilder constructs new endpointGroupBuilder
22-
func NewEndpointGroupBuilder(clusterRegion string) endpointGroupBuilder {
26+
func NewEndpointGroupBuilder(clusterRegion string, gaNamespace string, logger logr.Logger) endpointGroupBuilder {
2327
return &defaultEndpointGroupBuilder{
2428
clusterRegion: clusterRegion,
29+
gaNamespace: gaNamespace,
30+
logger: logger,
2531
}
2632
}
2733

2834
var _ endpointGroupBuilder = &defaultEndpointGroupBuilder{}
2935

3036
type defaultEndpointGroupBuilder struct {
3137
clusterRegion string
38+
gaNamespace string
39+
logger logr.Logger
3240
}
3341

3442
// Build builds EndpointGroup model resources
35-
func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener, listenerConfigs []agaapi.GlobalAcceleratorListener) ([]*agamodel.EndpointGroup, error) {
43+
func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener,
44+
listenerConfigs []agaapi.GlobalAcceleratorListener, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error) {
3645
if listeners == nil || len(listeners) == 0 {
3746
return nil, nil
3847
}
@@ -51,7 +60,7 @@ func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stac
5160
continue
5261
}
5362

54-
listenerEndpointGroups, err := b.buildEndpointGroupsForListener(ctx, stack, listener, *listenerConfig.EndpointGroups, i)
63+
listenerEndpointGroups, err := b.buildEndpointGroupsForListener(ctx, stack, listener, *listenerConfig.EndpointGroups, i, loadedEndpoints)
5564
if err != nil {
5665
return nil, err
5766
}
@@ -114,11 +123,13 @@ func (b *defaultEndpointGroupBuilder) validateEndpointPortOverridesWithinListene
114123
}
115124

116125
// buildEndpointGroupsForListener builds EndpointGroup models for a specific listener
117-
func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int) ([]*agamodel.EndpointGroup, error) {
126+
func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context.Context, stack core.Stack,
127+
listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup,
128+
listenerIndex int, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error) {
118129
var result []*agamodel.EndpointGroup
119130

120131
for i, endpointGroup := range endpointGroups {
121-
spec, err := b.buildEndpointGroupSpec(ctx, listener, endpointGroup)
132+
spec, err := b.buildEndpointGroupSpec(ctx, listener, endpointGroup, loadedEndpoints)
122133
if err != nil {
123134
return nil, err
124135
}
@@ -132,7 +143,9 @@ func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context
132143
}
133144

134145
// buildEndpointGroupSpec builds the EndpointGroupSpec for a single EndpointGroup model resource
135-
func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context, listener *agamodel.Listener, endpointGroup agaapi.GlobalAcceleratorEndpointGroup) (agamodel.EndpointGroupSpec, error) {
146+
func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context,
147+
listener *agamodel.Listener, endpointGroup agaapi.GlobalAcceleratorEndpointGroup,
148+
loadedEndpoints []*LoadedEndpoint) (agamodel.EndpointGroupSpec, error) {
136149
region, err := b.determineRegion(endpointGroup)
137150
if err != nil {
138151
return agamodel.EndpointGroupSpec{}, err
@@ -146,14 +159,90 @@ func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context
146159
return agamodel.EndpointGroupSpec{}, err
147160
}
148161

162+
// Build endpoint configurations from both static configurations and loaded endpoints
163+
endpointConfigurations, err := b.buildEndpointConfigurations(ctx, endpointGroup, loadedEndpoints)
164+
if err != nil {
165+
return agamodel.EndpointGroupSpec{}, err
166+
}
167+
149168
return agamodel.EndpointGroupSpec{
150-
ListenerARN: listener.ListenerARN(),
151-
Region: region,
152-
TrafficDialPercentage: trafficDialPercentage,
153-
PortOverrides: portOverrides,
169+
ListenerARN: listener.ListenerARN(),
170+
Region: region,
171+
TrafficDialPercentage: trafficDialPercentage,
172+
PortOverrides: portOverrides,
173+
EndpointConfigurations: endpointConfigurations,
154174
}, nil
155175
}
156176

177+
// generateEndpointKey creates a consistent string key for endpoint lookup
178+
func generateEndpointKey(ep agaapi.GlobalAcceleratorEndpoint, gaNamespace string) string {
179+
namespace := gaNamespace
180+
if ep.Namespace != nil {
181+
namespace = awssdk.ToString(ep.Namespace)
182+
}
183+
name := awssdk.ToString(ep.Name)
184+
185+
if ep.Type == agaapi.GlobalAcceleratorEndpointTypeEndpointID {
186+
return fmt.Sprintf("%s/%s", ep.Type, awssdk.ToString(ep.EndpointID))
187+
}
188+
return fmt.Sprintf("%s/%s/%s", ep.Type, namespace, name)
189+
}
190+
191+
// buildEndpointConfigurations builds endpoint configurations from both static configurations in the API struct
192+
// and from successfully loaded endpoints
193+
func (b *defaultEndpointGroupBuilder) buildEndpointConfigurations(_ context.Context,
194+
endpointGroup agaapi.GlobalAcceleratorEndpointGroup, loadedEndpoints []*LoadedEndpoint) ([]agamodel.EndpointConfiguration, error) {
195+
196+
var endpointConfigurations []agamodel.EndpointConfiguration
197+
198+
// Skip if no endpoints defined in the endpoint group
199+
if endpointGroup.Endpoints == nil {
200+
return nil, nil
201+
}
202+
203+
// Build a map of loaded endpoints with for quick lookup
204+
loadedEndpointsMap := make(map[string]*LoadedEndpoint)
205+
for _, le := range loadedEndpoints {
206+
key := le.GetKey()
207+
loadedEndpointsMap[key] = le
208+
209+
}
210+
211+
// Process the endpoints defined in the CRD and match with loaded endpoints
212+
for _, ep := range *endpointGroup.Endpoints {
213+
// Create key for lookup using the helper function
214+
lookupKey := generateEndpointKey(ep, b.gaNamespace)
215+
216+
// Find the loaded endpoint
217+
if loadedEndpoint, found := loadedEndpointsMap[lookupKey]; found {
218+
// Add endpoint to model stack only if its in Loaded status and has valid ARN
219+
if loadedEndpoint.Status == EndpointStatusLoaded {
220+
// Create a base configuration with the loaded endpoint's ARN
221+
endpointConfig := agamodel.EndpointConfiguration{
222+
EndpointID: loadedEndpoint.ARN,
223+
}
224+
endpointConfig.Weight = awssdk.Int32(loadedEndpoint.Weight)
225+
endpointConfig.ClientIPPreservationEnabled = ep.ClientIPPreservationEnabled
226+
endpointConfigurations = append(endpointConfigurations, endpointConfig)
227+
} else {
228+
// Log warning for endpoints which are not loaded successfully during loading and has Warning status
229+
b.logger.Info("Endpoint not added to endpoint group as no valid ARN was found during loading",
230+
"endpoint", lookupKey,
231+
"message", loadedEndpoint.Message,
232+
"error", loadedEndpoint.Error)
233+
}
234+
} else {
235+
b.logger.Info("Endpoint not found in loaded endpoints",
236+
"endpoint", lookupKey)
237+
}
238+
}
239+
240+
return endpointConfigurations, nil
241+
}
242+
243+
// Note: The TargetsEndpointGroup method is no longer needed since we match endpoints based on
244+
// the explicit references in the GlobalAcceleratorEndpoint resources under each endpoint group
245+
157246
// validateListenerPortOverrideWithinListenerPortRanges ensures all listener ports used in port overrides are
158247
// contained within the listener's port ranges
159248
func (b *defaultEndpointGroupBuilder) validateListenerPortOverrideWithinListenerPortRanges(listener *agamodel.Listener, portOverrides []agamodel.PortOverride) error {
@@ -248,3 +337,23 @@ func (b *defaultEndpointGroupBuilder) validatePortOverrides(listener *agamodel.L
248337

249338
return nil
250339
}
340+
341+
// buildEndpointConfiguration creates an EndpointConfiguration from a GlobalAcceleratorEndpoint
342+
// This helper function consolidates the repeated code for creating endpoint configurations
343+
func buildEndpointConfigurationFromEndpoint(endpoint *agaapi.GlobalAcceleratorEndpoint) agamodel.EndpointConfiguration {
344+
endpointConfig := agamodel.EndpointConfiguration{
345+
EndpointID: awssdk.ToString(endpoint.EndpointID),
346+
}
347+
348+
// Add weight if specified
349+
if endpoint.Weight != nil {
350+
endpointConfig.Weight = endpoint.Weight
351+
}
352+
353+
// Add client IP preservation setting if specified
354+
if endpoint.ClientIPPreservationEnabled != nil {
355+
endpointConfig.ClientIPPreservationEnabled = endpoint.ClientIPPreservationEnabled
356+
}
357+
358+
return endpointConfig
359+
}

0 commit comments

Comments
 (0)