Skip to content

Commit 9d260c2

Browse files
committed
feat: add async cache for segment indexes
Indexes are currently cached on broker-side but with a synchronous cache that is not able to source value when interrupted. This new cache will fetch index asynchronously, and retries would have chances to find the value sourced already.
1 parent 8b9fdee commit 9d260c2

File tree

10 files changed

+832
-13
lines changed

10 files changed

+832
-13
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
2525
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
26+
<suppress checks="ClassFanOutComplexity" files="MemorySegmentIndexesCache"/>
2627
<suppress checks="ClassFanOutComplexity" files="AzureBlobStorage.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
2829
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,11 @@
5252
import io.aiven.kafka.tieredstorage.fetch.ChunkManagerFactory;
5353
import io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration;
5454
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
55+
import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
56+
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
5557
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
5658
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
59+
import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
5760
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
5861
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
5962
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
@@ -122,6 +125,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
122125
private Set<SegmentCustomMetadataField> customMetadataFields;
123126

124127
private SegmentManifestProvider segmentManifestProvider;
128+
private SegmentIndexesCache segmentIndexesCache;
125129

126130
public RemoteStorageManager() {
127131
this(Time.SYSTEM);
@@ -168,6 +172,9 @@ public void configure(final Map<String, ?> configs) {
168172
mapper,
169173
executor);
170174

175+
segmentIndexesCache = new MemorySegmentIndexesCache();
176+
segmentIndexesCache.configure(config.fetchIndexesCacheConfigs());
177+
171178
customMetadataSerde = new SegmentCustomMetadataSerde();
172179
customMetadataFields = config.customMetadataKeysIncluded();
173180
}
@@ -513,19 +520,11 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
513520
if (segmentIndex == null) {
514521
throw new RemoteResourceNotFoundException("Index " + indexType + " not found on " + key);
515522
}
516-
final var in = fetcher.fetch(key, segmentIndex.range());
517-
518-
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
519-
final Optional<SegmentEncryptionMetadata> encryptionMetadata = segmentManifest.encryption();
520-
if (encryptionMetadata.isPresent()) {
521-
detransformEnum = new DecryptionChunkEnumeration(
522-
detransformEnum,
523-
encryptionMetadata.get().ivSize(),
524-
encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
525-
);
526-
}
527-
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
528-
return detransformFinisher.toInputStream();
523+
return segmentIndexesCache.get(
524+
key,
525+
indexType,
526+
() -> fetchIndexBytes(key, segmentIndex, segmentManifest)
527+
);
529528
} catch (final RemoteResourceNotFoundException e) {
530529
throw e;
531530
} catch (final KeyNotFoundException e) {
@@ -535,6 +534,36 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
535534
}
536535
}
537536

