Skip to content

Commit 557f5e3

Browse files
committed
[feat aga] Implement endpoints deployer with requeue logic for partially loaded endpoints
1 parent 28ea3e0 commit 557f5e3

File tree

7 files changed

+1275
-11
lines changed

7 files changed

+1275
-11
lines changed

controllers/aga/globalaccelerator_controller.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,15 @@ const (
6464
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
6565
globalAcceleratorKind = "GlobalAccelerator"
6666

67-
// Requeue constants for provisioning state monitoring
68-
requeueMessage = "Monitoring provisioning state"
69-
statusUpdateRequeueTime = 1 * time.Minute
67+
// Requeue constants for state monitoring
68+
// requeueReasonAcceleratorInProgress indicates that the reconciliation is being requeued because
69+
// the Global Accelerator is still in progress state
70+
requeueReasonAcceleratorInProgress = "Waiting for Global Accelerator %s with status 'IN_PROGRESS' to complete"
71+
72+
// requeueReasonEndpointsInWarningState indicates that the reconciliation is being requeued because
73+
// there are endpoints in warning state that need to be periodically rechecked
74+
requeueReasonEndpointsInWarningState = "Retrying endpoints for Global Accelerator %s which did load successfully - will check availability again soon"
75+
statusUpdateRequeueTime = 1 * time.Minute
7076

7177
// Metric stage constants
7278
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
@@ -251,8 +257,8 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Conte
251257
return nil
252258
}
253259

254-
func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi.GlobalAccelerator) (core.Stack, *agamodel.Accelerator, error) {
255-
stack, accelerator, err := r.modelBuilder.Build(ctx, ga)
260+
func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi.GlobalAccelerator, loadedEndpoints []*aga.LoadedEndpoint) (core.Stack, *agamodel.Accelerator, error) {
261+
stack, accelerator, err := r.modelBuilder.Build(ctx, ga, loadedEndpoints)
256262
if err != nil {
257263
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedBuildModel, fmt.Sprintf("Failed build model due to %v", err))
258264
return nil, nil, err
@@ -279,7 +285,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
279285
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)
280286

281287
// Validate and load endpoint status using the endpoint loader
282-
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
288+
loadedEndpoints, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
283289
if len(fatalErrors) > 0 {
284290
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
285291
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedEndpointLoad, fmt.Sprintf("Failed to reconcile due to %v", err))
@@ -295,7 +301,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
295301
var accelerator *agamodel.Accelerator
296302
var err error
297303
buildModelFn := func() {
298-
stack, accelerator, err = r.buildModel(ctx, ga)
304+
stack, accelerator, err = r.buildModel(ctx, ga, loadedEndpoints)
299305
}
300306
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
301307
if err != nil {
@@ -326,14 +332,37 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
326332

327333
r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())
328334

329-
// Update GlobalAccelerator status after successful deployment
335+
// Check if any endpoints have warning status and collect them
336+
hasWarningEndpoints := false
337+
for _, ep := range loadedEndpoints {
338+
if ep.Status == aga.EndpointStatusWarning {
339+
hasWarningEndpoints = true
340+
}
341+
}
342+
343+
// Update GlobalAccelerator status after successful deployment, including warning endpoints
330344
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
331345
if err != nil {
332346
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
333347
return err
334348
}
335-
if requeueNeeded {
336-
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)
349+
350+
// If we have warning endpoints, add a separate condition for them and requeue
351+
if hasWarningEndpoints {
352+
r.logger.V(1).Info("Detected endpoints in warning state, will requeue",
353+
"Global Accelerator", k8s.NamespacedName(ga))
354+
355+
// Add event to notify about warning endpoints
356+
warningMessage := fmt.Sprintf("Detected endpoints which did not load successfully. These endpoints will be rechecked shortly.")
357+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonWarningEndpoints, warningMessage)
358+
}
359+
360+
if requeueNeeded || hasWarningEndpoints {
361+
message := fmt.Sprintf(requeueReasonAcceleratorInProgress, k8s.NamespacedName(ga))
362+
if hasWarningEndpoints {
363+
message = fmt.Sprintf(requeueReasonEndpointsInWarningState, k8s.NamespacedName(ga))
364+
}
365+
return ctrlerrors.NewRequeueNeededAfter(message, statusUpdateRequeueTime)
337366
}
338367

339368
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
@@ -379,7 +408,7 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx cont
379408
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
380409
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
381410
}
382-
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
411+
return ctrlerrors.NewRequeueNeeded(fmt.Sprintf(requeueReasonAcceleratorInProgress, k8s.NamespacedName(ga)))
383412
}
384413

385414
// Any other error

pkg/aws/services/globalaccelerator.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ type GlobalAccelerator interface {
6464

6565
// ListTagsForResource lists tags for a resource.
6666
ListTagsForResourceWithContext(ctx context.Context, input *globalaccelerator.ListTagsForResourceInput) (*globalaccelerator.ListTagsForResourceOutput, error)
67+
68+
// AddEndpoints adds endpoints to an endpoint group.
69+
AddEndpointsWithContext(ctx context.Context, input *globalaccelerator.AddEndpointsInput) (*globalaccelerator.AddEndpointsOutput, error)
70+
71+
// RemoveEndpoints removes endpoints from an endpoint group.
72+
RemoveEndpointsWithContext(ctx context.Context, input *globalaccelerator.RemoveEndpointsInput) (*globalaccelerator.RemoveEndpointsOutput, error)
6773
}
6874

6975
// NewGlobalAccelerator constructs new GlobalAccelerator implementation.
@@ -256,3 +262,19 @@ func (c *defaultGlobalAccelerator) ListEndpointGroupsAsList(ctx context.Context,
256262
}
257263
return result, nil
258264
}
265+
266+
func (c *defaultGlobalAccelerator) AddEndpointsWithContext(ctx context.Context, input *globalaccelerator.AddEndpointsInput) (*globalaccelerator.AddEndpointsOutput, error) {
267+
client, err := c.awsClientsProvider.GetGlobalAcceleratorClient(ctx, "AddEndpoints")
268+
if err != nil {
269+
return nil, err
270+
}
271+
return client.AddEndpoints(ctx, input)
272+
}
273+
274+
func (c *defaultGlobalAccelerator) RemoveEndpointsWithContext(ctx context.Context, input *globalaccelerator.RemoveEndpointsInput) (*globalaccelerator.RemoveEndpointsOutput, error) {
275+
client, err := c.awsClientsProvider.GetGlobalAcceleratorClient(ctx, "RemoveEndpoints")
276+
if err != nil {
277+
return nil, err
278+
}
279+
return client.RemoveEndpoints(ctx, input)
280+
}

pkg/aws/services/globalaccelerator_mocks.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)