Skip to content

Commit b491951

Browse files
authored
BE: Added schemas ordering backend (#1300)
1 parent f2fabd3 commit b491951

File tree

6 files changed

+160
-18
lines changed

6 files changed

+160
-18
lines changed

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
package io.kafbat.ui.controller;
22

33
import io.kafbat.ui.api.SchemasApi;
4+
import io.kafbat.ui.api.model.SchemaColumnsToSort;
45
import io.kafbat.ui.config.ClustersProperties;
56
import io.kafbat.ui.exception.ValidationException;
67
import io.kafbat.ui.mapper.KafkaSrMapper;
78
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
89
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
910
import io.kafbat.ui.model.CompatibilityLevelDTO;
11+
import io.kafbat.ui.model.InternalTopic;
1012
import io.kafbat.ui.model.KafkaCluster;
1113
import io.kafbat.ui.model.NewSchemaSubjectDTO;
14+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1215
import io.kafbat.ui.model.SchemaSubjectDTO;
1316
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
17+
import io.kafbat.ui.model.SortOrderDTO;
1418
import io.kafbat.ui.model.rbac.AccessContext;
1519
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1620
import io.kafbat.ui.service.SchemaRegistryService;
21+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
1722
import io.kafbat.ui.service.index.SchemasFilter;
1823
import io.kafbat.ui.service.mcp.McpTool;
24+
import java.util.Comparator;
1925
import java.util.List;
2026
import java.util.Map;
2127
import javax.validation.Valid;
@@ -208,6 +214,8 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
208214
@Valid Integer pageNum,
209215
@Valid Integer perPage,
210216
@Valid String search,
217+
SchemaColumnsToSortDTO orderBy,
218+
SortOrderDTO sortOrder,
211219
ServerWebExchange serverWebExchange) {
212220
var context = AccessContext.builder()
213221
.cluster(clusterName)
@@ -230,17 +238,66 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
230238

231239
var totalPages = (filteredSubjects.size() / pageSize)
232240
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
233-
List<String> subjectsToRender = filteredSubjects.stream()
234-
.skip(subjectToSkip)
235-
.limit(pageSize)
236-
.toList();
237-
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
238-
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
241+
242+
List<String> subjectsToRetrieve;
243+
boolean paginate = true;
244+
var schemaComparator = getComparatorForSchema(orderBy);
245+
final Comparator<SubjectWithCompatibilityLevel> comparator =
246+
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
247+
? schemaComparator : schemaComparator.reversed();
248+
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
249+
if (SortOrderDTO.DESC.equals(sortOrder)) {
250+
filteredSubjects.sort(Comparator.reverseOrder());
251+
}
252+
subjectsToRetrieve = filteredSubjects.stream()
253+
.skip(subjectToSkip)
254+
.limit(pageSize)
255+
.toList();
256+
paginate = false;
257+
} else {
258+
subjectsToRetrieve = filteredSubjects;
259+
}
260+
261+
final boolean shouldPaginate = paginate;
262+
263+
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRetrieve, pageSize)
264+
.map(subjs ->
265+
paginateSchemas(subjs, comparator, shouldPaginate, pageSize, subjectToSkip)
266+
).map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
239267
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
240268
}).map(ResponseEntity::ok)
241269
.doOnEach(sig -> audit(context, sig));
242270
}
243271

272+
private List<SubjectWithCompatibilityLevel> paginateSchemas(
273+
List<SubjectWithCompatibilityLevel> subjects,
274+
Comparator<SubjectWithCompatibilityLevel> comparator,
275+
boolean paginate,
276+
int pageSize,
277+
int subjectToSkip) {
278+
subjects.sort(comparator);
279+
if (paginate) {
280+
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
281+
} else {
282+
return subjects;
283+
}
284+
}
285+
286+
private Comparator<SubjectWithCompatibilityLevel> getComparatorForSchema(
287+
@Valid SchemaColumnsToSortDTO orderBy) {
288+
var defaultComparator = Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
289+
if (orderBy == null) {
290+
return defaultComparator;
291+
}
292+
return switch (orderBy) {
293+
case SUBJECT -> Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
294+
case ID -> Comparator.comparing(SubjectWithCompatibilityLevel::getId);
295+
case TYPE -> Comparator.comparing(SubjectWithCompatibilityLevel::getSchemaType);
296+
case COMPATIBILITY -> Comparator.comparing(SubjectWithCompatibilityLevel::getCompatibility);
297+
default -> defaultComparator;
298+
};
299+
}
300+
244301
@Override
245302
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
246303
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,

