Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;

/**
* A servlet to print out the network topology from router.
*/
Expand All @@ -49,9 +51,19 @@ public void doGet(HttpServletRequest request, HttpServletResponse response)
}

Router router = RouterHttpServer.getRouterFromContext(context);
DatanodeInfo[] datanodeReport =
router.getRpcServer().getDatanodeReport(
HdfsConstants.DatanodeReportType.ALL);
DatanodeInfo[] datanodeReport = null;
if (router.getRpcServer().isAsync()) {
router.getRpcServer().getDatanodeReportAsync(
HdfsConstants.DatanodeReportType.ALL, true, 0);
try {
datanodeReport = syncReturn(DatanodeInfo[].class);
} catch (Exception e) {
throw new IOException(e);
}
} else {
datanodeReport = router.getRpcServer().getDatanodeReport(
HdfsConstants.DatanodeReportType.ALL);
}
List<Node> datanodeInfos = Arrays.asList(datanodeReport);

try (PrintStream out = new PrintStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.io.IOUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Iterator;
Expand All @@ -36,21 +43,31 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterNoDatanodes;
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterWithDatanodes;
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.setUp;

@SuppressWarnings("checkstyle:VisibilityModifier")
public class TestRouterNetworkTopologyServlet {

private static StateStoreDFSCluster clusterWithDatanodes;
private static StateStoreDFSCluster clusterNoDatanodes;
public static StateStoreDFSCluster clusterWithDatanodes;
public static StateStoreDFSCluster clusterNoDatanodes;

public static final String ASYNC_MODE = "ASYNC";
public static final String SYNC_MODE = "SYNC";

@BeforeAll
public static void setUp() throws Exception {
// Builder configuration
public static void setUp(String rpcMode) throws Exception {
// Builder configuration.
Configuration routerConf =
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
routerConf.set(DFS_ROUTER_HTTP_ENABLE, "true");
// Use async router rpc.
if (rpcMode.equals("ASYNC")) {
routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
}
Configuration hdfsConf = new Configuration(false);

// Build and start a federated cluster
// Build and start a federated cluster.
clusterWithDatanodes = new StateStoreDFSCluster(false, 2,
MultipleDestinationMountTableResolver.class);
clusterWithDatanodes.addNamenodeOverrides(hdfsConf);
Expand All @@ -67,7 +84,7 @@ public static void setUp() throws Exception {
clusterWithDatanodes.waitClusterUp();
clusterWithDatanodes.waitActiveNamespaces();

// Build and start a federated cluster
// Build and start a federated cluster.
clusterNoDatanodes = new StateStoreDFSCluster(false, 2,
MultipleDestinationMountTableResolver.class);
clusterNoDatanodes.addNamenodeOverrides(hdfsConf);
Expand All @@ -80,13 +97,74 @@ public static void setUp() throws Exception {
clusterNoDatanodes.waitActiveNamespaces();
}

@Test
@Nested
@ExtendWith(RouterServerHelper.class)
class TestWithAsyncRouterRpc {

@ParameterizedTest
@ValueSource(strings = {ASYNC_MODE})
public void testPrintTopologyTextFormatAsync(String rpcMode) throws Exception {
testPrintTopologyTextFormat();
}

@ParameterizedTest
@ValueSource(strings = {ASYNC_MODE})
public void testPrintTopologyJsonFormatAsync(String rpcMode) throws Exception {
testPrintTopologyJsonFormat();
}

@ParameterizedTest
@ValueSource(strings = {ASYNC_MODE})
public void testPrintTopologyNoDatanodesTextFormatAsync(String rpcMode)
throws Exception {
testPrintTopologyNoDatanodesTextFormat();
}

@ParameterizedTest
@ValueSource(strings = {ASYNC_MODE})
public void testPrintTopologyNoDatanodesJsonFormatAsync(String rpcMode)
throws Exception {
testPrintTopologyNoDatanodesJsonFormat();
}
}

@Nested
@ExtendWith(RouterServerHelper.class)
class TestWithSyncRouterRpc {

@ParameterizedTest
@ValueSource(strings = {SYNC_MODE})
public void testPrintTopologyTextFormatSync(String rpcMode) throws Exception {
testPrintTopologyTextFormat();
}

@ParameterizedTest
@ValueSource(strings = {SYNC_MODE})
public void testPrintTopologyJsonFormatSync(String rpcMode) throws Exception {
testPrintTopologyJsonFormat();
}

@ParameterizedTest
@ValueSource(strings = {SYNC_MODE})
public void testPrintTopologyNoDatanodesTextFormatSync(String rpcMode)
throws Exception {
testPrintTopologyNoDatanodesTextFormat();
}

@ParameterizedTest
@ValueSource(strings = {SYNC_MODE})
public void testPrintTopologyNoDatanodesJsonFormatSync(String rpcMode)
throws Exception {
testPrintTopologyNoDatanodesJsonFormat();
}
}

public void testPrintTopologyTextFormat() throws Exception {
// get http Address
// Get http Address.
String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter()
.getHttpServerAddress().toString();

// send http request
// Send http request.
URL url = new URL("http:/" + httpAddress + "/topology");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setReadTimeout(20000);
Expand All @@ -101,26 +179,25 @@ public void testPrintTopologyTextFormat() throws Exception {
sb.append("\n-- Network Topology -- ");
String topology = sb.toString();

// assert rack info
// Assert rack info.
assertTrue(topology.contains("/ns0/rack1"));
assertTrue(topology.contains("/ns0/rack2"));
assertTrue(topology.contains("/ns0/rack3"));
assertTrue(topology.contains("/ns1/rack4"));
assertTrue(topology.contains("/ns1/rack5"));
assertTrue(topology.contains("/ns1/rack6"));

// assert node number
// Assert node number.
assertEquals(18,
topology.split("127.0.0.1").length - 1);
}

@Test
public void testPrintTopologyJsonFormat() throws Exception {
// get http Address
// Get http Address.
String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter()
.getHttpServerAddress().toString();
.getHttpServerAddress().toString();

// send http request
// Send http request.
URL url = new URL("http:/" + httpAddress + "/topology");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setReadTimeout(20000);
Expand All @@ -132,21 +209,21 @@ public void testPrintTopologyJsonFormat() throws Exception {
IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
String topology = out.toString();

// parse json
// Parse json.
JsonNode racks = new ObjectMapper().readTree(topology);

// assert rack number
// Assert rack number.
assertEquals(6, racks.size());

// assert rack info
// Assert rack info.
assertTrue(topology.contains("/ns0/rack1"));
assertTrue(topology.contains("/ns0/rack2"));
assertTrue(topology.contains("/ns0/rack3"));
assertTrue(topology.contains("/ns1/rack4"));
assertTrue(topology.contains("/ns1/rack5"));
assertTrue(topology.contains("/ns1/rack6"));

// assert node number
// Assert node number.
Iterator<JsonNode> elements = racks.elements();
int dataNodesCount = 0;
while(elements.hasNext()){
Expand All @@ -159,13 +236,12 @@ public void testPrintTopologyJsonFormat() throws Exception {
assertEquals(18, dataNodesCount);
}

@Test
public void testPrintTopologyNoDatanodesTextFormat() throws Exception {
// get http Address
// Get http Address.
String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter()
.getHttpServerAddress().toString();

// send http request
// Send http request.
URL url = new URL("http:/" + httpAddress + "/topology");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setReadTimeout(20000);
Expand All @@ -179,17 +255,16 @@ public void testPrintTopologyNoDatanodesTextFormat() throws Exception {
sb.append("\n-- Network Topology -- ");
String topology = sb.toString();

// assert node number
// Assert node number.
assertTrue(topology.contains("No DataNodes"));
}

@Test
public void testPrintTopologyNoDatanodesJsonFormat() throws Exception {
// get http Address
// Get http Address.
String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter()
.getHttpServerAddress().toString();

// send http request
// Send http request.
URL url = new URL("http:/" + httpAddress + "/topology");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setReadTimeout(20000);
Expand All @@ -204,7 +279,41 @@ public void testPrintTopologyNoDatanodesJsonFormat() throws Exception {
sb.append("\n-- Network Topology -- ");
String topology = sb.toString();

// assert node number
// Assert node number.
assertTrue(topology.contains("No DataNodes"));
}
}

class RouterServerHelper implements BeforeEachCallback, AfterAllCallback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question. Shouldn't RouterServerHelper be extracted from this class and made into a separate utility class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @slfan1989 . Actually, I'd ever consider this way, but i found each test class have its own initialize logic. For example, some test class may need mkdis or addMountPoints which is unused for other test class. What's your opinions?

Copy link
Contributor

@slfan1989 slfan1989 Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the current changes in this PR are acceptable, but it would be better to extract RouterServerHelper to avoid potential code redundancy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can merge this firstly and i will try to extract common logic in #7470 ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KeeProMise Do you have any other suggestions? We can take a look at the details in #7470 first. If you think there are no issues, we can merge this PR anytime. Thanks again!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private static final ThreadLocal<RouterServerHelper> TEST_ROUTER_SERVER_TL =
new InheritableThreadLocal<RouterServerHelper>();

@Override
public void beforeEach(ExtensionContext context) throws Exception {
Method testMethod = context.getRequiredTestMethod();
ValueSource enumAnnotation = testMethod.getAnnotation(ValueSource.class);
if (enumAnnotation != null) {
String[] strings = enumAnnotation.strings();
for (String rpcMode : strings) {
if (TEST_ROUTER_SERVER_TL.get() == null) {
setUp(rpcMode);
}
}
}
TEST_ROUTER_SERVER_TL.set(RouterServerHelper.this);
}

@Override
public void afterAll(ExtensionContext context) {
if (clusterWithDatanodes != null) {
clusterWithDatanodes.shutdown();
clusterWithDatanodes = null;
}
if (clusterNoDatanodes != null) {
clusterNoDatanodes.shutdown();
clusterNoDatanodes = null;
}
TEST_ROUTER_SERVER_TL.remove();
}
}