-
Couldn't load subscription status.
- Fork 41
feat: add async cache for segment indexes #472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
aed6581 to
becc24a
Compare
05c66b1 to
399dc2a
Compare
core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java
Outdated
Show resolved
Hide resolved
| public RemovalListener<SegmentIndexKey, byte[]> removalListener() { | ||
| return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache." | ||
| + " The reason of the deletion is {}", key, cause); | ||
| } | ||
|
|
||
| public Weigher<SegmentIndexKey, byte[]> weigher() { | ||
| return (key, value) -> value.length; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these two need to be higher-order functions?
| public RemovalListener<SegmentIndexKey, byte[]> removalListener() { | |
| return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache." | |
| + " The reason of the deletion is {}", key, cause); | |
| } | |
| public Weigher<SegmentIndexKey, byte[]> weigher() { | |
| return (key, value) -> value.length; | |
| } | |
| private static void removalListener(final SegmentIndexKey key, final byte[] value, RemovalCause cause) { | |
| log.debug("Deleted cached value for key {} from cache. The reason of the deletion is {}", key, cause); | |
| } | |
| private static int weigher(final SegmentIndexKey key, byte[] value) { | |
| return value.length; | |
| } |
core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexKey.java
Show resolved
Hide resolved
| verifyNoMoreInteractions(offsetIndexSupplier); | ||
| assertThat(cache.cache.asMap()).hasSize(1); | ||
|
|
||
| Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's worth adding a comment that this sleep is needed to spread away in time the creation (and eviction) of the cache entries, or something like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, let's do the latter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AnatolyPopov, falling back to using thread sleep (same as in ChunkCacheTest) as adding a stronger condition (e.g. waiting for cache to be empty) takes unknown amount of time github actions--unless there's a better approach here.
core/src/test/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCacheTest.java
Outdated
Show resolved
Hide resolved
core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
Outdated
Show resolved
Hide resolved
core/src/test/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCacheTest.java
Outdated
Show resolved
Hide resolved
core/src/test/java/io/aiven/kafka/tieredstorage/config/CacheConfigTest.java
Outdated
Show resolved
Hide resolved
core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java
Outdated
Show resolved
Hide resolved
973aed6 to
c49d3f3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Indexes are currently cached on broker-side but with a synchronous cache that is not able to source value when interrupted. This new cache will fetch index asynchronously, and retries would have chances to find the value sourced already.
6ac772f to
9d260c2
Compare
Indexes are currently cached on broker-side but with a synchronous cache that is not able to source value when interrupted.
This new cache will fetch index asynchronously, and retries would have chances to find the value sourced already.