Skip to content

Retrieve message has error in logs #470

@bingkunyangvungle

Description

@bingkunyangvungle

What happened?

After I try to retrieve the message from client(maven kafka-clients with version 3.4.0), it can get the messages, but on the server side, it generate the following error:

[2023-12-14 14:23:49,286] ERROR Error occurred while reading the remote data for user_s3-0 (kafka.log.remote.RemoteLogReader)
org.apache.kafka.common.KafkaException: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$7(RemoteIndexCache.java:379)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:342)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.createCacheEntry(RemoteIndexCache.java:375)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$getIndexEntry$6(RemoteIndexCache.java:365)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.getIndexEntry(RemoteIndexCache.java:364)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lookupOffset(RemoteIndexCache.java:436)
at kafka.log.remote.RemoteLogManager.lookupPositionForOffset(RemoteLogManager.java:1326)
at kafka.log.remote.RemoteLogManager.read(RemoteLogManager.java:1272)
at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:62)
at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:31)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:532)
at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.lambda$fetchIndex$5(ClassLoaderAwareRemoteStorageManager.java:89)
at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.withClassLoader(ClassLoaderAwareRemoteStorageManager.java:66)
at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.fetchIndex(ClassLoaderAwareRemoteStorageManager.java:89)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$7(RemoteIndexCache.java:377)
... 19 more
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.get(SegmentManifestProvider.java:87)
at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchSegmentManifest(RemoteStorageManager.java:552)
at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:507)
... 23 more
Caused by: java.lang.InterruptedException
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:386)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.get(SegmentManifestProvider.java:69)
... 25 more

What did you expect to happen?

The server side should have no error in logs.

What else do we need to know?

When I used the released package, it not only generated error, but the client couldn't get the messages as well.
I tried with self-built packages with this commit e06cae8 in main branch, it also generated error, but the client can get the messages.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions