Skip to content

Commit d2c71e9

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_27191' into 1.8_release_3.10.x
2 parents b7bfbfe + eb5a0c8 commit d2c71e9

File tree

1 file changed

+32
-26
lines changed

1 file changed

+32
-26
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -131,35 +131,41 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
131131
AtomicLong failCounter = new AtomicLong(0);
132132
AtomicBoolean finishFlag = new AtomicBoolean(false);
133133
while(!finishFlag.get()){
134-
CountDownLatch latch = new CountDownLatch(1);
135-
rdbSqlClient.getConnection(conn -> {
136-
try {
137-
if(conn.failed()){
138-
connectionStatus.set(false);
139-
if(failCounter.getAndIncrement() % 1000 == 0){
140-
LOG.error("getConnection error", conn.cause());
141-
}
142-
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
143-
resultFuture.completeExceptionally(conn.cause());
144-
finishFlag.set(true);
134+
try{
135+
CountDownLatch latch = new CountDownLatch(1);
136+
rdbSqlClient.getConnection(conn -> {
137+
try {
138+
if(conn.failed()){
139+
connectionStatus.set(false);
140+
if(failCounter.getAndIncrement() % 1000 == 0){
141+
LOG.error("getConnection error", conn.cause());
142+
}
143+
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
144+
resultFuture.completeExceptionally(conn.cause());
145+
finishFlag.set(true);
146+
}
147+
return;
145148
}
146-
return;
149+
connectionStatus.set(true);
150+
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
151+
cancelTimerWhenComplete(resultFuture, timerFuture);
152+
handleQuery(conn.result(), inputParams, input, resultFuture);
153+
finishFlag.set(true);
154+
} catch (Exception e) {
155+
dealFillDataError(input, resultFuture, e);
156+
} finally {
157+
latch.countDown();
147158
}
148-
connectionStatus.set(true);
149-
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
150-
cancelTimerWhenComplete(resultFuture, timerFuture);
151-
handleQuery(conn.result(), inputParams, input, resultFuture);
152-
finishFlag.set(true);
153-
} catch (Exception e) {
154-
dealFillDataError(input, resultFuture, e);
155-
} finally {
156-
latch.countDown();
159+
});
160+
try {
161+
latch.await();
162+
} catch (InterruptedException e) {
163+
LOG.error("", e);
157164
}
158-
});
159-
try {
160-
latch.await();
161-
} catch (InterruptedException e) {
162-
LOG.error("", e);
165+
166+
} catch (Exception e){
167+
//数据源队列溢出情况
168+
connectionStatus.set(false);
163169
}
164170
if(!finishFlag.get()){
165171
try {

0 commit comments

Comments
 (0)