
Conversation

@RaidenE1 (Contributor) commented Aug 8, 2025

Summary

This PR implements error caching for internal topic creation failures
in Kafka Streams, allowing errors to be surfaced to users via the
Streams group heartbeat status instead of only appearing in broker logs.

Problem

Currently, when internal topic creation fails during Streams group
heartbeat processing, the error messages are only logged in the
broker logs and not exposed to users. As mentioned in the code comments
around KafkaApis.scala:2857-2893, the result of the create topic
call forwarded to the controller is not awaited, so if an internal
topic fails to be created, users cannot see the specific reason for the
failure.

Solution

  1. Error caching in AutoTopicCreationManager (see the sketch below)
     • Added a CachedTopicCreationError case class to store error messages with timestamps
     • Implemented a getTopicCreationErrors() method with:
       • Lazy cleanup: expired entries are removed during access, based on a configurable TTL
       • Size limit: the cache is capped at 1000 entries; the oldest entries are removed when the cap is exceeded
       • TTL based on existing config: 3 × request.timeout.ms (default: 90 seconds)
     • Enhanced ControllerRequestCompletionHandler.onComplete() to parse the CreateTopicsResponse and cache errors for failed topics only
     • Added proper resource cleanup in the close() method

  2. Integration with KafkaApis
     • Enhanced Streams group heartbeat processing in KafkaApis.scala
     • When the MISSING_INTERNAL_TOPICS status is detected, cached errors are queried and appended to the status details
     • The cache is queried only when the Group Coordinator has already reported missing topics

  3. Lifecycle management
     • Added an autoTopicCreationManager.close() call in BrokerServer.shutdown() to ensure proper cleanup
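A minimal sketch of the cache described in point 1 (illustration only, not the PR's exact code; the surrounding manager class is elided and Scala 2.13 collection methods are assumed):

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Sketch only: a TTL-bounded, size-bounded error cache as described above.
case class CachedTopicCreationError(errorMessage: String, timestamp: Long)

class TopicCreationErrorCache(ttlMs: Long, maxSize: Int = 1000) {
  private val cache = new ConcurrentHashMap[String, CachedTopicCreationError]()

  // Called from the CreateTopicsResponse completion handler, for failed topics only.
  def put(topic: String, message: String, nowMs: Long): Unit = {
    cache.put(topic, CachedTopicCreationError(message, nowMs))
    // Size bound: evict the oldest entry while over capacity.
    while (cache.size() > maxSize)
      cache.asScala.minByOption(_._2.timestamp).foreach { case (k, _) => cache.remove(k) }
  }

  // Lazy cleanup: expired entries are dropped on access; no background thread.
  def getTopicCreationErrors(topics: Set[String], nowMs: Long): Map[String, String] =
    topics.flatMap { topic =>
      Option(cache.get(topic)) match {
        case Some(e) if nowMs - e.timestamp <= ttlMs => Some(topic -> e.errorMessage)
        case Some(_) => cache.remove(topic); None // expired: evict lazily
        case None => None
      }
    }.toMap
}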

Key Features

  • Thread-safe: uses ConcurrentHashMap for concurrent access
  • Memory efficient: TTL-based expiration and size limits prevent memory leaks
  • Configurable TTL: derived from the existing request.timeout.ms configuration (3× multiplier)
  • Lazy cleanup: no background threads needed; cleanup happens during normal operation
  • Selective caching: only actual failures (errorCode != NONE) are cached; successful creations are ignored
  • Comprehensive error handling: covers authentication failures, version mismatches, and topic-specific errors
  • Backward compatible: no changes to existing APIs or behavior

Configuration

The error cache TTL is automatically calculated as 3 × request.timeout.ms:

  • Default: 90 seconds (3 × the 30s default request timeout)
  • Configurable: adjusts automatically when request.timeout.ms is modified
  • Cache size limit: 1000 entries (hardcoded)

Testing

  • Added comprehensive unit tests for error caching, TTL cleanup, and size-limit management
  • Added an integration test for KafkaApis to verify end-to-end functionality
  • Updated test cases to use realistic TTL values based on the test configuration
  • All existing tests pass without modification

Code Changes

  • AutoTopicCreationManager.scala: added error caching functionality (~70 lines)
  • KafkaApis.scala: enhanced Streams heartbeat processing (~15 lines)
  • BrokerServer.scala: added a cleanup call in shutdown (~2 lines)
  • Test files: added comprehensive test coverage (~170 lines)

@github-actions bot added labels: triage (PRs from the community), core (Kafka Broker) on Aug 8, 2025
@RaidenE1 changed the title from "add error cache for auto topic creation failure" to "MINOR: add error cache for auto topic creation failure" on Aug 8, 2025
@RaidenE1 changed the title from "MINOR: add error cache for auto topic creation failure" to "MINOR: Expose internal topic creation errors to the user" on Aug 8, 2025
@mjsax added labels: streams, ci-approved, kip (Requires or implements a KIP) and removed the triage label on Aug 11, 2025
@@ -64,6 +77,9 @@ class DefaultAutoTopicCreationManager(
) extends AutoTopicCreationManager with Logging {

private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]()
private val errorCacheTtlMs = config.requestTimeoutMs.toLong * 3 // 3x request timeout
Member:

Not sure if we should couple this to request timeout? -- Might be better to couple it to session timeout? If a client does not heartbeat within session timeout, we would remove it from the group.

Side question: would we need to track errors per Streams group, and use a group-specific TTL, given that each group could set an individual session timeout?

Contributor (author):

  1. changed
  2. I think the topic is globally used, so if it's missing then it's missing for all groups

Member:

I think the topic is globally used, so if it's missing then it's missing for all groups

My understanding was that AutoTopicCreationManager.scala is used to create internal topics for KS apps. Of course, the manager is used for all groups, but it does create topics which are individual to a specific group.

So if there are two KS applications and there is an issue creating a topic for application A, we need to ensure we report this error back to application A (and only to application A, not also to application B).

And we would also buffer error messages for application A with application A's specific session.timeout.ms (and not use application B's session.timeout.ms for application A's errors)?

@@ -64,6 +77,9 @@ class DefaultAutoTopicCreationManager(
) extends AutoTopicCreationManager with Logging {

private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]()
private val errorCacheTtlMs = config.requestTimeoutMs.toLong * 3 // 3x request timeout
private val maxCacheSize = 1000
Member:

Wondering why we would need to bound the cache size? What is the reasoning for this?

Contributor (author):

Just want to make sure we don't cache too many errors and take up too much memory. But I want to remove it, since it might require a KIP?

Member:

Not sure if we would need a KIP (we would need one if we made it configurable). But I would expect the error rate to be low. If there is some issue at some point, we can still limit it in the future, and with a TTL we would expire old entries anyway.

// Check requested topics and collect expired keys
topicNames.foreach { topicName =>
Option(topicCreationErrorCache.get(topicName)) match {
case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs =>
Member:

Why would we exclude the error message if we still have it? -- I thought the TTL would apply to the case where we never returned an error and want to drop it on the floor via some cleanup process?

Contributor (author):

I think it makes sense

Option(topicCreationErrorCache.get(topicName)) match {
case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs =>
errors.put(topicName, cachedError.errorMessage)
case Some(_) =>
Member:

If we are using Some(cachedError) above, would this case actually ever be executed? (I am not a Scala person, but my understanding is that Some(cachedError) would be a "catch all"?)

Contributor (author):

@mjsax The case Some(_) will indeed be executed. In Scala pattern matching:

  1. case Some(cachedError) if (condition) only matches when the guard condition is true
  2. If the condition is false (TTL expired), it falls through to case Some(_)
  3. So we hit the second case when we have a cached error that's expired

The pattern matching is sequential with guard conditions, not a "catch all" for the
first Some(...).
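For readers unfamiliar with Scala, a small self-contained example of this fall-through behavior (illustration only, not code from the PR):

// Pattern guards make matching fall through: Some(x) with a false guard
// does NOT swallow the value; the next case is tried.
def describe(v: Option[Int]): String = v match {
  case Some(n) if n > 0 => s"positive $n" // matches only when the guard holds
  case Some(_)          => "non-positive" // reached when the guard above fails
  case None             => "empty"
}

assert(describe(Some(5))  == "positive 5")
assert(describe(Some(-1)) == "non-positive") // guard failed, fell through
assert(describe(None)     == "empty")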

Member:

Oh I see -- Some(cachedError) if ... is a single pattern in Scala. Interesting. Thanks for clarifying.

case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs =>
errors.put(topicName, cachedError.errorMessage)
case Some(_) =>
expiredKeys += topicName
Member:

Not sure I understand this logic? I thought we would expire an entry if we never returned it to the client and the TTL passed?

Contributor:

The TTL having passed should be enough, I assume.
@RaidenE1 please address this comment as well.

debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
response.responseBody() match {
case createTopicsResponse: CreateTopicsResponse =>
cacheTopicCreationErrorsFromResponse(createTopicsResponse)
Member:

I thought, for this else branch, the request was successful and no error would be returned?

Contributor (author):

In that method I check the error code and only cache if the code is not 0. I think we can definitely move it out.

@aliehsaeedii (Contributor) commented Aug 13, 2025:

I also think there is no need to add the success case to the error cache. It's not optimized, and the code is misleading.

Contributor:

Oh, I see you check the error before putting it in the cache. Mmm, hard to say. As the above if/else blocks may not cover all error cases, let's keep it.

transactionCoordinator,
shareCoordinator)

// Manually add an expired entry to the cache using reflection
Member:

Seems you need to use reflection as you cannot modify the time? If we use MockTime in the test, we should be able to avoid reflection.

Contributor (author):

Used MockTime and removed the reflection.

assertEquals(Errors.NONE.code, response.data.errorCode())
assertEquals(null, response.data.errorMessage())

// Verify that the cached error was appended to the existing status detail
Member:

Are we really verifying this? It seems our test code assembles the StreamsGroupHeartbeatResponseData, so we don't really execute prod code? So are we only verifying that our test code sets up the right response? In that case, it seems the test would not actually test anything?

Contributor (author):

Added a mocked create-topics call.

@@ -10949,6 +10950,59 @@ class KafkaApisTest extends Logging {
)
}

@Test
def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = {
Member:

It's a little unclear to me what this method is supposed to actually verify.

Contributor (author):

it tests that:

  1. Cached topic creation errors are retrieved and appended to the response
  2. The final status detail contains both the original "missing topics" message and the cached error details
  3. The format is: "Internal topics are missing: [test-topic]; Creation failed: test-topic (INVALID_REPLICATION_FACTOR)."

) extends AutoTopicCreationManager with Logging {

private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]()
// Use session timeout instead of request timeout for better semantic alignment with client lifecycle
Contributor:

Do we need to add instead of...?

response.responseBody() match {
case createTopicsResponse: CreateTopicsResponse =>
cacheTopicCreationErrorsFromResponse(createTopicsResponse)
case _ =>
Contributor:

What sort of response could it be?

Contributor:

If it is not a CreateTopicsResponse, why is the log message about creating a topic?!

)
debug(s"Cached topic creation error for ${topicResult.name()}: $errorMessage")
}

Contributor:

Any log message here? E.g., that the topic was created successfully...

@lucasbru (Member) left a comment:

Looks like a good start to me! Left some high-level comments.

) extends AutoTopicCreationManager with Logging {

private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]()
// Use session timeout for better semantic alignment with client lifecycle
private val errorCacheTtlMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs.toLong
Member:

We'd want to pass this in from the outside. Session timeouts can be overridden for a specific group, so we need to use the groupConfigManager, which is present in KafkaApis.

        Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::streamsSessionTimeoutMs)
            .orElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs());

I would also re-fetch this value every time we create an error message, since the session timeout may change over time. So better pass this into createStreamsInternalTopics.

topicCreationErrorCache.remove(key)
debug(s"Removed expired topic creation error cache entry for $key")
}

Member:

I think we still need to make sure that, eventually, errors are evicted even if we do not receive another topic creation request. I wonder if we shouldn't build an expiring cache for that which implements org.apache.kafka.common.cache.Cache. It could use a LinkedHashMap, similar to LRUCache, and every time we insert into or get from the cache, we try to expire the oldest elements of the LinkedHashMap. WDYT?
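A rough sketch of what such an expiring cache could look like (assumptions: one fixed TTL for all entries, and the org.apache.kafka.common.cache.Cache interface is omitted for brevity):

import org.apache.kafka.common.utils.Time

// Sketch: insertion-ordered LinkedHashMap. Because the TTL is uniform,
// expired entries cluster at the head, so each put/get only needs to trim
// from the front until it hits an unexpired entry.
class ExpiringErrorCache[V](ttlMs: Long, time: Time) {
  private case class Timestamped(value: V, insertedAtMs: Long)
  private val map = new java.util.LinkedHashMap[String, Timestamped]()

  def put(key: String, value: V): Unit = synchronized {
    map.remove(key) // re-insert so the entry moves to the tail
    map.put(key, Timestamped(value, time.milliseconds()))
    expireHead()
  }

  def get(key: String): Option[V] = synchronized {
    expireHead()
    Option(map.get(key)).map(_.value)
  }

  private def expireHead(): Unit = {
    val now = time.milliseconds()
    val it = map.entrySet().iterator()
    while (it.hasNext) {
      if (now - it.next().getValue.insertedAtMs > ttlMs) it.remove()
      else return // first unexpired entry; all later ones are newer
    }
  }
}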

@RaidenE1 force-pushed the expose-internal-topic-error branch from 5fa207a to 511b517 on September 2, 2025 02:03
@lucasbru requested a review from Copilot on September 2, 2025 08:37
@Copilot (Copilot AI) left a comment:

Pull Request Overview

This PR implements error caching for internal topic creation failures in Kafka Streams, allowing users to see specific topic creation errors through the Streams group heartbeat status instead of only in broker logs.

Key changes include:

  • Added error caching functionality to AutoTopicCreationManager with TTL-based expiration and size limits
  • Enhanced Streams group heartbeat processing in KafkaApis to append cached topic creation errors to status details
  • Added proper resource cleanup in broker shutdown

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Summary per file:
AutoTopicCreationManager.scala: Added error caching with TTL expiration, size limits, and error retrieval methods
KafkaApis.scala: Enhanced Streams heartbeat processing to query and append cached topic creation errors
BrokerServer.scala: Added cleanup call for AutoTopicCreationManager during shutdown
AutoTopicCreationManagerTest.scala: Added comprehensive tests for error caching, TTL, and LRU eviction
KafkaApisTest.scala: Added integration test for end-to-end topic creation error handling


Comment on lines 68 to 71
errorMessage: String,
time: Time
) {
val timestamp: Long = time.milliseconds()
Copilot AI commented Sep 2, 2025:

The timestamp should be captured when the error is created, not when accessed. The current implementation captures the timestamp on object initialization, but since time.milliseconds() is called every time the case class is instantiated, this could lead to inconsistent timestamps if the Time instance is mutable or if multiple instances share the same Time object.

Suggested change:
-   errorMessage: String,
-   time: Time
- ) {
-   val timestamp: Long = time.milliseconds()
+   errorMessage: String
+ ) {
+   val timestamp: Long = System.currentTimeMillis()


Comment on lines 166 to 179

breakable {
while (iterator.hasNext) {
val entry = iterator.next()
val cachedError = entry.getValue

if (currentTime - cachedError.timestamp > ttlMs) {
iterator.remove()
debug(s"Removed expired topic creation error cache entry for ${entry.getKey}")
} else {
// Since entries are in insertion order, if this entry is not expired,
// all following entries are also not expired
break()
}
Copilot AI commented Sep 2, 2025:

The assumption that entries are in insertion order and that if one entry is not expired, all following entries are also not expired is incorrect. The LinkedHashMap is configured with accessOrder = false (line 94), meaning it maintains insertion order, but entries can have different timestamps based on when they were inserted. Breaking early could leave expired entries in the cache.

Suggested change:
- breakable {
-   while (iterator.hasNext) {
-     val entry = iterator.next()
-     val cachedError = entry.getValue
-     if (currentTime - cachedError.timestamp > ttlMs) {
-       iterator.remove()
-       debug(s"Removed expired topic creation error cache entry for ${entry.getKey}")
-     } else {
-       // Since entries are in insertion order, if this entry is not expired,
-       // all following entries are also not expired
-       break()
-     }
+ while (iterator.hasNext) {
+   val entry = iterator.next()
+   val cachedError = entry.getValue
+   if (currentTime - cachedError.timestamp > ttlMs) {
+     iterator.remove()
+     debug(s"Removed expired topic creation error cache entry for ${entry.getKey}")


Comment on lines 2909 to 2911
missingInternalTopicStatus.get().setStatusDetail(
missingInternalTopicStatus.get().statusDetail() + s"; Creation failed: $creationErrorDetails."
)
Copilot AI commented Sep 2, 2025:

Potential null pointer exception if statusDetail() returns null. The concatenation should handle the case where the existing status detail is null.


Member:

Please check if this can happen or handle this

Comment on lines +540 to +542
// Replace the test class's config with our smallCacheConfig
// so that initializeRequestContext will use the correct config
config = smallCacheConfig
Copilot AI commented Sep 2, 2025:

Modifying the test class's config field directly could affect other tests if they run in the same instance. Consider creating a separate AutoTopicCreationManager instance with the small cache config instead of modifying the shared config field.


@lucasbru (Member) left a comment:

I made a pass on the production code and left some comments.

@@ -53,17 +55,47 @@ trait AutoTopicCreationManager {
requestContext: RequestContext
): Unit

def getTopicCreationErrors(
Member:

getStreamsInternalTopicCreationErrors?


// Use MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG as the size limit for the error cache
// This provides a reasonable bound (default 1000) to prevent unbounded growth
private val maxCacheSize = config.maxIncrementalFetchSessionCacheSlots
Member:

Using that config seems a bit random. I would just hard-code it.

// Use MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG as the size limit for the error cache
// This provides a reasonable bound (default 1000) to prevent unbounded growth
private val maxCacheSize = config.maxIncrementalFetchSessionCacheSlots
info(s"AutoTopicCreationManager initialized with error cache size limit: $maxCacheSize")
Member:

Remove the extra logging

private val topicCreationErrorCache = Collections.synchronizedMap(
new java.util.LinkedHashMap[String, CachedTopicCreationError](16, 0.75f, false) {
override def removeEldestEntry(eldest: java.util.Map.Entry[String, CachedTopicCreationError]): Boolean = {
size() > maxCacheSize
Member:

I kind of expected you to implement a new subclass of Cache for this. I think we can also do it this way.

Could we also remove the eldest entry if it is expired?
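One possible shape for that (a sketch extending the override quoted above; time, maxCacheSize and errorCacheTtlMs are assumed to be in scope):

// Sketch: evict the eldest entry when over capacity OR when its TTL has
// passed. LinkedHashMap consults this once per put, so at most one expired
// entry is dropped per insertion; lazy cleanup on read covers the rest.
override def removeEldestEntry(
    eldest: java.util.Map.Entry[String, CachedTopicCreationError]): Boolean = {
  size() > maxCacheSize ||
    time.milliseconds() - eldest.getValue.timestamp > errorCacheTtlMs
}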

} else {
if (response.hasResponse) {
Member:

why did you remove this code? I would revert all changes in sendCreateTopicRequest

.setIsInternal(Topic.isInternal(topic))
}

info(s"Sent auto-creation request for ${creatableTopics.keys} to the active controller.")
Member:

Remove this info log, or set it to debug level. We need to be careful not to create too many log messages that spam the Kafka logs with not-so-relevant information.

}

case class CachedTopicCreationError(
Member:

Is this public or private? Can we make this a purely internal thing inside the topic creation manager, since it does not appear in the interface?


override def getTopicCreationErrors(
topicNames: Set[String],
errorCacheTtlMs: Long
Member:

Passing the TTL here is incorrect. The TTL is defined per group, so you are expiring topic creation errors for one group with the TTL of a different group.

I think we need to pass the errorCacheTTL into createStreamsInternalTopics, and store the expiry time instead of the error receipt timestamp in CachedTopicCreationError.

* Since we use LinkedHashMap with insertion order, we only need to check
* entries from the beginning until we find a non-expired entry.
*/
private def expireOldEntries(ttlMs: Long): Unit = {
Member:

I think the expiration won't work if we have different TTLs for different groups, right?

Since the entries will not expire in insertion order.

I think we may need a priority queue and hashmap to solve this correctly. We should probably put this into a little helper class that is synchronized.
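A sketch of what such a helper could look like (names are illustrative, not from the PR; the PR's eventual implementation, excerpted further below, differs in details):

import java.util.PriorityQueue
import scala.collection.mutable

// Sketch: a hash map for lookups plus a priority queue ordered by
// expiration time, so entries with different per-group TTLs expire in
// the right order. All methods synchronize on the instance.
class ErrorExpiryCache(maxSize: Int) {
  private case class Entry(topic: String, message: String, expirationTimeMs: Long)

  private val byTopic = mutable.HashMap.empty[String, Entry]
  private val expiryQueue = new PriorityQueue[Entry](
    11, (a: Entry, b: Entry) => java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs))

  def put(topic: String, message: String, nowMs: Long, ttlMs: Long): Unit = synchronized {
    val entry = Entry(topic, message, nowMs + ttlMs)
    byTopic.put(topic, entry) // replaces any previous entry for the topic
    expiryQueue.add(entry)    // a replaced entry leaves a stale queue element, skipped below
    expire(nowMs)
  }

  def get(topic: String, nowMs: Long): Option[String] = synchronized {
    byTopic.get(topic).filter(_.expirationTimeMs > nowMs).map(_.message)
  }

  private def expire(nowMs: Long): Unit = {
    while (!expiryQueue.isEmpty &&
           (expiryQueue.peek().expirationTimeMs <= nowMs || byTopic.size > maxSize)) {
      val head = expiryQueue.poll()
      // Drop the mapping only if the queue head is still the live entry;
      // stale elements for replaced entries are simply discarded.
      if (byTopic.get(head.topic).exists(_ eq head)) byTopic.remove(head.topic)
    }
  }
}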

@lucasbru (Member) left a comment:

I made another pass on this

shareCoordinator: ShareCoordinator
shareCoordinator: ShareCoordinator,
time: Time,
cacheCapacity: Int = 1000
Member:

nit: topicErrorCacheCapacity

@@ -122,9 +232,11 @@ class DefaultAutoTopicCreationManager(
override def onComplete(response: ClientResponse): Unit = {
clearInflightRequests(creatableTopics)
if (response.authenticationException() != null) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception")
val authException = response.authenticationException()
Member:

nit: let's not change this function at all; it's not related to what we are doing.

val sessionTimeoutMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
.map(_.streamsSessionTimeoutMs().toLong)
.getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong)
val expirationTimeMs = time.milliseconds() + sessionTimeoutMs
Member:

Two things about this

  • I believe we said that we want to use heartbeatIntervalMs * 2 for the timeout, to make sure a client sees the topic creation errors.
  • I would pass a timeoutMs (a duration), not the expirationTimeMs (a point in time). We want to start the timeout from the point we receive the response, not here.


}

def cleanupExpired(currentTimeMs: Long): Unit = {
lock.lock()
Member:

Why is this not private? Isn't this called from places where we already own the lock?

@lucasbru self-assigned this on Sep 4, 2025
private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
})
private val lock = new ReentrantLock()
Member:

Can we make byTopic a ConcurrentHashMap and use this lock as a write lock only?
That is, make the read path free of lock contention? That would mean we can only expire on the put path, which should be fine. However, we may then read expired entries when getting from the map, so in get you need to check whether the returned entry is expired before returning it.
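A sketch of the lock-free read path this describes (assuming byTopic holds entries carrying an expirationTimeMs field, as in the helper above):

import java.util.concurrent.ConcurrentHashMap

// Sketch: reads hit the ConcurrentHashMap without taking the lock; since
// eviction only runs under the write lock on put, a reader may observe an
// expired entry and must filter it out itself.
def get(topic: String, nowMs: Long): Option[String] = {
  val entry = byTopic.get(topic) // lock-free read on ConcurrentHashMap
  if (entry != null && entry.expirationTimeMs > nowMs) Some(entry.message)
  else None // absent, or expired but not yet evicted by a writer
}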

@lucasbru (Member) left a comment:

I left some more comments.

I wonder if it wouldn't make sense to have some dedicated unit tests for the ErrorExpiryCache? It's mightily complicated.

// Create a config with a small cache size for testing
val props = TestUtils.createBrokerConfig(1)
props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, "3") // Small cache size for testing
Member:

I don't understand why we are doing this

val smallCacheConfig = KafkaConfig.fromProps(props)

// Verify the configuration was properly set
assertEquals(3, smallCacheConfig.maxIncrementalFetchSessionCacheSlots, "Cache size configuration should be 3")
Member:

Why are we doing this?

val existing = byTopic.get(topicName)
if (existing != null) {
// Remove old instance from structures
expiryQueue.remove(existing)
Member:

This remove is a linear-time operation, right? I think we should avoid that. It may be fine to just leave it in the expiryQueue, since once it expires we will not delete the key from the map if a new value has replaced it.

while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) {
val expired = expiryQueue.poll()
val current = byTopic.get(expired.topicName)
if (current != null && (current eq expired)) {
Member:

Is eq doing a deep comparison here? Maybe it would be enough to compare the timestamps; the deep comparison is expensive.

}

// Enforce capacity by removing entries with earliest expiration time first
while (byTopic.size() > maxSize && !expiryQueue.isEmpty) {
Member:

Could you merge this loop into the loop above by just checking the condition

!expiryQueue.isEmpty && (expiryQueue.peek().expirationTimeMs <= currentTimeMs || byTopic.size() > maxSize)

in the while loop?

@@ -2888,10 +2888,35 @@ class KafkaApis(val requestChannel: RequestChannel,
)
}
} else {
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext);
// Compute group-specific timeout for caching errors (2 * heartbeat interval)
val heartbeatIntervalMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
Member:

The TTL is calculated as 2 × heartbeat interval, but the PR description mentions 3 × request.timeout.ms. This inconsistency could be confusing. Can you please fix the PR description? I would keep it much shorter and less AI-generated, so that it is easier to keep up to date.
