Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oci/tests/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/fluxcd/pkg/cache v0.10.0
github.com/fluxcd/pkg/git v0.34.0
github.com/fluxcd/pkg/git/gogit v0.37.0
github.com/fluxcd/pkg/runtime v0.71.0
github.com/fluxcd/pkg/runtime v0.72.0
github.com/fluxcd/test-infra/tftestenv v0.0.0-20250626232827-e0ca9c3f8d7b
github.com/go-git/go-git/v5 v5.16.2
github.com/google/go-containerregistry v0.20.6
Expand Down
26 changes: 17 additions & 9 deletions runtime/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package events

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -93,7 +93,7 @@ func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController

httpClient := retryablehttp.NewClient()
httpClient.HTTPClient.Timeout = 5 * time.Second
httpClient.CheckRetry = retryablehttp.ErrorPropagatedRetryPolicy
httpClient.CheckRetry = checkRetry
httpClient.Logger = nil

return &Recorder{
Expand All @@ -120,7 +120,7 @@ func NewRecorderForScheme(scheme *runtime.Scheme,

httpClient := retryablehttp.NewClient()
httpClient.HTTPClient.Timeout = 5 * time.Second
httpClient.CheckRetry = retryablehttp.ErrorPropagatedRetryPolicy
httpClient.CheckRetry = checkRetry
httpClient.Logger = nil

return &Recorder{
Expand All @@ -133,6 +133,20 @@ func NewRecorderForScheme(scheme *runtime.Scheme,
}, nil
}

func checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
if resp != nil && responseIsEventDuplicated(resp) {
return false, nil // Don't retry
}
return retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err)
}

// responseIsEventDuplicated checks if the received response is a signal of a duplicate event.
// The Notification Controller returns a 429 Too-Many-Requests response when the posted message
// is a duplicate (within a certain time window).
func responseIsEventDuplicated(resp *http.Response) bool {
return resp.StatusCode == http.StatusTooManyRequests
}

// Event records an event in the webhook address.
func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) {
r.AnnotatedEventf(object, nil, eventtype, reason, "%s", message)
Expand Down Expand Up @@ -250,12 +264,6 @@ func (r *Recorder) AnnotatedEventf(
return
}

// avoid retrying rate limited requests
if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil &&
(res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) {
return
}

if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil {
log.Error(err, "unable to record event")
return
Expand Down
4 changes: 2 additions & 2 deletions runtime/events/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ func TestEventRecorder_AnnotatedEventf(t *testing.T) {
const msg = "sync object"

eventRecorder.AnnotatedEventf(obj, tt.inputAnnotations, corev1.EventTypeNormal, "sync", "%s", msg)
require.Equal(t, 2, requestCount)
require.Equal(t, 1, requestCount)

// When a trace event is sent, it's dropped, no new request.
eventRecorder.AnnotatedEventf(obj, tt.inputAnnotations, eventv1.EventTypeTrace, "sync", "%s", msg)
require.Equal(t, 2, requestCount)
require.Equal(t, 1, requestCount)
})
}
}
Expand Down
Loading