Skip to content

Commit b7bfbfe

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_27342' into 1.8_release_3.10.x
2 parents 4406520 + 81b35d3 commit b7bfbfe

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ protected void initCache() throws SQLException {
9393
protected void reloadCache() {
9494
//reload cacheRef and replace to old cacheRef
9595
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
96-
cacheRef.set(newCache);
9796
try {
9897
loadData(newCache);
9998
} catch (SQLException e) {
@@ -123,6 +122,11 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
123122
List<Map<String, Object>> cacheList = cacheRef.get().get(cacheKey);
124123
if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) {
125124
out.collect(new CRow(fillData(value.row(), null), value.change()));
125+
return;
126+
}
127+
128+
if (CollectionUtils.isEmpty(cacheList)) {
129+
return;
126130
}
127131

128132
cacheList.forEach(one -> out.collect(new CRow(fillData(value.row(), one), value.change())));

0 commit comments

Comments
 (0)