diff --git a/oci/tests/integration/go.mod b/oci/tests/integration/go.mod index ae614790e..3415a8535 100644 --- a/oci/tests/integration/go.mod +++ b/oci/tests/integration/go.mod @@ -22,7 +22,7 @@ require ( github.com/fluxcd/pkg/cache v0.10.0 github.com/fluxcd/pkg/git v0.34.0 github.com/fluxcd/pkg/git/gogit v0.37.0 - github.com/fluxcd/pkg/runtime v0.71.0 + github.com/fluxcd/pkg/runtime v0.72.0 github.com/fluxcd/test-infra/tftestenv v0.0.0-20250626232827-e0ca9c3f8d7b github.com/go-git/go-git/v5 v5.16.2 github.com/google/go-containerregistry v0.20.6 diff --git a/runtime/events/recorder.go b/runtime/events/recorder.go index a4cd6b632..fb5bf9027 100644 --- a/runtime/events/recorder.go +++ b/runtime/events/recorder.go @@ -17,7 +17,7 @@ limitations under the License. package events import ( - "bytes" + "context" "encoding/json" "errors" "fmt" @@ -93,7 +93,7 @@ func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController httpClient := retryablehttp.NewClient() httpClient.HTTPClient.Timeout = 5 * time.Second - httpClient.CheckRetry = retryablehttp.ErrorPropagatedRetryPolicy + httpClient.CheckRetry = checkRetry httpClient.Logger = nil return &Recorder{ @@ -120,7 +120,7 @@ func NewRecorderForScheme(scheme *runtime.Scheme, httpClient := retryablehttp.NewClient() httpClient.HTTPClient.Timeout = 5 * time.Second - httpClient.CheckRetry = retryablehttp.ErrorPropagatedRetryPolicy + httpClient.CheckRetry = checkRetry httpClient.Logger = nil return &Recorder{ @@ -133,6 +133,20 @@ func NewRecorderForScheme(scheme *runtime.Scheme, }, nil } +func checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) { + if resp != nil && responseIsEventDuplicated(resp) { + return false, nil // Don't retry + } + return retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err) +} + +// responseIsEventDuplicated checks if the received response is a signal of a duplicate event. +// The Notification Controller returns a 429 Too-Many-Requests response when the posted message +// is a duplicate (within a certain time window). +func responseIsEventDuplicated(resp *http.Response) bool { + return resp.StatusCode == http.StatusTooManyRequests +} + // Event records an event in the webhook address. func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) { r.AnnotatedEventf(object, nil, eventtype, reason, "%s", message) @@ -250,12 +264,6 @@ func (r *Recorder) AnnotatedEventf( return } - // avoid retrying rate limited requests - if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil && - (res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) { - return - } - if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil { log.Error(err, "unable to record event") return diff --git a/runtime/events/recorder_test.go b/runtime/events/recorder_test.go index 887668ffb..5576735f0 100644 --- a/runtime/events/recorder_test.go +++ b/runtime/events/recorder_test.go @@ -118,11 +118,11 @@ func TestEventRecorder_AnnotatedEventf(t *testing.T) { const msg = "sync object" eventRecorder.AnnotatedEventf(obj, tt.inputAnnotations, corev1.EventTypeNormal, "sync", "%s", msg) - require.Equal(t, 2, requestCount) + require.Equal(t, 1, requestCount) // When a trace event is sent, it's dropped, no new request. eventRecorder.AnnotatedEventf(obj, tt.inputAnnotations, eventv1.EventTypeTrace, "sync", "%s", msg) - require.Equal(t, 2, requestCount) + require.Equal(t, 1, requestCount) }) } }