Skip to content

Commit 456b2d7

Browse files
authored
[release-1.14] Allow enabling sarama logging and disabling client pool (knative-extensions#4103) (knative-extensions#4107)
* Allow enabling sarama logging and disabling client pool (knative-extensions#4103) Add 3 new environment variables: ``` ENABLE_SARAMA_LOGGER (default: false) ENABLE_SARAMA_DEBUG_LOGGER (default: false) ENABLE_SARAMA_CLIENT_POOL (default: true) ``` Signed-off-by: Pierangelo Di Pilato <[email protected]> * Set GetKafkaClient Signed-off-by: Pierangelo Di Pilato <[email protected]> --------- Signed-off-by: Pierangelo Di Pilato <[email protected]>
1 parent eca1ab4 commit 456b2d7

File tree

10 files changed

+120
-48
lines changed

10 files changed

+120
-48
lines changed

control-plane/cmd/kafka-controller/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ package main
1919
import (
2020
"context"
2121
"log"
22+
"os"
23+
"strings"
2224

25+
"github.com/IBM/sarama"
2326
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
2427
"knative.dev/pkg/configmap"
2528
"knative.dev/pkg/controller"
@@ -70,7 +73,16 @@ func main() {
7073
auth.OIDCLabelSelector,
7174
eventing.DispatcherLabelSelectorStr,
7275
)
73-
ctx = clientpool.WithKafkaClientPool(ctx)
76+
77+
if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
78+
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)
79+
}
80+
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
81+
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)
82+
}
83+
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
84+
ctx = clientpool.WithKafkaClientPool(ctx)
85+
}
7486

7587
sharedmain.MainNamed(ctx, component,
7688

control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ spec:
173173
valueFrom:
174174
fieldRef:
175175
fieldPath: metadata.name
176+
- name: ENABLE_SARAMA_LOGGER
177+
value: "false"
178+
- name: ENABLE_SARAMA_DEBUG_LOGGER
179+
value: "false"
180+
- name: ENABLE_SARAMA_CLIENT_POOL
181+
value: "true"
176182

177183
ports:
178184
- containerPort: 9090

control-plane/pkg/kafka/clientpool/clientpool.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ import (
2727
"go.uber.org/zap"
2828

2929
corev1 "k8s.io/api/core/v1"
30+
"knative.dev/pkg/logging"
31+
3032
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
3133
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
3234
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
33-
"knative.dev/pkg/logging"
3435
)
3536

3637
type KafkaClientKey struct{}
@@ -63,8 +64,21 @@ type ClientPool struct {
6364
}
6465

6566
type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error)
67+
6668
type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)
6769

70+
func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) {
71+
c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
72+
if err != nil {
73+
return nil, err
74+
}
75+
return sarama.NewClusterAdminFromClient(c)
76+
}
77+
78+
func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
79+
return makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
80+
}
81+
6882
func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
6983
client, err := cp.getClient(ctx, bootstrapServers, secret)
7084
if err != nil {
@@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st
141155
}
142156

143157
func Get(ctx context.Context) *ClientPool {
144-
return ctx.Value(ctxKey).(*ClientPool)
158+
v := ctx.Value(ctxKey)
159+
if v == nil {
160+
return nil
161+
}
162+
return v.(*ClientPool)
145163
}
146164

147165
func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey {
@@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien
162180
}
163181

164182
func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
183+
return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient)
184+
}
185+
186+
func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) {
165187
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
166188
if err != nil {
167189
return nil, err
@@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1
172194
return nil, err
173195
}
174196

175-
saramaClient, err := cp.newSaramaClient(bootstrapServers, config)
197+
saramaClient, err := newSaramaClient(bootstrapServers, config)
176198
if err != nil {
177199
return nil, err
178200
}

control-plane/pkg/reconciler/broker/controller.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
6565
configmapInformer := configmapinformer.Get(ctx)
6666
featureFlags := apisconfig.DefaultFeaturesConfig()
6767

68-
clientPool := clientpool.Get(ctx)
69-
7068
reconciler := &Reconciler{
7169
Reconciler: &base.Reconciler{
7270
KubeClient: kubeclient.Get(ctx),
@@ -79,11 +77,17 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
7977
DispatcherLabel: base.BrokerDispatcherLabel,
8078
ReceiverLabel: base.BrokerReceiverLabel,
8179
},
82-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
83-
ConfigMapLister: configmapInformer.Lister(),
84-
Env: env,
85-
Counter: counter.NewExpiringCounter(ctx),
86-
KafkaFeatureFlags: featureFlags,
80+
ConfigMapLister: configmapInformer.Lister(),
81+
Env: env,
82+
Counter: counter.NewExpiringCounter(ctx),
83+
KafkaFeatureFlags: featureFlags,
84+
}
85+
86+
clientPool := clientpool.Get(ctx)
87+
if clientPool == nil {
88+
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
89+
} else {
90+
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
8791
}
8892

8993
logger := logging.FromContext(ctx)

control-plane/pkg/reconciler/broker/namespaced_controller.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
8888
logger.Fatal("unable to create Manifestival client-go client", zap.Error(err))
8989
}
9090

