From 2ce17b5d7b39f5ea249d1e4aedfc975992a8bac9 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:26:39 -0600 Subject: [PATCH 01/42] Add valueReaderByteSrc interface --- bson/value_reader.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index d713e669a9..edd6bff05c 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -16,6 +16,37 @@ import ( "sync" ) +type valueReaderByteSrc interface { + io.Reader + io.ByteReader + + // peek returns the next n bytes without advancing the cursor. It must return + // exactly n bytes or an error if fewer are available. + peek(n int) ([]byte, error) + + // discard advances the cursor by n bytes, returning the actual number + // discarded or an error if fewer were available. + discard(n int) (int, error) + + // readSlice reads until (and including) the first occurrence of delim, + // returning the entire slice [start...delimiter] and advancing the cursor + // past it. Returns an error if delim is not found. + readSlice(delim byte) ([]byte, error) + + // pos returns the number of bytes consumed so far. + pos() int64 + + // regexLength returns the total byte length of a BSON regex value (two + // C-strings including their terminating NULs) in buffered mode. + regexLength() (int32, error) + + // streamable returns true if this source can be used in a streaming context. + streamable() bool + + // reset resets the source to its initial state. + reset() +} + var _ ValueReader = &valueReader{} // ErrEOA is the error returned when the end of a BSON array has been reached. 
From 2adb7cc24af5a00436247f1bec7ffbcd9e333cd4 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:27:31 -0600 Subject: [PATCH 02/42] Add bufferedValueReader valueReaderByteSrc implementation --- bson/buffered_value_reader.go | 127 ++++++++++++++++++++++++ bson/buffered_value_reader_test.go | 153 +++++++++++++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 bson/buffered_value_reader.go create mode 100644 bson/buffered_value_reader_test.go diff --git a/bson/buffered_value_reader.go b/bson/buffered_value_reader.go new file mode 100644 index 0000000000..d046bac8f5 --- /dev/null +++ b/bson/buffered_value_reader.go @@ -0,0 +1,127 @@ +// Copyright (C) MongoDB, Inc. 2025-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package bson + +import ( + "bytes" + "io" +) + +// bufferedValueReader implements the low-level byteSrc interface by reading +// directly from an in-memory byte slice. It provides efficient, zero-copy +// access for parsing BSON when the entire document is buffered in memory. +type bufferedValueReader struct { + buf []byte // entire BSON document + offset int64 // Current read index into buf +} + +var _ valueReaderByteSrc = (*bufferedValueReader)(nil) + +// Read reads up to len(p) bytes from the in-memory buffer, advancing the offset +// by the number of bytes read. +func (b *bufferedValueReader) Read(p []byte) (int, error) { + if b.offset >= int64(len(b.buf)) { + return 0, io.EOF + } + n := copy(p, b.buf[b.offset:]) + b.offset += int64(n) + return n, nil +} + +// ReadByte returns the single byte at buf[offset] and advances offset by 1. 
+func (b *bufferedValueReader) ReadByte() (byte, error) { + if b.offset >= int64(len(b.buf)) { + return 0, io.EOF + } + b.offset++ + return b.buf[b.offset-1], nil +} + +// peek returns buf[offset:offset+n] without advancing offset. +func (b *bufferedValueReader) peek(n int) ([]byte, error) { + // Ensure we don't read past the end of the buffer. + if int64(n)+b.offset > int64(len(b.buf)) { + return b.buf[b.offset:], io.EOF + } + + // Return the next n bytes without advancing the offset + return b.buf[b.offset : b.offset+int64(n)], nil +} + +// discard advances offset by n bytes, returning the number of bytes discarded. +func (b *bufferedValueReader) discard(n int) (int, error) { + // Ensure we don't read past the end of the buffer. + if int64(n)+b.offset > int64(len(b.buf)) { + // If we have exceeded the buffer length, discard only up to the end. + left := len(b.buf) - int(b.offset) + b.offset = int64(len(b.buf)) + + return left, io.EOF + } + + // Advance the read position + b.offset += int64(n) + return n, nil +} + +// readSlice scans buf[offset:] for the first occurrence of delim, returns +// buf[offset:idx+1], and advances offset past it; errors if delim not found. +func (b *bufferedValueReader) readSlice(delim byte) ([]byte, error) { + // Ensure we don't read past the end of the buffer. + if b.offset >= int64(len(b.buf)) { + return nil, io.EOF + } + + // Look for the delimiter in the remaining bytes + rem := b.buf[b.offset:] + idx := bytes.IndexByte(rem, delim) + if idx < 0 { + return nil, io.EOF + } + + // Build the result slice up through the delimiter. + result := rem[:idx+1] + + // Advance the offset past the delimiter. + b.offset += int64(idx + 1) + + return result, nil +} + +// pos returns the current read position in the buffer. +func (b *bufferedValueReader) pos() int64 { + return b.offset +} + +// regexLength will return the total byte length of a BSON regex value. 
+func (b *bufferedValueReader) regexLength() (int32, error) { + rem := b.buf[b.offset:] + + // Find end of the first C-string (pattern). + i := bytes.IndexByte(rem, 0x00) + if i < 0 { + return 0, io.EOF + } + + // Find end of second C-string (options). + j := bytes.IndexByte(rem[i+1:], 0x00) + if j < 0 { + return 0, io.EOF + } + + // Total length = first C-string length (pattern) + second C-string length + // (options) + 2 null terminators + return int32(i + j + 2), nil +} + +func (*bufferedValueReader) streamable() bool { + return false +} + +func (*bufferedValueReader) reset() { + // No resources to release for bufferedValueReader. +} diff --git a/bson/buffered_value_reader_test.go b/bson/buffered_value_reader_test.go new file mode 100644 index 0000000000..1c5d85bf84 --- /dev/null +++ b/bson/buffered_value_reader_test.go @@ -0,0 +1,153 @@ +// Copyright (C) MongoDB, Inc. 2025-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package bson + +import ( + "bytes" + "io" + "testing" + + "go.mongodb.org/mongo-driver/v2/internal/assert" + "go.mongodb.org/mongo-driver/v2/internal/require" +) + +func TestBufferedvalueReader_discard(t *testing.T) { + tests := []struct { + name string + buf []byte + n int + want int + wantOffset int64 + wantErr error + }{ + { + name: "nothing", + buf: bytes.Repeat([]byte("a"), 1024), + n: 0, + want: 0, + wantOffset: 0, + wantErr: nil, + }, + { + name: "amount less than buffer size", + buf: bytes.Repeat([]byte("a"), 1024), + n: 100, + want: 100, + wantOffset: 100, + wantErr: nil, + }, + { + name: "amount greater than buffer size", + buf: bytes.Repeat([]byte("a"), 1024), + n: 10000, + want: 1024, + wantOffset: 1024, + wantErr: io.EOF, + }, + { + name: "exact buffer size", + buf: bytes.Repeat([]byte("a"), 1024), + n: 1024, + want: 1024, + wantOffset: 1024, + wantErr: nil, + }, + { + name: "from empty buffer", + buf: []byte{}, + n: 10, + want: 0, + wantOffset: 0, + wantErr: io.EOF, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader := &bufferedValueReader{buf: tt.buf, offset: 0} + n, err := reader.discard(tt.n) + if tt.wantErr != nil { + assert.ErrorIs(t, err, tt.wantErr, "Expected error %v, got %v", tt.wantErr, err) + } else { + require.NoError(t, err, "Expected no error when discarding %d bytes", tt.n) + } + + assert.Equal(t, tt.want, n, "Expected to discard %d bytes, got %d", tt.want, n) + assert.Equal(t, tt.wantOffset, reader.offset, "Expected offset to be %d, got %d", tt.wantOffset, reader.offset) + }) + } +} + +func TestBufferedvalueReader_peek(t *testing.T) { + tests := []struct { + name string + buf []byte + n int + offset int64 + want []byte + wantErr error + }{ + { + name: "nothing", + buf: bytes.Repeat([]byte("a"), 1024), + n: 0, + want: []byte{}, + wantErr: nil, + }, + { + name: "amount less than buffer size", + buf: 
bytes.Repeat([]byte("a"), 1024), + n: 100, + want: bytes.Repeat([]byte("a"), 100), + wantErr: nil, + }, + { + name: "amount greater than buffer size", + buf: bytes.Repeat([]byte("a"), 1024), + n: 10000, + want: bytes.Repeat([]byte("a"), 1024), + wantErr: io.EOF, + }, + { + name: "exact buffer size", + buf: bytes.Repeat([]byte("a"), 1024), + n: 1024, + want: bytes.Repeat([]byte("a"), 1024), + wantErr: nil, + }, + { + name: "from empty buffer", + buf: []byte{}, + n: 10, + want: []byte{}, + wantErr: io.EOF, + }, + { + name: "peek with offset", + buf: append(bytes.Repeat([]byte("a"), 100), bytes.Repeat([]byte("b"), 100)...), + offset: 100, + n: 100, + want: bytes.Repeat([]byte("b"), 100), + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader := &bufferedValueReader{buf: tt.buf, offset: tt.offset} + n, err := reader.peek(tt.n) + if tt.wantErr != nil { + assert.ErrorIs(t, err, tt.wantErr, "Expected error %v, got %v", tt.wantErr, err) + } else { + require.NoError(t, err, "Expected no error when peeking %d bytes", tt.n) + } + + assert.Equal(t, tt.want, n, "Expected to peek %d bytes, got %d", len(tt.want), len(n)) + assert.Equal(t, tt.offset, reader.offset, "Expected offset to be %d, got %d", tt.offset, reader.offset) + }) + } +} From 1a436da4b5c0314355c7bf09aaf50b49e4420440 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:28:24 -0600 Subject: [PATCH 03/42] Add streamingValueReader valueReaderBytSrc implementation --- bson/streaming_value_reader.go | 100 +++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 bson/streaming_value_reader.go diff --git a/bson/streaming_value_reader.go b/bson/streaming_value_reader.go new file mode 100644 index 0000000000..718d4e46dd --- /dev/null +++ b/bson/streaming_value_reader.go @@ -0,0 +1,100 @@ +// Copyright (C) MongoDB, Inc. 2025-present. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package bson + +import ( + "bufio" +) + +// streamingValueReader reads from an io.Reader wrapped in a bufio.Reader. It +// first reads the BSON length header, then ensures it only ever reads exactly +// that many bytes. +// +// Note: this approach trades memory usage for extra buffering and reader calls, +// so it is less performant than the in-memory bufferedValueReader. +type streamingValueReader struct { + br *bufio.Reader + offset int64 // offset is the current read position in the buffer +} + +var _ valueReaderByteSrc = (*streamingValueReader)(nil) + +// Read reads up to len(p) bytes from the underlying bufio.Reader, advancing +// the offset by the number of bytes read. +func (s *streamingValueReader) Read(p []byte) (int, error) { + n, err := s.br.Read(p) + s.offset += int64(n) + return n, err +} + +// ReadByte reads one byte from the underlying bufio.Reader and, on success, advances offset by 1. +func (s *streamingValueReader) ReadByte() (byte, error) { + c, err := s.br.ReadByte() + if err == nil { + s.offset++ + } + return c, err +} + +// peek returns the next n bytes from the underlying bufio.Reader without advancing offset. +func (s *streamingValueReader) peek(n int) ([]byte, error) { + return s.br.Peek(n) +} + +// discard advances offset by n bytes, returning the number of bytes discarded. +func (s *streamingValueReader) discard(n int) (int, error) { + m, err := s.br.Discard(n) + s.offset += int64(m) + return m, err +} + +// readSlice scans buf[offset:] for the first occurrence of delim, returns +// buf[offset:idx+1], and advances offset past it; errors if delim not found. 
+func (s *streamingValueReader) readSlice(delim byte) ([]byte, error) { + data, err := s.br.ReadSlice(delim) + if err != nil { + return nil, err + } + s.offset += int64(len(data)) + return data, nil +} + +// pos returns the current read position in the buffer. +func (s *streamingValueReader) pos() int64 { + return s.offset +} + +// regexLength will return the total byte length of a BSON regex value. +func (s *streamingValueReader) regexLength() (int32, error) { + var ( + count int32 + nulCount int + ) + + for nulCount < 2 { + buf, err := s.br.Peek(int(count) + 1) + if err != nil { + return 0, err + } + + b := buf[count] + count++ + if b == 0x00 { + nulCount++ + } + } + + return count, nil +} + +func (*streamingValueReader) streamable() bool { + return true +} + +func (s *streamingValueReader) reset() { + s.offset = 0 +} From 5e1fad9ec94ed77f253a491b7b952176729a9083 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:29:37 -0600 Subject: [PATCH 04/42] Rename newDocumentReader to newBufferedDocumentReader --- bson/copier.go | 2 +- bson/copier_test.go | 8 ++++---- bson/value_reader.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bson/copier.go b/bson/copier.go index 6279f07230..4c92f9e405 100644 --- a/bson/copier.go +++ b/bson/copier.go @@ -180,7 +180,7 @@ func copyValueFromBytes(dst ValueWriter, t Type, src []byte) error { return wvb.writeValueBytes(t, src) } - vr := newDocumentReader(bytes.NewReader(src)) + vr := newBufferedDocumentReader(bytes.NewReader(src)) vr.pushElement(t) return copyValue(dst, vr) diff --git a/bson/copier_test.go b/bson/copier_test.go index 890b084349..917057f903 100644 --- a/bson/copier_test.go +++ b/bson/copier_test.go @@ -40,7 +40,7 @@ func TestCopier(t *testing.T) { doc = bsoncore.AppendStringElement(doc, "Hello", "world") doc, err := bsoncore.AppendDocumentEnd(doc, idx) noerr(t, err) - src := newDocumentReader(bytes.NewReader(doc)) + src := newBufferedDocumentReader(bytes.NewReader(doc)) 
dst := newValueWriterFromSlice(make([]byte, 0)) want := doc err = copyDocument(dst, src) @@ -77,7 +77,7 @@ func TestCopier(t *testing.T) { noerr(t, err) doc, err = bsoncore.AppendDocumentEnd(doc, idx) noerr(t, err) - src := newDocumentReader(bytes.NewReader(doc)) + src := newBufferedDocumentReader(bytes.NewReader(doc)) _, err = src.ReadDocument() noerr(t, err) @@ -450,7 +450,7 @@ func TestCopier(t *testing.T) { idx, ) noerr(t, err) - vr := newDocumentReader(bytes.NewReader(b)) + vr := newBufferedDocumentReader(bytes.NewReader(b)) _, err = vr.ReadDocument() noerr(t, err) _, _, err = vr.ReadElement() @@ -489,7 +489,7 @@ func TestCopier(t *testing.T) { idx, ) noerr(t, err) - vr := newDocumentReader(bytes.NewReader(b)) + vr := newBufferedDocumentReader(bytes.NewReader(b)) _, err = vr.ReadDocument() noerr(t, err) _, _, err = vr.ReadElement() diff --git a/bson/value_reader.go b/bson/value_reader.go index edd6bff05c..900dd2cded 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -114,13 +114,13 @@ func putDocumentReader(vr *valueReader) { // NewDocumentReader returns a ValueReader using b for the underlying BSON // representation. func NewDocumentReader(r io.Reader) ValueReader { - return newDocumentReader(r) + return newBufferedDocumentReader(r) } // newValueReader returns a ValueReader that starts in the Value mode instead of in top // level document mode. This enables the creation of a ValueReader for a single BSON value. 
func newValueReader(t Type, r io.Reader) ValueReader { - vr := newDocumentReader(r) + vr := newBufferedDocumentReader(r) if len(vr.stack) == 0 { vr.stack = make([]vrState, 1, 5) } @@ -129,7 +129,7 @@ func newValueReader(t Type, r io.Reader) ValueReader { return vr } -func newDocumentReader(r io.Reader) *valueReader { +func newBufferedDocumentReader(r io.Reader) *valueReader { stack := make([]vrState, 1, 5) stack[0] = vrState{ mode: mTopLevel, From 8f960b08fd21a65c20cb0d6c4bc2a0ed24bb1e72 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:43:15 -0600 Subject: [PATCH 05/42] Reorganize newBufferedDocumentReader to use bufferedValueReader --- bson/buffered_value_reader.go | 5 +-- bson/copier.go | 3 +- bson/copier_test.go | 8 ++--- bson/unmarshal.go | 2 +- bson/value_reader.go | 62 +++++++++++++++++------------------ 5 files changed, 39 insertions(+), 41 deletions(-) diff --git a/bson/buffered_value_reader.go b/bson/buffered_value_reader.go index d046bac8f5..76695b71ad 100644 --- a/bson/buffered_value_reader.go +++ b/bson/buffered_value_reader.go @@ -122,6 +122,7 @@ func (*bufferedValueReader) streamable() bool { return false } -func (*bufferedValueReader) reset() { - // No resources to release for bufferedValueReader. 
+func (b *bufferedValueReader) reset() { + b.buf = nil + b.offset = 0 } diff --git a/bson/copier.go b/bson/copier.go index 4c92f9e405..0d83ff925d 100644 --- a/bson/copier.go +++ b/bson/copier.go @@ -7,7 +7,6 @@ package bson import ( - "bytes" "errors" "fmt" "io" @@ -180,7 +179,7 @@ func copyValueFromBytes(dst ValueWriter, t Type, src []byte) error { return wvb.writeValueBytes(t, src) } - vr := newBufferedDocumentReader(bytes.NewReader(src)) + vr := newBufferedDocumentReader(src) vr.pushElement(t) return copyValue(dst, vr) diff --git a/bson/copier_test.go b/bson/copier_test.go index 917057f903..32273f511a 100644 --- a/bson/copier_test.go +++ b/bson/copier_test.go @@ -40,7 +40,7 @@ func TestCopier(t *testing.T) { doc = bsoncore.AppendStringElement(doc, "Hello", "world") doc, err := bsoncore.AppendDocumentEnd(doc, idx) noerr(t, err) - src := newBufferedDocumentReader(bytes.NewReader(doc)) + src := newBufferedDocumentReader(doc) dst := newValueWriterFromSlice(make([]byte, 0)) want := doc err = copyDocument(dst, src) @@ -77,7 +77,7 @@ func TestCopier(t *testing.T) { noerr(t, err) doc, err = bsoncore.AppendDocumentEnd(doc, idx) noerr(t, err) - src := newBufferedDocumentReader(bytes.NewReader(doc)) + src := newBufferedDocumentReader(doc) _, err = src.ReadDocument() noerr(t, err) @@ -450,7 +450,7 @@ func TestCopier(t *testing.T) { idx, ) noerr(t, err) - vr := newBufferedDocumentReader(bytes.NewReader(b)) + vr := newBufferedDocumentReader(b) _, err = vr.ReadDocument() noerr(t, err) _, _, err = vr.ReadElement() @@ -489,7 +489,7 @@ func TestCopier(t *testing.T) { idx, ) noerr(t, err) - vr := newBufferedDocumentReader(bytes.NewReader(b)) + vr := newBufferedDocumentReader(b) _, err = vr.ReadDocument() noerr(t, err) _, _, err = vr.ReadElement() diff --git a/bson/unmarshal.go b/bson/unmarshal.go index 52bd94fed7..d3af2cd1d2 100644 --- a/bson/unmarshal.go +++ b/bson/unmarshal.go @@ -42,7 +42,7 @@ type ValueUnmarshaler interface { // When unmarshaling BSON, if the BSON value is 
null and the Go value is a // pointer, the pointer is set to nil without calling UnmarshalBSONValue. func Unmarshal(data []byte, val interface{}) error { - vr := getDocumentReader(bytes.NewReader(data)) + vr := getBufferedDocumentReader(bytes.NewReader(data)) defer putDocumentReader(vr) if l, err := vr.peekLength(); err != nil { diff --git a/bson/value_reader.go b/bson/value_reader.go index 900dd2cded..31afd360f0 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -77,36 +77,28 @@ var vrPool = sync.Pool{ // valueReader is for reading BSON values. type valueReader struct { - r *bufio.Reader + src valueReaderByteSrc offset int64 stack []vrState frame int64 } -func getDocumentReader(r io.Reader) *valueReader { - vr := vrPool.Get().(*valueReader) - - vr.offset = 0 - vr.frame = 0 - - vr.stack = vr.stack[:1] - vr.stack[0] = vrState{mode: mTopLevel} - - br := bufioReaderPool.Get().(*bufio.Reader) - br.Reset(r) - vr.r = br - - return vr +func getBufferedDocumentReader(b []byte) *valueReader { + return newBufferedDocumentReader(b) } -func putDocumentReader(vr *valueReader) { +func putBufferedDocumentReader(vr *valueReader) { if vr == nil { return } - bufioReaderPool.Put(vr.r) - vr.r = nil + vr.src.reset() + + // Reset src and stack to avoid holding onto memory. + vr.src = nil + vr.frame = 0 + vr.stack = vr.stack[:0] vrPool.Put(vr) } @@ -114,30 +106,36 @@ func putDocumentReader(vr *valueReader) { // NewDocumentReader returns a ValueReader using b for the underlying BSON // representation. func NewDocumentReader(r io.Reader) ValueReader { - return newBufferedDocumentReader(r) + panic("TODO") } // newValueReader returns a ValueReader that starts in the Value mode instead of in top // level document mode. This enables the creation of a ValueReader for a single BSON value. 
func newValueReader(t Type, r io.Reader) ValueReader { - vr := newBufferedDocumentReader(r) - if len(vr.stack) == 0 { + panic("TODO") +} + +func newBufferedDocumentReader(b []byte) *valueReader { + vr := vrPool.Get().(*valueReader) + + vr.src = &bufferedValueReader{} + vr.src.(*bufferedValueReader).buf = b + vr.src.(*bufferedValueReader).offset = 0 + + // Reset parse state. + vr.frame = 0 + if cap(vr.stack) < 1 { vr.stack = make([]vrState, 1, 5) + } else { + vr.stack = vr.stack[:1] } - vr.stack[0].mode = mValue - vr.stack[0].vType = t - return vr -} -func newBufferedDocumentReader(r io.Reader) *valueReader { - stack := make([]vrState, 1, 5) - stack[0] = vrState{ + vr.stack[0] = vrState{ mode: mTopLevel, + end: int64(len(b)), } - return &valueReader{ - r: bufio.NewReader(r), - stack: stack, - } + + return vr } func (vr *valueReader) advanceFrame() { From 36d3c18eac3fa4471a298e53143a5b4344e43fee Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:44:21 -0600 Subject: [PATCH 06/42] Reorganize NewDocumentReader to use streamingValueReader --- bson/value_reader.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 31afd360f0..957a072663 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -106,7 +106,15 @@ func putBufferedDocumentReader(vr *valueReader) { // NewDocumentReader returns a ValueReader using b for the underlying BSON // representation. 
func NewDocumentReader(r io.Reader) ValueReader { - panic("TODO") + stack := make([]vrState, 1, 5) + stack[0] = vrState{ + mode: mTopLevel, + } + + return &valueReader{ + src: &streamingValueReader{br: bufio.NewReader(r), offset: 0}, + stack: stack, + } } // newValueReader returns a ValueReader that starts in the Value mode instead of in top From 031bf94d4c5e5a03b27dfd442b3fcc3cad81ee2d Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 11:46:44 -0600 Subject: [PATCH 07/42] Update (*valueReader).pop() to support bVR + streaming --- bson/value_reader.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 957a072663..ce7b1ff7c9 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -230,22 +230,27 @@ func (vr *valueReader) pop() error { cnt = 2 // we pop twice to jump over the vrElement: vrDocument -> vrElement -> vrDocument/TopLevel/etc... } for i := 0; i < cnt && vr.frame > 0; i++ { - if vr.offset < vr.stack[vr.frame].end { - _, err := vr.r.Discard(int(vr.stack[vr.frame].end - vr.offset)) + if vr.src.pos() < vr.stack[vr.frame].end { + _, err := vr.src.discard(int(vr.stack[vr.frame].end - vr.src.pos())) if err != nil { return err } } vr.frame-- } - if vr.frame == 0 { - if vr.stack[0].end > vr.offset { - vr.stack[0].end -= vr.offset - } else { - vr.stack[0].end = 0 + + if vr.src.streamable() { + if vr.frame == 0 { + if vr.stack[0].end > vr.src.pos() { + vr.stack[0].end -= vr.src.pos() + } else { + vr.stack[0].end = 0 + } + + vr.src.reset() } - vr.offset = 0 } + return nil } From 54d08c456e3b6452935e9750369784cfd8d77e87 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:07:34 -0600 Subject: [PATCH 08/42] Update (*valueReader).readValueBytes() to support bVR + streaming --- bson/value_reader.go | 80 ++++++++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/bson/value_reader.go 
b/bson/value_reader.go index ce7b1ff7c9..ed25e2aca1 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -293,7 +293,9 @@ func (vr *valueReader) Type() Type { return vr.stack[vr.frame].vType } -func (vr *valueReader) appendNextElement(dst []byte) ([]byte, error) { +// peekNextValueSize returns the length of the next value in the stream without +// advancing the reader position. +func peekNextValueSize(vr *valueReader, dst []byte) (int32, error) { var length int32 var err error switch vr.stack[vr.frame].vType { @@ -321,46 +323,50 @@ func (vr *valueReader) appendNextElement(dst []byte) ([]byte, error) { case TypeObjectID: length = 12 case TypeRegex: - for n := 0; n < 2; n++ { // Read two C strings. - str, err := vr.r.ReadBytes(0x00) - if err != nil { - return nil, err - } - dst = append(dst, str...) - vr.offset += int64(len(str)) - } - return dst, nil + length, err = vr.src.regexLength() default: - return nil, fmt.Errorf("attempted to read bytes of unknown BSON type %v", vr.stack[vr.frame].vType) - } - if err != nil { - return nil, err + return 0, fmt.Errorf("attempted to read bytes of unknown BSON type %v", vr.stack[vr.frame].vType) } - buf, err := vr.r.Peek(int(length)) - if err != nil { - if err == bufio.ErrBufferFull { - temp := make([]byte, length) - if _, err = io.ReadFull(vr.r, temp); err != nil { - return nil, err + return length, err +} + +// readBytes returns the next n bytes from src. Streamable sources are read with +// io.ReadFull into a new buffer; other sources use zero-allocation peek+discard. +func readBytes(src valueReaderByteSrc, n int) ([]byte, error) { + if src.streamable() { + data := make([]byte, n) + if _, err := io.ReadFull(src, data); err != nil { + if errors.Is(err, io.ErrUnexpectedEOF) { + err = io.EOF // Convert io.ErrUnexpectedEOF to io.EOF for consistency. } - dst = append(dst, temp...) - vr.offset += int64(len(temp)) - return dst, nil + return nil, err } - return nil, err + return data, nil } - dst = append(dst, buf...) 
- if _, err = vr.r.Discard(int(length)); err != nil { + // Zero-allocation path. + buf, err := src.peek(n) + if err != nil { return nil, err } - vr.offset += int64(length) - return dst, nil + _, _ = src.discard(n) // Discard the bytes from the source. + return buf, nil +} + +// readBytesValueReader returns a subslice [offset, offset+length) or EOF. +func (vr *valueReader) readBytes(n int32) ([]byte, error) { + if n < 0 { + return nil, fmt.Errorf("invalid length: %d", n) + } + + return readBytes(vr.src, int(n)) } +// readValueBytes returns the raw bytes of the next value (or top‐level +// document) without allocating intermediary buffers, then pops the frame. func (vr *valueReader) readValueBytes(dst []byte) (Type, []byte, error) { switch vr.stack[vr.frame].mode { case mTopLevel: @@ -368,25 +374,25 @@ func (vr *valueReader) readValueBytes(dst []byte) (Type, []byte, error) { if err != nil { return Type(0), nil, err } - dst, err = vr.appendBytes(dst, length) - if err != nil { - return Type(0), nil, err - } - return Type(0), dst, nil + b, err := vr.readBytes(length) + return Type(0), append(dst, b...), err case mElement, mValue: - dst, err := vr.appendNextElement(dst) + length, err := peekNextValueSize(vr, dst) if err != nil { return Type(0), dst, err } + b, err := vr.readBytes(length) + t := vr.stack[vr.frame].vType err = vr.pop() if err != nil { return Type(0), nil, err } - return t, dst, nil + + return t, append(dst, b...), err default: - return Type(0), nil, vr.invalidTransitionErr(0, "ReadValueBytes", []mode{mElement, mValue}) + return Type(0), nil, vr.invalidTransitionErr(0, "readValueBytes", []mode{mElement, mValue}) } } @@ -397,7 +403,7 @@ func (vr *valueReader) Skip() error { return vr.invalidTransitionErr(0, "Skip", []mode{mElement, mValue}) } - _, err := vr.appendNextElement(nil) + _, err := peekNextValueSize(vr, nil) if err != nil { return err } From 26765bdbe7672dec40115acee90b31be80444499 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 
2025 12:09:12 -0600 Subject: [PATCH 09/42] Update (*valueReader).Skip() to support bVR + streaming --- bson/value_reader.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index ed25e2aca1..0ea76b4adf 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -403,7 +403,12 @@ func (vr *valueReader) Skip() error { return vr.invalidTransitionErr(0, "Skip", []mode{mElement, mValue}) } - _, err := peekNextValueSize(vr, nil) + length, err := peekNextValueSize(vr, nil) + if err != nil { + return err + } + + _, err = vr.src.discard(int(length)) if err != nil { return err } From 8167d47ac813259bb84f3abf509363c825058a26 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:11:55 -0600 Subject: [PATCH 10/42] Update (*valueReader).ReadArray() to support bVR + streaming --- bson/value_reader.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 0ea76b4adf..2663f07f5b 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -179,20 +179,6 @@ func (vr *valueReader) pushDocument() error { return nil } -func (vr *valueReader) pushArray() error { - vr.advanceFrame() - - vr.stack[vr.frame].mode = mArray - - length, err := vr.readLength() - if err != nil { - return err - } - vr.stack[vr.frame].end = int64(length) + vr.offset - 4 - - return nil -} - func (vr *valueReader) pushElement(t Type) { vr.advanceFrame() @@ -416,16 +402,26 @@ func (vr *valueReader) Skip() error { return vr.pop() } +// ReadArray returns an ArrayReader for the next BSON array in the valueReader +// source, advancing the reader position to the end of the array. func (vr *valueReader) ReadArray() (ArrayReader, error) { if err := vr.ensureElementValue(TypeArray, mArray, "ReadArray"); err != nil { return nil, err } - err := vr.pushArray() + // Push a new frame for the array. + vr.advanceFrame() + + // Read the 4-byte length. 
+ size, err := vr.readLength() if err != nil { return nil, err } + // Compute the end position: current position + total size - length. + vr.stack[vr.frame].mode = mArray + vr.stack[vr.frame].end = vr.src.pos() + int64(size) - 4 + return vr, nil } From 02d38dff0db99f578880256609a0b28472bc4384 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:15:07 -0600 Subject: [PATCH 11/42] Update (*valueReader).ReadBinary() to support bVR + streaming --- bson/value_reader.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 2663f07f5b..f6e75e84ad 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -425,7 +425,10 @@ func (vr *valueReader) ReadArray() (ArrayReader, error) { return vr, nil } -func (vr *valueReader) ReadBinary() (b []byte, btype byte, err error) { +// ReadBinary reads a BSON binary value, returning the byte slice and the +// type of the binary data (0x02 for old binary, 0x00 for new binary, etc.), +// advancing the reader position to the end of the binary value. 
+func (vr *valueReader) ReadBinary() ([]byte, byte, error) { if err := vr.ensureElementValue(TypeBinary, 0, "ReadBinary"); err != nil { return nil, 0, err } @@ -435,7 +438,7 @@ func (vr *valueReader) ReadBinary() (b []byte, btype byte, err error) { return nil, 0, err } - btype, err = vr.readByte() + btype, err := vr.readByte() if err != nil { return nil, 0, err } @@ -448,8 +451,7 @@ func (vr *valueReader) ReadBinary() (b []byte, btype byte, err error) { } } - b = make([]byte, length) - err = vr.read(b) + b, err := vr.readBytes(length) if err != nil { return nil, 0, err } @@ -457,6 +459,7 @@ func (vr *valueReader) ReadBinary() (b []byte, btype byte, err error) { if err := vr.pop(); err != nil { return nil, 0, err } + return b, btype, nil } From 50a265a15c668fbb41c2c61fec50b8e3e560de29 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:16:02 -0600 Subject: [PATCH 12/42] Add comment to (*valueReader).ReadBoolean() --- bson/value_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index f6e75e84ad..ce6afbd3c7 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -463,6 +463,8 @@ func (vr *valueReader) ReadBinary() ([]byte, byte, error) { return b, btype, nil } +// ReadBoolean reads a BSON boolean value, returning true or false, advancing +// the reader position to the end of the boolean value. 
func (vr *valueReader) ReadBoolean() (bool, error) { if err := vr.ensureElementValue(TypeBoolean, 0, "ReadBoolean"); err != nil { return false, err From 4bf9f619d7356de981161b8ee18ef339cbc2bab7 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:17:47 -0600 Subject: [PATCH 13/42] Update (*valueReader).ReadDocument() to support bVR + streaming --- bson/value_reader.go | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index ce6afbd3c7..9223cf9d64 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -165,20 +165,6 @@ func (vr *valueReader) advanceFrame() { vr.stack[vr.frame].end = 0 } -func (vr *valueReader) pushDocument() error { - vr.advanceFrame() - - vr.stack[vr.frame].mode = mDocument - - length, err := vr.readLength() - if err != nil { - return err - } - vr.stack[vr.frame].end = int64(length) + vr.offset - 4 - - return nil -} - func (vr *valueReader) pushElement(t Type) { vr.advanceFrame() @@ -485,6 +471,8 @@ func (vr *valueReader) ReadBoolean() (bool, error) { return b == 1, nil } +// ReadDocument reads a BSON embedded document, returning a DocumentReader, +// advancing the reader position to the end of the document. 
func (vr *valueReader) ReadDocument() (DocumentReader, error) { switch vr.stack[vr.frame].mode { case mTopLevel: @@ -496,7 +484,7 @@ func (vr *valueReader) ReadDocument() (DocumentReader, error) { return nil, fmt.Errorf("invalid string length: %d", length) } - vr.stack[vr.frame].end = int64(length) + vr.offset - 4 + vr.stack[vr.frame].end = int64(length) + vr.src.pos() - 4 return vr, nil case mElement, mValue: if vr.stack[vr.frame].vType != TypeEmbeddedDocument { @@ -506,11 +494,16 @@ func (vr *valueReader) ReadDocument() (DocumentReader, error) { return nil, vr.invalidTransitionErr(mDocument, "ReadDocument", []mode{mTopLevel, mElement, mValue}) } - err := vr.pushDocument() + vr.advanceFrame() + + size, err := vr.readLength() if err != nil { return nil, err } + vr.stack[vr.frame].mode = mDocument + vr.stack[vr.frame].end = int64(size) + vr.src.pos() - 4 + return vr, nil } From 828593c6581cb9ba3158273f4e9364aa7747d799 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:20:37 -0600 Subject: [PATCH 14/42] Update (*valueReader).ReadCodeWithScope() to support bVR + streaming --- bson/value_reader.go | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 9223cf9d64..d3576c8a28 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -179,20 +179,6 @@ func (vr *valueReader) pushValue(t Type) { vr.stack[vr.frame].vType = t } -func (vr *valueReader) pushCodeWithScope() (int64, error) { - vr.advanceFrame() - - vr.stack[vr.frame].mode = mCodeWithScope - - length, err := vr.readLength() - if err != nil { - return 0, err - } - vr.stack[vr.frame].end = int64(length) + vr.offset - 4 - - return int64(length), nil -} - func (vr *valueReader) pop() error { var cnt int switch vr.stack[vr.frame].mode { @@ -507,7 +493,9 @@ func (vr *valueReader) ReadDocument() (DocumentReader, error) { return vr, nil } -func (vr *valueReader) ReadCodeWithScope() (code string, dr 
DocumentReader, err error) { +// ReadCodeWithScope reads a BSON CodeWithScope value, returning the code as a +// string, advancing the reader position to the end of the CodeWithScope value. +func (vr *valueReader) ReadCodeWithScope() (string, DocumentReader, error) { if err := vr.ensureElementValue(TypeCodeWithScope, 0, "ReadCodeWithScope"); err != nil { return "", nil, err } @@ -523,21 +511,26 @@ func (vr *valueReader) ReadCodeWithScope() (code string, dr DocumentReader, err if strLength <= 0 { return "", nil, fmt.Errorf("invalid string length: %d", strLength) } - strBytes := make([]byte, strLength) - err = vr.read(strBytes) + buf, err := vr.readBytes(strLength) if err != nil { return "", nil, err } - code = string(strBytes[:len(strBytes)-1]) - size, err := vr.pushCodeWithScope() + code := string(buf[:len(buf)-1]) + vr.advanceFrame() + + // Use readLength to ensure that we are not out of bounds. + size, err := vr.readLength() if err != nil { return "", nil, err } + vr.stack[vr.frame].mode = mCodeWithScope + vr.stack[vr.frame].end = vr.src.pos() + int64(size) - 4 + // The total length should equal: // 4 (total length) + strLength + 4 (the length of str itself) + (document length) - componentsLength := int64(4+strLength+4) + size + componentsLength := int64(4+strLength+4) + int64(size) if int64(totalLength) != componentsLength { return "", nil, fmt.Errorf( "length of CodeWithScope does not match lengths of components; total: %d; components: %d", From 96b5eb8ddfaa8788eb4f79b478f9509d2436ea47 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:23:56 -0600 Subject: [PATCH 15/42] Update (*valueReader).ReadDBPointer() to support bVR + streaming --- bson/value_reader.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index d3576c8a28..014cc9b963 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -540,21 +540,26 @@ func (vr *valueReader) ReadCodeWithScope() 
(string, DocumentReader, error) { return code, vr, nil } -func (vr *valueReader) ReadDBPointer() (ns string, oid ObjectID, err error) { +// ReadDBPointer reads a BSON DBPointer value, returning the namespace, the +// object ID, and an error if any, advancing the reader position to the end of +// the DBPointer value. +func (vr *valueReader) ReadDBPointer() (string, ObjectID, error) { if err := vr.ensureElementValue(TypeDBPointer, 0, "ReadDBPointer"); err != nil { - return "", oid, err + return "", ObjectID{}, err } - - ns, err = vr.readString() + ns, err := vr.readString() if err != nil { - return "", oid, err + return "", ObjectID{}, err } - err = vr.read(oid[:]) + oidBytes, err := vr.readBytes(12) if err != nil { return "", ObjectID{}, err } + var oid ObjectID + copy(oid[:], oidBytes) + if err := vr.pop(); err != nil { return "", ObjectID{}, err } From 086111953b41859bbf606e1f51dd8396e6a0a14a Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:24:48 -0600 Subject: [PATCH 16/42] Update (*valueReader).ReadDateTime() to support bVR + streaming --- bson/value_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index 014cc9b963..b143c4f381 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -566,6 +566,8 @@ func (vr *valueReader) ReadDBPointer() (string, ObjectID, error) { return ns, oid, nil } +// ReadDateTime reads a BSON DateTime value, advancing the reader position to +// the end of the DateTime value. 
func (vr *valueReader) ReadDateTime() (int64, error) { if err := vr.ensureElementValue(TypeDateTime, 0, "ReadDateTime"); err != nil { return 0, err From 443fb818a1dcc7d35e9e66144ed609df29465fb4 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:26:15 -0600 Subject: [PATCH 17/42] Update (*valueReader).ReadDecimal128() to support bVR + streaming --- bson/value_reader.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index b143c4f381..5af3c64249 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -584,17 +584,16 @@ func (vr *valueReader) ReadDateTime() (int64, error) { return i, nil } +// ReadDecimal128 reads a BSON Decimal128 value, advancing the reader +// to the end of the Decimal128 value. func (vr *valueReader) ReadDecimal128() (Decimal128, error) { if err := vr.ensureElementValue(TypeDecimal128, 0, "ReadDecimal128"); err != nil { return Decimal128{}, err } - - var b [16]byte - err := vr.read(b[:]) + b, err := vr.readBytes(16) if err != nil { return Decimal128{}, err } - l := binary.LittleEndian.Uint64(b[0:8]) h := binary.LittleEndian.Uint64(b[8:16]) From 942e52ec6ca8aaa9cd40a10255312f853a508269 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:26:57 -0600 Subject: [PATCH 18/42] Add comment to (*valueReader).ReadDouble() --- bson/value_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index 5af3c64249..168a14314a 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -603,6 +603,8 @@ func (vr *valueReader) ReadDecimal128() (Decimal128, error) { return NewDecimal128(h, l), nil } +// ReadDouble reads a BSON double value, advancing the reader position +// to the end of the double value. 
func (vr *valueReader) ReadDouble() (float64, error) { if err := vr.ensureElementValue(TypeDouble, 0, "ReadDouble"); err != nil { return 0, err From f934c705bec3d06f7de2e965d6b3a1d8b1abb715 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:29:08 -0600 Subject: [PATCH 19/42] Update (*valueReader).ReadInt32() to support bVR + streaming --- bson/value_reader.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 168a14314a..2a8eec07f3 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -621,15 +621,21 @@ func (vr *valueReader) ReadDouble() (float64, error) { return math.Float64frombits(u), nil } +// ReadInt32 reads a BSON int32 value, advancing the reader position to the end +// of the int32 value. func (vr *valueReader) ReadInt32() (int32, error) { if err := vr.ensureElementValue(TypeInt32, 0, "ReadInt32"); err != nil { return 0, err } + i, err := vr.readi32() + if err != nil { + return 0, err + } if err := vr.pop(); err != nil { return 0, err } - return vr.readi32() + return i, nil } func (vr *valueReader) ReadInt64() (int64, error) { From c3ba283b3c95bd29e072a9bde0894e729865e719 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:34:39 -0600 Subject: [PATCH 20/42] Update (*valueReader).ReadInt64() to support bVR + streaming --- bson/value_reader.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 2a8eec07f3..0622c94d10 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -638,15 +638,21 @@ func (vr *valueReader) ReadInt32() (int32, error) { return i, nil } +// ReadInt64 reads a BSON int64 value, advancing the reader position to the end +// of the int64 value. 
func (vr *valueReader) ReadInt64() (int64, error) { if err := vr.ensureElementValue(TypeInt64, 0, "ReadInt64"); err != nil { return 0, err } + i, err := vr.readi64() + if err != nil { + return 0, err + } if err := vr.pop(); err != nil { return 0, err } - return vr.readi64() + return i, nil } func (vr *valueReader) ReadJavascript() (string, error) { From 1a3d33d1d28f74c3d347a6b36edc436baa578a25 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:35:24 -0600 Subject: [PATCH 21/42] Update (*valueReader).ReadJavascript() to support bVR + streaming --- bson/value_reader.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 0622c94d10..3b95ef5612 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -655,15 +655,21 @@ func (vr *valueReader) ReadInt64() (int64, error) { return i, nil } +// ReadJavascript reads a BSON JavaScript value, advancing the reader +// to the end of the JavaScript value. 
func (vr *valueReader) ReadJavascript() (string, error) { if err := vr.ensureElementValue(TypeJavaScript, 0, "ReadJavascript"); err != nil { return "", err } + s, err := vr.readString() + if err != nil { + return "", err + } if err := vr.pop(); err != nil { return "", err } - return vr.readString() + return s, nil } func (vr *valueReader) ReadMaxKey() error { From b7d0042d4ee6ec3144164ce26af033c00ca513a3 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:36:28 -0600 Subject: [PATCH 22/42] Update (*valueReader).ReadObjectID() to support bVR + streaming --- bson/value_reader.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 3b95ef5612..c83d573e23 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -696,17 +696,21 @@ func (vr *valueReader) ReadNull() error { return vr.pop() } +// ReadObjectID reads a BSON ObjectID value, advancing the reader to the end of +// the ObjectID value. func (vr *valueReader) ReadObjectID() (ObjectID, error) { if err := vr.ensureElementValue(TypeObjectID, 0, "ReadObjectID"); err != nil { return ObjectID{}, err } - var oid ObjectID - err := vr.read(oid[:]) + oidBytes, err := vr.readBytes(12) if err != nil { return ObjectID{}, err } + var oid ObjectID + copy(oid[:], oidBytes) + if err := vr.pop(); err != nil { return ObjectID{}, err } From 0cbacc1d9d5cb6c9a6df43742f4df6afacc20a4a Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:37:26 -0600 Subject: [PATCH 23/42] Add comment to (*valueReader).Read(MinKey|MaxKey|Null)() --- bson/value_reader.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index c83d573e23..3ab5577bfd 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -672,6 +672,8 @@ func (vr *valueReader) ReadJavascript() (string, error) { return s, nil } +// ReadMaxKey reads a BSON MaxKey value, advancing the reader position to the +// end of 
the MaxKey value. func (vr *valueReader) ReadMaxKey() error { if err := vr.ensureElementValue(TypeMaxKey, 0, "ReadMaxKey"); err != nil { return err @@ -680,6 +682,8 @@ func (vr *valueReader) ReadMaxKey() error { return vr.pop() } +// ReadMinKey reads a BSON MinKey value, advancing the reader position to the +// end of the MinKey value. func (vr *valueReader) ReadMinKey() error { if err := vr.ensureElementValue(TypeMinKey, 0, "ReadMinKey"); err != nil { return err @@ -688,6 +692,8 @@ func (vr *valueReader) ReadMinKey() error { return vr.pop() } +// ReadNull reads a BSON Null value, advancing the reader position to the +// end of the Null value. func (vr *valueReader) ReadNull() error { if err := vr.ensureElementValue(TypeNull, 0, "ReadNull"); err != nil { return err From 7d16cf8e9827cd52a383a7d7d319dd01e5bdef1f Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:38:15 -0600 Subject: [PATCH 24/42] Add comment to (*valueReader).ReadRegex() --- bson/value_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index 3ab5577bfd..0eda8f08be 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -723,6 +723,8 @@ func (vr *valueReader) ReadObjectID() (ObjectID, error) { return oid, nil } +// ReadRegex reads a BSON Regex value, advancing the reader position to the +// end of the regex value. 
func (vr *valueReader) ReadRegex() (string, string, error) { if err := vr.ensureElementValue(TypeRegex, 0, "ReadRegex"); err != nil { return "", "", err From a4977742d02c7dac7ee3a650c443ba1178f75ee9 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:39:01 -0600 Subject: [PATCH 25/42] Update (*valueReader).ReadString() to support bVR + streaming --- bson/value_reader.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 0eda8f08be..82e27eeefa 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -746,15 +746,21 @@ func (vr *valueReader) ReadRegex() (string, string, error) { return pattern, options, nil } +// ReadString reads a BSON String value, advancing the reader position to the +// end of the String value. func (vr *valueReader) ReadString() (string, error) { if err := vr.ensureElementValue(TypeString, 0, "ReadString"); err != nil { return "", err } + s, err := vr.readString() + if err != nil { + return "", err + } if err := vr.pop(); err != nil { return "", err } - return vr.readString() + return s, nil } func (vr *valueReader) ReadSymbol() (string, error) { From dfe55029679f9a3a619473de1680f07b8fc32c8e Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:43:04 -0600 Subject: [PATCH 26/42] Update (*valueReader).ReadSymbol() to support bVR + streaming --- bson/value_reader.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 82e27eeefa..83765e94e6 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -763,15 +763,20 @@ func (vr *valueReader) ReadString() (string, error) { return s, nil } +// ReadSymbol reads a BSON Symbol value, advancing the reader position to the +// end of the Symbol value. 
func (vr *valueReader) ReadSymbol() (string, error) { if err := vr.ensureElementValue(TypeSymbol, 0, "ReadSymbol"); err != nil { return "", err } - + s, err := vr.readString() + if err != nil { + return "", err + } if err := vr.pop(); err != nil { return "", err } - return vr.readString() + return s, nil } func (vr *valueReader) ReadTimestamp() (t uint32, i uint32, err error) { From 062adeb549b55b163263dc4efee556fc4c021f56 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:46:47 -0600 Subject: [PATCH 27/42] Add comment to (*valueReader).ReadTimestamp() --- bson/value_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index 83765e94e6..5ae1640705 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -779,6 +779,8 @@ func (vr *valueReader) ReadSymbol() (string, error) { return s, nil } +// ReadTimestamp reads a BSON Timestamp value, advancing the reader to the end +// of the Timestamp value. func (vr *valueReader) ReadTimestamp() (t uint32, i uint32, err error) { if err := vr.ensureElementValue(TypeTimestamp, 0, "ReadTimestamp"); err != nil { return 0, 0, err From 83a47d599f28c4f17de98a45eec1793930c04fff Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:48:15 -0600 Subject: [PATCH 28/42] Add comment to (*valueReader).ReadUndefined() --- bson/value_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bson/value_reader.go b/bson/value_reader.go index 5ae1640705..f664f37fa1 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -802,6 +802,8 @@ func (vr *valueReader) ReadTimestamp() (t uint32, i uint32, err error) { return t, i, nil } +// ReadUndefined reads a BSON Undefined value, advancing the reader position +// to the end of the Undefined value. 
func (vr *valueReader) ReadUndefined() error { if err := vr.ensureElementValue(TypeUndefined, 0, "ReadUndefined"); err != nil { return err From 5b570de8b0acbf71e8d711acc141cc37027f1693 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:49:07 -0600 Subject: [PATCH 29/42] Update (*valueReader).ReadTimestamp() to support bVR + streaming --- bson/value_reader.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index f664f37fa1..9821175095 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -781,17 +781,17 @@ func (vr *valueReader) ReadSymbol() (string, error) { // ReadTimestamp reads a BSON Timestamp value, advancing the reader to the end // of the Timestamp value. -func (vr *valueReader) ReadTimestamp() (t uint32, i uint32, err error) { +func (vr *valueReader) ReadTimestamp() (uint32, uint32, error) { if err := vr.ensureElementValue(TypeTimestamp, 0, "ReadTimestamp"); err != nil { return 0, 0, err } - i, err = vr.readu32() + i, err := vr.readu32() if err != nil { return 0, 0, err } - t, err = vr.readu32() + t, err := vr.readu32() if err != nil { return 0, 0, err } @@ -803,6 +803,7 @@ func (vr *valueReader) ReadTimestamp() (t uint32, i uint32, err error) { } // ReadUndefined reads a BSON Undefined value, advancing the reader position + // to the end of the Undefined value. 
func (vr *valueReader) ReadUndefined() error { if err := vr.ensureElementValue(TypeUndefined, 0, "ReadUndefined"); err != nil { From 403445f6c4941df7134e6079081a6257fcaee6aa Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:51:05 -0600 Subject: [PATCH 30/42] Update (*valueReader).ReadElement() to support bVR + streaming --- bson/value_reader.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 9821175095..b72a8fda36 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -165,13 +165,6 @@ func (vr *valueReader) advanceFrame() { vr.stack[vr.frame].end = 0 } -func (vr *valueReader) pushElement(t Type) { - vr.advanceFrame() - - vr.stack[vr.frame].mode = mElement - vr.stack[vr.frame].vType = t -} - func (vr *valueReader) pushValue(t Type) { vr.advanceFrame() @@ -803,7 +796,6 @@ func (vr *valueReader) ReadTimestamp() (uint32, uint32, error) { } // ReadUndefined reads a BSON Undefined value, advancing the reader position - // to the end of the Undefined value. func (vr *valueReader) ReadUndefined() error { if err := vr.ensureElementValue(TypeUndefined, 0, "ReadUndefined"); err != nil { @@ -813,6 +805,8 @@ func (vr *valueReader) ReadUndefined() error { return vr.pop() } +// ReadElement reads the next element in the BSON document, advancing the +// reader position to the end of the element. 
func (vr *valueReader) ReadElement() (string, ValueReader, error) { switch vr.stack[vr.frame].mode { case mTopLevel, mDocument, mCodeWithScope: @@ -826,7 +820,7 @@ func (vr *valueReader) ReadElement() (string, ValueReader, error) { } if t == 0 { - if vr.offset != vr.stack[vr.frame].end { + if vr.src.pos() != vr.stack[vr.frame].end { return "", nil, vr.invalidDocumentLengthError() } @@ -839,7 +833,10 @@ func (vr *valueReader) ReadElement() (string, ValueReader, error) { return "", nil, err } - vr.pushElement(Type(t)) + vr.advanceFrame() + + vr.stack[vr.frame].mode = mElement + vr.stack[vr.frame].vType = Type(t) return name, vr, nil } From dd709d1d4915dc1609f6026991551845f7f58d26 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:52:36 -0600 Subject: [PATCH 31/42] Update (*valueReader).ReadValue() to support bVR + streaming --- bson/value_reader.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index b72a8fda36..24fb3db071 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -165,13 +165,6 @@ func (vr *valueReader) advanceFrame() { vr.stack[vr.frame].end = 0 } -func (vr *valueReader) pushValue(t Type) { - vr.advanceFrame() - - vr.stack[vr.frame].mode = mValue - vr.stack[vr.frame].vType = t -} - func (vr *valueReader) pop() error { var cnt int switch vr.stack[vr.frame].mode { @@ -840,6 +833,8 @@ func (vr *valueReader) ReadElement() (string, ValueReader, error) { return name, vr, nil } +// ReadValue reads the next value in the BSON array, advancing the reader to the end of +// the value. 
func (vr *valueReader) ReadValue() (ValueReader, error) { switch vr.stack[vr.frame].mode { case mArray: @@ -853,7 +848,7 @@ func (vr *valueReader) ReadValue() (ValueReader, error) { } if t == 0 { - if vr.offset != vr.stack[vr.frame].end { + if vr.src.pos() != vr.stack[vr.frame].end { return nil, vr.invalidDocumentLengthError() } @@ -861,11 +856,15 @@ func (vr *valueReader) ReadValue() (ValueReader, error) { return nil, ErrEOA } - if _, err := vr.readCString(); err != nil { + _, err = vr.src.readSlice(0x00) + if err != nil { return nil, err } - vr.pushValue(Type(t)) + vr.advanceFrame() + + vr.stack[vr.frame].mode = mValue + vr.stack[vr.frame].vType = Type(t) return vr, nil } From dea88a6a4b962b36bc54854ffa3b5bf62e829af8 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:54:44 -0600 Subject: [PATCH 32/42] Update (*valueReader).readValue() to support bVR + streaming --- bson/value_reader.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 24fb3db071..e9ef5a7aad 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -887,11 +887,10 @@ func (vr *valueReader) appendBytes(dst []byte, length int32) ([]byte, error) { } func (vr *valueReader) readByte() (byte, error) { - b, err := vr.r.ReadByte() + b, err := vr.src.ReadByte() if err != nil { return 0x0, err } - vr.offset++ return b, nil } From 2e8be2a90b8a701479848b2ac1b7e28fa148585d Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 12:57:39 -0600 Subject: [PATCH 33/42] Update (*valueReader).readCString() to support bVR + streaming --- bson/value_reader.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index e9ef5a7aad..92c7b3d945 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -895,13 +895,11 @@ func (vr *valueReader) readByte() (byte, error) { } func (vr *valueReader) readCString() (string, error) { - str, err := 
vr.r.ReadString(0x00) + data, err := vr.src.readSlice(0x00) if err != nil { return "", err } - l := len(str) - vr.offset += int64(l) - return str[:l-1], nil + return string(data[:len(data)-1]), nil } func (vr *valueReader) readString() (string, error) { From 0036af6d39c46409db40e0ac67f5f7310683d816 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 13:02:26 -0600 Subject: [PATCH 34/42] Update (*valueReader).readString() to support bVR + streaming --- bson/value_reader.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 92c7b3d945..59636ba95a 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -907,21 +907,23 @@ func (vr *valueReader) readString() (string, error) { if err != nil { return "", err } + if length <= 0 { return "", fmt.Errorf("invalid string length: %d", length) } - buf := make([]byte, length) - err = vr.read(buf) + raw, err := readBytes(vr.src, int(length)) if err != nil { return "", err } - if buf[length-1] != 0x00 { - return "", fmt.Errorf("string does not end with null byte, but with %v", buf[length-1]) + // Check that the last byte is the NUL terminator. + if raw[len(raw)-1] != 0x00 { + return "", fmt.Errorf("string does not end with null byte, but with %v", raw[len(raw)-1]) } - return string(buf[:length-1]), nil + // Convert and strip the trailing NUL. 
+ return string(raw[:len(raw)-1]), nil } func (vr *valueReader) peekLength() (int32, error) { From f62a9dec1827f6941eda94d7f6376b4d21b132ac Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 13:02:45 -0600 Subject: [PATCH 35/42] Update (*valueReader).peekLength() to support bVR + streaming --- bson/value_reader.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 59636ba95a..634d0072ea 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -927,11 +927,10 @@ func (vr *valueReader) readString() (string, error) { } func (vr *valueReader) peekLength() (int32, error) { - buf, err := vr.r.Peek(4) + buf, err := vr.src.peek(4) if err != nil { return 0, err } - return int32(binary.LittleEndian.Uint32(buf)), nil } From 0ba57752c2e4d7dd606d593b4b024f7cf8be7cef Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 13:03:30 -0600 Subject: [PATCH 36/42] Update (*valueReader).readLength() to support bVR + streaming --- bson/value_reader.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 634d0072ea..fc3c384f4a 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -935,14 +935,7 @@ func (vr *valueReader) peekLength() (int32, error) { } func (vr *valueReader) readLength() (int32, error) { - l, err := vr.readi32() - if err != nil { - return 0, err - } - if l < 0 { - return 0, fmt.Errorf("invalid negative length: %d", l) - } - return l, nil + return vr.readi32() } func (vr *valueReader) readi32() (int32, error) { From 296ff2570bdea80303ed9a995e1e4c9f4fc91cd0 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 13:05:04 -0600 Subject: [PATCH 37/42] Update (*valueReader).read(i32|u32|i64|u64)() to support bVR + streaming --- bson/value_reader.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go 
index fc3c384f4a..114ab7e1c0 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -939,41 +939,37 @@ func (vr *valueReader) readLength() (int32, error) { } func (vr *valueReader) readi32() (int32, error) { - var buf [4]byte - err := vr.read(buf[:]) + raw, err := readBytes(vr.src, 4) if err != nil { return 0, err } - return int32(binary.LittleEndian.Uint32(buf[:])), nil + return int32(binary.LittleEndian.Uint32(raw)), nil } func (vr *valueReader) readu32() (uint32, error) { - var buf [4]byte - err := vr.read(buf[:]) + raw, err := readBytes(vr.src, 4) if err != nil { return 0, err } - return binary.LittleEndian.Uint32(buf[:]), nil + return binary.LittleEndian.Uint32(raw), nil } func (vr *valueReader) readi64() (int64, error) { - var buf [8]byte - err := vr.read(buf[:]) + raw, err := readBytes(vr.src, 8) if err != nil { return 0, err } - return int64(binary.LittleEndian.Uint64(buf[:])), nil + return int64(binary.LittleEndian.Uint64(raw)), nil } func (vr *valueReader) readu64() (uint64, error) { - var buf [8]byte - err := vr.read(buf[:]) + raw, err := readBytes(vr.src, 8) if err != nil { return 0, err } - return binary.LittleEndian.Uint64(buf[:]), nil + return binary.LittleEndian.Uint64(raw), nil } From 5a12f54e1becfe0a4a931b6e2c154e61c7dab12a Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 13:05:57 -0600 Subject: [PATCH 38/42] Remove read and appendBytes --- bson/value_reader.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 114ab7e1c0..dda31adcde 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -868,24 +868,6 @@ func (vr *valueReader) ReadValue() (ValueReader, error) { return vr, nil } -func (vr *valueReader) read(p []byte) error { - n, err := io.ReadFull(vr.r, p) - if err != nil { - return err - } - vr.offset += int64(n) - return nil -} - -func (vr *valueReader) appendBytes(dst []byte, length int32) ([]byte, error) { - buf := make([]byte, length) - 
err := vr.read(buf) - if err != nil { - return nil, err - } - return append(dst, buf...), nil -} - func (vr *valueReader) readByte() (byte, error) { b, err := vr.src.ReadByte() if err != nil { From ef6b0241f6f42dbe36317dbc64f3eb34b9a8cc52 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 13:26:54 -0600 Subject: [PATCH 39/42] Update newValueReader to use bVR --- bson/copier.go | 5 ++++- bson/mgoregistry_test.go | 12 ++++++------ bson/raw_value.go | 4 ++-- bson/unmarshal.go | 6 +++--- bson/unmarshal_test.go | 2 +- bson/unmarshal_value_test.go | 3 +-- bson/unmarshaling_cases_test.go | 9 ++++----- bson/value_reader.go | 12 ++++++++---- 8 files changed, 29 insertions(+), 24 deletions(-) diff --git a/bson/copier.go b/bson/copier.go index 0d83ff925d..c9a37c756b 100644 --- a/bson/copier.go +++ b/bson/copier.go @@ -180,7 +180,10 @@ func copyValueFromBytes(dst ValueWriter, t Type, src []byte) error { } vr := newBufferedDocumentReader(src) - vr.pushElement(t) + vr.advanceFrame() + + vr.stack[vr.frame].mode = mElement + vr.stack[vr.frame].vType = t return copyValue(dst, vr) } diff --git a/bson/mgoregistry_test.go b/bson/mgoregistry_test.go index fa0051ea14..d9dacbbce8 100644 --- a/bson/mgoregistry_test.go +++ b/bson/mgoregistry_test.go @@ -485,7 +485,7 @@ func (t *prefixPtr) SetBSON(raw RawValue) error { if err != nil { return err } - vr := newValueReader(raw.Type, bytes.NewReader(raw.Value)) + vr := newBufferedValueReader(raw.Type, raw.Value) err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval) if err != nil { return err @@ -512,7 +512,7 @@ func (t *prefixVal) SetBSON(raw RawValue) error { if err != nil { return err } - vr := newValueReader(raw.Type, bytes.NewReader(raw.Value)) + vr := newBufferedValueReader(raw.Type, raw.Value) err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval) if err != nil { return err @@ -936,7 +936,7 @@ func (o *setterType) SetBSON(raw RawValue) error { if raw.Type == 0x00 { 
raw.Type = TypeEmbeddedDocument } - vr := newValueReader(raw.Type, bytes.NewReader(raw.Value)) + vr := newBufferedValueReader(raw.Type, raw.Value) err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval) if err != nil { return err @@ -1289,7 +1289,7 @@ func (s *getterSetterD) SetBSON(raw RawValue) error { if raw.Type == 0x00 { raw.Type = TypeEmbeddedDocument } - vr := newValueReader(raw.Type, bytes.NewReader(raw.Value)) + vr := newBufferedValueReader(raw.Type, raw.Value) err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval) if err != nil { return err @@ -1315,7 +1315,7 @@ func (i *getterSetterInt) SetBSON(raw RawValue) error { if raw.Type == 0x00 { raw.Type = TypeEmbeddedDocument } - vr := newValueReader(raw.Type, bytes.NewReader(raw.Value)) + vr := newBufferedValueReader(raw.Type, raw.Value) err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval) if err != nil { return err @@ -1337,7 +1337,7 @@ func (s *ifaceSlice) SetBSON(raw RawValue) error { if err != nil { return err } - vr := newValueReader(raw.Type, bytes.NewReader(raw.Value)) + vr := newBufferedValueReader(raw.Type, raw.Value) err = decoder.DecodeValue(DecodeContext{Registry: NewMgoRegistry()}, vr, rval) if err != nil { return err diff --git a/bson/raw_value.go b/bson/raw_value.go index 53f053f03e..84b737128e 100644 --- a/bson/raw_value.go +++ b/bson/raw_value.go @@ -71,7 +71,7 @@ func (rv RawValue) UnmarshalWithRegistry(r *Registry, val interface{}) error { return ErrNilRegistry } - vr := newValueReader(rv.Type, bytes.NewReader(rv.Value)) + vr := newBufferedValueReader(rv.Type, rv.Value) rval := reflect.ValueOf(val) if rval.Kind() != reflect.Ptr { return fmt.Errorf("argument to Unmarshal* must be a pointer to a type, but got %v", rval) @@ -91,7 +91,7 @@ func (rv RawValue) UnmarshalWithContext(dc *DecodeContext, val interface{}) erro return ErrNilContext } - vr := newValueReader(rv.Type, bytes.NewReader(rv.Value)) + vr := 
newBufferedValueReader(rv.Type, rv.Value) rval := reflect.ValueOf(val) if rval.Kind() != reflect.Ptr { return fmt.Errorf("argument to Unmarshal* must be a pointer to a type, but got %v", rval) diff --git a/bson/unmarshal.go b/bson/unmarshal.go index d3af2cd1d2..5597bc7a92 100644 --- a/bson/unmarshal.go +++ b/bson/unmarshal.go @@ -42,8 +42,8 @@ type ValueUnmarshaler interface { // When unmarshaling BSON, if the BSON value is null and the Go value is a // pointer, the pointer is set to nil without calling UnmarshalBSONValue. func Unmarshal(data []byte, val interface{}) error { - vr := getBufferedDocumentReader(bytes.NewReader(data)) - defer putDocumentReader(vr) + vr := getBufferedDocumentReader(data) + defer putBufferedDocumentReader(vr) if l, err := vr.peekLength(); err != nil { return err @@ -57,7 +57,7 @@ func Unmarshal(data []byte, val interface{}) error { // stores the result in the value pointed to by val. If val is nil or not a pointer, // UnmarshalValue returns an error. func UnmarshalValue(t Type, data []byte, val interface{}) error { - vr := newValueReader(t, bytes.NewReader(data)) + vr := newBufferedValueReader(t, data) return unmarshalFromReader(DecodeContext{Registry: defaultRegistry}, vr, val) } diff --git a/bson/unmarshal_test.go b/bson/unmarshal_test.go index 06dfe4ff22..702abcd152 100644 --- a/bson/unmarshal_test.go +++ b/bson/unmarshal_test.go @@ -71,7 +71,7 @@ func TestUnmarshalWithRegistry(t *testing.T) { // Assert that unmarshaling the input data results in the expected value. 
gotValue := reflect.New(reflect.TypeOf(tc.val)) - dec := NewDecoder(newValueReader(tc.bsontype, bytes.NewReader(tc.bytes))) + dec := NewDecoder(newBufferedValueReader(tc.bsontype, tc.bytes)) dec.SetRegistry(reg) err := dec.Decode(gotValue.Interface()) noerr(t, err) diff --git a/bson/unmarshal_value_test.go b/bson/unmarshal_value_test.go index b9132221d9..a10474f601 100644 --- a/bson/unmarshal_value_test.go +++ b/bson/unmarshal_value_test.go @@ -7,7 +7,6 @@ package bson import ( - "bytes" "reflect" "strings" "testing" @@ -80,7 +79,7 @@ func BenchmarkSliceCodecUnmarshal(b *testing.B) { dec := NewDecoder(nil) dec.SetRegistry(reg) for pb.Next() { - dec.Reset(newValueReader(bm.bsontype, bytes.NewReader(bm.bytes))) + dec.Reset(newBufferedValueReader(bm.bsontype, bm.bytes)) err := dec.Decode(&[]byte{}) if err != nil { b.Fatal(err) diff --git a/bson/unmarshaling_cases_test.go b/bson/unmarshaling_cases_test.go index 6e84f80e28..61dbe9dc2a 100644 --- a/bson/unmarshaling_cases_test.go +++ b/bson/unmarshaling_cases_test.go @@ -7,7 +7,6 @@ package bson import ( - "bytes" "reflect" ) @@ -269,7 +268,7 @@ func (mi *myInt64) UnmarshalBSONValue(t byte, b []byte) error { } if Type(t) == TypeInt64 { - i, err := newValueReader(TypeInt64, bytes.NewReader(b)).ReadInt64() + i, err := newBufferedValueReader(TypeInt64, b).ReadInt64() if err != nil { return err } @@ -284,7 +283,7 @@ func (mi *myInt64) UnmarshalBSON(b []byte) error { if len(b) == 0 { return nil } - i, err := newValueReader(TypeInt64, bytes.NewReader(b)).ReadInt64() + i, err := newBufferedValueReader(TypeInt64, b).ReadInt64() if err != nil { return err } @@ -310,7 +309,7 @@ func (mb *myBytes) UnmarshalBSON(b []byte) error { if len(b) == 0 { return nil } - b, _, err := newValueReader(TypeBinary, bytes.NewReader(b)).ReadBinary() + b, _, err := newBufferedValueReader(TypeBinary, b).ReadBinary() if err != nil { return err } @@ -324,7 +323,7 @@ func (ms *myString) UnmarshalBSON(b []byte) error { if len(b) == 0 { return nil } - s, 
err := newValueReader(TypeString, bytes.NewReader(b)).ReadString() + s, err := newBufferedValueReader(TypeString, b).ReadString() if err != nil { return err } diff --git a/bson/value_reader.go b/bson/value_reader.go index dda31adcde..8d3726fafe 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -117,10 +117,14 @@ func NewDocumentReader(r io.Reader) ValueReader { } } -// newValueReader returns a ValueReader that starts in the Value mode instead of in top -// level document mode. This enables the creation of a ValueReader for a single BSON value. -func newValueReader(t Type, r io.Reader) ValueReader { - panic("TODO") +// newBufferedValueReader returns a ValueReader that starts in the Value mode +// instead of in top level document mode. This enables the creation of a +// ValueReader for a single BSON value. +func newBufferedValueReader(t Type, b []byte) ValueReader { + bVR := newBufferedDocumentReader(b) + bVR.stack[0].vType = t + + return bVR } func newBufferedDocumentReader(b []byte) *valueReader { From 620eb5a5a137248ed2b293e348c90b525e9977aa Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 14:12:16 -0600 Subject: [PATCH 40/42] Update tests to support bRV + streaming --- bson/value_reader.go | 27 +++++++----- bson/value_reader_test.go | 92 +++++++++++++++++++-------------------- 2 files changed, 60 insertions(+), 59 deletions(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 8d3726fafe..7f1e24ae12 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -122,7 +122,9 @@ func NewDocumentReader(r io.Reader) ValueReader { // ValueReader for a single BSON value. func newBufferedValueReader(t Type, b []byte) ValueReader { bVR := newBufferedDocumentReader(b) + bVR.stack[0].vType = t + bVR.stack[0].mode = mValue return bVR } @@ -243,7 +245,7 @@ func (vr *valueReader) Type() Type { // peekLength returns the length of the next value in the stream without // offsetting the reader position. 
-func peekNextValueSize(vr *valueReader, dst []byte) (int32, error) { +func peekNextValueSize(vr *valueReader) (int32, error) { var length int32 var err error switch vr.stack[vr.frame].vType { @@ -313,32 +315,31 @@ func (vr *valueReader) readBytes(n int32) ([]byte, error) { return readBytes(vr.src, int(n)) } -// readValueBytes returns the raw bytes of the next value (or top‐level -// document) without allocating intermediary buffers, then pops the frame. func (vr *valueReader) readValueBytes(dst []byte) (Type, []byte, error) { switch vr.stack[vr.frame].mode { case mTopLevel: length, err := vr.peekLength() if err != nil { - return Type(0), nil, err + return 0, nil, err } b, err := vr.readBytes(length) return Type(0), append(dst, b...), err case mElement, mValue: - length, err := peekNextValueSize(vr, dst) + t := vr.stack[vr.frame].vType + + length, err := peekNextValueSize(vr) if err != nil { - return Type(0), dst, err + return t, dst, err } b, err := vr.readBytes(length) - t := vr.stack[vr.frame].vType - err = vr.pop() - if err != nil { + if err := vr.pop(); err != nil { return Type(0), nil, err } return t, append(dst, b...), err + default: return Type(0), nil, vr.invalidTransitionErr(0, "readValueBytes", []mode{mElement, mValue}) } @@ -351,7 +352,7 @@ func (vr *valueReader) Skip() error { return vr.invalidTransitionErr(0, "Skip", []mode{mElement, mValue}) } - length, err := peekNextValueSize(vr, nil) + length, err := peekNextValueSize(vr) if err != nil { return err } @@ -418,11 +419,15 @@ func (vr *valueReader) ReadBinary() ([]byte, byte, error) { return nil, 0, err } + // copy so user doesn’t share underlying buffer + cp := make([]byte, len(b)) + copy(cp, b) + if err := vr.pop(); err != nil { return nil, 0, err } - return b, btype, nil + return cp, btype, nil } // ReadBoolean reads a BSON boolean value, returning true or false, advancing diff --git a/bson/value_reader_test.go b/bson/value_reader_test.go index 852a291dd4..e4635771d2 100644 --- 
a/bson/value_reader_test.go +++ b/bson/value_reader_test.go @@ -7,7 +7,6 @@ package bson import ( - "bufio" "bytes" _ "embed" "errors" @@ -81,7 +80,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -146,7 +145,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -176,16 +175,16 @@ func TestValueReader(t *testing.T) { } // invalid length - vr.r = bufio.NewReader(bytes.NewReader([]byte{0x00, 0x00})) + vr.src = &bufferedValueReader{buf: []byte{0x00, 0x00}} _, err := vr.ReadDocument() - if !errors.Is(err, io.ErrUnexpectedEOF) { + if !errors.Is(err, io.EOF) { t.Errorf("Expected io.ErrUnexpectedEOF with document length too small. got %v; want %v", err, io.EOF) } - if vr.offset != 0 { - t.Errorf("Expected 0 offset. got %d", vr.offset) + if vr.src.pos() != 0 { + t.Errorf("Expected 0 offset. got %d", vr.src.pos()) } - vr.r = bufio.NewReader(bytes.NewReader(doc)) + vr.src = &bufferedValueReader{buf: doc} _, err = vr.ReadDocument() noerr(t, err) if vr.stack[vr.frame].end != 5 { @@ -215,8 +214,10 @@ func TestValueReader(t *testing.T) { } vr.stack[1].mode, vr.stack[1].vType = mElement, TypeEmbeddedDocument - vr.offset = 4 - vr.r = bufio.NewReader(bytes.NewReader([]byte{0x05, 0x00, 0x00, 0x00, 0x00, 0x00})) + vr.src = &bufferedValueReader{ + buf: []byte{0x0A, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00}, + offset: 4, + } _, err = vr.ReadDocument() noerr(t, err) if len(vr.stack) != 3 { @@ -228,13 +229,13 @@ func TestValueReader(t *testing.T) { if vr.stack[2].end != 9 { t.Errorf("End of embedded document is not correct. 
got %d; want %d", vr.stack[2].end, 9) } - if vr.offset != 8 { - t.Errorf("Offset not incremented correctly. got %d; want %d", vr.offset, 8) + if vr.src.pos() != 8 { + t.Errorf("Offset not incremented correctly. got %d; want %d", vr.src.pos(), 8) } vr.frame-- _, err = vr.ReadDocument() - if !errors.Is(err, io.ErrUnexpectedEOF) { + if !errors.Is(err, io.EOF) { t.Errorf("Should return error when attempting to read length with not enough bytes. got %v; want %v", err, io.EOF) } }) @@ -312,7 +313,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -331,9 +332,11 @@ func TestValueReader(t *testing.T) { } t.Run("success", func(t *testing.T) { + doc := []byte{0x00, 0x00, 0x00, 0x00} + doc = append(doc, codeWithScope...) + doc = append(doc, 0x00) vr := &valueReader{ - offset: 4, - r: bufio.NewReader(bytes.NewReader(codeWithScope)), + src: &bufferedValueReader{buf: doc, offset: 4}, stack: []vrState{ {mode: mTopLevel}, {mode: mElement, vType: TypeCodeWithScope}, @@ -355,8 +358,8 @@ func TestValueReader(t *testing.T) { if vr.stack[2].end != 21 { t.Errorf("End of scope is not correct. got %d; want %d", vr.stack[2].end, 21) } - if vr.offset != 20 { - t.Errorf("Offset not incremented correctly. got %d; want %d", vr.offset, 20) + if vr.src.pos() != 20 { + t.Errorf("Offset not incremented correctly. 
got %d; want %d", vr.src.pos(), 20) } }) }) @@ -417,7 +420,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -475,7 +478,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -533,7 +536,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -593,7 +596,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -648,7 +651,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -703,7 +706,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -778,7 +781,7 @@ func TestValueReader(t *testing.T) { append([]byte{0x40, 0x27, 0x00, 0x00}, testcstring...), (*valueReader).ReadString, "", - io.ErrUnexpectedEOF, + io.EOF, TypeString, }, { @@ -858,7 +861,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t 
*testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -939,6 +942,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ + src: &bufferedValueReader{buf: []byte{}}, stack: []vrState{ {mode: mTopLevel}, { @@ -990,7 +994,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -1057,7 +1061,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -1127,7 +1131,7 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data)), + src: &bufferedValueReader{buf: tc.data}, stack: []vrState{ {mode: mTopLevel}, { @@ -1347,46 +1351,38 @@ func TestValueReader(t *testing.T) { const startingEnd = 64 t.Run("Skip", func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data[tc.startingOffset:tc.offset])), + src: &bufferedValueReader{buf: tc.data, offset: tc.startingOffset}, stack: []vrState{ {mode: mTopLevel, end: startingEnd}, {mode: mElement, vType: tc.t}, }, - frame: 1, - offset: tc.startingOffset, + frame: 1, } err := vr.Skip() if !errequal(t, err, tc.err) { t.Errorf("Did not receive expected error; got %v; want %v", err, tc.err) } - if tc.err == nil { - offset := startingEnd - vr.stack[0].end - if offset != tc.offset { - t.Errorf("Offset not set at correct position; got %d; want %d", offset, tc.offset) - } + if tc.err == nil && vr.src.pos() != tc.offset { 
+ t.Errorf("Offset not set at correct position; got %d; want %d", vr.src.pos(), tc.offset) } }) t.Run("ReadBytes", func(t *testing.T) { vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.data[tc.startingOffset:tc.offset])), + src: &bufferedValueReader{buf: tc.data, offset: tc.startingOffset}, stack: []vrState{ {mode: mTopLevel, end: startingEnd}, {mode: mElement, vType: tc.t}, }, - frame: 1, - offset: tc.startingOffset, + frame: 1, } _, got, err := vr.readValueBytes(nil) if !errequal(t, err, tc.err) { t.Errorf("Did not receive expected error; got %v; want %v", err, tc.err) } - if tc.err == nil { - offset := startingEnd - vr.stack[0].end - if offset != tc.offset { - t.Errorf("Offset not set at correct position; got %d; want %d", vr.offset, tc.offset) - } + if tc.err == nil && vr.src.pos() != tc.offset { + t.Errorf("Offset not set at correct position; got %d; want %d", vr.src.pos(), tc.offset) } if tc.err == nil && !bytes.Equal(got, tc.data[tc.startingOffset:]) { t.Errorf("Did not receive expected bytes. got %v; want %v", got, tc.data[tc.startingOffset:]) @@ -1417,7 +1413,7 @@ func TestValueReader(t *testing.T) { "append bytes", []byte{0x01, 0x02, 0x03, 0x04}, Type(0), - io.ErrUnexpectedEOF, + io.EOF, }, } @@ -1426,7 +1422,7 @@ func TestValueReader(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() vr := &valueReader{ - r: bufio.NewReader(bytes.NewReader(tc.want)), + src: &bufferedValueReader{buf: tc.want}, stack: []vrState{ {mode: mTopLevel}, }, @@ -1460,7 +1456,7 @@ func TestValueReader(t *testing.T) { t.Run("ReadBytes", func(t *testing.T) { vr := &valueReader{stack: []vrState{{mode: mTopLevel}, {mode: mDocument}}, frame: 1} wanterr := (&valueReader{stack: []vrState{{mode: mTopLevel}, {mode: mDocument}}, frame: 1}). 
- invalidTransitionErr(0, "ReadValueBytes", []mode{mElement, mValue}) + invalidTransitionErr(0, "readValueBytes", []mode{mElement, mValue}) _, _, goterr := vr.readValueBytes(nil) if !cmp.Equal(goterr, wanterr, cmp.Comparer(assert.CompareErrors)) { t.Errorf("Expected correct invalid transition error. got %v; want %v", goterr, wanterr) From 7e4418645e19130e60d25e46e3630e7c1fb82cea Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Mon, 14 Jul 2025 15:58:22 -0600 Subject: [PATCH 41/42] Update bson/value_reader.go Co-authored-by: Matt Dale <9760375+matthewdale@users.noreply.github.com> --- bson/value_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bson/value_reader.go b/bson/value_reader.go index 7f1e24ae12..8ace53f4c0 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -243,7 +243,7 @@ func (vr *valueReader) Type() Type { return vr.stack[vr.frame].vType } -// peekLength returns the length of the next value in the stream without +// peekNextValueSize returns the length of the next value in the stream without // offsetting the reader position. func peekNextValueSize(vr *valueReader) (int32, error) { var length int32 From 383f64dae9dd0fb901806d23d9b686acbbfc5a64 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Mon, 14 Jul 2025 18:49:51 -0600 Subject: [PATCH 42/42] Extend valueReader tests for both streaming and buffered --- bson/buffered_value_reader.go | 2 +- bson/streaming_value_reader.go | 10 +- bson/value_reader.go | 15 +- bson/value_reader_test.go | 1125 ++++++++++++++++++++++---------- 4 files changed, 792 insertions(+), 360 deletions(-) diff --git a/bson/buffered_value_reader.go b/bson/buffered_value_reader.go index 76695b71ad..f155300adf 100644 --- a/bson/buffered_value_reader.go +++ b/bson/buffered_value_reader.go @@ -23,7 +23,7 @@ var _ valueReaderByteSrc = (*bufferedValueReader)(nil) // Read reads up to len(p) bytes from the in-memory buffer, advancing the offset // by the number of bytes read. 
-func (b *bufferedValueReader) Read(p []byte) (int, error) { +func (b *bufferedValueReader) readExact(p []byte) (int, error) { if b.offset >= int64(len(b.buf)) { return 0, io.EOF } diff --git a/bson/streaming_value_reader.go b/bson/streaming_value_reader.go index 718d4e46dd..cf84153f3f 100644 --- a/bson/streaming_value_reader.go +++ b/bson/streaming_value_reader.go @@ -8,6 +8,7 @@ package bson import ( "bufio" + "io" ) // streamingValueReader reads from an ioReader wrapped in a bufio.Reader. It @@ -25,9 +26,12 @@ var _ valueReaderByteSrc = (*streamingValueReader)(nil) // Read reads up to len(p) bytes from the underlying bufio.Reader, advancing // the offset by the number of bytes read. -func (s *streamingValueReader) Read(p []byte) (int, error) { - n, err := s.br.Read(p) - s.offset += int64(n) +func (s *streamingValueReader) readExact(p []byte) (int, error) { + n, err := io.ReadFull(s.br, p) + if err == nil { + s.offset += int64(n) + } + return n, err } diff --git a/bson/value_reader.go b/bson/value_reader.go index 8ace53f4c0..1cc5dde8cf 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -17,9 +17,10 @@ import ( ) type valueReaderByteSrc interface { - io.Reader io.ByteReader + readExact(p []byte) (int, error) + // Peek returns the next n bytes without advancing the cursor. It must return // exactly n bytes or n error if fewer are available. peek(n int) ([]byte, error) @@ -61,12 +62,6 @@ type vrState struct { end int64 } -var bufioReaderPool = sync.Pool{ - New: func() interface{} { - return bufio.NewReader(nil) - }, -} - var vrPool = sync.Pool{ New: func() interface{} { return &valueReader{ @@ -286,10 +281,7 @@ func peekNextValueSize(vr *valueReader) (int32, error) { func readBytes(src valueReaderByteSrc, n int) ([]byte, error) { if src.streamable() { data := make([]byte, n) - if _, err := io.ReadFull(src, data); err != nil { - if errors.Is(err, io.ErrUnexpectedEOF) { - err = io.EOF // Convert io.ErrUnexpectedEOF to io.EOF for consistency. 
- } + if _, err := src.readExact(data); err != nil { return nil, err } @@ -315,6 +307,7 @@ func (vr *valueReader) readBytes(n int32) ([]byte, error) { return readBytes(vr.src, int(n)) } +//nolint:unparam func (vr *valueReader) readValueBytes(dst []byte) (Type, []byte, error) { switch vr.stack[vr.frame].mode { case mTopLevel: diff --git a/bson/value_reader_test.go b/bson/value_reader_test.go index e4635771d2..9b77df12d4 100644 --- a/bson/value_reader_test.go +++ b/bson/value_reader_test.go @@ -7,6 +7,7 @@ package bson import ( + "bufio" "bytes" _ "embed" "errors" @@ -26,6 +27,26 @@ var lorem []byte var testcstring = append(lorem, []byte{0x00}...) func TestValueReader(t *testing.T) { + type subtest struct { + name string + src valueReaderByteSrc + } + + newSubtests := func(b []byte) []subtest { + return []subtest{ + {name: "buffered", src: &bufferedValueReader{buf: b}}, + {name: "streaming", src: &streamingValueReader{br: bufio.NewReader(bytes.NewReader(b))}}, + } + } + + runSubtests := func(t *testing.T, b []byte, fn func(*testing.T, valueReaderByteSrc)) { + for _, stt := range newSubtests(b) { + t.Run(stt.name, func(t *testing.T) { + fn(t, stt.src) + }) + } + } + t.Run("ReadBinary", func(t *testing.T) { testCases := []struct { name string @@ -79,28 +100,30 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - b, btype, err := vr.ReadBinary() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if btype != tc.btype { - t.Errorf("Incorrect binary type returned. 
got %v; want %v", btype, tc.btype) - } - if !bytes.Equal(b, tc.b) { - t.Errorf("Binary data does not match. got %v; want %v", b, tc.b) - } + b, btype, err := vr.ReadBinary() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if btype != tc.btype { + t.Errorf("Incorrect binary type returned. got %v; want %v", btype, tc.btype) + } + if !bytes.Equal(b, tc.b) { + t.Errorf("Binary data does not match. got %v; want %v", b, tc.b) + } + }) }) } }) @@ -144,100 +167,142 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - boolean, err := vr.ReadBoolean() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if boolean != tc.boolean { - t.Errorf("Incorrect boolean returned. got %v; want %v", boolean, tc.boolean) - } + boolean, err := vr.ReadBoolean() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if boolean != tc.boolean { + t.Errorf("Incorrect boolean returned. got %v; want %v", boolean, tc.boolean) + } + }) }) } }) t.Run("ReadDocument", func(t *testing.T) { t.Run("TopLevel", func(t *testing.T) { - doc := []byte{0x05, 0x00, 0x00, 0x00, 0x00} - vr := &valueReader{ - stack: []vrState{{mode: mTopLevel}}, - frame: 0, - } // invalid length - vr.src = &bufferedValueReader{buf: []byte{0x00, 0x00}} - _, err := vr.ReadDocument() - if !errors.Is(err, io.EOF) { - t.Errorf("Expected io.ErrUnexpectedEOF with document length too small. 
got %v; want %v", err, io.EOF) - } - if vr.src.pos() != 0 { - t.Errorf("Expected 0 offset. got %d", vr.src.pos()) - } + t.Run("invalid length", func(t *testing.T) { + runSubtests(t, []byte{0x00, 0x00}, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + stack: []vrState{{mode: mTopLevel}}, + frame: 0, + src: src, + } - vr.src = &bufferedValueReader{buf: doc} - _, err = vr.ReadDocument() - noerr(t, err) - if vr.stack[vr.frame].end != 5 { - t.Errorf("Incorrect end for document. got %d; want %d", vr.stack[vr.frame].end, 5) - } + var wantErr error + switch vr.src.(type) { + case *bufferedValueReader: + // The buffered case uses a peek+discard which returns io.EOF if the + // length is too small + wantErr = io.EOF + case *streamingValueReader: + // The streaming case actually attempts to read the length from the + // buffer + wantErr = io.ErrUnexpectedEOF + } + + _, err := vr.ReadDocument() + if !errors.Is(err, wantErr) { + t.Errorf("Expected io.ErrUnexpectedEOF with document length too small. got %v; want %v", err, io.EOF) + } + if vr.src.pos() != 0 { + t.Errorf("Expected 0 offset. got %d", vr.src.pos()) + } + }) + }) + + t.Run("valid document with incorrect end", func(t *testing.T) { + runSubtests(t, []byte{0x05, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + stack: []vrState{{mode: mTopLevel}}, + frame: 0, + src: src, + } + + _, err := vr.ReadDocument() + noerr(t, err) + if vr.stack[vr.frame].end != 5 { + t.Errorf("Incorrect end for document. 
got %d; want %d", vr.stack[vr.frame].end, 5) + } + }) + }) }) + t.Run("EmbeddedDocument", func(t *testing.T) { - vr := &valueReader{ - stack: []vrState{ - {mode: mTopLevel}, - {mode: mElement, vType: TypeBoolean}, - }, - frame: 1, - } + runSubtests(t, []byte{0x0a, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + stack: []vrState{ + {mode: mTopLevel}, + {mode: mElement, vType: TypeBoolean}, + }, + frame: 1, + } - var wanterr = (&valueReader{stack: []vrState{{mode: mElement, vType: TypeBoolean}}}).typeError(TypeEmbeddedDocument) - _, err := vr.ReadDocument() - if err == nil || err.Error() != wanterr.Error() { - t.Errorf("Incorrect returned error. got %v; want %v", err, wanterr) - } + var wanterr = (&valueReader{stack: []vrState{{mode: mElement, vType: TypeBoolean}}}).typeError(TypeEmbeddedDocument) + _, err := vr.ReadDocument() + if err == nil || err.Error() != wanterr.Error() { + t.Errorf("Incorrect returned error. got %v; want %v", err, wanterr) + } - vr.stack[1].mode = mArray - wanterr = vr.invalidTransitionErr(mDocument, "ReadDocument", []mode{mTopLevel, mElement, mValue}) - _, err = vr.ReadDocument() - if err == nil || err.Error() != wanterr.Error() { - t.Errorf("Incorrect returned error. got %v; want %v", err, wanterr) - } + vr.stack[1].mode = mArray + wanterr = vr.invalidTransitionErr(mDocument, "ReadDocument", []mode{mTopLevel, mElement, mValue}) + _, err = vr.ReadDocument() + if err == nil || err.Error() != wanterr.Error() { + t.Errorf("Incorrect returned error. got %v; want %v", err, wanterr) + } - vr.stack[1].mode, vr.stack[1].vType = mElement, TypeEmbeddedDocument - vr.src = &bufferedValueReader{ - buf: []byte{0x0A, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00}, - offset: 4, - } - _, err = vr.ReadDocument() - noerr(t, err) - if len(vr.stack) != 3 { - t.Errorf("Incorrect number of stack frames. 
got %d; want %d", len(vr.stack), 3) - } - if vr.stack[2].mode != mDocument { - t.Errorf("Incorrect mode set. got %v; want %v", vr.stack[2].mode, mDocument) - } - if vr.stack[2].end != 9 { - t.Errorf("End of embedded document is not correct. got %d; want %d", vr.stack[2].end, 9) - } - if vr.src.pos() != 8 { - t.Errorf("Offset not incremented correctly. got %d; want %d", vr.src.pos(), 8) - } + vr.stack[1].mode, vr.stack[1].vType = mElement, TypeEmbeddedDocument + vr.src = src + _, _ = vr.src.discard(4) - vr.frame-- - _, err = vr.ReadDocument() - if !errors.Is(err, io.EOF) { - t.Errorf("Should return error when attempting to read length with not enough bytes. got %v; want %v", err, io.EOF) - } + _, err = vr.ReadDocument() + noerr(t, err) + if len(vr.stack) != 3 { + t.Errorf("Incorrect number of stack frames. got %d; want %d", len(vr.stack), 3) + } + if vr.stack[2].mode != mDocument { + t.Errorf("Incorrect mode set. got %v; want %v", vr.stack[2].mode, mDocument) + } + if vr.stack[2].end != 9 { + t.Errorf("End of embedded document is not correct. got %d; want %d", vr.stack[2].end, 9) + } + if vr.src.pos() != 8 { + t.Errorf("Offset not incremented correctly. got %d; want %d", vr.src.pos(), 8) + } + + vr.frame-- + _, err = vr.ReadDocument() + + var wantErr error + switch vr.src.(type) { + case *bufferedValueReader: + // The buffered case uses a peek+discard which returns io.EOF if the + // length is too small + wantErr = io.EOF + case *streamingValueReader: + // The streaming case actually attempts to read the length from the + // buffer + wantErr = io.ErrUnexpectedEOF + } + + if !errors.Is(err, wantErr) { + t.Errorf("Should return error when attempting to read length with not enough bytes. 
got %v; want %v", err, io.EOF) + } + }) }) }) t.Run("ReadCodeWithScope", func(t *testing.T) { @@ -312,22 +377,24 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - _, _, err := vr.ReadCodeWithScope() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } + _, _, err := vr.ReadCodeWithScope() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + }) }) } @@ -335,32 +402,35 @@ func TestValueReader(t *testing.T) { doc := []byte{0x00, 0x00, 0x00, 0x00} doc = append(doc, codeWithScope...) doc = append(doc, 0x00) - vr := &valueReader{ - src: &bufferedValueReader{buf: doc, offset: 4}, - stack: []vrState{ - {mode: mTopLevel}, - {mode: mElement, vType: TypeCodeWithScope}, - }, - frame: 1, - } + runSubtests(t, doc, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + {mode: mElement, vType: TypeCodeWithScope}, + }, + frame: 1, + } + _, _ = vr.src.discard(4) // discard the document length - code, _, err := vr.ReadCodeWithScope() - noerr(t, err) - if code != "foo" { - t.Errorf("Code does not match. got %s; want %s", code, "foo") - } - if len(vr.stack) != 3 { - t.Errorf("Incorrect number of stack frames. got %d; want %d", len(vr.stack), 3) - } - if vr.stack[2].mode != mCodeWithScope { - t.Errorf("Incorrect mode set. got %v; want %v", vr.stack[2].mode, mDocument) - } - if vr.stack[2].end != 21 { - t.Errorf("End of scope is not correct. 
got %d; want %d", vr.stack[2].end, 21) - } - if vr.src.pos() != 20 { - t.Errorf("Offset not incremented correctly. got %d; want %d", vr.src.pos(), 20) - } + code, _, err := vr.ReadCodeWithScope() + noerr(t, err) + if code != "foo" { + t.Errorf("Code does not match. got %s; want %s", code, "foo") + } + if len(vr.stack) != 3 { + t.Errorf("Incorrect number of stack frames. got %d; want %d", len(vr.stack), 3) + } + if vr.stack[2].mode != mCodeWithScope { + t.Errorf("Incorrect mode set. got %v; want %v", vr.stack[2].mode, mDocument) + } + if vr.stack[2].end != 21 { + t.Errorf("End of scope is not correct. got %d; want %d", vr.stack[2].end, 21) + } + if vr.src.pos() != 20 { + t.Errorf("Offset not incremented correctly. got %d; want %d", vr.src.pos(), 20) + } + }) }) }) t.Run("ReadDBPointer", func(t *testing.T) { @@ -419,28 +489,30 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - ns, oid, err := vr.ReadDBPointer() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if ns != tc.ns { - t.Errorf("Incorrect namespace returned. got %v; want %v", ns, tc.ns) - } - if oid != tc.oid { - t.Errorf("ObjectIDs did not match. got %v; want %v", oid, tc.oid) - } + ns, oid, err := vr.ReadDBPointer() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if ns != tc.ns { + t.Errorf("Incorrect namespace returned. got %v; want %v", ns, tc.ns) + } + if oid != tc.oid { + t.Errorf("ObjectIDs did not match. 
got %v; want %v", oid, tc.oid) + } + }) }) } }) @@ -477,25 +549,27 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - dt, err := vr.ReadDateTime() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if dt != tc.dt { - t.Errorf("Incorrect datetime returned. got %d; want %d", dt, tc.dt) - } + dt, err := vr.ReadDateTime() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if dt != tc.dt { + t.Errorf("Incorrect datetime returned. got %d; want %d", dt, tc.dt) + } + }) }) } }) @@ -535,30 +609,32 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - dc128, err := vr.ReadDecimal128() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - gotHigh, gotLow := dc128.GetBytes() - wantHigh, wantLow := tc.dc128.GetBytes() - if gotHigh != wantHigh { - t.Errorf("Retuired high byte does not match. got %d; want %d", gotHigh, wantHigh) - } - if gotLow != wantLow { - t.Errorf("Returned low byte does not match. 
got %d; want %d", gotLow, wantLow) - } + dc128, err := vr.ReadDecimal128() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + gotHigh, gotLow := dc128.GetBytes() + wantHigh, wantLow := tc.dc128.GetBytes() + if gotHigh != wantHigh { + t.Errorf("Retuired high byte does not match. got %d; want %d", gotHigh, wantHigh) + } + if gotLow != wantLow { + t.Errorf("Returned low byte does not match. got %d; want %d", gotLow, wantLow) + } + }) }) } }) @@ -595,25 +671,27 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - double, err := vr.ReadDouble() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if double != tc.double { - t.Errorf("Incorrect double returned. got %f; want %f", double, tc.double) - } + double, err := vr.ReadDouble() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if double != tc.double { + t.Errorf("Incorrect double returned. 
got %f; want %f", double, tc.double) + } + }) }) } }) @@ -650,25 +728,27 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - i32, err := vr.ReadInt32() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if i32 != tc.i32 { - t.Errorf("Incorrect int32 returned. got %d; want %d", i32, tc.i32) - } + i32, err := vr.ReadInt32() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if i32 != tc.i32 { + t.Errorf("Incorrect int32 returned. got %d; want %d", i32, tc.i32) + } + }) }) } }) @@ -705,36 +785,39 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - i64, err := vr.ReadInt64() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if i64 != tc.i64 { - t.Errorf("Incorrect int64 returned. got %d; want %d", i64, tc.i64) - } + i64, err := vr.ReadInt64() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if i64 != tc.i64 { + t.Errorf("Incorrect int64 returned. 
got %d; want %d", i64, tc.i64) + } + }) }) } }) t.Run("ReadJavascript/ReadString/ReadSymbol", func(t *testing.T) { testCases := []struct { - name string - data []byte - fn func(*valueReader) (string, error) - css string // code, string, symbol :P - err error - vType Type + name string + data []byte + fn func(*valueReader) (string, error) + css string // code, string, symbol :P + streamingErr error + bufferedError error + vType Type }{ { "ReadJavascript/incorrect type", @@ -742,6 +825,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadJavascript, "", (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeJavaScript), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeJavaScript), TypeEmbeddedDocument, }, { @@ -750,6 +834,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadString, "", (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeString), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeString), TypeEmbeddedDocument, }, { @@ -758,6 +843,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadSymbol, "", (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeSymbol), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeSymbol), TypeEmbeddedDocument, }, { @@ -766,6 +852,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadJavascript, "", io.EOF, + io.EOF, TypeJavaScript, }, { @@ -774,6 +861,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadString, "", io.EOF, + io.EOF, TypeString, }, { @@ -781,6 +869,7 @@ func TestValueReader(t *testing.T) { append([]byte{0x40, 0x27, 0x00, 0x00}, testcstring...), (*valueReader).ReadString, "", + io.ErrUnexpectedEOF, io.EOF, TypeString, }, @@ -790,6 +879,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadSymbol, "", io.EOF, + io.EOF, TypeSymbol, }, { @@ -798,6 
+888,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadJavascript, "", fmt.Errorf("string does not end with null byte, but with %v", 0x05), + fmt.Errorf("string does not end with null byte, but with %v", 0x05), TypeJavaScript, }, { @@ -806,6 +897,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadString, "", fmt.Errorf("string does not end with null byte, but with %v", 0x05), + fmt.Errorf("string does not end with null byte, but with %v", 0x05), TypeString, }, { @@ -814,6 +906,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadString, "", fmt.Errorf("string does not end with null byte, but with %v", 0x20), + fmt.Errorf("string does not end with null byte, but with %v", 0x20), TypeString, }, { @@ -822,6 +915,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadSymbol, "", fmt.Errorf("string does not end with null byte, but with %v", 0x05), + fmt.Errorf("string does not end with null byte, but with %v", 0x05), TypeSymbol, }, { @@ -830,6 +924,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadJavascript, "foo", nil, + nil, TypeJavaScript, }, { @@ -838,6 +933,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadString, "foo", nil, + nil, TypeString, }, { @@ -846,6 +942,7 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadString, string(testcstring[:len(testcstring)-1]), nil, + nil, TypeString, }, { @@ -854,109 +951,139 @@ func TestValueReader(t *testing.T) { (*valueReader).ReadSymbol, "foo", nil, + nil, TypeSymbol, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - css, err := tc.fn(vr) - if !errequal(t, err, 
tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if css != tc.css { - t.Errorf("Incorrect (JavaScript,String,Symbol) returned. got %s; want %s", css, tc.css) - } + var wantErr error + switch src.(type) { + case *bufferedValueReader: + wantErr = tc.bufferedError + case *streamingValueReader: + wantErr = tc.streamingErr + } + + css, err := tc.fn(vr) + if !errequal(t, err, wantErr) { + t.Errorf("Returned errors do not match. got %v; want %v", err, wantErr) + } + if css != tc.css { + t.Errorf("Incorrect (JavaScript,String,Symbol) returned. got %s; want %s", css, tc.css) + } + }) }) } }) t.Run("ReadMaxKey/ReadMinKey/ReadNull/ReadUndefined", func(t *testing.T) { testCases := []struct { - name string - fn func(*valueReader) error - err error - vType Type + name string + fn func(*valueReader) error + streamingErr error + bufferedErr error + vType Type }{ { "ReadMaxKey/incorrect type", (*valueReader).ReadMaxKey, (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeMaxKey), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeMaxKey), TypeEmbeddedDocument, }, { "ReadMaxKey/success", (*valueReader).ReadMaxKey, nil, + nil, TypeMaxKey, }, { "ReadMinKey/incorrect type", (*valueReader).ReadMinKey, (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeMinKey), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeMinKey), TypeEmbeddedDocument, }, { "ReadMinKey/success", (*valueReader).ReadMinKey, nil, + nil, TypeMinKey, }, { "ReadNull/incorrect type", (*valueReader).ReadNull, (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeNull), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeNull), TypeEmbeddedDocument, }, { "ReadNull/success", (*valueReader).ReadNull, nil, + nil, TypeNull, }, { "ReadUndefined/incorrect type", 
(*valueReader).ReadUndefined, (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeUndefined), + (&valueReader{stack: []vrState{{vType: TypeEmbeddedDocument}}, frame: 0}).typeError(TypeUndefined), TypeEmbeddedDocument, }, { "ReadUndefined/success", (*valueReader).ReadUndefined, nil, + nil, TypeUndefined, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: []byte{}}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, []byte{}, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - err := tc.fn(vr) - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } + var wantErr error + switch src.(type) { + case *bufferedValueReader: + wantErr = tc.bufferedErr + case *streamingValueReader: + wantErr = tc.streamingErr + } + + err := tc.fn(vr) + if !errequal(t, err, wantErr) { + t.Errorf("Returned errors do not match. got %v; want %v", err, wantErr) + } + }) }) } }) @@ -993,25 +1120,27 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - oid, err := vr.ReadObjectID() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if oid != tc.oid { - t.Errorf("ObjectIDs did not match. 
got %v; want %v", oid, tc.oid) - } + oid, err := vr.ReadObjectID() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if oid != tc.oid { + t.Errorf("ObjectIDs did not match. got %v; want %v", oid, tc.oid) + } + }) }) } }) @@ -1060,28 +1189,30 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - pattern, options, err := vr.ReadRegex() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if pattern != tc.pattern { - t.Errorf("Incorrect pattern returned. got %s; want %s", pattern, tc.pattern) - } - if options != tc.options { - t.Errorf("Incorrect options returned. got %s; want %s", options, tc.options) - } + pattern, options, err := vr.ReadRegex() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if pattern != tc.pattern { + t.Errorf("Incorrect pattern returned. got %s; want %s", pattern, tc.pattern) + } + if options != tc.options { + t.Errorf("Incorrect options returned. 
got %s; want %s", options, tc.options) + } + }) }) } }) @@ -1130,28 +1261,30 @@ func TestValueReader(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - vr := &valueReader{ - src: &bufferedValueReader{buf: tc.data}, - stack: []vrState{ - {mode: mTopLevel}, - { - mode: mElement, - vType: tc.vType, + runSubtests(t, tc.data, func(t *testing.T, src valueReaderByteSrc) { + vr := &valueReader{ + src: src, + stack: []vrState{ + {mode: mTopLevel}, + { + mode: mElement, + vType: tc.vType, + }, }, - }, - frame: 1, - } + frame: 1, + } - ts, incr, err := vr.ReadTimestamp() - if !errequal(t, err, tc.err) { - t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) - } - if ts != tc.ts { - t.Errorf("Incorrect timestamp returned. got %d; want %d", ts, tc.ts) - } - if incr != tc.incr { - t.Errorf("Incorrect increment returned. got %d; want %d", incr, tc.incr) - } + ts, incr, err := vr.ReadTimestamp() + if !errequal(t, err, tc.err) { + t.Errorf("Returned errors do not match. got %v; want %v", err, tc.err) + } + if ts != tc.ts { + t.Errorf("Incorrect timestamp returned. got %d; want %d", ts, tc.ts) + } + if incr != tc.incr { + t.Errorf("Incorrect increment returned. got %d; want %d", incr, tc.incr) + } + }) }) } }) @@ -1443,6 +1576,308 @@ func TestValueReader(t *testing.T) { }) }) + // This test is too complicated to try to abstract using subtests. 
+ t.Run("ReadBytes & Skip (streaming)", func(t *testing.T) { + index, docb := bsoncore.ReserveLength(nil) + docb = bsoncore.AppendNullElement(docb, "foobar") + docb = append(docb, 0x00) + docb = bsoncore.UpdateLength(docb, index, int32(len(docb))) + cwsbytes := bsoncore.AppendCodeWithScope(nil, "var hellow = world;", docb) + strbytes := []byte{0x04, 0x00, 0x00, 0x00, 'f', 'o', 'o', 0x00} + testCases := []struct { + name string + t Type + data []byte + err error + offset int64 + startingOffset int64 + }{ + { + "Array/invalid length", + TypeArray, + []byte{0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "Array/not enough bytes", + TypeArray, + []byte{0x0F, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "Array/success", + TypeArray, + []byte{0x08, 0x00, 0x00, 0x00, 0x0A, '1', 0x00, 0x00}, + nil, 8, 0, + }, + { + "EmbeddedDocument/invalid length", + TypeEmbeddedDocument, + []byte{0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "EmbeddedDocument/not enough bytes", + TypeEmbeddedDocument, + []byte{0x0F, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "EmbeddedDocument/success", + TypeEmbeddedDocument, + []byte{0x08, 0x00, 0x00, 0x00, 0x0A, 'A', 0x00, 0x00}, + nil, 8, 0, + }, + { + "CodeWithScope/invalid length", + TypeCodeWithScope, + []byte{0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "CodeWithScope/not enough bytes", + TypeCodeWithScope, + []byte{0x0F, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "CodeWithScope/success", + TypeCodeWithScope, + cwsbytes, + nil, 41, 0, + }, + { + "Binary/invalid length", + TypeBinary, + []byte{0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "Binary/not enough bytes", + TypeBinary, + []byte{0x0F, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "Binary/success", + TypeBinary, + []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03}, + nil, 8, 0, + }, + { + "Boolean/invalid length", + TypeBoolean, + []byte{}, + io.EOF, 0, 0, + }, + { + "Boolean/success", + TypeBoolean, + 
[]byte{0x01}, + nil, 1, 0, + }, + { + "DBPointer/invalid length", + TypeDBPointer, + []byte{0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "DBPointer/not enough bytes", + TypeDBPointer, + []byte{0x0F, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03}, + io.EOF, 0, 0, + }, + { + "DBPointer/success", + TypeDBPointer, + []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C}, + nil, 17, 0, + }, + {"DBPointer/not enough bytes", TypeDateTime, []byte{0x01, 0x02, 0x03, 0x04}, io.EOF, 0, 0}, + {"DBPointer/success", TypeDateTime, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, nil, 8, 0}, + {"Double/not enough bytes", TypeDouble, []byte{0x01, 0x02, 0x03, 0x04}, io.EOF, 0, 0}, + {"Double/success", TypeDouble, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, nil, 8, 0}, + {"Int64/not enough bytes", TypeInt64, []byte{0x01, 0x02, 0x03, 0x04}, io.EOF, 0, 0}, + {"Int64/success", TypeInt64, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, nil, 8, 0}, + {"Timestamp/not enough bytes", TypeTimestamp, []byte{0x01, 0x02, 0x03, 0x04}, io.EOF, 0, 0}, + {"Timestamp/success", TypeTimestamp, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, nil, 8, 0}, + { + "Decimal128/not enough bytes", + TypeDecimal128, + []byte{0x01, 0x02, 0x03, 0x04}, + io.EOF, 0, 0, + }, + { + "Decimal128/success", + TypeDecimal128, + []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}, + nil, 16, 0, + }, + {"Int32/not enough bytes", TypeInt32, []byte{0x01, 0x02}, io.EOF, 0, 0}, + {"Int32/success", TypeInt32, []byte{0x01, 0x02, 0x03, 0x04}, nil, 4, 0}, + {"Javascript/invalid length", TypeJavaScript, strbytes[:2], io.EOF, 0, 0}, + {"Javascript/not enough bytes", TypeJavaScript, strbytes[:5], io.EOF, 0, 0}, + {"Javascript/success", TypeJavaScript, strbytes, nil, 8, 0}, + {"String/invalid length", TypeString, strbytes[:2], io.EOF, 0, 0}, + {"String/not enough bytes", TypeString, strbytes[:5], io.EOF, 0, 
0}, + {"String/success", TypeString, strbytes, nil, 8, 0}, + {"Symbol/invalid length", TypeSymbol, strbytes[:2], io.EOF, 0, 0}, + {"Symbol/not enough bytes", TypeSymbol, strbytes[:5], io.EOF, 0, 0}, + {"Symbol/success", TypeSymbol, strbytes, nil, 8, 0}, + {"MaxKey/success", TypeMaxKey, []byte{}, nil, 0, 0}, + {"MinKey/success", TypeMinKey, []byte{}, nil, 0, 0}, + {"Null/success", TypeNull, []byte{}, nil, 0, 0}, + {"Undefined/success", TypeUndefined, []byte{}, nil, 0, 0}, + { + "ObjectID/not enough bytes", + TypeObjectID, + []byte{0x01, 0x02, 0x03, 0x04}, + io.EOF, 0, 0, + }, + { + "ObjectID/success", + TypeObjectID, + []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C}, + nil, 12, 0, + }, + { + "Regex/not enough bytes (first string)", + TypeRegex, + []byte{'f', 'o', 'o'}, + io.EOF, 0, 0, + }, + { + "Regex/not enough bytes (second string)", + TypeRegex, + []byte{'f', 'o', 'o', 0x00, 'b', 'a', 'r'}, + io.EOF, 0, 0, + }, + { + "Regex/success", + TypeRegex, + []byte{0x00, 0x00, 0x00, 'f', 'o', 'o', 0x00, 'i', 0x00}, + nil, 9, 3, + }, + { + "Unknown Type", + Type(0), + nil, + fmt.Errorf("attempted to read bytes of unknown BSON type %v", Type(0)), 0, 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + const startingEnd = 64 + t.Run("Skip", func(t *testing.T) { + vr := &valueReader{ + src: &streamingValueReader{ + br: bufio.NewReader(bytes.NewReader(tc.data[tc.startingOffset:tc.offset])), + offset: tc.startingOffset, + }, + stack: []vrState{ + {mode: mTopLevel, end: startingEnd}, + {mode: mElement, vType: tc.t}, + }, + frame: 1, + } + + err := vr.Skip() + if !errequal(t, err, tc.err) { + t.Errorf("Did not receive expected error; got %v; want %v", err, tc.err) + } + if tc.err == nil { + offset := startingEnd - vr.stack[0].end + if offset != tc.offset { + t.Errorf("Offset not set at correct position; got %d; want %d", offset, tc.offset) + } + } + }) + t.Run("ReadBytes", func(t *testing.T) { + vr := &valueReader{ 
+ src: &streamingValueReader{ + br: bufio.NewReader(bytes.NewReader(tc.data[tc.startingOffset:tc.offset])), + offset: tc.startingOffset, + }, + stack: []vrState{ + {mode: mTopLevel, end: startingEnd}, + {mode: mElement, vType: tc.t}, + }, + frame: 1, + } + + _, got, err := vr.readValueBytes(nil) + if !errequal(t, err, tc.err) { + t.Errorf("Did not receive expected error; got %v; want %v", err, tc.err) + } + if tc.err == nil { + offset := startingEnd - vr.stack[0].end + if offset != tc.offset { + t.Errorf("Offset not set at correct position; got %d; want %d", vr.offset, tc.offset) + } + } + if tc.err == nil && !bytes.Equal(got, tc.data[tc.startingOffset:]) { + t.Errorf("Did not receive expected bytes. got %v; want %v", got, tc.data[tc.startingOffset:]) + } + }) + }) + } + t.Run("ReadValueBytes/Top Level Doc", func(t *testing.T) { + testCases := []struct { + name string + want []byte + wantType Type + wantErr error + }{ + { + "success", + bsoncore.BuildDocument(nil, bsoncore.AppendDoubleElement(nil, "pi", 3.14159)), + Type(0), + nil, + }, + { + "wrong length", + []byte{0x01, 0x02, 0x03}, + Type(0), + io.EOF, + }, + { + "append bytes", + []byte{0x01, 0x02, 0x03, 0x04}, + Type(0), + io.ErrUnexpectedEOF, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + vr := &valueReader{ + src: &streamingValueReader{ + br: bufio.NewReader(bytes.NewReader(tc.want)), + }, + stack: []vrState{ + {mode: mTopLevel}, + }, + frame: 0, + } + gotType, got, gotErr := vr.readValueBytes(nil) + if !errors.Is(gotErr, tc.wantErr) { + t.Errorf("Did not receive expected error. got %v; want %v", gotErr, tc.wantErr) + } + if tc.wantErr == nil && gotType != tc.wantType { + t.Errorf("Did not receive expected type. got %v; want %v", gotType, tc.wantType) + } + if tc.wantErr == nil && !bytes.Equal(got, tc.want) { + t.Errorf("Did not receive expected bytes. 
got %v; want %v", got, tc.want) + } + }) + } + }) + }) + t.Run("invalid transition", func(t *testing.T) { t.Run("Skip", func(t *testing.T) { vr := &valueReader{stack: []vrState{{mode: mTopLevel}}}