From 7e6a54629d47b4022bdc35cef1ea909ca6ef7fef Mon Sep 17 00:00:00 2001 From: German Osin Date: Wed, 3 Sep 2025 14:03:23 +0200 Subject: [PATCH 1/2] BE: Improved speed of consumer groups requets issue #1245 --- .../kafbat/ui/config/ClustersProperties.java | 13 ++++ .../ui/service/AdminClientServiceImpl.java | 6 +- .../ui/service/ConsumerGroupService.java | 61 +++++++++++++------ .../ui/service/ReactiveAdminClient.java | 21 ++++--- 4 files changed, 71 insertions(+), 30 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 8d5be375a..f0ced15e0 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -42,6 +42,19 @@ public class ClustersProperties { CacheProperties cache = new CacheProperties(); + AdminClient adminClient = new AdminClient(); + + @Data + public static class AdminClient { + Integer timeout; + int describeConsumerGroupsPartitionSize = 50; + int describeConsumerGroupsConcurrency = 4; + int listConsumerGroupOffsetsPartitionSize = 50; + int listConsumerGroupOffsetsConcurrency = 4; + int getTopicsConfigPartitionSize = 200; + int describeTopicsPartitionSize = 200; + } + @Data public static class Cluster { @NotBlank(message = "field name for for cluster could not be blank") diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index 1e7ee53fb..a2cf96578 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -27,8 +27,10 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable { private final Map adminClientCache = new ConcurrentHashMap<>(); private final int clientTimeout; + private final ClustersProperties clustersProperties; public AdminClientServiceImpl(ClustersProperties clustersProperties) { + this.clustersProperties = clustersProperties; this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout()) .orElse(DEFAULT_CLIENT_TIMEOUT_MS); } @@ -53,7 +55,9 @@ private Mono createAdminClient(KafkaCluster cluster) { ); return AdminClient.create(properties); }).subscribeOn(Schedulers.boundedElastic()) - .flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) + .flatMap(ac -> ReactiveAdminClient.create(ac, clustersProperties.getAdminClient()) + .doOnError(th -> ac.close()) + ) .onErrorMap(th -> new IllegalStateException( "Error while creating AdminClient for the cluster " + cluster.getName(), th)); } diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 5fda6d4ce..ec3bc7ff9 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.function.Predicate; import java.util.function.ToIntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -67,27 +69,46 @@ private Mono> getConsumerGroups( public Mono> getConsumerGroupsForTopic(KafkaCluster cluster, String topic) { return adminClientService.get(cluster) - // 1. getting topic's end offsets .flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false) - .flatMap(endOffsets -> { - var tps = new ArrayList<>(endOffsets.keySet()); - // 2. getting all consumer groups - return describeConsumerGroups(ac) - .flatMap((List groups) -> { - // 3. trying to find committed offsets for topic - var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList(); - return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets -> - groups.stream() - // 4. keeping only groups that relates to topic - .filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId()))) - .map(g -> - // 5. constructing results - InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets)) - .toList() - ); - } - ); - })); + .flatMap(endOffsets -> + describeConsumerGroups(ac).flatMap(groups -> + filterConsumerGroups(ac, groups, topic, endOffsets) + ) + ) + ); + } + + private Mono> filterConsumerGroups( + ReactiveAdminClient ac, + List groups, + String topic, + Map endOffsets) { + List partitions = new ArrayList<>(endOffsets.keySet()); + + Set inactiveStates = Set.of( + ConsumerGroupState.DEAD, + ConsumerGroupState.EMPTY + ); + + Map> partitioned = groups.stream().collect( + Collectors.partitioningBy((g) -> !inactiveStates.contains(g.state())) + ); + + List stable = partitioned.get(true).stream() + .filter(g -> isConsumerGroupRelatesToTopic(topic, g, false)) + .toList(); + + List filtered = new ArrayList<>(); + filtered.addAll(stable); + filtered.addAll(partitioned.get(false)); + + List groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList(); + return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets -> + filtered.stream().filter(g -> + isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())) + ).map(g -> + InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets) + ).toList()); } private boolean isConsumerGroupRelatesToTopic(String topic, diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 0efe5e827..17e8931b3 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableTable; import com.google.common.collect.Iterables; import com.google.common.collect.Table; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.exception.IllegalEntityStateException; import io.kafbat.ui.exception.NotFoundException; import io.kafbat.ui.exception.ValidationException; @@ -88,7 +89,6 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaFilter; -import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -190,9 +190,11 @@ private static Mono extract(AdminClient ac) { } } - public static Mono create(AdminClient adminClient) { + public static Mono create(AdminClient adminClient, ClustersProperties.AdminClient properties) { Mono configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient); - return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info)); + return configRelatedInfoMono.map(info -> + new ReactiveAdminClient(adminClient, configRelatedInfoMono, properties, info) + ); } @@ -235,6 +237,7 @@ public static Mono toMono(KafkaFuture future) { @Getter(AccessLevel.PACKAGE) // visible for testing private final AdminClient client; private final Mono configRelatedInfoMono; + private final ClustersProperties.AdminClient properties; private volatile ConfigRelatedInfo configRelatedInfo; @@ -280,7 +283,7 @@ public Mono>> getTopicsConfig(Collection t // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count return partitionCalls( topicNames, - 200, + properties.getGetTopicsConfigPartitionSize(), part -> getTopicsConfigImpl(part, includeDocFixed), mapMerger() ); @@ -348,7 +351,7 @@ public Mono> describeTopics(Collection top // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count return partitionCalls( topics, - 200, + properties.getDescribeTopicsPartitionSize(), this::describeTopicsImpl, mapMerger() ); @@ -517,8 +520,8 @@ public Mono> listConsumerGroups() { public Mono> describeConsumerGroups(Collection groupIds) { return partitionCalls( groupIds, - 25, - 4, + properties.getDescribeConsumerGroupsPartitionSize(), + properties.getDescribeConsumerGroupsConcurrency(), ids -> toMono(client.describeConsumerGroups(ids).all()), mapMerger() ); @@ -541,8 +544,8 @@ public Mono> listConsumerGroupOffsets(List>> merged = partitionCalls( consumerGroups, - 25, - 4, + properties.getListConsumerGroupOffsetsPartitionSize(), + properties.getListConsumerGroupOffsetsConcurrency(), call, mapMerger() ); From 9f4bf6fe69ada432eabadec6b47ae6f42776f5b0 Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 15 Sep 2025 11:21:18 +0200 Subject: [PATCH 2/2] Use cache to speedup consumers query --- .../ui/service/ConsumerGroupService.java | 79 ++++++- .../ui/service/ConsumerGroupServiceTest.java | 212 ++++++++++++++++++ 2 files changed, 282 insertions(+), 9 deletions(-) create mode 100644 api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 9c4835b23..ae2a81d6f 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -8,8 +8,11 @@ import io.kafbat.ui.model.InternalConsumerGroup; import io.kafbat.ui.model.InternalTopicConsumerGroup; import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.model.ServerStatusDTO; import io.kafbat.ui.model.SortOrderDTO; +import io.kafbat.ui.model.Statistics; import io.kafbat.ui.service.index.ConsumerGroupFilter; +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.ApplicationMetrics; import io.kafbat.ui.util.KafkaClientSslPropertiesUtil; @@ -19,9 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; -import java.util.function.Predicate; import java.util.function.ToIntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -43,6 +46,7 @@ public class ConsumerGroupService { private final AdminClientService adminClientService; private final AccessControlService accessControlService; private final ClustersProperties clustersProperties; + private final StatisticsCache statisticsCache; private Mono> getConsumerGroups( ReactiveAdminClient ac, @@ -71,19 +75,19 @@ public Mono> getConsumerGroupsForTopic(KafkaClu return adminClientService.get(cluster) .flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false) .flatMap(endOffsets -> - describeConsumerGroups(ac).flatMap(groups -> - filterConsumerGroups(ac, groups, topic, endOffsets) + describeConsumerGroups(cluster, ac, true).flatMap(groups -> + filterConsumerGroups(cluster, ac, groups, topic, endOffsets) ) ) ); } private Mono> filterConsumerGroups( + KafkaCluster cluster, ReactiveAdminClient ac, List groups, String topic, Map endOffsets) { - List partitions = new ArrayList<>(endOffsets.keySet()); Set inactiveStates = Set.of( ConsumerGroupState.DEAD, @@ -98,9 +102,26 @@ private Mono> filterConsumerGroups( .filter(g -> isConsumerGroupRelatesToTopic(topic, g, false)) .toList(); - List filtered = new ArrayList<>(); + List dead = partitioned.get(false); + if (!dead.isEmpty()) { + Statistics statistics = statisticsCache.get(cluster); + if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) { + Map consumerGroupsStates = + statistics.getClusterState().getConsumerGroupsStates(); + dead = dead.stream().filter(g -> + Optional.ofNullable(consumerGroupsStates.get(g.groupId())) + .map(s -> + s.committedOffsets().keySet().stream().anyMatch(tp -> tp.topic().equals(topic)) + ).orElse(false) + ).toList(); + } + } + + List filtered = new ArrayList<>(stable.size() + dead.size()); filtered.addAll(stable); - filtered.addAll(partitioned.get(false)); + filtered.addAll(dead); + + List partitions = new ArrayList<>(endOffsets.keySet()); List groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList(); return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets -> @@ -229,12 +250,52 @@ private Stream sortAndPaginate(Collection collection, .limit(perPage); } - private Mono> describeConsumerGroups(ReactiveAdminClient ac) { + private Mono> describeConsumerGroups( + KafkaCluster cluster, + ReactiveAdminClient ac, + boolean cache) { return ac.listConsumerGroupNames() - .flatMap(ac::describeConsumerGroups) - .map(cgs -> new ArrayList<>(cgs.values())); + .flatMap(names -> describeConsumerGroups(names, cluster, ac, cache)); } + private Mono> describeConsumerGroups( + List groupNames, + KafkaCluster cluster, + ReactiveAdminClient ac, + boolean cache) { + + Statistics statistics = statisticsCache.get(cluster); + + if (cache && statistics.getStatus().equals(ServerStatusDTO.ONLINE)) { + List result = new ArrayList<>(); + List notFound = new ArrayList<>(); + Map consumerGroupsStates = + statistics.getClusterState().getConsumerGroupsStates(); + for (String groupName : groupNames) { + ScrapedClusterState.ConsumerGroupState consumerGroupState = consumerGroupsStates.get(groupName); + if (consumerGroupState != null) { + result.add(consumerGroupState.description()); + } else { + notFound.add(groupName); + } + } + if (!notFound.isEmpty()) { + return ac.describeConsumerGroups(notFound) + .map(descriptions -> { + result.addAll(descriptions.values()); + return result; + }); + } else { + return Mono.just(result); + } + } else { + return ac.describeConsumerGroups(groupNames) + .map(descriptions -> List.copyOf(descriptions.values())); + } + } + + + private Mono> loadDescriptionsByInternalConsumerGroups( ReactiveAdminClient ac, diff --git a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java new file mode 100644 index 000000000..f81ed0d93 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java @@ -0,0 +1,212 @@ +package io.kafbat.ui.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableTable; +import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.model.InternalTopicConsumerGroup; +import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.model.Metrics; +import io.kafbat.ui.model.ServerStatusDTO; +import io.kafbat.ui.model.Statistics; +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; +import io.kafbat.ui.service.rbac.AccessControlService; +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.MemberAssignment; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import reactor.core.publisher.Mono; + +class ConsumerGroupServiceTest { + + @Test + void getConsumerGroupsForTopicConsumerGroups() { + // given + ClustersProperties.Cluster clusterProperties = new ClustersProperties.Cluster(); + clusterProperties.setName("test"); + + ClustersProperties clustersProperties = new ClustersProperties(); + clustersProperties.getClusters().add(clusterProperties); + + + KafkaCluster cluster = KafkaCluster.builder() + .name("test") + .originalProperties(clusterProperties) + .build(); + + ReactiveAdminClient client = Mockito.mock(ReactiveAdminClient.class); + AdminClientService admin = Mockito.mock(AdminClientService.class); + Mockito.when(admin.get(cluster)).thenReturn(Mono.just(client)); + + final String topic = UUID.randomUUID().toString(); + final String anotherTopic = UUID.randomUUID().toString(); + + Map consumersWithTopic = + Stream.generate(() -> generate( + List.of(new TopicPartition(topic, 0)), + Map.of(new TopicPartition(topic, 0), 100L), + ConsumerGroupState.DEAD + )).limit(10).collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + s -> s + )); + + Map consumersWithoutTopic = + Stream.generate(() -> generate( + List.of(new TopicPartition(anotherTopic, 0)), + Map.of(new TopicPartition(anotherTopic, 0), 100L), + ConsumerGroupState.DEAD + )).limit(10).collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + s -> s + )); + + Map stableConsumersWithTopic = + Stream.generate(() -> generate( + List.of(new TopicPartition(topic, 0)), + Map.of(new TopicPartition(topic, 0), 100L), + ConsumerGroupState.STABLE + )).limit(10).collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + s -> s + )); + + Map stableConsumersWithoutTopic = + Stream.generate(() -> generate( + List.of(new TopicPartition(anotherTopic, 0)), + Map.of(new TopicPartition(anotherTopic, 0), 100L), + ConsumerGroupState.STABLE + )).limit(10).collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + s -> s + )); + + Map consumerGroupStates = new HashMap<>(); + consumerGroupStates.putAll(consumersWithTopic); + consumerGroupStates.putAll(consumersWithoutTopic); + consumerGroupStates.putAll(stableConsumersWithTopic); + consumerGroupStates.putAll(stableConsumersWithoutTopic); + + Mockito.when(client.listConsumerGroups()).thenReturn(Mono.just( + consumerGroupStates.keySet() + .stream() + .map(s -> new ConsumerGroupListing(s, false)) + .toList() + )); + + Mockito.when(client.listConsumerGroupNames()).thenReturn(Mono.just( + List.copyOf(consumerGroupStates.keySet()) + )); + + Mockito.when(client.listTopicOffsets(Mockito.eq(topic), Mockito.any(), Mockito.eq(false))) + .thenReturn(Mono.just(Map.of(new TopicPartition(topic, 0), 100L))); + + Mockito.when(client.describeConsumerGroups( + Mockito.any()) + ).thenReturn( + Mono.just( + consumerGroupStates.values().stream() + .collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + ScrapedClusterState.ConsumerGroupState::description + )) + ) + ); + + Mockito.when(client.listConsumerGroupOffsets(Mockito.any(), Mockito.any())).thenAnswer( + a -> { + List groupIds = (List) a.getArgument(0); + var table = ImmutableTable.builder(); + for (String groupId : groupIds) { + ScrapedClusterState.ConsumerGroupState state = consumerGroupStates.get(groupId); + for (Map.Entry entry : state.committedOffsets().entrySet()) { + table.put(groupId, entry.getKey(), entry.getValue()); + } + } + return Mono.just(table.build()); + } + ); + + ScrapedClusterState state = ScrapedClusterState.builder() + .scrapeFinishedAt(Instant.now()) + .nodesStates(Map.of()) + .topicStates(Map.of()) + .consumerGroupsStates(consumerGroupStates) + .build(); + + Statistics statistics = Statistics.builder() + .status(ServerStatusDTO.ONLINE) + .version("Unknown") + .features(List.of()) + .clusterDescription(ReactiveAdminClient.ClusterDescription.empty()) + .metrics(Metrics.empty()) + .clusterState(state) + .build(); + + StatisticsCache cache = Mockito.mock(StatisticsCache.class); + Mockito.when(cache.get(cluster)).thenReturn(statistics); + + AccessControlService acl = Mockito.mock(AccessControlService.class); + ConsumerGroupService consumerGroupService = + new ConsumerGroupService(admin, acl, clustersProperties, cache); + + // should + List groups = + consumerGroupService.getConsumerGroupsForTopic(cluster, topic).block(); + + assertThat(groups).size().isEqualTo( + consumersWithTopic.size() + stableConsumersWithTopic.size() + ); + + List resultedGroupIds = groups.stream().map(InternalTopicConsumerGroup::getGroupId).toList(); + assertThat(resultedGroupIds).containsAll(consumersWithTopic.keySet()); + + assertThat(resultedGroupIds).containsAll(stableConsumersWithTopic.keySet()); + } + + private ScrapedClusterState.ConsumerGroupState generate( + List topicPartitions, + Map lastOffsets, + ConsumerGroupState state + ) { + final String name = UUID.randomUUID().toString(); + Map commited = topicPartitions.stream() + .map(tp -> Map.entry(tp, lastOffsets.get(tp) - 1)) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue + )); + + List members = state.equals(ConsumerGroupState.STABLE) ? List.of( + new MemberDescription( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + "localhost", + new MemberAssignment(new HashSet<>(topicPartitions)) + ) + ) : List.of(); + + return new ScrapedClusterState.ConsumerGroupState( + name, + new ConsumerGroupDescription( + name, + false, + members, "", + state, + null + ), commited + ); + } +}