Skip to content

[TM DownloadDirectory Part3] Various updates on downloadDirectory #3020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,4 @@ default CompletableFuture<Void> subscribe(Consumer<T> consumer) {
subscribe(new SequentialSubscriber<>(consumer, future));
return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -103,37 +104,78 @@ public static void teardown() {
S3IntegrationTestBase.cleanUp();
}

/**
* The destination directory structure should match with the directory uploaded
* <pre>
* {@code
* - destination
* - 2021
* - 1.txt
* - 2.txt
* - 2022
* - 1.txt
* - important.txt
* }
* </pre>
*/
@Test
public void downloadDirectory() {
public void downloadDirectory() throws Exception {
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u.destinationDirectory(destinationDirectory)
.bucket(TEST_BUCKET));
CompletedDirectoryDownload completedDirectoryDownload = downloadDirectory.completionFuture().join();
CompletedDirectoryDownload completedDirectoryDownload = downloadDirectory.completionFuture().get(5, TimeUnit.SECONDS);
assertThat(completedDirectoryDownload.failedTransfers()).isEmpty();
assertTwoDirectoriesHaveSameStructure(sourceDirectory, destinationDirectory);
}

/**
* The destination directory structure should be the following with prefix "notes"
* <pre>
* {@code
* - source
* - README.md
* - CHANGELOG.md
* - notes
* - 2021
* - 1.txt
* - 2.txt
* - 2022
* - 1.txt
* - important.txt
* }
* </pre>
*/
@Test
public void downloadDirectory_withPrefix() {
public void downloadDirectory_withPrefix() throws Exception {
String prefix = "notes";
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u.destinationDirectory(destinationDirectory)
.prefix(prefix)
.bucket(TEST_BUCKET));
CompletedDirectoryDownload completedDirectoryDownload = downloadDirectory.completionFuture().join();
CompletedDirectoryDownload completedDirectoryDownload = downloadDirectory.completionFuture().get(5, TimeUnit.SECONDS);
assertThat(completedDirectoryDownload.failedTransfers()).isEmpty();

assertTwoDirectoriesHaveSameStructure(sourceDirectory.resolve(prefix), destinationDirectory.resolve(prefix));
assertTwoDirectoriesHaveSameStructure(sourceDirectory.resolve(prefix), destinationDirectory);
}

/**
* The destination directory structure should be the following with prefix "notes"
* <pre>
* {@code
* - destination
* - 1.txt
* - 2.txt
* }
* </pre>
*/
@Test
public void downloadDirectory_withDelimiter() {
String prefix = "notes";
public void downloadDirectory_withPrefixAndDelimiter() throws Exception {
String prefix = "notes-2021";
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u.destinationDirectory(destinationDirectory)
.delimiter(CUSTOM_DELIMITER)
.prefix(prefix)
.bucket(TEST_BUCKET_CUSTOM_DELIMITER));
CompletedDirectoryDownload completedDirectoryDownload = downloadDirectory.completionFuture().join();
CompletedDirectoryDownload completedDirectoryDownload = downloadDirectory.completionFuture().get(5, TimeUnit.SECONDS);
assertThat(completedDirectoryDownload.failedTransfers()).isEmpty();
assertTwoDirectoriesHaveSameStructure(sourceDirectory.resolve(prefix), destinationDirectory.resolve(prefix));
assertTwoDirectoriesHaveSameStructure(sourceDirectory.resolve("notes").resolve("2021"), destinationDirectory);
}

