Skip to content

Commit b404311

Browse files
hfutatzhanghb authored and KeeProMise committed
HDFS-17595. [ARR] ErasureCoding supports asynchronous rpc. (apache#6983). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <[email protected]> Signed-off-by: He Xiaoqiao <[email protected]>
1 parent c959453 commit b404311

File tree

2 files changed

+384
-0
lines changed

2 files changed

+384
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
21+
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
22+
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
23+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
24+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
25+
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
26+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
27+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
28+
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
29+
import org.apache.hadoop.hdfs.server.namenode.NameNode;
30+
31+
import java.io.IOException;
32+
import java.util.Collection;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Set;
37+
38+
import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
39+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
40+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
41+
42+
public class AsyncErasureCoding extends ErasureCoding {
43+
/** RPC server to receive client calls. */
44+
private final RouterRpcServer rpcServer;
45+
/** RPC clients to connect to the Namenodes. */
46+
private final RouterRpcClient rpcClient;
47+
/** Interface to identify the active NN for a nameservice or blockpool ID. */
48+
private final ActiveNamenodeResolver namenodeResolver;
49+
50+
public AsyncErasureCoding(RouterRpcServer server) {
51+
super(server);
52+
this.rpcServer = server;
53+
this.rpcClient = this.rpcServer.getRPCClient();
54+
this.namenodeResolver = this.rpcClient.getNamenodeResolver();
55+
}
56+
57+
public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
58+
throws IOException {
59+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
60+
61+
RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
62+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
63+
64+
rpcClient.invokeConcurrent(
65+
nss, method, true, false, ErasureCodingPolicyInfo[].class);
66+
asyncApply(
67+
(ApplyFunction<Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]>,
68+
ErasureCodingPolicyInfo[]>) ret -> merge(ret, ErasureCodingPolicyInfo.class));
69+
70+
return asyncReturn(ErasureCodingPolicyInfo[].class);
71+
}
72+
73+
@Override
74+
public Map<String, String> getErasureCodingCodecs() throws IOException {
75+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
76+
77+
RemoteMethod method = new RemoteMethod("getErasureCodingCodecs");
78+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
79+
80+
rpcClient.invokeConcurrent(
81+
nss, method, true, false, Map.class);
82+
83+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo,
84+
Map<String, String>>, Map<String, String>>) retCodecs -> {
85+
Map<String, String> ret = new HashMap<>();
86+
Object obj = retCodecs;
87+
@SuppressWarnings("unchecked")
88+
Map<FederationNamespaceInfo, Map<String, String>> results =
89+
(Map<FederationNamespaceInfo, Map<String, String>>)obj;
90+
Collection<Map<String, String>> allCodecs = results.values();
91+
for (Map<String, String> codecs : allCodecs) {
92+
ret.putAll(codecs);
93+
}
94+
return ret;
95+
});
96+
97+
return asyncReturn(Map.class);
98+
}
99+
100+
@Override
101+
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
102+
ErasureCodingPolicy[] policies) throws IOException {
103+
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
104+
105+
RemoteMethod method = new RemoteMethod("addErasureCodingPolicies",
106+
new Class<?>[] {ErasureCodingPolicy[].class}, new Object[] {policies});
107+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
108+
109+
rpcClient.invokeConcurrent(
110+
nss, method, true, false, AddErasureCodingPolicyResponse[].class);
111+
112+
asyncApply(
113+
(ApplyFunction<Map<FederationNamespaceInfo, AddErasureCodingPolicyResponse[]>,
114+
AddErasureCodingPolicyResponse[]>) ret -> {
115+
return merge(ret, AddErasureCodingPolicyResponse.class);
116+
});
117+
return asyncReturn(AddErasureCodingPolicyResponse[].class);
118+
}
119+
120+
@Override
121+
public ErasureCodingPolicy getErasureCodingPolicy(String src)
122+
throws IOException {
123+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
124+
125+
final List<RemoteLocation> locations =
126+
rpcServer.getLocationsForPath(src, false, false);
127+
RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy",
128+
new Class<?>[] {String.class}, new RemoteParam());
129+
rpcClient.invokeSequential(
130+
locations, remoteMethod, null, null);
131+
132+
asyncApply(ret -> {
133+
return (ErasureCodingPolicy) ret;
134+
});
135+
136+
return asyncReturn(ErasureCodingPolicy.class);
137+
}
138+
139+
@Override
140+
public ECTopologyVerifierResult getECTopologyResultForPolicies(
141+
String[] policyNames) throws IOException {
142+
RemoteMethod method = new RemoteMethod("getECTopologyResultForPolicies",
143+
new Class<?>[] {String[].class}, new Object[] {policyNames});
144+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
145+
if (nss.isEmpty()) {
146+
throw new IOException("No namespace availaible.");
147+
}
148+
149+
rpcClient.invokeConcurrent(nss, method, true, false,
150+
ECTopologyVerifierResult.class);
151+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, ECTopologyVerifierResult>,
152+
ECTopologyVerifierResult>) ret -> {
153+
for (Map.Entry<FederationNamespaceInfo, ECTopologyVerifierResult> entry :
154+
ret.entrySet()) {
155+
if (!entry.getValue().isSupported()) {
156+
return entry.getValue();
157+
}
158+
}
159+
// If no negative result, return the result from the first namespace.
160+
return ret.get(nss.iterator().next());
161+
});
162+
return asyncReturn(ECTopologyVerifierResult.class);
163+
}
164+
165+
@Override
166+
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
167+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
168+
169+
RemoteMethod method = new RemoteMethod("getECBlockGroupStats");
170+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
171+
rpcClient.invokeConcurrent(
172+
nss, method, true, false, ECBlockGroupStats.class);
173+
174+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, ECBlockGroupStats>,
175+
ECBlockGroupStats>) allStats -> {
176+
return ECBlockGroupStats.merge(allStats.values());
177+
});
178+
return asyncReturn(ECBlockGroupStats.class);
179+
}
180+
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FSDataOutputStream;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.fs.permission.FsPermission;
26+
import org.apache.hadoop.hdfs.StripedFileTestUtil;
27+
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
28+
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
29+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
30+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
31+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
32+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
33+
import org.apache.hadoop.hdfs.server.federation.MockResolver;
34+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
35+
import org.apache.hadoop.io.erasurecode.ECSchema;
36+
import org.apache.hadoop.ipc.CallerContext;
37+
import org.junit.After;
38+
import org.junit.AfterClass;
39+
import org.junit.Before;
40+
import org.junit.BeforeClass;
41+
import org.junit.Test;
42+
import org.mockito.Mockito;
43+
44+
import java.io.IOException;
45+
import java.util.Map;
46+
import java.util.concurrent.TimeUnit;
47+
48+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
49+
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
50+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
51+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
52+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
53+
import static org.junit.Assert.assertArrayEquals;
54+
import static org.junit.Assert.assertEquals;
55+
import static org.junit.Assert.assertNotNull;
56+
import static org.junit.Assert.assertTrue;
57+
58+
public class TestRouterAsyncErasureCoding {
59+
private static Configuration routerConf;
60+
/** Federated HDFS cluster. */
61+
private static MiniRouterDFSCluster cluster;
62+
private static String ns0;
63+
64+
/** Random Router for this federated cluster. */
65+
private MiniRouterDFSCluster.RouterContext router;
66+
private FileSystem routerFs;
67+
private RouterRpcServer routerRpcServer;
68+
private AsyncErasureCoding asyncErasureCoding;
69+
70+
private final String testfilePath = "/testdir/testAsyncErasureCoding.file";
71+
72+
@BeforeClass
73+
public static void setUpCluster() throws Exception {
74+
cluster = new MiniRouterDFSCluster(true, 1, 2,
75+
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
76+
cluster.setNumDatanodesPerNameservice(3);
77+
cluster.setRacks(
78+
new String[] {"/rack1", "/rack2", "/rack3"});
79+
cluster.startCluster();
80+
81+
// Making one Namenode active per nameservice
82+
if (cluster.isHighAvailability()) {
83+
for (String ns : cluster.getNameservices()) {
84+
cluster.switchToActive(ns, NAMENODES[0]);
85+
cluster.switchToStandby(ns, NAMENODES[1]);
86+
}
87+
}
88+
// Start routers with only an RPC service
89+
routerConf = new RouterConfigBuilder()
90+
.rpc()
91+
.build();
92+
93+
// Reduce the number of RPC clients threads to overload the Router easy
94+
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
95+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
96+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
97+
// We decrease the DN cache times to make the test faster
98+
routerConf.setTimeDuration(
99+
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
100+
cluster.addRouterOverrides(routerConf);
101+
// Start routers with only an RPC service
102+
cluster.startRouters();
103+
104+
// Register and verify all NNs with all routers
105+
cluster.registerNamenodes();
106+
cluster.waitNamenodeRegistration();
107+
cluster.waitActiveNamespaces();
108+
ns0 = cluster.getNameservices().get(0);
109+
}
110+
111+
@AfterClass
112+
public static void shutdownCluster() throws Exception {
113+
if (cluster != null) {
114+
cluster.shutdown();
115+
}
116+
}
117+
118+
@Before
119+
public void setUp() throws IOException {
120+
router = cluster.getRandomRouter();
121+
routerFs = router.getFileSystem();
122+
routerRpcServer = router.getRouterRpcServer();
123+
routerRpcServer.initAsyncThreadPool();
124+
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
125+
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
126+
routerRpcServer.getRPCMonitor(),
127+
routerRpcServer.getRouterStateIdContext());
128+
RouterRpcServer spy = Mockito.spy(routerRpcServer);
129+
Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
130+
asyncErasureCoding = new AsyncErasureCoding(spy);
131+
132+
// Create mock locations
133+
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
134+
resolver.addLocation("/", ns0, "/");
135+
FsPermission permission = new FsPermission("705");
136+
routerFs.mkdirs(new Path("/testdir"), permission);
137+
FSDataOutputStream fsDataOutputStream = routerFs.create(
138+
new Path(testfilePath), true);
139+
fsDataOutputStream.write(new byte[1024]);
140+
fsDataOutputStream.close();
141+
}
142+
143+
@After
144+
public void tearDown() throws IOException {
145+
// clear client context
146+
CallerContext.setCurrent(null);
147+
boolean delete = routerFs.delete(new Path("/testdir"));
148+
assertTrue(delete);
149+
if (routerFs != null) {
150+
routerFs.close();
151+
}
152+
}
153+
154+
@Test
155+
public void testRouterAsyncErasureCoding() throws Exception {
156+
String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName();
157+
HdfsFileStatus fileInfo = cluster.getNamenodes().get(0).getClient().getFileInfo(testfilePath);
158+
assertNotNull(fileInfo);
159+
160+
asyncErasureCoding.setErasureCodingPolicy("/testdir", ecPolicyName);
161+
syncReturn(null);
162+
163+
asyncErasureCoding.getErasureCodingPolicy("/testdir");
164+
ErasureCodingPolicy ecPolicy = syncReturn(ErasureCodingPolicy.class);
165+
assertEquals(StripedFileTestUtil.getDefaultECPolicy().getName(), ecPolicy.getName());
166+
167+
asyncErasureCoding.getErasureCodingPolicies();
168+
ErasureCodingPolicyInfo[] erasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class);
169+
int numECPolicies = erasureCodingPolicies.length;
170+
ErasureCodingPolicyInfo[] erasureCodingPoliciesFromNameNode =
171+
cluster.getNamenodes().get(0).getClient().getErasureCodingPolicies();
172+
173+
assertArrayEquals(erasureCodingPoliciesFromNameNode, erasureCodingPolicies);
174+
175+
asyncErasureCoding.getErasureCodingCodecs();
176+
Map<String, String> erasureCodingCodecs = syncReturn(Map.class);
177+
Map<String, String> erasureCodingCodecsFromNameNode =
178+
cluster.getNamenodes().get(0).getClient().getErasureCodingCodecs();
179+
180+
assertEquals(erasureCodingCodecs, erasureCodingCodecsFromNameNode);
181+
182+
// RS-12-4-1024k
183+
final ECSchema schema = new ECSchema("rs", 12, 4);
184+
ErasureCodingPolicy erasureCodingPolicy = new ErasureCodingPolicy(schema, 1024 * 1024);
185+
asyncErasureCoding.addErasureCodingPolicies(new ErasureCodingPolicy[]{erasureCodingPolicy});
186+
AddErasureCodingPolicyResponse[] response = syncReturn(AddErasureCodingPolicyResponse[].class);
187+
assertEquals(response[0].isSucceed(), true);
188+
189+
asyncErasureCoding.getErasureCodingPolicies();
190+
ErasureCodingPolicyInfo[] erasureCodingPolicies2 = syncReturn(ErasureCodingPolicyInfo[].class);
191+
int numNewECPolicies = erasureCodingPolicies2.length;
192+
assertEquals(numECPolicies + 1, numNewECPolicies);
193+
194+
asyncErasureCoding.getECTopologyResultForPolicies(
195+
new String[]{"RS-6-3-1024k", "RS-12-4-1024k"});
196+
ECTopologyVerifierResult ecTResultForPolicies = syncReturn(ECTopologyVerifierResult.class);
197+
assertEquals(false, ecTResultForPolicies.isSupported());
198+
199+
asyncErasureCoding.getECTopologyResultForPolicies(
200+
new String[]{"XOR-2-1-1024k"});
201+
ECTopologyVerifierResult ecTResultForPolicies2 = syncReturn(ECTopologyVerifierResult.class);
202+
assertEquals(true, ecTResultForPolicies2.isSupported());
203+
}
204+
}

0 commit comments

Comments
 (0)