@@ -539,6 +539,10 @@ public void setIndices(byte[] indices) {
this.indices = indices;
}

public byte[] getIndices() {
return this.indices;
}

/**
* Adjust EC block indices; it will remove the elements of adjustList from indices.
* @param adjustList the list of positions to be removed from indices
@@ -889,8 +893,8 @@ private long getBlockList() throws IOException, IllegalArgumentException {
if (g != null) { // not unknown
block.addLocation(g);
} else if (blkLocs instanceof StripedBlockWithLocations) {
// some datanode may not in storageGroupMap due to decommission operation
// or balancer cli with "-exclude" parameter
// some datanodes may not be in storageGroupMap due to decommission or maintenance
// operations or the balancer cli with the "-exclude" parameter
adjustList.add(i);
}
}
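
For reference, the Javadoc above describes DBlockStriped#adjustIndices as removing the adjustList positions from indices; its body is collapsed in this diff, so the following is only a sketch of what such a method could look like, not the PR's exact implementation.

// Hypothetical sketch only: removes the byte at each position listed in
// adjustList so that indices stays aligned with the surviving locations.
public void adjustIndices(List<Integer> adjustList) {
  if (adjustList.isEmpty()) {
    return;
  }
  byte[] newIndices = new byte[indices.length - adjustList.size()];
  int k = 0;
  for (int i = 0; i < indices.length; i++) {
    if (!adjustList.contains(i)) {
      newIndices[k++] = indices[i];
    }
  }
  this.indices = newIndices;
}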
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.mover;

import org.apache.commons.cli.*;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -49,6 +50,7 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -228,6 +230,27 @@ DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
db.addLocation(source);
}
}

List<Integer> adjustList = new ArrayList<>();
for (int i = 0; i < locations.size(); i++) {
MLocation ml = locations.get(i);
StorageGroup source = storages.getSource(ml);
if (source != null) {
db.addLocation(source);
} else if (lb.isStriped()) {
// some datanodes may not be in storages due to decommission or maintenance operations
// or the balancer cli with the "-exclude" parameter
adjustList.add(i);
}
}

if (!adjustList.isEmpty()) {
// block.locations mismatches block.indices;
// adjust indices so that the correct internal block can be resolved
((DBlockStriped) db).adjustIndices(adjustList);
Preconditions.checkArgument(((DBlockStriped) db).getIndices().length
== db.getLocations().size());
}
return db;
}
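
As a standalone illustration of the invariant enforced by the Preconditions check above (the class and helper below are hypothetical, made up for this example and not part of the PR): after the positions in adjustList are dropped, the indices array has the same length as the remaining location list, so index i still describes the i-th surviving location.

import java.util.ArrayList;
import java.util.List;

// Standalone, hypothetical demo class showing why the adjusted indices must
// stay the same length as the kept locations.
public class AdjustIndicesDemo {

  // Same pruning idea as the sketch earlier: drop the bytes whose positions
  // appear in adjustList.
  static byte[] adjust(byte[] indices, List<Integer> adjustList) {
    byte[] out = new byte[indices.length - adjustList.size()];
    int k = 0;
    for (int i = 0; i < indices.length; i++) {
      if (!adjustList.contains(i)) {
        out[k++] = indices[i];
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // RS-6-3 block group: 9 internal blocks with indices 0..8.
    byte[] indices = {0, 1, 2, 3, 4, 5, 6, 7, 8};
    // Suppose the datanode at position 3 is entering maintenance and has no
    // source storage group, so its location is skipped.
    List<Integer> adjustList = new ArrayList<>();
    adjustList.add(3);
    byte[] adjusted = adjust(indices, adjustList);
    // 8 locations remain and adjusted.length == 8; without the adjustment,
    // positions 3 onward would map to the wrong internal block index.
    System.out.println(adjusted.length); // prints 8
  }
}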

@@ -23,6 +23,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KERBEROS_PRINCIPAL_KEY;
@@ -73,6 +74,7 @@
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -82,10 +84,13 @@
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -98,6 +103,7 @@
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
@@ -1005,6 +1011,146 @@ public void testMoverWithStripedFile() throws Exception {
}
}

@Test(timeout = 300000)
public void testMoverWithStripedFileMaintenance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConfWithStripe(conf);

// Start 9 datanodes
int numOfDatanodes = 9;
int storagesPerDatanode = 2;
long capacity = 9 * defaultBlockSize;
long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
for (int i = 0; i < numOfDatanodes; i++) {
for(int j = 0; j < storagesPerDatanode; j++){
capacities[i][j] = capacity;
}
}
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
.storageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD}})
.storageCapacities(capacities)
.build();

try {
cluster.waitActive();
cluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());

ClientProtocol client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
String barDir = "/bar";
client.mkdirs(barDir, new FsPermission((short) 777), true);
// Set "/bar" directory with ALL_SSD storage policy.
client.setStoragePolicy(barDir, "ALL_SSD");
// Set an EC policy on "/bar" directory
client.setErasureCodingPolicy(barDir,
StripedFileTestUtil.getDefaultECPolicy().getName());

// Write file to barDir
final String fooFile = "/bar/foo";
long fileLen = 6 * defaultBlockSize;
DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
fileLen, (short) 3, 0);

// Verify storage types and locations
LocatedBlocks locatedBlocks =
client.getBlockLocations(fooFile, 0, fileLen);
DatanodeInfoWithStorage location = null;
for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
location = lb.getLocations()[8];
for(StorageType type : lb.getStorageTypes()){
Assert.assertEquals(StorageType.SSD, type);
}
}

// The last datanode will be put into maintenance later
FSNamesystem ns = cluster.getNamesystem(0);
DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager();
DatanodeDescriptor dn = datanodeManager.getDatanode(location.getDatanodeUuid());

StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);

// Start 5 more datanodes for mover
capacities = new long[5][storagesPerDatanode];
for (int i = 0; i < 5; i++) {
for(int j = 0; j < storagesPerDatanode; j++){
capacities[i][j] = capacity;
}
}
cluster.startDataNodes(conf, 5,
new StorageType[][]{
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}},
true, null, null, null, capacities,
null, false, false, false, null, null, null);
cluster.triggerHeartbeats();

// Move blocks to DISK
client.setStoragePolicy(barDir, "HOT");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", barDir });

// Put a datanode from the location list into maintenance, simulating a node
// in ENTERING_MAINTENANCE state.
datanodeManager.getDatanode(dn.getDatanodeUuid()).startMaintenance();
waitNodeState(dn, DatanodeInfo.AdminStates.ENTERING_MAINTENANCE);

// Move blocks back to SSD.
// Without HDFS-17599, the lengths of locations and indices might not match,
// resulting in the wrong blockId being returned by DBlockStriped#getInternalBlock,
// and the mover would fail to run.
client.setStoragePolicy(barDir, "ALL_SSD");
rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", barDir });

Assert.assertEquals("Movement to HOT should be successful", 0, rc);
} finally {
cluster.shutdown();
}
}

/**
* Wait till DataNode is transitioned to the expected state.
*/
protected void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
waitNodeState(Lists.newArrayList(node), state);
}

/**
* Wait till all DataNodes are transitioned to the expected state.
*/
protected void waitNodeState(List<DatanodeInfo> nodes, DatanodeInfo.AdminStates state) {
for (DatanodeInfo node : nodes) {
boolean done = (state == node.getAdminState());
while (!done) {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
Thread.sleep(DFS_HEARTBEAT_INTERVAL_DEFAULT * 10);
} catch (InterruptedException e) {
// nothing
}
done = (state == node.getAdminState());
}
LOG.info("node " + node + " reached the state " + state);
}
}
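
As a possible simplification (not part of this PR), the same polling could be expressed with GenericTestUtils.waitFor, assuming its (Supplier<Boolean>, checkEveryMillis, waitForMillis) overload; GenericTestUtils is already imported in this test, and the method name below is hypothetical.

// Hypothetical alternative: poll the admin state with GenericTestUtils.waitFor
// instead of a hand-rolled sleep loop.
protected void waitNodeStateWithWaitFor(DatanodeInfo node,
    DatanodeInfo.AdminStates state) throws Exception {
  GenericTestUtils.waitFor(() -> state == node.getAdminState(), 500, 60000);
}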

/**
* Wait until Namenode reports expected storage type for all blocks of
* given file.