Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ public class ClustersProperties {
CacheProperties cache = new CacheProperties();
ClusterFtsProperties fts = new ClusterFtsProperties();

AdminClient adminClient = new AdminClient();

@Data
public static class AdminClient {
Integer timeout;
int describeConsumerGroupsPartitionSize = 50;
int describeConsumerGroupsConcurrency = 4;
int listConsumerGroupOffsetsPartitionSize = 50;
int listConsumerGroupOffsetsConcurrency = 4;
int getTopicsConfigPartitionSize = 200;
int describeTopicsPartitionSize = 200;
}

@Data
public static class Cluster {
@NotBlank(message = "field name for for cluster could not be blank")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {

private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final int clientTimeout;
private final ClustersProperties clustersProperties;

public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clustersProperties = clustersProperties;
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}
Expand All @@ -53,7 +55,9 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
);
return AdminClient.create(properties);
}).subscribeOn(Schedulers.boundedElastic())
.flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
.flatMap(ac -> ReactiveAdminClient.create(ac, clustersProperties.getAdminClient())
.doOnError(th -> ac.close())
)
.onErrorMap(th -> new IllegalStateException(
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
}
Expand Down
128 changes: 105 additions & 23 deletions api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
import io.kafbat.ui.model.InternalConsumerGroup;
import io.kafbat.ui.model.InternalTopicConsumerGroup;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.ServerStatusDTO;
import io.kafbat.ui.model.SortOrderDTO;
import io.kafbat.ui.model.Statistics;
import io.kafbat.ui.service.index.ConsumerGroupFilter;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
import io.kafbat.ui.service.rbac.AccessControlService;
import io.kafbat.ui.util.ApplicationMetrics;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
Expand All @@ -19,7 +22,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -41,6 +46,7 @@ public class ConsumerGroupService {
private final AdminClientService adminClientService;
private final AccessControlService accessControlService;
private final ClustersProperties clustersProperties;
private final StatisticsCache statisticsCache;

private Mono<List<InternalConsumerGroup>> getConsumerGroups(
ReactiveAdminClient ac,
Expand All @@ -67,27 +73,63 @@ private Mono<List<InternalConsumerGroup>> getConsumerGroups(
public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
String topic) {
return adminClientService.get(cluster)
// 1. getting topic's end offsets
.flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
.flatMap(endOffsets -> {
var tps = new ArrayList<>(endOffsets.keySet());
// 2. getting all consumer groups
return describeConsumerGroups(ac)
.flatMap((List<ConsumerGroupDescription> groups) -> {
// 3. trying to find committed offsets for topic
var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets ->
groups.stream()
// 4. keeping only groups that relates to topic
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
.map(g ->
// 5. constructing results
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
.toList()
);
}
);
}));
.flatMap(endOffsets ->
describeConsumerGroups(cluster, ac, true).flatMap(groups ->
filterConsumerGroups(cluster, ac, groups, topic, endOffsets)
)
)
);
}

private Mono<List<InternalTopicConsumerGroup>> filterConsumerGroups(
KafkaCluster cluster,
ReactiveAdminClient ac,
List<ConsumerGroupDescription> groups,
String topic,
Map<TopicPartition, Long> endOffsets) {

Set<ConsumerGroupState> inactiveStates = Set.of(
ConsumerGroupState.DEAD,
ConsumerGroupState.EMPTY
);

Map<Boolean, List<ConsumerGroupDescription>> partitioned = groups.stream().collect(
Collectors.partitioningBy((g) -> !inactiveStates.contains(g.state()))
);

List<ConsumerGroupDescription> stable = partitioned.get(true).stream()
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, false))
.toList();

List<ConsumerGroupDescription> dead = partitioned.get(false);
if (!dead.isEmpty()) {
Statistics statistics = statisticsCache.get(cluster);
if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) {
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupsStates =
statistics.getClusterState().getConsumerGroupsStates();
dead = dead.stream().filter(g ->
Optional.ofNullable(consumerGroupsStates.get(g.groupId()))
.map(s ->
s.committedOffsets().keySet().stream().anyMatch(tp -> tp.topic().equals(topic))
).orElse(false)
).toList();
}
}

