Skip to content

Commit 5a1f4dd

Browse files
pjfanning authored and steveloughran committed
HADOOP-18180. Replace use of twitter util-core with java futures in S3A prefetching stream (apache#4115)
Contributed by PJ Fanning.
1 parent fd24290 commit 5a1f4dd

15 files changed, with 218 additions and 67 deletions.

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -468,12 +468,6 @@
468468
<artifactId>aws-java-sdk-bundle</artifactId>
469469
<scope>compile</scope>
470470
</dependency>
471-
<dependency>
472-
<groupId>com.twitter</groupId>
473-
<artifactId>util-core_2.11</artifactId>
474-
<version>21.2.0</version>
475-
<scope>compile</scope>
476-
</dependency>
477471
<dependency>
478472
<groupId>org.assertj</groupId>
479473
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
import java.nio.ByteBuffer;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.Future;
2526
import java.util.zip.CRC32;
2627

27-
import com.twitter.util.Awaitable.CanAwait;
28-
import com.twitter.util.Future;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

@@ -263,8 +262,6 @@ public boolean stateEqualsOneOf(State... states) {
263262
return false;
264263
}
265264

266-
private static final CanAwait CAN_AWAIT = () -> false;
267-
268265
public String toString() {
269266

270267
return String.format(
@@ -281,7 +278,7 @@ private String getFutureStr(Future<Void> f) {
281278
if (f == null) {
282279
return "--";
283280
} else {
284-
return this.action.isReady(CAN_AWAIT) ? "done" : "not done";
281+
return this.action.isDone() ? "done" : "not done";
285282
}
286283
}
287284

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@
2626
import java.util.IdentityHashMap;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.concurrent.CancellationException;
29+
import java.util.concurrent.Future;
3030

31-
import com.twitter.util.Future;
3231
import org.slf4j.Logger;
3332
import org.slf4j.LoggerFactory;
3433

@@ -233,7 +232,7 @@ public synchronized void close() {
233232
for (BufferData data : this.getAll()) {
234233
Future<Void> actionFuture = data.getActionFuture();
235234
if (actionFuture != null) {
236-
actionFuture.raise(new CancellationException("BufferPool is closing."));
235+
actionFuture.cancel(true);
237236
}
238237
}
239238

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121

2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Future;
26+
import java.util.concurrent.TimeUnit;
2427
import java.util.concurrent.atomic.AtomicBoolean;
2528
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.function.Supplier;
2630

27-
import com.twitter.util.Await;
28-
import com.twitter.util.ExceptionalFunction0;
29-
import com.twitter.util.Future;
30-
import com.twitter.util.FuturePool;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

@@ -37,9 +37,10 @@
3737
*/
3838
public abstract class CachingBlockManager extends BlockManager {
3939
private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
40+
private static final int TIMEOUT_MINUTES = 60;
4041

4142
// Asynchronous tasks are performed in this pool.
42-
private final FuturePool futurePool;
43+
private final ExecutorServiceFuturePool futurePool;
4344

4445
// Pool of shared ByteBuffer instances.
4546
private BufferPool bufferPool;
@@ -78,7 +79,7 @@ public abstract class CachingBlockManager extends BlockManager {
7879
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
7980
*/
8081
public CachingBlockManager(
81-
FuturePool futurePool,
82+
ExecutorServiceFuturePool futurePool,
8283
BlockData blockData,
8384
int bufferPoolSize) {
8485
super(blockData);
@@ -247,7 +248,7 @@ public void requestPrefetch(int blockNumber) {
247248

248249
BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
249250
PrefetchTask prefetchTask = new PrefetchTask(data, this);
250-
Future<Void> prefetchFuture = this.futurePool.apply(prefetchTask);
251+
Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
251252
data.setPrefetch(prefetchFuture);
252253
this.ops.end(op);
253254
}
@@ -344,7 +345,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
344345
/**
345346
* Read task that is submitted to the future pool.
346347
*/
347-
private static class PrefetchTask extends ExceptionalFunction0<Void> {
348+
private static class PrefetchTask implements Supplier<Void> {
348349
private final BufferData data;
349350
private final CachingBlockManager blockManager;
350351

@@ -354,7 +355,7 @@ private static class PrefetchTask extends ExceptionalFunction0<Void> {
354355
}
355356

356357
@Override
357-
public Void applyE() {
358+
public Void get() {
358359
try {
359360
this.blockManager.prefetch(data);
360361
} catch (Exception e) {
@@ -412,11 +413,13 @@ public void requestCaching(BufferData data) {
412413
if (state == BufferData.State.PREFETCHING) {
413414
blockFuture = data.getActionFuture();
414415
} else {
415-
blockFuture = Future.value(null);
416+
CompletableFuture<Void> cf = new CompletableFuture<>();
417+
cf.complete(null);
418+
blockFuture = cf;
416419
}
417420

418421
CachePutTask task = new CachePutTask(data, blockFuture, this);
419-
Future<Void> actionFuture = this.futurePool.apply(task);
422+
Future<Void> actionFuture = this.futurePool.executeFunction(task);
420423
data.setCaching(actionFuture);
421424
this.ops.end(op);
422425
}
@@ -433,14 +436,13 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
433436
}
434437

435438
try {
436-
Await.result(blockFuture);
439+
blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
437440
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
438441
// There was an error during prefetch.
439442
return;
440443
}
441444
} catch (Exception e) {
442-
String message = String.format("error waitng on blockFuture: %s", data);
443-
LOG.error(message, e);
445+
LOG.error("error waiting on blockFuture: {}", data, e);
444446
data.setDone();
445447
return;
446448
}
@@ -500,7 +502,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
500502
this.cache.put(blockNumber, buffer);
501503
}
502504

503-
private static class CachePutTask extends ExceptionalFunction0<Void> {
505+
private static class CachePutTask implements Supplier<Void> {
504506
private final BufferData data;
505507

506508
// Block being asynchronously fetched.
@@ -519,7 +521,7 @@ private static class CachePutTask extends ExceptionalFunction0<Void> {
519521
}
520522

521523
@Override
522-
public Void applyE() {
524+
public Void get() {
523525
this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
524526
return null;
525527
}
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.common;
21+
22+
import java.util.Locale;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Future;
25+
import java.util.function.Supplier;
26+
27+
/**
28+
* A FuturePool implementation backed by a java.util.concurrent.ExecutorService.
29+
*
30+
* If a piece of work has started, it cannot (currently) be cancelled.
31+
*
32+
* This class is a simplified version of <code>com.twitter:util-core_2.11</code>
33+
* ExecutorServiceFuturePool designed to avoid depending on that Scala library.
34+
* One problem with using a Scala library is that many downstream projects
35+
* (eg Apache Spark) use Scala, and they might want to use a different version of Scala
36+
* from the version that Hadoop chooses to use.
37+
*
38+
*/
39+
public class ExecutorServiceFuturePool {
40+
private ExecutorService executor;
41+
42+
public ExecutorServiceFuturePool(ExecutorService executor) {
43+
this.executor = executor;
44+
}
45+
46+
/**
47+
* @param f function to run in future on executor pool
48+
* @return future
49+
* @throws java.util.concurrent.RejectedExecutionException can be thrown
50+
* @throws NullPointerException if f param is null
51+
*/
52+
public Future<Void> executeFunction(final Supplier<Void> f) {
53+
return executor.submit(f::get);
54+
}
55+
56+
/**
57+
* @param r runnable to run in future on executor pool
58+
* @return future
59+
* @throws java.util.concurrent.RejectedExecutionException can be thrown
60+
* @throws NullPointerException if r param is null
61+
*/
62+
@SuppressWarnings("unchecked")
63+
public Future<Void> executeRunnable(final Runnable r) {
64+
return (Future<Void>) executor.submit(r::run);
65+
}
66+
67+
public String toString() {
68+
return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor);
69+
}
70+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@
7979
import com.amazonaws.services.s3.transfer.model.UploadResult;
8080
import com.amazonaws.event.ProgressListener;
8181

82-
import com.twitter.util.ExecutorServiceFuturePool;
83-
import com.twitter.util.FuturePool;
82+
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
8483
import org.slf4j.Logger;
8584
import org.slf4j.LoggerFactory;
8685

@@ -294,7 +293,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
294293
private ThreadPoolExecutor unboundedThreadPool;
295294

296295
// S3 reads are prefetched asynchronously using this future pool.
297-
private FuturePool futurePool;
296+
private ExecutorServiceFuturePool futurePool;
298297

299298
// If true, the prefetching input stream is used for reads.
300299
private boolean prefetchEnabled;
@@ -1620,7 +1619,7 @@ protected S3AReadOpContext createReadContext(
16201619
statisticsContext,
16211620
fileStatus,
16221621
vectoredIOContext,
1623-
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
1622+
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(),
16241623
futurePool,
16251624
prefetchBlockSize,
16261625
prefetchBlockCount)

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
package org.apache.hadoop.fs.s3a;
2020

21-
import com.twitter.util.FuturePool;
22-
2321
import org.apache.hadoop.fs.FileStatus;
2422
import org.apache.hadoop.fs.FileSystem;
2523
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
2625
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
2726
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
2827
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
@@ -77,7 +76,7 @@ public class S3AReadOpContext extends S3AOpContext {
7776
private final IOStatisticsAggregator ioStatisticsAggregator;
7877

7978
// S3 reads are prefetched asynchronously using this future pool.
80-
private FuturePool futurePool;
79+
private ExecutorServiceFuturePool futurePool;
8180

8281
// Size in bytes of a single prefetch block.
8382
private final int prefetchBlockSize;
@@ -94,7 +93,7 @@ public class S3AReadOpContext extends S3AOpContext {
9493
* @param dstFileStatus target file status
9594
* @param vectoredIOContext context for vectored read operation.
9695
* @param ioStatisticsAggregator IOStatistics aggregator for each thread.
97-
* @param futurePool the FuturePool instance used by async prefetches.
96+
* @param futurePool the ExecutorServiceFuturePool instance used by async prefetches.
9897
* @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
9998
* @param prefetchBlockCount maximum number of prefetched blocks.
10099
*/
@@ -106,7 +105,7 @@ public S3AReadOpContext(
106105
FileStatus dstFileStatus,
107106
VectoredIOContext vectoredIOContext,
108107
IOStatisticsAggregator ioStatisticsAggregator,
109-
FuturePool futurePool,
108+
ExecutorServiceFuturePool futurePool,
110109
int prefetchBlockSize,
111110
int prefetchBlockCount) {
112111

@@ -258,11 +257,11 @@ public IOStatisticsAggregator getIOStatisticsAggregator() {
258257
}
259258

260259
/**
261-
* Gets the {@code FuturePool} used for asynchronous prefetches.
260+
* Gets the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
262261
*
263-
* @return the {@code FuturePool} used for asynchronous prefetches.
262+
* @return the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
264263
*/
265-
public FuturePool getFuturePool() {
264+
public ExecutorServiceFuturePool getFuturePool() {
266265
return this.futurePool;
267266
}
268267

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
2424

25-
import com.twitter.util.FuturePool;
2625
import org.slf4j.Logger;
2726
import org.slf4j.LoggerFactory;
2827

2928
import org.apache.hadoop.fs.common.BlockData;
3029
import org.apache.hadoop.fs.common.CachingBlockManager;
30+
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
3131
import org.apache.hadoop.fs.common.Validate;
3232

3333
/**
@@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager {
5252
* @throws IllegalArgumentException if reader is null.
5353
*/
5454
public S3CachingBlockManager(
55-
FuturePool futurePool,
55+
ExecutorServiceFuturePool futurePool,
5656
S3Reader reader,
5757
BlockData blockData,
5858
int bufferPoolSize) {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121

2222
import java.io.IOException;
2323

24-
import com.twitter.util.FuturePool;
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

2827
import org.apache.hadoop.fs.common.BlockData;
2928
import org.apache.hadoop.fs.common.BlockManager;
3029
import org.apache.hadoop.fs.common.BufferData;
30+
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
3131
import org.apache.hadoop.fs.s3a.S3AInputStream;
3232
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
3333
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -186,7 +186,7 @@ public String toString() {
186186
}
187187

188188
protected BlockManager createBlockManager(
189-
FuturePool futurePool,
189+
ExecutorServiceFuturePool futurePool,
190190
S3Reader reader,
191191
BlockData blockData,
192192
int bufferPoolSize) {

0 commit comments

Comments
 (0)