91-
clientPool := clientpool.Get(ctx)
92-
9391
reconciler := &NamespacedReconciler{
9492
Reconciler: &base.Reconciler{
9593
KubeClient: kubeclient.Get(ctx),
@@ -103,7 +101,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
103101
DispatcherLabel: base.BrokerDispatcherLabel,
104102
ReceiverLabel: base.BrokerReceiverLabel,
105103
},
106-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
107104
NamespaceLister: namespaceinformer.Get(ctx).Lister(),
108105
ConfigMapLister: configmapInformer.Lister(),
109106
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
@@ -119,6 +116,13 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
119116
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
120117
}
121118

119+
clientPool := clientpool.Get(ctx)
120+
if clientPool == nil {
121+
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
122+
} else {
123+
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
124+
}
125+
122126
impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options {
123127
return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()}
124128
})

control-plane/pkg/reconciler/channel/controller.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,20 @@ import (
3131
"knative.dev/eventing/pkg/apis/feature"
3232
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
3333

34-
messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
35-
kafkachannelinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
36-
kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
37-
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
38-
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
39-
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
40-
4134
kubeclient "knative.dev/pkg/client/injection/kube/client"
4235
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
4336
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
4437
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
4538
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
4639
"knative.dev/pkg/configmap"
4740

41+
messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
42+
kafkachannelinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
43+
kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
44+
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
45+
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
46+
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
47+
4848
"knative.dev/pkg/controller"
4949
"knative.dev/pkg/logging"
5050
"knative.dev/pkg/network"
@@ -63,8 +63,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
6363
configmapInformer := configmapinformer.Get(ctx)
6464
serviceInformer := serviceinformer.Get(ctx)
6565

66-
clientPool := clientpool.Get(ctx)
67-
6866
reconciler := &Reconciler{
6967
Reconciler: &base.Reconciler{
7068
KubeClient: kubeclient.Get(ctx),
@@ -77,14 +75,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
7775
DispatcherLabel: base.ChannelDispatcherLabel,
7876
ReceiverLabel: base.ChannelReceiverLabel,
7977
},
80-
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
81-
GetKafkaClient: clientPool.GetClient,
82-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
83-
InitOffsetsFunc: offset.InitOffsets,
84-
Env: configs,
85-
ConfigMapLister: configmapInformer.Lister(),
86-
ServiceLister: serviceInformer.Lister(),
87-
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
78+
Env: configs,
79+
InitOffsetsFunc: offset.InitOffsets,
80+
ConfigMapLister: configmapInformer.Lister(),
81+
ServiceLister: serviceinformer.Get(ctx).Lister(),
82+
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
83+
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
84+
}
85+
86+
clientPool := clientpool.Get(ctx)
87+
if clientPool == nil {
88+
reconciler.GetKafkaClient = clientpool.DisabledGetClient
89+
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
90+
} else {
91+
reconciler.GetKafkaClient = clientPool.GetClient
92+
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
8893
}
8994

9095
logger := logging.FromContext(ctx)

control-plane/pkg/reconciler/consumergroup/controller.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
119119
//KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), //To be added with channel/v2 reconciler version only
120120
}
121121

122-
clientPool := clientpool.Get(ctx)
123-
124122
dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
125123

