Skip to content

Commit c873b5a

Browse files
author
Mike Skells
committed
allow a more complex path
1 parent dc0a153 commit c873b5a

File tree

2 files changed

+76
-16
lines changed

2 files changed

+76
-16
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.aiven.kafka.connect.common.config;
2+
3+
4+
import org.apache.kafka.connect.data.Field;
5+
import org.apache.kafka.connect.data.Schema;
6+
import org.apache.kafka.connect.data.Struct;
7+
import org.apache.kafka.connect.sink.SinkRecord;
8+
9+
import java.util.List;
10+
import java.util.Map;
11+
12+
public class Path {
13+
private final String[] terms;
14+
15+
public Path(String[] terms) {
16+
this.terms = terms;
17+
}
18+
19+
/**
20+
* Parse a path definition string into a Path object.
21+
* The path definition string is a '.' seperated series of strings, which are the terms in the path
22+
* If the '.' is something that should be included in the terms, and you want to use a different seperator, then
23+
* you can specify the seperator as the first character in the descriptions, and have the second character as ':'
24+
* @param pathDefinition
25+
* @return
26+
*/
27+
public static Path parse(String pathDefinition) {
28+
final String pathDescription;
29+
final String pathSeperator;
30+
if (pathDefinition.length()> 1 && pathDefinition.charAt(1) == ':' ) {
31+
pathDescription = pathDefinition.substring(2);
32+
pathSeperator = pathDefinition.substring(0,1);
33+
} else {
34+
pathDescription = pathDefinition;
35+
pathSeperator = ":";
36+
}
37+
return new Path(pathDescription.split(pathSeperator));
38+
39+
}
40+
41+
public Object extractDataFrom(SinkRecord value) {
42+
Object current = value;
43+
//Schema scurrentSchema = value.valueSchema();
44+
45+
for (String term : terms) {
46+
if (current == null) {
47+
return null;
48+
}
49+
if (current instanceof Struct) {
50+
Struct struct = (Struct) current;
51+
Schema schema = struct.schema();
52+
Field field = schema.field(term);
53+
if (field == null) {
54+
return null;
55+
}
56+
current = struct.get(field);
57+
}
58+
if (current instanceof Map) {
59+
current = ((Map<?, ?>) current).get(term);
60+
} else if (current instanceof List) {
61+
try {
62+
current = ((List<?>) current).get(Integer.parseInt(term));
63+
} catch (NumberFormatException|IndexOutOfBoundsException e) {
64+
return null;
65+
}
66+
} else {
67+
return null;
68+
}
69+
}
70+
return current;
71+
}
72+
}

commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
import java.time.ZoneId;
2121
import java.time.ZonedDateTime;
2222

23-
import org.apache.kafka.connect.data.Field;
24-
import org.apache.kafka.connect.data.Schema;
25-
import org.apache.kafka.connect.data.Struct;
2623
import org.apache.kafka.connect.header.Header;
2724
import org.apache.kafka.connect.sink.SinkRecord;
2825

@@ -151,28 +148,19 @@ public ZonedDateTime time(final SinkRecord record) {
151148
}
152149

153150
final class DataTimestampSource extends AbstractTimestampSource {
154-
private final String fieldName;
151+
private final Path path;
155152

156-
DataTimestampSource(final String field, final ZoneId zoneId) {
153+
DataTimestampSource(final String pathDefinition, final ZoneId zoneId) {
157154
super(zoneId, Type.DATA);
158-
this.fieldName = field;
155+
this.path = Path.parse(pathDefinition);
159156
}
160157

161158
@Override
162159
public ZonedDateTime time(final SinkRecord record) {
163-
Object value = extract(record.value());
160+
Object value = path.extractDataFrom(record);
164161
return rawTime(value);
165162
}
166163

167-
private Object extract(Object value) {
168-
if (value instanceof Struct) {
169-
Struct data = (Struct) value;
170-
Schema schema = data.schema();
171-
Field field = schema.field(fieldName);
172-
return field == null ? null : data.get(field);
173-
}
174-
return null;
175-
}
176164
}
177165

178166
final class HeaderTimestampSource extends AbstractTimestampSource {

0 commit comments

Comments
 (0)