diff --git a/webhook/filtered_notifier.go b/webhook/filtered_notifier.go new file mode 100644 index 000000000..131f46ce5 --- /dev/null +++ b/webhook/filtered_notifier.go @@ -0,0 +1,64 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webhook + +import ( + "context" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +type FilteredNotifierParams struct { + // Events will be used to filter out webhook events. One might want only a subset of events + // If Events is nil or zero-sized, all events will be sent + Events []string + Logger logger.Logger +} + +type FilteredNotifier struct { + logger logger.Logger + events []string // TODO do we really need a map[string]struct{} + queuedNotifier QueuedNotifier +} + +func NewFilteredNotifier(notifier QueuedNotifier, params FilteredNotifierParams) *FilteredNotifier { + if params.Logger == nil { + params.Logger = logger.GetLogger() + } + return &FilteredNotifier{ + logger: params.Logger, + events: params.Events, + queuedNotifier: notifier, + } +} + +func (notifier *FilteredNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error { + if len(notifier.events) == 0 { + return notifier.queuedNotifier.QueueNotify(ctx, event) + } + + for _, ev := range notifier.events { + if ev == event.Event { + return notifier.queuedNotifier.QueueNotify(ctx, event) + } + } + + notifier.logger.Debugw("ignoring event: %s", event.Event) + return nil +} + +func (notifier *FilteredNotifier) Stop(force bool) { + notifier.queuedNotifier.Stop(force) +} diff --git a/webhook/notifier.go b/webhook/notifier.go index da1a91de4..57d599615 100644 --- a/webhook/notifier.go +++ b/webhook/notifier.go @@ -24,41 +24,63 @@ import ( type QueuedNotifier interface { QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error + Stop(force bool) } type DefaultNotifier struct { - urlNotifiers []*URLNotifier + queuedNotifiers []QueuedNotifier } func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier { n := &DefaultNotifier{} for _, url := range urls { - u := NewURLNotifier(URLNotifierParams{ + u := NewURLNotifierWrapper(URLNotifierParams{ URL: url, Logger: logger.GetLogger().WithComponent("webhook"), APIKey: apiKey, APISecret: apiSecret, }) - n.urlNotifiers = append(n.urlNotifiers, u) + n.queuedNotifiers = append(n.queuedNotifiers, u) + } + return n +} + +// NewDefaultNotifierWithFilter takes an events eventsFilter that is shared across all urls. +// if eventsFilter is nil, then all events will be sent. If not only the events specified +// by eventsFilter will be sent and any other event will be ignored with a debug log +// TODO maybe add eventsFilter per url?! but it's not my use case. +func NewDefaultNotifierWithFilter(apiKey, apiSecret string, urls []string, eventsFilter []string) QueuedNotifier { + n := &DefaultNotifier{} + for _, url := range urls { + u := NewFilteredNotifier(NewURLNotifierWrapper(URLNotifierParams{ + URL: url, + Logger: logger.GetLogger().WithComponent("webhook"), + APIKey: apiKey, + APISecret: apiSecret, + }), FilteredNotifierParams{ + Events: eventsFilter, + Logger: logger.GetLogger().WithComponent("webhook"), + }) + n.queuedNotifiers = append(n.queuedNotifiers, u) } return n } func (n *DefaultNotifier) Stop(force bool) { wg := sync.WaitGroup{} - for _, u := range n.urlNotifiers { + for _, u := range n.queuedNotifiers { wg.Add(1) - go func(u *URLNotifier) { + go func(qn QueuedNotifier) { defer wg.Done() - u.Stop(force) + qn.Stop(force) }(u) } wg.Wait() } -func (n *DefaultNotifier) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error { - for _, u := range n.urlNotifiers { - if err := u.QueueNotify(event); err != nil { +func (n *DefaultNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error { + for _, u := range n.queuedNotifiers { + if err := u.QueueNotify(ctx, event); err != nil { return err } } diff --git a/webhook/url_notifier_wrapper.go b/webhook/url_notifier_wrapper.go new file mode 100644 index 000000000..68282ec34 --- /dev/null +++ b/webhook/url_notifier_wrapper.go @@ -0,0 +1,32 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webhook + +import ( + "context" + "github.com/livekit/protocol/livekit" +) + +type URLNotifierWrapper struct { + *URLNotifier +} + +func NewURLNotifierWrapper(params URLNotifierParams) *URLNotifierWrapper { + return &URLNotifierWrapper{URLNotifier: NewURLNotifier(params)} +} + +func (n *URLNotifierWrapper) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error { + return n.URLNotifier.QueueNotify(event) +} diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index 1ec111c2e..44022f3f9 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -16,6 +16,7 @@ package webhook import ( "context" + "github.com/livekit/protocol/logger" "net" "net/http" "sync" @@ -143,6 +144,80 @@ func TestURLNotifierLifecycle(t *testing.T) { }) } +func TestFilteredNotifier(t *testing.T) { + var ( + maxItr = 10 + events = []string{ + EventRoomStarted, + EventRoomFinished, + EventParticipantJoined, + EventParticipantLeft, + EventTrackPublished, + EventTrackUnpublished, + EventEgressStarted, + EventEgressUpdated, + EventEgressEnded, + EventIngressStarted, + EventIngressEnded, + } + nm = notifierMock{numCalled: &atomic.Int32{}} + ) + + t.Run("empty filter should send all events", func(t *testing.T) { + filteredNotifier := newTestFilteredNotifier(&nm, nil) + defer filteredNotifier.Stop(true) + + for i := 0; i < maxItr; i++ { + for _, event := range events { + _ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event}) + } + } + + require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load()) + }) + + t.Run("one filter", func(t *testing.T) { + events := events[:1] + filteredNotifier := newTestFilteredNotifier(&nm, events) + defer filteredNotifier.Stop(true) + + for i := 0; i < maxItr; i++ { + for _, event := range events { + _ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event}) + } + } + + require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load()) + }) + + t.Run("some filter", func(t *testing.T) { + events := events[2:5] + filteredNotifier := newTestFilteredNotifier(&nm, events) + defer filteredNotifier.Stop(true) + + for i := 0; i < maxItr; i++ { + for _, event := range events { + _ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event}) + } + } + + require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load()) + }) + + t.Run("all filter should send all events", func(t *testing.T) { + filteredNotifier := newTestFilteredNotifier(&nm, events) + defer filteredNotifier.Stop(true) + + for i := 0; i < maxItr; i++ { + for _, event := range events { + _ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event}) + } + } + + require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load()) + }) +} + func newTestNotifier() *URLNotifier { return NewURLNotifier(URLNotifierParams{ QueueSize: 20, @@ -152,6 +227,27 @@ func newTestNotifier() *URLNotifier { }) } +type notifierMock struct { + numCalled *atomic.Int32 +} + +func (n notifierMock) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error { + n.numCalled.Inc() + return nil +} + +func (n notifierMock) Stop(force bool) { + n.numCalled.Store(0) +} + +func newTestFilteredNotifier(notifier *notifierMock, events []string) *FilteredNotifier { + + return NewFilteredNotifier(notifier, FilteredNotifierParams{ + Events: events, + Logger: logger.GetLogger(), + }) +} + type testServer struct { handler func(r *http.Request) server *http.Server