diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNetworkTopologyServlet.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNetworkTopologyServlet.java index e517066c81c20..850f281b2bb12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNetworkTopologyServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNetworkTopologyServlet.java @@ -31,6 +31,8 @@ import java.util.Arrays; import java.util.List; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; + /** * A servlet to print out the network topology from router. */ @@ -49,9 +51,19 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) } Router router = RouterHttpServer.getRouterFromContext(context); - DatanodeInfo[] datanodeReport = - router.getRpcServer().getDatanodeReport( - HdfsConstants.DatanodeReportType.ALL); + DatanodeInfo[] datanodeReport = null; + if (router.getRpcServer().isAsync()) { + router.getRpcServer().getDatanodeReportAsync( + HdfsConstants.DatanodeReportType.ALL, true, 0); + try { + datanodeReport = syncReturn(DatanodeInfo[].class); + } catch (Exception e) { + throw new IOException(e); + } + } else { + datanodeReport = router.getRpcServer().getDatanodeReport( + HdfsConstants.DatanodeReportType.ALL); + } List datanodeInfos = Arrays.asList(datanodeReport); try (PrintStream out = new PrintStream( diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNetworkTopologyServlet.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNetworkTopologyServlet.java index 02455c1672572..39ff50bf60dd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNetworkTopologyServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNetworkTopologyServlet.java @@ -24,10 +24,17 @@ import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; import org.apache.hadoop.io.IOUtils; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; +import java.lang.reflect.Method; import java.net.HttpURLConnection; import java.net.URL; import java.util.Iterator; @@ -36,21 +43,31 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterNoDatanodes; +import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterWithDatanodes; +import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.setUp; +@SuppressWarnings("checkstyle:VisibilityModifier") public class TestRouterNetworkTopologyServlet { - private static StateStoreDFSCluster clusterWithDatanodes; - private static StateStoreDFSCluster clusterNoDatanodes; + public static StateStoreDFSCluster clusterWithDatanodes; + public static StateStoreDFSCluster clusterNoDatanodes; + + public static final String ASYNC_MODE = "ASYNC"; + public static final String SYNC_MODE = "SYNC"; - @BeforeAll - public static void setUp() throws Exception { - // Builder configuration + public static void setUp(String rpcMode) throws Exception { + // Builder configuration. Configuration routerConf = new RouterConfigBuilder().stateStore().admin().quota().rpc().build(); routerConf.set(DFS_ROUTER_HTTP_ENABLE, "true"); + // Use async router rpc. + if (rpcMode.equals("ASYNC")) { + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + } Configuration hdfsConf = new Configuration(false); - // Build and start a federated cluster + // Build and start a federated cluster. clusterWithDatanodes = new StateStoreDFSCluster(false, 2, MultipleDestinationMountTableResolver.class); clusterWithDatanodes.addNamenodeOverrides(hdfsConf); @@ -67,7 +84,7 @@ public static void setUp() throws Exception { clusterWithDatanodes.waitClusterUp(); clusterWithDatanodes.waitActiveNamespaces(); - // Build and start a federated cluster + // Build and start a federated cluster. clusterNoDatanodes = new StateStoreDFSCluster(false, 2, MultipleDestinationMountTableResolver.class); clusterNoDatanodes.addNamenodeOverrides(hdfsConf); @@ -80,13 +97,74 @@ public static void setUp() throws Exception { clusterNoDatanodes.waitActiveNamespaces(); } - @Test + @Nested + @ExtendWith(RouterServerHelper.class) + class TestWithAsyncRouterRpc { + + @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE}) + public void testPrintTopologyTextFormatAsync(String rpcMode) throws Exception { + testPrintTopologyTextFormat(); + } + + @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE}) + public void testPrintTopologyJsonFormatAsync(String rpcMode) throws Exception { + testPrintTopologyJsonFormat(); + } + + @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE}) + public void testPrintTopologyNoDatanodesTextFormatAsync(String rpcMode) + throws Exception { + testPrintTopologyNoDatanodesTextFormat(); + } + + @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE}) + public void testPrintTopologyNoDatanodesJsonFormatAsync(String rpcMode) + throws Exception { + testPrintTopologyNoDatanodesJsonFormat(); + } + } + + @Nested + @ExtendWith(RouterServerHelper.class) + class TestWithSyncRouterRpc { + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testPrintTopologyTextFormatSync(String rpcMode) throws Exception { + testPrintTopologyTextFormat(); + } + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testPrintTopologyJsonFormatSync(String rpcMode) throws Exception { + testPrintTopologyJsonFormat(); + } + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testPrintTopologyNoDatanodesTextFormatSync(String rpcMode) + throws Exception { + testPrintTopologyNoDatanodesTextFormat(); + } + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testPrintTopologyNoDatanodesJsonFormatSync(String rpcMode) + throws Exception { + testPrintTopologyNoDatanodesJsonFormat(); + } + } + public void testPrintTopologyTextFormat() throws Exception { - // get http Address + // Get http Address. String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter() .getHttpServerAddress().toString(); - // send http request + // Send http request. URL url = new URL("http:/" + httpAddress + "/topology"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setReadTimeout(20000); @@ -101,7 +179,7 @@ public void testPrintTopologyTextFormat() throws Exception { sb.append("\n-- Network Topology -- "); String topology = sb.toString(); - // assert rack info + // Assert rack info. assertTrue(topology.contains("/ns0/rack1")); assertTrue(topology.contains("/ns0/rack2")); assertTrue(topology.contains("/ns0/rack3")); @@ -109,18 +187,17 @@ public void testPrintTopologyTextFormat() throws Exception { assertTrue(topology.contains("/ns1/rack5")); assertTrue(topology.contains("/ns1/rack6")); - // assert node number + // Assert node number. assertEquals(18, topology.split("127.0.0.1").length - 1); } - @Test public void testPrintTopologyJsonFormat() throws Exception { - // get http Address + // Get http Address. String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter() - .getHttpServerAddress().toString(); + .getHttpServerAddress().toString(); - // send http request + // Send http request. URL url = new URL("http:/" + httpAddress + "/topology"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setReadTimeout(20000); @@ -132,13 +209,13 @@ public void testPrintTopologyJsonFormat() throws Exception { IOUtils.copyBytes(conn.getInputStream(), out, 4096, true); String topology = out.toString(); - // parse json + // Parse json. JsonNode racks = new ObjectMapper().readTree(topology); - // assert rack number + // Assert rack number. assertEquals(6, racks.size()); - // assert rack info + // Assert rack info. assertTrue(topology.contains("/ns0/rack1")); assertTrue(topology.contains("/ns0/rack2")); assertTrue(topology.contains("/ns0/rack3")); @@ -146,7 +223,7 @@ public void testPrintTopologyJsonFormat() throws Exception { assertTrue(topology.contains("/ns1/rack5")); assertTrue(topology.contains("/ns1/rack6")); - // assert node number + // Assert node number. Iterator elements = racks.elements(); int dataNodesCount = 0; while(elements.hasNext()){ @@ -159,13 +236,12 @@ public void testPrintTopologyJsonFormat() throws Exception { assertEquals(18, dataNodesCount); } - @Test public void testPrintTopologyNoDatanodesTextFormat() throws Exception { - // get http Address + // Get http Address. String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter() .getHttpServerAddress().toString(); - // send http request + // Send http request. URL url = new URL("http:/" + httpAddress + "/topology"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setReadTimeout(20000); @@ -179,17 +255,16 @@ public void testPrintTopologyNoDatanodesTextFormat() throws Exception { sb.append("\n-- Network Topology -- "); String topology = sb.toString(); - // assert node number + // Assert node number. assertTrue(topology.contains("No DataNodes")); } - @Test public void testPrintTopologyNoDatanodesJsonFormat() throws Exception { - // get http Address + // Get http Address. String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter() .getHttpServerAddress().toString(); - // send http request + // Send http request. URL url = new URL("http:/" + httpAddress + "/topology"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setReadTimeout(20000); @@ -204,7 +279,41 @@ public void testPrintTopologyNoDatanodesJsonFormat() throws Exception { sb.append("\n-- Network Topology -- "); String topology = sb.toString(); - // assert node number + // Assert node number. assertTrue(topology.contains("No DataNodes")); } } + +class RouterServerHelper implements BeforeEachCallback, AfterAllCallback { + + private static final ThreadLocal TEST_ROUTER_SERVER_TL = + new InheritableThreadLocal(); + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + Method testMethod = context.getRequiredTestMethod(); + ValueSource enumAnnotation = testMethod.getAnnotation(ValueSource.class); + if (enumAnnotation != null) { + String[] strings = enumAnnotation.strings(); + for (String rpcMode : strings) { + if (TEST_ROUTER_SERVER_TL.get() == null) { + setUp(rpcMode); + } + } + } + TEST_ROUTER_SERVER_TL.set(RouterServerHelper.this); + } + + @Override + public void afterAll(ExtensionContext context) { + if (clusterWithDatanodes != null) { + clusterWithDatanodes.shutdown(); + clusterWithDatanodes = null; + } + if (clusterNoDatanodes != null) { + clusterNoDatanodes.shutdown(); + clusterNoDatanodes = null; + } + TEST_ROUTER_SERVER_TL.remove(); + } +}