126124
r := &Reconciler{
@@ -132,17 +130,24 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
132130
PodLister: dispatcherPodInformer.Lister(),
133131
KubeClient: kubeclient.Get(ctx),
134132
NameGenerator: names.SimpleNameGenerator,
135-
GetKafkaClient: clientPool.GetClient,
136133
InitOffsetsFunc: offset.InitOffsets,
137134
SystemNamespace: system.Namespace(),
138-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
139135
KafkaFeatureFlags: config.DefaultFeaturesConfig(),
140136
KedaClient: kedaclient.Get(ctx),
141137
AutoscalerConfig: env.AutoscalerConfigMap,
142138
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
143139
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache[string, prober.Status, struct{}](ctx, 20*time.Minute),
144140
}
145141

142+
clientPool := clientpool.Get(ctx)
143+
if clientPool == nil {
144+
r.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
145+
r.GetKafkaClient = clientpool.DisabledGetClient
146+
} else {
147+
r.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
148+
r.GetKafkaClient = clientPool.GetClient
149+
}
150+
146151
consumerInformer := consumer.Get(ctx)
147152

148153
consumerGroupInformer := consumergroup.Get(ctx)

control-plane/pkg/reconciler/sink/controller.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
5454

5555
configmapInformer := configmapinformer.Get(ctx)
5656

57-
clientPool := clientpool.Get(ctx)
58-
5957
reconciler := &Reconciler{
6058
Reconciler: &base.Reconciler{
6159
KubeClient: kubeclient.Get(ctx),
@@ -67,9 +65,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
6765
DataPlaneNamespace: configs.SystemNamespace,
6866
ReceiverLabel: base.SinkReceiverLabel,
6967
},
70-
ConfigMapLister: configmapInformer.Lister(),
71-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
72-
Env: configs,
68+
ConfigMapLister: configmapInformer.Lister(),
69+
Env: configs,
70+
}
71+
72+
clientPool := clientpool.Get(ctx)
73+
if clientPool == nil {
74+
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
75+
} else {
76+
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
7377
}
7478

7579
_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)

control-plane/pkg/reconciler/trigger/controller.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
6767
triggerLister := triggerInformer.Lister()
6868
serviceaccountInformer := serviceaccountinformer.Get(ctx)
6969

70-
clientPool := clientpool.Get(ctx)
71-
7270
reconciler := &Reconciler{
7371
Reconciler: &base.Reconciler{
7472
KubeClient: kubeclient.Get(ctx),
@@ -92,12 +90,19 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
9290
BrokerClass: kafka.BrokerClass,
9391
DataPlaneConfigMapLabeler: base.NoopConfigmapOption,
9492
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
95-
GetKafkaClient: clientPool.GetClient,
96-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
9793
InitOffsetsFunc: offset.InitOffsets,
9894
ServiceAccountLister: serviceaccountInformer.Lister(),
9995
}
10096

97+
clientPool := clientpool.Get(ctx)
98+
if clientPool == nil {
99+
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
100+
reconciler.GetKafkaClient = clientpool.DisabledGetClient
101+
} else {
102+
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
103+
reconciler.GetKafkaClient = clientPool.GetClient
104+
}
105+
101106
impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
102107
return controller.Options{
103108
FinalizerName: FinalizerName,

control-plane/pkg/reconciler/trigger/namespaced_controller.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
6262
triggerLister := triggerInformer.Lister()
6363
serviceaccountInformer := serviceaccountinformer.Get(ctx)
6464

65-
clientPool := clientpool.Get(ctx)
66-
6765
reconciler := &NamespacedReconciler{
6866
Reconciler: &base.Reconciler{
6967
KubeClient: kubeclient.Get(ctx),
@@ -85,12 +83,19 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
8583
ServiceAccountLister: serviceaccountInformer.Lister(),
8684
EventingClient: eventingclient.Get(ctx),
8785
Env: configs,
88-
GetKafkaClient: clientPool.GetClient,
89-
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
9086
InitOffsetsFunc: offset.InitOffsets,
9187
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
9288
}
9389

90+
clientPool := clientpool.Get(ctx)
91+
if clientPool == nil {
92+
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
93+
reconciler.GetKafkaClient = clientpool.DisabledGetClient
94+
} else {
95+
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
96+
reconciler.GetKafkaClient = clientPool.GetClient
97+
}
98+
9499
impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
95100
return controller.Options{
96101
FinalizerName: NamespacedFinalizerName,

0 commit comments

Comments
 (0)