Skip to content

Commit 5fa207a

Browse files
committed
use linkedlist as LRU and set a default cache size
1 parent 3702ba1 commit 5fa207a

File tree

2 files changed

+242
-30
lines changed

2 files changed

+242
-30
lines changed

core/src/main/scala/kafka/server/AutoTopicCreationManager.scala

Lines changed: 126 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import org.apache.kafka.common.utils.Time
4141
import scala.collection.{Map, Seq, Set, mutable}
4242
import scala.jdk.CollectionConverters._
4343
import scala.jdk.OptionConverters.RichOptional
44+
import scala.util.control.Breaks._
4445

4546
trait AutoTopicCreationManager {
4647

@@ -71,6 +72,7 @@ case class CachedTopicCreationError(
7172
val timestamp: Long = time.milliseconds()
7273
}
7374

75+
7476
class DefaultAutoTopicCreationManager(
7577
config: KafkaConfig,
7678
channelManager: NodeToControllerChannelManager,
@@ -81,7 +83,20 @@ class DefaultAutoTopicCreationManager(
8183
) extends AutoTopicCreationManager with Logging {
8284

8385
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
84-
private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]()
86+
87+
// Use MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG as the size limit for the error cache
88+
// This provides a reasonable bound (default 1000) to prevent unbounded growth
89+
private val maxCacheSize = config.maxIncrementalFetchSessionCacheSlots
90+
info(s"AutoTopicCreationManager initialized with error cache size limit: $maxCacheSize")
91+
92+
// LRU cache with size limit to prevent unbounded memory growth
93+
private val topicCreationErrorCache = Collections.synchronizedMap(
94+
new java.util.LinkedHashMap[String, CachedTopicCreationError](16, 0.75f, false) {
95+
override def removeEldestEntry(eldest: java.util.Map.Entry[String, CachedTopicCreationError]): Boolean = {
96+
size() > maxCacheSize
97+
}
98+
}
99+
)
85100

86101
/**
87102
* Initiate auto topic creation for the given topics.
@@ -126,38 +141,59 @@ class DefaultAutoTopicCreationManager(
126141
}
127142

128143
if (topics.nonEmpty) {
129-
sendCreateTopicRequest(topics, Some(requestContext))
144+
sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext))
130145
}
131146
}
132147

133148
override def getTopicCreationErrors(
134149
topicNames: Set[String],
135150
errorCacheTtlMs: Long
136151
): Map[String, String] = {
137-
val currentTime = time.milliseconds()
152+
// Proactively expire old entries using the provided TTL
153+
expireOldEntries(errorCacheTtlMs)
154+
138155
val errors = mutable.Map.empty[String, String]
139-
val expiredKeys = mutable.Set.empty[String]
140156

141-
// Check requested topics and collect expired keys
157+
// Check requested topics
142158
topicNames.foreach { topicName =>
143159
Option(topicCreationErrorCache.get(topicName)) match {
144-
case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs =>
145-
errors.put(topicName, cachedError.errorMessage)
146-
case Some(_) =>
147-
expiredKeys += topicName
160+
case Some(error) =>
161+
errors.put(topicName, error.errorMessage)
148162
case None =>
149163
}
150164
}
151165

152-
// Remove expired entries
153-
expiredKeys.foreach { key =>
154-
topicCreationErrorCache.remove(key)
155-
debug(s"Removed expired topic creation error cache entry for $key")
156-
}
157-
158166
errors.toMap
159167
}
160168

169+
/**
170+
* Remove expired entries from the cache using the provided TTL.
171+
* Since we use LinkedHashMap with insertion order, we only need to check
172+
* entries from the beginning until we find a non-expired entry.
173+
*/
174+
private def expireOldEntries(ttlMs: Long): Unit = {
175+
val currentTime = time.milliseconds()
176+
177+
// Iterate and remove expired entries
178+
val iterator = topicCreationErrorCache.entrySet().iterator()
179+
180+
breakable {
181+
while (iterator.hasNext) {
182+
val entry = iterator.next()
183+
val cachedError = entry.getValue
184+
185+
if (currentTime - cachedError.timestamp > ttlMs) {
186+
iterator.remove()
187+
debug(s"Removed expired topic creation error cache entry for ${entry.getKey}")
188+
} else {
189+
// Since entries are in insertion order, if this entry is not expired,
190+
// all following entries are also not expired
191+
break()
192+
}
193+
}
194+
}
195+
}
196+
161197
private def sendCreateTopicRequest(
162198
creatableTopics: Map[String, CreatableTopic],
163199
requestContext: Option[RequestContext]
@@ -182,18 +218,11 @@ class DefaultAutoTopicCreationManager(
182218
if (response.authenticationException() != null) {
183219
val authException = response.authenticationException()
184220
warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception: ${authException.getMessage}")
185-
cacheTopicCreationErrors(creatableTopics.keys.toSet, authException.getMessage)
186221
} else if (response.versionMismatch() != null) {
187222
val versionException = response.versionMismatch()
188223
warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}")
189-
cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage)
190224
} else {
191-
response.responseBody() match {
192-
case createTopicsResponse: CreateTopicsResponse =>
193-
cacheTopicCreationErrorsFromResponse(createTopicsResponse)
194-
case _ =>
195-
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
196-
}
225+
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
197226
}
198227
}
199228
}
@@ -319,6 +348,80 @@ class DefaultAutoTopicCreationManager(
319348
(creatableTopics, uncreatableTopics)
320349
}
321350

