Skip to content

Commit 42d4b0b

Browse files
committed
http154 initial prototype
Signed-off-by: davidradl <[email protected]>
1 parent e00d576 commit 42d4b0b

File tree

9 files changed

+448
-84
lines changed

9 files changed

+448
-84
lines changed

README.md

Lines changed: 69 additions & 45 deletions
Large diffs are not rendered by default.

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ public final class HttpConnectorConfigConstants {
8888
public static final String SOURCE_CONNECTION_TIMEOUT =
8989
SOURCE_LOOKUP_PREFIX + "connection.timeout";
9090

91+
public static final String FAIL_JOB_ON_ERROR =
92+
SOURCE_LOOKUP_PREFIX + "fail-job-on-error";
93+
9194
public static final String SOURCE_PROXY_HOST =
9295
SOURCE_LOOKUP_PREFIX + "proxy.host";
9396

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ public class HttpLookupConnectorOptions {
8282
.noDefaultValue()
8383
.withDescription("Http client connection timeout.");
8484

85+
public static final ConfigOption<Boolean> SOURCE_LOOKUP_FAIL_JOB_ON_ERROR =
86+
ConfigOptions.key(FAIL_JOB_ON_ERROR)
87+
.booleanType()
88+
.defaultValue(true)
89+
.withDescription("Fail job on error.");
90+
8591
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
8692
ConfigOptions.key(SOURCE_PROXY_HOST)
8793
.stringType()

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

Lines changed: 128 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
3+
import java.net.http.HttpResponse;
4+
import java.util.*;
5+
import java.util.stream.Collectors;
6+
import java.util.stream.Stream;
57
import javax.annotation.Nullable;
68

79
import lombok.extern.slf4j.Slf4j;
@@ -15,12 +17,13 @@
1517
import org.apache.flink.table.connector.source.LookupTableSource;
1618
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
1719
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
20+
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
1821
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider ;
1922
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
2023
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
2124
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
2225
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
23-
import org.apache.flink.table.data.RowData;
26+
import org.apache.flink.table.data.*;
2427
import org.apache.flink.table.factories.DynamicTableFactory;
2528
import org.apache.flink.table.factories.FactoryUtil;
2629
import org.apache.flink.table.functions.AsyncLookupFunction;
@@ -42,7 +45,7 @@
4245

4346
@Slf4j
4447
public class HttpLookupTableSource
45-
implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {
48+
implements LookupTableSource, SupportsReadingMetadata, SupportsProjectionPushDown, SupportsLimitPushDown {
4649

4750
private DataType physicalRowDataType;
4851

@@ -54,6 +57,16 @@ public class HttpLookupTableSource
5457
@Nullable
5558
private final LookupCache cache;
5659

60+
// --------------------------------------------------------------------------------------------
61+
// Mutable attributes
62+
// --------------------------------------------------------------------------------------------
63+
64+
/** Data type that describes the final output of the source. */
65+
protected DataType producedDataType;
66+
67+
/** Metadata that is appended at the end of a physical source row. */
68+
protected List<String> metadataKeys;
69+
5770
public HttpLookupTableSource(
5871
DataType physicalRowDataType,
5972
HttpLookupConfig lookupConfig,
@@ -111,13 +124,26 @@ protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
111124
responseSchemaDecoder,
112125
PollingClientFactory<RowData>
113126
pollingClientFactory) {
114-
127+
MetadataConverter[] metadataConverters={};
128+
if (this.metadataKeys != null) {
129+
this.metadataKeys.stream()
130+
.map(
131+
k ->
132+
Stream.of(HttpLookupTableSource.ReadableMetadata.values())
133+
.filter(rm -> rm.key.equals(k))
134+
.findFirst()
135+
.orElseThrow(IllegalStateException::new))
136+
.map(m -> m.converter)
137+
.toArray(MetadataConverter[]::new);
138+
}
115139
HttpTableLookupFunction dataLookupFunction =
116140
new HttpTableLookupFunction(
117141
pollingClientFactory,
118142
responseSchemaDecoder,
119143
lookupRow,
120-
lookupConfig
144+
lookupConfig,
145+
metadataConverters,
146+
this.producedDataType
121147
);
122148
if (lookupConfig.isUseAsync()) {
123149
AsyncLookupFunction asyncLookupFunction =
@@ -256,4 +282,100 @@ private LookupSchemaEntry<RowData> processRow(RowField rowField, int parentIndex
256282
RowData.createFieldGetter(type1, parentIndex));
257283
}
258284
}
285+
286+
@Override
287+
public Map<String, DataType> listReadableMetadata() {
288+
final Map<String, DataType> metadataMap = new LinkedHashMap<>();
289+
290+
decodingFormat.listReadableMetadata()
291+
.forEach((key, value) -> metadataMap.put(key, value));
292+
293+
// according to convention, the order of the final row must be
294+
// PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
295+
// where the format metadata has highest precedence
296+
// add connector metadata
297+
Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
298+
return metadataMap;
299+
}
300+
301+
@Override
302+
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
303+
// separate connector and format metadata
304+
final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
305+
final Map<String, DataType> formatMetadata = decodingFormat.listReadableMetadata();
306+
// store non connector keys and remove them from the connectorMetadataKeys.
307+
List<String> formatMetadataKeys = new ArrayList<>();
308+
Set<String> metadataKeysSet = metadataKeys.stream().collect(Collectors.toSet());
309+
for (ReadableMetadata rm : ReadableMetadata.values()) {
310+
String metadataKeyToCheck = rm.name();
311+
if (!metadataKeysSet.contains(metadataKeyToCheck)) {
312+
formatMetadataKeys.add(metadataKeyToCheck);
313+
connectorMetadataKeys.remove(metadataKeyToCheck);
314+
}
315+
}
316+
// push down format metadata keys
317+
if (formatMetadata.size() > 0) {
318+
final List<String> requestedFormatMetadataKeys =
319+
formatMetadataKeys.stream().collect(Collectors.toList());
320+
decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
321+
}
322+
this.metadataKeys = connectorMetadataKeys;
323+
this.producedDataType = producedDataType;
324+
}
325+
// --------------------------------------------------------------------------------------------
326+
// Metadata handling
327+
// --------------------------------------------------------------------------------------------
328+
enum ReadableMetadata {
329+
HTTP_ERROR_STRING(
330+
"error_string",
331+
DataTypes.STRING(),
332+
new MetadataConverter() {
333+
private static final long serialVersionUID = 1L;
334+
@Override
335+
public Object read(String msg, HttpResponse httpResponse) {
336+
return StringData.fromString(msg);
337+
}
338+
}),
339+
HTTP_ERROR_CODE(
340+
"error_code",
341+
DataTypes.INT(),
342+
new MetadataConverter() {
343+
private static final long serialVersionUID = 1L;
344+
@Override
345+
public Object read(String msg, HttpResponse httpResponse) {
346+
return httpResponse != null? httpResponse.statusCode():null;
347+
}
348+
}),
349+
HTTP_HEADERS(
350+
"error_headers",
351+
DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())),
352+
new MetadataConverter() {
353+
private static final long serialVersionUID = 1L;
354+
@Override
355+
public Object read(String msg, HttpResponse httpResponse) {
356+
if (httpResponse == null) return null;
357+
Map<String, List<String>> httpHeadersMap = httpResponse.headers().map();
358+
Map<StringData, ArrayData> stringDataMap = new HashMap<>();
359+
for (String key: httpHeadersMap.keySet()) {
360+
List<StringData> strDataList = new ArrayList<>();
361+
httpHeadersMap.get(key).stream()
362+
.forEach((c) -> strDataList.add(StringData.fromString(c)));
363+
stringDataMap.put(StringData.fromString(key), new GenericArrayData(strDataList.toArray()));
364+
}
365+
return new GenericMapData(stringDataMap);
366+
}
367+
});
368+
final String key;
369+
370+
final DataType dataType;
371+
final MetadataConverter converter;
372+
373+
//TODO decide if we need a MetadataConverter
374+
ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
375+
this.key = key;
376+
this.dataType = dataType;
377+
this.converter = converter;
378+
}
379+
}
259380
}
381+

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,27 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3-
import java.util.Collection;
3+
import java.net.http.HttpResponse;
4+
import java.util.*;
45
import java.util.concurrent.atomic.AtomicInteger;
56

