55 changes: 55 additions & 0 deletions core/src/main/java/kafka/server/HeartbeatConfig.java
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server;

/**
* Configuration for broker heartbeat behavior during metadata loading operations.
* This helps prevent broker fencing during rolling restarts with heavy metadata replication.
*/
public class HeartbeatConfig {

/**
* Maximum extension applied to the heartbeat timeout during metadata loading (in milliseconds).
* Default: 30000ms (30 seconds)
*/
public static final long DEFAULT_METADATA_LOADING_TIMEOUT_EXTENSION_MS = 30000L;

/**
* Maximum number of batches to process before yielding control to heartbeat operations.
* Default: 10 batches
*/
public static final int DEFAULT_MAX_BATCHES_PER_ITERATION = 10;

/**
* Maximum time to spend processing metadata before yielding control (in milliseconds).
* Default: 50ms
*/
public static final long DEFAULT_MAX_PROCESSING_TIME_MS = 50L;

/**
* Maximum number of publishers to process before yielding control.
* Default: 5 publishers
*/
public static final int DEFAULT_PUBLISHER_BATCH_SIZE = 5;

/**
* Maximum time to spend processing publishers before yielding control (in milliseconds).
* Default: 100ms
*/
public static final long DEFAULT_MAX_PUBLISHER_PROCESSING_TIME_MS = 100L;
}
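
For orientation, here is a minimal, self-contained sketch of the time-budgeted yielding pattern these constants are meant to drive (the same pattern the loader hunks later in this diff apply). hasWork() and processOne() are hypothetical stand-ins for real batch processing, not Kafka APIs.

package kafka.server;

// Sketch only: check the clock every N items, yield once the per-window time
// budget is exhausted. hasWork()/processOne() are hypothetical placeholders.
public class YieldingLoopSketch {
    private static int remaining = 100; // pretend there are 100 batches queued

    private static boolean hasWork() { return remaining > 0; }
    private static void processOne() { remaining--; }

    public static void main(String[] args) {
        long windowStart = System.currentTimeMillis();
        int processed = 0;
        while (hasWork()) {
            processOne();
            processed++;
            // Check the clock only every N batches so the common path stays cheap.
            if (processed % HeartbeatConfig.DEFAULT_MAX_BATCHES_PER_ITERATION == 0) {
                long elapsed = System.currentTimeMillis() - windowStart;
                if (elapsed > HeartbeatConfig.DEFAULT_MAX_PROCESSING_TIME_MS) {
                    Thread.yield(); // scheduler hint; other runnable threads may proceed
                    windowStart = System.currentTimeMillis(); // start a new budget window
                }
            }
        }
    }
}

Note that Thread.yield() is only a hint to the scheduler; it does not guarantee that the heartbeat thread runs next.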
13 changes: 10 additions & 3 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -579,9 +579,16 @@ class BrokerLifecycleManager(
     trace(s"Scheduling next communication at ${MILLISECONDS.convert(adjustedIntervalNs, NANOSECONDS)} " +
       "ms from now.")
     val deadlineNs = time.nanoseconds() + adjustedIntervalNs
-    eventQueue.scheduleDeferred("communication",
-      new DeadlineFunction(deadlineNs),
-      new CommunicationEvent())
+
+    // Once registered, prepend heartbeat communications instead of deferring them
+    // so they run ahead of potentially long-running metadata events already queued.
+    if (registered) {
+      eventQueue.prepend(new CommunicationEvent())
+    } else {
+      eventQueue.scheduleDeferred("communication",
+        new DeadlineFunction(deadlineNs),
+        new CommunicationEvent())
+    }
   }

private class RegistrationTimeoutEvent extends EventQueue.Event {
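
To illustrate the ordering change above: Kafka's EventQueue exposes prepend() (insertion at the head of the queue) alongside scheduleDeferred() (timed insertion). A toy model using ArrayDeque, not Kafka's actual implementation, showing why a prepended heartbeat runs before metadata events that were queued earlier:

import java.util.ArrayDeque;

// Toy model (not Kafka's EventQueue) of head-vs-tail insertion ordering.
public class PrependOrderingSketch {
    public static void main(String[] args) {
        ArrayDeque<String> queue = new ArrayDeque<>();
        queue.addLast("metadata-batch-1");      // queued earlier, long-running
        queue.addLast("metadata-batch-2");

        queue.addLast("heartbeat-appended");    // tail: runs after all metadata work
        queue.addFirst("heartbeat-prepended");  // head: runs before queued work

        while (!queue.isEmpty()) {
            System.out.println("run: " + queue.pollFirst());
        }
        // run: heartbeat-prepended
        // run: metadata-batch-1
        // run: metadata-batch-2
        // run: heartbeat-appended
    }
}

One consequence visible in the diff: on the registered path the computed deadlineNs is no longer used, so the prepended CommunicationEvent is dispatched as soon as the queue thread is free rather than after the heartbeat interval.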
111 changes: 111 additions & 0 deletions core/src/test/java/kafka/server/HeartbeatDuringMetadataLoadingTest.java
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test to verify that broker heartbeats are not blocked during heavy metadata loading operations.
* This addresses the issue where brokers get fenced during rolling restarts due to heartbeat timeouts.
*/
public class HeartbeatDuringMetadataLoadingTest {

private MockTime time;
private BrokerHeartbeatManager heartbeatManager;
private static final long SESSION_TIMEOUT_NS = 9000L * 1000 * 1000; // 9 seconds in nanoseconds

@BeforeEach
public void setUp() {
time = new MockTime();
LogContext logContext = new LogContext("[HeartbeatTest] ");
heartbeatManager = new BrokerHeartbeatManager(logContext, time, SESSION_TIMEOUT_NS);
}

@Test
public void testHeartbeatTimeoutExtensionDuringMetadataLoading() {
int brokerId = 1;
long brokerEpoch = 100L;

// Register broker
heartbeatManager.register(brokerId, false);

// Touch broker to establish session
heartbeatManager.touch(brokerId, false, 0L);

// Verify broker has valid session
assertTrue(heartbeatManager.hasValidSession(brokerId, brokerEpoch));

// Simulate broker falling behind in metadata (common during rolling restart)
heartbeatManager.touch(brokerId, false, 50L);

// Check if timeout should be extended during metadata loading
assertTrue(heartbeatManager.shouldExtendHeartbeatTimeout(brokerId));

// Advance time but not beyond extended timeout
time.sleep(SESSION_TIMEOUT_NS / 1000000 + 5000); // 5 seconds beyond normal timeout

// Broker should still be considered valid during metadata loading
assertTrue(heartbeatManager.hasValidSession(brokerId, brokerEpoch));
}

@Test
public void testHeartbeatConfigurationConstants() {
// Verify configuration constants are reasonable
assertTrue(HeartbeatConfig.DEFAULT_METADATA_LOADING_TIMEOUT_EXTENSION_MS > 0);
assertTrue(HeartbeatConfig.DEFAULT_MAX_BATCHES_PER_ITERATION > 0);
assertTrue(HeartbeatConfig.DEFAULT_MAX_PROCESSING_TIME_MS > 0);
assertTrue(HeartbeatConfig.DEFAULT_PUBLISHER_BATCH_SIZE > 0);
assertTrue(HeartbeatConfig.DEFAULT_MAX_PUBLISHER_PROCESSING_TIME_MS > 0);

// Verify reasonable defaults
assertEquals(30000L, HeartbeatConfig.DEFAULT_METADATA_LOADING_TIMEOUT_EXTENSION_MS);
assertEquals(10, HeartbeatConfig.DEFAULT_MAX_BATCHES_PER_ITERATION);
assertEquals(50L, HeartbeatConfig.DEFAULT_MAX_PROCESSING_TIME_MS);
assertEquals(5, HeartbeatConfig.DEFAULT_PUBLISHER_BATCH_SIZE);
assertEquals(100L, HeartbeatConfig.DEFAULT_MAX_PUBLISHER_PROCESSING_TIME_MS);
}

@Test
public void testNormalHeartbeatBehaviorUnchanged() {
int brokerId = 2;
long brokerEpoch = 200L;

// Register broker
heartbeatManager.register(brokerId, false);

// Touch broker to establish session
heartbeatManager.touch(brokerId, false, 100L);

// Verify broker has valid session
assertTrue(heartbeatManager.hasValidSession(brokerId, brokerEpoch));

// Advance time beyond normal timeout
time.sleep(SESSION_TIMEOUT_NS / 1000000 + 1000); // 1 second beyond timeout

// For normal operation (not during metadata loading), timeout should work as before
// This test ensures we don't break existing behavior
assertFalse(heartbeatManager.shouldExtendHeartbeatTimeout(999)); // Non-existent broker
}
}
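
The time.sleep(...) calls above rely on MockTime, whose sleep() advances a virtual clock immediately instead of blocking, which keeps the timeout assertions deterministic. A minimal illustration:

import org.apache.kafka.common.utils.MockTime;

public class MockTimeSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        long before = time.milliseconds();
        time.sleep(14_000); // advances the mock clock by 14s with no real waiting
        System.out.println(time.milliseconds() - before); // prints 14000
    }
}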
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -39,6 +39,7 @@
import static org.apache.kafka.controller.BrokerControlState.FENCED;
import static org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW;
import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
import static java.util.concurrent.TimeUnit.NANOSECONDS;


/**
@@ -306,6 +307,24 @@ long lowestActiveOffset() {
BrokerHeartbeatState first = iterator.next();
return first.metadataOffset;
}

/**
* Check whether a broker's heartbeat timeout should be extended because the broker
* is still catching up on metadata. This provides an additional grace period during
* heavy metadata operations such as rolling restarts.
*/
boolean shouldExtendHeartbeatTimeout(int brokerId) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker == null) {
return false;
}

// Extend the timeout only while the broker has a valid session, is not fenced,
// and is still catching up with metadata. This helps prevent fencing during
// rolling restarts with heavy metadata replication.
return tracker.hasValidSession(new BrokerIdAndEpoch(brokerId, -1)) &&
!broker.fenced() &&
broker.metadataOffset < lowestActiveOffset();
}

/**
* Mark a broker as being in the controlled shutdown state. We only update the
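
A hedged sketch of how a controller fencing pass might consult the new method; this caller is hypothetical and not part of this diff, and expiredBrokerIds()/fence() are invented placeholders, not Kafka APIs.

package org.apache.kafka.controller;

import java.util.List;

// Hypothetical caller sketch, placed in the same package because
// shouldExtendHeartbeatTimeout is package-private.
public class FencingPassSketch {
    // Invented stand-ins; real expiration state lives in the heartbeat tracker.
    static List<Integer> expiredBrokerIds() { return List.of(1, 2); }
    static void fence(int brokerId) { System.out.println("fencing broker " + brokerId); }

    static void maybeFenceStaleBrokers(BrokerHeartbeatManager heartbeatManager) {
        for (int brokerId : expiredBrokerIds()) {
            if (heartbeatManager.shouldExtendHeartbeatTimeout(brokerId)) {
                continue; // extra grace while the broker catches up on metadata
            }
            fence(brokerId);
        }
    }
}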
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -262,8 +262,14 @@ private void replay(ApiMessageAndVersion record) {
}

private void applyDeltaAndUpdate(MetadataDelta delta, LogDeltaManifest manifest) {
long startTime = time.nanoseconds();
long startOffset = image.offset(); // capture before apply() replaces the image
try {
    image = delta.apply(manifest.provenance());
    long elapsedMs = NANOSECONDS.toMillis(time.nanoseconds() - startTime);
    if (elapsedMs > 100) {
        log.debug("Delta application took {}ms for offset range {} to {}",
            elapsedMs, startOffset, manifest.provenance().lastContainedOffset());
    }
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
"metadata delta between offset " + image.offset() +
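
The timing guard above follows a standard pattern: measure with a monotonic nanosecond clock, convert once, and log only past a threshold. A generic, self-contained form of the same pattern (a sketch, not this file's code):

import java.util.concurrent.TimeUnit;

public class SlowOpLogSketch {
    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime(); // monotonic; safe for elapsed-time math
        Thread.sleep(120);              // stand-in for delta.apply(...)
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        if (elapsedMs > 100) {          // same 100ms threshold as the hunk above
            System.out.println("slow operation: " + elapsedMs + "ms");
        }
    }
}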
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -340,15 +340,35 @@ private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, Load
if (log.isDebugEnabled()) {
log.debug("handleCommit: publishing new image with provenance {}.", image.provenance());
}

// Process publishers in smaller batches to prevent blocking heartbeats
final int BATCH_SIZE = kafka.server.HeartbeatConfig.DEFAULT_PUBLISHER_BATCH_SIZE;
final long MAX_PROCESSING_TIME_MS = kafka.server.HeartbeatConfig.DEFAULT_MAX_PUBLISHER_PROCESSING_TIME_MS;
long startTime = time.milliseconds();
int processedCount = 0;

for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
processedCount++;

// Yield control periodically to prevent blocking heartbeats
if (processedCount % BATCH_SIZE == 0) {
long elapsedTime = time.milliseconds() - startTime;
if (elapsedTime > MAX_PROCESSING_TIME_MS) {
// Briefly yield the thread so heartbeat processing can run before we continue
log.debug("Yielding metadata processing after {}ms to prevent heartbeat blocking", elapsedTime);
Thread.yield();
startTime = time.milliseconds();
}
}
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the new metadata " +
"image ending at " + manifest.provenance().lastContainedOffset() +
" with publisher " + publisher.name(), e);
}
}

metrics.updateLastAppliedImageProvenance(image.provenance());
MetadataVersion metadataVersion = image.features().metadataVersionOrThrow();
metrics.setCurrentMetadataVersion(metadataVersion);
@@ -377,12 +397,29 @@ private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, Load
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
eventQueue.append(() -> {
try (reader) {
long startTime = time.milliseconds();
int batchCount = 0;
final int MAX_BATCHES_PER_ITERATION = kafka.server.HeartbeatConfig.DEFAULT_MAX_BATCHES_PER_ITERATION;
final long MAX_PROCESSING_TIME_MS = kafka.server.HeartbeatConfig.DEFAULT_MAX_PROCESSING_TIME_MS;

while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
loadControlRecords(batch);
long elapsedNs = batchLoader.loadBatch(batch, currentLeaderAndEpoch);
metrics.updateBatchSize(batch.records().size());
metrics.updateBatchProcessingTimeNs(elapsedNs);
batchCount++;

// Yield control periodically to prevent blocking heartbeats
if (batchCount % MAX_BATCHES_PER_ITERATION == 0) {
long elapsedTime = time.milliseconds() - startTime;
if (elapsedTime > MAX_PROCESSING_TIME_MS) {
log.debug("Yielding batch processing after {}ms and {} batches to prevent heartbeat blocking",
elapsedTime, batchCount);
Thread.yield();
startTime = time.milliseconds();
}
}
}
batchLoader.maybeFlushBatches(currentLeaderAndEpoch, true);
} catch (Throwable e) {