Skip to content

HDFS-17816. EC writing supports slow datanode isolation by ending the block group in advance. #7856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,11 @@ void setBytesCurBlock(final long bytesCurBlock) {
* @throws IOException
*/
void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
if (getStreamer().getBytesCurBlock() == blockSize || getStreamer().isEndBlockFlag()) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
getStreamer().setEndBlockFlag(false);
lastFlushOffset = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT;

/**
* This class supports writing files in striped layout and erasure coded format.
* Each stripe contains a sequence of cells.
Expand Down Expand Up @@ -283,6 +286,9 @@ private void flipDataBuffers() {
private CompletionService<Void> flushAllExecutorCompletionService;
private int blockGroupIndex;
private long datanodeRestartTimeout;
private boolean endBlockGroupInAdvanceFlag = false;
private int endBlockGroupInAdvanceCount;
private int maxEndBlockGroupInAdvanceCount;

/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
Expand Down Expand Up @@ -322,6 +328,10 @@ private void flipDataBuffers() {
currentPackets = new DFSPacket[streamers.size()];
datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
setCurrentStreamer(0);

maxEndBlockGroupInAdvanceCount = dfsClient.getConfiguration().getInt(
DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT,
DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT);
}

/** Construct a new output stream for appending to a file. */
Expand Down Expand Up @@ -546,14 +556,43 @@ private boolean shouldEndBlockGroup() {
currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
}

/**
 * Decides whether the current block group should be closed early because
 * some healthy streamers are writing to slow datanodes. An early close is
 * only permitted at a full-stripe boundary, and only while the per-stream
 * budget (maxEndBlockGroupInAdvanceCount) is not exhausted.
 *
 * Side effects on a positive decision: raises endBlockGroupInAdvanceFlag
 * and increments endBlockGroupInAdvanceCount.
 *
 * @return true if the block group should be ended in advance.
 */
private boolean shouldEndblockGroupInAdvance() {
  final long written = currentBlockGroup.getNumBytes();
  final long stripeBytes = (long) numDataBlocks * cellSize;
  final boolean atStripeBoundary = written > 0 && written % stripeBytes == 0;
  if (!atStripeBoundary) {
    return false;
  }
  final boolean budgetLeft =
      getEndBlockGroupInAdvanceCount() < getMaxEndBlockGroupInAdvanceCount();
  if (!budgetLeft || checkSlowStreamersWithoutThrowException().isEmpty()) {
    return false;
  }
  LOG.info("Block group {} ends in advance.", currentBlockGroup);
  this.endBlockGroupInAdvanceFlag = true;
  this.endBlockGroupInAdvanceCount++;
  return true;
}

/**
 * Scans all streamers and collects those that are healthy yet currently
 * flagged as slow. As the name states, this check never throws.
 *
 * @return the set of slow, healthy striped streamers (possibly empty).
 */
private Set<StripedDataStreamer> checkSlowStreamersWithoutThrowException() {
  final Set<StripedDataStreamer> slow = new HashSet<>();
  for (final StripedDataStreamer streamer : streamers) {
    if (!streamer.isHealthy()) {
      continue;
    }
    if (streamer.isCurrentStreamerSlow()) {
      slow.add(streamer);
    }
  }
  return slow;
}

@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
final int index = getCurrentIndex();
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;

if (currentBlockGroup == null || shouldEndBlockGroup()) {
if (currentBlockGroup == null || shouldEndBlockGroup() || endBlockGroupInAdvanceFlag) {
this.endBlockGroupInAdvanceFlag = false;
// the incoming data should belong to a new block. Allocate a new block.
allocateNewBlock();
}
Expand Down Expand Up @@ -583,13 +622,14 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
next = 0;

// if this is the end of the block group, end each internal block
if (shouldEndBlockGroup()) {
if (shouldEndBlockGroup() || shouldEndblockGroupInAdvance()) {
flushAllInternals();
checkStreamerFailures(false);
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) {
try {
getStreamer().setEndBlockFlag(true);
endBlock();
} catch (IOException ignored) {}
}
Expand Down Expand Up @@ -1367,4 +1407,12 @@ private void logCorruptBlocks() {
ExtendedBlock getBlock() {
return currentBlockGroup;
}

/**
 * @return how many times this output stream has already ended a block
 *         group in advance due to slow datanodes.
 */
public int getEndBlockGroupInAdvanceCount() {
return endBlockGroupInAdvanceCount;
}

/**
 * @return the configured upper bound on early block-group ends, read from
 *         dfs.client.ec.max.end.blockgroup.inadvance.count.
 */
public int getMaxEndBlockGroupInAdvanceCount() {
return maxEndBlockGroupInAdvanceCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ boolean doWaitForRestart() {
private final String[] favoredNodes;
private final EnumSet<AddBlockFlag> addBlockFlags;

private volatile boolean endBlockFlag = false;
private volatile boolean currentStreamerSlow = false;

private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
Expand Down Expand Up @@ -1326,10 +1329,16 @@ void markSlowNode(List<DatanodeInfo> slownodesFromAck) throws IOException {
}
for (DatanodeInfo discontinuousNode : discontinuousNodes) {
slowNodeMap.remove(discontinuousNode);
if (DataStreamer.this instanceof StripedDataStreamer) {
currentStreamerSlow = false;
}
}

if (!slowNodeMap.isEmpty()) {
for (Map.Entry<DatanodeInfo, Integer> entry : slowNodeMap.entrySet()) {
if (DataStreamer.this instanceof StripedDataStreamer) {
currentStreamerSlow = true;
}
if (entry.getValue() >= markSlowNodeAsBadNodeThreshold) {
DatanodeInfo slowNode = entry.getKey();
int index = getDatanodeIndex(slowNode);
Expand Down Expand Up @@ -2285,4 +2294,16 @@ public String toString() {
return extendedBlock == null ?
"block==null" : "" + extendedBlock.getLocalBlock();
}

/**
 * @return true if this streamer has been asked to end its current block
 *         early (used by the EC end-block-group-in-advance path).
 */
public boolean isEndBlockFlag() {
return endBlockFlag;
}

/**
 * Sets or clears the request to end the current block early.
 *
 * @param endBlockFlag the new flag value.
 */
public void setEndBlockFlag(boolean endBlockFlag) {
this.endBlockFlag = endBlockFlag;
}

/**
 * @return true if this streamer is currently writing to a node that has
 *         been reported slow (only maintained for striped streamers).
 */
public boolean isCurrentStreamerSlow() {
return currentStreamerSlow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public interface HdfsClientConfigKeys {
String DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY =
"dfs.client.mark.slownode.as.badnode.threshold";
int DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT = 10;
String DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT =
"dfs.client.ec.max.end.blockgroup.inadvance.count";
int DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT = 10;
String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
"dfs.client.key.provider.cache.expiry";
long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6631,6 +6631,15 @@
</description>
</property>

<property>
<name>dfs.client.ec.max.end.blockgroup.inadvance.count</name>
<value>10</value>
<description>
The maximum number of times a single DFSStripedOutputStream may end a
block group in advance (before the block group is full) in order to
isolate slow datanodes during erasure-coded writes.
</description>
</property>

<property>
<name>dfs.datanode.lockmanager.trace</name>
<value>false</value>
Expand Down