KAFKA-19426: TopicBasedRemoteLogMetadataManager's initial should happen after the broker ready. #20203
…he __remote_log_metadata topic if it already exists.
…en after the broker ready.
# Conflicts: # storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
Thanks @jiafu1115 for the patch!
The current approach requires a KIP to proceed.
Another approach is to not call the RLMM#configure() method while instantiating the RemoteLogManager (RemoteLogManager#L422), and instead define a new method, RemoteLogManager#configureRLMM, that can be called from the BrokerServer:
    public void configureRLMM(CompletableFuture<Void> brokerReadyFuture) {
        brokerReadyFuture.whenComplete((r, t) -> {
            // Configure the RLMM only if the broker became ready without an error.
            if (t == null) {
                remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
            }
        });
    }
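A minimal, self-contained sketch of the deferred-configuration idea above. `DeferredConfigureSketch`, `FakeRlmm`, and the property map are hypothetical stand-ins, not Kafka classes; the sketch only illustrates that `configure` runs once the broker-ready future completes successfully and is skipped when startup fails.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeferredConfigureSketch {

    /** Hypothetical stand-in for an RLMM plugin; not the real Kafka interface. */
    static class FakeRlmm {
        final AtomicBoolean configured = new AtomicBoolean(false);

        void configure(Map<String, ?> props) {
            configured.set(true);
        }
    }

    private final FakeRlmm rlmm = new FakeRlmm();
    private final Map<String, Object> rlmmProps;

    DeferredConfigureSketch(Map<String, Object> rlmmProps) {
        this.rlmmProps = rlmmProps;
    }

    /** Registers a callback that configures the plugin only after a successful broker start. */
    CompletableFuture<Void> configureRlmm(CompletableFuture<Void> brokerReadyFuture) {
        return brokerReadyFuture.whenComplete((result, error) -> {
            if (error == null) {
                rlmm.configure(rlmmProps);
            }
        });
    }

    boolean isConfigured() {
        return rlmm.configured.get();
    }

    public static void main(String[] args) {
        DeferredConfigureSketch sketch = new DeferredConfigureSketch(Map.of());
        CompletableFuture<Void> brokerReady = new CompletableFuture<>();
        sketch.configureRlmm(brokerReady);
        System.out.println("configured before ready: " + sketch.isConfigured());
        brokerReady.complete(null); // simulate the broker finishing startup
        System.out.println("configured after ready: " + sketch.isConfigured());
    }
}
```

Because the callback is registered with the non-async `whenComplete`, it runs on whichever thread completes the future, so a real implementation would need to consider whether a slow `configure` call may block that thread.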
...e/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
If we wait for the SocketServer to be ready before initializing the TBRLMM, user FETCH requests that need to read from remote storage might be served before the initialization completes. In that case, ReplicaNotAvailableException is thrown back to the caller; it is a retriable error on the client side, so this is fine. To learn why ReplicaNotAvailableException is thrown, refer to KIP-1007.
Opened #20204 to refactor the TBRLMM implementation.
@kamalcph thanks for such detailed information. That is the key point I was worried about. Thanks a lot. I will write a KIP. Thanks!
# Conflicts: # storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@kamalcph Hi, I compared my original proposal with your approach. I think your approach is much better on the following items: Possible shortcomings found when comparing it with my proposal: I updated the code so that you can compare it with https://github.com/apache/kafka/pull/20231/files, which I created for this comparison. Could you review it again and compare the two proposals? Thanks a lot.
With this patch, RLMM#configure is called post the SocketServer initialization, this is required only for TBRLMM implementation as it has to create/read/write to the For other RLMM plugin impl, the Compared with and without this patch. Without patch, the
With this patch, the
The other approach, #20231, is generic and allows any RLMM implementor to invoke broker based on the status. But it requires a KIP. @showuon / @FrankYang0529 Could you take a second pass? Thanks! Sample logs:
@kamalcph OK, thanks for your comments and testing. The test results look as expected. With this patch, we don't need to worry about the max retry time for different Kafka clusters, which take different amounts of time to complete startup.
Thanks for the PR. According to KIP-877:
"If a plugin implements this interface, the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable). "
So I think the current change breaks that contract.
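To make the ordering concern concrete, here is a small sketch. `RecordingPlugin` is a hypothetical class that only records the order of lifecycle calls; it is not the real Kafka plugin API, and the `Object` metrics argument is a placeholder. It contrasts the order described in the KIP-877 quote with the order that would result if `configure()` were postponed until after instantiation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PluginLifecycleSketch {

    /** Hypothetical plugin that records the order of its lifecycle calls. */
    static class RecordingPlugin {
        final List<String> calls = new ArrayList<>();

        void configure(Map<String, ?> configs) {
            calls.add("configure");
        }

        void withPluginMetrics(Object metrics) {
            calls.add("withPluginMetrics");
        }
    }

    /** Order described in the KIP-877 quote: configure() first, then withPluginMetrics(). */
    static List<String> contractOrder() {
        RecordingPlugin plugin = new RecordingPlugin();
        plugin.configure(Map.of());
        plugin.withPluginMetrics(new Object());
        return plugin.calls;
    }

    /** Order that results if configure() is postponed until after instantiation. */
    static List<String> deferredConfigureOrder() {
        RecordingPlugin plugin = new RecordingPlugin();
        plugin.withPluginMetrics(new Object());
        plugin.configure(Map.of());
        return plugin.calls;
    }

    public static void main(String[] args) {
        System.out.println(contractOrder());          // [configure, withPluginMetrics]
        System.out.println(deferredConfigureOrder()); // [withPluginMetrics, configure]
    }
}
```

A plugin that reads its configuration inside `withPluginMetrics()` (for example, to name its metrics) would see unconfigured state in the second ordering.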
@Yunyung thanks for your review. I will also take some time to understand the contract. cc @kamalcph BTW, could you also take a look at the original proposal: c8c2280? WDYT about this one?
@kamalcph I read the current code implementation and the KIP provided by @Yunyung. WDYT? Or do we have a better idea? Thank you!
I hadn't thought about KIP-877. Then the other proposal looks clean: c8c2280
# Conflicts: # core/src/main/scala/kafka/server/BrokerServer.scala
@kamalcph thanks.
Hi, @kamalcph @Yunyung @showuon @FrankYang0529
Thanks for the update. This looks reasonable. Let's see if anyone else has any feedback |
@Yunyung thanks for your feedback! @kamalcph @Yunyung @showuon @FrankYang0529
Refer to #20008. I am submitting another proposal to solve the issue, which also helps with the following problem:
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS is hard to define because it depends on the startup process.
If the value is not well chosen, the initialization will fail and local disk data will never get deleted (#20007 was proposed to prevent this).
Kafka processes in different environments need different values, for the following reason:
When a broker restarts, the connection used to query/create the topic in "TopicBasedRemoteLogMetadataManager#initializeResources" fails until the broker itself is ready. For details, refer to #20007 (comment).
So I propose this change to improve it. After this improvement, the default value is easy to define, and I don't need to set a very large value for all our Kafka clusters.
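The timeout problem described above can be illustrated with a simulated retry loop. This is only a sketch: `RetryTimeoutSketch`, its method, and all numbers are hypothetical and are not taken from the Kafka code base. Time is simulated, so nothing sleeps.

```java
public class RetryTimeoutSketch {

    /**
     * Simulates retrying an initialization step until it succeeds or the
     * retry budget is exhausted.
     *
     * @param readyAtMs      simulated time at which the broker starts accepting requests
     * @param retryTimeoutMs maximum total time the retry loop may run
     * @param backoffMs      pause between attempts, must be positive
     * @return true if initialization succeeded within the budget
     */
    static boolean initializeWithRetry(long readyAtMs, long retryTimeoutMs, long backoffMs) {
        if (backoffMs <= 0) {
            throw new IllegalArgumentException("backoffMs must be positive");
        }
        long nowMs = 0;
        while (nowMs <= retryTimeoutMs) {
            if (nowMs >= readyAtMs) {
                return true;    // the topic query/create call would succeed here
            }
            nowMs += backoffMs; // attempt failed, back off and retry
        }
        return false;           // budget exhausted before the broker became ready
    }

    public static void main(String[] args) {
        // Hypothetical numbers: a fast-starting broker fits in the budget...
        System.out.println(initializeWithRetry(5_000, 120_000, 100));
        // ...while a slow-starting broker exhausts the same budget.
        System.out.println(initializeWithRetry(300_000, 120_000, 100));
    }
}
```

With a fixed budget, the outcome depends entirely on how long a given cluster takes to start, which is why starting the initialization only after the broker is ready removes the need to tune the value per environment.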
cc @kamalcph @FrankYang0529 @showuon