351+
private def sendCreateTopicRequestWithErrorCaching(
352+
creatableTopics: Map[String, CreatableTopic],
353+
requestContext: Option[RequestContext]
354+
): Seq[MetadataResponseTopic] = {
355+
val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
356+
topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
357+
358+
val createTopicsRequest = new CreateTopicsRequest.Builder(
359+
new CreateTopicsRequestData()
360+
.setTimeoutMs(config.requestTimeoutMs)
361+
.setTopics(topicsToCreate)
362+
)
363+
364+
val requestCompletionHandler = new ControllerRequestCompletionHandler {
365+
override def onTimeout(): Unit = {
366+
clearInflightRequests(creatableTopics)
367+
debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
368+
}
369+
370+
override def onComplete(response: ClientResponse): Unit = {
371+
clearInflightRequests(creatableTopics)
372+
if (response.authenticationException() != null) {
373+
val authException = response.authenticationException()
374+
warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception: ${authException.getMessage}")
375+
cacheTopicCreationErrors(creatableTopics.keys.toSet, authException.getMessage)
376+
} else if (response.versionMismatch() != null) {
377+
val versionException = response.versionMismatch()
378+
warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}")
379+
cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage)
380+
} else {
381+
response.responseBody() match {
382+
case createTopicsResponse: CreateTopicsResponse =>
383+
cacheTopicCreationErrorsFromResponse(createTopicsResponse)
384+
case _ =>
385+
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
386+
}
387+
}
388+
}
389+
}
390+
391+
val request = requestContext.map { context =>
392+
val requestVersion =
393+
channelManager.controllerApiVersions.toScala match {
394+
case None =>
395+
// We will rely on the Metadata request to be retried in the case
396+
// that the latest version is not usable by the controller.
397+
ApiKeys.CREATE_TOPICS.latestVersion()
398+
case Some(nodeApiVersions) =>
399+
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
400+
}
401+
402+
// Borrow client information such as client id and correlation id from the original request,
403+
// in order to correlate the create request with the original metadata request.
404+
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
405+
requestVersion,
406+
context.clientId,
407+
context.correlationId)
408+
ForwardingManager.buildEnvelopeRequest(context,
409+
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
410+
}.getOrElse(createTopicsRequest)
411+
412+
channelManager.sendRequest(request, requestCompletionHandler)
413+
414+
val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
415+
new MetadataResponseTopic()
416+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
417+
.setName(topic)
418+
.setIsInternal(Topic.isInternal(topic))
419+
}
420+
421+
info(s"Sent auto-creation request for ${creatableTopics.keys} to the active controller.")
422+
creatableTopicResponses
423+
}
424+
322425
private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: String): Unit = {
323426
topicNames.foreach { topicName =>
324427
topicCreationErrorCache.put(topicName, CachedTopicCreationError(errorMessage, time))
@@ -337,7 +440,6 @@ class DefaultAutoTopicCreationManager(
337440
)
338441
debug(s"Cached topic creation error for ${topicResult.name()}: $errorMessage")
339442
}
340-
341443
}
342444
}
343445

core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala

Lines changed: 116 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -511,12 +511,122 @@ class AutoTopicCreationManagerTest {
511511
shareCoordinator,
512512
mockTime)
513513

514-
val defaultTtlMs = config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs()
515-
val initialResult = autoTopicCreationManager.getTopicCreationErrors(Set("nonexistent-topic"), defaultTtlMs)
516-
assertTrue(initialResult.isEmpty)
517514

