3232import software .amazon .awssdk .annotations .SdkInternalApi ;
3333import software .amazon .awssdk .core .async .AsyncRequestBody ;
3434import software .amazon .awssdk .core .async .listener .PublisherListener ;
35+ import software .amazon .awssdk .core .exception .SdkClientException ;
3536import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadResponse ;
3637import software .amazon .awssdk .services .s3 .model .CompletedPart ;
3738import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
@@ -54,9 +55,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
5455 private final AtomicBoolean failureActionInitiated = new AtomicBoolean (false );
5556 private final AtomicInteger partNumber = new AtomicInteger (1 );
5657 private final MultipartUploadHelper multipartUploadHelper ;
57- private final long contentLength ;
58+ private final long totalSize ;
5859 private final long partSize ;
59- private final int partCount ;
60+ private final int expectedNumPart ;
6061 private final int numExistingParts ;
6162 private final String uploadId ;
6263 private final Collection <CompletableFuture <CompletedPart >> futures = new ConcurrentLinkedQueue <>();
@@ -77,25 +78,21 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
7778 KnownContentLengthAsyncRequestBodySubscriber (MpuRequestContext mpuRequestContext ,
7879 CompletableFuture <PutObjectResponse > returnFuture ,
7980 MultipartUploadHelper multipartUploadHelper ) {
80- this .contentLength = mpuRequestContext .contentLength ();
81+ this .totalSize = mpuRequestContext .contentLength ();
8182 this .partSize = mpuRequestContext .partSize ();
82- this .partCount = determinePartCount ( contentLength , partSize );
83+ this .expectedNumPart = mpuRequestContext . expectedNumParts ( );
8384 this .putObjectRequest = mpuRequestContext .request ().left ();
8485 this .returnFuture = returnFuture ;
8586 this .uploadId = mpuRequestContext .uploadId ();
8687 this .existingParts = mpuRequestContext .existingParts () == null ? new HashMap <>() : mpuRequestContext .existingParts ();
8788 this .numExistingParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
88- this .completedParts = new AtomicReferenceArray <>(partCount );
89+ this .completedParts = new AtomicReferenceArray <>(expectedNumPart );
8990 this .multipartUploadHelper = multipartUploadHelper ;
9091 this .progressListener = putObjectRequest .overrideConfiguration ().map (c -> c .executionAttributes ()
9192 .getAttribute (JAVA_PROGRESS_LISTENER ))
9293 .orElseGet (PublisherListener ::noOp );
9394 }
9495
95- private int determinePartCount (long contentLength , long partSize ) {
96- return (int ) Math .ceil (contentLength / (double ) partSize );
97- }
98-
9996 public S3ResumeToken pause () {
10097 isPaused = true ;
10198
@@ -119,7 +116,7 @@ public S3ResumeToken pause() {
119116 return S3ResumeToken .builder ()
120117 .uploadId (uploadId )
121118 .partSize (partSize )
122- .totalNumParts ((long ) partCount )
119+ .totalNumParts ((long ) expectedNumPart )
123120 .numPartsCompleted (numPartsCompleted + numExistingParts )
124121 .build ();
125122 }
@@ -145,21 +142,23 @@ public void onSubscribe(Subscription s) {
145142
146143 @ Override
147144 public void onNext (AsyncRequestBody asyncRequestBody ) {
148- if (isPaused ) {
145+ if (isPaused || isDone ) {
149146 return ;
150147 }
151148
152- if ( existingParts . containsKey ( partNumber .get ())) {
153- partNumber . getAndIncrement ();
149+ int currentPartNum = partNumber .getAndIncrement ();
150+ if ( existingParts . containsKey ( currentPartNum )) {
154151 asyncRequestBody .subscribe (new CancelledSubscriber <>());
155152 subscription .request (1 );
156153 asyncRequestBody .contentLength ().ifPresent (progressListener ::subscriberOnNext );
157154 return ;
158155 }
159156
157+ validatePart (asyncRequestBody , currentPartNum );
158+
160159 asyncRequestBodyInFlight .incrementAndGet ();
161160 UploadPartRequest uploadRequest = SdkPojoConversionUtils .toUploadPartRequest (putObjectRequest ,
162- partNumber . getAndIncrement () ,
161+ currentPartNum ,
163162 uploadId );
164163
165164 Consumer <CompletedPart > completedPartConsumer = completedPart -> completedParts .set (completedPart .partNumber () - 1 ,
@@ -179,6 +178,49 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
179178 subscription .request (1 );
180179 }
181180
181+ private void validatePart (AsyncRequestBody asyncRequestBody , int currentPartNum ) {
182+ if (!asyncRequestBody .contentLength ().isPresent ()) {
183+ SdkClientException e = SdkClientException .create ("Content length must be present on the AsyncRequestBody" );
184+ multipartUploadHelper .failRequestsElegantly (futures , e , uploadId , returnFuture , putObjectRequest );
185+ return ;
186+ }
187+
188+ Long currentPartSize = asyncRequestBody .contentLength ().get ();
189+ if (currentPartNum > expectedNumPart ) {
190+ SdkClientException exception = SdkClientException .create (String .format ("The number of parts divided is "
191+ + "not equal to the expected number of "
192+ + "parts. Expected: %d, Actual: %d" ,
193+ expectedNumPart , currentPartNum ));
194+ multipartUploadHelper .failRequestsElegantly (futures , exception , uploadId , returnFuture , putObjectRequest );
195+ return ;
196+ }
197+
198+ if (currentPartNum == expectedNumPart ) {
199+ validateLastPartSize (currentPartSize );
200+ return ;
201+ }
202+
203+ if (currentPartSize != partSize ) {
204+ SdkClientException e = SdkClientException .create (String .format ("Content length must not be greater than the "
205+ + "part size. Expected: %d, Actual: %d" ,
206+ partSize ,
207+ currentPartSize ));
208+ multipartUploadHelper .failRequestsElegantly (futures , e , uploadId , returnFuture , putObjectRequest );
209+ }
210+ }
211+
212+ private void validateLastPartSize (Long currentPartSize ) {
213+ long remainder = totalSize % partSize ;
214+ long expectedLastPartSize = remainder == 0 ? partSize : remainder ;
215+ if (currentPartSize != expectedLastPartSize ) {
216+ SdkClientException exception =
217+ SdkClientException .create ("Content length of the last part must be equal to the "
218+ + "expected last part size. Expected: " + expectedLastPartSize
219+ + ", Actual: " + currentPartSize );
220+ multipartUploadHelper .failRequestsElegantly (futures , exception , uploadId , returnFuture , putObjectRequest );
221+ }
222+ }
223+
182224 private boolean shouldFailRequest () {
183225 return failureActionInitiated .compareAndSet (false , true ) && !isPaused ;
184226 }
@@ -187,6 +229,7 @@ private boolean shouldFailRequest() {
187229 public void onError (Throwable t ) {
188230 log .debug (() -> "Received onError " , t );
189231 if (failureActionInitiated .compareAndSet (false , true )) {
232+ isDone = true ;
190233 multipartUploadHelper .failRequestsElegantly (futures , t , uploadId , returnFuture , putObjectRequest );
191234 }
192235 }
@@ -203,6 +246,7 @@ public void onComplete() {
203246 private void completeMultipartUploadIfFinished (int requestsInFlight ) {
204247 if (isDone && requestsInFlight == 0 && completedMultipartInitiated .compareAndSet (false , true )) {
205248 CompletedPart [] parts ;
249+
206250 if (existingParts .isEmpty ()) {
207251 parts =
208252 IntStream .range (0 , completedParts .length ())
@@ -213,14 +257,14 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
213257 parts = mergeCompletedParts ();
214258 }
215259 completeMpuFuture = multipartUploadHelper .completeMultipartUpload (returnFuture , uploadId , parts , putObjectRequest ,
216- contentLength );
260+ totalSize );
217261 }
218262 }
219263
220264 private CompletedPart [] mergeCompletedParts () {
221- CompletedPart [] merged = new CompletedPart [partCount ];
265+ CompletedPart [] merged = new CompletedPart [expectedNumPart ];
222266 int currPart = 1 ;
223- while (currPart < partCount + 1 ) {
267+ while (currPart < expectedNumPart + 1 ) {
224268 CompletedPart completedPart = existingParts .containsKey (currPart ) ? existingParts .get (currPart ) :
225269 completedParts .get (currPart - 1 );
226270 merged [currPart - 1 ] = completedPart ;
0 commit comments