Commit 059af4e

cleanup
1 parent cf44291 commit 059af4e

14 files changed, +265 -259 lines changed

core/src/main/scala/kafka/server/DelayedFetch.scala

Lines changed: 4 additions & 24 deletions
@@ -159,13 +159,10 @@ class DelayedFetch(
       }

       // Case G
-      if (accumulatedSize >= minBytes.getOrElse(params.minBytes)) {
-        DelayedFetchMetrics.enoughBytesMeter.mark()
+      if (accumulatedSize >= minBytes.getOrElse(params.minBytes))
         forceComplete()
-      } else {
-        DelayedFetchMetrics.notEnoughBytesMeter.mark()
+      else
         false
-      }
   }

   /**
@@ -186,12 +183,10 @@ class DelayedFetch(
     if (requests.isEmpty) return Some(0)

     val response = try {
-      DelayedFetchMetrics.findBatchesRequestsMeter.mark()
       replicaManager.findDisklessBatches(requests)
     } catch {
       case e: Throwable =>
         error("Error while trying to find diskless batches on delayed fetch.", e)
-        DelayedFetchMetrics.findBatchesErrorsMeter.mark()
         return None // Case C
     }

@@ -206,7 +201,6 @@ class DelayedFetch(
       val fetchPartitionStatus = fetchPartitionStatusMap.get(topicIdPartition)
       if (fetchPartitionStatus.isEmpty) {
         warn(s"Fetch partition status for $topicIdPartition not found in delayed fetch $this.")
-        DelayedFetchMetrics.findBatchesErrorsMeter.mark()
         return None // Case C
       }

@@ -217,20 +211,13 @@ class DelayedFetch(
           if (fetchOffset.messageOffset > endOffset) {
             // Truncation happened
             debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-            DelayedFetchMetrics.logTruncationsMeter.mark()
             return None // Case A
           } else if (fetchOffset.messageOffset < endOffset) {
             val bytesAvailable = r.estimatedByteSize(fetchOffset.messageOffset)
             accumulatedSize += bytesAvailable // Case B: accumulate the size of the batches
-          } else {
-            // Case D: same as fetchOffset == endOffset, no new data available
-            DelayedFetchMetrics.noNewDataMeter.mark()
-          }
+          } // Case D: same as fetchOffset == endOffset, no new data available
         }
-      case _ => {
-        DelayedFetchMetrics.findBatchesErrorsMeter.mark()
-        return None
-      } // Case C
+      case _ => return None // Case C
     }
   }

@@ -310,12 +297,5 @@ object DelayedFetchMetrics {
   private val FetcherTypeKey = "fetcherType"
   val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
   val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
-  // diskless metrics
-  val findBatchesRequestsMeter: Meter = metricsGroup.newMeter("FindBatchesRequestsPerSec", "requests", TimeUnit.SECONDS)
-  val findBatchesErrorsMeter: Meter = metricsGroup.newMeter("FindBatchesErrorsPerSec", "requests", TimeUnit.SECONDS)
-  val enoughBytesMeter: Meter = metricsGroup.newMeter("EnoughBytesPerSec", "requests", TimeUnit.SECONDS)
-  val logTruncationsMeter: Meter = metricsGroup.newMeter("LogTruncationsPerSec", "requests", TimeUnit.SECONDS)
-  val noNewDataMeter: Meter = metricsGroup.newMeter("NoNewDataPerSec", "requests", TimeUnit.SECONDS)
-  val notEnoughBytesMeter: Meter = metricsGroup.newMeter("NotEnoughBytesPerSec", "requests", TimeUnit.SECONDS)
 }

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 1 addition & 1 deletion
@@ -1727,7 +1727,7 @@ class ReplicaManager(val config: KafkaConfig,

   def findDisklessBatches(requests: Seq[FindBatchRequest]): Option[util.List[FindBatchResponse]] = {
     inklessSharedState.flatMap { sharedState =>
-      if (!sharedState.isBatchCoordinatesCacheEnabled) {
+      if (!sharedState.isBatchCoordinateCacheEnabled) {
         findDisklessBatchesThroughControlPlane(requests)
       } else {
         Some(requests.map { request =>

core/src/test/java/kafka/server/InklessClusterTest.java

Lines changed: 0 additions & 78 deletions
@@ -37,8 +37,6 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
@@ -57,7 +55,6 @@
 import org.testcontainers.junit.jupiter.Testcontainers;

 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -192,81 +189,6 @@ public void createDisklessTopic(final boolean idempotenceEnable, final Timestamp
         consumeWithSubscription(timestampType, clientConfigs, topicName, now, numRecords);
     }

-    @Test
-    public void test() throws Exception {
-        Map<String, Object> clientConfigs = new HashMap<>();
-        clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        clientConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));
-        clientConfigs.put(ProducerConfig.LINGER_MS_CONFIG, "1000");
-        clientConfigs.put(ProducerConfig.BATCH_SIZE_CONFIG, "100000");
-        clientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        clientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        clientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        clientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
-        // by default is latest and nothing would get consumed.
-        clientConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AutoOffsetResetStrategy.EARLIEST.name());
-        String topicName = "inkless-cache-topic";
-        int numRecords = 50;
-
-        try (Admin admin = AdminClient.create(clientConfigs)) {
-            final NewTopic topic = new NewTopic(topicName, 2, (short) 1)
-                .configs(Map.of(
-                    TopicConfig.DISKLESS_ENABLE_CONFIG, "true",
-                    TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name
-                ));
-            CreateTopicsResult topics = admin.createTopics(Collections.singletonList(topic));
-            topics.all().get(10, TimeUnit.SECONDS);
-        }
-
-        AtomicInteger recordsProduced = new AtomicInteger();
-        final long now = System.currentTimeMillis();
-        Callback produceCb = (metadata, exception) -> {
-            if (exception != null) {
-                log.error("Failed to send record", exception);
-            } else {
-                log.info("Committed value for topic {} at offset {} at {}", metadata.topic(), metadata.offset(), now);
-                recordsProduced.incrementAndGet();
-            }
-        };
-        var producedRecords1 = new ArrayList<Integer>(numRecords);
-        var producedRecords2 = new ArrayList<Integer>(numRecords);
-        try (Producer<byte[], Integer> producer = new KafkaProducer<>(clientConfigs)) {
-            for (int i = 0; i < numRecords; i++) {
-                producedRecords1.add(i);
-                final ProducerRecord<byte[], Integer> record1 = new ProducerRecord<>(topicName, 0, now, null, i);
-                producedRecords2.add(i + 10000);
-                final ProducerRecord<byte[], Integer> record2 = new ProducerRecord<>(topicName, 1, now, null, i + 10000);
-                producer.send(record1, produceCb);
-                Thread.sleep(100);
-                producer.send(record2, produceCb);
-                Thread.sleep(100);
-            }
-            producer.flush();
-        }
-
-        var consumedRecords1 = new ArrayList<Integer>();
-        var consumedRecords2 = new ArrayList<Integer>();
-        assertEquals(numRecords * 2, recordsProduced.get());
-        try (Consumer<byte[], Integer> consumer = new KafkaConsumer<>(clientConfigs)) {
-            consumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
-            ConsumerRecords<byte[], Integer> poll = consumer.poll(Duration.ofSeconds(30));
-            for (ConsumerRecord<byte[], Integer> record : poll) {
-                assertTrue(record.timestamp() > now);
-                consumedRecords1.add(record.value());
-            }
-        }
-        try (Consumer<byte[], Integer> consumer = new KafkaConsumer<>(clientConfigs)) {
-            consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
-            ConsumerRecords<byte[], Integer> poll = consumer.poll(Duration.ofSeconds(30));
-            for (ConsumerRecord<byte[], Integer> record : poll) {
-                assertTrue(record.timestamp() > now);
-                consumedRecords2.add(record.value());
-            }
-        }
-        assertEquals(producedRecords1, consumedRecords1);
-        assertEquals(producedRecords2, consumedRecords2);
-    }
-
     @Test
     public void produceToDisklessAndClassic() throws Exception {
         Map<String, Object> clientConfigs = new HashMap<>();

docker/examples/docker-compose-files/inkless/docker-compose.yml

Lines changed: 1 addition & 4 deletions
@@ -27,10 +27,7 @@ services:
       # Inkless
       KAFKA_DISKLESS_STORAGE_SYSTEM_ENABLE: "true"
       KAFKA_LOG_DISKLESS_ENABLE: "true"
-      KAFKA_INKLESS_CONSUME_BATCH_COORDINATES_CACHE_ENABLED: "false"
-      KAFKA_INKLESS_CONSUME_BATCH_COORDINATES_CACHE_TTL_MS: "5000"
       KAFKA_INKLESS_CONSUME_CACHE_MAX_COUNT: "100"
-      KAFKA_INKLESS_CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC: "30"

       ## Control Plane
       ### Postgresql
@@ -47,7 +44,7 @@ services:

       # JMX
       KAFKA_JMX_PORT: 9999
-      KAFKA_HEAP_OPTS: "-Xmx8G -Xms8G"
+      KAFKA_HEAP_OPTS: "-Xmx2G -Xms2G"
       EXTRA_ARGS: -javaagent:/opt/prometheus-jmx-exporter/jmx_prometheus_javaagent.jar=7070:/opt/prometheus-jmx-exporter/kafka.yml
     volumes:
       - ./../../../extra/prometheus-jmx-exporter:/opt/prometheus-jmx-exporter:Z

docs/inkless/configs.rst

Lines changed: 4 additions & 4 deletions
@@ -63,15 +63,15 @@ Under ``inkless.``
   * Valid Values: [0,...]
   * Importance: medium

-``consume.batch.coordinates.cache.enabled``
-  If true, batch coordinates cache is enabled.
+``consume.batch.coordinate.cache.enabled``
+  If true, the Batch Coordinate cache is enabled.

   * Type: boolean
   * Default: true
   * Importance: low

-``consume.batch.coordinates.cache.ttl.ms``
-  Time to live in milliseconds for an entry in the Batch Coordinates cache. The time to live must be <= than half of the value of of file.cleaner.interval.ms.
+``consume.batch.coordinate.cache.ttl.ms``
+  Time to live in milliseconds for an entry in the Batch Coordinate cache. The time to live must be <= half of the value of file.cleaner.interval.ms.

   * Type: int
   * Default: 5000 (5 seconds)
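
For reference, the documented TTL constraint implies a guard along these lines, presumably so cached coordinates expire before the file cleaner can remove the underlying data. This is a hypothetical Java sketch; the class, method, and parameter names are illustrative and do not appear in this commit.

final class CacheTtlValidationSketch {
    // Enforces the documented rule: consume.batch.coordinate.cache.ttl.ms
    // must be <= half of file.cleaner.interval.ms.
    static void validate(long cacheTtlMs, long fileCleanerIntervalMs) {
        if (cacheTtlMs > fileCleanerIntervalMs / 2) {
            throw new IllegalArgumentException(
                "consume.batch.coordinate.cache.ttl.ms (" + cacheTtlMs
                    + " ms) must be <= half of file.cleaner.interval.ms ("
                    + fileCleanerIntervalMs + " ms)");
        }
    }
}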

storage/inkless/src/main/java/io/aiven/inkless/cache/CacheBatchCoordinate.java

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
 import io.aiven.inkless.control_plane.BatchMetadata;

 /**
- * Represents the coordinates of batch that is handled by the Batch Coordinate cache
+ * Represents the coordinates of a batch that is handled by the Batch Coordinate cache
 */
 public record CacheBatchCoordinate(
     TopicIdPartition topicIdPartition,

storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineBatchCoordinateCache.java

Lines changed: 54 additions & 2 deletions
@@ -13,6 +13,12 @@
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;

+/**
+ * A thread-safe cache that manages {@link LogFragment} instances for
+ * multiple Kafka diskless topic partitions.
+ * It uses a time-to-live (TTL) parameter as a cleanup policy.
+ * This cache automatically handles stale and expired entries.
+ */
 public class CaffeineBatchCoordinateCache implements BatchCoordinateCache {

     static class UncheckedStaleCacheEntryException extends RuntimeException {
@@ -57,6 +63,25 @@ public CaffeineBatchCoordinateCache(Duration ttl) {
     }


+    /**
+     * Retrieves a view of the log's tail for a specific partition, starting from
+     * the requested offset.
+     *
+     * <p>This operation will result in a <b>cache miss</b> (returning {@code null})
+     * in several scenarios:
+     * <ul>
+     * <li>If the entire partition is not currently tracked by the cache.</li>
+     * <li>If the requested {@code offset} is older than the data held in the
+     * fragment (i.e., it has already been evicted).</li>
+     * <li>If the data at that offset is found but has expired at the moment of
+     * the request.</li>
+     * </ul></p>
+     *
+     * @param topicIdPartition The specific topic partition to read from.
+     * @param offset The desired starting offset for the read.
+     * @return A new {@code LogFragment} view containing the requested data, or
+     * {@code null} if the data is not available (a cache miss).
+     */
     @Override
     public LogFragment get(TopicIdPartition topicIdPartition, long offset) {
         final LogFragment logFragment = cache.getIfPresent(topicIdPartition);
@@ -65,7 +90,14 @@ public LogFragment get(TopicIdPartition topicIdPartition, long offset) {
             return null;
         }
         metrics.recordCacheHit();
-        return logFragment.subFragment(offset);
+        final LogFragment subFragment = logFragment.subFragment(offset);
+        if (subFragment == null || subFragment.isEmpty()) {
+            metrics.recordCacheMiss();
+        } else {
+            metrics.recordCacheHit();
+        }
+
+        return subFragment;
     }

     private void putInternal(CacheBatchCoordinate value) throws StaleCacheEntryException {
@@ -103,6 +135,24 @@ private void evictExpiredEntries(LogFragment logFragment) {
         }
     }

+    /**
+     * Appends a new batch coordinate to the cache, adding it to the correct log
+     * fragment for its partition.
+     *
+     * <p>This method handles stale cache entries: when a batch is added,
+     * it is passed to the appropriate {@link LogFragment}.
+     * If that fragment detects that the batch would create an integrity problem
+     * (such as a data gap, an overlap, or if the batch indicates the log start offset was
+     * increased), the entire cache entry for the requested TopicPartition will be invalidated
+     * and the batch insertion will be retried, creating a new log fragment.</p>
+     *
+     * <p>After a successful batch addition, this method also opportunistically
+     * triggers an eviction check on the fragment to purge any expired batches.</p>
+     *
+     * @param cacheBatchCoordinate The batch coordinate to add.
+     * @throws IllegalStateException If the new batch is rejected for being invalid (lower log start offset
+     * than the one present in the cache, or lower base offset than the high watermark present in the cache)
+     */
     @Override
     public void put(CacheBatchCoordinate cacheBatchCoordinate) throws IllegalStateException {
         TopicIdPartition topicIdPartition = cacheBatchCoordinate.topicIdPartition();
@@ -126,7 +176,9 @@ public void put(CacheBatchCoordinate cacheBatchCoordinate) throws IllegalStateEx
     }

     /**
-     * Removes the entry from the cache for a topic partition
+     * Removes the entry from the cache for a topic partition.
+     * @param topicIdPartition The specific topic partition to remove from the cache.
+     * @return the number of batches removed from the cache.
      */
     @Override
     public int invalidatePartition(TopicIdPartition topicIdPartition) {
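
For reference, the contract documented in the new Javadoc can be exercised roughly as follows. This is a hypothetical sketch, not code from this commit: the wrapper class and variable names are invented, the invalidate-on-rejection handling is one possible policy, and LogFragment's package is assumed to match the other cache classes.

import java.time.Duration;

import org.apache.kafka.common.TopicIdPartition;

import io.aiven.inkless.cache.CacheBatchCoordinate;
import io.aiven.inkless.cache.CaffeineBatchCoordinateCache;
import io.aiven.inkless.cache.LogFragment;

class BatchCoordinateCacheSketch {
    // TTL here matches the documented default of consume.batch.coordinate.cache.ttl.ms.
    private final CaffeineBatchCoordinateCache cache =
        new CaffeineBatchCoordinateCache(Duration.ofMillis(5000));

    void append(CacheBatchCoordinate coordinate, TopicIdPartition partition) {
        try {
            // put(...) routes the coordinate to its partition's LogFragment and
            // retries internally when the existing entry is stale.
            cache.put(coordinate);
        } catch (IllegalStateException e) {
            // The batch was rejected as invalid; one option is to drop the
            // partition's entry. The return value reports how many batches were removed.
            int removedBatches = cache.invalidatePartition(partition);
        }
    }

    LogFragment read(TopicIdPartition partition, long fetchOffset) {
        // get(...) returns null on a cache miss: the partition is untracked,
        // the offset was already evicted, or the entry expired. Callers then
        // fall back to the control plane (cf. findDisklessBatchesThroughControlPlane
        // in ReplicaManager.findDisklessBatches).
        return cache.get(partition, fetchOffset);
    }
}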
