Skip to content

Commit 16d3d83

Browse files
committed
Batch Coordinate cache
1 parent 5875f03 commit 16d3d83

35 files changed

+1758
-85
lines changed

core/src/main/scala/kafka/server/DelayedFetch.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class DelayedFetch(
183183
if (requests.isEmpty) return Some(0)
184184

185185
val response = try {
186-
replicaManager.findDisklessBatches(requests, Int.MaxValue)
186+
replicaManager.findDisklessBatches(requests)
187187
} catch {
188188
case e: Throwable =>
189189
error("Error while trying to find diskless batches on delayed fetch.", e)

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kafka.server
1919
import com.yammer.metrics.core.Meter
2020
import io.aiven.inkless.common.SharedState
2121
import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
22-
import io.aiven.inkless.control_plane.{FindBatchRequest, FindBatchResponse, MetadataView}
22+
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView}
2323
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
2424
import io.aiven.inkless.merge.FileMerger
2525
import io.aiven.inkless.produce.AppendHandler
@@ -1719,12 +1719,33 @@ class ReplicaManager(val config: KafkaConfig,
17191719
}
17201720
}
17211721

1722-
def findDisklessBatches(requests: Seq[FindBatchRequest], maxBytes: Int): Option[util.List[FindBatchResponse]] = {
1722+
/**
 * Looks up batches for the given requests by asking the Inkless control plane directly.
 *
 * @param requests the find-batch requests to forward to the control plane
 * @param maxBytes byte limit handed through to the control plane; defaults to `Int.MaxValue`
 * @return `None` when `inklessSharedState` is empty, otherwise the list returned by the control plane
 */
private def findDisklessBatchesThroughControlPlane(requests: Seq[FindBatchRequest], maxBytes: Int = Int.MaxValue): Option[util.List[FindBatchResponse]] = {
  inklessSharedState.map { state =>
    val controlPlane = state.controlPlane()
    val javaRequests = requests.asJava
    // The per-partition batch limit is read from the Inkless configuration.
    val maxBatchesPerPartition = state.config().maxBatchesPerPartitionToFind()
    controlPlane.findBatches(javaRequests, maxBytes, maxBatchesPerPartition)
  }
}
17271727

1728+
/**
 * Finds diskless batches for the given requests.
 *
 * When the Batch Coordinate cache is disabled the lookup is delegated to the control plane.
 * When it is enabled, each request is answered from the cache only: a cached log fragment is
 * converted into a successful response carrying its batches, log start offset and high
 * watermark, and a missing fragment yields a successful response with no batches and both
 * offsets set to -1.
 *
 * @param requests the find-batch requests, one response is produced per request on the cached path
 * @return `None` when `inklessSharedState` is empty, otherwise the responses
 */
def findDisklessBatches(requests: Seq[FindBatchRequest]): Option[util.List[FindBatchResponse]] = {
  inklessSharedState.flatMap { state =>
    if (state.isBatchCoordinateCacheEnabled) {
      val responses = requests.map { req =>
        // The cache signals "nothing cached" with null; lift it into an Option.
        Option(state.batchCoordinateCache().get(req.topicIdPartition(), req.offset())) match {
          case Some(fragment) =>
            val batchInfos = fragment.batches().stream().map[BatchInfo](coordinate => coordinate.batchInfo()).toList
            val logStartOffset = fragment.logStartOffset()
            val highWatermark = fragment.highWaterMark()
            FindBatchResponse.success(batchInfos, logStartOffset, highWatermark)
          case None =>
            // Cache miss: empty batch list with -1 for both offsets.
            FindBatchResponse.success(util.List.of(), -1, -1)
        }
      }
      Some(responses.asJava)
    } else {
      findDisklessBatchesThroughControlPlane(requests)
    }
  }
}
1748+
17281749
def fetchDisklessMessages(params: FetchParams,
17291750
fetchInfos: Seq[(TopicIdPartition, PartitionData)]): CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = {
17301751
inklessFetchHandler match {

core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ class DelayedFetchTest {
346346
when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset < fetchOffset (truncation)
347347

348348
val future = Some(Collections.singletonList(mockResponse))
349-
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
349+
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
350350

351351
// Mock fetchDisklessMessages for onComplete
352352
when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], any[Float])).thenAnswer(_.getArgument(0))
@@ -435,7 +435,7 @@ class DelayedFetchTest {
435435
when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)
436436

437437
val future = Some(Collections.singletonList(mockResponse))
438-
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
438+
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
439439

440440
when(replicaManager.readFromLog(
441441
fetchParams,
@@ -520,7 +520,7 @@ class DelayedFetchTest {
520520
when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)
521521

522522
val future = Some(Collections.singletonList(mockResponse))
523-
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
523+
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
524524

525525
when(replicaManager.readFromLog(
526526
fetchParams,
@@ -602,7 +602,7 @@ class DelayedFetchTest {
602602
when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)
603603

604604
val future = Some(Collections.singletonList(mockResponse))
605-
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
605+
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
606606

607607
// Mock fetchDisklessMessages for onComplete
608608
when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))
@@ -686,7 +686,7 @@ class DelayedFetchTest {
686686
when(mockResponse.highWatermark()).thenReturn(600L)
687687

688688
val future = Some(Collections.singletonList(mockResponse))
689-
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
689+
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
690690

691691
// Mock fetchDisklessMessages for onComplete
692692
when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))

docs/inkless/configs.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,21 @@ Under ``inkless.``
6363
* Valid Values: [0,...]
6464
* Importance: medium
6565

66+
``consume.batch.coordinate.cache.enabled``
67+
If true, the Batch Coordinate cache is enabled.
68+
69+
* Type: boolean
70+
* Default: true
71+
* Importance: low
72+
73+
``consume.batch.coordinate.cache.ttl.ms``
74+
Time to live in milliseconds for an entry in the Batch Coordinate cache. The time to live must be less than or equal to half of the value of file.cleaner.interval.ms.
75+
76+
* Type: int
77+
* Default: 5000 (5 seconds)
78+
* Valid Values: [1,...]
79+
* Importance: low
80+
6681
``consume.cache.block.bytes``
6782
The number of bytes to fetch as a single block from object storage when serving fetch requests.
6883

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Inkless
3+
* Copyright (C) 2024 - 2025 Aiven OY
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
package io.aiven.inkless.cache;
19+
20+
import org.apache.kafka.common.TopicIdPartition;
21+
22+
import java.io.Closeable;
23+
24+
public interface BatchCoordinateCache extends Closeable {
25+
26+
LogFragment get(TopicIdPartition topicIdPartition, long offset);
27+
28+
void put(CacheBatchCoordinate cacheBatchCoordinate) throws IllegalStateException;
29+
30+
int invalidatePartition(TopicIdPartition topicIdPartition);
31+
32+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.aiven.inkless.cache;
2+
3+
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
4+
5+
import java.io.Closeable;
6+
import java.util.concurrent.atomic.LongAdder;
7+
8+
9+
public final class BatchCoordinateCacheMetrics implements Closeable {
10+
static final String CACHE_HITS = "CacheHits";
11+
static final String CACHE_MISSES = "CacheMisses";
12+
static final String CACHE_INVALIDATIONS = "CacheInvalidations";
13+
static final String CACHE_EVICTIONS = "CacheEvictions";
14+
static final String CACHE_SIZE = "CacheSize";
15+
16+
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(BatchCoordinateCache.class);
17+
private final LongAdder cacheHits = new LongAdder();
18+
private final LongAdder cacheMisses = new LongAdder();
19+
private final LongAdder cacheInvalidations = new LongAdder();
20+
private final LongAdder cacheEvictions = new LongAdder();
21+
private final LongAdder cacheSize = new LongAdder();
22+
23+
public BatchCoordinateCacheMetrics() {
24+
metricsGroup.newGauge(CACHE_HITS, cacheHits::intValue);
25+
metricsGroup.newGauge(CACHE_MISSES, cacheMisses::intValue);
26+
metricsGroup.newGauge(CACHE_INVALIDATIONS, cacheInvalidations::intValue);
27+
metricsGroup.newGauge(CACHE_EVICTIONS, cacheEvictions::intValue);
28+
metricsGroup.newGauge(CACHE_SIZE, cacheSize::intValue);
29+
}
30+
31+
public void recordCacheHit() {
32+
cacheHits.increment();
33+
}
34+
35+
public void recordCacheMiss() {
36+
cacheMisses.increment();
37+
}
38+
39+
public void recordCacheInvalidation() {
40+
cacheInvalidations.increment();
41+
}
42+
43+
public void recordCacheEviction() {
44+
cacheEvictions.increment();
45+
}
46+
47+
public void incrementCacheSize() {
48+
cacheSize.increment();
49+
}
50+
51+
public void decreaseCacheSize(int removedEntries) {
52+
cacheSize.add(-removedEntries);
53+
}
54+
55+
@Override
56+
public void close() {
57+
metricsGroup.removeMetric(CACHE_HITS);
58+
metricsGroup.removeMetric(CACHE_MISSES);
59+
metricsGroup.removeMetric(CACHE_INVALIDATIONS);
60+
metricsGroup.removeMetric(CACHE_EVICTIONS);
61+
metricsGroup.removeMetric(CACHE_SIZE);
62+
}
63+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Inkless
3+
* Copyright (C) 2024 - 2025 Aiven OY
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
package io.aiven.inkless.cache;
19+
20+
import org.apache.kafka.common.TopicIdPartition;
21+
import org.apache.kafka.common.record.TimestampType;
22+
23+
import io.aiven.inkless.control_plane.BatchInfo;
24+
import io.aiven.inkless.control_plane.BatchMetadata;
25+
26+
/**
27+
* Represents the coordinates of a batch that is handled by the Batch Coordinate cache
28+
*/
29+
public record CacheBatchCoordinate(
30+
TopicIdPartition topicIdPartition,
31+
String objectKey,
32+
long byteOffset,
33+
long byteSize,
34+
long baseOffset,
35+
long lastOffset,
36+
TimestampType timestampType,
37+
long logAppendTimestamp,
38+
byte magic,
39+
long logStartOffset
40+
) {
41+
42+
public CacheBatchCoordinate {
43+
if (lastOffset < baseOffset) {
44+
throw new IllegalArgumentException(
45+
String.format(
46+
"lastOffset must be greater than or equal to baseOffset, but got lastOffset=%d and baseOffset=%d",
47+
lastOffset,
48+
baseOffset
49+
)
50+
);
51+
}
52+
if (byteSize <= 0) {
53+
throw new IllegalArgumentException(
54+
String.format("byteSize must be positive, but got %d", byteSize)
55+
);
56+
}
57+
if (byteOffset < 0) {
58+
throw new IllegalArgumentException(
59+
String.format("byteOffset must be non-negative, but got %d", byteOffset)
60+
);
61+
}
62+
}
63+
64+
public BatchInfo batchInfo(long batchId) {
65+
return new BatchInfo(
66+
batchId,
67+
objectKey,
68+
new BatchMetadata(
69+
magic,
70+
topicIdPartition,
71+
byteOffset,
72+
byteSize,
73+
baseOffset,
74+
lastOffset,
75+
logAppendTimestamp,
76+
-1,
77+
timestampType
78+
)
79+
);
80+
}
81+
82+
// To be used when batchId is not used by the caller
83+
public BatchInfo batchInfo() {
84+
return batchInfo(-1L);
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return "BatchCoordinate[" + topicIdPartition.toString() + "-> " + objectKey + ", (" + baseOffset + ", " + lastOffset + ")]";
90+
}
91+
}

0 commit comments

Comments
 (0)