Skip to content

Commit bab8169

Browse files
authored
fix(image): guard streams image access with lock to prevent data loss (#2653)
fix(image): guard streams image access with lock to prevent compaction skip data Signed-off-by: Robin Han <[email protected]>
1 parent 7c9cdd0 commit bab8169

File tree

2 files changed

+39
-24
lines changed

2 files changed

+39
-24
lines changed

core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,19 +221,21 @@ public List<StreamMetadata> getStreamMetadataList(List<Long> streamIds) {
221221
try (Image image = getImage()) {
222222
final S3StreamsMetadataImage streamsImage = image.streamsMetadata();
223223

224-
List<StreamMetadata> streamMetadataList = new ArrayList<>();
225-
for (Long streamId : streamIds) {
226-
S3StreamMetadataImage streamImage = streamsImage.timelineStreamMetadata().get(streamId);
227-
if (streamImage == null) {
228-
LOGGER.warn("[GetStreamMetadataList]: stream: {} not exists", streamId);
229-
continue;
224+
List<StreamMetadata> streamMetadataList = new ArrayList<>(streamIds.size());
225+
streamsImage.inLockRun(() -> {
226+
for (Long streamId : streamIds) {
227+
S3StreamMetadataImage streamImage = streamsImage.timelineStreamMetadata().get(streamId);
228+
if (streamImage == null) {
229+
LOGGER.warn("[GetStreamMetadataList]: stream: {} not exists", streamId);
230+
continue;
231+
}
232+
// If there is a streamImage, it means the stream exists.
233+
@SuppressWarnings("OptionalGetWithoutIsPresent") long endOffset = streamsImage.streamEndOffset(streamId).getAsLong();
234+
StreamMetadata streamMetadata = new StreamMetadata(streamId, streamImage.getEpoch(),
235+
streamImage.getStartOffset(), endOffset, streamImage.state());
236+
streamMetadataList.add(streamMetadata);
230237
}
231-
// If there is a streamImage, it means the stream exists.
232-
@SuppressWarnings("OptionalGetWithoutIsPresent") long endOffset = streamsImage.streamEndOffset(streamId).getAsLong();
233-
StreamMetadata streamMetadata = new StreamMetadata(streamId, streamImage.getEpoch(),
234-
streamImage.getStartOffset(), endOffset, streamImage.state());
235-
streamMetadataList.add(streamMetadata);
236-
}
238+
});
237239
return streamMetadataList;
238240
}
239241
}

metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ public int hashCode() {
595595
return Objects.hash(nextAssignedStreamId, streamMetadataList(), nodeMetadataList(), streamEndOffsets());
596596
}
597597

598+
// caller use this value should be protected by registryRef lock
598599
public TimelineHashMap<Integer, NodeS3StreamSetObjectMetadataImage> timelineNodeMetadata() {
599600
return nodeMetadataMap;
600601
}
@@ -610,10 +611,15 @@ List<NodeS3StreamSetObjectMetadataImage> nodeMetadataList() {
610611
});
611612
}
612613

614+
// caller use this value should be protected by registryRef lock
613615
public TimelineHashMap<Long, S3StreamMetadataImage> timelineStreamMetadata() {
614616
return streamMetadataMap;
615617
}
616618

619+
public void inLockRun(Runnable runnable) {
620+
registryRef.inLock(runnable);
621+
}
622+
617623
List<S3StreamMetadataImage> streamMetadataList() {
618624
if (registryRef == RegistryRef.NOOP) {
619625
return Collections.emptyList();
@@ -630,30 +636,33 @@ public long nextAssignedStreamId() {
630636
}
631637

632638
public OptionalLong streamEndOffset(long streamId) {
633-
Long endOffset = streamEndOffsets.get(streamId);
634-
if (endOffset != null) {
635-
return OptionalLong.of(endOffset);
636-
}
637-
// There is no record in a new stream
638-
if (streamMetadataMap.containsKey(streamId)) {
639-
return OptionalLong.of(0L);
640-
} else {
639+
if (registryRef == RegistryRef.NOOP) {
641640
return OptionalLong.empty();
642641
}
642+
return registryRef.inLock(() -> {
643+
Long endOffset = streamEndOffsets.get(streamId);
644+
if (endOffset != null) {
645+
return OptionalLong.of(endOffset);
646+
}
647+
// There is no record in a new stream
648+
if (streamMetadataMap.containsKey(streamId)) {
649+
return OptionalLong.of(0L);
650+
} else {
651+
return OptionalLong.empty();
652+
}
653+
});
643654
}
644655

656+
// caller use this value should be protected by registryRef lock
645657
TimelineHashMap<TopicIdPartition, Set<Long>> partition2streams() {
646658
return partition2streams;
647659
}
648660

661+
// caller use this value should be protected by registryRef lock
649662
TimelineHashMap<Long, TopicIdPartition> stream2partition() {
650663
return stream2partition;
651664
}
652665

653-
RegistryRef registryRef() {
654-
return registryRef;
655-
}
656-
657666
// caller use this value should be protected by registryRef lock
658667
TimelineHashMap<Long, Long> timelineStreamEndOffsets() {
659668
return streamEndOffsets;
@@ -671,6 +680,10 @@ Map<Long, Long> streamEndOffsets() {
671680
});
672681
}
673682

683+
RegistryRef registryRef() {
684+
return registryRef;
685+
}
686+
674687
@Override
675688
public String toString() {
676689
return "S3StreamsMetadataImage{nextAssignedStreamId=" + nextAssignedStreamId + '}';

0 commit comments

Comments
 (0)