Skip to content

Commit a474292

Browse files
authored
Fix closing writers race condition (#98)
1 parent 7754567 commit a474292

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiDefaultStream.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,16 @@ public void preShutdown() {
8080
* @param tableName The table name for which stream has to be removed.
8181
*/
8282
private void closeAndDelete(String tableName) {
83-
logger.debug("Closing stream on table {}", tableName);
84-
if (tableToStream.containsKey(tableName)) {
85-
synchronized (tableToStream) {
86-
tableToStream.get(tableName).close();
87-
tableToStream.remove(tableName);
83+
tableToStream.computeIfPresent(tableName, (t, writer) -> {
84+
logger.debug("Closing stream on table {}", t);
85+
try {
86+
writer.close();
87+
logger.debug("Closed stream on table {}", t);
88+
} catch (Throwable e) {
89+
logger.warn("Error closing stream for table {}", t, e);
8890
}
89-
logger.debug("Closed stream on table {}", tableName);
90-
}
91+
return null;
92+
});
9193
}
9294

9395
/**

0 commit comments

Comments
 (0)