Skip to content

Commit 3300088

Browse files
committed
perf(stream): use pool of bytes buffer
Signed-off-by: Alexandre Lecerf <[email protected]>
1 parent c64389f commit 3300088

File tree

2 files changed

+76
-12
lines changed

2 files changed

+76
-12
lines changed

stream/stream.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"encoding/binary"
66
"io"
7+
"sync"
78

89
"github.com/ovh/symmecrypt"
910
_ "github.com/ovh/symmecrypt/keyloader"
@@ -42,29 +43,54 @@ func (k key) DecryptPipe(r io.Reader, w io.Writer, extra ...[]byte) error {
4243
return nil
4344
}
4445

46+
var buffers = sync.Pool{
47+
New: func() interface{} {
48+
return new(bytes.Buffer)
49+
},
50+
}
51+
52+
func getBuffer() (*bytes.Buffer, error) {
53+
if b, ok := buffers.Get().(*bytes.Buffer); ok {
54+
b.Reset()
55+
return b, nil
56+
}
57+
58+
panic("buffers is not of type *bytes.Buffer")
59+
}
60+
61+
func putBuffer(buf *bytes.Buffer) {
62+
buffers.Put(buf)
63+
}
64+
4565
var _ io.WriteCloser = new(chunksWriter)
4666

4767
type chunksWriter struct {
4868
destination io.Writer
4969
k symmecrypt.Key
5070
extras [][]byte
5171
chunkSize int
52-
currentChunkWriter *bytes.Buffer
72+
buf *bytes.Buffer
5373
currentChunkBytesWritten int
5474
}
5575

5676
func (w *chunksWriter) encryptCurrentChunk() error {
57-
if w.currentChunkWriter == nil && w.currentChunkBytesWritten == 0 {
77+
if w.buf == nil && w.currentChunkBytesWritten == 0 {
5878
return nil
5979
}
60-
currentChunk := w.currentChunkWriter.Bytes()
80+
6181
// first step: encrypt the chunks
62-
btes, err := w.k.Encrypt(currentChunk, w.extras...)
82+
btes, err := w.k.Encrypt(w.buf.Bytes(), w.extras...)
6383
if err != nil {
6484
return err
6585
}
6686

6787
// then write into the destination writer the len of the encrypted chunks
88+
headerBytes, err := getBuffer()
89+
if err != nil {
90+
return err
91+
}
92+
defer putBuffer(headerBytes)
93+
6894
headerBuf := make([]byte, binary.MaxVarintLen32)
6995
binary.PutUvarint(headerBuf, uint64(len(btes)))
7096
if _, err := w.destination.Write(headerBuf); err != nil {
@@ -76,7 +102,8 @@ func (w *chunksWriter) encryptCurrentChunk() error {
76102

77103
// finally reset the current chunk
78104
w.currentChunkBytesWritten = 0
79-
w.currentChunkWriter = nil
105+
putBuffer(w.buf)
106+
w.buf = nil
80107

81108
return err
82109
}
@@ -85,8 +112,13 @@ func (w *chunksWriter) Write(p []byte) (int, error) {
85112
if len(p) == 0 {
86113
return len(p), nil
87114
}
88-
if w.currentChunkWriter == nil {
89-
w.currentChunkWriter = new(bytes.Buffer)
115+
if w.buf == nil {
116+
chunkWriter, err := getBuffer()
117+
if err != nil {
118+
return 0, err
119+
}
120+
121+
w.buf = chunkWriter
90122
w.currentChunkBytesWritten = 0
91123
}
92124

@@ -97,7 +129,7 @@ func (w *chunksWriter) Write(p []byte) (int, error) {
97129
}
98130

99131
if w.currentChunkBytesWritten+len(p) == w.chunkSize {
100-
n, err := w.currentChunkWriter.Write(p)
132+
n, err := w.buf.Write(p)
101133
if err != nil {
102134
return n, err
103135
}
@@ -110,7 +142,7 @@ func (w *chunksWriter) Write(p []byte) (int, error) {
110142

111143
x := w.chunkSize - w.currentChunkBytesWritten
112144
if len(p) < x {
113-
n, err := w.currentChunkWriter.Write(p)
145+
n, err := w.buf.Write(p)
114146
w.currentChunkBytesWritten += int(n)
115147
return n, err
116148
} else {
@@ -176,19 +208,29 @@ func NewReader(r io.Reader, k symmecrypt.Key, chunkSize int, extras ...[]byte) i
176208
}
177209

178210
func (r *chunksReader) readNewChunk() error {
211+
headerBtes, err := getBuffer()
212+
if err != nil {
213+
return err
214+
}
215+
defer putBuffer(headerBtes)
216+
179217
// read the chunksize
180-
var headerBtes = new(bytes.Buffer)
181218
if _, err := io.CopyN(headerBtes, r.src, binary.MaxVarintLen32); err != nil {
182219
return err
183220
}
184221

185-
n, err := binary.ReadUvarint(bytes.NewReader(headerBtes.Bytes())) // READ THE HEADER BUFFER
222+
n, err := binary.ReadUvarint(headerBtes) // READ THE HEADER BUFFER
186223
if err != nil {
187224
return err
188225
}
189226

190227
// read the chunk content
191-
var btsBuff = new(bytes.Buffer)
228+
btsBuff, err := getBuffer()
229+
if err != nil {
230+
return err
231+
}
232+
defer putBuffer(btsBuff)
233+
192234
if _, err := io.CopyN(btsBuff, r.src, int64(n)); err != nil && err != io.EOF {
193235
return err
194236
}

stream/stream_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,27 @@ func TestIncompleteRead(t *testing.T) {
7070
assert.Contains(t, err.Error(), "EOF")
7171

7272
require.Equal(t, 32*1024+10, nbBytesReaden1+nbBytesReaden2)
73+
}
74+
75+
func BenchmarkStream(b *testing.B) {
76+
for i := 0; i < b.N; i++ {
77+
clearContent := make([]byte, 32*1024+10)
78+
rand.Read(clearContent) // nolint
79+
80+
k, _ := keyloader.LoadKey("test")
7381

82+
var bufWriter bytes.Buffer
83+
streamWriter := stream.NewWriter(&bufWriter, k, 32*1024)
84+
_, err := io.Copy(streamWriter, bytes.NewReader(clearContent))
85+
require.NoError(b, err)
86+
87+
streamReader := stream.NewReader(strings.NewReader(bufWriter.String()), k, 32*1024)
88+
var firstPart = make([]byte, 32*1024)
89+
_, err = streamReader.Read(firstPart)
90+
require.NoError(b, err)
91+
92+
var secondPart = make([]byte, 32*1024)
93+
_, err = streamReader.Read(secondPart)
94+
require.NoError(b, err)
95+
}
7496
}

0 commit comments

Comments
 (0)