From 776811e4f74e735ed22a638afa86b8e574fe328b Mon Sep 17 00:00:00 2001 From: German Osin Date: Wed, 27 Aug 2025 17:11:57 +0200 Subject: [PATCH] ISSUE-208 Added schemas ordering backend --- .../ui/controller/SchemasController.java | 73 ++++++++++++++++-- .../ui/service/SchemaRegistryService.java | 5 +- .../service/SchemaRegistryPaginationTest.java | 75 ++++++++++++++++--- contract-typespec/api/schemas.tsp | 10 +++ contract-typespec/api/topics.tsp | 1 + .../main/resources/swagger/kafbat-ui-api.yaml | 18 +++++ 6 files changed, 162 insertions(+), 20 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/SchemasController.java b/api/src/main/java/io/kafbat/ui/controller/SchemasController.java index a34110031..3a849f47f 100644 --- a/api/src/main/java/io/kafbat/ui/controller/SchemasController.java +++ b/api/src/main/java/io/kafbat/ui/controller/SchemasController.java @@ -3,19 +3,25 @@ import static org.apache.commons.lang3.Strings.CI; import io.kafbat.ui.api.SchemasApi; +import io.kafbat.ui.api.model.SchemaColumnsToSort; import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.mapper.KafkaSrMapper; import io.kafbat.ui.mapper.KafkaSrMapperImpl; import io.kafbat.ui.model.CompatibilityCheckResponseDTO; import io.kafbat.ui.model.CompatibilityLevelDTO; +import io.kafbat.ui.model.InternalTopic; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.NewSchemaSubjectDTO; +import io.kafbat.ui.model.SchemaColumnsToSortDTO; import io.kafbat.ui.model.SchemaSubjectDTO; import io.kafbat.ui.model.SchemaSubjectsResponseDTO; +import io.kafbat.ui.model.SortOrderDTO; import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.SchemaAction; import io.kafbat.ui.service.SchemaRegistryService; +import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel; import io.kafbat.ui.service.mcp.McpTool; +import java.util.Comparator; import java.util.List; import java.util.Map; import javax.validation.Valid; @@ -208,6 +214,8 @@ public Mono> getSchemas(String cluster @Valid Integer pageNum, @Valid Integer perPage, @Valid String search, + SchemaColumnsToSortDTO orderBy, + SortOrderDTO sortOrder, ServerWebExchange serverWebExchange) { var context = AccessContext.builder() .cluster(clusterName) @@ -222,23 +230,72 @@ public Mono> getSchemas(String cluster .flatMap(subjects -> { int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE; int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize; - List filteredSubjects = subjects + List filteredSubjects = new java.util.ArrayList<>(subjects .stream() .filter(subj -> search == null || CI.contains(subj, search)) - .sorted().toList(); + .sorted().toList()); var totalPages = (filteredSubjects.size() / pageSize) + (filteredSubjects.size() % pageSize == 0 ? 0 : 1); - List subjectsToRender = filteredSubjects.stream() - .skip(subjectToSkip) - .limit(pageSize) - .toList(); - return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender) - .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList()) + + List subjectsToRender; + boolean paginate = true; + var schemaComparator = getComparatorForSchema(orderBy); + final Comparator comparator = + sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC) + ? schemaComparator : schemaComparator.reversed(); + if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) { + if (SortOrderDTO.DESC.equals(sortOrder)) { + filteredSubjects.sort(Comparator.reverseOrder()); + } + subjectsToRender = filteredSubjects.stream() + .skip(subjectToSkip) + .limit(pageSize) + .toList(); + paginate = false; + } else { + subjectsToRender = filteredSubjects; + } + + final boolean shouldPaginate = paginate; + + return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender, pageSize) + .map(subjs -> + paginateSchemas(subjs, comparator, shouldPaginate, pageSize, subjectToSkip) + ).map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList()) .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs)); }).map(ResponseEntity::ok) .doOnEach(sig -> audit(context, sig)); } + private List paginateSchemas( + List subjects, + Comparator comparator, + boolean paginate, + int pageSize, + int subjectToSkip) { + subjects.sort(comparator); + if (paginate) { + return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size())); + } else { + return subjects; + } + } + + private Comparator getComparatorForSchema( + @Valid SchemaColumnsToSortDTO orderBy) { + var defaultComparator = Comparator.comparing(SubjectWithCompatibilityLevel::getSubject); + if (orderBy == null) { + return defaultComparator; + } + return switch (orderBy) { + case SUBJECT -> Comparator.comparing(SubjectWithCompatibilityLevel::getSubject); + case ID -> Comparator.comparing(SubjectWithCompatibilityLevel::getId); + case TYPE -> Comparator.comparing(SubjectWithCompatibilityLevel::getSchemaType); + case COMPATIBILITY -> Comparator.comparing(SubjectWithCompatibilityLevel::getCompatibility); + default -> defaultComparator; + }; + } + @Override public Mono> updateGlobalSchemaCompatibilityLevel( String clusterName, @Valid Mono compatibilityLevelMono, diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index c725a787e..e3c7c9aa6 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -47,9 +47,10 @@ private ReactiveFailover api(KafkaCluster cluster) { } public Mono> getAllLatestVersionSchemas(KafkaCluster cluster, - List subjects) { + List subjects, + int pageSize) { return Flux.fromIterable(subjects) - .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject)) + .flatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject), pageSize) .collect(Collectors.toList()); } diff --git a/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java b/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java index 43cb29382..ca0f6e249 100644 --- a/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java @@ -1,6 +1,7 @@ package io.kafbat.ui.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; @@ -8,15 +9,22 @@ import io.kafbat.ui.controller.SchemasController; import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.model.SchemaColumnsToSortDTO; import io.kafbat.ui.model.SchemaSubjectDTO; +import io.kafbat.ui.model.SortOrderDTO; +import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel; import io.kafbat.ui.service.audit.AuditService; import io.kafbat.ui.sr.model.Compatibility; import io.kafbat.ui.sr.model.SchemaSubject; +import io.kafbat.ui.sr.model.SchemaType; import io.kafbat.ui.util.AccessControlServiceMock; import io.kafbat.ui.util.ReactiveFailover; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -29,19 +37,31 @@ class SchemaRegistryPaginationTest { private SchemasController controller; private void init(List subjects) { + initWithData(subjects.stream().map(s -> + new SubjectWithCompatibilityLevel( + new SchemaSubject().subject(s), + Compatibility.FULL + ) + ).toList()); + } + + private void initWithData(List subjects) { ClustersStorage clustersStorage = Mockito.mock(ClustersStorage.class); when(clustersStorage.getClusterByName(isA(String.class))) .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME))); + Map subjectsMap = subjects.stream().collect(Collectors.toMap( + SubjectWithCompatibilityLevel::getSubject, + Function.identity() + )); + SchemaRegistryService schemaRegistryService = Mockito.mock(SchemaRegistryService.class); when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class))) - .thenReturn(Mono.just(subjects)); + .thenReturn(Mono.just(subjects.stream().map(SubjectWithCompatibilityLevel::getSubject).toList())); when(schemaRegistryService - .getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod(); + .getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList(), anyInt())).thenCallRealMethod(); when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class))) - .thenAnswer(a -> Mono.just( - new SchemaRegistryService.SubjectWithCompatibilityLevel( - new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL))); + .thenAnswer(a -> Mono.just(subjectsMap.get(a.getArgument(1)))); this.controller = new SchemasController(schemaRegistryService); this.controller.setAccessControlService(new AccessControlServiceMock().getMock()); @@ -58,7 +78,7 @@ void shouldListFirst25andThen10Schemas() { .toList() ); var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, - null, null, null, null).block(); + null, null, null, null, null, null).block(); assertThat(schemasFirst25).isNotNull(); assertThat(schemasFirst25.getBody()).isNotNull(); assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4); @@ -67,7 +87,7 @@ void shouldListFirst25andThen10Schemas() { .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject)); var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, - null, 10, null, null).block(); + null, 10, null, null, null, null).block(); assertThat(schemasFirst10).isNotNull(); assertThat(schemasFirst10.getBody()).isNotNull(); @@ -86,7 +106,7 @@ void shouldListSchemasContaining_1() { .toList() ); var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, - null, null, "1", null).block(); + null, null, "1", null, null, null).block(); assertThat(schemasSearch7).isNotNull(); assertThat(schemasSearch7.getBody()).isNotNull(); assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1); @@ -102,7 +122,7 @@ void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { .toList() ); var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, - 0, -1, null, null).block(); + 0, -1, null, null, null, null).block(); assertThat(schemas).isNotNull(); assertThat(schemas.getBody()).isNotNull(); @@ -121,7 +141,7 @@ void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { ); var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, - 4, 33, null, null).block(); + 4, 33, null, null, null, null).block(); assertThat(schemas).isNotNull(); assertThat(schemas.getBody()).isNotNull(); @@ -137,4 +157,39 @@ private KafkaCluster buildKafkaCluster(String clusterName) { .schemaRegistryClient(mock(ReactiveFailover.class)) .build(); } + + @Test + void shouldOrderByAndPaginate() { + List schemas = IntStream.rangeClosed(1, 100) + .boxed() + .map(num -> new + SubjectWithCompatibilityLevel( + new SchemaSubject() + .subject("subject" + num) + .schemaType(SchemaType.AVRO) + .id(num), + Compatibility.FULL + ) + ).toList(); + + initWithData(schemas); + + var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, + null, null, null, + SchemaColumnsToSortDTO.ID, SortOrderDTO.DESC, null + ).block(); + + List last25OrderedById = schemas.stream() + .sorted(Comparator.comparing(SubjectWithCompatibilityLevel::getId).reversed()) + .map(SubjectWithCompatibilityLevel::getSubject) + .limit(25) + .toList(); + + assertThat(schemasFirst25).isNotNull(); + assertThat(schemasFirst25.getBody()).isNotNull(); + assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4); + assertThat(schemasFirst25.getBody().getSchemas()).hasSize(25); + assertThat(schemasFirst25.getBody().getSchemas().stream().map(SchemaSubjectDTO::getSubject).toList()) + .isEqualTo(last25OrderedById); + } } diff --git a/contract-typespec/api/schemas.tsp b/contract-typespec/api/schemas.tsp index 2d4d7e997..1022a8773 100644 --- a/contract-typespec/api/schemas.tsp +++ b/contract-typespec/api/schemas.tsp @@ -1,5 +1,6 @@ import "@typespec/openapi"; import "./responses.tsp"; +import "./models.tsp"; namespace Api; @@ -26,6 +27,8 @@ interface SchemasApi { @query page?: int32, @query perPage?: int32, @query search?: string, + @query orderBy?: SchemaColumnsToSort, + @query sortOrder?: SortOrder, ): SchemaSubjectsResponse; @delete @@ -172,3 +175,10 @@ model SchemaSubjectsResponse { pageCount?: int32; schemas?: SchemaSubject[]; } + +enum SchemaColumnsToSort { + SUBJECT, + ID, + TYPE, + COMPATIBILITY, +} diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp index c41696764..7c03856bc 100644 --- a/contract-typespec/api/topics.tsp +++ b/contract-typespec/api/topics.tsp @@ -1,5 +1,6 @@ import "@typespec/openapi"; import "./responses.tsp"; +import "./models.tsp"; namespace Api; diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 7eea7bd21..09f1c45f8 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -1208,6 +1208,16 @@ paths: required: false schema: type: string + - name: orderBy + in: query + required: false + schema: + $ref: '#/components/schemas/SchemaColumnsToSort' + - name: sortOrder + in: query + required: false + schema: + $ref: '#/components/schemas/SortOrder' responses: 200: description: OK @@ -2683,6 +2693,14 @@ components: - REPLICATION_FACTOR - SIZE + SchemaColumnsToSort: + type: string + enum: + - SUBJECT + - ID + - TYPE + - COMPATIBILITY + ConnectorColumnsToSort: type: string enum: