Skip to content

Commit 5337a13

Browse files
authored
Merge pull request #16 from grepplabs/fix-bootstrap-conflict
Fix bootstrap conflict - improve PR #14
2 parents fea222c + b1b483c commit 5337a13

File tree

2 files changed

+231
-14
lines changed

2 files changed

+231
-14
lines changed

proxy/proxy.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,31 +54,60 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
5454
return net.Listen("tcp", cfg.ListenerAddress)
5555
}
5656

57+
brokerToListenerConfig, err := getBrokerToListenerConfig(cfg)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
return &Listeners{
63+
defaultListenerIP: defaultListenerIP,
64+
connSrc: make(chan Conn, 1),
65+
brokerToListenerConfig: brokerToListenerConfig,
66+
tcpConnOptions: tcpConnOptions,
67+
listenFunc: listenFunc,
68+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
69+
}, nil
70+
}
71+
72+
func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) {
5773
brokerToListenerConfig := make(map[string]config.ListenerConfig)
5874

59-
// add mapping without starting local listeners
60-
for _, v := range cfg.Proxy.ExternalServers {
75+
for _, v := range cfg.Proxy.BootstrapServers {
6176
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
77+
if lc.ListenerAddress != v.ListenerAddress || lc.AdvertisedAddress != v.AdvertisedAddress {
78+
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc)
79+
}
80+
continue
81+
}
82+
logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
83+
brokerToListenerConfig[v.BrokerAddress] = v
84+
}
85+
86+
externalToListenerConfig := make(map[string]config.ListenerConfig)
87+
for _, v := range cfg.Proxy.ExternalServers {
88+
if lc, ok := externalToListenerConfig[v.BrokerAddress]; ok {
6289
if lc.ListenerAddress != v.ListenerAddress {
63-
return nil, fmt.Errorf("broker to listener address mapping %s configured twice: %s and %v", v.BrokerAddress, v.ListenerAddress, lc)
90+
return nil, fmt.Errorf("external server mapping %s configured twice: %s and %v", v.BrokerAddress, v.ListenerAddress, lc)
6491
}
6592
continue
6693
}
6794
if v.ListenerAddress != v.AdvertisedAddress {
6895
return nil, fmt.Errorf("external server mapping has different listener and advertised addresses %v", v)
6996
}
97+
externalToListenerConfig[v.BrokerAddress] = v
98+
}
99+
100+
for _, v := range externalToListenerConfig {
101+
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
102+
if lc.AdvertisedAddress != v.AdvertisedAddress {
103+
return nil, fmt.Errorf("bootstrap and external server mappings %s with different advertised addresses: %v and %v", v.BrokerAddress, v.ListenerAddress, lc.AdvertisedAddress)
104+
}
105+
continue
106+
}
70107
logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
71108
brokerToListenerConfig[v.BrokerAddress] = v
72109
}
73-
74-
return &Listeners{
75-
defaultListenerIP: defaultListenerIP,
76-
connSrc: make(chan Conn, 1),
77-
brokerToListenerConfig: brokerToListenerConfig,
78-
tcpConnOptions: tcpConnOptions,
79-
listenFunc: listenFunc,
80-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81-
}, nil
110+
return brokerToListenerConfig, nil
82111
}
83112

84113
func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
@@ -132,7 +161,6 @@ func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn,
132161
if err != nil {
133162
return nil, err
134163
}
135-
p.brokerToListenerConfig[v.BrokerAddress] = v
136164
}
137165
return p.connSrc, nil
138166
}
@@ -160,6 +188,6 @@ func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOpti
160188
}
161189
})
162190

163-
logrus.Infof("Listening on %s (%s) for remote %s advertised as %s", cfg.ListenerAddress, l.Addr().String(), cfg.BrokerAddress, cfg.AdvertisedAddress)
191+
logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.BrokerAddress)
164192
return l, nil
165193
}

