Skip to content

Commit dff53d0

Browse files
Add streaming support
1 parent 10a069b commit dff53d0

File tree

6 files changed

+197
-90
lines changed

6 files changed

+197
-90
lines changed

bson/buffered_value_reader.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,6 @@ type bufferedValueReader struct {
2121

2222
var _ valueReaderByteSrc = (*bufferedValueReader)(nil)
2323

24-
// Read reads up to len(p) bytes from buf[offset:] into p, advancing offset.
25-
//
26-
// TODO: Depending on streaming requirements, we might be able to remove this.
27-
func (b *bufferedValueReader) Read([]byte) (n int, err error) {
28-
panic("Read not implemented for bufferedValueReader")
29-
}
30-
3124
// ReadByte returns the single byte at buf[offset] and advances offset by 1.
3225
func (b *bufferedValueReader) ReadByte() (byte, error) {
3326
if b.offset >= int64(len(b.buf)) {
@@ -109,3 +102,11 @@ func (b *bufferedValueReader) regexLength() (int32, error) {
109102
// (options) + 2 null terminators
110103
return int32(i + j + 2), nil
111104
}
105+
106+
func (*bufferedValueReader) streamable() bool {
107+
return false
108+
}
109+
110+
func (*bufferedValueReader) reset() {
111+
// No resources to release for bufferedValueReader.
112+
}

bson/copier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func copyValueFromBytes(dst ValueWriter, t Type, src []byte) error {
180180
return wvb.writeValueBytes(t, src)
181181
}
182182

183-
vr := newDocumentReader(bytes.NewReader(src))
183+
vr := newBufferedDocumentReader(bytes.NewReader(src))
184184
vr.pushElement(t)
185185

186186
return copyValue(dst, vr)

bson/copier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestCopier(t *testing.T) {
4040
doc = bsoncore.AppendStringElement(doc, "Hello", "world")
4141
doc, err := bsoncore.AppendDocumentEnd(doc, idx)
4242
noerr(t, err)
43-
src := newDocumentReader(bytes.NewReader(doc))
43+
src := newBufferedDocumentReader(bytes.NewReader(doc))
4444
dst := newValueWriterFromSlice(make([]byte, 0))
4545
want := doc
4646
err = copyDocument(dst, src)
@@ -77,7 +77,7 @@ func TestCopier(t *testing.T) {
7777
noerr(t, err)
7878
doc, err = bsoncore.AppendDocumentEnd(doc, idx)
7979
noerr(t, err)
80-
src := newDocumentReader(bytes.NewReader(doc))
80+
src := newBufferedDocumentReader(bytes.NewReader(doc))
8181

8282
_, err = src.ReadDocument()
8383
noerr(t, err)
@@ -450,7 +450,7 @@ func TestCopier(t *testing.T) {
450450
idx,
451451
)
452452
noerr(t, err)
453-
vr := newDocumentReader(bytes.NewReader(b))
453+
vr := newBufferedDocumentReader(bytes.NewReader(b))
454454
_, err = vr.ReadDocument()
455455
noerr(t, err)
456456
_, _, err = vr.ReadElement()
@@ -489,7 +489,7 @@ func TestCopier(t *testing.T) {
489489
idx,
490490
)
491491
noerr(t, err)
492-
vr := newDocumentReader(bytes.NewReader(b))
492+
vr := newBufferedDocumentReader(bytes.NewReader(b))
493493
_, err = vr.ReadDocument()
494494
noerr(t, err)
495495
_, _, err = vr.ReadElement()

bson/streaming_value_reader.go

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
package bson
88

9-
import "bufio"
9+
import (
10+
"bufio"
11+
)
1012

1113
// streamingValueReader reads from an ioReader wrapped in a bufio.Reader. It
1214
// first reads the BSON length header, then ensures it only ever reads exactly
@@ -15,43 +17,76 @@ import "bufio"
1517
// Note: this approach trades memory usage for extra buffering and reader calls,
1618
// so it is less performanted than the in-memory bufferedValueReader.
1719
type streamingValueReader struct {
18-
br *bufio.Reader
20+
br *bufio.Reader
21+
offset int64 // offset is the current read position in the buffer
1922
}
2023

2124
var _ valueReaderByteSrc = (*streamingValueReader)(nil)
2225

23-
// Read reads up to len(p) bytes from buf[offset:] into p, advancing offset.
24-
func (b *streamingValueReader) Read([]byte) (n int, err error) {
25-
panic("Read not implemented for streamingValueReader")
26-
}
27-
2826
// ReadByte returns the single byte at buf[offset] and advances offset by 1.
29-
func (b *streamingValueReader) ReadByte() (byte, error) {
30-
panic("ReadByte not implemented for streamingValueReader")
27+
func (s *streamingValueReader) ReadByte() (byte, error) {
28+
c, err := s.br.ReadByte()
29+
if err == nil {
30+
s.offset++
31+
}
32+
return c, err
3133
}
3234

3335
// peek returns buf[offset:offset+n] without advancing offset.
34-
func (b *streamingValueReader) peek(int) ([]byte, error) {
35-
panic("peek not implemented for streamingValueReader")
36+
func (s *streamingValueReader) peek(n int) ([]byte, error) {
37+
return s.br.Peek(n)
3638
}
3739

3840
// discard advances offset by n bytes, returning the number of bytes discarded.
39-
func (b *streamingValueReader) discard(int) (int, error) {
40-
panic("discard not implemented for streamingValueReader")
41+
func (s *streamingValueReader) discard(n int) (int, error) {
42+
m, err := s.br.Discard(n)
43+
s.offset += int64(m)
44+
return m, err
4145
}
4246

4347
// readSlice scans buf[offset:] for the first occurrence of delim, returns
4448
// buf[offset:idx+1], and advances offset past it; errors if delim not found.
45-
func (b *streamingValueReader) readSlice(byte) ([]byte, error) {
46-
panic("readSlice not implemented for streamingValueReader")
49+
func (s *streamingValueReader) readSlice(delim byte) ([]byte, error) {
50+
data, err := s.br.ReadSlice(delim)
51+
if err != nil {
52+
return nil, err
53+
}
54+
s.offset += int64(len(data))
55+
return data, nil
4756
}
4857

4958
// pos returns the current read position in the buffer.
50-
func (b *streamingValueReader) pos() int64 {
51-
panic("pos not implemented for streamingValueReader")
59+
func (s *streamingValueReader) pos() int64 {
60+
return s.offset
5261
}
5362

5463
// regexLength will return the total byte length of a BSON regex value.
55-
func (b *streamingValueReader) regexLength() (int32, error) {
56-
panic("regexLength not implemented for streamingValueReader")
64+
func (s *streamingValueReader) regexLength() (int32, error) {
65+
var (
66+
count int32
67+
nulCount int
68+
)
69+
70+
for nulCount < 2 {
71+
buf, err := s.br.Peek(int(count) + 1)
72+
if err != nil {
73+
return 0, err
74+
}
75+
76+
b := buf[count]
77+
count++
78+
if b == 0x00 {
79+
nulCount++
80+
}
81+
}
82+
83+
return count, nil
84+
}
85+
86+
func (*streamingValueReader) streamable() bool {
87+
return true
88+
}
89+
90+
func (s *streamingValueReader) reset() {
91+
s.offset = 0
5792
}

0 commit comments

Comments
 (0)