Merged
@@ -2163,6 +2163,16 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
return null;
}

// Skip if there are not enough source datanodes to reconstruct the EC block.
if (block.isStriped()) {
  BlockInfoStriped stripedBlock = (BlockInfoStriped) block;
  if (stripedBlock.getRealDataBlockNum() > srcNodes.length) {
    LOG.debug("Block {} cannot be reconstructed due to shortage of source datanodes", block);
    NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
    return null;
  }
}

Review comment (Member): Should we increment the metrics before returning null?

    NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();

// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
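The guard above also handles short block groups: getRealDataBlockNum() reflects how many data blocks the group actually occupies given its byte length, not the policy's nominal data-unit count. A minimal sketch of that relationship, assuming ceil-division semantics (a hypothetical standalone helper, not the actual BlockInfoStriped code):

// Hypothetical helper mirroring the assumed semantics of
// BlockInfoStriped#getRealDataBlockNum(): a block group shorter than a
// full stripe row spans fewer data blocks than the policy's numDataUnits.
final class RealDataBlockNumSketch {

  static int realDataBlockNum(long numBytes, int cellSize, int numDataUnits) {
    if (numBytes == 0) {
      return 1; // assumption: an empty group still counts one data block
    }
    long cellsNeeded = (numBytes + cellSize - 1) / cellSize; // ceil division
    return (int) Math.min(numDataUnits, cellsNeeded);
  }

  public static void main(String[] args) {
    int cellSize = 1024 * 1024; // the built-in RS-3-2-1024k policy uses 1 MiB cells
    // Full group from the first test below: 3 MiB over RS-3-2 -> 3 data blocks.
    System.out.println(realDataBlockNum(3L * cellSize, cellSize, 3)); // prints 3
    // Short group from the second test: 2 MiB -> only 2 real data blocks,
    // so 2 non-busy source nodes are enough for reconstruction.
    System.out.println(realDataBlockNum(2L * cellSize, cellSize, 3)); // prints 2
  }
}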
@@ -852,6 +852,102 @@ public void testChooseSrcDNWithDupECInDecommissioningNode() throws Exception {
0, numReplicas.redundantInternalBlocks());
}

@Test
public void testSkipReconstructionWithManyBusyNodes() {
  // A realistic EC block group ID (striped block group IDs are negative).
  long blockId = -9223372036854775776L;
  // RS-3-2 EC policy.
  ErasureCodingPolicy ecPolicy =
      SystemErasureCodingPolicies.getPolicies().get(1);

  // Create an EC block group: 3 data blocks + 2 parity blocks.
  Block aBlockGroup = new Block(blockId,
      ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(), 0);
  BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlockGroup, ecPolicy);

  // Create 4 storages, so 1 of the 5 internal blocks is missing.
  DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
      "storage1", "1.1.1.1", "rack1", "host1");
  DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
      "storage2", "2.2.2.2", "rack2", "host2");
  DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
      "storage3", "3.3.3.3", "rack3", "host3");
  DatanodeStorageInfo ds4 = DFSTestUtil.createDatanodeStorageInfo(
      "storage4", "4.4.4.4", "rack4", "host4");

  // Link the internal blocks with the storages.
  aBlockInfoStriped.addStorage(ds1, aBlockGroup);
  aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
  aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));
  aBlockInfoStriped.addStorage(ds4, new Block(blockId + 3, 0, 0));

  addEcBlockToBM(blockId, ecPolicy);
  aBlockInfoStriped.setBlockCollectionId(mockINodeId);

  // Reconstruction should be scheduled.
  BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
  assertNotNull(work);

  // Simulate that 2 nodes reach maxReplicationStreams.
  for (int i = 0; i < bm.maxReplicationStreams; i++) {
    ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
    ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
  }

  // Reconstruction should be skipped since there are not enough non-busy
  // source nodes.
  work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
  assertNull(work);
}
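Both tests rely on source selection treating a datanode as busy once its pending reconstruction work reaches maxReplicationStreams, which is what the incrementPendingReplicationWithoutTargets() loops simulate. A rough sketch of that filtering step, with hypothetical names (the real logic lives in BlockManager#chooseSourceDatanodes and is more involved, e.g. it also honors a hard limit for the highest-priority blocks):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the busy-node filter applied while choosing
// source datanodes; not the actual chooseSourceDatanodes() implementation.
final class BusyNodeFilterSketch {

  static List<DatanodeDescriptor> filterBusy(
      List<DatanodeDescriptor> candidates, int maxReplicationStreams) {
    List<DatanodeDescriptor> srcNodes = new ArrayList<>();
    for (DatanodeDescriptor node : candidates) {
      // Assumption: getNumberOfBlocksToBeReplicated() includes the pending
      // work added by incrementPendingReplicationWithoutTargets().
      if (node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
        continue; // busy: node is at its outbound reconstruction limit
      }
      srcNodes.add(node);
    }
    return srcNodes;
  }
}

Under that reading of the first test: with 4 candidate sources and 2 busy nodes, only 2 sources remain for a group whose realDataBlockNum is 3, so the new guard returns null.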

@Test
public void testSkipReconstructionWithManyBusyNodes2() {
  // A realistic EC block group ID (striped block group IDs are negative).
  long blockId = -9223372036854775776L;
  // RS-3-2 EC policy.
  ErasureCodingPolicy ecPolicy =
      SystemErasureCodingPolicies.getPolicies().get(1);

  // Create a short EC block group spanning only 2 data blocks + 2 parity blocks.
  Block aBlockGroup = new Block(blockId,
      ecPolicy.getCellSize() * (ecPolicy.getNumDataUnits() - 1), 0);
  BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlockGroup, ecPolicy);

  // Create 3 storages, so 1 of the 4 internal blocks is missing.
  DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
      "storage1", "1.1.1.1", "rack1", "host1");
  DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
      "storage2", "2.2.2.2", "rack2", "host2");
  DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
      "storage3", "3.3.3.3", "rack3", "host3");

  // Link the internal blocks with the storages.
  aBlockInfoStriped.addStorage(ds1, aBlockGroup);
  aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
  aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));

  addEcBlockToBM(blockId, ecPolicy);
  aBlockInfoStriped.setBlockCollectionId(mockINodeId);

  // Reconstruction should be scheduled.
  BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
  assertNotNull(work);

  // Simulate that 1 node reaches maxReplicationStreams.
  for (int i = 0; i < bm.maxReplicationStreams; i++) {
    ds2.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
  }

  // Reconstruction should still be scheduled since the remaining 2 non-busy
  // source nodes are enough for the 2 real data blocks.
  work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
  assertNotNull(work);

  // Simulate that 1 more node reaches maxReplicationStreams.
  for (int i = 0; i < bm.maxReplicationStreams; i++) {
    ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
  }

  // Reconstruction should be skipped since there are not enough non-busy
  // source nodes.
  work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
  assertNull(work);
}
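Working through the second test's numbers, assuming the sketch above matches the real semantics and the 1 MiB cell size of the built-in RS-3-2-1024k policy: the group holds 2 MiB, so realDataBlockNum = min(3, ceil(2 MiB / 1 MiB)) = 2. With one busy node, 2 non-busy sources remain and 2 > 2 is false, so reconstruction is scheduled; with two busy nodes only 1 source remains, 2 > 1 holds, and scheduleReconstruction() returns null.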

@Test
public void testFavorDecomUntilHardLimit() throws Exception {
bm.maxReplicationStreams = 0;