Skip to content

Commit cbf7c89

Browse files
committed
fix test
1 parent 300658a commit cbf7c89

File tree

5 files changed

+38
-27
lines changed

5 files changed

+38
-27
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,28 @@
1818

1919
package org.apache.hadoop.hdfs.protocolPB;
2020

21+
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
2122
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
2223
import org.apache.hadoop.io.Writable;
23-
import org.apache.hadoop.ipc.CallerContext;
2424
import org.apache.hadoop.ipc.Client;
2525
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
26-
import org.apache.hadoop.ipc.Server;
2726
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
2827
import org.apache.hadoop.util.concurrent.AsyncGet;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

3231
import java.io.IOException;
3332
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.Executor;
3434

3535
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
36-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
3736
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
3837
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
3938
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
4039

4140
public final class AsyncRpcProtocolPBUtil {
4241
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
42+
private static Executor worker;
4343

4444
private AsyncRpcProtocolPBUtil() {}
4545

@@ -50,20 +50,24 @@ public static <T, R> R asyncIpcClient(
5050
AsyncGet<T, Exception> asyncReqMessage =
5151
(AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
5252
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
53-
// transfer originCall & callerContext to worker threads of executor.
54-
final Server.Call originCall = Server.getCurCall().get();
55-
final CallerContext originContext = CallerContext.getCurrent();
56-
asyncCompleteWith(responseFuture);
57-
asyncApply(o -> {
53+
// transfer thread local context to worker threads of executor.
54+
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
55+
asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
56+
threadLocalContext.transfer();
57+
if (e != null) {
58+
throw warpCompletionException(e);
59+
}
5860
try {
59-
Server.getCurCall().set(originCall);
60-
CallerContext.setCurrent(originContext);
6161
T res = asyncReqMessage.get(-1, null);
6262
return response.apply(res);
63-
} catch (Exception e) {
64-
throw warpCompletionException(e);
63+
} catch (Exception ex) {
64+
throw warpCompletionException(ex);
6565
}
66-
});
66+
}, worker));
6767
return asyncReturn(clazz);
6868
}
69+
70+
public static void setWorker(Executor worker) {
71+
AsyncRpcProtocolPBUtil.worker = worker;
72+
}
6973
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.hadoop.fs.Path;
6868
import org.apache.hadoop.hdfs.HAUtil;
6969
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
70+
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
7071
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
7172
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
7273
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -461,18 +462,17 @@ protected void initAsyncThreadPool() {
461462
DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
462463
int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
463464
DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
464-
synchronized (RouterRpcServer.class) {
465-
if (asyncRouterHandler == null) {
466-
LOG.info("init router async handler count: {}", asyncHandlerCount);
467-
asyncRouterHandler = Executors.newFixedThreadPool(
468-
asyncHandlerCount, new AsyncThreadFactory("router async handler "));
469-
}
470-
if (asyncRouterResponder == null) {
471-
LOG.info("init router async responder count: {}", asyncResponderCount);
472-
asyncRouterResponder = Executors.newFixedThreadPool(
473-
asyncResponderCount, new AsyncThreadFactory("router async responder "));
474-
}
465+
if (asyncRouterHandler == null) {
466+
LOG.info("init router async handler count: {}", asyncHandlerCount);
467+
asyncRouterHandler = Executors.newFixedThreadPool(
468+
asyncHandlerCount, new AsyncThreadFactory("router async handler "));
469+
}
470+
if (asyncRouterResponder == null) {
471+
LOG.info("init router async responder count: {}", asyncResponderCount);
472+
asyncRouterResponder = Executors.newFixedThreadPool(
473+
asyncResponderCount, new AsyncThreadFactory("router async responder "));
475474
}
475+
AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
476476
}
477477

478478
/**

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,14 @@ public static <R> R asyncReturn(Class<R> clazz) {
8080
if (clazz == null) {
8181
return null;
8282
}
83-
if (clazz.equals(Boolean.class)) {
83+
if (clazz.equals(Boolean.class)
84+
| clazz.equals(boolean.class)) {
8485
return (R) BOOLEAN_RESULT;
85-
} else if (clazz.equals(Long.class)) {
86+
} else if (clazz.equals(Long.class)
87+
| clazz.equals(long.class)) {
8688
return (R) LONG_RESULT;
87-
} else if (clazz.equals(Integer.class)) {
89+
} else if (clazz.equals(Integer.class)
90+
| clazz.equals(int.class)) {
8891
return (R) INT_RESULT;
8992
}
9093
return (R) NULL_RESULT;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.net.InetSocketAddress;
43+
import java.util.concurrent.ForkJoinPool;
4344

4445
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
4546
import static org.junit.Assert.assertEquals;
@@ -53,6 +54,7 @@ public class TestAsyncRpcProtocolPBUtil {
5354

5455
@Before
5556
public void setUp() throws IOException {
57+
AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
5658
Configuration conf = new Configuration();
5759
RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
5860
ProtobufRpcEngine2.class);

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.net.InetSocketAddress;
5353
import java.util.EnumSet;
5454
import java.util.List;
55+
import java.util.concurrent.ForkJoinPool;
5556
import java.util.concurrent.atomic.AtomicBoolean;
5657

5758
import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
@@ -81,6 +82,7 @@ public class TestRouterClientSideTranslatorPB {
8182

8283
@BeforeClass
8384
public static void setUp() throws Exception {
85+
AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
8486
conf = new HdfsConfiguration();
8587
cluster = (new MiniDFSCluster.Builder(conf))
8688
.numDataNodes(1).build();

0 commit comments

Comments
 (0)