diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index acede80b..485ed28a 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -44,6 +44,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -74,10 +75,14 @@ abstract class FlinkPulsarSinkBase extends RichSinkFunction implements Che protected boolean failOnWrite; - /** Lock for accessing the pending records. */ + /** + * Lock for accessing the pending records. + */ protected final SerializableObject pendingRecordsLock = new SerializableObject(); - /** Number of unacknowledged records. */ + /** + * Number of unacknowledged records. + */ protected long pendingRecords = 0L; protected final boolean forcedTopic; @@ -86,7 +91,7 @@ abstract class FlinkPulsarSinkBase extends RichSinkFunction implements Che protected final TopicKeyExtractor topicKeyExtractor; - protected transient volatile Throwable failedWrite; + protected final AtomicReference failedWrite = new AtomicReference<>(); protected transient PulsarAdmin admin; @@ -155,12 +160,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { if (flushOnCheckpoint) { producerFlush(); - synchronized (pendingRecordsLock) { - if (pendingRecords != 0) { - throw new IllegalStateException("Pending record count must be zero at this point " + pendingRecords); - } - checkErroneous(); - } } } @@ -194,22 +193,25 @@ protected void initializeSendCallback() { if (failOnWrite) { this.sendCallback = (t, u) -> { - if (failedWrite == null && u == null) { - acknowledgeMessage(); - } else if (failedWrite == null && u != null) { - failedWrite = u; - } else { // failedWrite != null + if (failedWrite.get() != null) { // do nothing and wait next checkForError to throw exception + return; } - }; - } else { - this.sendCallback = (t, u) -> { - if (failedWrite == null && u != null) { - log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(u)); + if (u == null) { + acknowledgeMessage(); + return; } - acknowledgeMessage(); + failedWrite.compareAndSet(null, u); }; + return; } + + this.sendCallback = (t, u) -> { + if (u != null) { + log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(u)); + } + acknowledgeMessage(); + }; } private void uploadSchema(String topic) { @@ -273,7 +275,8 @@ public void producerFlush() throws Exception { synchronized (pendingRecordsLock) { while (pendingRecords > 0) { try { - pendingRecordsLock.wait(); + pendingRecordsLock.wait(100); + checkErroneous(); } catch (InterruptedException e) { // this can be interrupted when the Task has been cancelled. // by throwing an exception, we ensure that this checkpoint doesn't get confirmed @@ -284,7 +287,6 @@ public void producerFlush() throws Exception { } protected void producerClose() throws Exception { - producerFlush(); if (admin != null) { admin.close(); } @@ -301,11 +303,10 @@ protected void producerClose() throws Exception { } protected void checkErroneous() throws Exception { - Throwable e = failedWrite; - if (e != null) { - // prevent double throwing - failedWrite = null; - throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e); + Throwable t = failedWrite.get(); + if (t != null) { + log.error("Failed to send data to Pulsar: " + t.getMessage(), t); + throw new Exception("Failed to send data to Pulsar: " + t.getMessage(), t); } }