|
19 | 19 | package org.apache.pulsar.client.impl.schema; |
20 | 20 |
|
21 | 21 | import com.fasterxml.jackson.core.JsonProcessingException; |
| 22 | +import com.fasterxml.jackson.core.type.TypeReference; |
| 23 | +import com.fasterxml.jackson.databind.JsonNode; |
22 | 24 | import com.fasterxml.jackson.databind.ObjectMapper; |
| 25 | +import com.fasterxml.jackson.databind.node.ArrayNode; |
| 26 | +import com.fasterxml.jackson.databind.node.ObjectNode; |
23 | 27 | import io.netty.buffer.ByteBuf; |
24 | 28 | import io.netty.buffer.ByteBufAllocator; |
| 29 | +import java.util.ArrayList; |
25 | 30 | import java.util.Collections; |
| 31 | +import java.util.Comparator; |
26 | 32 | import java.util.HashMap; |
| 33 | +import java.util.List; |
| 34 | +import java.util.Map; |
27 | 35 | import lombok.extern.slf4j.Slf4j; |
28 | 36 | import org.apache.avro.Schema; |
29 | 37 | import org.apache.pulsar.common.schema.SchemaType; |
@@ -68,6 +76,38 @@ public class ProtobufSchemaTest { |
68 | 76 | + "\"externalMessage\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition" |
69 | 77 | + "\\\":null}]\"}"; |
70 | 78 |
|
| 79 | + private static final ObjectMapper MAPPER = new ObjectMapper(); |
| 80 | + |
| 81 | + private static ObjectNode normalizeAllProps(Map<String, String> props, String specialKey) |
| 82 | + throws JsonProcessingException { |
| 83 | + ObjectNode out = MAPPER.createObjectNode(); |
| 84 | + |
| 85 | + props.forEach((k, v) -> { |
| 86 | + if (specialKey.equals(k)) { |
| 87 | + try { |
| 88 | + // Store the special key's stringified json data as a json array |
| 89 | + ArrayNode arr = (ArrayNode) MAPPER.readTree(v); |
| 90 | + |
| 91 | + // sort deterministically by (number, name) |
| 92 | + List<JsonNode> items = new ArrayList<>(); |
| 93 | + arr.forEach(items::add); |
| 94 | + items.sort(Comparator |
| 95 | + .comparing((JsonNode n) -> n.path("number").asInt()) |
| 96 | + .thenComparing(n -> n.path("name").asText())); |
| 97 | + ArrayNode sorted = MAPPER.createArrayNode(); |
| 98 | + items.forEach(sorted::add); |
| 99 | + out.set(k, sorted); |
| 100 | + } catch (Exception e) { |
| 101 | + // If it's not valid JSON for some reason, store as raw string |
| 102 | + out.put(k, v); |
| 103 | + } |
| 104 | + } else { |
| 105 | + out.put(k, v); |
| 106 | + } |
| 107 | + }); |
| 108 | + return out; |
| 109 | + } |
| 110 | + |
71 | 111 | @Test |
72 | 112 | public void testEncodeAndDecode() { |
73 | 113 | Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build(); |
@@ -120,9 +160,14 @@ public void testParsingInfoProperty() throws JsonProcessingException { |
120 | 160 | ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema = |
121 | 161 | ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); |
122 | 162 |
|
123 | | - Assert.assertEquals(new ObjectMapper().writeValueAsString( |
124 | | - protobufSchema.getSchemaInfo().getProperties()), EXPECTED_PARSING_INFO); |
| 163 | + Map<String, String> actualProps = protobufSchema.getSchemaInfo().getProperties(); |
| 164 | + Map<String, String> expectedProps = MAPPER.readValue( |
| 165 | + EXPECTED_PARSING_INFO, new TypeReference<Map<String, String>>() {}); |
| 166 | + |
| 167 | + ObjectNode normActual = normalizeAllProps(actualProps, "__PARSING_INFO__"); |
| 168 | + ObjectNode normExpected = normalizeAllProps(expectedProps, "__PARSING_INFO__"); |
125 | 169 |
|
| 170 | + Assert.assertEquals(normActual, normExpected); |
126 | 171 | } |
127 | 172 |
|
128 | 173 | @Test |
|
0 commit comments