File tree Expand file tree Collapse file tree 1 file changed +11
-3
lines changed
commons/src/main/java/io/aiven/kafka/connect/common/config Expand file tree Collapse file tree 1 file changed +11
-3
lines changed Original file line number Diff line number Diff line change 2020import java .time .ZoneId ;
2121import java .time .ZonedDateTime ;
2222
23+ import org .apache .kafka .connect .data .Field ;
24+ import org .apache .kafka .connect .data .Schema ;
25+ import org .apache .kafka .connect .data .Struct ;
2326import org .apache .kafka .connect .header .Header ;
2427import org .apache .kafka .connect .sink .SinkRecord ;
2528
@@ -148,11 +151,11 @@ public ZonedDateTime time(final SinkRecord record) {
148151 }
149152
150153 final class DataTimestampSource extends AbstractTimestampSource {
151- private final String field ;
154+ private final String fieldName ;
152155
153156 DataTimestampSource (final String field , final ZoneId zoneId ) {
154157 super (zoneId , Type .DATA );
155- this .field = field ;
158+ this .fieldName = field ;
156159 }
157160
158161 @ Override
@@ -162,7 +165,12 @@ public ZonedDateTime time(final SinkRecord record) {
162165 }
163166
164167 private Object extract (Object value ) {
165- //TODO
168+ if (value instanceof Struct ) {
169+ Struct data = (Struct ) value ;
170+ Schema schema = data .schema ();
171+ Field field = schema .field (fieldName );
172+ return field == null ? null : data .get (field );
173+ }
166174 return null ;
167175 }
168176 }
You can’t perform that action at this time.
0 commit comments