Skip to content

Commit 29fbf46

Browse files
Add sliceHeader for zero-copy parsing of message headers, use for client info (#6453)
In the various places that we are handling client info from headers, like when queuing and pulling from the JS API queue or due to the service export, we could safely refer to the underlying header slice, instead of making a copy as `getHeader()` does today. This PR adds a new `sliceHeader()` that merely slices the input header instead, reducing copies considerably in any system where there is heavy usage of JS API requests. Signed-off-by: Neil Twigg <[email protected]>
2 parents f7c32d0 + 7f8075d commit 29fbf46

File tree

3 files changed

+43
-10
lines changed

3 files changed

+43
-10
lines changed

server/client.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4248,9 +4248,20 @@ func (c *client) setHeader(key, value string, msg []byte) []byte {
42484248
return bb.Bytes()
42494249
}
42504250

4251-
// Will return the value for the header denoted by key or nil if it does not exists.
4252-
// This function ignores errors and tries to achieve speed and no additional allocations.
4251+
// Will return a copy of the value for the header denoted by key or nil if it does not exist.
4252+
// If you know that it is safe to refer to the underlying hdr slice for the period that the
4253+
// return value is used, then sliceHeader() will be faster.
42534254
func getHeader(key string, hdr []byte) []byte {
4255+
v := sliceHeader(key, hdr)
4256+
if v == nil {
4257+
return nil
4258+
}
4259+
return append(make([]byte, 0, len(v)), v...)
4260+
}
4261+
4262+
// Will return the sliced value for the header denoted by key or nil if it does not exists.
4263+
// This function ignores errors and tries to achieve speed and no additional allocations.
4264+
func sliceHeader(key string, hdr []byte) []byte {
42544265
if len(hdr) == 0 {
42554266
return nil
42564267
}
@@ -4275,15 +4286,14 @@ func getHeader(key string, hdr []byte) []byte {
42754286
index++
42764287
}
42774288
// Collect together the rest of the value until we hit a CRLF.
4278-
var value []byte
4289+
start := index
42794290
for index < hdrLen {
42804291
if hdr[index] == '\r' && index < hdrLen-1 && hdr[index+1] == '\n' {
42814292
break
42824293
}
4283-
value = append(value, hdr[index])
42844294
index++
42854295
}
4286-
return value
4296+
return hdr[start:index:index]
42874297
}
42884298

42894299
// For bytes.HasPrefix below.
@@ -4400,7 +4410,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
44004410
var ci *ClientInfo
44014411
if hadPrevSi && c.pa.hdr >= 0 {
44024412
var cis ClientInfo
4403-
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
4413+
if err := json.Unmarshal(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
44044414
ci = &cis
44054415
ci.Service = acc.Name
44064416
// Check if we are moving into a share details account from a non-shared
@@ -4409,7 +4419,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
44094419
c.addServerAndClusterInfo(ci)
44104420
}
44114421
}
4412-
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
4422+
} else if c.kind != LEAF || c.pa.hdr < 0 || len(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
44134423
ci = c.getClientInfo(share)
44144424
// If we did not share but the imports destination is the system account add in the server and cluster info.
44154425
if !share && isSysImport {
@@ -5076,7 +5086,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool
50765086
if c.pa.hdr < 0 || len(msg) < c.pa.hdr {
50775087
return msg, false
50785088
}
5079-
cir := getHeader(ClientInfoHdr, msg[:c.pa.hdr])
5089+
cir := sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])
50805090
if len(cir) == 0 {
50815091
return msg, false
50825092
}

server/client_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2963,6 +2963,29 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
29632963
}
29642964
}
29652965

2966+
func TestSliceHeader(t *testing.T) {
2967+
hdr := []byte("NATS/1.0\r\n\r\n")
2968+
2969+
hdr = genHeader(hdr, "a", "1")
2970+
hdr = genHeader(hdr, JSExpectedStream, "my-stream")
2971+
hdr = genHeader(hdr, JSExpectedLastSeq, "22")
2972+
hdr = genHeader(hdr, "b", "2")
2973+
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")
2974+
hdr = genHeader(hdr, JSExpectedLastMsgId, "1")
2975+
hdr = genHeader(hdr, "c", "3")
2976+
2977+
sliced := sliceHeader(JSExpectedLastSubjSeq, hdr)
2978+
copied := getHeader(JSExpectedLastSubjSeq, hdr)
2979+
2980+
require_NotNil(t, sliced)
2981+
require_Equal(t, cap(sliced), 2)
2982+
2983+
require_NotNil(t, copied)
2984+
require_Equal(t, cap(copied), len(copied))
2985+
2986+
require_True(t, bytes.Equal(sliced, copied))
2987+
}
2988+
29662989
func TestInProcessAllowedConnectionType(t *testing.T) {
29672990
tmpl := `
29682991
listen: "127.0.0.1:-1"

server/jetstream_api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
823823
s, rr := js.srv, js.apiSubs.Match(subject)
824824

825825
hdr, msg := c.msgParts(rmsg)
826-
if len(getHeader(ClientInfoHdr, hdr)) == 0 {
826+
if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
827827
// Check if this is the system account. We will let these through for the account info only.
828828
sacc := s.SystemAccount()
829829
if sacc != acc {
@@ -1166,7 +1166,7 @@ func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Ac
11661166
var ci ClientInfo
11671167

11681168
if len(hdr) > 0 {
1169-
if err := json.Unmarshal(getHeader(ClientInfoHdr, hdr), &ci); err != nil {
1169+
if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
11701170
return nil, nil, nil, nil, err
11711171
}
11721172
}

0 commit comments

Comments
 (0)