Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,15 @@ const (
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
globalAcceleratorKind = "GlobalAccelerator"

// Requeue constants for provisioning state monitoring
requeueMessage = "Monitoring provisioning state"
statusUpdateRequeueTime = 1 * time.Minute
// Requeue constants for state monitoring
// requeueReasonAcceleratorInProgress indicates that the reconciliation is being requeued because
// the Global Accelerator is still in progress state
requeueReasonAcceleratorInProgress = "Waiting for Global Accelerator %s with status 'IN_PROGRESS' to complete"

// requeueReasonEndpointsInWarningState indicates that the reconciliation is being requeued because
// there are endpoints in warning state that need to be periodically rechecked
requeueReasonEndpointsInWarningState = "Retrying endpoints for Global Accelerator %s which did load successfully - will check availability again soon"
statusUpdateRequeueTime = 1 * time.Minute

// Metric stage constants
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
Expand Down Expand Up @@ -251,8 +257,8 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Conte
return nil
}

func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi.GlobalAccelerator) (core.Stack, *agamodel.Accelerator, error) {
stack, accelerator, err := r.modelBuilder.Build(ctx, ga)
func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi.GlobalAccelerator, loadedEndpoints []*aga.LoadedEndpoint) (core.Stack, *agamodel.Accelerator, error) {
stack, accelerator, err := r.modelBuilder.Build(ctx, ga, loadedEndpoints)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedBuildModel, fmt.Sprintf("Failed build model due to %v", err))
return nil, nil, err
Expand All @@ -279,7 +285,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)

// Validate and load endpoint status using the endpoint loader
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
loadedEndpoints, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
if len(fatalErrors) > 0 {
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedEndpointLoad, fmt.Sprintf("Failed to reconcile due to %v", err))
Expand All @@ -295,7 +301,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
var accelerator *agamodel.Accelerator
var err error
buildModelFn := func() {
stack, accelerator, err = r.buildModel(ctx, ga)
stack, accelerator, err = r.buildModel(ctx, ga, loadedEndpoints)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
if err != nil {
Expand Down Expand Up @@ -326,14 +332,37 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co

r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())

// Update GlobalAccelerator status after successful deployment
// Check if any endpoints have warning status and collect them
hasWarningEndpoints := false
for _, ep := range loadedEndpoints {
if ep.Status == aga.EndpointStatusWarning {
hasWarningEndpoints = true
}
}

// Update GlobalAccelerator status after successful deployment, including warning endpoints
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
}
if requeueNeeded {
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)

// If we have warning endpoints, add a separate condition for them and requeue
if hasWarningEndpoints {
r.logger.V(1).Info("Detected endpoints in warning state, will requeue",
"Global Accelerator", k8s.NamespacedName(ga))

// Add event to notify about warning endpoints
warningMessage := fmt.Sprintf("Detected endpoints which did not load successfully. These endpoints will be rechecked shortly.")
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonWarningEndpoints, warningMessage)
}

if requeueNeeded || hasWarningEndpoints {
message := fmt.Sprintf(requeueReasonAcceleratorInProgress, k8s.NamespacedName(ga))
if hasWarningEndpoints {
message = fmt.Sprintf(requeueReasonEndpointsInWarningState, k8s.NamespacedName(ga))
}
return ctrlerrors.NewRequeueNeededAfter(message, statusUpdateRequeueTime)
}

r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
Expand Down Expand Up @@ -379,7 +408,7 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx cont
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
}
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
return ctrlerrors.NewRequeueNeeded(fmt.Sprintf(requeueReasonAcceleratorInProgress, k8s.NamespacedName(ga)))
}

// Any other error
Expand Down
133 changes: 121 additions & 12 deletions pkg/aga/model_build_endpoint_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/go-logr/logr"
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
Expand All @@ -12,27 +13,35 @@ import (
// endpointGroupBuilder builds EndpointGroup model resources
type endpointGroupBuilder interface {
// Build builds all endpoint groups for all listeners
Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener, listenerConfigs []agaapi.GlobalAcceleratorListener) ([]*agamodel.EndpointGroup, error)
Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener,
listenerConfigs []agaapi.GlobalAcceleratorListener, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error)

