Skip to content

Commit dfae097

Browse files
committed
[flinksql][kafka sink没有将Retract流中的false数据过滤掉][21182]
1 parent 5b1eef2 commit dfae097

File tree

4 files changed

+20
-12
lines changed
  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka

4 files changed

+20
-12
lines changed

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
113113
serializationSchema
114114
);
115115

116-
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
117-
return record.f1;
118-
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
116+
DataStream<Row> ds = dataStream
117+
.filter((Tuple2<Boolean, Row> record) -> record.f0)
118+
.map((Tuple2<Boolean, Row> record) -> {return record.f1;})
119+
.returns(getOutputType().getTypeAt(1))
120+
.setParallelism(parallelism);
119121

120122
kafkaTableSink.emitDataStream(ds);
121123
}

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
117117
serializationSchema
118118
);
119119

120-
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
121-
return record.f1;
122-
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
120+
DataStream<Row> ds = dataStream
121+
.filter((Tuple2<Boolean, Row> record) -> record.f0)
122+
.map((Tuple2<Boolean, Row> record) -> {return record.f1;})
123+
.returns(getOutputType().getTypeAt(1))
124+
.setParallelism(parallelism);
123125

124126
kafkaTableSink.emitDataStream(ds);
125127
}

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
120120
serializationSchema
121121
);
122122

123-
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
124-
return record.f1;
125-
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
123+
DataStream<Row> ds = dataStream
124+
.filter((Tuple2<Boolean, Row> record) -> record.f0)
125+
.map((Tuple2<Boolean, Row> record) -> {return record.f1;})
126+
.returns(getOutputType().getTypeAt(1))
127+
.setParallelism(parallelism);
126128

127129
kafkaTableSink.emitDataStream(ds);
128130
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
119119
serializationSchema
120120
);
121121

122-
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
123-
return record.f1;
124-
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
122+
DataStream<Row> ds = dataStream
123+
.filter((Tuple2<Boolean, Row> record) -> record.f0)
124+
.map((Tuple2<Boolean, Row> record) -> {return record.f1;})
125+
.returns(getOutputType().getTypeAt(1))
126+
.setParallelism(parallelism);
125127

126128
kafkaTableSink.emitDataStream(ds);
127129
}

0 commit comments

Comments
 (0)