diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e4415b5e5..663bc3d99 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,6 +23,7 @@
+
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index 4b0d52c5a..c3c8b7926 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -52,8 +52,11 @@
import io.aiven.kafka.tieredstorage.fetch.ChunkManagerFactory;
import io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
+import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
+import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
+import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
@@ -122,6 +125,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private Set customMetadataFields;
private SegmentManifestProvider segmentManifestProvider;
+ private SegmentIndexesCache segmentIndexesCache;
public RemoteStorageManager() {
this(Time.SYSTEM);
@@ -168,6 +172,9 @@ public void configure(final Map configs) {
mapper,
executor);
+ segmentIndexesCache = new MemorySegmentIndexesCache();
+ segmentIndexesCache.configure(config.fetchIndexesCacheConfigs());
+
customMetadataSerde = new SegmentCustomMetadataSerde();
customMetadataFields = config.customMetadataKeysIncluded();
}
@@ -513,19 +520,11 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
if (segmentIndex == null) {
throw new RemoteResourceNotFoundException("Index " + indexType + " not found on " + key);
}
- final var in = fetcher.fetch(key, segmentIndex.range());
-
- DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
- final Optional encryptionMetadata = segmentManifest.encryption();
- if (encryptionMetadata.isPresent()) {
- detransformEnum = new DecryptionChunkEnumeration(
- detransformEnum,
- encryptionMetadata.get().ivSize(),
- encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
- );
- }
- final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
- return detransformFinisher.toInputStream();
+ return segmentIndexesCache.get(
+ key,
+ indexType,
+ () -> fetchIndexBytes(key, segmentIndex, segmentManifest)
+ );
} catch (final RemoteResourceNotFoundException e) {
throw e;
} catch (final KeyNotFoundException e) {
@@ -535,6 +534,36 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
}
+ /**
+  * Downloads one index of a segment and returns its de-transformed content as a byte array.
+  *
+  * <p>The byte range given by {@code segmentIndex.range()} is fetched for {@code key}; when the
+  * manifest carries encryption metadata the chunks are decrypted before being read in full.
+  * Checked failures are rethrown as {@link RuntimeException} because the method declares no
+  * checked exceptions (it is passed as a lambda to the indexes cache).
+  *
+  * @param key             object key the index is read from
+  * @param segmentIndex    descriptor providing the byte range to fetch
+  * @param segmentManifest manifest consulted for optional encryption metadata
+  * @return all bytes of the de-transformed index
+  * @throws RuntimeException wrapping a {@code StorageBackendException} from the fetch or an
+  *                          {@code IOException} from reading the de-transformed stream
+  */
+ private byte[] fetchIndexBytes(
+         final ObjectKey key,
+         final SegmentIndex segmentIndex,
+         final SegmentManifest segmentManifest
+ ) {
+     final InputStream rawIndex;
+     try {
+         rawIndex = fetcher.fetch(key, segmentIndex.range());
+     } catch (final StorageBackendException e) {
+         throw new RuntimeException("Error fetching index from remote storage", e);
+     }
+
+     final DetransformChunkEnumeration plainChunks = new BaseDetransformChunkEnumeration(rawIndex);
+     final Optional encryptionMetadata = segmentManifest.encryption();
+     // Add the decryption layer only for segments that were stored encrypted.
+     final DetransformChunkEnumeration chunks = encryptionMetadata.isPresent()
+         ? new DecryptionChunkEnumeration(
+             plainChunks,
+             encryptionMetadata.get().ivSize(),
+             encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk,
+                 encryptionMetadata.get()))
+         : plainChunks;
+
+     final var finisher = new DetransformFinisher(chunks);
+     try (final var decoded = finisher.toInputStream()) {
+         return decoded.readAllBytes();
+     } catch (final IOException e) {
+         throw new RuntimeException("Error reading de-transformed index bytes", e);
+     }
+ }
+
private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final ObjectKeyFactory.Suffix suffix) {
final ObjectKey segmentKey;
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java
new file mode 100644
index 000000000..d082474b3
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.config;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+/**
+ * Configuration of a cache, made of two settings: {@code size} and {@code retention.ms}.
+ *
+ * <p>For both settings the value {@code -1} is a sentinel for "no limit"; the accessors
+ * translate it into an empty {@link Optional}.
+ */
+public class CacheConfig extends AbstractConfig {
+    private static final String CACHE_SIZE_CONFIG = "size";
+    private static final String CACHE_SIZE_DOC = "Cache size in bytes, where \"-1\" represents unbounded cache";
+    private static final String CACHE_RETENTION_CONFIG = "retention.ms";
+    private static final String CACHE_RETENTION_DOC = "Cache retention time ms, "
+        + "where \"-1\" represents infinite retention";
+    private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;
+
+    /**
+     * Creates a configuration whose {@code size} setting is defined without a default value.
+     *
+     * @param props configuration properties to parse
+     */
+    public CacheConfig(final Map props) {
+        super(addCacheConfigs(OptionalLong.empty()), props);
+    }
+
+    /**
+     * Creates a configuration whose {@code size} setting defaults to {@code defaultSize}.
+     *
+     * @param props       configuration properties to parse
+     * @param defaultSize default for the {@code size} setting
+     */
+    public CacheConfig(final Map props, final long defaultSize) {
+        super(addCacheConfigs(OptionalLong.of(defaultSize)), props);
+    }
+
+    /**
+     * Builds the definition of the two cache settings.
+     *
+     * @param maybeDefaultSize default for {@code size}; when empty, the setting is defined with
+     *                         {@code NO_DEFAULT_VALUE} instead
+     * @return definition containing {@code size} and {@code retention.ms}, both LONG values
+     *         validated to lie between {@code -1} and {@code Long.MAX_VALUE}
+     */
+    private static ConfigDef addCacheConfigs(final OptionalLong maybeDefaultSize) {
+        final Object sizeDefault = maybeDefaultSize.isPresent()
+            ? Long.valueOf(maybeDefaultSize.getAsLong())
+            : NO_DEFAULT_VALUE;
+        return new ConfigDef()
+            .define(
+                CACHE_SIZE_CONFIG,
+                ConfigDef.Type.LONG,
+                sizeDefault,
+                ConfigDef.Range.between(-1L, Long.MAX_VALUE),
+                ConfigDef.Importance.MEDIUM,
+                CACHE_SIZE_DOC)
+            .define(
+                CACHE_RETENTION_CONFIG,
+                ConfigDef.Type.LONG,
+                DEFAULT_CACHE_RETENTION_MS,
+                ConfigDef.Range.between(-1L, Long.MAX_VALUE),
+                ConfigDef.Importance.MEDIUM,
+                CACHE_RETENTION_DOC);
+    }
+
+    /**
+     * Returns the configured cache size.
+     *
+     * @return the {@code size} value, or an empty {@link Optional} when it is {@code -1}
+     */
+    public Optional cacheSize() {
+        final Long configuredSize = getLong(CACHE_SIZE_CONFIG);
+        if (configuredSize != -1) {
+            return Optional.of(configuredSize);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns the configured cache retention.
+     *
+     * @return the {@code retention.ms} value as a {@link Duration} of milliseconds, or an empty
+     *         {@link Optional} when it is {@code -1}
+     */
+    public Optional cacheRetention() {
+        final Long configuredRetentionMs = getLong(CACHE_RETENTION_CONFIG);
+        if (configuredRetentionMs != -1) {
+            return Optional.of(Duration.ofMillis(configuredRetentionMs));
+        }
+        return Optional.empty();
+    }
+}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
index cf295ba72..f8eda53ca 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
@@ -41,6 +41,7 @@
public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String STORAGE_PREFIX = "storage.";
+ private static final String FETCH_INDEXES_CACHE_PREFIX = "fetch.indexes.cache.";
private static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class";
private static final String STORAGE_BACKEND_CLASS_DOC = "The storage backend implementation class";
@@ -393,4 +394,8 @@ public Set customMetadataKeysIncluded() {
.map(SegmentCustomMetadataField::valueOf)
.collect(Collectors.toSet());
}
+
+ /**
+  * Returns the settings destined for the fetch indexes cache.
+  *
+  * @return the result of {@code originalsWithPrefix} for the {@code fetch.indexes.cache.} prefix
+  */
+ public Map fetchIndexesCacheConfigs() {
+     final var indexesCacheProps = originalsWithPrefix(FETCH_INDEXES_CACHE_PREFIX);
+     return indexesCacheProps;
+ }
}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java
new file mode 100644
index 000000000..8214587e2
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.fetch.index;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+
+import io.aiven.kafka.tieredstorage.config.CacheConfig;
+import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemorySegmentIndexesCache implements SegmentIndexesCache {
+ private static final Logger log = LoggerFactory.getLogger(MemorySegmentIndexesCache.class);
+
+ private static final long GET_TIMEOUT_SEC = 10;
+ private static final long DEFAULT_MAX_SIZE_BYTES = 10 * 1024 * 1024;
+ private static final String METRIC_GROUP = "segment-indexes-cache";
+
+ private final Executor executor = new ForkJoinPool();
+ private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
+
+ protected AsyncCache cache;
+
+ // for testing
+ /**
+  * Creates the listener notified when an entry leaves the cache.
+  *
+  * @return a listener that logs the removed key and the removal cause at debug level; the
+  *         removed value itself is not used
+  */
+ RemovalListener removalListener() {
+     return (removedKey, removedValue, removalCause) -> {
+         log.debug("Deleted cached value for key {} from cache."
+             + " The reason of the deletion is {}", removedKey, removalCause);
+     };
+ }
+
+ /**
+  * Creates the weigher used to size cache entries.
+  *
+  * @return a weigher whose weight for an entry is the length of its value array; the key does
+  *         not contribute
+  */
+ private static Weigher weigher() {
+     return (entryKey, indexBytes) -> {
+         return indexBytes.length;
+     };
+ }
+
+ protected AsyncCache buildCache(final CacheConfig config) {
+ final Caffeine