6161 */
6262@ SdkInternalApi
6363public class ChunkedEncodedPublisher implements Publisher <ByteBuffer > {
64+ private final ByteBuffer EMPTY = ByteBuffer .allocate (0 );
6465 private static final byte [] CRLF = {'\r' , '\n' };
6566 private static final byte SEMICOLON = ';' ;
6667 private static final byte EQUALS = '=' ;
@@ -83,6 +84,7 @@ public ChunkedEncodedPublisher(Builder b) {
8384 this .extensions .addAll (b .extensions );
8485 this .trailers .addAll (b .trailers );
8586 this .addEmptyTrailingChunk = b .addEmptyTrailingChunk ;
87+ this .chunkBuffer = ByteBuffer .allocate (chunkSize );
8688 }
8789
8890 @ Override
@@ -93,9 +95,8 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
9395 Publisher <Iterable <ByteBuffer >> chunked = chunk (lengthEnforced );
9496 Publisher <Iterable <ByteBuffer >> trailingAdded = addTrailingChunks (chunked );
9597 Publisher <ByteBuffer > flattened = flatten (trailingAdded );
96- Publisher <ByteBuffer > encoded = map (flattened , this ::encodeChunk );
9798
98- encoded .subscribe (subscriber );
99+ flattened .subscribe (subscriber );
99100 }
100101
101102 public static Builder builder () {
@@ -105,7 +106,7 @@ public static Builder builder() {
105106 private void resetState () {
106107 extensions .forEach (Resettable ::reset );
107108 trailers .forEach (Resettable ::reset );
108- chunkBuffer = null ;
109+ chunkBuffer = ByteBuffer . allocate ( chunkSize ) ;
109110 }
110111
111112 private Iterable <Iterable <ByteBuffer >> getTrailingChunks () {
@@ -114,12 +115,13 @@ private Iterable<Iterable<ByteBuffer>> getTrailingChunks() {
114115 if (chunkBuffer != null ) {
115116 chunkBuffer .flip ();
116117 if (chunkBuffer .hasRemaining ()) {
117- trailing .add (chunkBuffer );
118+ trailing .add (encodeChunk (chunkBuffer ));
119+ chunkBuffer = null ;
118120 }
119121 }
120122
121123 if (addEmptyTrailingChunk ) {
122- trailing .add (ByteBuffer . allocate ( 0 ));
124+ trailing .add (encodeChunk ( EMPTY . duplicate () ));
123125 }
124126
125127 return Collections .singletonList (trailing );
@@ -174,6 +176,7 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
174176 .mapToInt (t -> t .remaining () + CRLF .length )
175177 .sum ();
176178
179+
177180 int encodedLen = chunkSizeHex .length + extensionsLength + CRLF .length + contentLen + trailerLen + CRLF .length ;
178181
179182 if (isTrailerChunk ) {
@@ -272,37 +275,57 @@ protected ChunkingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber
272275 }
273276
274277 @ Override
275- public void onNext (ByteBuffer byteBuffer ) {
276- if (chunkBuffer == null ) {
277- chunkBuffer = ByteBuffer .allocate (chunkSize );
278- }
279-
280- long totalBufferedBytes = (long ) chunkBuffer .position () + byteBuffer .remaining ();
278+ public void onNext (ByteBuffer inputBuffer ) {
279+ long totalBufferedBytes = (long ) chunkBuffer .position () + inputBuffer .remaining ();
280+ // compute the number full chunks we have currently
281281 int nBufferedChunks = (int ) (totalBufferedBytes / chunkSize );
282282
283283 List <ByteBuffer > chunks = new ArrayList <>(nBufferedChunks );
284284
285285 if (nBufferedChunks > 0 ) {
286- for (int i = 0 ; i < nBufferedChunks ; i ++) {
287- ByteBuffer slice = byteBuffer .slice ();
288- int maxBytesToCopy = Math .min (chunkBuffer .remaining (), slice .remaining ());
289- slice .limit (maxBytesToCopy );
286+ // We have some data from the previous inputBuffer
287+ if (chunkBuffer .position () > 0 ) {
288+ int bytesToFill = chunkBuffer .remaining ();
290289
290+ ByteBuffer slice = inputBuffer .slice ();
291+
292+ slice .limit (slice .position () + bytesToFill );
293+ inputBuffer .position (inputBuffer .position () + bytesToFill );
294+
295+ // At this point, we know chunkBuffer is full since inputBuffer has at least enough bytes to make up a full
296+ // chunk along with the data already in chunkBuffer
291297 chunkBuffer .put (slice );
292- if (!chunkBuffer .hasRemaining ()) {
293- chunkBuffer .flip ();
294- chunks .add (chunkBuffer );
295- chunkBuffer = ByteBuffer .allocate (chunkSize );
296- }
298+ chunkBuffer .flip ();
299+ chunks .add (encodeChunk (chunkBuffer ));
300+
301+ chunkBuffer .flip ();
297302
298- byteBuffer .position (byteBuffer .position () + maxBytesToCopy );
303+ nBufferedChunks --;
304+ }
305+
306+ // Now encode all the remaining full chunks from inputBuffer.
307+ // At this point chunkBuffer has no data in it; slice off chunks from inputBuffer and encode directly
308+ for (int i = 0 ; i < nBufferedChunks ; i ++) {
309+ ByteBuffer slice = inputBuffer .slice ();
310+
311+ int sliceLimit = Math .min (slice .limit (), chunkSize );
312+ slice .limit (sliceLimit );
313+
314+ inputBuffer .position (inputBuffer .position () + slice .remaining ());
315+
316+ if (slice .remaining () >= chunkSize ) {
317+ slice .limit (slice .position () + chunkSize );
318+ chunks .add (encodeChunk (slice ));
319+ } else {
320+ chunkBuffer .put (slice );
321+ }
299322 }
300323
301- if (byteBuffer .hasRemaining ()) {
302- chunkBuffer .put (byteBuffer );
324+ if (inputBuffer .hasRemaining ()) {
325+ chunkBuffer .put (inputBuffer );
303326 }
304327 } else {
305- chunkBuffer .put (byteBuffer );
328+ chunkBuffer .put (inputBuffer );
306329 }
307330
308331 subscriber .onNext (chunks );
0 commit comments