Skip to content

Commit 7c01742

Browse files
committed
Refactor chunk cache
Before, the chunk cache was a wrapper over `DefaultChunkManager`. In some situations, this is not very convenient, and the cache also has more responsibilities than it should (like prefetching). This commit refactors the cache to be a pluggable mechanism in the chunk manager.
1 parent 28f900c commit 7c01742

20 files changed

+756
-609
lines changed

checkstyle/suppressions.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
25-
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
25+
<suppress checks="ClassFanOutComplexity" files="AbstractChunkCache.java"/>
26+
<suppress checks="ClassFanOutComplexity" files="DefaultChunkManager.java"/>
2627
<suppress checks="ClassFanOutComplexity" files="AzureBlobStorage.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
2829
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>

core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,13 @@ public void configure(final Map<String, ?> configs) {
3434

3535
public ChunkManager initChunkManager(final ObjectFetcher fileFetcher,
3636
final AesEncryptionProvider aesEncryptionProvider) {
37-
final DefaultChunkManager defaultChunkManager = new DefaultChunkManager(fileFetcher, aesEncryptionProvider);
38-
if (config.cacheClass() != null) {
39-
try {
40-
final ChunkCache<?> chunkCache = config
41-
.cacheClass()
42-
.getDeclaredConstructor(ChunkManager.class)
43-
.newInstance(defaultChunkManager);
44-
chunkCache.configure(config.originalsWithPrefix(ChunkManagerFactoryConfig.CHUNK_CACHE_PREFIX));
45-
return chunkCache;
46-
} catch (final ReflectiveOperationException e) {
47-
throw new RuntimeException(e);
48-
}
49-
} else {
50-
return defaultChunkManager;
37+
final ChunkCache<?> chunkCache;
38+
try {
39+
chunkCache = config.cacheClass().getDeclaredConstructor().newInstance();
40+
} catch (final ReflectiveOperationException e) {
41+
throw new RuntimeException(e);
5142
}
43+
chunkCache.configure(config.originalsWithPrefix(ChunkManagerFactoryConfig.CHUNK_CACHE_PREFIX));
44+
return new DefaultChunkManager(fileFetcher, aesEncryptionProvider, chunkCache, config.cachePrefetchingSize());
5245
}
5346
}

core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfig.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.common.config.ConfigDef;
2323

2424
import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache;
25+
import io.aiven.kafka.tieredstorage.chunkmanager.cache.NoOpChunkCache;
2526
import io.aiven.kafka.tieredstorage.config.validators.Subclass;
2627

2728
public class ChunkManagerFactoryConfig extends AbstractConfig {
@@ -30,6 +31,11 @@ public class ChunkManagerFactoryConfig extends AbstractConfig {
3031
public static final String CHUNK_CACHE_CONFIG = CHUNK_CACHE_PREFIX + "class";
3132
private static final String CHUNK_CACHE_DOC = "The chunk cache implementation";
3233

34+
private static final String CACHE_PREFETCH_MAX_SIZE_CONFIG = CHUNK_CACHE_PREFIX + "prefetch.max.size";
35+
private static final String CACHE_PREFETCH_MAX_SIZE_DOC =
36+
"The amount of data that should be eagerly prefetched and cached";
37+
private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be
38+
3339
private static final ConfigDef CONFIG;
3440

3541
static {
@@ -38,11 +44,19 @@ public class ChunkManagerFactoryConfig extends AbstractConfig {
3844
CONFIG.define(
3945
CHUNK_CACHE_CONFIG,
4046
ConfigDef.Type.CLASS,
41-
null,
47+
NoOpChunkCache.class,
4248
Subclass.of(ChunkCache.class),
4349
ConfigDef.Importance.MEDIUM,
4450
CHUNK_CACHE_DOC
4551
);
52+
CONFIG.define(
53+
CACHE_PREFETCH_MAX_SIZE_CONFIG,
54+
ConfigDef.Type.INT,
55+
CACHE_PREFETCHING_SIZE_DEFAULT,
56+
ConfigDef.Range.between(0, Integer.MAX_VALUE),
57+
ConfigDef.Importance.MEDIUM,
58+
CACHE_PREFETCH_MAX_SIZE_DOC
59+
);
4660
}
4761

4862
public ChunkManagerFactoryConfig(final Map<?, ?> originals) {
@@ -53,4 +67,8 @@ public ChunkManagerFactoryConfig(final Map<?, ?> originals) {
5367
public Class<ChunkCache<?>> cacheClass() {
5468
return (Class<ChunkCache<?>>) getClass(CHUNK_CACHE_CONFIG);
5569
}
70+
71+
public int cachePrefetchingSize() {
72+
return getInt(CACHE_PREFETCH_MAX_SIZE_CONFIG);
73+
}
5674
}

core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,22 @@
1616

1717
package io.aiven.kafka.tieredstorage.chunkmanager;
1818

19+
import java.io.IOException;
1920
import java.io.InputStream;
2021
import java.util.List;
2122
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CompletionException;
25+
import java.util.concurrent.Executor;
26+
import java.util.concurrent.ForkJoinPool;
27+
import java.util.function.Supplier;
2228

2329
import io.aiven.kafka.tieredstorage.Chunk;
30+
import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache;
2431
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
2532
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
2633
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
34+
import io.aiven.kafka.tieredstorage.storage.BytesRange;
2735
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
2836
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
2937
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
@@ -36,10 +44,19 @@
3644
public class DefaultChunkManager implements ChunkManager {
3745
private final ObjectFetcher fetcher;
3846
private final AesEncryptionProvider aesEncryptionProvider;
47+
final ChunkCache<?> chunkCache;
48+
private final int prefetchingSize;
3949

40-
public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) {
50+
private final Executor executor = new ForkJoinPool();
51+
52+
public DefaultChunkManager(final ObjectFetcher fetcher,
53+
final AesEncryptionProvider aesEncryptionProvider,
54+
final ChunkCache<?> chunkCache,
55+
final int prefetchingSize) {
4156
this.fetcher = fetcher;
4257
this.aesEncryptionProvider = aesEncryptionProvider;
58+
this.chunkCache = chunkCache;
59+
this.prefetchingSize = prefetchingSize;
4360
}
4461

4562
/**
@@ -48,24 +65,60 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi
4865
* @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
4966
*/
5067
public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest,
51-
final int chunkId) throws StorageBackendException {
52-
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
68+
final int chunkId) throws StorageBackendException, IOException {
69+
final var currentChunk = manifest.chunkIndex().chunks().get(chunkId);
70+
startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);
71+
72+
final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
73+
return chunkCache.getChunk(chunkKey, createChunkSupplier(objectKey, manifest, chunkId));
74+
}
5375

54-
final InputStream chunkContent = fetcher.fetch(objectKey, chunk.range());
76+
private void startPrefetching(final ObjectKey segmentKey,
77+
final SegmentManifest segmentManifest,
78+
final int startPosition) {
79+
if (prefetchingSize > 0) {
80+
final BytesRange prefetchingRange;
81+
if (Integer.MAX_VALUE - startPosition < prefetchingSize) {
82+
prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE);
83+
} else {
84+
prefetchingRange = BytesRange.ofFromPositionAndSize(startPosition, prefetchingSize);
85+
}
86+
final var chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange);
87+
chunks.forEach(chunk -> {
88+
final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
89+
chunkCache.supplyIfAbsent(chunkKey, createChunkSupplier(segmentKey, segmentManifest, chunk.id));
90+
});
91+
}
92+
}
5593

56-
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
57-
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
58-
if (encryptionMetadata.isPresent()) {
59-
detransformEnum = new DecryptionChunkEnumeration(
94+
private Supplier<CompletableFuture<InputStream>> createChunkSupplier(final ObjectKey objectKey,
95+
final SegmentManifest manifest,
96+
final int chunkId) {
97+
return () -> CompletableFuture.supplyAsync(() -> {
98+
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
99+
100+
final InputStream chunkContent;
101+
try {
102+
chunkContent = fetcher.fetch(objectKey, chunk.range());
103+
} catch (final StorageBackendException e) {
104+
throw new CompletionException(e);
105+
}
106+
107+
DetransformChunkEnumeration detransformEnum =
108+
new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
109+
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
110+
if (encryptionMetadata.isPresent()) {
111+
detransformEnum = new DecryptionChunkEnumeration(
60112
detransformEnum,
61113
encryptionMetadata.get().ivSize(),
62114
encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
63-
);
64-
}
65-
if (manifest.compression()) {
66-
detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
67-
}
68-
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
69-
return detransformFinisher.toInputStream();
115+
);
116+
}
117+
if (manifest.compression()) {
118+
detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
119+
}
120+
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
121+
return detransformFinisher.toInputStream();
122+
}, executor);
70123
}
71124
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.chunkmanager.cache;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionException;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.Executor;
25+
import java.util.concurrent.ForkJoinPool;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.TimeoutException;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.function.Supplier;
30+
31+
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey;
32+
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
33+
34+
import com.github.benmanes.caffeine.cache.AsyncCache;
35+
import com.github.benmanes.caffeine.cache.Caffeine;
36+
import com.github.benmanes.caffeine.cache.RemovalListener;
37+
import com.github.benmanes.caffeine.cache.Scheduler;
38+
import com.github.benmanes.caffeine.cache.Weigher;
39+
40+
public abstract class AbstractChunkCache<T> extends BaseChunkCache<T> {
41+
private static final long GET_TIMEOUT_SEC = 10;
42+
private static final String METRIC_GROUP = "chunk-cache";
43+
44+
private final Executor executor = new ForkJoinPool();
45+
46+
final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
47+
48+
protected AsyncCache<ChunkKey, T> cache;
49+
50+
/**
51+
* Get a chunk from the cache.
52+
*
53+
* <p>If the chunk is not present in the cache, use {@literal chunkSupplier} to get it and cache.
54+
*
55+
* <p>Since it's not possible to cache an opened InputStream, the actual data is cached, and everytime
56+
* there is a call to cache the InputStream is recreated from the data stored in cache and stored into local
57+
* variable. This also allows solving the race condition between eviction and fetching. Since the InputStream is
58+
* opened right when fetching from cache happens even if the actual value is removed from the cache,
59+
* the InputStream will still contain the data.
60+
*/
61+
@Override
62+
protected InputStream getChunk0(final ChunkKey chunkKey,
63+
final Supplier<CompletableFuture<InputStream>> chunkSupplier)
64+
throws ExecutionException, InterruptedException, TimeoutException {
65+
final AtomicReference<InputStream> result = new AtomicReference<>();
66+
return cache.asMap()
67+
.compute(chunkKey, (key, val) -> {
68+
if (val == null) {
69+
statsCounter.recordMiss();
70+
// TODO do not put a failed future into the cache
71+
return chunkSupplier.get().thenApplyAsync(chunk -> {
72+
try {
73+
final T t = this.cacheChunk(chunkKey, chunk);
74+
result.getAndSet(cachedChunkToInputStream(t));
75+
return t;
76+
} catch (final IOException e) {
77+
throw new CompletionException(e);
78+
}
79+
}, executor);
80+
} else {
81+
statsCounter.recordHit();
82+
return CompletableFuture.supplyAsync(() -> {
83+
try {
84+
final T cachedChunk = val.get();
85+
result.getAndSet(cachedChunkToInputStream(cachedChunk));
86+
return cachedChunk;
87+
} catch (final InterruptedException | ExecutionException e) {
88+
throw new CompletionException(e);
89+
}
90+
}, executor);
91+
}
92+
})
93+
.thenApplyAsync(t -> result.get())
94+
.get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
95+
}
96+
97+
public void supplyIfAbsent(final ChunkKey chunkKey,
98+
final Supplier<CompletableFuture<InputStream>> chunkSupplier) {
99+
// TODO do some logging if error
100+
// TODO do not put a failed future into the cache
101+
cache.asMap().computeIfAbsent(chunkKey,
102+
key -> chunkSupplier.get().thenApplyAsync(chunk -> {
103+
try {
104+
return this.cacheChunk(chunkKey, chunk);
105+
} catch (final IOException e) {
106+
throw new CompletionException(e);
107+
}
108+
}, executor));
109+
}
110+
111+
public abstract InputStream cachedChunkToInputStream(final T cachedChunk);
112+
113+
public abstract T cacheChunk(final ChunkKey chunkKey, final InputStream chunk) throws IOException;
114+
115+
public abstract RemovalListener<ChunkKey, T> removalListener();
116+
117+
public abstract Weigher<ChunkKey, T> weigher();
118+
119+
protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
120+
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
121+
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
122+
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
123+
final var cache = cacheBuilder.evictionListener(removalListener())
124+
.scheduler(Scheduler.systemScheduler())
125+
.executor(executor)
126+
.recordStats(() -> statsCounter)
127+
.buildAsync();
128+
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
129+
return cache;
130+
}
131+
}

0 commit comments

Comments
 (0)