diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index dfb4427b8eda3..1438ca24547a7 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, BrokerReadyCallback, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
@@ -64,7 +64,7 @@ import java.util
import java.util.Optional
import java.util.concurrent.locks.{Condition, ReentrantLock}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
-import scala.collection.Map
+import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -162,6 +162,8 @@ class BrokerServer(
var persister: Persister = _
+ private val brokerReadyCallbacks = new mutable.ListBuffer[BrokerReadyCallback]()
+
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
try {
@@ -283,6 +285,9 @@ class BrokerServer(
remoteLogManagerOpt = createRemoteLogManager(listenerInfo)
+ remoteLogManagerOpt.foreach(rlm =>
+ registerBrokerReadyCallback(rlm.remoteLogMetadataManager()))
+
alterPartitionManager = AlterPartitionManager(
config,
scheduler = kafkaScheduler,
@@ -589,6 +594,15 @@ class BrokerServer(
"all of the SocketServer Acceptors to be started",
enableRequestProcessingFuture, startupDeadline, time)
+ brokerReadyCallbacks.foreach { callback =>
+ try {
+ callback.onBrokerReady()
+ } catch {
+ case e: Exception =>
+ error(s"Error executing broker ready callback: ${callback.getClass.getSimpleName}", e)
+ }
+ }
+
maybeChangeStatus(STARTING, STARTED)
} catch {
case e: Throwable =>
@@ -599,6 +613,10 @@ class BrokerServer(
}
}
+ def registerBrokerReadyCallback(callback: BrokerReadyCallback): Unit = {
+ brokerReadyCallbacks += callback
+ }
+
private def createGroupCoordinator(): GroupCoordinator = {
// Create group coordinator, but don't start it until we've started replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/BrokerReadyCallback.java b/server-common/src/main/java/org/apache/kafka/server/common/BrokerReadyCallback.java
new file mode 100644
index 0000000000000..91a302f6c6875
Binary files /dev/null and b/server-common/src/main/java/org/apache/kafka/server/common/BrokerReadyCallback.java differ
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 189e0a1713e31..5d4b85449ab87 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -18,6 +18,7 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.common.BrokerReadyCallback;
import java.io.Closeable;
import java.util.Iterator;
@@ -52,7 +53,7 @@
* The following tags are automatically added to all metrics registered: config
set to
* remote.log.metadata.manager.class.name
, and class
set to the RemoteLogMetadataManager class name.
*/
-public interface RemoteLogMetadataManager extends Configurable, Closeable {
+public interface RemoteLogMetadataManager extends BrokerReadyCallback, Configurable, Closeable {
/**
* This method is used to add {@link RemoteLogSegmentMetadata} asynchronously with the containing {@link RemoteLogSegmentId} into {@link RemoteLogMetadataManager}.
@@ -242,4 +243,6 @@ default Optional nextSegmentWithTxnIndex(TopicIdPartit
default boolean isReady(TopicIdPartition topicIdPartition) {
return true;
}
+
+ default void onBrokerReady() {}
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 91011b1d9c2fb..8af2094b78da9 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -296,7 +296,6 @@ public void configure(Map configs) {
// successful.
initializationThread = KafkaThread.nonDaemon(
"RLMMInitializationThread", () -> initializeResources(rlmmConfig));
- initializationThread.start();
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
} else {
log.info("Skipping configure as it is already configured.");
@@ -370,6 +369,11 @@ private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmCo
initializationFailed = true;
}
}
+ @Override
+ public void onBrokerReady() {
+ log.info("Broker is ready for requests, now initializing RLMM resources");
+ initializationThread.start();
+ }
boolean doesTopicExist(Admin admin, String topic) throws ExecutionException, InterruptedException {
try {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index e3e75b9f473c1..57c36cecd0c28 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -69,8 +69,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
"retrying RemoteLogMetadataManager resources initialization again.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
- "for retrying RemoteLogMetadataManager resources initialization. When total retry intervals reach this timeout, initialization " +
- "is considered as failed and broker starts shutting down.";
+ "for retrying RemoteLogMetadataManager resources initialization. The timer for this timeout begins after the broker is ready to handle requests. " +
+ "When total retry intervals reach this timeout, initialization is considered as failed and broker starts shutting down.";
public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = "remote.log.metadata.common.client.";
public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = "remote.log.metadata.producer.";
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 5ca596ec7b7df..6ed479e6b9f6b 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -423,7 +423,7 @@ private Plugin configAndWrapRlmmPlugin(RemoteLogMetada
return Plugin.wrapInstance(rlmm, metrics, RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
- RemoteLogMetadataManager remoteLogMetadataManager() {
+ public RemoteLogMetadataManager remoteLogMetadataManager() {
return remoteLogMetadataManagerPlugin.get();
}