Commit d9d6213

Use Jackson JSON parser instead of rolling own parsing

1 parent 750c68f

6 files changed, +82 −63 lines changed
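In short: instead of concatenating JSON array syntax by hand and later splitting it back apart with string heuristics, plan-product values are serialized once with Jackson's ObjectMapper. A minimal sketch (not code from this commit; the class name is hypothetical) contrasting the two approaches:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;

public class HandRolledVsJackson {
  public static void main(String[] args) throws Exception {
    List<String> values = Arrays.asList("string_col#0", "say \"hi\"");

    // Hand-rolled formatting, as removed by this commit: no escaping,
    // so an embedded quote produces invalid JSON.
    System.out.println("[\"" + String.join("\", \"", values) + "\"]");
    // ["string_col#0", "say "hi""]   <- broken

    // Jackson escapes correctly and emits native JSON types.
    System.out.println(new ObjectMapper().writeValueAsString(values));
    // ["string_col#0","say \"hi\""]
  }
}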

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java

Lines changed: 3 additions & 3 deletions

@@ -19,9 +19,9 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {
       Object obj = it.next();
       String key = String.format("_dd.unknown_key.%d", i);

-      String val = parsePlanProduct(obj);
+      Object val = parsePlanProduct(obj);
       if (val != null) {
-        args.put(key, val);
+        args.put(key, writeObjectToString(val));
       } else {
         unparsed.put(key, obj.getClass().getName());
       }
@@ -31,7 +31,7 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {

     if (unparsed.size() > 0) {
       // For now, place what we can't parse here with the types so we're aware of them
-      args.put("_dd.unparsed", unparsed.toString());
+      args.put("_dd.unparsed", writeObjectToString(unparsed.toString()));
     }
     return args;
   }

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java

Lines changed: 3 additions & 3 deletions

@@ -19,9 +19,9 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {
       Object obj = it.next();
       String key = plan.productElementName(i);

-      String val = parsePlanProduct(obj);
+      Object val = parsePlanProduct(obj);
       if (val != null) {
-        args.put(key, val);
+        args.put(key, writeObjectToString(val));
       } else {
         unparsed.put(key, obj.getClass().getName());
       }
@@ -31,7 +31,7 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {

     if (unparsed.size() > 0) {
       // For now, place what we can't parse here with the types so we're aware of them
-      args.put("_dd.unparsed", unparsed.toString());
+      args.put("_dd.unparsed", writeObjectToString(unparsed.toString()));
     }
     return args;
   }

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java

Lines changed: 28 additions & 3 deletions

@@ -1,5 +1,8 @@
 package datadog.trace.instrumentation.spark;

+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
 import org.apache.spark.sql.catalyst.plans.QueryPlan;
@@ -8,19 +11,41 @@
 import scala.collection.Iterable;

 public abstract class AbstractSparkPlanUtils {
+  ObjectMapper mapper =
+      new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
   public abstract Map<String, String> extractPlanProduct(TreeNode node);

-  public String parsePlanProduct(Object value) {
+  // Should only call on final values being written to `meta`
+  public String writeObjectToString(Object value) {
+    try {
+      return mapper.writeValueAsString(value);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  // Should really only return valid JSON types (Array, Map, String, Boolean, Number, null)
+  public Object parsePlanProduct(Object value) {
     if (value == null) {
       return "null";
     } else if (value instanceof Iterable) {
-      ArrayList<String> list = new ArrayList<>();
+      ArrayList<Object> list = new ArrayList<>();
       ((Iterable) value).foreach(item -> list.add(parsePlanProduct(item)));
-      return "[\"" + String.join("\", \"", list) + "\"]";
+      return list;
     } else if (value instanceof Option) {
       return parsePlanProduct(((Option) value).getOrElse(() -> "none"));
     } else if (value instanceof QueryPlan) { // Filter out values referencing child nodes
       return null;
+      // Eventually we want to traverse inner non-child TreeNodes too to get more details (e.g.
+      // HashPartitioning)
+      // Just need to find a way to get the name of the node and show it
+      // We also may want to exclude certain node types (e.g. column representations?)
+      /** meta: { output: { hashPartitioning: { numPartitions: 200, ... } } } */
+      // } else if (value instanceof TreeNode) {
+      //   return extractPlanProduct(((TreeNode) value)).toString();
+    } else if (value instanceof Boolean || Number.class.isInstance(value)) {
+      return value;
     } else {
       return value.toString();
     }
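With parsePlanProduct now returning plain Java values (lists, booleans, numbers, strings), writeObjectToString lets Jackson pick the JSON representation. A minimal standalone sketch (hypothetical class, assuming only jackson-databind on the classpath) of what writeValueAsString produces for those types:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;

public class WriteObjectToStringDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // Each JSON-compatible Java type maps to its native JSON form:
    System.out.println(mapper.writeValueAsString(Arrays.asList("a#1", "b#2"))); // ["a#1","b#2"]
    System.out.println(mapper.writeValueAsString(false));                       // false
    System.out.println(mapper.writeValueAsString(0));                           // 0
    System.out.println(mapper.writeValueAsString("none"));                      // "none"
    System.out.println(mapper.writeValueAsString(Arrays.asList()));             // []
  }
}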

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 9 additions & 15 deletions

@@ -151,15 +151,16 @@ public String toJson(Map<Long, AccumulatorWithStage> accumulators) {

     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
-      this.toJson(generator, accumulators);
+      this.toJson(generator, accumulators, mapper);
     } catch (IOException e) {
       return null;
     }

     return new String(baos.toByteArray(), StandardCharsets.UTF_8);
   }

-  private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators)
+  private void toJson(
+      JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators, ObjectMapper mapper)
       throws IOException {
     generator.writeStartObject();
     generator.writeStringField("node", plan.nodeName());
@@ -180,18 +181,11 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
     generator.writeStartObject();

     for (Tuple2<String, String> metadata : JavaConverters.asJavaCollection(plan.metadata())) {
-      // If it looks like a string array, break apart and write as native JSON array
-      if (metadata._2.startsWith("[\"") && metadata._2.endsWith("\"]")) {
-        String[] list = metadata._2.substring(2, metadata._2.length() - 2).split("\", \"");
-
-        generator.writeFieldName(metadata._1);
-        generator.writeStartArray();
-        for (String entry : list) {
-          generator.writeString(entry);
-        }
-        generator.writeEndArray();
-      } else {
-        generator.writeStringField(metadata._1, metadata._2);
+      generator.writeFieldName(metadata._1);
+      try {
+        generator.writeTree(mapper.readTree(metadata._2));
+      } catch (IOException e) {
+        generator.writeString(metadata._2);
       }
     }

@@ -219,7 +213,7 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
     generator.writeFieldName("children");
     generator.writeStartArray();
     for (SparkPlanInfoForStage child : children) {
-      child.toJson(generator, accumulators);
+      child.toJson(generator, accumulators, mapper);
     }
     generator.writeEndArray();
   }
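The metadata loop now round-trips each pre-serialized value: mapper.readTree parses it, generator.writeTree embeds it as native JSON, and anything that fails to parse falls back to a plain JSON string. A self-contained sketch of the same pattern (the helper and class names are hypothetical, not from this commit):

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;

public class WriteTreeFallbackDemo {
  static void writeMetadataValue(
      JsonGenerator generator, ObjectMapper mapper, String key, String raw) throws IOException {
    generator.writeFieldName(key);
    try {
      // Valid JSON is embedded as-is (array, number, boolean, object, ...).
      generator.writeTree(mapper.readTree(raw));
    } catch (IOException e) {
      // Anything that fails to parse is kept as a plain JSON string.
      generator.writeString(raw);
    }
  }

  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    StringWriter out = new StringWriter();
    try (JsonGenerator generator = mapper.getFactory().createGenerator(out)) {
      generator.writeStartObject();
      writeMetadataValue(generator, mapper, "sortOrder", "[\"string_col#28 ASC NULLS FIRST\"]");
      writeMetadataValue(generator, mapper, "detail", "not valid json {");
      generator.writeEndObject();
    }
    System.out.println(out);
    // {"sortOrder":["string_col#28 ASC NULLS FIRST"],"detail":"not valid json {"}
  }
}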

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy

Lines changed: 12 additions & 12 deletions

@@ -267,7 +267,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
           "sum#16",
           "count#17L"
         ],
-        "_dd.unknown_key.4": "0",
+        "_dd.unknown_key.4": 0,
         "_dd.unknown_key.5": [
           "string_col#0",
           "sum#18",
@@ -348,7 +348,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
         "_dd.unknown_key.1": ["string_col#0"],
         "_dd.unknown_key.2": ["avg(double_col#1)"],
         "_dd.unknown_key.3": ["avg(double_col#1)#4"],
-        "_dd.unknown_key.4": "1",
+        "_dd.unknown_key.4": 1,
         "_dd.unknown_key.5": [
           "string_col#0",
           "avg(double_col#1)#4 AS avg(double_col)#5"
@@ -552,10 +552,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "(keys=[], functions=[partial_count(1)])",
       "meta": {
         "_dd.unknown_key.0": "none",
-        "_dd.unknown_key.1": [""],
+        "_dd.unknown_key.1": [],
         "_dd.unknown_key.2": ["partial_count(1)"],
         "_dd.unknown_key.3": ["count#38L"],
-        "_dd.unknown_key.4": "0",
+        "_dd.unknown_key.4": 0,
         "_dd.unknown_key.5": ["count#39L"],
         "_dd.unparsed": "any"
       },
@@ -574,7 +574,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "node": "Project",
       "nodeId": 1355342585,
       "meta": {
-        "_dd.unknown_key.0": [""],
+        "_dd.unknown_key.0": [],
         "_dd.unparsed": "any"
       },
       "children": [
@@ -618,8 +618,8 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[string_col#21 ASC NULLS FIRST], false, 0",
       "meta": {
         "_dd.unknown_key.0": ["string_col#21 ASC NULLS FIRST"],
-        "_dd.unknown_key.1": "false",
-        "_dd.unknown_key.3": "0",
+        "_dd.unknown_key.1": false,
+        "_dd.unknown_key.3": 0,
         "_dd.unparsed": "any"
       },
       "metrics": [
@@ -666,8 +666,8 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[string_col#25 ASC NULLS FIRST], false, 0",
       "meta": {
         "_dd.unknown_key.0": ["string_col#25 ASC NULLS FIRST"],
-        "_dd.unknown_key.1": "false",
-        "_dd.unknown_key.3": "0",
+        "_dd.unknown_key.1": false,
+        "_dd.unknown_key.3": 0,
         "_dd.unparsed": "any"
       },
       "metrics": [
@@ -716,11 +716,11 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeId": 724815342,
       "nodeDetailString": "(keys=[], functions=[count(1)])",
       "meta": {
-        "_dd.unknown_key.0": [""],
-        "_dd.unknown_key.1": [""],
+        "_dd.unknown_key.0": [],
+        "_dd.unknown_key.1": [],
         "_dd.unknown_key.2": ["count(1)"],
         "_dd.unknown_key.3": ["count(1)#35L"],
-        "_dd.unknown_key.4": "0",
+        "_dd.unknown_key.4": 0,
         "_dd.unknown_key.5": ["count(1)#35L AS count#36L"],
         "_dd.unparsed": "any"
       },
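The expected-output changes above follow directly from the parser switch: booleans and numbers are now emitted unquoted, and [""] becomes [] because joining an empty list with the old string concatenation still left one quoted empty entry, while Jackson serializes the empty collection itself. A minimal sketch (hypothetical class, not from this commit) showing why:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.List;

public class EmptyListSerializationDemo {
  public static void main(String[] args) throws Exception {
    List<String> empty = Collections.emptyList();

    // Old hand-rolled formatting could not represent an empty list:
    // joining zero elements still leaves the surrounding quotes behind.
    System.out.println("[\"" + String.join("\", \"", empty) + "\"]"); // [""]

    // Jackson produces the empty array the tests now expect.
    System.out.println(new ObjectMapper().writeValueAsString(empty)); // []
  }
}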

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy

Lines changed: 27 additions & 27 deletions

@@ -82,8 +82,8 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
         ],
         "aggregateExpressions": ["partial_avg(double_col#1)"],
         "groupingExpressions": ["string_col#0"],
-        "initialInputBufferOffset": "0",
-        "isStreaming": "false",
+        "initialInputBufferOffset": 0,
+        "isStreaming": false,
         "numShufflePartitions": "none",
         "requiredChildDistributionExpressions": "none",
         "resultExpressions": [
@@ -157,8 +157,8 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
         "aggregateAttributes": ["avg(double_col#1)#4"],
         "aggregateExpressions": ["avg(double_col#1)"],
         "groupingExpressions": ["string_col#0"],
-        "initialInputBufferOffset": "1",
-        "isStreaming": "false",
+        "initialInputBufferOffset": 1,
+        "isStreaming": false,
         "numShufflePartitions": "none",
         "requiredChildDistributionExpressions": ["string_col#0"],
         "resultExpressions": [
@@ -206,7 +206,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "0",
       "meta": {
         "_dd.unparsed": "any",
-        "id": "0"
+        "id": 0
       },
       "children": [
         {
@@ -313,8 +313,8 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
         "aggregateAttributes": ["avg(double_col#1)#4"],
         "aggregateExpressions": ["avg(double_col#1)"],
         "groupingExpressions": ["string_col#0"],
-        "initialInputBufferOffset": "1",
-        "isStreaming": "false",
+        "initialInputBufferOffset": 1,
+        "isStreaming": false,
         "numShufflePartitions": "none",
         "requiredChildDistributionExpressions": ["string_col#0"],
         "resultExpressions": [
@@ -362,7 +362,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "0",
       "meta": {
         "_dd.unparsed": "any",
-        "id": "0"
+        "id": 0
       },
       "children": [
         {
@@ -452,9 +452,9 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[avg(double_col)#5 DESC NULLS LAST], true, 0",
       "meta": {
         "_dd.unparsed": "any",
-        "global": "true",
+        "global": true,
         "sortOrder": ["avg(double_col)#5 DESC NULLS LAST"],
-        "testSpillFrequency": "0"
+        "testSpillFrequency": 0
       },
       "metrics": [
         {
@@ -492,7 +492,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "1",
       "meta": {
         "_dd.unparsed": "any",
-        "id": "1"
+        "id": 1
       },
       "children": [
         {
@@ -796,9 +796,9 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
         "_dd.unparsed": "any",
         "aggregateAttributes": ["count#45L"],
         "aggregateExpressions": ["partial_count(1)"],
-        "groupingExpressions": [""],
-        "initialInputBufferOffset": "0",
-        "isStreaming": "false",
+        "groupingExpressions": [],
+        "initialInputBufferOffset": 0,
+        "isStreaming": false,
         "numShufflePartitions": "none",
         "requiredChildDistributionExpressions": "none",
         "resultExpressions": ["count#46L"]
@@ -819,7 +819,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeId": "nodeId_13",
       "meta": {
         "_dd.unparsed": "any",
-        "projectList": [""]
+        "projectList": []
       },
       "children": [
         {
@@ -829,7 +829,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "meta": {
         "_dd.unparsed": "any",
         "condition": "none",
-        "isSkewJoin": "false",
+        "isSkewJoin": false,
         "joinType": "Inner",
         "leftKeys": ["string_col#28"],
         "rightKeys": ["string_col#32"]
@@ -863,9 +863,9 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[string_col#28 ASC NULLS FIRST], false, 0",
       "meta": {
         "_dd.unparsed": "any",
-        "global": "false",
+        "global": false,
         "sortOrder": ["string_col#28 ASC NULLS FIRST"],
-        "testSpillFrequency": "0"
+        "testSpillFrequency": 0
       },
       "metrics": [
         {
@@ -903,7 +903,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "0",
       "meta": {
         "_dd.unparsed": "any",
-        "id": "0"
+        "id": 0
       },
       "children": [
         {
@@ -984,9 +984,9 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[string_col#32 ASC NULLS FIRST], false, 0",
       "meta": {
         "_dd.unparsed": "any",
-        "global": "false",
+        "global": false,
         "sortOrder": ["string_col#32 ASC NULLS FIRST"],
-        "testSpillFrequency": "0"
+        "testSpillFrequency": 0
      },
       "metrics": [
         {
@@ -1024,7 +1024,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "1",
       "meta": {
         "_dd.unparsed": "any",
-        "id": "1"
+        "id": 1
       },
       "children": [
         {
@@ -1114,11 +1114,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
         "_dd.unparsed": "any",
         "aggregateAttributes": ["count(1)#42L"],
         "aggregateExpressions": ["count(1)"],
-        "groupingExpressions": [""],
-        "initialInputBufferOffset": "0",
-        "isStreaming": "false",
+        "groupingExpressions": [],
+        "initialInputBufferOffset": 0,
+        "isStreaming": false,
         "numShufflePartitions": "none",
-        "requiredChildDistributionExpressions": [""],
+        "requiredChildDistributionExpressions": [],
         "resultExpressions": ["count(1)#42L AS count#43L"]
       },
       "metrics": [
@@ -1143,7 +1143,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "2",
       "meta": {
         "_dd.unparsed": "any",
-        "id": "2"
+        "id": 2
       },
       "children": [
         {