List<ConsumerGroupDescription> filtered = new ArrayList<>(stable.size() + dead.size());
filtered.addAll(stable);
filtered.addAll(dead);

List<TopicPartition> partitions = new ArrayList<>(endOffsets.keySet());

List<String> groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList();
return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets ->
filtered.stream().filter(g ->
isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId()))
).map(g ->
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets)
).toList());
}

private boolean isConsumerGroupRelatesToTopic(String topic,
Expand Down Expand Up @@ -208,13 +250,53 @@ private <T> Stream<T> sortAndPaginate(Collection<T> collection,
.limit(perPage);
}

private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac) {
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
KafkaCluster cluster,
ReactiveAdminClient ac,
boolean cache) {
return ac.listConsumerGroupNames()
.flatMap(ac::describeConsumerGroups)
.map(cgs -> new ArrayList<>(cgs.values()));
.flatMap(names -> describeConsumerGroups(names, cluster, ac, cache));
}

private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
List<String> groupNames,
KafkaCluster cluster,
ReactiveAdminClient ac,
boolean cache) {

Statistics statistics = statisticsCache.get(cluster);

if (cache && statistics.getStatus().equals(ServerStatusDTO.ONLINE)) {
List<ConsumerGroupDescription> result = new ArrayList<>();
List<String> notFound = new ArrayList<>();
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupsStates =
statistics.getClusterState().getConsumerGroupsStates();
for (String groupName : groupNames) {
ScrapedClusterState.ConsumerGroupState consumerGroupState = consumerGroupsStates.get(groupName);
if (consumerGroupState != null) {
result.add(consumerGroupState.description());
} else {
notFound.add(groupName);
}
}
if (!notFound.isEmpty()) {
return ac.describeConsumerGroups(notFound)
.map(descriptions -> {
result.addAll(descriptions.values());
return result;
});
} else {
return Mono.just(result);
}
} else {
return ac.describeConsumerGroups(groupNames)
.map(descriptions -> List.copyOf(descriptions.values()));
}
}




private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(
ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
Expand Down
21 changes: 12 additions & 9 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.exception.IllegalEntityStateException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
Expand Down Expand Up @@ -88,7 +89,6 @@
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -190,9 +190,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
}
}

public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
public static Mono<ReactiveAdminClient> create(AdminClient adminClient, ClustersProperties.AdminClient properties) {
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
return configRelatedInfoMono.map(info ->
new ReactiveAdminClient(adminClient, configRelatedInfoMono, properties, info)
);
}


Expand Down Expand Up @@ -235,6 +237,7 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
@Getter(AccessLevel.PACKAGE) // visible for testing
private final AdminClient client;
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
private final ClustersProperties.AdminClient properties;

private volatile ConfigRelatedInfo configRelatedInfo;

Expand Down Expand Up @@ -280,7 +283,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> t
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topicNames,
200,
properties.getGetTopicsConfigPartitionSize(),
part -> getTopicsConfigImpl(part, includeDocFixed),
mapMerger()
);
Expand Down Expand Up @@ -348,7 +351,7 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topics,
200,
properties.getDescribeTopicsPartitionSize(),
this::describeTopicsImpl,
mapMerger()
);
Expand Down Expand Up @@ -517,8 +520,8 @@ public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
return partitionCalls(
groupIds,
25,
4,
properties.getDescribeConsumerGroupsPartitionSize(),
properties.getDescribeConsumerGroupsConcurrency(),
ids -> toMono(client.describeConsumerGroups(ids).all()),
mapMerger()
);
Expand All @@ -541,8 +544,8 @@ public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<S

Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
consumerGroups,
25,
4,
properties.getListConsumerGroupOffsetsPartitionSize(),
properties.getListConsumerGroupOffsetsConcurrency(),
call,
mapMerger()
);
Expand Down
Loading
Loading