Skip to content

Commit 3b59abd

Browse files
committed
Added errorThreshold to cdc config
1 parent b93322e commit 3b59abd

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

src/main/java/tech/ydb/app/CdcConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ public interface CdcConfig {
1616
int getThreadsCount();
1717

1818
int getTimeoutSeconds();
19+
20+
int getErrorThreshold();
1921
}

src/main/java/tech/ydb/app/XmlConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public static class Cdc implements CdcConfig {
3636
private Integer threadsCount;
3737
@XmlAttribute(name = "timeoutSeconds")
3838
private Integer timeoutSeconds;
39+
@XmlAttribute(name = "errorThreshold")
40+
private Integer errorThreshold;
3941

4042
@XmlValue
4143
private String query;
@@ -78,5 +80,13 @@ public int getTimeoutSeconds() {
7880
}
7981
return timeoutSeconds;
8082
}
83+
84+
@Override
85+
public int getErrorThreshold() {
86+
if (errorThreshold == null) {
87+
return 0;
88+
}
89+
return errorThreshold;
90+
}
8191
}
8292
}

src/main/java/tech/ydb/app/YqlWriter.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class YqlWriter implements AutoCloseable {
3939
private final YdbService ydb;
4040
private final String queryYql;
4141
private final int timeoutSeconds;
42+
private final int errorThreshold;
4243

4344
private final List<Writer> writers;
4445

@@ -52,6 +53,7 @@ private YqlWriter(YdbService ydb, CdcConfig config, String prmName, StructType t
5253
this.ydb = ydb;
5354
this.queryYql = config.getQuery();
5455
this.timeoutSeconds = config.getTimeoutSeconds();
56+
this.errorThreshold = config.getErrorThreshold();
5557

5658
this.lastWrited = null;
5759
this.lastReaded = null;
@@ -152,6 +154,12 @@ public void run() {
152154
Random rnd = new Random();
153155

154156
while (!Thread.interrupted()) {
157+
Message msg = queue.poll();
158+
if (msg == null) {
159+
Thread.sleep(1000);
160+
continue;
161+
}
162+
155163
long now = System.currentTimeMillis();
156164
long printedAt = lastPrinted.get();
157165
if ((now - printedAt > 1000) && lastPrinted.compareAndSet(printedAt, now)) {
@@ -161,12 +169,6 @@ public void run() {
161169
logger.debug("writed {} rows, {} rps", written, String.format("%.2f", avg));
162170
}
163171

164-
Message msg = queue.poll();
165-
if (msg == null) {
166-
Thread.sleep(1000);
167-
continue;
168-
}
169-
170172
DeferredCommitter committer = DeferredCommitter.newInstance();
171173
Instant last = msg.getCreatedAt();
172174

@@ -198,8 +200,14 @@ public void run() {
198200
retry++;
199201
long delay = 25 << Math.min(retry, 8);
200202
delay = delay + rnd.nextLong(delay);
201-
logger.warn("got error {} after {} ms", lastStatus, ms);
202-
logger.warn("retry #{} in {} ms", retry, delay);
203+
if (retry > errorThreshold) {
204+
logger.warn("got error {} after {} ms", lastStatus, ms);
205+
logger.warn("retry #{} in {} ms", retry, delay);
206+
} else {
207+
logger.trace("got error {} after {} ms", lastStatus, ms);
208+
logger.trace("retry #{} in {} ms", retry, delay);
209+
}
210+
203211
Thread.sleep(delay);
204212

205213
now = System.currentTimeMillis();

0 commit comments

Comments
 (0)