Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-d8d7a87.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "Amazon S3",
"contributor": "",
"description": "Add support for maxInFlightParts to multipart upload (PutObject) in MultipartS3AsyncClient."
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import software.amazon.awssdk.utils.Pair;

@SdkInternalApi
public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {

private static final Logger log = Logger.loggerFor(KnownContentLengthAsyncRequestBodySubscriber.class);

Expand All @@ -70,6 +70,8 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
private final AtomicReferenceArray<CompletedPart> completedParts;
private final Map<Integer, CompletedPart> existingParts;
private final PublisherListener<Long> progressListener;
private final int maxInFlightParts;
private final Object subscriptionLock = new Object();
private Subscription subscription;
private volatile boolean isDone;
private volatile boolean isPaused;
Expand All @@ -80,8 +82,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
private volatile CompletableFuture<CompleteMultipartUploadResponse> completeMpuFuture;

KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext,
CompletableFuture<PutObjectResponse> returnFuture,
MultipartUploadHelper multipartUploadHelper) {
CompletableFuture<PutObjectResponse> returnFuture,
MultipartUploadHelper multipartUploadHelper,
int maxInFlightParts) {
this.totalSize = mpuRequestContext.contentLength();
this.partSize = mpuRequestContext.partSize();
this.expectedNumParts = mpuRequestContext.expectedNumParts();
Expand All @@ -92,8 +95,10 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
this.existingNumParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted());
this.completedParts = new AtomicReferenceArray<>(expectedNumParts);
this.multipartUploadHelper = multipartUploadHelper;
this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes()
.getAttribute(JAVA_PROGRESS_LISTENER))
this.maxInFlightParts = maxInFlightParts;
this.progressListener = putObjectRequest.overrideConfiguration()
.map(c -> c.executionAttributes()
.getAttribute(JAVA_PROGRESS_LISTENER))
.orElseGet(PublisherListener::noOp);
}

Expand Down Expand Up @@ -133,7 +138,7 @@ public void onSubscribe(Subscription s) {
return;
}
this.subscription = s;
s.request(1);
s.request(maxInFlightParts);
returnFuture.whenComplete((r, t) -> {
if (t != null) {
s.cancel();
Expand All @@ -153,23 +158,26 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
int currentPartNum = partNumber.getAndIncrement();

log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %s", currentPartNum,
asyncRequestBody.contentLength()));
asyncRequestBody.contentLength()));

if (existingParts.containsKey(currentPartNum)) {
asyncRequestBody.subscribe(new CancelledSubscriber<>());
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);
asyncRequestBody.close();
subscription.request(1);

synchronized (subscriptionLock) {
subscription.request(1);
}
return;
}

Optional<SdkClientException> sdkClientException = validatePart(asyncRequestBody, currentPartNum);
if (sdkClientException.isPresent()) {
multipartUploadHelper.failRequestsElegantly(futures,
sdkClientException.get(),
uploadId,
returnFuture,
putObjectRequest);
sdkClientException.get(),
uploadId,
returnFuture,
putObjectRequest);
subscription.cancel();
return;
}
Expand All @@ -179,8 +187,9 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
currentPartNum,
uploadId);

Consumer<CompletedPart> completedPartConsumer = completedPart -> completedParts.set(completedPart.partNumber() - 1,
completedPart);
Consumer<CompletedPart> completedPartConsumer = completedPart -> completedParts.set(
completedPart.partNumber() - 1,
completedPart);
multipartUploadHelper.sendIndividualUploadPartRequest(uploadId, completedPartConsumer, futures,
Pair.of(uploadRequest, asyncRequestBody), progressListener)
.whenComplete((r, t) -> {
Expand All @@ -192,10 +201,15 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
subscription.cancel();
}
} else {
completeMultipartUploadIfFinished(asyncRequestBodyInFlight.decrementAndGet());
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
if (!isDone && inFlight < maxInFlightParts) {
synchronized (subscriptionLock) {
subscription.request(1);
}
}
completeMultipartUploadIfFinished(inFlight);
}
});
subscription.request(1);
}

private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
Expand Down Expand Up @@ -258,10 +272,9 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
CompletedPart[] parts;

if (existingParts.isEmpty()) {
parts =
IntStream.range(0, completedParts.length())
.mapToObj(completedParts::get)
.toArray(CompletedPart[]::new);
parts = IntStream.range(0, completedParts.length())
.mapToObj(completedParts::get)
.toArray(CompletedPart[]::new);
} else {
// List of CompletedParts needs to be in ascending order
parts = mergeCompletedParts();
Expand All @@ -274,7 +287,8 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
return;
}

completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest,
completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts,
putObjectRequest,
totalSize);
}
}
Expand All @@ -283,8 +297,8 @@ private CompletedPart[] mergeCompletedParts() {
CompletedPart[] merged = new CompletedPart[expectedNumParts];
int currPart = 1;
while (currPart < expectedNumParts + 1) {
CompletedPart completedPart = existingParts.containsKey(currPart) ? existingParts.get(currPart) :
completedParts.get(currPart - 1);
CompletedPart completedPart = existingParts.containsKey(currPart) ? existingParts.get(currPart)
: completedParts.get(currPart - 1);
merged[currPart - 1] = completedPart;
currPart++;
}
Expand Down
Loading
Loading