api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ private ReactiveFailover<KafkaSrClientApi> api(KafkaCluster cluster) {
4747
}
4848

4949
public Mono<List<SubjectWithCompatibilityLevel>> getAllLatestVersionSchemas(KafkaCluster cluster,
50-
List<String> subjects) {
50+
List<String> subjects,
51+
int pageSize) {
5152
return Flux.fromIterable(subjects)
52-
.concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
53+
.flatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject), pageSize)
5354
.collect(Collectors.toList());
5455
}
5556

api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafbat.ui.service;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.ArgumentMatchers.anyInt;
45
import static org.mockito.ArgumentMatchers.anyList;
56
import static org.mockito.ArgumentMatchers.isA;
67
import static org.mockito.Mockito.mock;
@@ -9,15 +10,22 @@
910
import io.kafbat.ui.config.ClustersProperties;
1011
import io.kafbat.ui.controller.SchemasController;
1112
import io.kafbat.ui.model.KafkaCluster;
13+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1214
import io.kafbat.ui.model.SchemaSubjectDTO;
15+
import io.kafbat.ui.model.SortOrderDTO;
16+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
1317
import io.kafbat.ui.service.audit.AuditService;
1418
import io.kafbat.ui.sr.model.Compatibility;
1519
import io.kafbat.ui.sr.model.SchemaSubject;
20+
import io.kafbat.ui.sr.model.SchemaType;
1621
import io.kafbat.ui.util.AccessControlServiceMock;
1722
import io.kafbat.ui.util.ReactiveFailover;
1823
import java.util.Comparator;
1924
import java.util.List;
25+
import java.util.Map;
2026
import java.util.Optional;
27+
import java.util.function.Function;
28+
import java.util.stream.Collectors;
2129
import java.util.stream.IntStream;
2230
import org.junit.jupiter.api.Test;
2331
import org.mockito.Mockito;
@@ -30,19 +38,31 @@ class SchemaRegistryPaginationTest {
3038
private SchemasController controller;
3139

3240
private void init(List<String> subjects) {
41+
initWithData(subjects.stream().map(s ->
42+
new SubjectWithCompatibilityLevel(
43+
new SchemaSubject().subject(s),
44+
Compatibility.FULL
45+
)
46+
).toList());
47+
}
48+
49+
private void initWithData(List<SubjectWithCompatibilityLevel> subjects) {
3350
ClustersStorage clustersStorage = Mockito.mock(ClustersStorage.class);
3451
when(clustersStorage.getClusterByName(isA(String.class)))
3552
.thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
3653

54+
Map<String, SubjectWithCompatibilityLevel> subjectsMap = subjects.stream().collect(Collectors.toMap(
55+
SubjectWithCompatibilityLevel::getSubject,
56+
Function.identity()
57+
));
58+
3759
SchemaRegistryService schemaRegistryService = Mockito.mock(SchemaRegistryService.class);
3860
when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
39-
.thenReturn(Mono.just(subjects));
61+
.thenReturn(Mono.just(subjects.stream().map(SubjectWithCompatibilityLevel::getSubject).toList()));
4062
when(schemaRegistryService
41-
.getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod();
63+
.getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList(), anyInt())).thenCallRealMethod();
4264
when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
43-
.thenAnswer(a -> Mono.just(
44-
new SchemaRegistryService.SubjectWithCompatibilityLevel(
45-
new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
65+
.thenAnswer(a -> Mono.just(subjectsMap.get(a.getArgument(1))));
4666

4767
this.controller = new SchemasController(schemaRegistryService, new ClustersProperties());
4868
this.controller.setAccessControlService(new AccessControlServiceMock().getMock());
@@ -59,7 +79,7 @@ void shouldListFirst25andThen10Schemas() {
5979
.toList()
6080
);
6181
var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
62-
null, null, null, null).block();
82+
null, null, null, null, null, null).block();
6383
assertThat(schemasFirst25).isNotNull();
6484
assertThat(schemasFirst25.getBody()).isNotNull();
6585
assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
@@ -68,7 +88,7 @@ void shouldListFirst25andThen10Schemas() {
6888
.isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
6989

7090
var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
71-
null, 10, null, null).block();
91+
null, 10, null, null, null, null).block();
7292

7393
assertThat(schemasFirst10).isNotNull();
7494
assertThat(schemasFirst10.getBody()).isNotNull();
@@ -87,7 +107,7 @@ void shouldListSchemasContaining_1() {
87107
.toList()
88108
);
89109
var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
90-
null, null, "1", null).block();
110+
null, null, "1", null, null, null).block();
91111
assertThat(schemasSearch7).isNotNull();
92112
assertThat(schemasSearch7.getBody()).isNotNull();
93113
assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1);
@@ -103,7 +123,7 @@ void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
103123
.toList()
104124
);
105125
var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
106-
0, -1, null, null).block();
126+
0, -1, null, null, null, null).block();
107127

