
Commit b038ed4

Batch Coordinate cache PoC
1 parent 5875f03 commit b038ed4

38 files changed, +1755 -90 lines

core/src/main/scala/kafka/server/DelayedFetch.scala

Lines changed: 25 additions & 5 deletions
@@ -159,10 +159,13 @@ class DelayedFetch(
       }
 
       // Case G
-      if (accumulatedSize >= minBytes.getOrElse(params.minBytes))
+      if (accumulatedSize >= minBytes.getOrElse(params.minBytes)) {
+        DelayedFetchMetrics.enoughBytesMeter.mark()
         forceComplete()
-      else
+      } else {
+        DelayedFetchMetrics.notEnoughBytesMeter.mark()
         false
+      }
     }
 
   /**
@@ -183,10 +186,12 @@ class DelayedFetch(
     if (requests.isEmpty) return Some(0)
 
     val response = try {
-      replicaManager.findDisklessBatches(requests, Int.MaxValue)
+      DelayedFetchMetrics.findBatchesRequestsMeter.mark()
+      replicaManager.findDisklessBatches(requests)
     } catch {
       case e: Throwable =>
         error("Error while trying to find diskless batches on delayed fetch.", e)
+        DelayedFetchMetrics.findBatchesErrorsMeter.mark()
         return None // Case C
     }
 
@@ -201,6 +206,7 @@ class DelayedFetch(
       val fetchPartitionStatus = fetchPartitionStatusMap.get(topicIdPartition)
       if (fetchPartitionStatus.isEmpty) {
         warn(s"Fetch partition status for $topicIdPartition not found in delayed fetch $this.")
+        DelayedFetchMetrics.findBatchesErrorsMeter.mark()
         return None // Case C
       }
 
@@ -211,13 +217,20 @@ class DelayedFetch(
           if (fetchOffset.messageOffset > endOffset) {
             // Truncation happened
             debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+            DelayedFetchMetrics.logTruncationsMeter.mark()
             return None // Case A
           } else if (fetchOffset.messageOffset < endOffset) {
             val bytesAvailable = r.estimatedByteSize(fetchOffset.messageOffset)
             accumulatedSize += bytesAvailable // Case B: accumulate the size of the batches
-          } // Case D: same as fetchOffset == endOffset, no new data available
+          } else {
+            // Case D: same as fetchOffset == endOffset, no new data available
+            DelayedFetchMetrics.noNewDataMeter.mark()
+          }
         }
-        case _ => return None // Case C
+        case _ => {
+          DelayedFetchMetrics.findBatchesErrorsMeter.mark()
+          return None
+        } // Case C
       }
     }
 
@@ -297,5 +310,12 @@ object DelayedFetchMetrics {
   private val FetcherTypeKey = "fetcherType"
   val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
   val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
+  // diskless metrics
+  val findBatchesRequestsMeter: Meter = metricsGroup.newMeter("FindBatchesRequestsPerSec", "requests", TimeUnit.SECONDS)
+  val findBatchesErrorsMeter: Meter = metricsGroup.newMeter("FindBatchesErrorsPerSec", "requests", TimeUnit.SECONDS)
+  val enoughBytesMeter: Meter = metricsGroup.newMeter("EnoughBytesPerSec", "requests", TimeUnit.SECONDS)
+  val logTruncationsMeter: Meter = metricsGroup.newMeter("LogTruncationsPerSec", "requests", TimeUnit.SECONDS)
+  val noNewDataMeter: Meter = metricsGroup.newMeter("NoNewDataPerSec", "requests", TimeUnit.SECONDS)
+  val notEnoughBytesMeter: Meter = metricsGroup.newMeter("NotEnoughBytesPerSec", "requests", TimeUnit.SECONDS)
 }
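
The new meters map onto the completion cases referenced in the comments above. The following is a compressed, self-contained sketch of that mapping, not the actual DelayedFetch code: the Outcome type and decide function are invented for illustration, while the case letters and meter names come from the diff.

// Simplified sketch (assumed names, illustration only): how the per-partition
// outcomes relate to the meters registered in DelayedFetchMetrics above.
object DisklessTryCompleteSketch {
  sealed trait Outcome
  case object Truncated extends Outcome                         // Case A: LogTruncationsPerSec, fetch is satisfied right away
  final case class NewData(estimatedBytes: Int) extends Outcome // Case B: estimated bytes are accumulated
  case object LookupFailed extends Outcome                      // Case C: FindBatchesErrorsPerSec, fetch is completed right away
  case object NoNewData extends Outcome                         // Case D: NoNewDataPerSec, nothing to accumulate

  /** Returns true when the delayed fetch should be force-completed. */
  def decide(outcomes: Seq[Outcome], minBytes: Int): Boolean =
    if (outcomes.exists(o => o == Truncated || o == LookupFailed)) true
    else {
      // Case G: EnoughBytesPerSec when the threshold is met, NotEnoughBytesPerSec otherwise
      val accumulatedSize = outcomes.collect { case NewData(b) => b }.sum
      accumulatedSize >= minBytes
    }
}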

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 23 additions & 2 deletions
@@ -19,7 +19,7 @@ package kafka.server
 import com.yammer.metrics.core.Meter
 import io.aiven.inkless.common.SharedState
 import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
-import io.aiven.inkless.control_plane.{FindBatchRequest, FindBatchResponse, MetadataView}
+import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView}
 import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
 import io.aiven.inkless.merge.FileMerger
 import io.aiven.inkless.produce.AppendHandler
@@ -1719,12 +1719,33 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def findDisklessBatches(requests: Seq[FindBatchRequest], maxBytes: Int): Option[util.List[FindBatchResponse]] = {
+  private def findDisklessBatchesThroughControlPlane(requests: Seq[FindBatchRequest], maxBytes: Int = Int.MaxValue): Option[util.List[FindBatchResponse]] = {
     inklessSharedState.map { sharedState =>
       sharedState.controlPlane().findBatches(requests.asJava, maxBytes, sharedState.config().maxBatchesPerPartitionToFind())
     }
   }
 
+  def findDisklessBatches(requests: Seq[FindBatchRequest]): Option[util.List[FindBatchResponse]] = {
+    inklessSharedState.flatMap { sharedState =>
+      if (!sharedState.isBatchCoordinatesCacheEnabled) {
+        findDisklessBatchesThroughControlPlane(requests)
+      } else {
+        Some(requests.map { request =>
+          val logFragment = sharedState.batchCoordinateCache().get(request.topicIdPartition(), request.offset())
+          if (logFragment == null) {
+            FindBatchResponse.success(util.List.of(), -1, -1)
+          } else {
+            FindBatchResponse.success(
+              logFragment.batches().stream().map[BatchInfo](batchCoordinate => batchCoordinate.batchInfo()).toList,
+              logFragment.logStartOffset(),
+              logFragment.highWaterMark()
+            )
+          }
+        }.asJava)
+      }
+    }
+  }
+
   def fetchDisklessMessages(params: FetchParams,
                             fetchInfos: Seq[(TopicIdPartition, PartitionData)]): CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = {
     inklessFetchHandler match {
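
Two things stand out from the caller's perspective: the byte limit is no longer part of the public signature (the control-plane path defaults it to Int.MaxValue), and a cache miss does not fall back to the control plane but is answered with an empty successful response. A minimal caller-side sketch, assuming the replicaManager and requests values (and imports) that DelayedFetch already has in scope:

// Illustration only: the reshaped lookup as seen from DelayedFetch.
val responses: Option[util.List[FindBatchResponse]] =
  replicaManager.findDisklessBatches(requests)
// With the cache enabled, a miss for a partition comes back as
// FindBatchResponse.success(util.List.of(), -1, -1): no batches and unknown
// log start offset / high watermark, so the fetch accumulates no bytes for it.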

core/src/test/java/kafka/server/InklessClusterTest.java

Lines changed: 78 additions & 0 deletions
@@ -37,6 +37,8 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
@@ -55,6 +57,7 @@
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -189,6 +192,81 @@ public void createDisklessTopic(final boolean idempotenceEnable, final Timestamp
         consumeWithSubscription(timestampType, clientConfigs, topicName, now, numRecords);
     }
 
+    @Test
+    public void test() throws Exception {
+        Map<String, Object> clientConfigs = new HashMap<>();
+        clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        clientConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));
+        clientConfigs.put(ProducerConfig.LINGER_MS_CONFIG, "1000");
+        clientConfigs.put(ProducerConfig.BATCH_SIZE_CONFIG, "100000");
+        clientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        clientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        clientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        clientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        // by default is latest and nothing would get consumed.
+        clientConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AutoOffsetResetStrategy.EARLIEST.name());
+        String topicName = "inkless-cache-topic";
+        int numRecords = 50;
+
+        try (Admin admin = AdminClient.create(clientConfigs)) {
+            final NewTopic topic = new NewTopic(topicName, 2, (short) 1)
+                .configs(Map.of(
+                    TopicConfig.DISKLESS_ENABLE_CONFIG, "true",
+                    TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name
+                ));
+            CreateTopicsResult topics = admin.createTopics(Collections.singletonList(topic));
+            topics.all().get(10, TimeUnit.SECONDS);
+        }
+
+        AtomicInteger recordsProduced = new AtomicInteger();
+        final long now = System.currentTimeMillis();
+        Callback produceCb = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Failed to send record", exception);
+            } else {
+                log.info("Committed value for topic {} at offset {} at {}", metadata.topic(), metadata.offset(), now);
+                recordsProduced.incrementAndGet();
+            }
+        };
+        var producedRecords1 = new ArrayList<Integer>(numRecords);
+        var producedRecords2 = new ArrayList<Integer>(numRecords);
+        try (Producer<byte[], Integer> producer = new KafkaProducer<>(clientConfigs)) {
+            for (int i = 0; i < numRecords; i++) {
+                producedRecords1.add(i);
+                final ProducerRecord<byte[], Integer> record1 = new ProducerRecord<>(topicName, 0, now, null, i);
+                producedRecords2.add(i + 10000);
+                final ProducerRecord<byte[], Integer> record2 = new ProducerRecord<>(topicName, 1, now, null, i + 10000);
+                producer.send(record1, produceCb);
+                Thread.sleep(100);
+                producer.send(record2, produceCb);
+                Thread.sleep(100);
+            }
+            producer.flush();
+        }
+
+        var consumedRecords1 = new ArrayList<Integer>();
+        var consumedRecords2 = new ArrayList<Integer>();
+        assertEquals(numRecords * 2, recordsProduced.get());
+        try (Consumer<byte[], Integer> consumer = new KafkaConsumer<>(clientConfigs)) {
+            consumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
+            ConsumerRecords<byte[], Integer> poll = consumer.poll(Duration.ofSeconds(30));
+            for (ConsumerRecord<byte[], Integer> record : poll) {
+                assertTrue(record.timestamp() > now);
+                consumedRecords1.add(record.value());
+            }
+        }
+        try (Consumer<byte[], Integer> consumer = new KafkaConsumer<>(clientConfigs)) {
+            consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
+            ConsumerRecords<byte[], Integer> poll = consumer.poll(Duration.ofSeconds(30));
+            for (ConsumerRecord<byte[], Integer> record : poll) {
+                assertTrue(record.timestamp() > now);
+                consumedRecords2.add(record.value());
+            }
+        }
+        assertEquals(producedRecords1, consumedRecords1);
+        assertEquals(producedRecords2, consumedRecords2);
+    }
+
     @Test
     public void produceToDisklessAndClassic() throws Exception {
         Map<String, Object> clientConfigs = new HashMap<>();

core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala

Lines changed: 5 additions & 5 deletions
@@ -346,7 +346,7 @@ class DelayedFetchTest {
     when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset < fetchOffset (truncation)
 
     val future = Some(Collections.singletonList(mockResponse))
-    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
+    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
 
     // Mock fetchDisklessMessages for onComplete
     when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], any[Float])).thenAnswer(_.getArgument(0))
@@ -435,7 +435,7 @@
     when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)
 
     val future = Some(Collections.singletonList(mockResponse))
-    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
+    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
 
     when(replicaManager.readFromLog(
       fetchParams,
@@ -520,7 +520,7 @@
     when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)
 
     val future = Some(Collections.singletonList(mockResponse))
-    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
+    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
 
     when(replicaManager.readFromLog(
       fetchParams,
@@ -602,7 +602,7 @@
     when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)
 
     val future = Some(Collections.singletonList(mockResponse))
-    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
+    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
 
     // Mock fetchDisklessMessages for onComplete
     when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))
@@ -686,7 +686,7 @@
     when(mockResponse.highWatermark()).thenReturn(600L)
 
     val future = Some(Collections.singletonList(mockResponse))
-    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
+    when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)
 
     // Mock fetchDisklessMessages for onComplete
     when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))

docker/examples/docker-compose-files/inkless/docker-compose.yml

Lines changed: 4 additions & 1 deletion
@@ -27,7 +27,10 @@ services:
       # Inkless
       KAFKA_DISKLESS_STORAGE_SYSTEM_ENABLE: "true"
      KAFKA_LOG_DISKLESS_ENABLE: "true"
+      KAFKA_INKLESS_CONSUME_BATCH_COORDINATES_CACHE_ENABLED: "false"
+      KAFKA_INKLESS_CONSUME_BATCH_COORDINATES_CACHE_TTL_MS: "5000"
       KAFKA_INKLESS_CONSUME_CACHE_MAX_COUNT: "100"
+      KAFKA_INKLESS_CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC: "30"
 
       ## Control Plane
       ### Postgresql
@@ -44,7 +47,7 @@ services:
 
       # JMX
       KAFKA_JMX_PORT: 9999
-      KAFKA_HEAP_OPTS: "-Xmx2G -Xms2G"
+      KAFKA_HEAP_OPTS: "-Xmx8G -Xms8G"
       EXTRA_ARGS: -javaagent:/opt/prometheus-jmx-exporter/jmx_prometheus_javaagent.jar=7070:/opt/prometheus-jmx-exporter/kafka.yml
     volumes:
       - ./../../../extra/prometheus-jmx-exporter:/opt/prometheus-jmx-exporter:Z

docs/inkless/configs.rst

Lines changed: 15 additions & 0 deletions
@@ -63,6 +63,21 @@ Under ``inkless.``
   * Valid Values: [0,...]
   * Importance: medium
 
+``consume.batch.coordinates.cache.enabled``
+  If true, the batch coordinates cache is enabled.
+
+  * Type: boolean
+  * Default: true
+  * Importance: low
+
+``consume.batch.coordinates.cache.ttl.ms``
+  Time to live in milliseconds for an entry in the Batch Coordinates cache. The time to live must be less than or equal to half the value of ``file.cleaner.interval.ms``.
+
+  * Type: int
+  * Default: 5000 (5 seconds)
+  * Valid Values: [1,...]
+  * Importance: low
+
 ``consume.cache.block.bytes``
   The number of bytes to fetch as a single block from object storage when serving fetch requests.

New file: io/aiven/inkless/cache/BatchCoordinateCache.java

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.cache;
+
+import org.apache.kafka.common.TopicIdPartition;
+
+import java.io.Closeable;
+
+public interface BatchCoordinateCache extends Closeable {
+
+    LogFragment get(TopicIdPartition topicIdPartition, long offset);
+
+    void put(CacheBatchCoordinate cacheBatchCoordinate) throws IllegalStateException;
+
+    int invalidatePartition(TopicIdPartition topicIdPartition);
+
+}
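
The implementation of this interface lives elsewhere in the commit and is not shown in this excerpt. A hypothetical lifecycle sketch in Scala, mirroring how ReplicaManager consumes the cache above; the helper names and call sites are invented for illustration, only the three interface methods are from the commit.

// Hypothetical lifecycle sketch (helper names invented; interface methods are real).
import org.apache.kafka.common.TopicIdPartition
import io.aiven.inkless.cache.{BatchCoordinateCache, CacheBatchCoordinate, LogFragment}

object BatchCoordinateCacheUsageSketch {
  // Fetch path: a null result is a miss; ReplicaManager then answers with an empty response.
  def serveFetch(cache: BatchCoordinateCache, tidp: TopicIdPartition, fetchOffset: Long): Option[LogFragment] =
    Option(cache.get(tidp, fetchOffset))

  // Write path: put may reject an entry, per the IllegalStateException in the signature above.
  def onBatchCommitted(cache: BatchCoordinateCache, coordinate: CacheBatchCoordinate): Unit =
    cache.put(coordinate)

  // Cleanup path: presumably invoked when a partition's batch files are deleted or merged;
  // the returned count is assumed to be the number of dropped entries.
  def onPartitionFilesRemoved(cache: BatchCoordinateCache, tidp: TopicIdPartition): Int =
    cache.invalidatePartition(tidp)
}
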
New file: io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+package io.aiven.inkless.cache;
+
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.LongAdder;
+
+
+public final class BatchCoordinateCacheMetrics implements Closeable {
+    static final String CACHE_HITS = "CacheHits";
+    static final String CACHE_MISSES = "CacheMisses";
+    static final String CACHE_INVALIDATIONS = "CacheInvalidations";
+    static final String CACHE_EVICTIONS = "CacheEvictions";
+    static final String CACHE_SIZE = "CacheSize";
+
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(BatchCoordinateCache.class);
+    private final LongAdder cacheHits = new LongAdder();
+    private final LongAdder cacheMisses = new LongAdder();
+    private final LongAdder cacheInvalidations = new LongAdder();
+    private final LongAdder cacheEvictions = new LongAdder();
+    private final LongAdder cacheSize = new LongAdder();
+
+    public BatchCoordinateCacheMetrics() {
+        metricsGroup.newGauge(CACHE_HITS, cacheHits::intValue);
+        metricsGroup.newGauge(CACHE_MISSES, cacheMisses::intValue);
+        metricsGroup.newGauge(CACHE_INVALIDATIONS, cacheInvalidations::intValue);
+        metricsGroup.newGauge(CACHE_EVICTIONS, cacheEvictions::intValue);
+        metricsGroup.newGauge(CACHE_SIZE, cacheSize::intValue);
+    }
+
+    public void recordCacheHit() {
+        cacheHits.increment();
+    }
+
+    public void recordCacheMiss() {
+        cacheMisses.increment();
+    }
+
+    public void recordCacheInvalidation() {
+        cacheInvalidations.increment();
+    }
+
+    public void recordCacheEviction() {
+        cacheEvictions.increment();
+    }
+
+    public void incrementCacheSize() {
+        cacheSize.increment();
+    }
+
+    public void decreaseCacheSize(int removedEntries) {
+        cacheSize.add(-removedEntries);
+    }
+
+    @Override
+    public void close() {
+        metricsGroup.removeMetric(CACHE_HITS);
+        metricsGroup.removeMetric(CACHE_MISSES);
+    }
+}
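
How these counters are driven is up to the cache implementation, which is not part of this excerpt. A hypothetical wiring sketch in Scala; the getWithMetrics helper is invented for illustration, only the cache and metrics APIs above are from the commit.

// Hypothetical helper (not in the commit): counts hits and misses around a lookup.
import org.apache.kafka.common.TopicIdPartition
import io.aiven.inkless.cache.{BatchCoordinateCache, BatchCoordinateCacheMetrics, LogFragment}

object BatchCoordinateCacheMetricsSketch {
  def getWithMetrics(cache: BatchCoordinateCache,
                     metrics: BatchCoordinateCacheMetrics,
                     tidp: TopicIdPartition,
                     offset: Long): LogFragment = {
    val fragment = cache.get(tidp, offset)
    if (fragment == null) metrics.recordCacheMiss() else metrics.recordCacheHit()
    fragment
  }
}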
