Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.journalnode.edit-cache-size.fraction";
public static final float DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_DEFAULT = 0.5f;

public static final String DFS_JOURNALNODE_MAINTENANCE_NODES_KEY =
"dfs.journalnode.maintenance.nodes";
public static final String[] DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT = {};

// Journal-node related configs for the client side.
public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
public static final int DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
Expand Down Expand Up @@ -1982,4 +1983,32 @@ public static void addTransferRateMetric(final DataNodeMetrics metrics, final lo
LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
}
}

/**
* Construct a HostSet from an array of "ip:port" strings.
* @param nodesHostPort ip port string array.
* @return HostSet of InetSocketAddress.
*/
public static HostSet getHostSet(String[] nodesHostPort) {
HostSet retSet = new HostSet();
for (String hostPort : nodesHostPort) {
try {
URI uri = new URI("dummy", hostPort, null, null, null);
int port = uri.getPort();
if (port < 0) {
LOG.warn(String.format("The ip:port `%s` is invalid, skip this node.", hostPort));
continue;
}
InetSocketAddress inetSocketAddress = new InetSocketAddress(uri.getHost(), port);
if (inetSocketAddress.isUnresolved()) {
LOG.warn(String.format("Failed to resolve address `%s`", hostPort));
continue;
}
retSet.add(inetSocketAddress);
} catch (URISyntaxException e) {
LOG.warn(String.format("Failed to parse `%s`", hostPort));
}
}
return retSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,15 @@ class AsyncLoggerSet {

private static final long INVALID_EPOCH = -1;
private long myEpoch = INVALID_EPOCH;
private final int majoritySize;

public AsyncLoggerSet(List<AsyncLogger> loggers) {
AsyncLoggerSet(List<AsyncLogger> loggers) {
this(loggers, loggers.size());
}

AsyncLoggerSet(List<AsyncLogger> loggers, int quorumJournalCount) {
this.loggers = ImmutableList.copyOf(loggers);
this.majoritySize = quorumJournalCount / 2 + 1;
}

void setEpoch(long e) {
Expand Down Expand Up @@ -151,7 +157,7 @@ <V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
* @return the number of nodes which are required to obtain a quorum.
*/
int getMajoritySize() {
return loggers.size() / 2 + 1;
return this.majoritySize;
}

/**
Expand All @@ -161,6 +167,11 @@ String getMajorityString() {
return getMajoritySize() + "/" + loggers.size();
}

@VisibleForTesting
List<AsyncLogger> getLoggerListForTests() {
return loggers;
}

/**
* @return the number of loggers behind this set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.qjournal.client;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -31,6 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -108,6 +112,7 @@ public class QuorumJournalManager implements JournalManager {
private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024;
private int outputBufferCapacity;
private final URLConnectionFactory connectionFactory;
private int quorumJournalCount;

/** Limit logging about input stream selection to every 5 seconds max. */
private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
Expand Down Expand Up @@ -144,7 +149,18 @@ public QuorumJournalManager(Configuration conf,
this.uri = uri;
this.nsInfo = nsInfo;
this.nameServiceId = nameServiceId;
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));

// createLoggers() will set quorumJournalCount to total number of journal nodes while return a
// list of healthy/good journal nodes.
List<AsyncLogger> asyncLoggerList = createLoggers(loggerFactory);
this.loggers = new AsyncLoggerSet(asyncLoggerList, this.quorumJournalCount);

// Check whether the number of jn maintenance lists is valid
int quorumThreshold = quorumJournalCount / 2 + 1;
Preconditions.checkArgument(
this.loggers.size() >= quorumThreshold,
"The total journalnode minus %s the number of blacklists must be greater than or equal to"
+ " %s!", DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, quorumThreshold);

this.maxTxnsPerRpc =
conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
Expand Down Expand Up @@ -250,6 +266,10 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()

@Override
public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException(
"Formatting a journal node is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Void> call = loggers.format(nsInfo, force);
try {
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
Expand Down Expand Up @@ -406,21 +426,39 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
logToSync.getStartTxId(),
logToSync.getEndTxId()));
}

static List<AsyncLogger> createLoggers(Configuration conf,

List<AsyncLogger> createLoggers(Configuration conf,
URI uri,
NamespaceInfo nsInfo,
AsyncLogger.Factory factory,
String nameServiceId)
throws IOException {
String[] skipNodesHostPort = conf.getTrimmedStrings(
DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT);
return createLoggers(conf, uri, nsInfo, factory, nameServiceId, skipNodesHostPort);
}

private List<AsyncLogger> createLoggers(Configuration conf,
URI uri,
NamespaceInfo nsInfo,
AsyncLogger.Factory factory,
String nameServiceId)
String nameServiceId,
String[] skipNodesHostPort)
throws IOException {
List<AsyncLogger> ret = Lists.newArrayList();
List<InetSocketAddress> addrs = Util.getAddressesList(uri, conf);
if (addrs.size() % 2 == 0) {
LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
"of Journal Nodes specified. This is not recommended!");
}
setQuorumJournalCount(addrs.size());
HostSet skipSet = DFSUtil.getHostSet(skipNodesHostPort);
String jid = parseJournalId(uri);
for (InetSocketAddress addr : addrs) {
if(skipSet.match(addr)) {
LOG.info("The node {} is a maintenance node and will be skipped.", addr);
continue;
}
ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
}
return ret;
Expand Down Expand Up @@ -667,6 +705,9 @@ AsyncLoggerSet getLoggerSetForTests() {

@Override
public void doPreUpgrade() throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("doPreUpgrade() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
try {
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
Expand All @@ -684,6 +725,9 @@ public void doPreUpgrade() throws IOException {

@Override
public void doUpgrade(Storage storage) throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("doUpgrade() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
try {
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
Expand All @@ -701,6 +745,9 @@ public void doUpgrade(Storage storage) throws IOException {

@Override
public void doFinalize() throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("doFinalize() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
try {
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
Expand All @@ -719,6 +766,9 @@ public void doFinalize() throws IOException {
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("canRollBack() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
prevStorage, targetLayoutVersion);
try {
Expand Down Expand Up @@ -753,6 +803,9 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,

@Override
public void doRollback() throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("doRollback() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
try {
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
Expand All @@ -770,6 +823,9 @@ public void doRollback() throws IOException {

@Override
public void discardSegments(long startTxId) throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("discardSegments() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);
try {
call.waitFor(loggers.size(), loggers.size(), 0,
Expand All @@ -789,6 +845,9 @@ public void discardSegments(long startTxId) throws IOException {

@Override
public long getJournalCTime() throws IOException {
if (isJNInMaintenanceMode()) {
throw new IOException("getJournalCTime() is not support while in jn maintenance mode");
}
QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
try {
call.waitFor(loggers.size(), loggers.size(), 0,
Expand Down Expand Up @@ -819,4 +878,12 @@ public long getJournalCTime() throws IOException {

throw new AssertionError("Unreachable code.");
}

public void setQuorumJournalCount(int quorumJournalCount) {
this.quorumJournalCount = quorumJournalCount;
}

private boolean isJNInMaintenanceMode() {
return this.loggers.size() < quorumJournalCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class HostSet implements Iterable<InetSocketAddress> {
* The function that checks whether there exists an entry foo in the set
* so that foo &lt;= addr.
*/
boolean matchedBy(InetSocketAddress addr) {
public boolean matchedBy(InetSocketAddress addr) {
Collection<Integer> ports = addrs.get(addr.getAddress());
return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
.getPort());
Expand All @@ -55,23 +55,23 @@ boolean matchedBy(InetSocketAddress addr) {
* The function that checks whether there exists an entry foo in the set
* so that addr &lt;= foo.
*/
boolean match(InetSocketAddress addr) {
public boolean match(InetSocketAddress addr) {
int port = addr.getPort();
Collection<Integer> ports = addrs.get(addr.getAddress());
boolean exactMatch = ports.contains(port);
boolean genericMatch = ports.contains(0);
return exactMatch || genericMatch;
}

boolean isEmpty() {
public boolean isEmpty() {
return addrs.isEmpty();
}

int size() {
public int size() {
return addrs.size();
}

void add(InetSocketAddress addr) {
public void add(InetSocketAddress addr) {
Preconditions.checkArgument(!addr.isUnresolved());
addrs.put(addr.getAddress(), addr.getPort());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6333,6 +6333,23 @@
</description>
</property>

<property>
<name>dfs.journalnode.maintenance.nodes</name>
<value></value>
<description>
In the case that one out of three journal nodes being down, theoretically HDFS can still
function. However, in reality, the unavailable journal node may not recover quickly. During
this period, when we need to restart an Namenode, the Namenode will try to connect to the
unavailable journal node through the lengthy RPC retry mechanism, resulting in a long
initialization time for the Namenode. By adding these unavailable journal nodes to the
maintenance nodes, we will skip these unavailable journal nodes during Namenode initialization
and thus reduce namenode startup time.
1-node example values: jn01:8485
2-node example values: jn01:8485,jn02:8485
</description>
</property>


<property>
<name>dfs.namenode.lease-hard-limit-sec</name>
<value>1200</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
Expand Down Expand Up @@ -1137,4 +1139,24 @@ public void testAddTransferRateMetricForInvalidValue() {
DFSUtil.addTransferRateMetric(mockMetrics, 100, 0);
verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
}

@Test
public void testGetHostSet() {
String[] testAddrs = new String[] {"unreachable-host1.com:9000", "unreachable-host2.com:9000"};
HostSet hostSet = DFSUtil.getHostSet(testAddrs);
assertNotNull(hostSet);
assertEquals(0, hostSet.size());

String strAddress = "localhost";
testAddrs = new String[] {strAddress};
hostSet = DFSUtil.getHostSet(testAddrs);
assertEquals(0, hostSet.size());

strAddress = "localhost:9000";
InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
testAddrs = new String[] {strAddress};
hostSet = DFSUtil.getHostSet(testAddrs);
assertNotNull(hostSet);
assertTrue(hostSet.match(inetSocketAddress));
}
}
Loading