From d252cc4c522be3d5b83210bae52b94502769f237 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 13 Aug 2025 12:16:12 +0100 Subject: [PATCH] Fixed lost compression errors --- .../java/tech/ydb/topic/write/impl/EnqueuedMessage.java | 8 ++++---- .../main/java/tech/ydb/topic/write/impl/WriterImpl.java | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java index f34e05fc..4841ff26 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java @@ -1,6 +1,5 @@ package tech.ydb.topic.write.impl; -import java.io.IOException; import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -35,7 +34,7 @@ public class EnqueuedMessage { private final YdbTransaction transaction; private volatile boolean isReady = false; - private volatile IOException compressError = null; + private volatile Throwable compressError = null; public EnqueuedMessage(Message message, SendSettings sendSettings, boolean noCompression) { this.bytes = message.getData(); @@ -60,7 +59,7 @@ public long getSize() { return bytes.length; } - public IOException getCompressError() { + public Throwable getCompressError() { return compressError; } @@ -71,8 +70,9 @@ public void encode(String writeId, Codec codec) { bytes = Encoder.encode(codec, bytes); isReady = true; logger.trace("[{}] Successfully finished encoding message", writeId); - } catch (IOException ex) { + } catch (Throwable ex) { logger.error("[{}] Exception while encoding message: ", writeId, ex); + compressError = ex; isReady = true; future.completeExceptionally(ex); } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 7617fe9f..adf30c23 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -1,6 +1,5 @@ package tech.ydb.topic.write.impl; -import java.io.IOException; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -165,7 +164,7 @@ private void acceptMessageIntoSendingQueue(EnqueuedMessage message) { } else { CompletableFuture .runAsync(() -> message.encode(id, settings.getCodec()), compressionExecutor) - .thenRun(this::moveEncodedMessagesToSendingQueue); + .whenComplete((res, th) -> moveEncodedMessagesToSendingQueue()); } } @@ -187,7 +186,7 @@ private void moveEncodedMessagesToSendingQueue() { break; } - IOException error = msg.getCompressError(); + Throwable error = msg.getCompressError(); if (error != null) { // just skip logger.warn("[{}] Message wasn't sent because of processing error", id, error); free(1, msg.getOriginalSize());