108128
assertThat(schemas).isNotNull();
109129
assertThat(schemas.getBody()).isNotNull();
@@ -122,7 +142,7 @@ void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
122142
);
123143

124144
var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
125-
4, 33, null, null).block();
145+
4, 33, null, null, null, null).block();
126146

127147
assertThat(schemas).isNotNull();
128148
assertThat(schemas.getBody()).isNotNull();
@@ -138,4 +158,39 @@ private KafkaCluster buildKafkaCluster(String clusterName) {
138158
.schemaRegistryClient(mock(ReactiveFailover.class))
139159
.build();
140160
}
161+
162+
@Test
163+
void shouldOrderByAndPaginate() {
164+
List<SubjectWithCompatibilityLevel> schemas = IntStream.rangeClosed(1, 100)
165+
.boxed()
166+
.map(num -> new
167+
SubjectWithCompatibilityLevel(
168+
new SchemaSubject()
169+
.subject("subject" + num)
170+
.schemaType(SchemaType.AVRO)
171+
.id(num),
172+
Compatibility.FULL
173+
)
174+
).toList();
175+
176+
initWithData(schemas);
177+
178+
var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
179+
null, null, null,
180+
SchemaColumnsToSortDTO.ID, SortOrderDTO.DESC, null
181+
).block();
182+
183+
List<String> last25OrderedById = schemas.stream()
184+
.sorted(Comparator.comparing(SubjectWithCompatibilityLevel::getId).reversed())
185+
.map(SubjectWithCompatibilityLevel::getSubject)
186+
.limit(25)
187+
.toList();
188+
189+
assertThat(schemasFirst25).isNotNull();
190+
assertThat(schemasFirst25.getBody()).isNotNull();
191+
assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
192+
assertThat(schemasFirst25.getBody().getSchemas()).hasSize(25);
193+
assertThat(schemasFirst25.getBody().getSchemas().stream().map(SchemaSubjectDTO::getSubject).toList())
194+
.isEqualTo(last25OrderedById);
195+
}
141196
}

contract-typespec/api/schemas.tsp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import "@typespec/openapi";
22
import "./responses.tsp";
3+
import "./models.tsp";
34

45
namespace Api;
56

@@ -26,6 +27,8 @@ interface SchemasApi {
2627
@query page?: int32,
2728
@query perPage?: int32,
2829
@query search?: string,
30+
@query orderBy?: SchemaColumnsToSort,
31+
@query sortOrder?: SortOrder,
2932
): SchemaSubjectsResponse;
3033

3134
@delete
@@ -172,3 +175,10 @@ model SchemaSubjectsResponse {
172175
pageCount?: int32;
173176
schemas?: SchemaSubject[];
174177
}
178+
179+
enum SchemaColumnsToSort {
180+
SUBJECT,
181+
ID,
182+
TYPE,
183+
COMPATIBILITY,
184+
}

contract-typespec/api/topics.tsp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import "@typespec/openapi";
22
import "./responses.tsp";
3+
import "./models.tsp";
34

45
namespace Api;
56

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,16 @@ paths:
12081208
required: false
12091209
schema:
12101210
type: string
1211+
- name: orderBy
1212+
in: query
1213+
required: false
1214+
schema:
1215+
$ref: '#/components/schemas/SchemaColumnsToSort'
1216+
- name: sortOrder
1217+
in: query
1218+
required: false
1219+
schema:
1220+
$ref: '#/components/schemas/SortOrder'
12111221
responses:
12121222
200:
12131223
description: OK
@@ -2683,6 +2693,14 @@ components:
26832693
- REPLICATION_FACTOR
26842694
- SIZE
26852695

2696+
SchemaColumnsToSort:
2697+
type: string
2698+
enum:
2699+
- SUBJECT
2700+
- ID
2701+
- TYPE
2702+
- COMPATIBILITY
2703+
26862704
ConnectorColumnsToSort:
26872705
type: string
26882706
enum:

0 commit comments

Comments
 (0)