Skip to content

Commit 5af67b1

Browse files
committed
SASL in processing loop
1 parent 8e89414 commit 5af67b1

File tree

4 files changed

+99
-6
lines changed

4 files changed

+99
-6
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
88
GOPKGS = $(shell go list ./... | grep -v /vendor/)
99
BUILD_FLAGS ?=
1010
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
11-
TAG ?= "v0.0.1"
11+
TAG ?= "v0.0.2"
1212

1313
PLATFORM ?= $(shell uname -s)
1414
ifeq ($(PLATFORM), Darwin)

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ The dynamic local listeners feature can be disabled and an additional list of ex
1717
The Proxy can terminate TLS traffic and authenticate users using SASL/PLAIN. The credentials verification method
1818
is configurable and uses golang plugin system over RPC.
1919

20+
The proxies can also authenticate each other using a pluggable method which is transparent to other Kafka servers and clients.
21+
Currently the Google ID Token for service accounts is implemented i.e. proxy client requests and sends service account JWT and proxy server receives and validates it against Google JWKS.
22+
2023
Kafka API calls can be restricted to prevent some operations e.g. topic deletion or produce requests.
2124

2225

@@ -31,11 +34,11 @@ See:
3134

3235
Linux
3336

34-
curl -Lso kafka-proxy https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.1/linux.amd64.kafka-proxy
37+
curl -Lso kafka-proxy https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.2/linux.amd64.kafka-proxy
3538

3639
macOS
3740

38-
curl -Lso kafka-proxy https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.1/darwin.amd64.kafka-proxy
41+
curl -Lso kafka-proxy https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.2/darwin.amd64.kafka-proxy
3942

4043
2. Make the kafka-proxy binary executable
4144

proxy/processor.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ const (
1818
defaultWriteTimeout = 30 * time.Second
1919
defaultReadTimeout = 30 * time.Second
2020
minOpenRequests = 1
21+
22+
apiKeyUnset = int16(-1) // not in protocol
23+
apiKeySaslAuth = int16(-2) // not in protocol
24+
apiKeySaslHandshake = int16(17)
2125
)
2226