proxy/proxy_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package proxy
2+
3+
import (
4+
"fmt"
5+
"github.com/grepplabs/kafka-proxy/config"
6+
"github.com/stretchr/testify/assert"
7+
"testing"
8+
)
9+
10+
func TestGetBrokerToListenerConfig(t *testing.T) {
11+
a := assert.New(t)
12+
13+
tests := []struct {
14+
bootstrapServers []config.ListenerConfig
15+
externalServers []config.ListenerConfig
16+
err error
17+
mapping map[string]config.ListenerConfig
18+
}{
19+
{
20+
[]config.ListenerConfig{},
21+
[]config.ListenerConfig{},
22+
nil,
23+
map[string]config.ListenerConfig{},
24+
},
25+
{
26+
[]config.ListenerConfig{
27+
{"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"},
28+
},
29+
[]config.ListenerConfig{},
30+
nil,
31+
map[string]config.ListenerConfig{
32+
"192.168.99.100:32400": {
33+
BrokerAddress: "192.168.99.100:32400",
34+
ListenerAddress: "0.0.0.0:32400",
35+
AdvertisedAddress: "0.0.0.0:32400",
36+
},
37+
},
38+
},
39+
{
40+
[]config.ListenerConfig{
41+
{"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"},
42+
{"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"},
43+
{"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"},
44+
},
45+
[]config.ListenerConfig{},
46+
nil,
47+
map[string]config.ListenerConfig{
48+
"192.168.99.100:32400": {
49+
BrokerAddress: "192.168.99.100:32400",
50+
ListenerAddress: "0.0.0.0:32400",
51+
AdvertisedAddress: "kafka-proxy-0:32400",
52+
},
53+
"192.168.99.100:32401": {
54+
BrokerAddress: "192.168.99.100:32401",
55+
ListenerAddress: "0.0.0.0:32401",
56+
AdvertisedAddress: "kafka-proxy-0:32401",
57+
},
58+
"192.168.99.100:32402": {
59+
BrokerAddress: "192.168.99.100:32402",
60+
ListenerAddress: "0.0.0.0:32402",
61+
AdvertisedAddress: "kafka-proxy-0:32402",
62+
},
63+
},
64+
},
65+
{
66+
[]config.ListenerConfig{
67+
{"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"},
68+
{"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"},
69+
},
70+
[]config.ListenerConfig{},
71+
nil,
72+
map[string]config.ListenerConfig{
73+
"192.168.99.100:32400": {
74+
BrokerAddress: "192.168.99.100:32400",
75+
ListenerAddress: "0.0.0.0:32400",
76+
AdvertisedAddress: "0.0.0.0:32400",
77+
},
78+
},
79+
},
80+
{
81+
[]config.ListenerConfig{
82+
{"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"},
83+
{"192.168.99.100:32400", "0.0.0.0:32401", "0.0.0.0:32400"},
84+
},
85+
[]config.ListenerConfig{},
86+
fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32401 0.0.0.0:32400} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"),
87+
nil,
88+
},
89+
{
90+
[]config.ListenerConfig{
91+
{"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"},
92+
{"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32401"},
93+
},
94+
[]config.ListenerConfig{},
95+
fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32401} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"),
96+
nil,
97+
},
98+
{
99+
[]config.ListenerConfig{
100+
{"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"},
101+
{"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"},
102+
{"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"},
103+
},
104+
[]config.ListenerConfig{
105+
{"192.168.99.100:32403", "kafka-proxy-0:32403", "kafka-proxy-0:32403"},
106+
{"192.168.99.100:32404", "kafka-proxy-0:32404", "kafka-proxy-0:32404"},
107+
},
108+
nil,
109+
map[string]config.ListenerConfig{
110+
"192.168.99.100:32400": {
111+
BrokerAddress: "192.168.99.100:32400",
112+
ListenerAddress: "0.0.0.0:32400",
113+
AdvertisedAddress: "kafka-proxy-0:32400",
114+
},
115+
"192.168.99.100:32401": {
116+
BrokerAddress: "192.168.99.100:32401",
117+
ListenerAddress: "0.0.0.0:32401",
118+
AdvertisedAddress: "kafka-proxy-0:32401",
119+
},
120+
"192.168.99.100:32402": {
121+
BrokerAddress: "192.168.99.100:32402",
122+
ListenerAddress: "0.0.0.0:32402",
123+
AdvertisedAddress: "kafka-proxy-0:32402",
124+
},
125+
"192.168.99.100:32403": {
126+
BrokerAddress: "192.168.99.100:32403",
127+
ListenerAddress: "kafka-proxy-0:32403",
128+
AdvertisedAddress: "kafka-proxy-0:32403",
129+
},
130+
"192.168.99.100:32404": {
131+
BrokerAddress: "192.168.99.100:32404",
132+
ListenerAddress: "kafka-proxy-0:32404",
133+
AdvertisedAddress: "kafka-proxy-0:32404",
134+
},
135+
},
136+
},
137+
{
138+
[]config.ListenerConfig{
139+
{"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"},
140+
},
141+
[]config.ListenerConfig{
142+
{"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"},
143+
},
144+
nil,
145+
map[string]config.ListenerConfig{
146+
"192.168.99.100:32400": {
147+
BrokerAddress: "192.168.99.100:32400",
148+
ListenerAddress: "0.0.0.0:32400",
149+
AdvertisedAddress: "kafka-proxy-0:32400",
150+
},
151+
},
152+
},
153+
{
154+
[]config.ListenerConfig{
155+
{"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"},
156+
},
157+
[]config.ListenerConfig{
158+
{"192.168.99.100:32400", "kafka-proxy-1:32400", "kafka-proxy-1:32400"},
159+
},
160+
fmt.Errorf("bootstrap and external server mappings 192.168.99.100:32400 with different advertised addresses: kafka-proxy-1:32400 and kafka-proxy-0:32400"),
161+
nil,
162+
},
163+
{
164+
[]config.ListenerConfig{},
165+
[]config.ListenerConfig{
166+
{"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32401"},
167+
},
168+
fmt.Errorf("external server mapping has different listener and advertised addresses {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32401}"),
169+
nil,
170+
},
171+
{
172+
[]config.ListenerConfig{},
173+
[]config.ListenerConfig{
174+
{"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"},
175+
{"192.168.99.100:32400", "kafka-proxy-0:32401", "kafka-proxy-0:32401"},
176+
},
177+
fmt.Errorf("external server mapping 192.168.99.100:32400 configured twice: kafka-proxy-0:32401 and {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32400}"),
178+
nil,
179+
},
180+
}
181+
for _, tt := range tests {
182+
c := &config.Config{}
183+
c.Proxy.BootstrapServers = tt.bootstrapServers
184+
c.Proxy.ExternalServers = tt.externalServers
185+
mapping, err := getBrokerToListenerConfig(c)
186+
a.Equal(tt.err, err)
187+
a.Equal(tt.mapping, mapping)
188+
}
189+
}

0 commit comments

Comments
 (0)