Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.apache.hadoop.net.Node;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndices;
Expand Down Expand Up @@ -145,7 +147,17 @@ boolean addTaskToDatanode(NumberReplicas numberReplicas) {
// if we already have all the internal blocks, but not enough racks,
// we only need to replicate one internal block to a new rack
int sourceIndex = chooseSource4SimpleReplication();
createReplicationWork(sourceIndex, targets[0]);

// Try to find a target on a new rack
Set<String> racks = Arrays.stream(getSrcNodes())
.map(DatanodeDescriptor::getNetworkLocation)
.collect(Collectors.toSet());
DatanodeStorageInfo targetOnNewRack = Arrays.stream(targets).filter(target ->
target.getDatanodeDescriptor() != null &&
!racks.contains(target.getDatanodeDescriptor().getNetworkLocation())
).findFirst().orElse(targets[0]);

createReplicationWork(sourceIndex, targetOnNewRack);
} else if ((numberReplicas.decommissioning() > 0 ||
numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
hasAllInternalBlocks()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
import static org.apache.hadoop.util.Time.now;
Expand Down Expand Up @@ -231,8 +233,13 @@ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
final BlockType blockType = pendingFile.getBlockType();
src = fileState.path;

if (blockType == BlockType.STRIPED) {
targets = reorderTargetsForEC(targets);
}

if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations.
Expand All @@ -241,7 +248,7 @@ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
// add new chosen targets to already allocated block and return
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
lastBlockInFile, targets, pendingFile.getBlockType());
lastBlockInFile, targets, blockType);
offset = pendingFile.computeFileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
}
Expand All @@ -264,6 +271,42 @@ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
}

/**
 * Rearranges the chosen targets of a striped (EC) block so that the
 * trailing entries of the returned array alternate between racks.
 * <p>
 * Motivation: a small EC file does not fill the whole stripe width. For
 * instance, an RS-6-3-1024K file smaller than 1024K has 1 data block and
 * 3 parity blocks; the placement policy still chooses all 9 (6 + 3)
 * locations, but only 4 are used, and the last 3 locations hold the parity
 * blocks. On a cluster with very few racks (e.g. 3), the shortest-path
 * pipeline ordering tends to leave those last locations on the same rack,
 * which gives a poor rack distribution. Interleaving the racks from the
 * tail of the array spreads the parity blocks over as many racks as
 * possible.
 * <p>
 * The targets are grouped by the network location of their datanode
 * descriptor. If there are at least as many groups as targets, the input
 * array is returned as is; otherwise a new array is returned.
 *
 * @param targets storage locations chosen for the new block
 * @return {@code targets} itself when no reordering is needed, otherwise a
 *         new array holding the same elements in rack-interleaved order
 */
private static DatanodeStorageInfo[] reorderTargetsForEC(DatanodeStorageInfo[] targets) {
  final int total = targets.length;

  // Bucket the targets per rack; each bucket keeps the targets in their
  // original relative order.
  final List<List<DatanodeStorageInfo>> perRack = new ArrayList<>(
      Arrays.stream(targets)
          .collect(Collectors.groupingBy(
              storage -> storage.getDatanodeDescriptor().getNetworkLocation()))
          .values());

  // Reordering only helps when some rack holds more than one target,
  // i.e. when there are fewer racks than targets.
  final int numRacks = perRack.size();
  if (numRacks >= total) {
    return targets;
  }

  final DatanodeStorageInfo[] result = new DatanodeStorageInfo[total];
  // consumed[i] is the number of targets already taken from rack i.
  final int[] consumed = new int[numRacks];

  // Visit the racks round-robin and fill the result from its last slot
  // towards the first. This way the final positions (used by the parity
  // blocks) cycle through the racks even if the racks contribute a
  // different number of nodes; exhausted racks are simply skipped.
  int slot = total - 1;
  for (int rack = 0; slot >= 0; rack = (rack + 1) % numRacks) {
    final List<DatanodeStorageInfo> nodes = perRack.get(rack);
    if (consumed[rack] < nodes.size()) {
      result[slot--] = nodes.get(consumed[rack]++);
    }
  }

  return result;
}

static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
Expand Down
Loading