Skip to content

Commit 746d3f2

Browse files
committed
add UT.
1 parent 8af0bbf commit 746d3f2

File tree

2 files changed

+206
-2
lines changed

2 files changed

+206
-2
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
7171
}
7272

7373
@Override
74-
public Map getErasureCodingCodecs() throws IOException {
74+
public Map<String, String> getErasureCodingCodecs() throws IOException {
7575
rpcServer.checkOperation(NameNode.OperationCategory.READ);
7676

7777
RemoteMethod method = new RemoteMethod("getErasureCodingCodecs");
@@ -80,7 +80,7 @@ public Map getErasureCodingCodecs() throws IOException {
8080
rpcClient.invokeConcurrent(
8181
nss, method, true, false, Map.class);
8282

83-
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, Map>, Map>) retCodecs -> {
83+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, Map<String, String>>, Map<String, String>>) retCodecs -> {
8484
Map<String, String> ret = new HashMap<>();
8585
Object obj = retCodecs;
8686
@SuppressWarnings("unchecked")
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FSDataOutputStream;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.fs.permission.FsPermission;
26+
import org.apache.hadoop.hdfs.StripedFileTestUtil;
27+
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
28+
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
29+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
30+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
31+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
32+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
33+
import org.apache.hadoop.hdfs.server.federation.MockResolver;
34+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
35+
import org.apache.hadoop.io.erasurecode.ECSchema;
36+
import org.apache.hadoop.ipc.CallerContext;
37+
import org.junit.After;
38+
import org.junit.AfterClass;
39+
import org.junit.Before;
40+
import org.junit.BeforeClass;
41+
import org.junit.Test;
42+
import org.mockito.Mockito;
43+
44+
import java.io.IOException;
45+
import java.util.Map;
46+
import java.util.concurrent.TimeUnit;
47+
48+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
49+
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
50+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
51+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
52+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
53+
import static org.junit.Assert.assertArrayEquals;
54+
import static org.junit.Assert.assertEquals;
55+
import static org.junit.Assert.assertNotNull;
56+
import static org.junit.Assert.assertTrue;
57+
58+
public class TestRouterAsyncErasureCoding {
59+
private static Configuration routerConf;
60+
/** Federated HDFS cluster. */
61+
private static MiniRouterDFSCluster cluster;
62+
private static String ns0;
63+
64+
/** Random Router for this federated cluster. */
65+
private MiniRouterDFSCluster.RouterContext router;
66+
private FileSystem routerFs;
67+
private RouterRpcServer routerRpcServer;
68+
private AsyncErasureCoding asyncErasureCoding;
69+
70+
private final String testfilePath = "/testdir/testAsyncErasureCoding.file";
71+
72+
@BeforeClass
73+
public static void setUpCluster() throws Exception {
74+
cluster = new MiniRouterDFSCluster(true, 1, 2,
75+
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
76+
cluster.setNumDatanodesPerNameservice(3);
77+
cluster.setRacks(
78+
new String[] {"/rack1", "/rack2", "/rack3"});
79+
cluster.startCluster();
80+
81+
// Making one Namenode active per nameservice
82+
if (cluster.isHighAvailability()) {
83+
for (String ns : cluster.getNameservices()) {
84+
cluster.switchToActive(ns, NAMENODES[0]);
85+
cluster.switchToStandby(ns, NAMENODES[1]);
86+
}
87+
}
88+
// Start routers with only an RPC service
89+
routerConf = new RouterConfigBuilder()
90+
.rpc()
91+
.build();
92+
93+
// Reduce the number of RPC clients threads to overload the Router easy
94+
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
95+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
96+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
97+
// We decrease the DN cache times to make the test faster
98+
routerConf.setTimeDuration(
99+
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
100+
cluster.addRouterOverrides(routerConf);
101+
// Start routers with only an RPC service
102+
cluster.startRouters();
103+
104+
// Register and verify all NNs with all routers
105+
cluster.registerNamenodes();
106+
cluster.waitNamenodeRegistration();
107+
cluster.waitActiveNamespaces();
108+
ns0 = cluster.getNameservices().get(0);
109+
}
110+
111+
@AfterClass
112+
public static void shutdownCluster() throws Exception {
113+
if (cluster != null) {
114+
cluster.shutdown();
115+
}
116+
}
117+
118+
@Before
119+
public void setUp() throws IOException {
120+
router = cluster.getRandomRouter();
121+
routerFs = router.getFileSystem();
122+
routerRpcServer = router.getRouterRpcServer();
123+
routerRpcServer.initAsyncThreadPool();
124+
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
125+
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
126+
routerRpcServer.getRPCMonitor(),
127+
routerRpcServer.getRouterStateIdContext());
128+
RouterRpcServer spy = Mockito.spy(routerRpcServer);
129+
Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
130+
asyncErasureCoding = new AsyncErasureCoding(spy);
131+
132+
// Create mock locations
133+
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
134+
resolver.addLocation("/", ns0, "/");
135+
FsPermission permission = new FsPermission("705");
136+
routerFs.mkdirs(new Path("/testdir"), permission);
137+
FSDataOutputStream fsDataOutputStream = routerFs.create(
138+
new Path(testfilePath), true);
139+
fsDataOutputStream.write(new byte[1024]);
140+
fsDataOutputStream.close();
141+
}
142+
143+
@After
144+
public void tearDown() throws IOException {
145+
// clear client context
146+
CallerContext.setCurrent(null);
147+
boolean delete = routerFs.delete(new Path("/testdir"));
148+
assertTrue(delete);
149+
if (routerFs != null) {
150+
routerFs.close();
151+
}
152+
}
153+
154+
@Test
155+
public void testRouterAsyncErasureCoding() throws Exception {
156+
String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName();
157+
HdfsFileStatus fileInfo = cluster.getNamenodes().get(0).getClient().getFileInfo(testfilePath);
158+
assertNotNull(fileInfo);
159+
160+
asyncErasureCoding.setErasureCodingPolicy("/testdir", ecPolicyName);
161+
syncReturn(null);
162+
163+
asyncErasureCoding.getErasureCodingPolicy("/testdir");
164+
ErasureCodingPolicy ecPolicy = syncReturn(ErasureCodingPolicy.class);
165+
assertEquals(StripedFileTestUtil.getDefaultECPolicy().getName(), ecPolicy.getName());
166+
167+
asyncErasureCoding.getErasureCodingPolicies();
168+
ErasureCodingPolicyInfo[] erasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class);
169+
int numECPolicies = erasureCodingPolicies.length;
170+
ErasureCodingPolicyInfo[] erasureCodingPoliciesFromNameNode =
171+
cluster.getNamenodes().get(0).getClient().getErasureCodingPolicies();
172+
173+
assertArrayEquals(erasureCodingPoliciesFromNameNode, erasureCodingPolicies);
174+
175+
asyncErasureCoding.getErasureCodingCodecs();
176+
Map<String, String> erasureCodingCodecs = syncReturn(Map.class);
177+
Map<String, String> erasureCodingCodecsFromNameNode =
178+
cluster.getNamenodes().get(0).getClient().getErasureCodingCodecs();
179+
180+
assertEquals(erasureCodingCodecs, erasureCodingCodecsFromNameNode);
181+
182+
// RS-12-4-1024k
183+
final ECSchema schema = new ECSchema("rs", 12, 4);
184+
ErasureCodingPolicy erasureCodingPolicy = new ErasureCodingPolicy(schema, 1024 * 1024);
185+
asyncErasureCoding.addErasureCodingPolicies(new ErasureCodingPolicy[]{erasureCodingPolicy});
186+
AddErasureCodingPolicyResponse[] response = syncReturn(AddErasureCodingPolicyResponse[].class);
187+
assertEquals(response[0].isSucceed(), true);
188+
189+
asyncErasureCoding.getErasureCodingPolicies();
190+
ErasureCodingPolicyInfo[] newErasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class);
191+
int numNewECPolicies = newErasureCodingPolicies.length;
192+
assertEquals(numECPolicies + 1, numNewECPolicies);
193+
194+
asyncErasureCoding.getECTopologyResultForPolicies(
195+
new String[]{"RS-6-3-1024k", "RS-12-4-1024k"});
196+
ECTopologyVerifierResult ecTopologyResultForPolicies = syncReturn(ECTopologyVerifierResult.class);
197+
assertEquals(false, ecTopologyResultForPolicies.isSupported());
198+
199+
asyncErasureCoding.getECTopologyResultForPolicies(
200+
new String[]{"XOR-2-1-1024k"});
201+
ECTopologyVerifierResult ecTopologyResultForPolicies1 = syncReturn(ECTopologyVerifierResult.class);
202+
assertEquals(true, ecTopologyResultForPolicies1.isSupported());
203+
}
204+
}

0 commit comments

Comments
 (0)