Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@
package org.apache.pulsar.client.impl.schema;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -68,6 +76,38 @@ public class ProtobufSchemaTest {
+ "\"externalMessage\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition"
+ "\\\":null}]\"}";

private static final ObjectMapper MAPPER = new ObjectMapper();

private static ObjectNode normalizeAllProps(Map<String, String> props, String specialKey)
throws JsonProcessingException {
ObjectNode out = MAPPER.createObjectNode();

props.forEach((k, v) -> {
if (specialKey.equals(k)) {
try {
// Store the special key's stringified json data as a json array
ArrayNode arr = (ArrayNode) MAPPER.readTree(v);

// sort deterministically by (number, name)
List<JsonNode> items = new ArrayList<>();
arr.forEach(items::add);
items.sort(Comparator
.comparing((JsonNode n) -> n.path("number").asInt())
.thenComparing(n -> n.path("name").asText()));
ArrayNode sorted = MAPPER.createArrayNode();
items.forEach(sorted::add);
out.set(k, sorted);
} catch (Exception e) {
// If it's not valid JSON for some reason, store as raw string
out.put(k, v);
}
} else {
out.put(k, v);
}
});
return out;
}

@Test
public void testEncodeAndDecode() {
Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build();
Expand Down Expand Up @@ -120,9 +160,14 @@ public void testParsingInfoProperty() throws JsonProcessingException {
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema =
ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);

Assert.assertEquals(new ObjectMapper().writeValueAsString(
protobufSchema.getSchemaInfo().getProperties()), EXPECTED_PARSING_INFO);
Map<String, String> actualProps = protobufSchema.getSchemaInfo().getProperties();
Map<String, String> expectedProps = MAPPER.readValue(
EXPECTED_PARSING_INFO, new TypeReference<Map<String, String>>() {});

ObjectNode normActual = normalizeAllProps(actualProps, "__PARSING_INFO__");
ObjectNode normExpected = normalizeAllProps(expectedProps, "__PARSING_INFO__");

Assert.assertEquals(normActual, normExpected);
}

@Test
Expand Down
Loading