From c4301d6a470f1b423dee8d3591b4026d45df87a7 Mon Sep 17 00:00:00 2001
From: zhanghaobo
Date: Tue, 5 Aug 2025 11:40:52 +0800
Subject: [PATCH 1/2] HDFS-17816. EC writing supports slow datanode isolation
 by ending the block group in advance.

---
 .../apache/hadoop/hdfs/DFSOutputStream.java   |  3 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java   | 52 ++++++++++++++++++-
 .../org/apache/hadoop/hdfs/DataStreamer.java  | 21 ++++++++
 .../hdfs/client/HdfsClientConfigKeys.java     |  3 ++
 .../src/main/resources/hdfs-default.xml       |  9 ++++
 5 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a1bfb7f5d594e..1b4b3499029f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -570,10 +570,11 @@ void setBytesCurBlock(final long bytesCurBlock) {
    * @throws IOException
    */
   void endBlock() throws IOException {
-    if (getStreamer().getBytesCurBlock() == blockSize) {
+    if (getStreamer().getBytesCurBlock() == blockSize || getStreamer().isEndBlockFlag()) {
       setCurrentPacketToEmpty();
       enqueueCurrentPacket();
       getStreamer().setBytesCurBlock(0);
+      getStreamer().setEndBlockFlag(false);
       lastFlushOffset = 0;
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index a6f703fcd43cf..f9c1a69397b5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -73,6 +73,9 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT;
+
 /**
  * This class supports writing files in striped layout and erasure coded format.
  * Each stripe contains a sequence of cells.
@@ -283,6 +286,9 @@ private void flipDataBuffers() {
   private CompletionService<Void> flushAllExecutorCompletionService;
   private int blockGroupIndex;
   private long datanodeRestartTimeout;
+  private boolean endBlockGroupInAdvanceFlag = false;
+  private int endBlockGroupInAdvanceCount;
+  private int maxEndBlockGroupInAdvanceCount;
 
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
@@ -322,6 +328,10 @@ private void flipDataBuffers() {
     currentPackets = new DFSPacket[streamers.size()];
     datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
     setCurrentStreamer(0);
+
+    maxEndBlockGroupInAdvanceCount = dfsClient.getConfiguration().getInt(
+        DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT,
+        DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT);
   }
 
   /** Construct a new output stream for appending to a file. */
@@ -546,6 +556,34 @@ private boolean shouldEndBlockGroup() {
         currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
   }
 
+  private boolean shouldEndblockGroupInAdvance() {
+    Set<StripedDataStreamer> slowStreamers = checkSlowStreamersWithoutThrowException();
+    boolean meetSlowStreamer = !slowStreamers.isEmpty() &&
+        getEndBlockGroupInAdvanceCount() < getMaxEndBlockGroupInAdvanceCount();;
+    boolean stripeFull = currentBlockGroup.getNumBytes() > 0 &&
+        currentBlockGroup.getNumBytes() % ((long) numDataBlocks * cellSize) == 0;
+    if (meetSlowStreamer && stripeFull) {
+      LOG.info("Block group {} ends in advance.", currentBlockGroup);
+      this.endBlockGroupInAdvanceFlag = true;
+      this.endBlockGroupInAdvanceCount++;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @return the set of slow StripedDataStreamers.
+   */
+  private Set<StripedDataStreamer> checkSlowStreamersWithoutThrowException() {
+    Set<StripedDataStreamer> slowStreamers = new HashSet<>();
+    for (StripedDataStreamer s : streamers) {
+      if (s.isHealthy() && s.isCurrentStreamerSlow()) {
+        slowStreamers.add(s);
+      }
+    }
+    return slowStreamers;
+  }
+
   @Override
   protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
@@ -553,7 +591,8 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
     final int pos = cellBuffers.addTo(index, bytes, offset, len);
     final boolean cellFull = pos == cellSize;
 
-    if (currentBlockGroup == null || shouldEndBlockGroup()) {
+    if (currentBlockGroup == null || shouldEndBlockGroup() || endBlockGroupInAdvanceFlag) {
+      this.endBlockGroupInAdvanceFlag = false;
       // the incoming data should belong to a new block. Allocate a new block.
       allocateNewBlock();
     }
@@ -583,13 +622,14 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
         next = 0;
         // if this is the end of the block group, end each internal block
-        if (shouldEndBlockGroup()) {
+        if (shouldEndBlockGroup() || shouldEndblockGroupInAdvance()) {
           flushAllInternals();
           checkStreamerFailures(false);
           for (int i = 0; i < numAllBlocks; i++) {
             final StripedDataStreamer s = setCurrentStreamer(i);
             if (s.isHealthy()) {
               try {
+                getStreamer().setEndBlockFlag(true);
                 endBlock();
               } catch (IOException ignored) {}
             }
           }
@@ -1367,4 +1407,12 @@ private void logCorruptBlocks() {
   ExtendedBlock getBlock() {
     return currentBlockGroup;
   }
+
+  public int getEndBlockGroupInAdvanceCount() {
+    return endBlockGroupInAdvanceCount;
+  }
+
+  public int getMaxEndBlockGroupInAdvanceCount() {
+    return maxEndBlockGroupInAdvanceCount;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8d13640eadb18..6877fc4b1739d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -540,6 +540,9 @@ boolean doWaitForRestart() {
   private final String[] favoredNodes;
   private final EnumSet<AddBlockFlag> addBlockFlags;
 
+  protected volatile boolean endBlockFlag = false;
+  protected volatile boolean currentStreamerSlow = false;
+
   private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
                        DFSClient dfsClient, String src,
                        Progressable progress, DataChecksum checksum,
@@ -1326,10 +1329,16 @@ void markSlowNode(List<DatanodeInfo> slownodesFromAck) throws IOException {
     }
     for (DatanodeInfo discontinuousNode : discontinuousNodes) {
       slowNodeMap.remove(discontinuousNode);
+      if (DataStreamer.this instanceof StripedDataStreamer) {
+        currentStreamerSlow = false;
+      }
     }
 
     if (!slowNodeMap.isEmpty()) {
       for (Map.Entry<DatanodeInfo, Integer> entry : slowNodeMap.entrySet()) {
+        if (DataStreamer.this instanceof StripedDataStreamer) {
+          currentStreamerSlow = true;
+        }
         if (entry.getValue() >= markSlowNodeAsBadNodeThreshold) {
           DatanodeInfo slowNode = entry.getKey();
           int index = getDatanodeIndex(slowNode);
@@ -2285,4 +2294,16 @@ public String toString() {
     return extendedBlock == null ?
         "block==null" : "" + extendedBlock.getLocalBlock();
   }
+
+  public boolean isEndBlockFlag() {
+    return endBlockFlag;
+  }
+
+  public void setEndBlockFlag(boolean endBlockFlag) {
+    this.endBlockFlag = endBlockFlag;
+  }
+
+  public boolean isCurrentStreamerSlow() {
+    return currentStreamerSlow;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 2044530506757..8413e2659fce4 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -159,6 +159,9 @@ public interface HdfsClientConfigKeys {
   String DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY =
       "dfs.client.mark.slownode.as.badnode.threshold";
   int DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT = 10;
+  String DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT =
+      "dfs.client.ec.max.end.blockgroup.inadvance.count";
+  int DFS_CLIENT_EC_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT = 10;
   String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
       "dfs.client.key.provider.cache.expiry";
   long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 41dfbbca443fe..3bd44d8f3c5a0 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6631,6 +6631,15 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.client.ec.max.end.blockgroup.inadvance.count</name>
+    <value>10</value>
+    <description>
+      The maximum number of times that one DFSStripedOutputStream may end a
+      block group in advance when slow datanodes are detected.
+    </description>
+  </property>
+
   <property>
     <name>dfs.datanode.lockmanager.trace</name>
     <value>false</value>

From 3813ab25f9d40ed9869118ee9b83749b25f7a4b7 Mon Sep 17 00:00:00 2001
From: zhanghaobo
Date: Tue, 5 Aug 2025 17:07:53 +0800
Subject: [PATCH 2/2] Fix checkstyle.

---
 .../java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java | 2 +-
 .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index f9c1a69397b5e..f7390d89f4858 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -559,7 +559,7 @@ private boolean shouldEndBlockGroup() {
   private boolean shouldEndblockGroupInAdvance() {
     Set<StripedDataStreamer> slowStreamers = checkSlowStreamersWithoutThrowException();
     boolean meetSlowStreamer = !slowStreamers.isEmpty() &&
-        getEndBlockGroupInAdvanceCount() < getMaxEndBlockGroupInAdvanceCount();;
+        getEndBlockGroupInAdvanceCount() < getMaxEndBlockGroupInAdvanceCount();
     boolean stripeFull = currentBlockGroup.getNumBytes() > 0 &&
         currentBlockGroup.getNumBytes() % ((long) numDataBlocks * cellSize) == 0;
     if (meetSlowStreamer && stripeFull) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6877fc4b1739d..a91844c9a4a75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -540,8 +540,8 @@ boolean doWaitForRestart() {
   private final String[] favoredNodes;
   private final EnumSet<AddBlockFlag> addBlockFlags;
 
-  protected volatile boolean endBlockFlag = false;
-  protected volatile boolean currentStreamerSlow = false;
+  private volatile boolean endBlockFlag = false;
+  private volatile boolean currentStreamerSlow = false;
 
   private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
                        DFSClient dfsClient, String src,
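
Usage sketch (not part of the patch): the snippet below shows how a client could tune the new dfs.client.ec.max.end.blockgroup.inadvance.count key before writing an erasure-coded file. It assumes the target directory already has an erasure coding policy set, so that fs.create() returns a DFSStripedOutputStream; the path /ec/data.bin and the limit of 4 are illustrative values, not taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EcEndBlockGroupInAdvanceExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Cap how many times a single striped stream may end a block group early
    // when slow datanodes are reported in pipeline acks (default is 10).
    conf.setInt("dfs.client.ec.max.end.blockgroup.inadvance.count", 4);

    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/ec/data.bin"))) {
      byte[] buf = new byte[1 << 20];
      // Writes under an EC-enabled directory go through DFSStripedOutputStream,
      // which may now end the current block group early and allocate a new one
      // (with a fresh set of datanodes) when a slow streamer is detected.
      for (int i = 0; i < 64; i++) {
        out.write(buf);
      }
    }
  }
}

The upper bound enforced by maxEndBlockGroupInAdvanceCount keeps a persistently slow streamer from fragmenting a single file into many short block groups.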