518-
mockTime.sleep(config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs + 1000)
519-
val laterResult = autoTopicCreationManager.getTopicCreationErrors(Set("nonexistent-topic"), defaultTtlMs)
520-
assertTrue(laterResult.isEmpty)
515+
// First cache an error by simulating topic creation failure
516+
val topics = Map(
517+
"test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
518+
)
519+
val requestContext = initializeRequestContextWithUserPrincipal()
520+
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)
521+
522+
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
523+
Mockito.verify(brokerToController).sendRequest(
524+
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
525+
argumentCaptor.capture())
526+
527+
// Simulate a CreateTopicsResponse with error
528+
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
529+
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
530+
.setName("test-topic")
531+
.setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
532+
.setErrorMessage("Invalid replication factor")
533+
createTopicsResponseData.topics().add(topicResult)
534+
535+
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
536+
val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
537+
val clientResponse = new ClientResponse(header, null, null,
538+
0, 0, false, null, null, createTopicsResponse)
539+
540+
// Cache the error at T0
541+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
542+
543+
val shortTtlMs = 1000L // Use 1 second TTL for faster testing
544+
545+
// Verify error is cached and accessible within TTL
546+
val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs)
547+
assertEquals(1, cachedErrors.size)
548+
assertEquals("Invalid replication factor", cachedErrors("test-topic"))
549+
550+
// Advance time beyond TTL
551+
mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
552+
553+
// Verify error is now expired and proactively cleaned up
554+
val expiredErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs)
555+
assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively cleaned up")
556+
}
557+
558+
@Test
559+
def testErrorCacheLRUEviction(): Unit = {
560+
// Create a config with a small cache size for testing
561+
val props = TestUtils.createBrokerConfig(1)
562+
props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
563+
props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, "3") // Small cache size for testing
564+
565+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString)
566+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString)
567+
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG , internalTopicPartitions.toString)
568+
569+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
570+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
571+
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
572+
573+
val smallCacheConfig = KafkaConfig.fromProps(props)
574+
575+
// Verify the configuration was properly set
576+
assertEquals(3, smallCacheConfig.maxIncrementalFetchSessionCacheSlots, "Cache size configuration should be 3")
577+
578+
// Replace the test class's config with our smallCacheConfig
579+
// so that initializeRequestContext will use the correct config
580+
config = smallCacheConfig
581+
582+
val requestContext = initializeRequestContextWithUserPrincipal()
583+
584+
// Create 5 topics to exceed the cache size of 3
585+
val topicNames = (1 to 5).map(i => s"test-topic-$i")
586+
587+
// Add errors for all 5 topics to the cache
588+
topicNames.zipWithIndex.foreach { case (topicName, idx) =>
589+
val topics = Map(
590+
topicName -> new CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
591+
)
592+
593+
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)
594+
595+
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
596+
Mockito.verify(brokerToController, Mockito.atLeastOnce()).sendRequest(
597+
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
598+
argumentCaptor.capture())
599+
600+
// Simulate error response for this topic
601+
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
602+
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
603+
.setName(topicName)
604+
.setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
605+
.setErrorMessage(s"Topic '$topicName' already exists.")
606+
createTopicsResponseData.topics().add(topicResult)
607+
608+
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
609+
val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
610+
val clientResponse = new ClientResponse(header, null, null,
611+
0, 0, false, null, null, createTopicsResponse)
612+
613+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
614+
615+
// Advance time slightly between additions to ensure different timestamps
616+
mockTime.sleep(10)
617+
618+
}
619+
620+
// With cache size of 3, topics 1 and 2 should have been evicted
621+
val longTtlMs = 60000L // Use a long TTL to ensure entries aren't expired by time
622+
val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(topicNames.toSet, longTtlMs)
623+
624+
// Only the last 3 topics should be in the cache (topics 3, 4, 5)
625+
assertEquals(3, cachedErrors.size, "Cache should contain only the most recent 3 entries")
626+
assertTrue(cachedErrors.contains("test-topic-3"), "test-topic-3 should be in cache")
627+
assertTrue(cachedErrors.contains("test-topic-4"), "test-topic-4 should be in cache")
628+
assertTrue(cachedErrors.contains("test-topic-5"), "test-topic-5 should be in cache")
629+
assertTrue(!cachedErrors.contains("test-topic-1"), "test-topic-1 should have been evicted")
630+
assertTrue(!cachedErrors.contains("test-topic-2"), "test-topic-2 should have been evicted")
521631
}
522632
}

0 commit comments

Comments
 (0)