537+
private byte[] fetchIndexBytes(
538+
final ObjectKey key,
539+
final SegmentIndex segmentIndex,
540+
final SegmentManifest segmentManifest
541+
) {
542+
final InputStream in;
543+
try {
544+
in = fetcher.fetch(key, segmentIndex.range());
545+
} catch (final StorageBackendException e) {
546+
throw new RuntimeException("Error fetching index from remote storage", e);
547+
}
548+
549+
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
550+
final Optional<SegmentEncryptionMetadata> encryptionMetadata = segmentManifest.encryption();
551+
if (encryptionMetadata.isPresent()) {
552+
detransformEnum = new DecryptionChunkEnumeration(
553+
detransformEnum,
554+
encryptionMetadata.get().ivSize(),
555+
encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk,
556+
encryptionMetadata.get())
557+
);
558+
}
559+
final var detransformFinisher = new DetransformFinisher(detransformEnum);
560+
try (final var is = detransformFinisher.toInputStream()) {
561+
return is.readAllBytes();
562+
} catch (final IOException e) {
563+
throw new RuntimeException("Error reading de-transformed index bytes", e);
564+
}
565+
}
566+
538567
private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
539568
final ObjectKeyFactory.Suffix suffix) {
540569
final ObjectKey segmentKey;
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.config;
18+
19+
import java.time.Duration;
20+
import java.util.Map;
21+
import java.util.Optional;
22+
import java.util.OptionalLong;
23+
24+
import org.apache.kafka.common.config.AbstractConfig;
25+
import org.apache.kafka.common.config.ConfigDef;
26+
27+
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
28+
29+
public class CacheConfig extends AbstractConfig {
30+
private static final String CACHE_SIZE_CONFIG = "size";
31+
private static final String CACHE_SIZE_DOC = "Cache size in bytes, where \"-1\" represents unbounded cache";
32+
private static final String CACHE_RETENTION_CONFIG = "retention.ms";
33+
private static final String CACHE_RETENTION_DOC = "Cache retention time ms, "
34+
+ "where \"-1\" represents infinite retention";
35+
private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;
36+
37+
private static ConfigDef addCacheConfigs(final OptionalLong maybeDefaultSize) {
38+
final ConfigDef configDef = new ConfigDef();
39+
Object defaultValue = NO_DEFAULT_VALUE;
40+
if (maybeDefaultSize.isPresent()) {
41+
defaultValue = maybeDefaultSize.getAsLong();
42+
}
43+
configDef.define(
44+
CACHE_SIZE_CONFIG,
45+
ConfigDef.Type.LONG,
46+
defaultValue,
47+
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
48+
ConfigDef.Importance.MEDIUM,
49+
CACHE_SIZE_DOC
50+
);
51+
configDef.define(
52+
CACHE_RETENTION_CONFIG,
53+
ConfigDef.Type.LONG,
54+
DEFAULT_CACHE_RETENTION_MS,
55+
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
56+
ConfigDef.Importance.MEDIUM,
57+
CACHE_RETENTION_DOC
58+
);
59+
return configDef;
60+
}
61+
62+
public CacheConfig(final Map<String, ?> props) {
63+
super(addCacheConfigs(OptionalLong.empty()), props);
64+
}
65+
66+
public CacheConfig(final Map<String, ?> props, final long defaultSize) {
67+
super(addCacheConfigs(OptionalLong.of(defaultSize)), props);
68+
}
69+
70+
public Optional<Long> cacheSize() {
71+
final Long rawValue = getLong(CACHE_SIZE_CONFIG);
72+
if (rawValue == -1) {
73+
return Optional.empty();
74+
}
75+
return Optional.of(rawValue);
76+
}
77+
78+
public Optional<Duration> cacheRetention() {
79+
final Long rawValue = getLong(CACHE_RETENTION_CONFIG);
80+
if (rawValue == -1) {
81+
return Optional.empty();
82+
}
83+
return Optional.of(Duration.ofMillis(rawValue));
84+
}
85+
}

core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
public class RemoteStorageManagerConfig extends AbstractConfig {
4343
private static final String STORAGE_PREFIX = "storage.";
44+
private static final String FETCH_INDEXES_CACHE_PREFIX = "fetch.indexes.cache.";
4445

4546
private static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class";
4647
private static final String STORAGE_BACKEND_CLASS_DOC = "The storage backend implementation class";
@@ -393,4 +394,8 @@ public Set<SegmentCustomMetadataField> customMetadataKeysIncluded() {
393394
.map(SegmentCustomMetadataField::valueOf)
394395
.collect(Collectors.toSet());
395396
}
397+
398+
public Map<String, ?> fetchIndexesCacheConfigs() {
399+
return originalsWithPrefix(FETCH_INDEXES_CACHE_PREFIX);
400+
}
396401
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.fetch.index;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.Executor;
26+
import java.util.concurrent.ForkJoinPool;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.function.Supplier;
30+
31+
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
32+
33+
import io.aiven.kafka.tieredstorage.config.CacheConfig;
34+
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
35+
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
36+
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
37+
38+
import com.github.benmanes.caffeine.cache.AsyncCache;
39+
import com.github.benmanes.caffeine.cache.Caffeine;
40+
import com.github.benmanes.caffeine.cache.RemovalListener;
41+
import com.github.benmanes.caffeine.cache.Scheduler;
42+
import com.github.benmanes.caffeine.cache.Weigher;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
public class MemorySegmentIndexesCache implements SegmentIndexesCache {
47+
private static final Logger log = LoggerFactory.getLogger(MemorySegmentIndexesCache.class);
48+
49+
private static final long GET_TIMEOUT_SEC = 10;
50+
private static final long DEFAULT_MAX_SIZE_BYTES = 10 * 1024 * 1024;
51+
private static final String METRIC_GROUP = "segment-indexes-cache";
52+
53+
private final Executor executor = new ForkJoinPool();
54+
private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
55+
56+
protected AsyncCache<SegmentIndexKey, byte[]> cache;
57+
58+
// for testing
59+
RemovalListener<SegmentIndexKey, byte[]> removalListener() {
60+
return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache."
61+
+ " The reason of the deletion is {}", key, cause);
62+
}
63+
64+
private static Weigher<SegmentIndexKey, byte[]> weigher() {
65+
return (key, value) -> value.length;
66+
}
67+
68+
protected AsyncCache<SegmentIndexKey, byte[]> buildCache(final CacheConfig config) {
69+
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
70+
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
71+
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
72+
final var cache = cacheBuilder.evictionListener(removalListener())
73+
.scheduler(Scheduler.systemScheduler())
74+
.executor(executor)
75+
.recordStats(() -> statsCounter)
76+
.buildAsync();
77+
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
78+
return cache;
79+
}
80+
81+
@Override
82+
public InputStream get(
83+
final ObjectKey objectKey,
84+
final IndexType indexType,
85+
final Supplier<byte[]> indexSupplier
86+
) throws StorageBackendException, IOException {
87+
try {
88+
return cache.asMap()
89+
.compute(new SegmentIndexKey(objectKey, indexType), (key, val) -> {
90+
if (val == null) {
91+
statsCounter.recordMiss();
92+
return CompletableFuture.supplyAsync(indexSupplier, executor);
93+
} else {
94+
statsCounter.recordHit();
95+
return val;
96+
}
97+
})
98+
.thenApplyAsync(ByteArrayInputStream::new, executor)
99+
.get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
100+
} catch (final ExecutionException e) {
101+
// Unwrap previously wrapped exceptions if possible.
102+
Throwable cause = e.getCause();
103+
if (cause instanceof RuntimeException) {
104+
cause = cause.getCause();
105+
}
106+
107+
// We don't really expect this case, but handle it nevertheless.
108+
if (cause == null) {
109+
throw new RuntimeException(e);
110+
}
111+
112+
if (cause instanceof StorageBackendException) {
113+
throw (StorageBackendException) cause;
114+
}
115+
if (cause instanceof IOException) {
116+
throw (IOException) cause;
117+
}
118+
119+
throw new RuntimeException(e);
120+
} catch (final InterruptedException | TimeoutException e) {
121+
throw new RuntimeException(e);
122+
}
123+
}
124+
125+
@Override
126+
public void configure(final Map<String, ?> configs) {
127+
final var config = new CacheConfig(configs, DEFAULT_MAX_SIZE_BYTES);
128+
this.cache = buildCache(config);
129+
}
130+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.fetch.index;
18+
19+
import java.util.Objects;
20+
21+
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
22+
23+
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
24+
25+
public class SegmentIndexKey {
26+
public final ObjectKey indexesKey;
27+
public final RemoteStorageManager.IndexType indexType;
28+
29+
public SegmentIndexKey(final ObjectKey indexesKey, final RemoteStorageManager.IndexType indexType) {
30+
this.indexesKey = indexesKey;
31+
this.indexType = indexType;
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return "SegmentIndexKey{"
37+
+ "indexesKey=" + indexesKey
38+
+ ", indexType=" + indexType
39+
+ '}';
40+
}
41+
42+
@Override
43+
public boolean equals(final Object o) {
44+
if (this == o) {
45+
return true;
46+
}
47+
if (o == null || getClass() != o.getClass()) {
48+
return false;
49+
}
50+
final SegmentIndexKey that = (SegmentIndexKey) o;
51+
return Objects.equals(indexesKey, that.indexesKey) && indexType == that.indexType;
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(indexesKey, indexType);
57+
}
58+
}

0 commit comments

Comments
 (0)