
Commit d91ac50

committed
Formatting adjustments
1 parent da651e6 commit d91ac50

File tree

13 files changed, +139 -30 lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

Lines changed: 19 additions & 4 deletions
@@ -34,48 +34,63 @@
 import java.util.Properties;
 
 /**
- * company: www.dtstack.com
  *
+ * company: www.dtstack.com
  * @author: toutian
  * create: 2019/12/24
  */
 public abstract class AbstractKafkaConsumerFactory {
 
-    protected abstract FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props);
+    protected abstract FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo,
+                                                                          TypeInformation<Row> typeInformation,
+                                                                          Properties props);
 
-    protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Calculate calculate) {
-        return new KafkaDeserializationMetricWrapper(typeInformation, createDeserializationSchema(kafkaSourceTableInfo, typeInformation), calculate);
+    protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaSourceTableInfo kafkaSourceTableInfo,
+                                                                              TypeInformation<Row> typeInformation,
+                                                                              Calculate calculate) {
+        return new KafkaDeserializationMetricWrapper(typeInformation,
+                createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
+                calculate);
     }
 
     private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
         DeserializationSchema<Row> deserializationSchema = null;
         if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
             deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(), kafkaSourceTableInfo.getFieldExtraInfoList());
+
         } else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
             if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
                 deserializationSchema = new JsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
                 deserializationSchema = new JsonRowDeserializationSchema(typeInformation);
             } else {
                 throw new IllegalArgumentException("sourceDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema)or TypeInformation<Row>");
             }
+
         } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
             if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
                 throw new IllegalArgumentException("sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
             }
+
             final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInformation);
             deserSchemaBuilder.setFieldDelimiter(kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
             deserializationSchema = deserSchemaBuilder.build();
+
         } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
             if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
                 throw new IllegalArgumentException("sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
             }
+
             deserializationSchema = new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
         }
 
         if (null == deserializationSchema) {
             throw new UnsupportedOperationException("FormatType:" + kafkaSourceTableInfo.getSourceDataType());
         }
+
         return deserializationSchema;
     }
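Side note on the CSV branch above: CsvRowDeserializationSchema.Builder is the standard flink-csv builder, and the factory feeds it the table's row type plus the first character of the configured delimiter. A minimal standalone sketch of the same calls (the two-column row type here is a made-up example, and this assumes a Flink 1.8-era flink-csv on the classpath):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.types.Row;

public class CsvSchemaSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical two-column schema; the factory derives the real one
        // from the table definition (typeInformation).
        TypeInformation<Row> rowType = Types.ROW(Types.STRING, Types.INT);

        // Same builder calls as the CSV branch: the delimiter is the first
        // char of kafkaSourceTableInfo.getFieldDelimiter().
        CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema.Builder(rowType)
                .setFieldDelimiter(',')
                .build();

        Row row = schema.deserialize("hello,42".getBytes());
        System.out.println(row); // hello,42
    }
}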

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 1 deletion
@@ -104,7 +104,11 @@ public TypeInformation<Row> getRecordType() {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0).map((Tuple2<Boolean, Row> record) -> record.f1).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
+        DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0)
+                .map((Tuple2<Boolean, Row> record) -> record.f1)
+                .returns(getOutputType().getTypeAt(1))
+                .setParallelism(parallelism);
+
         mapDataStream.addSink(flinkKafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
     }
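The chain being split here is the usual retract-stream handling: in the Tuple2<Boolean, Row> stream, f0 marks a record as accumulate (true) or retract (false), so the sink keeps only the accumulate records and forwards the bare Row payload at the configured parallelism. A self-contained sketch of the same filter/map pattern (illustrative only, assuming a Flink 1.8-era classpath):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class RetractFilterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of(true, Row.of("a", 1)),   // accumulate record: kept
                Tuple2.of(false, Row.of("a", 1)))  // retract record: dropped
                .filter((Tuple2<Boolean, Row> record) -> record.f0)
                .map((Tuple2<Boolean, Row> record) -> record.f1)
                .returns(Types.ROW(Types.STRING, Types.INT)) // declare the Row type lost to lambda erasure
                .print();

        env.execute("retract-filter-sketch");
    }
}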

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer.java

Lines changed: 18 additions & 3 deletions
@@ -41,7 +41,6 @@
  * Reason:
  * Date: 2018/10/19
  * Company: www.dtstack.com
- *
  * @author xuchao
  */
 public class KafkaConsumer extends FlinkKafkaConsumer<Row> {
@@ -66,8 +65,24 @@ public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
     }
 
     @Override
-    protected AbstractFetcher<Row, ?> createFetcher(SourceFunction.SourceContext<Row> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
-        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
+    protected AbstractFetcher<Row, ?> createFetcher(SourceFunction.SourceContext<Row> sourceContext,
+                                                    Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+                                                    SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
+                                                    SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
+                                                    StreamingRuntimeContext runtimeContext,
+                                                    OffsetCommitMode offsetCommitMode,
+                                                    MetricGroup consumerMetricGroup,
+                                                    boolean useMetrics) throws Exception {
+
+        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext,
+                assignedPartitionsWithInitialOffsets,
+                watermarksPeriodic,
+                watermarksPunctuated,
+                runtimeContext,
+                offsetCommitMode,
+                consumerMetricGroup,
+                useMetrics);
+
         ((KafkaDeserializationMetricWrapper) deserializationMetricWrapper).setFetcher(fetcher);
         return fetcher;
     }
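Functionally nothing changes in this hunk: the override still exists only to capture the AbstractFetcher that the superclass builds and hand it to the metric wrapper, which later reads partition lag through it. Stripped of Kafka specifics, the pattern looks like this (all names here are hypothetical, not the project's API):

// Hypothetical minimal example of intercepting a protected factory method:
// let the superclass construct the object, keep a reference for a
// collaborator, and return it unchanged.
abstract class BaseSource {
    protected Fetcher createFetcher() {
        return new Fetcher();
    }
}

class MetricAwareSource extends BaseSource {
    private final MetricWrapper metricWrapper = new MetricWrapper();

    @Override
    protected Fetcher createFetcher() {
        Fetcher fetcher = super.createFetcher();
        metricWrapper.setFetcher(fetcher); // captured for later metric reads
        return fetcher;
    }
}

class Fetcher {}

class MetricWrapper {
    private Fetcher fetcher;

    void setFetcher(Fetcher fetcher) {
        this.fetcher = fetcher;
    }
}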

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumerFactory.java

Lines changed: 5 additions & 2 deletions
@@ -18,6 +18,7 @@
 
 package com.dtstack.flink.sql.source.kafka;
 
+import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
@@ -40,9 +41,11 @@ public class KafkaConsumerFactory extends AbstractKafkaConsumerFactory {
     public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
         KafkaConsumer kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
+            DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED));
+            kafkaSrc = new KafkaConsumer(Pattern.compile(kafkaSourceTableInfo.getTopic()), deserMetricWrapper, props);
         } else {
-            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
+            DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED));
+            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, props);
         }
         return kafkaSrc;
     }
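The (Calculate & Serializable) cast that this refactor pulls onto its own statement is an intersection-type cast: the compiled lambda implements both interfaces, which is what lets Flink serialize the function and ship it to the task managers. A tiny self-contained demonstration of the idiom, using java.util.function.BiFunction in place of the project's Calculate:

import java.io.Serializable;
import java.util.function.BiFunction;

public class SerializableLambdaDemo {
    public static void main(String[] args) {
        // The intersection cast makes the lambda implement Serializable as
        // well, so it survives Java serialization, as Flink requires.
        BiFunction<Long, Long, Long> lag =
                (BiFunction<Long, Long, Long> & Serializable) (endOffset, committedOffset) -> endOffset - committedOffset;

        System.out.println(lag.apply(100L, 90L)); // 10
    }
}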

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 3 deletions
@@ -97,7 +97,8 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
             this.parallelism = parallelism;
         }
 
-        this.kafkaProducer09 = (FlinkKafkaProducer09<Row>) new KafkaProducer09Factory().createKafkaProducer(kafka09SinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner);
+        this.kafkaProducer09 = (FlinkKafkaProducer09<Row>) new KafkaProducer09Factory()
+                .createKafkaProducer(kafka09SinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner);
         return this;
     }
@@ -108,8 +109,13 @@ public TypeInformation<Row> getRecordType() {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0).map((Tuple2<Boolean, Row> record) -> record.f1).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
-        mapDataStream.addSink(kafkaProducer09).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+        DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0)
+                .map((Tuple2<Boolean, Row> record) -> record.f1)
+                .returns(getOutputType().getTypeAt(1))
+                .setParallelism(parallelism);
+
+        mapDataStream.addSink(kafkaProducer09)
+                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
     }
 
     @Override

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09.java

Lines changed: 18 additions & 2 deletions
@@ -66,8 +66,24 @@ public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
     }
 
     @Override
-    protected AbstractFetcher<Row, ?> createFetcher(SourceFunction.SourceContext<Row> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
-        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
+    protected AbstractFetcher<Row, ?> createFetcher(SourceFunction.SourceContext<Row> sourceContext,
+                                                    Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+                                                    SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
+                                                    SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
+                                                    StreamingRuntimeContext runtimeContext,
+                                                    OffsetCommitMode offsetCommitMode,
+                                                    MetricGroup consumerMetricGroup,
+                                                    boolean useMetrics) throws Exception {
+
+        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext,
+                assignedPartitionsWithInitialOffsets,
+                watermarksPeriodic,
+                watermarksPunctuated,
+                runtimeContext,
+                offsetCommitMode,
+                consumerMetricGroup,
+                useMetrics);
+
         ((KafkaDeserializationMetricWrapper) deserializationMetricWrapper).setFetcher(fetcher);
         return fetcher;
     }

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09Factory.java

Lines changed: 5 additions & 2 deletions
@@ -18,6 +18,7 @@
 
 package com.dtstack.flink.sql.source.kafka;
 
+import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
@@ -39,9 +40,11 @@ public class KafkaConsumer09Factory extends AbstractKafkaConsumerFactory {
     public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
         KafkaConsumer09 kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer09(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L), props);
+            DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L);
+            kafkaSrc = new KafkaConsumer09(Pattern.compile(kafkaSourceTableInfo.getTopic()), deserMetricWrapper, props);
         } else {
-            kafkaSrc = new KafkaConsumer09(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L), props);
+            DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L);
+            kafkaSrc = new KafkaConsumer09(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, props);
         }
         return kafkaSrc;
     }

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 1 deletion
@@ -111,7 +111,11 @@ public TypeInformation<Row> getRecordType() {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0).map((Tuple2<Boolean, Row> record) -> record.f1).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
+        DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0)
+                .map((Tuple2<Boolean, Row> record) -> record.f1)
+                .returns(getOutputType().getTypeAt(1))
+                .setParallelism(parallelism);
+
         mapDataStream.addSink(kafkaProducer010).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
     }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer010.java

Lines changed: 16 additions & 2 deletions
@@ -65,8 +65,22 @@ public void run(SourceContext<Row> sourceContext) throws Exception {
     }
 
     @Override
-    protected AbstractFetcher<Row, ?> createFetcher(SourceContext<Row> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
-        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
+    protected AbstractFetcher<Row, ?> createFetcher(SourceContext<Row> sourceContext,
+                                                    Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+                                                    SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
+                                                    SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
+                                                    StreamingRuntimeContext runtimeContext,
+                                                    OffsetCommitMode offsetCommitMode,
+                                                    MetricGroup consumerMetricGroup,
+                                                    boolean useMetrics) throws Exception {
+
+        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext,
+                assignedPartitionsWithInitialOffsets,
+                watermarksPeriodic, watermarksPunctuated,
+                runtimeContext, offsetCommitMode,
+                consumerMetricGroup,
+                useMetrics);
+
         ((KafkaDeserializationMetricWrapper) deserializationMetricWrapper).setFetcher(fetcher);
         return fetcher;
     }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer010Factory.java

Lines changed: 10 additions & 4 deletions
@@ -18,6 +18,7 @@
 
 package com.dtstack.flink.sql.source.kafka;
 
+import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
@@ -30,19 +31,24 @@
 
 /**
  * company: www.dtstack.com
- * author: toutian
+ * @author: toutian
  * create: 2019/12/24
  */
 public class KafkaConsumer010Factory extends AbstractKafkaConsumerFactory {
 
     @Override
-    public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
+    public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo,
+                                                              TypeInformation<Row> typeInformation,
+                                                              Properties props) {
         KafkaConsumer010 kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer010(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) SubscriptionState::partitionLag), props);
+            DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) SubscriptionState::partitionLag);
+            kafkaSrc = new KafkaConsumer010(Pattern.compile(kafkaSourceTableInfo.getTopic()), deserMetricWrapper, props);
         } else {
-            kafkaSrc = new KafkaConsumer010(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) SubscriptionState::partitionLag), props);
+            DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) SubscriptionState::partitionLag);
+            kafkaSrc = new KafkaConsumer010(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, props);
         }
+
         return kafkaSrc;
     }
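Taken together, the three factories differ only in the Calculate lambda they pass: this 0.10 module uses the SubscriptionState::partitionLag method reference, the base kafka module calls partitionLag(tp, IsolationLevel.READ_UNCOMMITTED) because its newer client takes an isolation level, and the 0.9 module simply returns 0L. For orientation, the hook is presumably shaped roughly like this (a guess at the interface, which lives elsewhere in the project's source; the & Serializable casts above suggest it does not itself extend Serializable):

import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

// Presumed shape only: one abstract method taking the consumer's subscription
// state and a partition and returning that partition's current lag.
@FunctionalInterface
interface CalculateSketch {
    Long calc(SubscriptionState subscriptionState, TopicPartition tp);
}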