// buildEndpointGroupsForListener builds endpoint groups for a specific listener
buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int) ([]*agamodel.EndpointGroup, error)
buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener,
endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int,
loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error)
}

// NewEndpointGroupBuilder constructs new endpointGroupBuilder
func NewEndpointGroupBuilder(clusterRegion string) endpointGroupBuilder {
func NewEndpointGroupBuilder(clusterRegion string, gaNamespace string, logger logr.Logger) endpointGroupBuilder {
return &defaultEndpointGroupBuilder{
clusterRegion: clusterRegion,
gaNamespace: gaNamespace,
logger: logger,
}
}

var _ endpointGroupBuilder = &defaultEndpointGroupBuilder{}

type defaultEndpointGroupBuilder struct {
clusterRegion string
gaNamespace string
logger logr.Logger
}

// Build builds EndpointGroup model resources
func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener, listenerConfigs []agaapi.GlobalAcceleratorListener) ([]*agamodel.EndpointGroup, error) {
func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener,
listenerConfigs []agaapi.GlobalAcceleratorListener, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error) {
if listeners == nil || len(listeners) == 0 {
return nil, nil
}
Expand All @@ -51,7 +60,7 @@ func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stac
continue
}

listenerEndpointGroups, err := b.buildEndpointGroupsForListener(ctx, stack, listener, *listenerConfig.EndpointGroups, i)
listenerEndpointGroups, err := b.buildEndpointGroupsForListener(ctx, stack, listener, *listenerConfig.EndpointGroups, i, loadedEndpoints)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,11 +123,13 @@ func (b *defaultEndpointGroupBuilder) validateEndpointPortOverridesWithinListene
}

// buildEndpointGroupsForListener builds EndpointGroup models for a specific listener
func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int) ([]*agamodel.EndpointGroup, error) {
func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context.Context, stack core.Stack,
listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup,
listenerIndex int, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error) {
var result []*agamodel.EndpointGroup

for i, endpointGroup := range endpointGroups {
spec, err := b.buildEndpointGroupSpec(ctx, listener, endpointGroup)
spec, err := b.buildEndpointGroupSpec(ctx, listener, endpointGroup, loadedEndpoints)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +143,9 @@ func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context
}

// buildEndpointGroupSpec builds the EndpointGroupSpec for a single EndpointGroup model resource
func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context, listener *agamodel.Listener, endpointGroup agaapi.GlobalAcceleratorEndpointGroup) (agamodel.EndpointGroupSpec, error) {
func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context,
listener *agamodel.Listener, endpointGroup agaapi.GlobalAcceleratorEndpointGroup,
loadedEndpoints []*LoadedEndpoint) (agamodel.EndpointGroupSpec, error) {
region, err := b.determineRegion(endpointGroup)
if err != nil {
return agamodel.EndpointGroupSpec{}, err
Expand All @@ -146,14 +159,90 @@ func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context
return agamodel.EndpointGroupSpec{}, err
}

// Build endpoint configurations from both static configurations and loaded endpoints
endpointConfigurations, err := b.buildEndpointConfigurations(ctx, endpointGroup, loadedEndpoints)
if err != nil {
return agamodel.EndpointGroupSpec{}, err
}

return agamodel.EndpointGroupSpec{
ListenerARN: listener.ListenerARN(),
Region: region,
TrafficDialPercentage: trafficDialPercentage,
PortOverrides: portOverrides,
ListenerARN: listener.ListenerARN(),
Region: region,
TrafficDialPercentage: trafficDialPercentage,
PortOverrides: portOverrides,
EndpointConfigurations: endpointConfigurations,
}, nil
}

