Skip to content

Commit bc71498

Browse files
committed
fix conflict
1 parent 49f2cdc commit bc71498

File tree

7 files changed

+16
-17
lines changed

7 files changed

+16
-17
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
* @see Tuple
7070
* @see DriverManager
7171
*/
72-
public class CassandraOutputFormat extends DtRichOutputFormat {
72+
public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
7373
private static final long serialVersionUID = -7994311331389155692L;
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class ConsoleOutputFormat extends DtRichOutputFormat {
40+
public class ConsoleOutputFormat extends DtRichOutputFormat<Tuple2> {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
4343

core/src/main/java/com/dtstack/flink/sql/outputformat/DtRichOutputFormat.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
*/
3030
public abstract class DtRichOutputFormat<T> extends RichOutputFormat<T>{
3131

32-
protected transient Counter outRecords;
33-
protected transient Counter outDirtyRecords;
34-
protected transient Meter outRecordsRate;
32+
public transient Counter outRecords;
33+
public transient Counter outDirtyRecords;
34+
public transient Meter outRecordsRate;
3535

3636
protected static int ROW_PRINT_FREQUENCY = 1000;
3737
protected static int DIRTY_PRINT_FREQUENCY = 1000;

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/AbstractJDBCOutputFormat.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.format;
2020

21-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
21+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
2222
import com.dtstack.flink.sql.util.JDBCUtils;
23-
import org.apache.flink.api.common.io.RichOutputFormat;
2423
import org.apache.flink.configuration.Configuration;
2524
import org.apache.flink.types.Row;
2625
import org.slf4j.Logger;
@@ -37,7 +36,7 @@
3736
* @see Row
3837
* @see DriverManager
3938
*/
40-
public abstract class AbstractJDBCOutputFormat<T> extends MetricOutputFormat<T> {
39+
public abstract class AbstractJDBCOutputFormat<T> extends DtRichOutputFormat<T> {
4140

4241
private static final long serialVersionUID = 1L;
4342
public static final int DEFAULT_FLUSH_MAX_SIZE = 100;

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Whitespace-only changes.

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.writer;
2020

21-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
21+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
2222
import org.apache.flink.api.java.tuple.Tuple2;
2323
import org.apache.flink.types.Row;
2424
import org.slf4j.Logger;
@@ -47,9 +47,9 @@ public class AppendOnlyWriter implements JDBCWriter {
4747
private transient PreparedStatement statement;
4848
private transient List<Row> rows;
4949
// only use metric
50-
private transient MetricOutputFormat metricOutputFormat;
50+
private transient DtRichOutputFormat metricOutputFormat;
5151

52-
public AppendOnlyWriter(String insertSQL, int[] fieldTypes, MetricOutputFormat metricOutputFormat) {
52+
public AppendOnlyWriter(String insertSQL, int[] fieldTypes, DtRichOutputFormat metricOutputFormat) {
5353
this.insertSQL = insertSQL;
5454
this.fieldTypes = fieldTypes;
5555
this.metricOutputFormat = metricOutputFormat;

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/UpsertWriter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.writer;
2020

21-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
21+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
2222
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
2323
import org.apache.flink.api.java.tuple.Tuple2;
2424
import org.apache.flink.types.Row;
@@ -58,7 +58,7 @@ public static UpsertWriter create(
5858
String[] partitionFields,
5959
boolean objectReuse,
6060
boolean allReplace,
61-
MetricOutputFormat metricOutputFormat) {
61+
DtRichOutputFormat metricOutputFormat) {
6262

6363
checkNotNull(keyFields);
6464

@@ -96,9 +96,9 @@ public static UpsertWriter create(
9696
private transient Map<Row, Tuple2<Boolean, Row>> keyToRows;
9797
private transient PreparedStatement deleteStatement;
9898
// only use metric
99-
private transient MetricOutputFormat metricOutputFormat;
99+
private transient DtRichOutputFormat metricOutputFormat;
100100

101-
private UpsertWriter(int[] fieldTypes, int[] pkFields, int[] pkTypes, String deleteSQL, boolean objectReuse, MetricOutputFormat metricOutputFormat) {
101+
private UpsertWriter(int[] fieldTypes, int[] pkFields, int[] pkTypes, String deleteSQL, boolean objectReuse, DtRichOutputFormat metricOutputFormat) {
102102
this.fieldTypes = fieldTypes;
103103
this.pkFields = pkFields;
104104
this.pkTypes = pkTypes;
@@ -216,7 +216,7 @@ private UpsertWriterUsingUpsertStatement(
216216
boolean objectReuse,
217217
String deleteSQL,
218218
String upsertSQL,
219-
MetricOutputFormat metricOutputFormat) {
219+
DtRichOutputFormat metricOutputFormat) {
220220
super(fieldTypes, pkFields, pkTypes, deleteSQL, objectReuse, metricOutputFormat);
221221
this.upsertSQL = upsertSQL;
222222
}
@@ -274,7 +274,7 @@ private UpsertWriterUsingInsertUpdateStatement(
274274
String existSQL,
275275
String insertSQL,
276276
String updateSQL,
277-
MetricOutputFormat metricOutputFormat) {
277+
DtRichOutputFormat metricOutputFormat) {
278278
super(fieldTypes, pkFields, pkTypes, deleteSQL, objectReuse, metricOutputFormat);
279279
this.existSQL = existSQL;
280280
this.insertSQL = insertSQL;

0 commit comments

Comments
 (0)