2327
type ProcessorConfig struct {
@@ -108,8 +112,16 @@ func requestsLoop(dst DeadlineWriter, src DeadlineReader, openRequestsChannel ch
108112
keyVersionBuf := make([]byte, 8) // Size => int32 + ApiKey => int16 + ApiVersion => int16
109113

110114
buf := make([]byte, bufSize)
111-
115+
lastApiKey := apiKeyUnset
116+
nextRequest:
112117
for {
118+
if lastApiKey == apiKeySaslHandshake {
119+
lastApiKey = apiKeySaslAuth
120+
if readErr, err = copySaslAuthRequest(dst, src, timeout, buf); err != nil {
121+
return readErr, err
122+
}
123+
continue nextRequest
124+
}
113125
// logrus.Println("Await Kafka request")
114126

115127
// waiting for first bytes or EOF - reset deadlines
@@ -124,7 +136,7 @@ func requestsLoop(dst DeadlineWriter, src DeadlineReader, openRequestsChannel ch
124136
if err = protocol.Decode(keyVersionBuf, requestKeyVersion); err != nil {
125137
return true, err
126138
}
127-
// logrus.Printf("Kafka request length %v, key %v, version %v", requestKeyVersion.Length, requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion)
139+
//logrus.Printf("Kafka request length %v, key %v, version %v", requestKeyVersion.Length, requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion)
128140

129141
proxyRequestsTotal.WithLabelValues(brokerAddress, strconv.Itoa(int(requestKeyVersion.ApiKey)), strconv.Itoa(int(requestKeyVersion.ApiVersion))).Inc()
130142
proxyRequestsBytes.WithLabelValues(brokerAddress).Add(float64(requestKeyVersion.Length + 4))
@@ -156,15 +168,26 @@ func requestsLoop(dst DeadlineWriter, src DeadlineReader, openRequestsChannel ch
156168
if readErr, err = myCopyN(dst, src, int64(requestKeyVersion.Length-4), buf); err != nil {
157169
return readErr, err
158170
}
171+
172+
lastApiKey = requestKeyVersion.ApiKey
159173
}
160174
}
161175

162176
func responsesLoop(dst DeadlineWriter, src DeadlineReader, openRequestsChannel <-chan protocol.RequestKeyVersion, netAddressMappingFunc config.NetAddressMappingFunc, bufSize int, timeout time.Duration, brokerAddress string) (readErr bool, err error) {
163177
responseHeaderBuf := make([]byte, 8) // Size => int32, CorrelationId => int32
164178

165179
buf := make([]byte, bufSize)
180+
lastApiKey := apiKeyUnset
166181

182+
nextResponse:
167183
for {
184+
if lastApiKey == apiKeySaslHandshake {
185+
lastApiKey = apiKeySaslAuth
186+
if readErr, err = copySaslAuthResponse(dst, src, timeout); err != nil {
187+
return readErr, err
188+
}
189+
continue nextResponse
190+
}
168191
//logrus.Println("Await Kafka response")
169192

170193
// waiting for first bytes or EOF - reset deadlines
@@ -197,7 +220,7 @@ func responsesLoop(dst DeadlineWriter, src DeadlineReader, openRequestsChannel <
197220
if err != nil {
198221
return true, err
199222
}
200-
// logrus.Printf("Kafka response length %v, key %v, version %v", requestKeyVersion.Length, requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion)
223+
201224
responseModifier, err := protocol.GetResponseModifier(requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion, netAddressMappingFunc)
202225
if err != nil {
203226
return true, err
@@ -235,6 +258,7 @@ func responsesLoop(dst DeadlineWriter, src DeadlineReader, openRequestsChannel <
235258
return readErr, err
236259
}
237260
}
261+
lastApiKey = requestKeyVersion.ApiKey
238262
}
239263
}
240264

proxy/sasl_forward.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package proxy
2+
3+
import (
4+
"encoding/binary"
5+
"fmt"
6+
"github.com/grepplabs/kafka-proxy/proxy/protocol"
7+
"io"
8+
"time"
9+
)
10+
11+
func copySaslAuthRequest(dst DeadlineWriter, src DeadlineReader, timeout time.Duration, buf []byte) (readErr bool, err error) {
12+
requestDeadline := time.Now().Add(timeout)
13+
err = dst.SetWriteDeadline(requestDeadline)
14+
if err != nil {
15+
return false, err
16+
}
17+
err = src.SetReadDeadline(requestDeadline)
18+
if err != nil {
19+
return true, err
20+
}
21+
22+
sizeBuf := make([]byte, 4) // Size => int32
23+
if _, err = io.ReadFull(src, sizeBuf); err != nil {
24+
return true, err
25+
}
26+
27+
length := binary.BigEndian.Uint32(sizeBuf)
28+
if int32(length) > protocol.MaxRequestSize {
29+
return true, protocol.PacketDecodingError{Info: fmt.Sprintf("auth message of length %d too large", length)}
30+
}
31+
//logrus.Printf("SASL auth request length %v", length)
32+
33+
// write - send to broker
34+
if _, err = dst.Write(sizeBuf); err != nil {
35+
return false, err
36+
}
37+
if readErr, err = myCopyN(dst, src, int64(length), buf); err != nil {
38+
return readErr, err
39+
}
40+
return false, nil
41+
}
42+
43+
func copySaslAuthResponse(dst DeadlineWriter, src DeadlineReader, timeout time.Duration) (readErr bool, err error) {
44+
//logrus.Printf("SASL auth response")
45+
46+
responseDeadline := time.Now().Add(timeout)
47+
err = dst.SetWriteDeadline(responseDeadline)
48+
if err != nil {
49+
return false, err
50+
}
51+
err = src.SetReadDeadline(responseDeadline)
52+
if err != nil {
53+
return true, err
54+
}
55+
56+
header := make([]byte, 4)
57+
_, err = io.ReadFull(src, header)
58+
if err != nil {
59+
return true, err
60+
}
61+
62+
if _, err = dst.Write(header); err != nil {
63+
return false, err
64+
}
65+
return false, nil
66+
}

0 commit comments

Comments
 (0)