core/src/main/scala/kafka/server/DelayedFetch.scala (2 changes: 1 addition & 1 deletion)
@@ -183,7 +183,7 @@ class DelayedFetch(
if (requests.isEmpty) return Some(0)

val response = try {
replicaManager.findDisklessBatches(requests, Int.MaxValue)
replicaManager.findDisklessBatches(requests)
} catch {
case e: Throwable =>
error("Error while trying to find diskless batches on delayed fetch.", e)
core/src/main/scala/kafka/server/ReplicaManager.scala (25 changes: 23 additions & 2 deletions)
@@ -19,7 +19,7 @@ package kafka.server
import com.yammer.metrics.core.Meter
import io.aiven.inkless.common.SharedState
import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
import io.aiven.inkless.control_plane.{FindBatchRequest, FindBatchResponse, MetadataView}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView}
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.merge.FileMerger
import io.aiven.inkless.produce.AppendHandler
@@ -1719,12 +1719,33 @@ class ReplicaManager(val config: KafkaConfig,
}
}

def findDisklessBatches(requests: Seq[FindBatchRequest], maxBytes: Int): Option[util.List[FindBatchResponse]] = {
private def findDisklessBatchesThroughControlPlane(requests: Seq[FindBatchRequest], maxBytes: Int = Int.MaxValue): Option[util.List[FindBatchResponse]] = {
inklessSharedState.map { sharedState =>
sharedState.controlPlane().findBatches(requests.asJava, maxBytes, sharedState.config().maxBatchesPerPartitionToFind())
}
}

def findDisklessBatches(requests: Seq[FindBatchRequest]): Option[util.List[FindBatchResponse]] = {
inklessSharedState.flatMap { sharedState =>
if (!sharedState.isBatchCoordinateCacheEnabled) {
findDisklessBatchesThroughControlPlane(requests)
} else {
Some(requests.map { request =>
val logFragment = sharedState.batchCoordinateCache().get(request.topicIdPartition(), request.offset())
if (logFragment == null) {
FindBatchResponse.success(util.List.of(), -1, -1)
} else {
Comment on lines +1735 to +1737

Member:
We don't go to the control plane in case of misses, right?

Contributor (author):
Right, requests to the control plane are done only when the DelayedFetch completes.

Member:
Some other broker can append lots of batches without the cache in the local broker being aware of this. It becomes aware only when the local broker produces something to this partition and has to invalidate the cache due to an offset mismatch. Do I understand this correctly?

Contributor (author):
Yes, exactly. This means that in theory you can have stale entries up to the TTL (e.g. the last 5 seconds of the log are not visible to a broker because no append was done through it). This is fine as a best effort for checking whether the DelayedFetch can be completed, but when the fetch is actually done the "real" batches will be retrieved from the control plane.

Member:
It makes sense. I need to stop seeing this cache as very consistent with PG and long-living :)

FindBatchResponse.success(
logFragment.batches().stream().map[BatchInfo](batchCoordinate => batchCoordinate.batchInfo(request.topicIdPartition())).toList,
logFragment.logStartOffset(),
logFragment.highWaterMark()
)
}
}.asJava)
}
}
}

def fetchDisklessMessages(params: FetchParams,
fetchInfos: Seq[(TopicIdPartition, PartitionData)]): CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = {
inklessFetchHandler match {
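
To make the consistency model from the review thread concrete, here is a minimal Java sketch of the two-phase flow, not the PR's actual wiring: the DelayedFetch readiness check consults only the best-effort cache, while the completed fetch still asks the control plane for the authoritative batches. The helper name and the surrounding class are hypothetical.

import java.util.List;

import org.apache.kafka.common.TopicIdPartition;

import io.aiven.inkless.cache.BatchCoordinateCache;
import io.aiven.inkless.cache.LogFragment;
import io.aiven.inkless.control_plane.FindBatchResponse;

final class CachedBatchLookupSketch {

    // tryComplete path: best-effort view from the local cache only; misses do NOT go to the control plane.
    static FindBatchResponse cachedViewOf(BatchCoordinateCache cache,
                                          TopicIdPartition topicIdPartition,
                                          long fetchOffset) {
        LogFragment fragment = cache.get(topicIdPartition, fetchOffset);
        if (fragment == null) {
            // Miss: report "no batches, unknown offsets" and let the delayed fetch keep waiting.
            return FindBatchResponse.success(List.of(), -1, -1);
        }
        // Hit: expose the cached coordinates as BatchInfo plus the cached log start offset and high watermark.
        return FindBatchResponse.success(
            fragment.batches().stream().map(c -> c.batchInfo(topicIdPartition)).toList(),
            fragment.logStartOffset(),
            fragment.highWaterMark());
    }
}

When the delayed fetch actually completes, the authoritative lookup still goes through findDisklessBatchesThroughControlPlane above, so an entry that is stale for up to the configured TTL can at worst hide recently appended batches from this check until it expires; it never changes what the consumer ultimately reads.
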
@@ -346,7 +346,7 @@ class DelayedFetchTest {
when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset < fetchOffset (truncation)

val future = Some(Collections.singletonList(mockResponse))
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)

// Mock fetchDisklessMessages for onComplete
when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], any[Float])).thenAnswer(_.getArgument(0))
@@ -435,7 +435,7 @@ class DelayedFetchTest {
when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)

val future = Some(Collections.singletonList(mockResponse))
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)

when(replicaManager.readFromLog(
fetchParams,
@@ -520,7 +520,7 @@ class DelayedFetchTest {
when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)

val future = Some(Collections.singletonList(mockResponse))
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)

when(replicaManager.readFromLog(
fetchParams,
@@ -602,7 +602,7 @@ class DelayedFetchTest {
when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize)

val future = Some(Collections.singletonList(mockResponse))
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)

// Mock fetchDisklessMessages for onComplete
when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))
@@ -686,7 +686,7 @@ class DelayedFetchTest {
when(mockResponse.highWatermark()).thenReturn(600L)

val future = Some(Collections.singletonList(mockResponse))
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future)
when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]])).thenReturn(future)

// Mock fetchDisklessMessages for onComplete
when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))
docs/inkless/configs.rst (15 changes: 15 additions & 0 deletions)
@@ -63,6 +63,21 @@ Under ``inkless.``
* Valid Values: [0,...]
* Importance: medium

``consume.batch.coordinate.cache.enabled``
If true, the Batch Coordinate cache is enabled.

* Type: boolean
* Default: true
* Importance: low

``consume.batch.coordinate.cache.ttl.ms``
Time to live in milliseconds for an entry in the Batch Coordinate cache. The time to live must be at most half of the value of ``file.cleaner.interval.ms``.

* Type: int
* Default: 5000 (5 seconds)
* Valid Values: [1,...]
* Importance: low
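
For reference, a minimal configuration sketch for the two settings above, assuming they are set as broker properties under the documented ``inkless.`` prefix and that ``file.cleaner.interval.ms`` is left at a value of at least twice the TTL:

inkless.consume.batch.coordinate.cache.enabled=true
inkless.consume.batch.coordinate.cache.ttl.ms=5000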

``consume.cache.block.bytes``
The number of bytes to fetch as a single block from object storage when serving fetch requests.


New file: BatchCoordinateCache.java (package io.aiven.inkless.cache)
@@ -0,0 +1,30 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.TopicIdPartition;

import java.io.Closeable;

public interface BatchCoordinateCache extends Closeable {

LogFragment get(TopicIdPartition topicIdPartition, long offset);

void put(TopicIdPartition topicIdPartition, CacheBatchCoordinate cacheBatchCoordinate);

}
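
A hedged usage sketch of the contract this interface implies, based on the ReplicaManager change and the review thread above; the helper names and the surrounding class are hypothetical:

package io.aiven.inkless.cache;

import org.apache.kafka.common.TopicIdPartition;

final class BatchCoordinateCacheUsageSketch {

    // Append path (assumption from the review thread): after a local append, the broker records the new
    // batch's coordinates so later DelayedFetch checks on this broker see them without a control plane call.
    static void onLocalAppend(BatchCoordinateCache cache, TopicIdPartition tp, CacheBatchCoordinate coordinate) {
        cache.put(tp, coordinate);
    }

    // Fetch path: a null return is a cache miss. It means "this broker's best-effort view has nothing cached
    // at this offset", not "no data exists"; the control plane remains the source of truth.
    static boolean hasCachedDataAt(BatchCoordinateCache cache, TopicIdPartition tp, long offset) {
        LogFragment fragment = cache.get(tp, offset);
        return fragment != null && !fragment.batches().isEmpty();
    }
}
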
New file: BatchCoordinateCacheMetrics.java (package io.aiven.inkless.cache)
@@ -0,0 +1,72 @@
package io.aiven.inkless.cache;

import org.apache.kafka.server.metrics.KafkaMetricsGroup;

import java.io.Closeable;
import java.util.concurrent.atomic.LongAdder;


public final class BatchCoordinateCacheMetrics implements Closeable {
static final String CACHE_HITS = "CacheHits";
static final String CACHE_HITS_WITHOUT_DATA = "CacheHitsWithoutData";
static final String CACHE_MISSES = "CacheMisses";
static final String CACHE_INVALIDATIONS = "CacheInvalidations";
static final String CACHE_EVICTIONS = "CacheEvictions";
static final String CACHE_SIZE = "CacheSize";

private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(BatchCoordinateCache.class);
private final LongAdder cacheHits = new LongAdder();
private final LongAdder cacheHitsWithoutData = new LongAdder();
private final LongAdder cacheMisses = new LongAdder();
private final LongAdder cacheInvalidations = new LongAdder();
private final LongAdder cacheEvictions = new LongAdder();
private final LongAdder cacheSize = new LongAdder();

public BatchCoordinateCacheMetrics() {
metricsGroup.newGauge(CACHE_HITS, cacheHits::intValue);
metricsGroup.newGauge(CACHE_HITS_WITHOUT_DATA, cacheHitsWithoutData::intValue);
metricsGroup.newGauge(CACHE_MISSES, cacheMisses::intValue);
metricsGroup.newGauge(CACHE_INVALIDATIONS, cacheInvalidations::intValue);
metricsGroup.newGauge(CACHE_EVICTIONS, cacheEvictions::intValue);
metricsGroup.newGauge(CACHE_SIZE, cacheSize::intValue);
}

public void recordCacheHit() {
cacheHits.increment();
}

public void recordCacheHitWithoutData() {
cacheHitsWithoutData.increment();
}


public void recordCacheMiss() {
cacheMisses.increment();
}

public void recordCacheInvalidation() {
cacheInvalidations.increment();
}

public void recordCacheEviction() {
cacheEvictions.increment();
}

public void incrementCacheSize() {
cacheSize.increment();
}

public void decreaseCacheSize(int removedEntries) {
cacheSize.add(-removedEntries);
}

@Override
public void close() {
metricsGroup.removeMetric(CACHE_HITS);
metricsGroup.removeMetric(CACHE_HITS_WITHOUT_DATA);
metricsGroup.removeMetric(CACHE_MISSES);
metricsGroup.removeMetric(CACHE_INVALIDATIONS);
metricsGroup.removeMetric(CACHE_EVICTIONS);
metricsGroup.removeMetric(CACHE_SIZE);
}
}
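
A hedged sketch of how an implementation might drive these counters; the trigger methods are hypothetical, and mapping TTL expiry to evictions and offset mismatches to invalidations is an assumption based on the metric names and the review thread:

package io.aiven.inkless.cache;

final class BatchCoordinateCacheMetricsWiringSketch {

    private final BatchCoordinateCacheMetrics metrics = new BatchCoordinateCacheMetrics();

    // Called when a new entry is stored via put().
    void onEntryAdded() {
        metrics.incrementCacheSize();
    }

    // Assumption: entries older than consume.batch.coordinate.cache.ttl.ms are evicted.
    void onTtlExpiry(int expiredEntries) {
        metrics.recordCacheEviction();
        metrics.decreaseCacheSize(expiredEntries);
    }

    // Assumption: a local append that does not line up with the cached fragment drops it (offset mismatch).
    void onOffsetMismatch(int droppedEntries) {
        metrics.recordCacheInvalidation();
        metrics.decreaseCacheSize(droppedEntries);
    }

    // Lookup outcomes map onto the hit/miss counters.
    void onLookup(LogFragment fragment) {
        if (fragment == null) {
            metrics.recordCacheMiss();
        } else if (fragment.batches().isEmpty()) {
            metrics.recordCacheHitWithoutData(); // assumed meaning: an entry exists but carries no batches
        } else {
            metrics.recordCacheHit();
        }
    }
}
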
New file: CacheBatchCoordinate.java (package io.aiven.inkless.cache)
@@ -0,0 +1,90 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.record.TimestampType;

import io.aiven.inkless.control_plane.BatchInfo;
import io.aiven.inkless.control_plane.BatchMetadata;

/**
* Represents the coordinates of a batch that is handled by the Batch Coordinate cache
*/
public record CacheBatchCoordinate(
String objectKey,
long byteOffset,
long byteSize,
long baseOffset,
long lastOffset,
TimestampType timestampType,
long logAppendTimestamp,
byte magic,
long logStartOffset
) {

public CacheBatchCoordinate {
if (lastOffset < baseOffset) {
throw new IllegalArgumentException(
String.format(
"lastOffset must be greater than or equal to baseOffset, but got lastOffset=%d and baseOffset=%d",
lastOffset,
baseOffset
)
);
}
if (byteSize <= 0) {
throw new IllegalArgumentException(
String.format("byteSize must be positive, but got %d", byteSize)
);
}
if (byteOffset < 0) {
throw new IllegalArgumentException(
String.format("byteOffset must be non-negative, but got %d", byteOffset)
);
}
}

public BatchInfo batchInfo(TopicIdPartition topicIdPartition, long batchId) {
return new BatchInfo(
batchId,
objectKey,
new BatchMetadata(
magic,
topicIdPartition,
byteOffset,
byteSize,
baseOffset,
lastOffset,
logAppendTimestamp,
-1,
timestampType
)
);
}

// To be used when batchId is not used by the caller
public BatchInfo batchInfo(TopicIdPartition topicIdPartition) {
return batchInfo(topicIdPartition, -1L);
}

@Override
public String toString() {
return "BatchCoordinate[" + objectKey + " -> (" + baseOffset + ", " + lastOffset + ")]";
}
}
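
A brief example of constructing a coordinate and exposing it as a BatchInfo; the values are illustrative only:

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.TimestampType;

import io.aiven.inkless.cache.CacheBatchCoordinate;
import io.aiven.inkless.control_plane.BatchInfo;

final class CacheBatchCoordinateExample {
    static void example() {
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), 0, "payments");
        CacheBatchCoordinate coordinate = new CacheBatchCoordinate(
            "inkless/object-0001",   // objectKey (illustrative)
            0L,                      // byteOffset, must be >= 0
            1024L,                   // byteSize, must be > 0
            100L,                    // baseOffset
            109L,                    // lastOffset, must be >= baseOffset
            TimestampType.CREATE_TIME,
            1_700_000_000_000L,      // logAppendTimestamp
            (byte) 2,                // magic
            0L                       // logStartOffset
        );
        BatchInfo withoutBatchId = coordinate.batchInfo(tp);     // uses the -1 placeholder batchId
        BatchInfo withBatchId = coordinate.batchInfo(tp, 42L);
    }
}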