// generateEndpointKey creates a consistent string key for endpoint lookup
func generateEndpointKey(ep agaapi.GlobalAcceleratorEndpoint, gaNamespace string) string {
namespace := gaNamespace
if ep.Namespace != nil {
namespace = awssdk.ToString(ep.Namespace)
}
name := awssdk.ToString(ep.Name)

if ep.Type == agaapi.GlobalAcceleratorEndpointTypeEndpointID {
return fmt.Sprintf("%s/%s", ep.Type, awssdk.ToString(ep.EndpointID))
}
return fmt.Sprintf("%s/%s/%s", ep.Type, namespace, name)
}

// buildEndpointConfigurations builds endpoint configurations from both static configurations in the API struct
// and from successfully loaded endpoints
func (b *defaultEndpointGroupBuilder) buildEndpointConfigurations(_ context.Context,
endpointGroup agaapi.GlobalAcceleratorEndpointGroup, loadedEndpoints []*LoadedEndpoint) ([]agamodel.EndpointConfiguration, error) {

var endpointConfigurations []agamodel.EndpointConfiguration

// Skip if no endpoints defined in the endpoint group
if endpointGroup.Endpoints == nil {
return nil, nil
}

// Build a map of loaded endpoints with for quick lookup
loadedEndpointsMap := make(map[string]*LoadedEndpoint)
for _, le := range loadedEndpoints {
key := le.GetKey()
loadedEndpointsMap[key] = le

}

// Process the endpoints defined in the CRD and match with loaded endpoints
for _, ep := range *endpointGroup.Endpoints {
// Create key for lookup using the helper function
lookupKey := generateEndpointKey(ep, b.gaNamespace)

// Find the loaded endpoint
if loadedEndpoint, found := loadedEndpointsMap[lookupKey]; found {
// Add endpoint to model stack only if its in Loaded status and has valid ARN
if loadedEndpoint.Status == EndpointStatusLoaded {
// Create a base configuration with the loaded endpoint's ARN
endpointConfig := agamodel.EndpointConfiguration{
EndpointID: loadedEndpoint.ARN,
}
endpointConfig.Weight = awssdk.Int32(loadedEndpoint.Weight)
endpointConfig.ClientIPPreservationEnabled = ep.ClientIPPreservationEnabled
endpointConfigurations = append(endpointConfigurations, endpointConfig)
} else {
// Log warning for endpoints which are not loaded successfully during loading and has Warning status
b.logger.Info("Endpoint not added to endpoint group as no valid ARN was found during loading",
"endpoint", lookupKey,
"message", loadedEndpoint.Message,
"error", loadedEndpoint.Error)
}
} else {
b.logger.Info("Endpoint not found in loaded endpoints",
"endpoint", lookupKey)
}
}

return endpointConfigurations, nil
}

// Note: The TargetsEndpointGroup method is no longer needed since we match endpoints based on
// the explicit references in the GlobalAcceleratorEndpoint resources under each endpoint group

// validateListenerPortOverrideWithinListenerPortRanges ensures all listener ports used in port overrides are
// contained within the listener's port ranges
func (b *defaultEndpointGroupBuilder) validateListenerPortOverrideWithinListenerPortRanges(listener *agamodel.Listener, portOverrides []agamodel.PortOverride) error {
Expand Down Expand Up @@ -248,3 +337,23 @@ func (b *defaultEndpointGroupBuilder) validatePortOverrides(listener *agamodel.L

return nil
}

// buildEndpointConfiguration creates an EndpointConfiguration from a GlobalAcceleratorEndpoint
// This helper function consolidates the repeated code for creating endpoint configurations
func buildEndpointConfigurationFromEndpoint(endpoint *agaapi.GlobalAcceleratorEndpoint) agamodel.EndpointConfiguration {
endpointConfig := agamodel.EndpointConfiguration{
EndpointID: awssdk.ToString(endpoint.EndpointID),
}

// Add weight if specified
if endpoint.Weight != nil {
endpointConfig.Weight = endpoint.Weight
}

// Add client IP preservation setting if specified
if endpoint.ClientIPPreservationEnabled != nil {
endpointConfig.ClientIPPreservationEnabled = endpoint.ClientIPPreservationEnabled
}

return endpointConfig
}
Loading
Loading