Skip to content

Commit daf1e24

Browse files
committed
hbase retract sink modify
1 parent bc71498 commit daf1e24

File tree

14 files changed

+143
-73
lines changed

14 files changed

+143
-73
lines changed

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,19 @@
3030

3131

3232
public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink> {
33-
34-
private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
35-
3633
public ClickhouseSink() {
3734
super(new ClickhouseDialect());
3835
}
3936

4037
@Override
4138
public JDBCUpsertOutputFormat getOutputFormat() {
4239
JDBCOptions jdbcOptions = JDBCOptions.builder()
43-
.setDBUrl(dbURL).setDialect(jdbcDialect)
44-
.setUsername(userName).setPassword(password)
45-
.setTableName(tableName).build();
40+
.setDBUrl(dbURL)
41+
.setDialect(jdbcDialect)
42+
.setUsername(userName)
43+
.setPassword(password)
44+
.setTableName(tableName)
45+
.build();
4646

4747
return JDBCUpsertOutputFormat.builder()
4848
.setOptions(jdbcOptions)
@@ -52,7 +52,8 @@ public JDBCUpsertOutputFormat getOutputFormat() {
5252
.setFieldTypes(sqlTypes)
5353
.setKeyFields(primaryKeys)
5454
.setAllReplace(allReplace)
55-
.setUpdateMode(updateMode).build();
55+
.setUpdateMode(updateMode)
56+
.build();
5657
}
5758

5859

db2/db2-sink/src/main/java/com/dtstack/flink/sql/sink/db/DbSink.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
public class DbSink extends RdbSink {
1111

12-
private static final String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver";
13-
1412
public DbSink() {
1513
super(new DbDialect());
1614
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import java.util.Set;
4747

4848
/**
49-
49+
* @author: [email protected]
5050
* date: 2017-6-29
5151
*/
5252
public class HbaseOutputFormat extends DtRichOutputFormat<Tuple2> {
@@ -94,16 +94,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
9494
@Override
9595
public void writeRecord(Tuple2 tuple2) {
9696
Tuple2<Boolean, Row> tupleTrans = tuple2;
97-
Boolean retract = tupleTrans.getField(0);
98-
if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
99-
dealDelete(tupleTrans);
100-
} else {
101-
dealInsert(tupleTrans);
97+
Boolean retract = tupleTrans.f0;
98+
Row row = tupleTrans.f1;
99+
if (retract) {
100+
dealInsert(row);
101+
} else if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
102+
dealDelete(row);
102103
}
103104
}
104105

105-
protected void dealInsert(Tuple2<Boolean, Row> tupleTrans) {
106-
Row record = tupleTrans.getField(1);
106+
protected void dealInsert(Row record) {
107107
Put put = getPutByRow(record);
108108
if (put == null) {
109109
return;
@@ -112,11 +112,11 @@ protected void dealInsert(Tuple2<Boolean, Row> tupleTrans) {
112112
try {
113113
table.put(put);
114114
} catch (IOException e) {
115-
outDirtyRecords.inc();
116115
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
117-
LOG.error("record insert failed ..", record.toString());
116+
LOG.error("record insert failed ..{}", record.toString());
118117
LOG.error("", e);
119118
}
119+
outDirtyRecords.inc();
120120
}
121121

122122
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
@@ -125,19 +125,18 @@ protected void dealInsert(Tuple2<Boolean, Row> tupleTrans) {
125125
outRecords.inc();
126126
}
127127

128-
protected void dealDelete(Tuple2<Boolean, Row> tupleTrans) {
129-
Row record = tupleTrans.getField(1);
128+
protected void dealDelete(Row record) {
130129
String rowKey = buildRowKey(record);
131130
if (!StringUtils.isEmpty(rowKey)) {
132131
Delete delete = new Delete(Bytes.toBytes(rowKey));
133132
try {
134133
table.delete(delete);
135134
} catch (IOException e) {
136-
outDirtyRecords.inc();
137135
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
138-
LOG.error("record insert failed ..", record.toString());
136+
LOG.error("record insert failed ..{}", record.toString());
139137
LOG.error("", e);
140138
}
139+
outDirtyRecords.inc();
141140
}
142141
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
143142
LOG.info(record.toString());
@@ -170,7 +169,7 @@ private String buildRowKey(Row record) {
170169
List<String> rowKeyValues = getRowKeyValues(record);
171170
// all rowkey not null
172171
if (rowKeyValues.size() != rowkey.length) {
173-
LOG.error("row key value must not null,record is ..", record);
172+
LOG.error("row key value must not null,record is ..{}", record);
174173
outDirtyRecords.inc();
175174
return "";
176175
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.impala;
20+
21+
/**
 * Impala JDBC AuthMech parameter values.
 *
 * Each constant maps to the integer AuthMech code expected by the Impala
 * JDBC driver on the connection URL (e.g. {@code AuthMech=1} for Kerberos).
 *
 * Date: 2020/2/18
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public enum EAuthMech {
    // 0 for No Authentication
    NoAuthentication(0),
    // 1 for Kerberos
    Kerberos(1),
    // 2 for User Name
    UserName(2),
    // 3 for User Name and Password
    NameANDPassword(3);

    // Numeric AuthMech code appended to the Impala JDBC URL; immutable per constant.
    private final int type;

    EAuthMech(int type) {
        this.type = type;
    }

    /**
     * @return the numeric AuthMech code for this authentication mode
     */
    public int getType() {
        return this.type;
    }
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaDialect.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
public class ImpalaDialect implements JDBCDialect {
3535
private static final long serialVersionUID = 1L;
3636

37-
private static final String IMPALA_PARTITION_KEYWORD = " partition";
37+
private static final String IMPALA_PARTITION_KEYWORD = "partition";
3838

3939
@Override
4040
public boolean canHandle(String url) {
@@ -76,7 +76,7 @@ public String getInsertIntoStatement(String schema, String tableName, String[] f
7676
.map(field -> field.replaceAll("\"", "'"))
7777
.collect(Collectors.joining(", "));
7878

79-
String partitionStatement = StringUtils.isEmpty(partitionFieldStr) ? "" : IMPALA_PARTITION_KEYWORD + "(" + partitionFieldStr + ")";
79+
String partitionStatement = StringUtils.isEmpty(partitionFieldStr) ? "" : " " + IMPALA_PARTITION_KEYWORD + "(" + partitionFieldStr + ")";
8080

8181
return "INSERT INTO " + schemaInfo + quoteIdentifier(tableName) +
8282
"(" + columns + ")" + partitionStatement + " VALUES (" + placeholders + ")";

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,12 @@
2222
import com.dtstack.flink.sql.sink.impala.table.ImpalaTableInfo;
2323
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
2424
import com.dtstack.flink.sql.sink.rdb.RdbSink;
25-
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
2625
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2726
import com.dtstack.flink.sql.table.TargetTableInfo;
28-
import com.dtstack.flink.sql.util.JDBCUtils;
29-
import org.apache.commons.lang.StringUtils;
30-
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
31-
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3227
import org.apache.hadoop.conf.Configuration;
3328
import org.apache.hadoop.security.UserGroupInformation;
3429

3530
import java.io.IOException;
36-
import java.sql.DriverManager;
37-
import java.sql.SQLException;
38-
import java.util.ArrayList;
39-
import java.util.Arrays;
40-
import java.util.List;
41-
import java.util.Map;
4231

4332
/**
4433
* Date: 2019/11/11
@@ -58,9 +47,12 @@ public ImpalaSink() {
5847
@Override
5948
public JDBCUpsertOutputFormat getOutputFormat() {
6049
JDBCOptions jdbcOptions = JDBCOptions.builder()
61-
.setDBUrl(getImpalaJdbcUrl()).setDialect(jdbcDialect)
62-
.setUsername(userName).setPassword(password)
63-
.setTableName(tableName).build();
50+
.setDBUrl(getImpalaJdbcUrl())
51+
.setDialect(jdbcDialect)
52+
.setUsername(userName)
53+
.setPassword(password)
54+
.setTableName(tableName)
55+
.build();
6456

6557
return JDBCUpsertOutputFormat.builder()
6658
.setOptions(jdbcOptions)
@@ -71,27 +63,28 @@ public JDBCUpsertOutputFormat getOutputFormat() {
7163
.setKeyFields(primaryKeys)
7264
.setPartitionFields(impalaTableInfo.getPartitionFields())
7365
.setAllReplace(allReplace)
74-
.setUpdateMode(updateMode).build();
66+
.setUpdateMode(updateMode)
67+
.build();
7568
}
7669

7770

7871
public String getImpalaJdbcUrl() {
7972
Integer authMech = impalaTableInfo.getAuthMech();
8073
String newUrl = dbURL;
8174
StringBuffer urlBuffer = new StringBuffer(dbURL);
82-
if (authMech == 0) {
75+
if (authMech == EAuthMech.NoAuthentication.getType()) {
8376
return newUrl;
84-
} else if (authMech == 1) {
77+
} else if (authMech == EAuthMech.Kerberos.getType()) {
8578
String keyTabFilePath = impalaTableInfo.getKeyTabFilePath();
8679
String krb5FilePath = impalaTableInfo.getKrb5FilePath();
8780
String principal = impalaTableInfo.getPrincipal();
8881
String krbRealm = impalaTableInfo.getKrbRealm();
89-
String krbHostFQDN = impalaTableInfo.getKrbHostFQDN();
82+
String krbHostFqdn = impalaTableInfo.getKrbHostFQDN();
9083
String krbServiceName = impalaTableInfo.getKrbServiceName();
9184
urlBuffer.append(";"
9285
.concat("AuthMech=1;")
9386
.concat("KrbRealm=").concat(krbRealm).concat(";")
94-
.concat("KrbHostFQDN=").concat(krbHostFQDN).concat(";")
87+
.concat("KrbHostFQDN=").concat(krbHostFqdn).concat(";")
9588
.concat("KrbServiceName=").concat(krbServiceName).concat(";")
9689
);
9790
newUrl = urlBuffer.toString();
@@ -106,15 +99,15 @@ public String getImpalaJdbcUrl() {
10699
throw new RuntimeException("loginUserFromKeytab error ..", e);
107100
}
108101

109-
} else if (authMech == 2) {
102+
} else if (authMech == EAuthMech.UserName.getType()) {
110103
urlBuffer.append(";"
111104
.concat("AuthMech=3;")
112105
.concat("UID=").concat(userName).concat(";")
113106
.concat("PWD=;")
114107
.concat("UseSasl=0")
115108
);
116109
newUrl = urlBuffer.toString();
117-
} else if (authMech == 3) {
110+
} else if (authMech == EAuthMech.NameANDPassword.getType()) {
118111
urlBuffer.append(";"
119112
.concat("AuthMech=3;")
120113
.concat("UID=").concat(userName).concat(";")

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@
2323
import org.apache.flink.api.java.tuple.Tuple2;
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.types.Row;
26-
import org.apache.kudu.client.*;
26+
import org.apache.kudu.client.AsyncKuduClient;
27+
import org.apache.kudu.client.AsyncKuduSession;
28+
import org.apache.kudu.client.KuduClient;
29+
import org.apache.kudu.client.KuduException;
30+
import org.apache.kudu.client.KuduTable;
31+
import org.apache.kudu.client.Operation;
32+
import org.apache.kudu.client.PartialRow;
2733
import org.slf4j.Logger;
2834
import org.slf4j.LoggerFactory;
2935

@@ -32,13 +38,24 @@
3238
import java.sql.Timestamp;
3339
import java.util.Date;
3440

41+
/**
42+
* @author gituser
43+
* @modify xiuzhu
44+
*/
3545
public class KuduOutputFormat extends DtRichOutputFormat<Tuple2> {
3646

3747
private static final long serialVersionUID = 1L;
3848

3949
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
4050

41-
public enum WriteMode {INSERT, UPDATE, UPSERT}
51+
public enum WriteMode {
52+
// insert
53+
INSERT,
54+
// update
55+
UPDATE,
56+
// update or insert
57+
UPSERT
58+
}
4259

4360
private String kuduMasters;
4461

@@ -106,7 +123,7 @@ public void writeRecord(Tuple2 record) throws IOException {
106123
Row row = tupleTrans.getField(1);
107124
if (row.getArity() != fieldNames.length) {
108125
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
109-
LOG.error("record insert failed ..", row.toString());
126+
LOG.error("record insert failed ..{}", row.toString());
110127
LOG.error("cause by row.getArity() != fieldNames.length");
111128
}
112129
outDirtyRecords.inc();
@@ -125,7 +142,7 @@ public void writeRecord(Tuple2 record) throws IOException {
125142
outRecords.inc();
126143
} catch (KuduException e) {
127144
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
128-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
145+
LOG.error("record insert failed ..{}", row.toString().substring(0, 100));
129146
LOG.error("", e);
130147
}
131148
outDirtyRecords.inc();

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@ public MysqlSink() {
4242
@Override
4343
public JDBCUpsertOutputFormat getOutputFormat() {
4444
JDBCOptions jdbcOptions = JDBCOptions.builder()
45-
.setDBUrl(dbURL).setDialect(jdbcDialect)
46-
.setUsername(userName).setPassword(password)
47-
.setTableName(tableName).build();
45+
.setDBUrl(dbURL)
46+
.setDialect(jdbcDialect)
47+
.setUsername(userName)
48+
.setPassword(password)
49+
.setTableName(tableName)
50+
.build();
4851

4952
return JDBCUpsertOutputFormat.builder()
5053
.setOptions(jdbcOptions)
@@ -54,6 +57,7 @@ public JDBCUpsertOutputFormat getOutputFormat() {
5457
.setFieldTypes(sqlTypes)
5558
.setKeyFields(primaryKeys)
5659
.setAllReplace(allReplace)
57-
.setUpdateMode(updateMode).build();
60+
.setUpdateMode(updateMode)
61+
.build();
5862
}
5963
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,13 @@ public OracleSink() {
3838
@Override
3939
public JDBCUpsertOutputFormat getOutputFormat() {
4040
JDBCOptions jdbcOptions = JDBCOptions.builder()
41-
.setDBUrl(dbURL).setDialect(jdbcDialect)
42-
.setUsername(userName).setPassword(password)
43-
.setTableName(tableName).setScheam(schema).build();
41+
.setDBUrl(dbURL)
42+
.setDialect(jdbcDialect)
43+
.setUsername(userName)
44+
.setPassword(password)
45+
.setTableName(tableName)
46+
.setScheam(schema)
47+
.build();
4448

4549
return JDBCUpsertOutputFormat.builder()
4650
.setOptions(jdbcOptions)
@@ -50,6 +54,7 @@ public JDBCUpsertOutputFormat getOutputFormat() {
5054
.setFieldTypes(sqlTypes)
5155
.setKeyFields(primaryKeys)
5256
.setAllReplace(allReplace)
53-
.setUpdateMode(updateMode).build();
57+
.setUpdateMode(updateMode)
58+
.build();
5459
}
5560
}

0 commit comments

Comments
 (0)