Skip to content

Commit eede260

Browse files
m1a2stk-apol
authored and committed
KAFKA-19530 RemoteLogManager should record lag stats when remote storage is offline (apache#20218)
When remote storage is offline, the segmentLag and bytesLag metrics are not recorded. These metrics are useful for knowing how much data is still pending upload while remote storage is down. Reviewers: TaiJuWu <[email protected]>, Kamal Chandraprakash <[email protected]>
1 parent 2ffb306 commit eede260

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,9 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
982982
segmentIdsBeingCopied.add(segmentId);
983983
try {
984984
copyLogSegment(log, candidateLogSegment.logSegment, segmentId, candidateLogSegment.nextSegmentOffset);
985+
} catch (Exception e) {
986+
recordLagStats(log);
987+
throw e;
985988
} finally {
986989
segmentIdsBeingCopied.remove(segmentId);
987990
}
@@ -1088,6 +1091,10 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
10881091
logger.info("Copied {} to remote storage with segment-id: {}",
10891092
logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
10901093

1094+
recordLagStats(log);
1095+
}
1096+
1097+
private void recordLagStats(UnifiedLog log) {
10911098
long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
10921099
long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
10931100
recordLagStats(bytesLag, segmentsLag);

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,8 @@ void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
691691
long lastStableOffset = 150L;
692692
long logEndOffset = 150L;
693693

694+
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L);
695+
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L);
694696
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
695697

696698
// leader epoch preparation
@@ -708,6 +710,7 @@ void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
708710

709711
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
710712
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
713+
when(activeSegment.size()).thenReturn(2);
711714
verify(oldSegment, times(0)).readNextOffset();
712715
verify(activeSegment, times(0)).readNextOffset();
713716

@@ -764,6 +767,8 @@ void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
764767
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
765768
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
766769
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
770+
assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value());
771+
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
767772
}
768773

769774
@Test

0 commit comments

Comments
 (0)