Conversation
…r into its own class (following pattern from known content)
…r into its own class (following pattern from known content)
…a-v2 into alexwoo/max_in_flight_s3
| "software.amazon.awssdk.services.s3.internal.crt.CrtResponseFileResponseTransformer"), | ||
| ArchUtils.classNameToPattern(RetryableSubAsyncRequestBody.class))); | ||
| ArchUtils.classNameToPattern(RetryableSubAsyncRequestBody.class), | ||
| ArchUtils.classNameToPattern(KnownContentLengthAsyncRequestBodySubscriber.class))); |
There was a problem hiding this comment.
This warn logging was pre-existing - see the change in UploadWithUnknownContentLengthHelper that moved the existing warning into KnownContentLengthAsyncRequestBodySubscriber
|
| import software.amazon.awssdk.utils.Pair; | ||
| | ||
| @SdkInternalApi | ||
| public class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> { |
There was a problem hiding this comment.
Thanks for moving it to its own class! I've always wanted to do that!
Are there any changes on the logic?
| completeMultipartUploadIfFinished(asyncRequestBodyInFlight.decrementAndGet()); | ||
| int inFlight = asyncRequestBodyInFlight.decrementAndGet(); | ||
| if (!isDone && inFlight < maxInFlightParts) { | ||
| subscription.request(1); |
There was a problem hiding this comment.
Is it necessary to request here? It seems we could sent maxInFlightParts + 1 requests
1. Thread A (onNext): increments asyncRequestBodyInFlight to N, dispatches the part. 2. Thread B (whenComplete): decrements to N-1, sees N-1 < maxInFlightParts, calls request(1). 3. Thread A (inline): reads asyncRequestBodyInFlight.get() which is now N-1, sees N-1 < maxInFlightParts, calls request(1). I think we also need synchronize subscription.request(1) here per https://github.com/reactive-streams/reactive-streams-jvm
§2.7: "A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially."
| * @return The value for the maximum number of concurrent GetObject the that are allowed for multipart download. | ||
| * The maximum number of concurrent part requests that are allowed for multipart operations, including both multipart | ||
| * download (GetObject) and multipart upload (PutObject). This limits the number of parts that can be in flight at any | ||
| * given time, preventing the client from overwhelming the HTTP connection pool when transferring large objects. For |
There was a problem hiding this comment.
nit, extra space before For
| * The maximum number of concurrent part requests that are allowed for multipart operations, including both multipart | ||
| * download (GetObject) and multipart upload (PutObject). This limits the number of parts that can be in flight at any | ||
| * given time, preventing the client from overwhelming the HTTP connection pool when transferring large objects. For | ||
| * getObject it applies only when the {@link AsyncResponseTransformer} supports parallel split. |
There was a problem hiding this comment.
This is also true for putObject, right?




Motivation and Context
When
MultipartS3AsyncClient.putObject()performs a multipart upload of a large object, all UploadPart requests are dispatched eagerly with no concurrency limit. For example, a 2 GB upload with 8 MiB parts produces ~256 UploadPart requests that immediately compete for the HTTP connection pool (default maxConcurrency=50), leading to connection acquisition timeouts:The existing
maxInFlightPartsconfiguration inParallelConfigurationalready limits concurrent GetObject requests for multipart downloads, but was not applied to the upload path.Fixes #6623
Modifications
maxInFlightPartsconfiguration to also apply to multipart upload (putObject) concurrency. The setting now limits concurrent part requests for bothgetObject(download) andputObject(upload) operations. Updated the Javadoc onParallelConfiguration.maxInFlightParts()to reflect this broader scope.KnownContentLengthAsyncRequestBodySubscriber.onNext(): instead of unconditionally callingsubscription.request(1)after dispatching each UploadPart, the subscriber now checksasyncRequestBodyInFlight < maxInFlightParts. When a part completes and in-flight count drops below the limit,subscription.request(1)resumes flow.UnknownContentLengthAsyncRequestBodySubscriber.sendUploadPartRequest().UnknownContentLengthAsyncRequestBodySubscriberfrom an inner class ofUploadWithUnknownContentLengthHelperinto its own top-level class, matching the existing pattern ofKnownContentLengthAsyncRequestBodySubscriber.Testing
New and existing unit tests.
Types of changes
Checklist
mvn installsucceedsscripts/new-changescript and following the instructions. Commit the new file created by the script in.changes/next-releasewith your changes.License