private static void assertTwoDirectoriesHaveSameStructure(Path path, Path otherPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,70 @@ public interface Builder extends CopyableBuilder<Builder, DownloadDirectoryReque
* See <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html">Organizing objects using
* prefixes</a>
*
* <p>
* When a non-empty prefix is provided, the prefix is stripped from the directory structure of the files.
* <p>
* For example, assume that you have the following keys in your bucket:
* <ul>
* <li>sample.jpg</li>
* <li>photos/2022/January/sample.jpg</li>
* <li>photos/2022/February/sample1.jpg</li>
* <li>photos/2022/February/sample2.jpg</li>
* <li>photos/2022/February/sample3.jpg</li>
* </ul>
*
* Give a request to download the bucket to a destination with a prefix of "/photos" and destination path of "test", the
* downloaded directory would like this
*
* <pre>
* {@code
* |- test
* |- 2022
* |- January
* |- sample.jpg
* |- February
* |- sample1.jpg
* |- sample2.jpg
* |- sample3.jpg
* }
* </pre>
* @param prefix the key prefix
* @return This builder for method chaining.
*/
Builder prefix(String prefix);

/**
* Specify the delimiter that will be used to retrieve the objects within the provided bucket. A delimiter causes a list
* operation to roll up all the keys that share a common prefix into a single summary list result. If not provided, {@code
* "/"} will be used.
* operation to roll up all the keys that share a common prefix into a single summary list result. It's null by default.
*
* For example, assume that you have the following keys in your bucket:
*
* <ul>
* <li>sample.jpg</li>
* <li>photos-2022-January-sample.jpg</li>
* <li>photos-2022-February-sample1.jpg</li>
* <li>photos-2022-February-sample2.jpg</li>
* <li>photos-2022-February-sample3.jpg</li>
* </ul>
*
* Give a request to download the bucket to a destination with delimiter of "-", the downloaded directory would look
* like this
*
* <pre>
* {@code
* |- test
* |- sample.jpg
* |- photos
* |- 2022
* |- January
* |- sample.jpg
* |- February
* |- sample1.jpg
* |- sample2.jpg
* |- sample3.jpg
* }
* </pre>
*
* <p>
* See <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html">Organizing objects using
* prefixes</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,32 @@ default DirectoryUpload uploadDirectory(Consumer<UploadDirectoryRequest.Builder>
* bucket will be downloaded.
*
* <p>
* The downloaded directory structure will match with the provided S3 virtual bucket.
* For example, assume that you have the following keys in your bucket:
* <ul>
* <li>sample.jpg</li>
* <li>photos/2022/January/sample.jpg</li>
* <li>photos/2022/February/sample1.jpg</li>
* <li>photos/2022/February/sample2.jpg</li>
* <li>photos/2022/February/sample3.jpg</li>
* </ul>
* Give a request to download the bucket to a destination with path of "/test", the downloaded directory would look like this
*
* <pre>
* {@code
* |- test
* |- sample.jpg
* |- photos
* |- 2022
* |- January
* |- sample.jpg
* |- February
* |- sample1.jpg
* |- sample2.jpg
* |- sample3.jpg
* }
* </pre>
* <p>
* The returned {@link CompletableFuture} only completes exceptionally if the request cannot be attempted as a whole (the
* downloadDirectoryRequest is invalid for example). The future completes successfully for partial successful
* requests, i.e., there might be failed downloads in a successfully completed response. As a result, you should check for
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3.internal;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

/**
* An implementation of {@link Subscriber} that execute the provided function for every event and limits the number of concurrent
* function execution to the given {@code maxConcurrentRequests}
*
* @param <T> Type of data requested
*/
@SdkInternalApi
public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
private static final Logger log = Logger.loggerFor(AsyncBufferingSubscriber.class);
private static final Object COMPLETE_EVENT = new Object();
private final Queue<Object> buffer;
private final CompletableFuture<?> returnFuture;
private final Function<T, CompletableFuture<?>> consumer;
private final int maxConcurrentExecutions;
private final AtomicInteger numRequestsInFlight;
private final AtomicBoolean isDelivering = new AtomicBoolean(false);
private volatile boolean isStreamingDone;
private volatile Subscription subscription;

public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> consumer,
CompletableFuture<Void> returnFuture,
int maxConcurrentExecutions) {
this.buffer = new ConcurrentLinkedQueue<>();
this.returnFuture = returnFuture;
this.consumer = consumer;
this.maxConcurrentExecutions = maxConcurrentExecutions;
this.numRequestsInFlight = new AtomicInteger(0);
}

@Override
public void onSubscribe(Subscription subscription) {
Validate.paramNotNull(subscription, "subscription");
if (this.subscription != null) {
log.warn(() -> "The subscriber has already been subscribed. Cancelling the incoming subscription");
subscription.cancel();
return;
}
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(T item) {
if (item == null) {
subscription.cancel();
NullPointerException exception = new NullPointerException("Item must not be null");
returnFuture.completeExceptionally(exception);
throw exception;
}

try {
buffer.add(item);
flushBufferIfNeeded();
} catch (Exception e) {
isStreamingDone = true;
subscription.cancel();
returnFuture.completeExceptionally(e);
}
}

private void flushBufferIfNeeded() {
if (buffer.isEmpty()) {
if (isStreamingDone && numRequestsInFlight.get() == 0) {
returnFuture.complete(null);
} else {
subscription.request(1);
}
return;
}

if (isDelivering.compareAndSet(false, true)) {
try {
Object firstEvent = buffer.peek();
if (isCompleteEvent(firstEvent)) {
Object event = buffer.poll();
handleCompleteEvent(event);
return;
}

while (!buffer.isEmpty() && numRequestsInFlight.get() < maxConcurrentExecutions) {
Object item = buffer.poll();
if (item == null) {
break;
}

if (isCompleteEvent(item)) {
handleCompleteEvent(item);
return;
}

deliverItem((T) item);
}
} finally {
isDelivering.set(false);
}
}
}

private void deliverItem(T item) {
int numberOfRequestInFlight = numRequestsInFlight.incrementAndGet();
log.debug(() -> "Delivering next item, numRequestInFlight=" + numberOfRequestInFlight);

consumer.apply(item).whenComplete((r, t) -> {
numRequestsInFlight.decrementAndGet();
if (!isStreamingDone) {
subscription.request(1);
} else {
flushBufferIfNeeded();
}
});
}

private void handleCompleteEvent(Object event) {
isStreamingDone = true;
if (numRequestsInFlight.get() == 0) {
returnFuture.complete(null);
}
}

@Override
public void onError(Throwable t) {
handleError(t);
}

private void handleError(Throwable t) {
returnFuture.completeExceptionally(t);
buffer.clear();
}

@Override
public void onComplete() {
buffer.add(COMPLETE_EVENT);
flushBufferIfNeeded();
}

private static boolean isCompleteEvent(Object event) {
return COMPLETE_EVENT.equals(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public final class DefaultFileDownload implements FileDownload {
private final TransferProgress progress;

DefaultFileDownload(CompletableFuture<CompletedFileDownload> completionFuture,
TransferProgress progress) {
TransferProgress progress) {
this.completionFuture = completionFuture;
this.progress = progress;
}
Expand Down
Loading