67
import lombok.AccessLevel;
78
import lombok.Getter;
89
import lombok.extern.slf4j.Slf4j;
910
import org.apache.flink.annotation.VisibleForTesting;
1011
import org.apache.flink.api.common.serialization.DeserializationSchema;
12+
import org.apache.flink.table.data.GenericRowData;
1113
import org.apache.flink.table.data.RowData;
1214
import org.apache.flink.table.functions.FunctionContext;
1315
import org.apache.flink.table.functions.LookupFunction;
16+
import org.apache.flink.table.types.DataType;
17+
import org.apache.flink.table.types.logical.LogicalType;
18+
import org.apache.flink.types.RowKind;
1419

20+
import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
1521
import com.getindata.connectors.http.internal.PollingClient;
1622
import com.getindata.connectors.http.internal.PollingClientFactory;
1723
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
24+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_FAIL_JOB_ON_ERROR;
1825

1926
@Slf4j
2027
public class HttpTableLookupFunction extends LookupFunction {
@@ -30,21 +37,28 @@ public class HttpTableLookupFunction extends LookupFunction {
3037
@VisibleForTesting
3138
@Getter(AccessLevel.PACKAGE)
3239
private final HttpLookupConfig options;
33-
40+
private final DataType producedDataType;
41+
private final Boolean fail_job_on_error;
3442
private transient AtomicInteger localHttpCallCounter;
35-
3643
private transient PollingClient<RowData> client;
37-
44+
private final MetadataConverter[] metadataConverters;
3845
public HttpTableLookupFunction(
3946
PollingClientFactory<RowData> pollingClientFactory,
4047
DeserializationSchema<RowData> responseSchemaDecoder,
4148
LookupRow lookupRow,
42-
HttpLookupConfig options) {
49+
HttpLookupConfig options,
50+
MetadataConverter[] metadataConverters,
51+
DataType producedDataType
52+
) {
4353

4454
this.pollingClientFactory = pollingClientFactory;
4555
this.responseSchemaDecoder = responseSchemaDecoder;
4656
this.lookupRow = lookupRow;
4757
this.options = options;
58+
this.metadataConverters = metadataConverters;
59+
this.producedDataType = producedDataType;
60+
Optional<Boolean> optional = options.getReadableConfig().getOptional(SOURCE_LOOKUP_FAIL_JOB_ON_ERROR);
61+
this.fail_job_on_error = optional.orElse(true);
4862
}
4963

5064
@Override
@@ -66,6 +80,67 @@ public void open(FunctionContext context) throws Exception {
6680
@Override
6781
public Collection<RowData> lookup(RowData keyRow) {
6882
localHttpCallCounter.incrementAndGet();
69-
return client.pull(keyRow);
83+
List<RowData> outputList = new ArrayList<>();
84+
// TODO turn into a constant?
85+
final int metadataArity = metadataConverters.length;
86+
try {
87+
Collection<RowData> httpCollector = client.pull(keyRow);
88+
if (httpCollector.size() != 1) {
89+
throw new RuntimeException("Expected 1 row data got " + httpCollector.size());
90+
}
91+
GenericRowData physicalRow = (GenericRowData) httpCollector.iterator().next();
92+
final int physicalArity = physicalRow.getArity();
93+
final GenericRowData producedRow =
94+
new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
95+
// We need to copy in the physical row into the producedRow
96+
for (int pos = 0; pos < physicalArity; pos++) {
97+
producedRow.setField(pos, physicalRow.getField(pos));
98+
}
99+
outputList.add(producedRow);
100+
} catch (Exception e) {
101+
if (this.fail_job_on_error) throw e;
102+
Throwable cause =e.getCause();
103+
if (cause instanceof HttpStatusCodeValidationFailedException) {
104+
final GenericRowData producedRow = getProducedRowForError(
105+
cause.getMessage(),
106+
((HttpStatusCodeValidationFailedException) cause).getResponse(),
107+
metadataArity,
108+
metadataConverters);
109+
outputList.add(producedRow);
110+
} else {
111+
// cause might not have a message so just use it's toString.
112+
String msg= e.getMessage() +"," + cause;
113+
final GenericRowData producedRow = getProducedRowForError(
114+
msg,
115+
null,
116+
metadataArity,
117+
metadataConverters);
118+
outputList.add(producedRow);
119+
}
120+
}
121+
return outputList;
122+
}
123+
124+
private GenericRowData getProducedRowForError(String msg,
125+
HttpResponse httpResponse,
126+
int metadataArity,
127+
MetadataConverter[] metadataConverters) {
128+
List<LogicalType> childrenLogicalTypes=producedDataType.getLogicalType().getChildren();
129+
final GenericRowData producedRow =
130+
new GenericRowData(RowKind.INSERT, childrenLogicalTypes.size());
131+
int physicalArity=childrenLogicalTypes.size()-metadataArity;
132+
// by not specifying the physical field values we get:
133+
// for physical nullable fields, the values are null
134+
// for physical non nullable fields the values are the "default" for a type
135+
for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
136+
producedRow.setField(
137+
physicalArity + metadataPos,
138+
metadataConverters[metadataPos].read(msg, httpResponse));
139+
}
140+
return producedRow;
141+
}
142+
@VisibleForTesting
143+
Boolean getFail_job_on_error() {
144+
return fail_job_on_error;
70145
}
71146
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
import java.io.Serializable;
3+
import java.net.http.HttpResponse;
4+
5+
interface MetadataConverter extends Serializable {
6+
Object read(String msg, HttpResponse httpResponse);
7+
}

0 commit comments

Comments
 (0)