
Commit 5f415d7

顾鹏 authored and committed
HDFS-17223. Add journalnode maintenance node list
1 parent 000a39b commit 5f415d7

File tree

9 files changed: +161, -11 lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 4 additions & 0 deletions
@@ -1466,6 +1466,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.journalnode.edit-cache-size.fraction";
   public static final float DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_DEFAULT = 0.5f;
 
+  public static final String DFS_JOURNALNODE_MAINTENANCE_NODES_KEY =
+      "dfs.journalnode.maintenance.nodes";
+  public static final String[] DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT = {};
+
   // Journal-node related configs for the client side.
   public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
   public static final int DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

Lines changed: 25 additions & 0 deletions
@@ -70,6 +70,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
@@ -1982,4 +1983,28 @@ public static void addTransferRateMetric(final DataNodeMetrics metrics, final lo
       LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
     }
   }
+
+  /**
+   * Retrieve InetSocketAddress set by ip port string array.
+   * @param nodesHostPort ip port string array.
+   * @return HostSet of InetSocketAddress.
+   */
+  public static HostSet convertHostSet(String[] nodesHostPort) {
+    HostSet retSet = new HostSet();
+    for (String hostPort : nodesHostPort) {
+      try {
+        URI uri = new URI("dummy", hostPort, null, null, null);
+        int port = uri.getPort() == -1 ? 0 : uri.getPort();
+        InetSocketAddress inetSocketAddress = new InetSocketAddress(uri.getHost(), port);
+        if (inetSocketAddress.isUnresolved()) {
+          LOG.warn(String.format("Failed to resolve address `%s`", hostPort));
+          continue;
+        }
+        retSet.add(inetSocketAddress);
+      } catch (URISyntaxException e) {
+        LOG.warn(String.format("Failed to parse `%s`", hostPort));
+      }
+    }
+    return retSet;
+  }
 }
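
For context, here is a minimal usage sketch of the new DFSUtil.convertHostSet helper. The wrapper class and the host:port strings below are illustrative, not part of the commit; the sketch only assumes the patched hadoop-hdfs classes are on the classpath.

import java.net.InetSocketAddress;

import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;

public class ConvertHostSetExample {
  public static void main(String[] args) {
    // Resolvable entries are added to the set; unresolvable or malformed
    // entries are logged with a warning and skipped instead of failing.
    HostSet maintenance = DFSUtil.convertHostSet(
        new String[] {"localhost:8485", "no-such-host.invalid:8485"});

    // match() is how QuorumJournalManager later decides whether a JournalNode
    // address should be skipped during logger creation.
    InetSocketAddress addr = new InetSocketAddress("localhost", 8485);
    System.out.println("in maintenance set? " + maintenance.match(addr));
  }
}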

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java

Lines changed: 8 additions & 2 deletions
@@ -53,9 +53,15 @@ class AsyncLoggerSet {
 
   private static final long INVALID_EPOCH = -1;
   private long myEpoch = INVALID_EPOCH;
+  private final int majoritySize;
 
-  public AsyncLoggerSet(List<AsyncLogger> loggers) {
+  AsyncLoggerSet(List<AsyncLogger> loggers) {
+    this(loggers, loggers.size());
+  }
+
+  AsyncLoggerSet(List<AsyncLogger> loggers, int quorumJournalCount) {
     this.loggers = ImmutableList.copyOf(loggers);
+    this.majoritySize = quorumJournalCount / 2 + 1;
   }
 
   void setEpoch(long e) {
@@ -151,7 +157,7 @@ <V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
    * @return the number of nodes which are required to obtain a quorum.
    */
   int getMajoritySize() {
-    return loggers.size() / 2 + 1;
+    return this.majoritySize;
   }
 
   /**
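
The point of threading quorumJournalCount into AsyncLoggerSet is that the write quorum stays based on the number of JournalNodes configured, not on the possibly smaller list of loggers actually created. A small illustrative sketch (plain Java, not Hadoop code; the 3-node setup is an assumption for the example):

public class MajoritySizeExample {
  // Mirrors the majority computation used above: N / 2 + 1.
  static int majority(int configuredJournalNodes) {
    return configuredJournalNodes / 2 + 1;
  }

  public static void main(String[] args) {
    int configured = 3;     // JournalNodes listed in the shared edits URI
    int inMaintenance = 1;  // JournalNodes listed in dfs.journalnode.maintenance.nodes
    int activeLoggers = configured - inMaintenance;

    // The majority stays 2, and 2 active loggers can still satisfy it; with 2 of
    // 3 nodes in maintenance the constructor's Preconditions check would fail.
    System.out.println("majority = " + majority(configured));
    System.out.println("active loggers = " + activeLoggers);
  }
}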

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

Lines changed: 67 additions & 4 deletions
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_KEY;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -31,6 +34,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
 import org.apache.hadoop.util.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +66,7 @@
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.util.Preconditions;
+
 import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 
 /**
@@ -108,6 +113,7 @@ public class QuorumJournalManager implements JournalManager {
   private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024;
   private int outputBufferCapacity;
   private final URLConnectionFactory connectionFactory;
+  private int quorumJournalCount;
 
   /** Limit logging about input stream selection to every 5 seconds max. */
   private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
@@ -144,7 +150,14 @@ public QuorumJournalManager(Configuration conf,
     this.uri = uri;
     this.nsInfo = nsInfo;
     this.nameServiceId = nameServiceId;
-    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
+    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory), this.quorumJournalCount);
+
+    // Check whether the number of jn maintenance lists is valid
+    int quorumThreshold = quorumJournalCount / 2 + 1;
+    Preconditions.checkArgument(
+        this.loggers.size() >= quorumThreshold,
+        "The total journalnode minus %s the number of blacklists must be greater than or equal to"
+            + " %s!", DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, quorumThreshold);
 
     this.maxTxnsPerRpc =
         conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
@@ -250,6 +263,9 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
 
   @Override
   public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("format() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Void> call = loggers.format(nsInfo, force);
     try {
       call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
@@ -406,21 +422,39 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
         logToSync.getStartTxId(),
         logToSync.getEndTxId()));
   }
-
-  static List<AsyncLogger> createLoggers(Configuration conf,
+
+  List<AsyncLogger> createLoggers(Configuration conf,
+      URI uri,
+      NamespaceInfo nsInfo,
+      AsyncLogger.Factory factory,
+      String nameServiceId)
+      throws IOException {
+    String[] skipNodesHostPort = conf.getTrimmedStrings(
+        DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT);
+    return createLoggers(conf, uri, nsInfo, factory, nameServiceId, skipNodesHostPort);
+  }
+
+  private List<AsyncLogger> createLoggers(Configuration conf,
       URI uri,
       NamespaceInfo nsInfo,
      AsyncLogger.Factory factory,
-      String nameServiceId)
+      String nameServiceId,
+      String[] skipNodesHostPort)
      throws IOException {
     List<AsyncLogger> ret = Lists.newArrayList();
     List<InetSocketAddress> addrs = Util.getAddressesList(uri, conf);
     if (addrs.size() % 2 == 0) {
       LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
           "of Journal Nodes specified. This is not recommended!");
     }
+    setQuorumJournalCount(addrs.size());
+    HostSet skipSet = DFSUtil.convertHostSet(skipNodesHostPort);
     String jid = parseJournalId(uri);
     for (InetSocketAddress addr : addrs) {
+      if(skipSet.match(addr)) {
+        LOG.info("The node {} is a maintenance node and will skip initialization.", addr);
+        continue;
+      }
       ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
     }
     return ret;
@@ -667,6 +701,9 @@ AsyncLoggerSet getLoggerSetForTests() {
 
   @Override
   public void doPreUpgrade() throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("doPreUpgrade() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
     try {
       call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
@@ -684,6 +721,9 @@ public void doPreUpgrade() throws IOException {
 
   @Override
   public void doUpgrade(Storage storage) throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("doUpgrade() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
     try {
       call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
@@ -701,6 +741,9 @@ public void doUpgrade(Storage storage) throws IOException {
 
   @Override
   public void doFinalize() throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("doFinalize() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
     try {
       call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
@@ -719,6 +762,9 @@ public void doFinalize() throws IOException {
   @Override
   public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
       int targetLayoutVersion) throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("canRollBack() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
         prevStorage, targetLayoutVersion);
     try {
@@ -753,6 +799,9 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
 
   @Override
   public void doRollback() throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("doRollback() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
     try {
       call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
@@ -770,6 +819,9 @@ public void doRollback() throws IOException {
 
   @Override
   public void discardSegments(long startTxId) throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("discardSegments() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);
     try {
       call.waitFor(loggers.size(), loggers.size(), 0,
@@ -789,6 +841,9 @@ public void discardSegments(long startTxId) throws IOException {
 
   @Override
   public long getJournalCTime() throws IOException {
+    if (isEnableJnMaintenance()) {
+      throw new IOException("getJournalCTime() does not support enabling jn maintenance mode");
+    }
     QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
     try {
       call.waitFor(loggers.size(), loggers.size(), 0,
@@ -819,4 +874,12 @@ public long getJournalCTime() throws IOException {
 
     throw new AssertionError("Unreachable code.");
   }
+
+  public void setQuorumJournalCount(int quorumJournalCount) {
+    this.quorumJournalCount = quorumJournalCount;
+  }
+
+  private boolean isEnableJnMaintenance() {
+    return this.loggers.size() < quorumJournalCount;
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java

Lines changed: 5 additions & 5 deletions
@@ -45,7 +45,7 @@ public class HostSet implements Iterable<InetSocketAddress> {
    * The function that checks whether there exists an entry foo in the set
    * so that foo &lt;= addr.
    */
-  boolean matchedBy(InetSocketAddress addr) {
+  public boolean matchedBy(InetSocketAddress addr) {
     Collection<Integer> ports = addrs.get(addr.getAddress());
     return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
         .getPort());
@@ -55,23 +55,23 @@ boolean matchedBy(InetSocketAddress addr) {
    * The function that checks whether there exists an entry foo in the set
    * so that addr &lt;= foo.
    */
-  boolean match(InetSocketAddress addr) {
+  public boolean match(InetSocketAddress addr) {
     int port = addr.getPort();
     Collection<Integer> ports = addrs.get(addr.getAddress());
     boolean exactMatch = ports.contains(port);
     boolean genericMatch = ports.contains(0);
     return exactMatch || genericMatch;
   }
 
-  boolean isEmpty() {
+  public boolean isEmpty() {
     return addrs.isEmpty();
   }
 
-  int size() {
+  public int size() {
     return addrs.size();
   }
 
-  void add(InetSocketAddress addr) {
+  public void add(InetSocketAddress addr) {
     Preconditions.checkArgument(!addr.isUnresolved());
     addrs.put(addr.getAddress(), addr.getPort());
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 14 additions & 0 deletions
@@ -6333,6 +6333,20 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.journalnode.maintenance.nodes</name>
+  <value></value>
+  <description>
+    In the case of one out of three journal nodes being down, theoretically the service can still
+    continue. However, in reality, the downed node may not recover quickly. If the Namenode needs
+    to be restarted, it will try the downed journal node through the lengthy RPC retry mechanism,
+    resulting in a long initialization time for the Namenode to provide services. By adding the
+    downed journal node to the maintenance nodes, the initialization time of the Namenode in such
+    scenarios can be accelerated.
+  </description>
+</property>
+
+
 <property>
   <name>dfs.namenode.lease-hard-limit-sec</name>
   <value>1200</value>
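
For operators, a hedged example of what the new property might look like in hdfs-site.xml on the NameNode side. The host name below is a placeholder, and because the value is read with getTrimmedStrings and parsed by convertHostSet, multiple entries would be comma-separated host:port pairs matching the JournalNode addresses in the shared edits URI:

<!-- Hypothetical hdfs-site.xml fragment: jn3.example.com stands in for the
     JournalNode that is down for an extended period. -->
<property>
  <name>dfs.journalnode.maintenance.nodes</name>
  <value>jn3.example.com:8485</value>
</property>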

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java

Lines changed: 17 additions & 0 deletions
@@ -41,6 +41,7 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -72,6 +73,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@@ -1137,4 +1139,19 @@ public void testAddTransferRateMetricForInvalidValue() {
     DFSUtil.addTransferRateMetric(mockMetrics, 100, 0);
     verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
   }
+
+  @Test
+  public void testConvertHostSet() {
+    String strAddress = "localhost:9000";
+    InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
+    String[] testAddrs = new String[] {NS1_NN_ADDR, NS1_NN1_ADDR};
+    HostSet hostSet = DFSUtil.convertHostSet(testAddrs);
+    assertNotNull(hostSet);
+    assertEquals(0, hostSet.size());
+
+    testAddrs = new String[] {strAddress};
+    hostSet = DFSUtil.convertHostSet(testAddrs);
+    assertNotNull(hostSet);
+    assertTrue(hostSet.match(inetSocketAddress));
+  }
 }
