Skip to content

Commit 67dcf04

Browse files
committed
Local SaslHandshakeV1
1 parent 070749a commit 67dcf04

11 files changed

+253
-37
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,8 @@ spec:
359359
* [X] Registry for built-in plugins
360360
* [X] Client cert check
361361
* [X] Set TLS server CipherSuites and CurvePreferences
362-
* [ ] Optional ApiVersionsRequest before Local SASL Authentication Sequence
363-
* [ ] SaslHandshakeRequest v1 - Kafka 1.1.0
362+
* [X] Optional ApiVersionsRequest before Local SASL Authentication Sequence
363+
* [X] SaslHandshakeRequest v1 - Kafka 1.0.0
364364
* [X] Connect to Kafka through SOCKS5 Proxy
365365
* [ ] Performance tests and tuning
366366
* [ ] Socket buffer sizing e.g. SO_RCVBUF = 32768, SO_SNDBUF = 131072

proxy/processor_default.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,17 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
5353
} else {
5454
switch requestKeyVersion.ApiKey {
5555
case apiKeySaslHandshake:
56-
//TODO: this is only V0 version
57-
if err = ctx.localSasl.receiveAndSendSASLPlainAuth(src, keyVersionBuf); err != nil {
58-
return true, err
56+
switch requestKeyVersion.ApiVersion {
57+
case 0:
58+
if err = ctx.localSasl.receiveAndSendSASLPlainAuthV0(src, keyVersionBuf); err != nil {
59+
return true, err
60+
}
61+
case 1:
62+
if err = ctx.localSasl.receiveAndSendSASLPlainAuthV1(src, keyVersionBuf); err != nil {
63+
return true, err
64+
}
65+
default:
66+
return true, fmt.Errorf("only saslHandshake version 0 and 1 are supported, got version %d", requestKeyVersion.ApiVersion)
5967
}
6068
ctx.localSaslDone = true
6169
src.SetDeadline(time.Time{})

proxy/protocol/packet_decoder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type packetDecoder interface {
1313
getArrayLength() (int, error)
1414
getBool() (bool, error)
1515

16+
getBytes() ([]byte, error)
1617
getString() (string, error)
1718
getNullableString() (*string, error)
1819
getInt32Array() ([]int32, error)

proxy/protocol/packet_encoder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type packetEncoder interface {
1313
putArrayLength(in int) error
1414
putBool(in bool)
1515

16+
putBytes(in []byte) error
1617
putString(in string) error
1718
putNullableString(in *string) error
1819
putStringArray(in []string) error

proxy/protocol/prep_encoder.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ func (pe *prepEncoder) putBool(in bool) {
4646
}
4747

4848
// arrays
49+
func (pe *prepEncoder) putBytes(in []byte) error {
50+
pe.length += 4
51+
if in == nil {
52+
return nil
53+
}
54+
return pe.putRawBytes(in)
55+
}
56+
57+
func (pe *prepEncoder) putRawBytes(in []byte) error {
58+
if len(in) > math.MaxInt32 {
59+
return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
60+
}
61+
pe.length += len(in)
62+
return nil
63+
}
64+
4965
func (pe *prepEncoder) putNullableString(in *string) error {
5066
if in == nil {
5167
pe.length += 2

proxy/protocol/real_decoder.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ var errInvalidArrayLength = PacketDecodingError{"invalid array length"}
99
var errInvalidStringLength = PacketDecodingError{"invalid string length"}
1010
var errVarintOverflow = PacketDecodingError{"varint overflow"}
1111
var errInvalidBool = PacketDecodingError{"invalid bool"}
12+
var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"}
1213

1314
type realDecoder struct {
1415
raw []byte
@@ -100,6 +101,31 @@ func (rd *realDecoder) getBool() (bool, error) {
100101

101102
// collections
102103

104+
func (rd *realDecoder) getBytes() ([]byte, error) {
105+
tmp, err := rd.getInt32()
106+
if err != nil {
107+
return nil, err
108+
}
109+
if tmp == -1 {
110+
return nil, nil
111+
}
112+
113+
return rd.getRawBytes(int(tmp))
114+
}
115+
116+
func (rd *realDecoder) getRawBytes(length int) ([]byte, error) {
117+
if length < 0 {
118+
return nil, errInvalidByteSliceLength
119+
} else if length > rd.remaining() {
120+
rd.off = len(rd.raw)
121+
return nil, ErrInsufficientData
122+
}
123+
124+
start := rd.off
125+
rd.off += length
126+
return rd.raw[start:rd.off], nil
127+
}
128+
103129
func (rd *realDecoder) getStringLength() (int, error) {
104130
length, err := rd.getInt16()
105131
if err != nil {

proxy/protocol/real_encoder.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,21 @@ func (re *realEncoder) putBool(in bool) {
5050

5151
// collection
5252

53+
func (re *realEncoder) putRawBytes(in []byte) error {
54+
copy(re.raw[re.off:], in)
55+
re.off += len(in)
56+
return nil
57+
}
58+
59+
func (re *realEncoder) putBytes(in []byte) error {
60+
if in == nil {
61+
re.putInt32(-1)
62+
return nil
63+
}
64+
re.putInt32(int32(len(in)))
65+
return re.putRawBytes(in)
66+
}
67+
5368
func (re *realEncoder) putString(in string) error {
5469
re.putInt16(int16(len(in)))
5570
copy(re.raw[re.off:], in)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package protocol
2+
3+
type SaslAuthenticateRequestV0 struct {
4+
SaslAuthBytes []byte
5+
}
6+
7+
func (r *SaslAuthenticateRequestV0) encode(pe packetEncoder) error {
8+
if err := pe.putBytes(r.SaslAuthBytes); err != nil {
9+
return err
10+
}
11+
return nil
12+
}
13+
func (r *SaslAuthenticateRequestV0) decode(pd packetDecoder) (err error) {
14+
if r.SaslAuthBytes, err = pd.getBytes(); err != nil {
15+
return err
16+
}
17+
18+
return nil
19+
}
20+
21+
func (r *SaslAuthenticateRequestV0) key() int16 {
22+
return 36
23+
}
24+
25+
func (r *SaslAuthenticateRequestV0) version() int16 {
26+
return 0
27+
}
28+
29+
type SaslAuthenticateResponseV0 struct {
30+
Err KError
31+
ErrMsg *string
32+
SaslAuthBytes []byte
33+
}
34+
35+
func (r *SaslAuthenticateResponseV0) encode(pe packetEncoder) error {
36+
pe.putInt16(int16(r.Err))
37+
38+
if err := pe.putNullableString(r.ErrMsg); err != nil {
39+
return err
40+
}
41+
return pe.putBytes(r.SaslAuthBytes)
42+
}
43+
44+
func (r *SaslAuthenticateResponseV0) decode(pd packetDecoder) error {
45+
kerr, err := pd.getInt16()
46+
if err != nil {
47+
return err
48+
}
49+
r.Err = KError(kerr)
50+
51+
if r.ErrMsg, err = pd.getNullableString(); err != nil {
52+
return err
53+
}
54+
if r.SaslAuthBytes, err = pd.getBytes(); err != nil {
55+
return err
56+
}
57+
return nil
58+
}

proxy/protocol/sasl_handshake.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,52 @@
11
package protocol
22

3-
type SaslHandshakeRequestV0 struct {
3+
import "github.com/pkg/errors"
4+
5+
type SaslHandshakeRequestV0orV1 struct {
6+
Version int16 // not encoded / decoded
47
Mechanism string
58
}
69

7-
func (r *SaslHandshakeRequestV0) encode(pe packetEncoder) error {
10+
func (r *SaslHandshakeRequestV0orV1) encode(pe packetEncoder) error {
11+
if r.Version != 0 && r.Version != 1 {
12+
return errors.New("SaslHandshakeRequestV0orV1 expects version 0 or 1")
13+
}
14+
815
if err := pe.putString(r.Mechanism); err != nil {
916
return err
1017
}
1118
return nil
1219
}
13-
func (r *SaslHandshakeRequestV0) decode(pd packetDecoder) (err error) {
20+
func (r *SaslHandshakeRequestV0orV1) decode(pd packetDecoder) (err error) {
21+
if r.Version != 0 && r.Version != 1 {
22+
return errors.New("SaslHandshakeRequestV0orV1 expects version 0 or 1")
23+
}
1424
if r.Mechanism, err = pd.getString(); err != nil {
1525
return err
1626
}
1727

1828
return nil
1929
}
2030

21-
func (r *SaslHandshakeRequestV0) key() int16 {
31+
func (r *SaslHandshakeRequestV0orV1) key() int16 {
2232
return 17
2333
}
2434

25-
func (r *SaslHandshakeRequestV0) version() int16 {
26-
return 0
35+
func (r *SaslHandshakeRequestV0orV1) version() int16 {
36+
return r.Version
2737
}
2838

29-
type SaslHandshakeResponseV0 struct {
39+
type SaslHandshakeResponseV0orV1 struct {
3040
Err KError
3141
EnabledMechanisms []string
3242
}
3343

34-
func (r *SaslHandshakeResponseV0) encode(pe packetEncoder) error {
44+
func (r *SaslHandshakeResponseV0orV1) encode(pe packetEncoder) error {
3545
pe.putInt16(int16(r.Err))
3646
return pe.putStringArray(r.EnabledMechanisms)
3747
}
3848

39-
func (r *SaslHandshakeResponseV0) decode(pd packetDecoder) error {
49+
func (r *SaslHandshakeResponseV0orV1) decode(pd packetDecoder) error {
4050
kerr, err := pd.getInt16()
4151
if err != nil {
4252
return err

proxy/sasl.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (b *SASLPlainAuth) sendAndReceiveSASLPlainHandshake(conn DeadlineReaderWrit
8181

8282
req := &protocol.Request{
8383
ClientID: b.clientID,
84-
Body: &protocol.SaslHandshakeRequestV0{Mechanism: SASLPlain},
84+
Body: &protocol.SaslHandshakeRequestV0orV1{Version: 0, Mechanism: SASLPlain},
8585
}
8686
reqBuf, err := protocol.Encode(req)
8787
if err != nil {
@@ -117,7 +117,7 @@ func (b *SASLPlainAuth) sendAndReceiveSASLPlainHandshake(conn DeadlineReaderWrit
117117
if err != nil {
118118
return errors.Wrap(err, "Failed to read SASL handshake payload")
119119
}
120-
res := &protocol.SaslHandshakeResponseV0{}
120+
res := &protocol.SaslHandshakeResponseV0orV1{}
121121
err = protocol.Decode(payload, res)
122122
if err != nil {
123123
return errors.Wrap(err, "Failed to parse SASL handshake")

0 commit comments

Comments
 (0)