Skip to content

Commit d3c5ace

Browse files
GODRIVER-3587 Use raw bytes in valueReader (#2120)
Co-authored-by: Matt Dale <[email protected]>
1 parent 1950917 commit d3c5ace

13 files changed

+2881
-1383
lines changed

bson/buffered_byte_src.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright (C) MongoDB, Inc. 2025-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package bson
8+
9+
import (
10+
"bytes"
11+
"io"
12+
)
13+
14+
// bufferedByteSrc implements the low-level byteSrc interface by reading
15+
// directly from an in-memory byte slice. It provides efficient, zero-copy
16+
// access for parsing BSON when the entire document is buffered in memory.
17+
type bufferedByteSrc struct {
18+
buf []byte // entire BSON document
19+
offset int64 // Current read index into buf
20+
}
21+
22+
var _ byteSrc = (*bufferedByteSrc)(nil)
23+
24+
// Read reads up to len(p) bytes from the in-memory buffer, advancing the offset
25+
// by the number of bytes read.
26+
func (b *bufferedByteSrc) readExact(p []byte) (int, error) {
27+
if b.offset >= int64(len(b.buf)) {
28+
return 0, io.EOF
29+
}
30+
n := copy(p, b.buf[b.offset:])
31+
b.offset += int64(n)
32+
return n, nil
33+
}
34+
35+
// ReadByte returns the single byte at buf[offset] and advances offset by 1.
36+
func (b *bufferedByteSrc) ReadByte() (byte, error) {
37+
if b.offset >= int64(len(b.buf)) {
38+
return 0, io.EOF
39+
}
40+
b.offset++
41+
return b.buf[b.offset-1], nil
42+
}
43+
44+
// peek returns buf[offset:offset+n] without advancing offset.
45+
func (b *bufferedByteSrc) peek(n int) ([]byte, error) {
46+
// Ensure we don't read past the end of the buffer.
47+
if int64(n)+b.offset > int64(len(b.buf)) {
48+
return b.buf[b.offset:], io.EOF
49+
}
50+
51+
// Return the next n bytes without advancing the offset
52+
return b.buf[b.offset : b.offset+int64(n)], nil
53+
}
54+
55+
// discard advances offset by n bytes, returning the number of bytes discarded.
56+
func (b *bufferedByteSrc) discard(n int) (int, error) {
57+
// Ensure we don't read past the end of the buffer.
58+
if int64(n)+b.offset > int64(len(b.buf)) {
59+
// If we have exceeded the buffer length, discard only up to the end.
60+
left := len(b.buf) - int(b.offset)
61+
b.offset = int64(len(b.buf))
62+
63+
return left, io.EOF
64+
}
65+
66+
// Advance the read position
67+
b.offset += int64(n)
68+
return n, nil
69+
}
70+
71+
// readSlice scans buf[offset:] for the first occurrence of delim, returns
72+
// buf[offset:idx+1], and advances offset past it; errors if delim not found.
73+
func (b *bufferedByteSrc) readSlice(delim byte) ([]byte, error) {
74+
// Ensure we don't read past the end of the buffer.
75+
if b.offset >= int64(len(b.buf)) {
76+
return nil, io.EOF
77+
}
78+
79+
// Look for the delimiter in the remaining bytes
80+
rem := b.buf[b.offset:]
81+
idx := bytes.IndexByte(rem, delim)
82+
if idx < 0 {
83+
return nil, io.EOF
84+
}
85+
86+
// Build the result slice up through the delimiter.
87+
result := rem[:idx+1]
88+
89+
// Advance the offset past the delimiter.
90+
b.offset += int64(idx + 1)
91+
92+
return result, nil
93+
}
94+
95+
// pos returns the current read position in the buffer.
96+
func (b *bufferedByteSrc) pos() int64 {
97+
return b.offset
98+
}
99+
100+
// regexLength will return the total byte length of a BSON regex value.
101+
func (b *bufferedByteSrc) regexLength() (int32, error) {
102+
rem := b.buf[b.offset:]
103+
104+
// Find end of the first C-string (pattern).
105+
i := bytes.IndexByte(rem, 0x00)
106+
if i < 0 {
107+
return 0, io.EOF
108+
}
109+
110+
// Find end of second C-string (options).
111+
j := bytes.IndexByte(rem[i+1:], 0x00)
112+
if j < 0 {
113+
return 0, io.EOF
114+
}
115+
116+
// Total length = first C-string length (pattern) + second C-string length
117+
// (options) + 2 null terminators
118+
return int32(i + j + 2), nil
119+
}
120+
121+
func (*bufferedByteSrc) streamable() bool {
122+
return false
123+
}
124+
125+
func (b *bufferedByteSrc) reset() {
126+
b.buf = nil
127+
b.offset = 0
128+
}

bson/buffered_byte_src_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright (C) MongoDB, Inc. 2025-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package bson
8+
9+
import (
10+
"bytes"
11+
"io"
12+
"testing"
13+
14+
"go.mongodb.org/mongo-driver/v2/internal/assert"
15+
"go.mongodb.org/mongo-driver/v2/internal/require"
16+
)
17+
18+
func TestBufferedvalueReader_discard(t *testing.T) {
19+
tests := []struct {
20+
name string
21+
buf []byte
22+
n int
23+
want int
24+
wantOffset int64
25+
wantErr error
26+
}{
27+
{
28+
name: "nothing",
29+
buf: bytes.Repeat([]byte("a"), 1024),
30+
n: 0,
31+
want: 0,
32+
wantOffset: 0,
33+
wantErr: nil,
34+
},
35+
{
36+
name: "amount less than buffer size",
37+
buf: bytes.Repeat([]byte("a"), 1024),
38+
n: 100,
39+
want: 100,
40+
wantOffset: 100,
41+
wantErr: nil,
42+
},
43+
{
44+
name: "amount greater than buffer size",
45+
buf: bytes.Repeat([]byte("a"), 1024),
46+
n: 10000,
47+
want: 1024,
48+
wantOffset: 1024,
49+
wantErr: io.EOF,
50+
},
51+
{
52+
name: "exact buffer size",
53+
buf: bytes.Repeat([]byte("a"), 1024),
54+
n: 1024,
55+
want: 1024,
56+
wantOffset: 1024,
57+
wantErr: nil,
58+
},
59+
{
60+
name: "from empty buffer",
61+
buf: []byte{},
62+
n: 10,
63+
want: 0,
64+
wantOffset: 0,
65+
wantErr: io.EOF,
66+
},
67+
}
68+
69+
for _, tt := range tests {
70+
t.Run(tt.name, func(t *testing.T) {
71+
reader := &bufferedByteSrc{buf: tt.buf, offset: 0}
72+
n, err := reader.discard(tt.n)
73+
if tt.wantErr != nil {
74+
assert.ErrorIs(t, err, tt.wantErr, "Expected error %v, got %v", tt.wantErr, err)
75+
} else {
76+
require.NoError(t, err, "Expected no error when discarding %d bytes", tt.n)
77+
}
78+
79+
assert.Equal(t, tt.want, n, "Expected to discard %d bytes, got %d", tt.want, n)
80+
assert.Equal(t, tt.wantOffset, reader.offset, "Expected offset to be %d, got %d", tt.wantOffset, reader.offset)
81+
})
82+
}
83+
}
84+
85+
func TestBufferedvalueReader_peek(t *testing.T) {
86+
tests := []struct {
87+
name string
88+
buf []byte
89+
n int
90+
offset int64
91+
want []byte
92+
wantErr error
93+
}{
94+
{
95+
name: "nothing",
96+
buf: bytes.Repeat([]byte("a"), 1024),
97+
n: 0,
98+
want: []byte{},
99+
wantErr: nil,
100+
},
101+
{
102+
name: "amount less than buffer size",
103+
buf: bytes.Repeat([]byte("a"), 1024),
104+
n: 100,
105+
want: bytes.Repeat([]byte("a"), 100),
106+
wantErr: nil,
107+
},
108+
{
109+
name: "amount greater than buffer size",
110+
buf: bytes.Repeat([]byte("a"), 1024),
111+
n: 10000,
112+
want: bytes.Repeat([]byte("a"), 1024),
113+
wantErr: io.EOF,
114+
},
115+
{
116+
name: "exact buffer size",
117+
buf: bytes.Repeat([]byte("a"), 1024),
118+
n: 1024,
119+
want: bytes.Repeat([]byte("a"), 1024),
120+
wantErr: nil,
121+
},
122+
{
123+
name: "from empty buffer",
124+
buf: []byte{},
125+
n: 10,
126+
want: []byte{},
127+
wantErr: io.EOF,
128+
},
129+
{
130+
name: "peek with offset",
131+
buf: append(bytes.Repeat([]byte("a"), 100), bytes.Repeat([]byte("b"), 100)...),
132+
offset: 100,
133+
n: 100,
134+
want: bytes.Repeat([]byte("b"), 100),
135+
wantErr: nil,
136+
},
137+
}
138+
139+
for _, tt := range tests {
140+
t.Run(tt.name, func(t *testing.T) {
141+
reader := &bufferedByteSrc{buf: tt.buf, offset: tt.offset}
142+
n, err := reader.peek(tt.n)
143+
if tt.wantErr != nil {
144+
assert.ErrorIs(t, err, tt.wantErr, "Expected error %v, got %v", tt.wantErr, err)
145+
} else {
146+
require.NoError(t, err, "Expected no error when peeking %d bytes", tt.n)
147+
}
148+
149+
assert.Equal(t, tt.want, n, "Expected to peek %d bytes, got %d", len(tt.want), len(n))
150+
assert.Equal(t, tt.offset, reader.offset, "Expected offset to be %d, got %d", tt.offset, reader.offset)
151+
})
152+
}
153+
}

bson/copier.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
package bson
88

99
import (
10-
"bytes"
1110
"errors"
1211
"fmt"
1312
"io"
@@ -180,8 +179,11 @@ func copyValueFromBytes(dst ValueWriter, t Type, src []byte) error {
180179
return wvb.writeValueBytes(t, src)
181180
}
182181

183-
vr := newDocumentReader(bytes.NewReader(src))
184-
vr.pushElement(t)
182+
vr := newBufferedDocumentReader(src)
183+
vr.advanceFrame()
184+
185+
vr.stack[vr.frame].mode = mElement
186+
vr.stack[vr.frame].vType = t
185187

186188
return copyValue(dst, vr)
187189
}

bson/copier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestCopier(t *testing.T) {
4040
doc = bsoncore.AppendStringElement(doc, "Hello", "world")
4141
doc, err := bsoncore.AppendDocumentEnd(doc, idx)
4242
noerr(t, err)
43-
src := newDocumentReader(bytes.NewReader(doc))
43+
src := newBufferedDocumentReader(doc)
4444
dst := newValueWriterFromSlice(make([]byte, 0))
4545
want := doc
4646
err = copyDocument(dst, src)
@@ -77,7 +77,7 @@ func TestCopier(t *testing.T) {
7777
noerr(t, err)
7878
doc, err = bsoncore.AppendDocumentEnd(doc, idx)
7979
noerr(t, err)
80-
src := newDocumentReader(bytes.NewReader(doc))
80+
src := newBufferedDocumentReader(doc)
8181

8282
_, err = src.ReadDocument()
8383
noerr(t, err)
@@ -450,7 +450,7 @@ func TestCopier(t *testing.T) {
450450
idx,
451451
)
452452
noerr(t, err)
453-
vr := newDocumentReader(bytes.NewReader(b))
453+
vr := newBufferedDocumentReader(b)
454454
_, err = vr.ReadDocument()
455455
noerr(t, err)
456456
_, _, err = vr.ReadElement()
@@ -489,7 +489,7 @@ func TestCopier(t *testing.T) {
489489
idx,
490490
)
491491
noerr(t, err)
492-
vr := newDocumentReader(bytes.NewReader(b))
492+
vr := newBufferedDocumentReader(b)
493493
_, err = vr.ReadDocument()
494494
noerr(t, err)
495495
_, _, err = vr.ReadElement()

bson/mgoregistry_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ func (t *prefixPtr) SetBSON(raw RawValue) error {
485485
if err != nil {
486486
return err
487487
}
488-
vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
488+
vr := newBufferedValueReader(raw.Type, raw.Value)
489489
err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
490490
if err != nil {
491491
return err
@@ -512,7 +512,7 @@ func (t *prefixVal) SetBSON(raw RawValue) error {
512512
if err != nil {
513513
return err
514514
}
515-
vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
515+
vr := newBufferedValueReader(raw.Type, raw.Value)
516516
err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
517517
if err != nil {
518518
return err
@@ -936,7 +936,7 @@ func (o *setterType) SetBSON(raw RawValue) error {
936936
if raw.Type == 0x00 {
937937
raw.Type = TypeEmbeddedDocument
938938
}
939-
vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
939+
vr := newBufferedValueReader(raw.Type, raw.Value)
940940
err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
941941
if err != nil {
942942
return err
@@ -1289,7 +1289,7 @@ func (s *getterSetterD) SetBSON(raw RawValue) error {
12891289
if raw.Type == 0x00 {
12901290
raw.Type = TypeEmbeddedDocument
12911291
}
1292-
vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
1292+
vr := newBufferedValueReader(raw.Type, raw.Value)
12931293
err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
12941294
if err != nil {
12951295
return err
@@ -1315,7 +1315,7 @@ func (i *getterSetterInt) SetBSON(raw RawValue) error {
13151315
if raw.Type == 0x00 {
13161316
raw.Type = TypeEmbeddedDocument
13171317
}
1318-
vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
1318+
vr := newBufferedValueReader(raw.Type, raw.Value)
13191319
err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
13201320
if err != nil {
13211321
return err
@@ -1337,7 +1337,7 @@ func (s *ifaceSlice) SetBSON(raw RawValue) error {
13371337
if err != nil {
13381338
return err
13391339
}
1340-
vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
1340+
vr := newBufferedValueReader(raw.Type, raw.Value)
13411341
err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
13421342
if err != nil {
13431343
return err

0 commit comments

Comments
 (0)