131 changes: 86 additions & 45 deletions README.md

Large diffs are not rendered by default.

This file was deleted.

@@ -1,21 +1,21 @@
package com.getindata.connectors.http.internal;

import java.util.Collection;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;

import com.getindata.connectors.http.internal.table.lookup.HttpRowDataWrapper;

/**
* A client that is used to get enrichment data from an external component.
*/
public interface PollingClient<T> {
public interface PollingClient {

/**
* Gets enrichment data from an external component using the provided lookup arguments.
* @param lookupRow A {@link RowData} containing request parameters.
* @return an optional result of data lookup.
* @return an optional result of the data lookup, including HTTP information.
*/
Collection<T> pull(RowData lookupRow);
HttpRowDataWrapper pull(RowData lookupRow);

/**
* Initialize the client.
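A minimal caller sketch, not part of this PR, showing how the de-generified PollingClient could be consumed after this change. The accessors used on HttpRowDataWrapper (getData(), getHttpStatusCode()) follow from the Lombok @Data class shown later in this diff; everything else is illustrative.

// Hypothetical caller; the >= 400 check and the downstream handling are assumptions.
void lookup(PollingClient client, RowData lookupRow) {
    HttpRowDataWrapper result = client.pull(lookupRow);
    Integer status = result.getHttpStatusCode();
    if (status != null && status >= 400) {
        // the failure can be surfaced through the new metadata columns instead of failing the job
    }
    for (RowData enrichedRow : result.getData()) {
        // hand each decoded response row to the lookup join
    }
}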
@@ -3,14 +3,15 @@
import java.io.Serializable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.ConfigurationException;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;

public interface PollingClientFactory<OUT> extends Serializable {
public interface PollingClientFactory extends Serializable {

PollingClient<OUT> createPollClient(
PollingClient createPollClient(
HttpLookupConfig options,
DeserializationSchema<OUT> schemaDecoder
DeserializationSchema<RowData> schemaDecoder
) throws ConfigurationException;
}
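For illustration, a skeletal implementation of the revised, non-generic factory signature; the MyPollingClientFactory/MyPollingClient names and the constructor call are hypothetical, not the connector's actual classes.

// Hypothetical factory implementing the new signature shown above.
public class MyPollingClientFactory implements PollingClientFactory {

    @Override
    public PollingClient createPollClient(
            HttpLookupConfig options,
            DeserializationSchema<RowData> schemaDecoder) throws ConfigurationException {
        // Real implementations would wire in the HTTP client, request factory and
        // response processing configured through HttpLookupConfig.
        return new MyPollingClient(options, schemaDecoder);
    }
}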
@@ -88,6 +88,9 @@ public final class HttpConnectorConfigConstants
public static final String SOURCE_CONNECTION_TIMEOUT =
SOURCE_LOOKUP_PREFIX + "connection.timeout";

public static final String CONTINUE_ON_ERROR_ =
SOURCE_LOOKUP_PREFIX + "continue_on_error";

public static final String SOURCE_PROXY_HOST =
SOURCE_LOOKUP_PREFIX + "proxy.host";

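A hedged sketch of passing the new flag through the connector's option map. The full property name is SOURCE_LOOKUP_PREFIX + "continue_on_error"; the prefix is defined elsewhere in HttpConnectorConfigConstants and is not shown in this diff, and the boolean-style "true" value is an assumption.

// Illustrative only: enable continue-on-error behaviour via the lookup source options.
// Whether the option takes "true"/"false" is an assumption not confirmed by this diff.
Map<String, String> connectorOptions = new HashMap<>();
connectorOptions.put(HttpConnectorConfigConstants.CONTINUE_ON_ERROR_, "true");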
@@ -13,7 +13,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;

import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
import com.getindata.connectors.http.internal.status.HttpResponseChecker;

@Slf4j
@@ -48,16 +47,11 @@ public void registerMetrics(MetricGroup metrics){
public <T> HttpResponse<T> send(
Supplier<HttpRequest> requestSupplier,
HttpResponse.BodyHandler<T> responseBodyHandler
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
) throws IOException, InterruptedException{
try {
var response = Retry.decorateCheckedSupplier(retry,
return Retry.decorateCheckedSupplier(retry,
() -> httpClient.send(requestSupplier.get(), responseBodyHandler)).apply();
if (!responseChecker.isSuccessful(response)) {
throw new HttpStatusCodeValidationFailedException(
"Incorrect response code: " + response.statusCode(), response);
}
return response;
} catch (IOException | InterruptedException | HttpStatusCodeValidationFailedException e) {
} catch (IOException | InterruptedException e) {
throw e; //re-throw without wrapping
} catch (Throwable t) {
throw new RuntimeException("Unexpected exception", t);
@@ -0,0 +1,8 @@
package com.getindata.connectors.http.internal.table.lookup;

public enum ContinueOnErrorType {
HTTP_FAILED_AFTER_RETRY,
HTTP_FAILED,
CLIENT_SIDE_EXCEPTION,
NONE
}
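The decision logic that selects one of these values is not visible in this excerpt; the classification below is only an assumption about how a polling client might map failure situations onto the enum.

// Illustrative mapping only; the branches and the retriesExhausted flag are assumptions.
ContinueOnErrorType classify(Integer statusCode, boolean retriesExhausted, Throwable clientError) {
    if (clientError != null) {
        return ContinueOnErrorType.CLIENT_SIDE_EXCEPTION;
    }
    if (statusCode != null && statusCode >= 400) {
        return retriesExhausted
                ? ContinueOnErrorType.HTTP_FAILED_AFTER_RETRY
                : ContinueOnErrorType.HTTP_FAILED;
    }
    return ContinueOnErrorType.NONE;
}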
@@ -1,7 +1,8 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;

import lombok.extern.slf4j.Slf4j;
@@ -15,12 +16,13 @@
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.*;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncLookupFunction;
@@ -42,7 +44,7 @@

@Slf4j
public class HttpLookupTableSource
implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {
implements LookupTableSource, SupportsReadingMetadata, SupportsProjectionPushDown, SupportsLimitPushDown {

private DataType physicalRowDataType;

@@ -54,6 +56,16 @@ public class HttpLookupTableSource
@Nullable
private final LookupCache cache;

// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------

/** Data type that describes the final output of the source. */
protected DataType producedDataType;

/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;

public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
@@ -100,7 +112,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
dynamicTableFactoryContext
);

PollingClientFactory<RowData> pollingClientFactory =
PollingClientFactory pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);

return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
@@ -109,15 +121,28 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
DeserializationSchema<RowData>
responseSchemaDecoder,
PollingClientFactory<RowData>
PollingClientFactory
pollingClientFactory) {

MetadataConverter[] metadataConverters={};
if (this.metadataKeys != null) {
metadataConverters = this.metadataKeys.stream()
.map(
k ->
Stream.of(HttpLookupTableSource.ReadableMetadata.values())
.filter(rm -> rm.key.equals(k))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(m -> m.converter)
.toArray(MetadataConverter[]::new);
}
HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
lookupConfig,
metadataConverters,
this.producedDataType
);
if (lookupConfig.isUseAsync()) {
AsyncLookupFunction asyncLookupFunction =
@@ -165,7 +190,7 @@ public boolean supportsNestedProjection() {
return true;
}

private PollingClientFactory<RowData> createPollingClientFactory(
private PollingClientFactory createPollingClientFactory(
LookupQueryCreator lookupQueryCreator,
HttpLookupConfig lookupConfig) {

@@ -256,4 +281,117 @@ private LookupSchemaEntry<RowData> processRow(RowField rowField, int parentIndex
RowData.createFieldGetter(type1, parentIndex));
}
}

@Override
public Map<String, DataType> listReadableMetadata() {
final Map<String, DataType> metadataMap = new LinkedHashMap<>();

decodingFormat.listReadableMetadata()
.forEach((key, value) -> metadataMap.put(key, value));

// according to convention, the order of the final row must be
// PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
// where the format metadata has highest precedence
// add connector metadata
Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
return metadataMap;
}

@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
// separate connector and format metadata
final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
final Map<String, DataType> formatMetadata = decodingFormat.listReadableMetadata();
// store non-connector keys and remove them from connectorMetadataKeys.
List<String> formatMetadataKeys = new ArrayList<>();
Set<String> metadataKeysSet = metadataKeys.stream().collect(Collectors.toSet());
for (ReadableMetadata rm : ReadableMetadata.values()) {
String metadataKeyToCheck = rm.name();
if (!metadataKeysSet.contains(metadataKeyToCheck)) {
formatMetadataKeys.add(metadataKeyToCheck);
connectorMetadataKeys.remove(metadataKeyToCheck);
}
}
// push down format metadata keys
if (formatMetadata.size() > 0) {
final List<String> requestedFormatMetadataKeys =
formatMetadataKeys.stream().collect(Collectors.toList());
decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
}
this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}

// --------------------------------------------------------------------------------------------
// Metadata handling
// --------------------------------------------------------------------------------------------
enum ReadableMetadata {
ERROR_STRING(
"error_string",
DataTypes.STRING(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
return StringData.fromString(msg);
}
}),
HTTP_STATUS_CODE(
"http_status_code",
DataTypes.INT(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
return (httpRowDataWrapper != null) ? httpRowDataWrapper.getHttpStatusCode() : null;
}
}
),
HTTP_HEADERS(
"http_headers",
DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
if (httpRowDataWrapper == null) {
return null;
}
Map<String, List<String>> httpHeadersMap = httpRowDataWrapper.getHttpHeadersMap();
Map<StringData, ArrayData> stringDataMap = new HashMap<>();
for (String key : httpHeadersMap.keySet()) {
List<StringData> strDataList = new ArrayList<>();
httpHeadersMap.get(key).stream()
.forEach((c) -> strDataList.add(StringData.fromString(c)));
stringDataMap.put(StringData.fromString(key), new GenericArrayData(strDataList.toArray()));
}
return new GenericMapData(stringDataMap);
}
}
),
CONTINUE_ON_ERROR_TYPE(
"continue_on_error_type",
DataTypes.STRING(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
if (httpRowDataWrapper == null) {
return null;
}
return StringData.fromString(httpRowDataWrapper.getContinueOnErrorType().name());
}
})
;
final String key;

final DataType dataType;
final MetadataConverter converter;

ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
}
}
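A minimal sketch of how the MetadataConverter array built in getLookupRuntimeProvider could be applied, assuming MetadataConverter exposes the read(String, HttpRowDataWrapper) method implemented by the anonymous classes above; the appendMetadata helper itself is hypothetical, not the PR's code. It follows the PHYSICAL + METADATA column order noted in listReadableMetadata.

// Hypothetical helper: copy the physical fields, then append one converted value per metadata key.
static RowData appendMetadata(GenericRowData physicalRow,
                              HttpRowDataWrapper wrapper,
                              String errorMessage,
                              MetadataConverter[] converters) {
    GenericRowData producedRow =
            new GenericRowData(physicalRow.getRowKind(), physicalRow.getArity() + converters.length);
    for (int i = 0; i < physicalRow.getArity(); i++) {
        producedRow.setField(i, physicalRow.getField(i));
    }
    for (int i = 0; i < converters.length; i++) {
        producedRow.setField(physicalRow.getArity() + i, converters[i].read(errorMessage, wrapper));
    }
    return producedRow;
}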

@@ -0,0 +1,22 @@
package com.getindata.connectors.http.internal.table.lookup;


import java.util.Collection;
import java.util.List;
import java.util.Map;

import lombok.Data;
import org.apache.flink.table.data.RowData;

/**
* This bean contains the RowData information (the response body as Flink RowData).
* It also contains information from the HTTP response, namely the HTTP headers map
* and the HTTP status code, for use in the metadata columns.
*/
@Data
public class HttpRowDataWrapper {
private final Collection<RowData> data;
private final Map<String, List<String>> httpHeadersMap;
private final Integer httpStatusCode;
private final ContinueOnErrorType continueOnErrorType;
}
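Since all four fields are final and the class uses Lombok's @Data, a required-args constructor covering every field is generated. A construction sketch for a successful response (the row, headers and status code below are made-up values):

// Illustrative only; enrichmentRow is an already-decoded RowData instance.
HttpRowDataWrapper wrapper = new HttpRowDataWrapper(
        Collections.singletonList(enrichmentRow),
        Map.of("Content-Type", List.of("application/json")),
        200,
        ContinueOnErrorType.NONE);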