Support S3TransferManager upload using Flux<ByteBuffer> #2714

Closed
pkgonan opened this issue Sep 15, 2021 · 13 comments

Labels
feature-request (A feature should be added or improved.), transfer-manager

Comments

@pkgonan

pkgonan commented Sep 15, 2021

Describe the Feature

Support S3TransferManager uploads from a Flux<ByteBuffer>.
Consider changing UploadRequest's source from a File to a Flux<ByteBuffer>.

Additional Context

pkgonan added the feature-request (A feature should be added or improved.) and needs-triage (This issue or PR still needs to be triaged.) labels on Sep 15, 2021
@pkgonan
Author

pkgonan commented Sep 15, 2021

@zoewangg @debora-ito
Hi. Please consider this feature request.

zoewangg removed the needs-triage (This issue or PR still needs to be triaged.) label on Sep 22, 2021
@debora-ito
Member

Thank you for reaching out @pkgonan, feature request acknowledged.

@Bennett-Lynch
Contributor

Bennett-Lynch commented Nov 1, 2021

Hi @pkgonan. I'll be taking a look at this feature request. It may entail backwards incompatible API changes as we try to settle on the correct API (since TransferManager is still in PREVIEW). I'll include you on the pull request once it's ready.

@Bennett-Lynch
Contributor

Bennett-Lynch commented Nov 8, 2021

This has been resolved with #2817. Thanks for the feature request, @pkgonan!

As mentioned in the PR, you should be able to adapt a Flux<ByteBuffer> to an AsyncRequestBody by doing something like: AsyncRequestBody.fromPublisher(Flux<ByteBuffer>).
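
A minimal sketch of that adaptation, using the same preview UploadRequest API that appears in the full example further down this thread (the bucket/key values and the transferManager variable are placeholders assumed to exist):

Flux<ByteBuffer> flux = Flux.just("one", "two", "three")
        .map(s -> ByteBuffer.wrap(s.getBytes()));

Upload upload = transferManager.upload(UploadRequest.builder()
        // Note: as discussed later in this thread, the CRT-based client currently
        // still requires a contentLength on the PutObjectRequest for this to succeed.
        .putObjectRequest(b -> b.bucket("my-bucket").key("my-key"))
        .requestBody(AsyncRequestBody.fromPublisher(flux)) // adapt the Flux to an AsyncRequestBody
        .build());
upload.completionFuture().join();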

On the response/download side, this is currently a little bit more complicated. You would need to implement your own AsyncResponseTransformer and capture the SdkPublisher<ByteBuffer> given as part of the onStream(..) method. If this is a common use case, I would like to consider making this more convenient by offering a standard SDK implementation to do this.
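
For illustration, a rough sketch of what such a transformer could look like (the class name is made up for the example; it targets GetObject and simply hands the stream back to the caller, who must subscribe to it to drive the download):

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

// Hypothetical transformer that completes its future with the response stream,
// so the caller can subscribe to the ByteBuffer publisher (e.g. wrap it in a Flux).
class PublisherCapturingTransformer
        implements AsyncResponseTransformer<GetObjectResponse, SdkPublisher<ByteBuffer>> {

    private volatile CompletableFuture<SdkPublisher<ByteBuffer>> future;

    @Override
    public CompletableFuture<SdkPublisher<ByteBuffer>> prepare() {
        future = new CompletableFuture<>();
        return future;
    }

    @Override
    public void onResponse(GetObjectResponse response) {
        // Response metadata is available here if needed.
    }

    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
        // Hand the stream to the caller; the download starts once they subscribe.
        future.complete(publisher);
    }

    @Override
    public void exceptionOccurred(Throwable error) {
        future.completeExceptionally(error);
    }
}

The transformer could then be passed to S3AsyncClient.getObject(getObjectRequest, new PublisherCapturingTransformer()), and the resulting SdkPublisher<ByteBuffer> wrapped with Flux.from(...).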

@github-actions

github-actions bot commented Nov 8, 2021

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.

@djchapm

djchapm commented Jun 16, 2022

@pkgonan did you ever get this to work? "AsyncRequestBody.fromPublisher(Flux)"

It still complains about the content length when I try something like this with the latest TransferManager in v2.17.210-PREVIEW.

@zoewangg
Contributor

@djchapm what error did you get? Make sure you have contentLength specified in PutObjectRequest

@djchapm

djchapm commented Jun 16, 2022

Copying this from issue 34 in v1... Based on what I'm seeing in the v2 transfer manager preview, #2817, and #2814, I couldn't get it to work - I hit the same reported issue of needing the content size up front. Digging into the libraries, fromPublisher ends up constructing a meta request that hardcodes the content size to zero, even though you'd think it would be intended for a publisher-style interface. Here's my code and the versions of everything for the test - if we could get this to work, I think most everyone's related feature requests would be resolved.

// Imports added for completeness; they assume the 2.17.210-PREVIEW package layout.
import java.nio.ByteBuffer;

import reactor.core.publisher.Flux;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.transfer.s3.CompletedUpload;
import software.amazon.awssdk.transfer.s3.S3ClientConfiguration;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.Upload;
import software.amazon.awssdk.transfer.s3.UploadRequest;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;

/**
 * Java 17
 * s3-transfer-manager-2.17.210-PREVIEW.jar
 * sdk-core-2.17.210.jar
 * reactor-core-3.4.13.jar
 * Native CLIv2 on MacOS monterey: 
 *    aws-cli/2.7.7 Python/3.9.13 Darwin/21.5.0 source/x86_64
 */
class AWSTransferManagerTest {

    public static void main (String[] args) {
        S3ClientConfiguration s3TransferConfig = S3ClientConfiguration.builder()
                .targetThroughputInGbps(20.0)
                .minimumPartSizeInBytes(1024L)
                .build();
        S3TransferManager transferManager = S3TransferManager.builder()
                .s3ClientConfiguration(s3TransferConfig)
                .build();
        Flux<ByteBuffer> flux = Flux.just("one", "two", "three")
                .map(val -> ByteBuffer.wrap(val.getBytes()));
        //verify flux:
        //flux.subscribe(System.out::println);

        Log.initLoggingToFile(Log.LogLevel.Trace, "s3.logs");
        //output in s3.logs:
        // [INFO] [2022-06-16T15:19:49Z] [00007000056fd000] [S3Client] - id=0x7fab24928d20 Initiating making of meta request
        // [ERROR] [2022-06-16T15:19:49Z] [00007000056fd000] [S3MetaRequest] - Could not create auto-ranged-put meta request; there is no Content-Length header present.
        // [ERROR] [2022-06-16T15:19:49Z] [00007000056fd000] [S3Client] - id=0x7fab24928d20: Could not create new meta request.
        // [WARN] [2022-06-16T15:19:50Z] [0000700005d0f000] [JavaCrtGeneral] - Not all native threads were successfully joined during gentle shutdown.  Memory may be leaked.

        Upload upload =
                transferManager.upload(UploadRequest.builder()
                        .putObjectRequest(b -> b.bucket("bucket").key("tmp/flux.bin"))
                        .requestBody(AsyncRequestBody.fromPublisher(flux))
                        .overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
                        .build());
        CompletedUpload completedUpload = upload.completionFuture().join();
    }
}

Output in console:

Caused by: software.amazon.awssdk.crt.CrtRuntimeException: S3Client.aws_s3_client_make_meta_request: creating aws_s3_meta_request failed (aws_last_error: AWS_ERROR_INVALID_ARGUMENT(34), An invalid argument was passed to a function.) AWS_ERROR_INVALID_ARGUMENT(34)
	at software.amazon.awssdk.crt.s3.S3Client.s3ClientMakeMetaRequest(Native Method)
	at software.amazon.awssdk.crt.s3.S3Client.makeMetaRequest(S3Client.java:70)
	at software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncHttpClient.execute(S3CrtAsyncHttpClient.java:97)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.doExecuteHttpRequest(MakeAsyncHttpRequestStage.java:175)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.executeHttpRequest(MakeAsyncHttpRequestStage.java:147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$1(MakeAsyncHttpRequestStage.java:99)
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757)
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735)
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.execute(MakeAsyncHttpRequestStage.java:95)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.execute(MakeAsyncHttpRequestStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallAttemptMetricCollectionStage.execute(AsyncApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallAttemptMetricCollectionStage.execute(AsyncApiCallAttemptMetricCollectionStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.attemptExecute(AsyncRetryableStage.java:144)
	... 24 more

@zoewangg
Contributor

Can you try specifying contentLength in PutObjectRequest? putObjectRequest(b -> b.bucket("bucket").key("tmp/flux.bin").contentLength(xxx))
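
A fuller sketch of that workaround, reusing the transferManager from the example above (the length is computed up front here, which of course only works when the whole payload is already known; values and imports are illustrative):

List<ByteBuffer> buffers = Stream.of("one", "two", "three")
        .map(s -> ByteBuffer.wrap(s.getBytes()))
        .collect(Collectors.toList());
// Total size of all buffers, passed as the contentLength the CRT client requires.
long contentLength = buffers.stream().mapToLong(ByteBuffer::remaining).sum();

Upload upload = transferManager.upload(UploadRequest.builder()
        .putObjectRequest(b -> b.bucket("bucket").key("tmp/flux.bin").contentLength(contentLength))
        .requestBody(AsyncRequestBody.fromPublisher(Flux.fromIterable(buffers)))
        .build());
upload.completionFuture().join();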

@djchapm

djchapm commented Jun 16, 2022

Hi @zoewangg - thanks for your prompt discussion on this!

Yes, that works... but I don't think it solves the main feature request.

Streaming uploads have been heavily requested across a number of tickets. I thought that this ticket, #2817, #2814, and your efforts on the new TransferManager were all working towards uploading from a 'publisher' where the size is not known until publisher.onComplete. See the code referenced in #2817 about 'Arbitrary Object Transfers': there it's a request 'fromString', but the putObjectRequest does not specify a contentLength; additionally, @Bennett-Lynch mentions later that the async fromPublisher should adapt a Flux...

Is it expected, then, that even for this method of using a publisher the input size needs to be known ahead of time?

@zoewangg
Contributor

Yeah, using AsyncRequestBody.fromPublisher with an unknown content size is not supported yet. For now, you'd have to provide the content length. Note that if the AsyncRequestBody is created using factory methods such as AsyncRequestBody.fromString or AsyncRequestBody.fromFile, the SDK can determine the size easily, so the content length is not required.
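
To illustrate the difference with a small sketch (the file path is a placeholder):

// The SDK can derive the content length from these bodies itself:
AsyncRequestBody fromString = AsyncRequestBody.fromString("hello world");
AsyncRequestBody fromFile = AsyncRequestBody.fromFile(Paths.get("/tmp/example.bin"));

// Here the SDK only sees a Publisher<ByteBuffer>, so it cannot know the total size
// until the stream completes; contentLength must be supplied on the PutObjectRequest.
AsyncRequestBody fromFlux = AsyncRequestBody.fromPublisher(
        Flux.just(ByteBuffer.wrap("hello world".getBytes())));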

We don't have a timeline for this feature at the moment. Could you create a feature request (https://github.com/aws/aws-sdk-java-v2/issues/new/choose)? It will help prioritization if there are more 👍🏼 on the feature request.

@djchapm

djchapm commented Jun 21, 2022

Thanks @zoewangg - but a quick question on process... You have issue #37 tracking requested features for Transfer Manager in v2, and the highest-voted feature request there is 474 from v1, which is what we're talking about here: streaming upload to S3 with unknown size. It's also the main goal of feature request #139 in v2, which was closed in favor of tracking/prioritizing in ticket #37 above. So there are already requests covering this - I imagine if another one were created, it would lose at least the history and weight that the older ones carry (2015 and 2017). Also, it would be great to see some action on #37, since other tickets point to it and are closed in favor of it.

Requests for streaming uploads without knowing the size ahead of time - which is typical of any high-performance streaming application - have been around since ~2015; that's 7 years... If it hasn't been implemented by now despite all the upvotes and complaints, is there some motivation for AWS to specifically NOT implement this feature? I'm wondering if it's a cost issue. There are higher costs with allocating disk etc. to do file-based uploads in something like a Kubernetes environment, so such a feature would mostly benefit clients by removing the need for disk and thereby removing costs. I'm reaching here, just trying to understand why such an obvious service/API wouldn't be supplied as the primary API for S3. In reviewing the comments, I'm not the only one having a hard time understanding this. If it's simply never going to be prioritized, then a response with a reasonable explanation might keep us all from pushing/waiting/complaining about it.

@zoewangg
Contributor

@djchapm thank you for the candid feedback. To clarify, this feature is on our radar and we do have plans to support it; however, it's not going to be included as part of the GA release. Our primary goal for Transfer Manager right now is to reach the point where customers can migrate from the v1 Transfer Manager to v2. Once we achieve that, we can start to tackle new features such as this one (btw, we are hiring 🙂 #3156).

To answer your question on the process, #37 was created initially to track all feature requests for v2 transfer manager, and now that we have narrowed down the GA features, I think we should create separate issues for non-GA features. In hindsight, we should’ve made our plan more clear in #37. I was not aware of #139, and that’s why I suggested opening a new issue. I’ll go ahead and re-open #139 to track this specific feature.

aws-sdk-java-automation added a commit that referenced this issue Oct 12, 2023
…9e9f46e1e

Pull request: release <- staging/1aa9ea38-9161-4d55-b739-f469e9f46e1e
millems pushed a commit that referenced this issue Oct 13, 2023
…9e9f46e1e

Pull request: release <- staging/1aa9ea38-9161-4d55-b739-f469e9f46e1e