Skip to content

Commit 769c5be

Browse files
committed
NLB-6294 The synchronizer uses a typed rate-limited workqueue
The latest versions of the kubernetes libraries recommend using a typed workqueue and this reduces a bit of boilerplate and error handling, because we no longer have to cast the workitems returned by the queue into the desired types.
1 parent d6be96b commit 769c5be

File tree

4 files changed

+30
-41
lines changed

4 files changed

+30
-41
lines changed

cmd/nginx-loadbalancer-kubernetes/main.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,12 @@ func buildKubernetesClient() (*kubernetes.Clientset, error) {
133133
return client, nil
134134
}
135135

136-
// TODO: NLB-6294 change to use new typed workqueues
137-
//
138-
//nolint:staticcheck //ignore deprecation warnings
139-
func buildWorkQueue(settings configuration.WorkQueueSettings) workqueue.RateLimitingInterface {
136+
func buildWorkQueue(settings configuration.WorkQueueSettings,
137+
) workqueue.TypedRateLimitingInterface[synchronization.ServiceKey] {
140138
slog.Debug("Watcher::buildSynchronizerWorkQueue")
141139

142-
//nolint:staticcheck //ignore deprecation warnings
143-
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(settings.RateLimiterBase, settings.RateLimiterMax)
144-
return workqueue.NewNamedRateLimitingQueue(rateLimiter, settings.Name)
140+
rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[synchronization.ServiceKey](
141+
settings.RateLimiterBase, settings.RateLimiterMax)
142+
return workqueue.NewTypedRateLimitingQueueWithConfig(
143+
rateLimiter, workqueue.TypedRateLimitingQueueConfig[synchronization.ServiceKey]{Name: settings.Name})
145144
}

internal/synchronization/synchronizer.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@ type ServiceKey struct {
5050
// a Border Client as specified in the Service annotation for the Upstream.
5151
// See application/border_client.go and application/application_constants.go for details.
5252
type Synchronizer struct {
53-
// TODO: NLB-6294 change to use new typed workqueues
54-
//nolint:staticcheck //ignore deprecation warnings
55-
eventQueue workqueue.RateLimitingInterface
53+
eventQueue workqueue.TypedRateLimitingInterface[ServiceKey]
5654
settings configuration.Settings
5755
translator Translator
5856
cache *cache
@@ -62,8 +60,7 @@ type Synchronizer struct {
6260
// NewSynchronizer creates a new Synchronizer.
6361
func NewSynchronizer(
6462
settings configuration.Settings,
65-
//nolint:staticcheck //ignore deprecation warnings
66-
eventQueue workqueue.RateLimitingInterface,
63+
eventQueue workqueue.TypedRateLimitingInterface[ServiceKey],
6764
translator Translator,
6865
serviceLister corelisters.ServiceLister,
6966
) (*Synchronizer, error) {
@@ -292,13 +289,7 @@ func (s *Synchronizer) handleNextServiceEvent(ctx context.Context) bool {
292289

293290
defer s.eventQueue.Done(svc)
294291

295-
key, ok := svc.(ServiceKey)
296-
if !ok {
297-
slog.Warn(`Synchronizer::handleNextServiceEvent: invalid service type`, "service", svc)
298-
return true
299-
}
300-
301-
s.withRetry(s.handleServiceEvent(ctx, key), key)
292+
s.withRetry(s.handleServiceEvent(ctx, svc), svc)
302293

303294
return true
304295
}

internal/synchronization/synchronizer_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
func TestSynchronizer_NewSynchronizer(t *testing.T) {
2323
t.Parallel()
2424

25-
rateLimiter := &mocks.MockRateLimiter{}
25+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
2626

2727
synchronizer, err := NewSynchronizer(
2828
configuration.Settings{},
@@ -43,7 +43,7 @@ func TestSynchronizer_AddEventNoHosts(t *testing.T) {
4343
t.Parallel()
4444
const expectedEventCount = 0
4545

46-
rateLimiter := &mocks.MockRateLimiter{}
46+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
4747

4848
synchronizer, err := NewSynchronizer(
4949
defaultSettings(),
@@ -73,7 +73,7 @@ func TestSynchronizer_AddEventOneHost(t *testing.T) {
7373
const expectedEventCount = 1
7474
events := buildServerUpdateEvents(1)
7575

76-
rateLimiter := &mocks.MockRateLimiter{}
76+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
7777

7878
synchronizer, err := NewSynchronizer(
7979
defaultSettings("https://localhost:8080"),
@@ -106,7 +106,7 @@ func TestSynchronizer_AddEventManyHosts(t *testing.T) {
106106
"https://localhost:8082",
107107
}
108108

109-
rateLimiter := &mocks.MockRateLimiter{}
109+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
110110

111111
synchronizer, err := NewSynchronizer(
112112
defaultSettings(hosts...),
@@ -133,7 +133,7 @@ func TestSynchronizer_AddEventsNoHosts(t *testing.T) {
133133
t.Parallel()
134134
const expectedEventCount = 0
135135
events := buildServerUpdateEvents(4)
136-
rateLimiter := &mocks.MockRateLimiter{}
136+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
137137

138138
synchronizer, err := NewSynchronizer(
139139
defaultSettings(),
@@ -165,7 +165,7 @@ func TestSynchronizer_AddEventsOneHost(t *testing.T) {
165165
t.Parallel()
166166
const expectedEventCount = 4
167167
events := buildServerUpdateEvents(1)
168-
rateLimiter := &mocks.MockRateLimiter{}
168+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
169169

170170
synchronizer, err := NewSynchronizer(
171171
defaultSettings("https://localhost:8080"),
@@ -195,7 +195,7 @@ func TestSynchronizer_AddEventsManyHosts(t *testing.T) {
195195
t.Parallel()
196196
const eventCount = 4
197197
events := buildServerUpdateEvents(eventCount)
198-
rateLimiter := &mocks.MockRateLimiter{}
198+
rateLimiter := &mocks.MockRateLimiter[ServiceKey]{}
199199

200200
hosts := []string{
201201
"https://localhost:8080",

test/mocks/mock_ratelimitinginterface.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,51 +7,50 @@ package mocks
77

88
import "time"
99

10-
type MockRateLimiter struct {
11-
items []interface{}
10+
type MockRateLimiter[T any] struct {
11+
items []T
1212
}
1313

14-
func (m *MockRateLimiter) Add(_ interface{}) {
14+
func (m *MockRateLimiter[T]) Add(_ T) {
1515
}
1616

17-
func (m *MockRateLimiter) Len() int {
17+
func (m *MockRateLimiter[T]) Len() int {
1818
return len(m.items)
1919
}
2020

21-
func (m *MockRateLimiter) Get() (item interface{}, shutdown bool) {
21+
func (m *MockRateLimiter[T]) Get() (item T, shutdown bool) {
2222
if len(m.items) > 0 {
2323
item = m.items[0]
2424
m.items = m.items[1:]
2525
return item, false
2626
}
27-
return nil, false
27+
return item, false
2828
}
2929

30-
func (m *MockRateLimiter) Done(_ interface{}) {
30+
func (m *MockRateLimiter[T]) Done(_ T) {
3131
}
3232

33-
func (m *MockRateLimiter) ShutDown() {
33+
func (m *MockRateLimiter[T]) ShutDown() {
3434
}
3535

36-
func (m *MockRateLimiter) ShutDownWithDrain() {
36+
func (m *MockRateLimiter[T]) ShutDownWithDrain() {
3737
}
3838

39-
func (m *MockRateLimiter) ShuttingDown() bool {
39+
func (m *MockRateLimiter[T]) ShuttingDown() bool {
4040
return true
4141
}
4242

43-
func (m *MockRateLimiter) AddAfter(item interface{}, _ time.Duration) {
43+
func (m *MockRateLimiter[T]) AddAfter(item T, _ time.Duration) {
4444
m.items = append(m.items, item)
4545
}
4646

47-
func (m *MockRateLimiter) AddRateLimited(item interface{}) {
47+
func (m *MockRateLimiter[T]) AddRateLimited(item T) {
4848
m.items = append(m.items, item)
4949
}
5050

51-
func (m *MockRateLimiter) Forget(_ interface{}) {
52-
51+
func (m *MockRateLimiter[T]) Forget(_ T) {
5352
}
5453

55-
func (m *MockRateLimiter) NumRequeues(_ interface{}) int {
54+
func (m *MockRateLimiter[T]) NumRequeues(_ T) int {
5655
return 0
5756
}

0 commit comments

Comments
 (0)