@@ -75,6 +75,14 @@ type Stream struct {
7575 //
7676 // Note: Calls to KeyToList are concurrent.
7777 KeyToList func (key []byte , itr * Iterator ) (* pb.KVList , error )
78+ // UseKeyToListWithThreadId is used to indicate that KeyToListWithThreadId should be used
79+ // instead of KeyToList. This is a new api that can be used to figure out parallelism
80+ // of the stream. Each threadId would be run serially. KeyToList being concurrent makes you
81+ // take care of concurrency in KeyToList. Here threadId could be used to do some things serially.
82+ // Once a thread finishes FinishThread() would be called.
83+ UseKeyToListWithThreadId bool
84+ KeyToListWithThreadId func (key []byte , itr * Iterator , threadId int ) (* pb.KVList , error )
85+ FinishThread func (threadId int ) (* pb.KVList , error )
7886
7987 // This is the method where Stream sends the final output. All calls to Send are done by a
8088 // single goroutine, i.e. logic within Send method can expect single threaded execution.
@@ -143,7 +151,7 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
143151// keyRange is [start, end), including start, excluding end. Do ensure that the start,
144152// end byte slices are owned by keyRange struct.
145153func (st * Stream ) produceRanges (ctx context.Context ) {
146- ranges := st .db .Ranges (st .Prefix , 16 )
154+ ranges := st .db .Ranges (st .Prefix , st . NumGo )
147155 y .AssertTrue (len (ranges ) > 0 )
148156 y .AssertTrue (ranges [0 ].left == nil )
149157 y .AssertTrue (ranges [len (ranges )- 1 ].right == nil )
@@ -186,7 +194,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
186194 iterOpts := DefaultIteratorOptions
187195 iterOpts .AllVersions = true
188196 iterOpts .Prefix = st .Prefix
189- iterOpts .PrefetchValues = false
197+ iterOpts .PrefetchValues = true
190198 iterOpts .SinceTs = st .SinceTs
191199 itr := txn .NewIterator (iterOpts )
192200 itr .ThreadId = threadId
@@ -233,7 +241,13 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
233241
234242 // Now convert to key value.
235243 itr .Alloc .Reset ()
236- list , err := st .KeyToList (item .KeyCopy (nil ), itr )
244+ var list * pb.KVList
245+ var err error
246+ if st .UseKeyToListWithThreadId {
247+ list , err = st .KeyToListWithThreadId (item .KeyCopy (nil ), itr , threadId )
248+ } else {
249+ list , err = st .KeyToList (item .KeyCopy (nil ), itr )
250+ }
237251 if err != nil {
238252 st .db .opt .Warningf ("While reading key: %x, got error: %v" , item .Key (), err )
239253 continue
@@ -252,6 +266,23 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
252266 }
253267 }
254268 }
269+
270+ if st .UseKeyToListWithThreadId {
271+ if kvs , err := st .FinishThread (threadId ); err != nil {
272+ return err
273+ } else {
274+ for _ , kv := range kvs .Kv {
275+ kv .StreamId = streamId
276+ KVToBuffer (kv , outList )
277+ if outList .LenNoPadding () < batchSize {
278+ continue
279+ }
280+ if err := sendIt (); err != nil {
281+ return err
282+ }
283+ }
284+ }
285+ }
255286 // Mark the stream as done.
256287 if st .doneMarkers {
257288 kv := & pb.KV {
0 commit comments