GODRIVER-3587 Use raw bytes in valueReader #2120

Open
wants to merge 42 commits into base: master

Commits (42)
2ce17b5
Add valueReaderByteSrc interface
prestonvasquez Jul 11, 2025
2adb7cc
Add bufferedValueReader valueReaderByteSrc implementation
prestonvasquez Jul 11, 2025
1a436da
Add streamingValueReader valueReaderByteSrc implementation
prestonvasquez Jul 11, 2025
5e1fad9
Rename newDocumentReader to newBufferedDocumentReader
prestonvasquez Jul 11, 2025
8f960b0
Reorganize newBufferedDocumentReader to use bufferedValueReader
prestonvasquez Jul 11, 2025
36d3c18
Reorganize NewDocumentReader to use streamingValueReader
prestonvasquez Jul 11, 2025
031bf94
Update (*valueReader).pop() to support bVR + streaming
prestonvasquez Jul 11, 2025
54d08c4
Update (*valueReader).readValueBytes() to support bVR + streaming
prestonvasquez Jul 11, 2025
26765bd
Update (*valueReader).Skip() to support bVR + streaming
prestonvasquez Jul 11, 2025
8167d47
Update (*valueReader).ReadArray() to support bVR + streaming
prestonvasquez Jul 11, 2025
02d38df
Update (*valueReader).ReadBinary() to support bVR + streaming
prestonvasquez Jul 11, 2025
50a265a
Add comment to (*valueReader).ReadBoolean()
prestonvasquez Jul 11, 2025
4bf9f61
Update (*valueReader).ReadDocument() to support bVR + streaming
prestonvasquez Jul 11, 2025
828593c
Update (*valueReader).ReadCodeWithScope() to support bVR + streaming
prestonvasquez Jul 11, 2025
96b5eb8
Update (*valueReader).ReadDBPointer() to support bVR + streaming
prestonvasquez Jul 11, 2025
0861119
Update (*valueReader).ReadDateTime() to support bVR + streaming
prestonvasquez Jul 11, 2025
443fb81
Update (*valueReader).ReadDecimal128() to support bVR + streaming
prestonvasquez Jul 11, 2025
942e52e
Add comment to (*valueReader).ReadDouble()
prestonvasquez Jul 11, 2025
f934c70
Update (*valueReader).ReadInt32() to support bVR + streaming
prestonvasquez Jul 11, 2025
c3ba283
Update (*valueReader).ReadInt64() to support bVR + streaming
prestonvasquez Jul 11, 2025
1a3d33d
Update (*valueReader).ReadJavascript() to support bVR + streaming
prestonvasquez Jul 11, 2025
b7d0042
Update (*valueReader).ReadObjectID() to support bVR + streaming
prestonvasquez Jul 11, 2025
0cbacc1
Add comment to (*valueReader).Read(MinKey|MaxKey|Null)()
prestonvasquez Jul 11, 2025
7d16cf8
Add comment to (*valueReader).ReadRegex()
prestonvasquez Jul 11, 2025
a497774
Update (*valueReader).ReadString() to support bVR + streaming
prestonvasquez Jul 11, 2025
dfe5502
Update (*valueReader).ReadSymbol() to support bVR + streaming
prestonvasquez Jul 11, 2025
062adeb
Add comment to (*valueReader).ReadTimestamp()
prestonvasquez Jul 11, 2025
83a47d5
Add comment to (*valueReader).ReadUndefined()
prestonvasquez Jul 11, 2025
5b570de
Update (*valueReader).ReadTimestamp() to support bVR + streaming
prestonvasquez Jul 11, 2025
403445f
Update (*valueReader).ReadElement() to support bVR + streaming
prestonvasquez Jul 11, 2025
dd709d1
Update (*valueReader).ReadValue() to support bVR + streaming
prestonvasquez Jul 11, 2025
dea88a6
Update (*valueReader).readValue() to support bVR + streaming
prestonvasquez Jul 11, 2025
2e8be2a
Update (*valueReader).readCString() to support bVR + streaming
prestonvasquez Jul 11, 2025
0036af6
Update (*valueReader).readString() to support bVR + streaming
prestonvasquez Jul 11, 2025
f62a9de
Update (*valueReader).peekLength() to support bVR + streaming
prestonvasquez Jul 11, 2025
0ba5775
Update (*valueReader).readLength() to support bVR + streaming
prestonvasquez Jul 11, 2025
296ff25
Update (*valueReader).read(i32|u32|i64|u64)() to support bVR + streaming
prestonvasquez Jul 11, 2025
5a12f54
Remove read and appendBytes
prestonvasquez Jul 11, 2025
ef6b024
Update newValueReader to use bVR
prestonvasquez Jul 11, 2025
620eb5a
Update tests to support bVR + streaming
prestonvasquez Jul 11, 2025
7e44186
Update bson/value_reader.go
prestonvasquez Jul 14, 2025
383f64d
Extend valueReader tests for both streaming and buffered
prestonvasquez Jul 15, 2025
128 changes: 128 additions & 0 deletions bson/buffered_value_reader.go
@@ -0,0 +1,128 @@
// Copyright (C) MongoDB, Inc. 2025-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package bson

import (
	"bytes"
	"io"
)

// bufferedValueReader implements the low-level valueReaderByteSrc interface by
// reading directly from an in-memory byte slice. It provides efficient,
// zero-copy access for parsing BSON when the entire document is buffered in
// memory.
type bufferedValueReader struct {
Review comment (Collaborator):

Optional: This isn't a ValueReader, so the name is a bit confusing. Consider a name like bufferedByteSrc.

	buf    []byte // entire BSON document
	offset int64  // current read index into buf
}

var _ valueReaderByteSrc = (*bufferedValueReader)(nil)

// readExact copies up to len(p) bytes from the in-memory buffer into p,
// advancing the offset by the number of bytes copied. It returns io.EOF only
// when the buffer is already exhausted.
func (b *bufferedValueReader) readExact(p []byte) (int, error) {
	if b.offset >= int64(len(b.buf)) {
		return 0, io.EOF
	}
	n := copy(p, b.buf[b.offset:])
	b.offset += int64(n)
	return n, nil
}

// ReadByte returns the single byte at buf[offset] and advances offset by 1.
func (b *bufferedValueReader) ReadByte() (byte, error) {
	if b.offset >= int64(len(b.buf)) {
		return 0, io.EOF
	}
Comment on lines +37 to +39 (Collaborator):

What are your thoughts on returning the remaining buf as well as the EOF for ReadByte(), peek(), and discard() when there are fewer bytes than required? That's also how bufio behaves.

	b.offset++
	return b.buf[b.offset-1], nil
}

// peek returns buf[offset:offset+n] without advancing offset. If fewer than n
// bytes remain, it returns the remaining bytes and io.EOF.
func (b *bufferedValueReader) peek(n int) ([]byte, error) {
	// Ensure we don't read past the end of the buffer.
	if int64(n)+b.offset > int64(len(b.buf)) {
		return b.buf[b.offset:], io.EOF
	}

	// Return the next n bytes without advancing the offset.
	return b.buf[b.offset : b.offset+int64(n)], nil
}

// discard advances offset by n bytes, returning the number of bytes discarded.
// If fewer than n bytes remain, it discards the rest of the buffer and returns
// io.EOF.
func (b *bufferedValueReader) discard(n int) (int, error) {
	// Ensure we don't read past the end of the buffer.
	if int64(n)+b.offset > int64(len(b.buf)) {
		// If we have exceeded the buffer length, discard only up to the end.
		left := len(b.buf) - int(b.offset)
		b.offset = int64(len(b.buf))

		return left, io.EOF
	}

	// Advance the read position.
	b.offset += int64(n)
	return n, nil
}

// readSlice scans buf[offset:] for the first occurrence of delim, returns
// buf[offset:offset+idx+1] (where idx is the delimiter's index relative to
// offset), and advances offset past it. It returns io.EOF if delim is not
// found.
func (b *bufferedValueReader) readSlice(delim byte) ([]byte, error) {
	// Ensure we don't read past the end of the buffer.
	if b.offset >= int64(len(b.buf)) {
		return nil, io.EOF
	}

	// Look for the delimiter in the remaining bytes.
	rem := b.buf[b.offset:]
	idx := bytes.IndexByte(rem, delim)
	if idx < 0 {
		return nil, io.EOF
	}

	// Build the result slice up through the delimiter.
	result := rem[:idx+1]

	// Advance the offset past the delimiter.
	b.offset += int64(idx + 1)

	return result, nil
}

// pos returns the current read position in the buffer.
func (b *bufferedValueReader) pos() int64 {
	return b.offset
}

// regexLength will return the total byte length of a BSON regex value.
func (b *bufferedValueReader) regexLength() (int32, error) {
	rem := b.buf[b.offset:]

	// Find end of the first C-string (pattern).
	i := bytes.IndexByte(rem, 0x00)
	if i < 0 {
		return 0, io.EOF
	}

	// Find end of second C-string (options).
	j := bytes.IndexByte(rem[i+1:], 0x00)
	if j < 0 {
		return 0, io.EOF
	}

	// Total length = first C-string length (pattern) + second C-string length
	// (options) + 2 null terminators
	return int32(i + j + 2), nil
}

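// streamable reports whether the source reads from a stream. The buffered
// implementation always returns false.
func (*bufferedValueReader) streamable() bool {
	return false
}

// reset drops the reference to the buffer and rewinds offset to 0.
func (b *bufferedValueReader) reset() {
	b.buf = nil
	b.offset = 0
}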
153 changes: 153 additions & 0 deletions bson/buffered_value_reader_test.go
@@ -0,0 +1,153 @@
// Copyright (C) MongoDB, Inc. 2025-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package bson

import (
	"bytes"
	"io"
	"testing"

	"go.mongodb.org/mongo-driver/v2/internal/assert"
	"go.mongodb.org/mongo-driver/v2/internal/require"
)

func TestBufferedvalueReader_discard(t *testing.T) {
	tests := []struct {
		name       string
		buf        []byte
		n          int
		want       int
		wantOffset int64
		wantErr    error
	}{
		{
			name:       "nothing",
			buf:        bytes.Repeat([]byte("a"), 1024),
			n:          0,
			want:       0,
			wantOffset: 0,
			wantErr:    nil,
		},
		{
			name:       "amount less than buffer size",
			buf:        bytes.Repeat([]byte("a"), 1024),
			n:          100,
			want:       100,
			wantOffset: 100,
			wantErr:    nil,
		},
		{
			name:       "amount greater than buffer size",
			buf:        bytes.Repeat([]byte("a"), 1024),
			n:          10000,
			want:       1024,
			wantOffset: 1024,
			wantErr:    io.EOF,
		},
		{
			name:       "exact buffer size",
			buf:        bytes.Repeat([]byte("a"), 1024),
			n:          1024,
			want:       1024,
			wantOffset: 1024,
			wantErr:    nil,
		},
		{
			name:       "from empty buffer",
			buf:        []byte{},
			n:          10,
			want:       0,
			wantOffset: 0,
			wantErr:    io.EOF,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			reader := &bufferedValueReader{buf: tt.buf, offset: 0}
			n, err := reader.discard(tt.n)
			if tt.wantErr != nil {
				assert.ErrorIs(t, err, tt.wantErr, "Expected error %v, got %v", tt.wantErr, err)
			} else {
				require.NoError(t, err, "Expected no error when discarding %d bytes", tt.n)
			}

			assert.Equal(t, tt.want, n, "Expected to discard %d bytes, got %d", tt.want, n)
			assert.Equal(t, tt.wantOffset, reader.offset, "Expected offset to be %d, got %d", tt.wantOffset, reader.offset)
		})
	}
}

func TestBufferedvalueReader_peek(t *testing.T) {
	tests := []struct {
		name    string
		buf     []byte
		n       int
		offset  int64
		want    []byte
		wantErr error
	}{
		{
			name:    "nothing",
			buf:     bytes.Repeat([]byte("a"), 1024),
			n:       0,
			want:    []byte{},
			wantErr: nil,
		},
		{
			name:    "amount less than buffer size",
			buf:     bytes.Repeat([]byte("a"), 1024),
			n:       100,
			want:    bytes.Repeat([]byte("a"), 100),
			wantErr: nil,
		},
		{
			name:    "amount greater than buffer size",
			buf:     bytes.Repeat([]byte("a"), 1024),
			n:       10000,
			want:    bytes.Repeat([]byte("a"), 1024),
			wantErr: io.EOF,
		},
		{
			name:    "exact buffer size",
			buf:     bytes.Repeat([]byte("a"), 1024),
			n:       1024,
			want:    bytes.Repeat([]byte("a"), 1024),
			wantErr: nil,
		},
		{
			name:    "from empty buffer",
			buf:     []byte{},
			n:       10,
			want:    []byte{},
			wantErr: io.EOF,
		},
		{
			name:    "peek with offset",
			buf:     append(bytes.Repeat([]byte("a"), 100), bytes.Repeat([]byte("b"), 100)...),
			offset:  100,
			n:       100,
			want:    bytes.Repeat([]byte("b"), 100),
			wantErr: nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			reader := &bufferedValueReader{buf: tt.buf, offset: tt.offset}
			n, err := reader.peek(tt.n)
			if tt.wantErr != nil {
				assert.ErrorIs(t, err, tt.wantErr, "Expected error %v, got %v", tt.wantErr, err)
			} else {
				require.NoError(t, err, "Expected no error when peeking %d bytes", tt.n)
			}

			assert.Equal(t, tt.want, n, "Expected to peek %d bytes, got %d", len(tt.want), len(n))
			assert.Equal(t, tt.offset, reader.offset, "Expected offset to be %d, got %d", tt.offset, reader.offset)
		})
	}
}
8 changes: 5 additions & 3 deletions bson/copier.go
@@ -7,7 +7,6 @@
 package bson

 import (
-	"bytes"
 	"errors"
 	"fmt"
 	"io"
@@ -180,8 +179,11 @@ func copyValueFromBytes(dst ValueWriter, t Type, src []byte) error {
 		return wvb.writeValueBytes(t, src)
 	}

-	vr := newDocumentReader(bytes.NewReader(src))
-	vr.pushElement(t)
+	vr := newBufferedDocumentReader(src)
+	vr.advanceFrame()
+
+	vr.stack[vr.frame].mode = mElement
+	vr.stack[vr.frame].vType = t

 	return copyValue(dst, vr)
 }
8 changes: 4 additions & 4 deletions bson/copier_test.go
@@ -40,7 +40,7 @@ func TestCopier(t *testing.T) {
 doc = bsoncore.AppendStringElement(doc, "Hello", "world")
 doc, err := bsoncore.AppendDocumentEnd(doc, idx)
 noerr(t, err)
-src := newDocumentReader(bytes.NewReader(doc))
+src := newBufferedDocumentReader(doc)
 dst := newValueWriterFromSlice(make([]byte, 0))
 want := doc
 err = copyDocument(dst, src)
@@ -77,7 +77,7 @@ func TestCopier(t *testing.T) {
 noerr(t, err)
 doc, err = bsoncore.AppendDocumentEnd(doc, idx)
 noerr(t, err)
-src := newDocumentReader(bytes.NewReader(doc))
+src := newBufferedDocumentReader(doc)

 _, err = src.ReadDocument()
 noerr(t, err)
@@ -450,7 +450,7 @@ func TestCopier(t *testing.T) {
 idx,
 )
 noerr(t, err)
-vr := newDocumentReader(bytes.NewReader(b))
+vr := newBufferedDocumentReader(b)
 _, err = vr.ReadDocument()
 noerr(t, err)
 _, _, err = vr.ReadElement()
@@ -489,7 +489,7 @@ func TestCopier(t *testing.T) {
 idx,
 )
 noerr(t, err)
-vr := newDocumentReader(bytes.NewReader(b))
+vr := newBufferedDocumentReader(b)
 _, err = vr.ReadDocument()
 noerr(t, err)
 _, _, err = vr.ReadElement()
12 changes: 6 additions & 6 deletions bson/mgoregistry_test.go
@@ -485,7 +485,7 @@ func (t *prefixPtr) SetBSON(raw RawValue) error {
 	if err != nil {
 		return err
 	}
-	vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
+	vr := newBufferedValueReader(raw.Type, raw.Value)
 	err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
 	if err != nil {
 		return err
@@ -512,7 +512,7 @@ func (t *prefixVal) SetBSON(raw RawValue) error {
 	if err != nil {
 		return err
 	}
-	vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
+	vr := newBufferedValueReader(raw.Type, raw.Value)
 	err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
 	if err != nil {
 		return err
@@ -936,7 +936,7 @@ func (o *setterType) SetBSON(raw RawValue) error {
 	if raw.Type == 0x00 {
 		raw.Type = TypeEmbeddedDocument
 	}
-	vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
+	vr := newBufferedValueReader(raw.Type, raw.Value)
 	err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
 	if err != nil {
 		return err
@@ -1289,7 +1289,7 @@ func (s *getterSetterD) SetBSON(raw RawValue) error {
 	if raw.Type == 0x00 {
 		raw.Type = TypeEmbeddedDocument
 	}
-	vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
+	vr := newBufferedValueReader(raw.Type, raw.Value)
 	err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
 	if err != nil {
 		return err
@@ -1315,7 +1315,7 @@ func (i *getterSetterInt) SetBSON(raw RawValue) error {
 	if raw.Type == 0x00 {
 		raw.Type = TypeEmbeddedDocument
 	}
-	vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
+	vr := newBufferedValueReader(raw.Type, raw.Value)
 	err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
 	if err != nil {
 		return err
@@ -1337,7 +1337,7 @@ func (s *ifaceSlice) SetBSON(raw RawValue) error {
 	if err != nil {
 		return err
 	}
-	vr := newValueReader(raw.Type, bytes.NewReader(raw.Value))
+	vr := newBufferedValueReader(raw.Type, raw.Value)
 	err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval)
 	if err != nil {
 		return err