Skip to content

Commit eb5a0c8

Browse files
author
dapeng
committed
数据源队列溢出保护
1 parent e463694 commit eb5a0c8

File tree

1 file changed

+32
-26
lines changed

1 file changed

+32
-26
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -121,35 +121,41 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
121121
AtomicLong failCounter = new AtomicLong(0);
122122
AtomicBoolean finishFlag = new AtomicBoolean(false);
123123
while(!finishFlag.get()){
124-
CountDownLatch latch = new CountDownLatch(1);
125-
rdbSqlClient.getConnection(conn -> {
126-
try {
127-
if(conn.failed()){
128-
connectionStatus.set(false);
129-
if(failCounter.getAndIncrement() % 1000 == 0){
130-
LOG.error("getConnection error", conn.cause());
131-
}
132-
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
133-
resultFuture.completeExceptionally(conn.cause());
134-
finishFlag.set(true);
124+
try{
125+
CountDownLatch latch = new CountDownLatch(1);
126+
rdbSqlClient.getConnection(conn -> {
127+
try {
128+
if(conn.failed()){
129+
connectionStatus.set(false);
130+
if(failCounter.getAndIncrement() % 1000 == 0){
131+
LOG.error("getConnection error", conn.cause());
132+
}
133+
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
134+
resultFuture.completeExceptionally(conn.cause());
135+
finishFlag.set(true);
136+
}
137+
return;
135138
}
136-
return;
139+
connectionStatus.set(true);
140+
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
141+
cancelTimerWhenComplete(resultFuture, timerFuture);
142+
handleQuery(conn.result(), inputParams, input, resultFuture);
143+
finishFlag.set(true);
144+
} catch (Exception e) {
145+
dealFillDataError(input, resultFuture, e);
146+
} finally {
147+
latch.countDown();
137148
}
138-
connectionStatus.set(true);
139-
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
140-
cancelTimerWhenComplete(resultFuture, timerFuture);
141-
handleQuery(conn.result(), inputParams, input, resultFuture);
142-
finishFlag.set(true);
143-
} catch (Exception e) {
144-
dealFillDataError(input, resultFuture, e);
145-
} finally {
146-
latch.countDown();
149+
});
150+
try {
151+
latch.await();
152+
} catch (InterruptedException e) {
153+
LOG.error("", e);
147154
}
148-
});
149-
try {
150-
latch.await();
151-
} catch (InterruptedException e) {
152-
LOG.error("", e);
155+
156+
} catch (Exception e){
157+
//数据源队列溢出情况
158+
connectionStatus.set(false);
153159
}
154160
if(!finishFlag.get()){
155161
try {

0 commit comments

Comments
 (0)