S3 getObject combined with AsyncResponseTransformer.toBytes() copies too much data #3193

@chibenwa

Describe the bug

Given the following piece of code:

client.getObject(
        builder -> builder.bucket(bucket).key(key),
        AsyncResponseTransformer.toBytes())

Several data copies do take place:

  • The Netty ByteBuf gets copied into a heap ByteBuffer. I understand this copy is done to present a common abstraction level for all transports. Yet maybe ByteBuf::nioBuffers could be relied on to minimize copies. I did not explore this option myself.
  • AsyncResponseTransformer.toBytes() relies internally on an unsized ByteArrayOutputStream (which will thus expand many times). The result is copied when calling toByteArray(), and another defensive copy is carried out when transforming the future.
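
To make the second bullet concrete, here is a minimal, JDK-only sketch of the two accumulation strategies. It does not reproduce the SDK's internal code; `CopyCountSketch`, `viaOutputStream` and `viaPresizedArray` are hypothetical names used only for illustration, and the chunks are modeled as plain byte arrays rather than the buffers the transport delivers.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class CopyCountSketch {

    // Models the behaviour described above: an unsized stream, then toByteArray(),
    // then one more defensive copy before handing the result to the caller.
    public static byte[] viaOutputStream(byte[][] chunks) {
        // The no-arg constructor starts with a small internal buffer that is
        // reallocated (and its content copied) each time it runs out of room.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            baos.write(chunk, 0, chunk.length); // copy into the internal buffer
        }
        byte[] snapshot = baos.toByteArray();   // allocates and fills a new array
        return Arrays.copyOf(snapshot, snapshot.length); // defensive copy
    }

    // Alternative: when the total length is known up front, each byte is copied
    // exactly once into its final position.
    public static byte[] viaPresizedArray(byte[][] chunks, int totalLength) {
        byte[] result = new byte[totalLength];
        int pos = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, result, pos, chunk.length);
            pos += chunk.length;
        }
        return result;
    }

    public static void main(String[] args) {
        byte[][] chunks = { new byte[1024], new byte[4096], new byte[512] };
        byte[] a = viaOutputStream(chunks);
        byte[] b = viaPresizedArray(chunks, 1024 + 4096 + 512);
        System.out.println("same content: " + Arrays.equals(a, b));
    }
}
```

Both methods return the same bytes; the difference is only in how many intermediate arrays are allocated and filled along the way, which is what shows up as heap pressure.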

Expected Behavior

Minimize memory copies in such a scenario to minimize heap pressure.

Current Behavior

[Image: Screenshot from 2022-05-16 10-20-07]

Reproduction Steps

client.getObject(
        builder -> builder.bucket(bucket).key(key),
        AsyncResponseTransformer.toBytes())

I used https://github.com/jvm-profiling-tools/async-profiler for the flame graphs.

Possible Solution

  • Remove needless defensive copies inside ByteArrayAsyncResponseTransformer. The byte array is passed to the caller, who then becomes responsible for it, and nobody else references the old byte array once the publisher completes. This can be an instant win at a very low price.
  • Rely on GetObjectResponse::contentLength to size a byte array and copy incoming buffers into it in place. This requires knowledge of the response type, so it might be hard to do in a generic fashion.

I tested this successfully:

package org.apache.james.blob.objectstorage.aws;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

/**
* Class copied from {@link software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer}
*
* Modified to take advantage of the content length of the get response in order to use a sized array
* upon content copy. This avoids the usage of a ByteArrayOutputStream that yields additional copies
* (resizing upon copy, copy of the resulting byte array).
*
* A defensive copy upon returning the result is also removed (responsibility transferred to the caller, no other usages)
*/
public class MinimalCopyBytesResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> {
   private volatile CompletableFuture<byte[]> cf;
   private volatile GetObjectResponse response;

   public MinimalCopyBytesResponseTransformer() {

   }

   public CompletableFuture<ResponseBytes<GetObjectResponse>> prepare() {
       this.cf = new CompletableFuture<>();
       // Modification: Remove a defensive copy of the buffer upon completion: the caller is now the sole user of the array
       return this.cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
   }

   public void onResponse(GetObjectResponse response) {
       this.response = response;
   }

   public void onStream(SdkPublisher<ByteBuffer> publisher) {
       publisher.subscribe(new BaosSubscriber(this.cf, response.contentLength().intValue()));
   }

   public void exceptionOccurred(Throwable throwable) {
       this.cf.completeExceptionally(throwable);
   }

   static class BaosSubscriber implements Subscriber<ByteBuffer> {
       private final CompletableFuture<byte[]> resultFuture;
       // Modification: use a byte array instead of the ByteArrayOutputStream and track position
       private final byte[] buffer;
       private int pos = 0;
       private Subscription subscription;

       BaosSubscriber(CompletableFuture<byte[]> resultFuture, int size) {
           this.resultFuture = resultFuture;
           this.buffer = new byte[size];
       }

       public void onSubscribe(Subscription s) {
           if (this.subscription != null) {
               s.cancel();
           } else {
               this.subscription = s;
               this.subscription.request(Long.MAX_VALUE);
           }
       }

       public void onNext(ByteBuffer byteBuffer) {
           // Modification: copy the response part in place into the result buffer and track position
           int written = byteBuffer.remaining();
           byteBuffer.get(buffer, pos, written);
           pos += written;
           this.subscription.request(1L);
       }

       public void onError(Throwable throwable) {
           this.resultFuture.completeExceptionally(throwable);
       }

       public void onComplete() {
           this.resultFuture.complete(this.buffer);
       }
   }
}
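
The class above trusts the reported content length: a missing value, a length above the int range, or a stream that delivers more or fewer bytes than announced would only surface as an unrelated exception or a partially filled array. As a hedged, JDK-only sketch (not part of the tested class, and not SDK code), the accumulation step could be isolated with explicit checks. `SizedAccumulator`, `accept` and `finish` are hypothetical names.

```java
import java.nio.ByteBuffer;

public class SizedAccumulator {
    private final byte[] buffer;
    private int pos = 0;

    // declaredLength is the length announced by the response; it may be null.
    public SizedAccumulator(Long declaredLength) {
        if (declaredLength == null || declaredLength < 0 || declaredLength > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Unusable content length: " + declaredLength);
        }
        this.buffer = new byte[declaredLength.intValue()];
    }

    // Copies the remaining bytes of the chunk directly into their final position.
    public void accept(ByteBuffer chunk) {
        int n = chunk.remaining();
        if (n > buffer.length - pos) {
            throw new IllegalStateException("Stream delivered more bytes than the declared length");
        }
        chunk.get(buffer, pos, n);
        pos += n;
    }

    // Returns the backing array itself (no defensive copy); the caller owns it afterwards.
    public byte[] finish() {
        if (pos != buffer.length) {
            throw new IllegalStateException("Expected " + buffer.length + " bytes but received " + pos);
        }
        return buffer;
    }
}
```

In a subscriber like the one above, `accept` would be called from onNext and `finish` from onComplete, with any exception forwarded to the result future. A caller that hits the IllegalArgumentException could fall back to AsyncResponseTransformer.toBytes().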

Additional Information/Context

Apache James relies on the S3 driver for email content, so speed and heap impact are important for us: S3 getObject is called on each email content read.

AWS Java SDK version used

2.17.189

JDK version used

openjdk version "11.0.15" 2022-04-19

Operating System and version

Ubuntu 20.04.4 LTS

Labels: feature-request (a feature should be added or improved), p2 (this is a standard priority issue)