Conversation
…r into its own class (following pattern from known content)
…r into its own class (following pattern from known content)
…a-v2 into alexwoo/max_in_flight_s3
| "software.amazon.awssdk.services.s3.internal.crt.CrtResponseFileResponseTransformer"), | ||
| ArchUtils.classNameToPattern(RetryableSubAsyncRequestBody.class))); | ||
| ArchUtils.classNameToPattern(RetryableSubAsyncRequestBody.class), | ||
| ArchUtils.classNameToPattern(KnownContentLengthAsyncRequestBodySubscriber.class))); |
There was a problem hiding this comment.
This warn logging was pre-existing - see the change in UploadWithUnknownContentLengthHelper that moved the existing warning into KnownContentLengthAsyncRequestBodySubscriber
| import software.amazon.awssdk.utils.Pair; | ||
|
|
||
| @SdkInternalApi | ||
| public class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> { |
There was a problem hiding this comment.
Thanks for moving it to its own class! I've always wanted to do that!
Are there any changes on the logic?
There was a problem hiding this comment.
It made it much easier to test!
There are changes in the logic - you can see them in the commit here before I moved it out: 335d6db#diff-4fe5c54707205f54fb73bd2acef1fd89181e7ccb1647b4a3a7cd64698c750a9d
| completeMultipartUploadIfFinished(asyncRequestBodyInFlight.decrementAndGet()); | ||
| int inFlight = asyncRequestBodyInFlight.decrementAndGet(); | ||
| if (!isDone && inFlight < maxInFlightParts) { | ||
| subscription.request(1); |
There was a problem hiding this comment.
Is it necessary to request here? It seems we could sent maxInFlightParts + 1 requests
1. Thread A (onNext): increments asyncRequestBodyInFlight to N, dispatches the part.
2. Thread B (whenComplete): decrements to N-1, sees N-1 < maxInFlightParts, calls request(1).
3. Thread A (inline): reads asyncRequestBodyInFlight.get() which is now N-1, sees N-1 < maxInFlightParts, calls request(1).
I think we also need synchronize subscription.request(1) here per https://github.com/reactive-streams/reactive-streams-jvm
§2.7: "A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially."
There was a problem hiding this comment.
Ugh, Reactive streams are always so hard to think through.... Okay, so I think it is necessary to call request here in the completion callback - thats the most natural way I can think of to request more once requests complete.
But you're right that this results in a potential race condition where we request more than maxParts. I was trying to be too clever and keep the logic between known and unknown content length more consistent. What the ParallelMultipartDownloaderSubscriber does is request(maxInFlightParts) up front in onSubscribe and then only request more whenComplete. I've updated the code now to match this pattern - request maxInFlightParts up front and then only request more whenComplete. However, it complicates the UnknownContentLength case a little bit - we still request only 1 onSubscribe, but then once we know its an MPU case we then request the maxInflight.
You're also right about the synchronize. My first pass used atomics alone to ensure this, but with this approach we do need to sycnhronize :-(
Great catches! Reactive streams always hurts my brain.
There was a problem hiding this comment.
Reactive streams always hurts my brain.
Haha, that makes two of us!
...ces/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/ParallelConfiguration.java
Outdated
Show resolved
Hide resolved
| * The maximum number of concurrent part requests that are allowed for multipart operations, including both multipart | ||
| * download (GetObject) and multipart upload (PutObject). This limits the number of parts that can be in flight at any | ||
| * given time, preventing the client from overwhelming the HTTP connection pool when transferring large objects. For | ||
| * getObject it applies only when the {@link AsyncResponseTransformer} supports parallel split. |
There was a problem hiding this comment.
This is also true for putObject, right?
There was a problem hiding this comment.
Check my logic here, but I think its actually only true for GetObject - uploads only have a known/unknown length distinction rather than a parallel/serial split. In UploadObjectHelper#uploadObject we just check if we have contentLength and then delegate to either the known or unknown length helpers. Both of those work similarly (except the unknown requests a part first to see if the size is > part size before starting the MPU...) and both use asyncRequestBody.splitCloseable. (eg, see UploadWithKnownContentLengthHelper#uploadObject)
There was a problem hiding this comment.
Ah right, for uploads, we read in serial for non-file request bodies but send in parallel depending on the buffer
|




Motivation and Context
When
MultipartS3AsyncClient.putObject()performs a multipart upload of a large object, all UploadPart requests are dispatched eagerly with no concurrency limit. For example, a 2 GB upload with 8 MiB parts produces ~256 UploadPart requests that immediately compete for the HTTP connection pool (default maxConcurrency=50), leading to connection acquisition timeouts:The existing
maxInFlightPartsconfiguration inParallelConfigurationalready limits concurrent GetObject requests for multipart downloads, but was not applied to the upload path.Fixes #6623
Modifications
maxInFlightPartsconfiguration to also apply to multipart upload (putObject) concurrency. The setting now limits concurrent part requests for bothgetObject(download) andputObject(upload) operations. Updated the Javadoc onParallelConfiguration.maxInFlightParts()to reflect this broader scope.KnownContentLengthAsyncRequestBodySubscriber.onNext(): instead of unconditionally callingsubscription.request(1)after dispatching each UploadPart, the subscriber now checksasyncRequestBodyInFlight < maxInFlightParts. When a part completes and in-flight count drops below the limit,subscription.request(1)resumes flow.UnknownContentLengthAsyncRequestBodySubscriber.sendUploadPartRequest().UnknownContentLengthAsyncRequestBodySubscriberfrom an inner class ofUploadWithUnknownContentLengthHelperinto its own top-level class, matching the existing pattern ofKnownContentLengthAsyncRequestBodySubscriber.Testing
New and existing unit tests.
Types of changes
Checklist
mvn installsucceedsscripts/new-changescript and following the instructions. Commit the new file created by the script in.changes/next-releasewith your changes.License