Skip to content

Commit 733b76c

Browse files
committed
Fixed multithreding problem
1 parent 620b568 commit 733b76c

File tree

4 files changed

+30
-25
lines changed

4 files changed

+30
-25
lines changed

src/main/java/tech/ydb/app/Application.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8+
import java.util.function.Supplier;
89

910
import jakarta.annotation.PreDestroy;
1011
import jakarta.xml.bind.JAXB;
@@ -84,7 +85,7 @@ private void loadXmlConfig(String content) {
8485
}
8586

8687
for (XmlConfig.Cdc cdc: xml.getCdcs()) {
87-
Result<CdcMsgParser> batcher = CdcMsgParser.parse(ydb, queries, cdc);
88+
Result<Supplier<CdcMsgParser>> batcher = CdcMsgParser.parseConfig(ydb, queries, cdc);
8889
if (!batcher.isSuccess()) {
8990
logger.error("can't create reader {} with problem {}", cdc.getConsumer(), batcher.getStatus());
9091
warnings.add("can't create reader " + cdc.getConsumer() + " with problem: " + batcher.getStatus());

src/main/java/tech/ydb/app/CdcMsgParser.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.HashSet;
66
import java.util.Map;
77
import java.util.Set;
8+
import java.util.function.Supplier;
89

910
import com.fasterxml.jackson.databind.JsonNode;
1011
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,9 +36,9 @@ public class CdcMsgParser {
3536
private final YqlQuery updateQuery;
3637
private final YqlQuery deleteQuery;
3738

38-
private CdcMsgParser(YqlQuery updateQuery, YqlQuery deleteQuery) {
39-
this.updateQuery = updateQuery;
40-
this.deleteQuery = deleteQuery;
39+
private CdcMsgParser(Supplier<YqlQuery> updateQuery, Supplier<YqlQuery> deleteQuery) {
40+
this.updateQuery = updateQuery.get();
41+
this.deleteQuery = deleteQuery.get();
4142
}
4243

4344
public YqlQuery parseJsonMessage(byte[] json) throws IOException {
@@ -80,7 +81,8 @@ public YqlQuery parseJsonMessage(byte[] json) throws IOException {
8081
return null;
8182
}
8283

83-
public static Result<CdcMsgParser> parse(YdbService ydb, Map<String, XmlConfig.Query> queries, XmlConfig.Cdc cdc) {
84+
public static Result<Supplier<CdcMsgParser>> parseConfig(YdbService ydb,
85+
Map<String, XmlConfig.Query> queries, XmlConfig.Cdc cdc) {
8486
return new Parser(ydb, cdc, queries).parse();
8587
}
8688

@@ -95,7 +97,7 @@ public Parser(YdbService ydb, XmlConfig.Cdc cdc, Map<String, XmlConfig.Query> xm
9597
this.xmlQueries = xmlQueries;
9698
}
9799

98-
public Result<CdcMsgParser> parse() {
100+
public Result<Supplier<CdcMsgParser>> parse() {
99101
String changefeed = ydb.expandPath(cdc.getChangefeed());
100102

101103
int index = changefeed.lastIndexOf("/");
@@ -112,20 +114,20 @@ public Result<CdcMsgParser> parse() {
112114
}
113115
TableDescription description = descRes.getValue();
114116

115-
Result<YqlQuery> updateQuery = findUpdateQuery(description);
117+
Result<Supplier<YqlQuery>> updateQuery = findUpdateQuery(description);
116118
if (!updateQuery.isSuccess()) {
117119
return updateQuery.map(null);
118120
}
119121

120-
Result<YqlQuery> deleteQuery = findDeleteQuery(description);
122+
Result<Supplier<YqlQuery>> deleteQuery = findDeleteQuery(description);
121123
if (!deleteQuery.isSuccess()) {
122124
return deleteQuery.map(null);
123125
}
124126

125-
return Result.success(new CdcMsgParser(updateQuery.getValue(), deleteQuery.getValue()));
127+
return Result.success(() -> new CdcMsgParser(updateQuery.getValue(), deleteQuery.getValue()));
126128
}
127129

128-
private Result<YqlQuery> findUpdateQuery(TableDescription source) {
130+
private Result<Supplier<YqlQuery>> findUpdateQuery(TableDescription source) {
129131
if (cdc.getQuery() != null && !cdc.getQuery().trim().isEmpty()) {
130132
return validate(source, cdc.getQuery().trim(), false);
131133
}
@@ -140,7 +142,7 @@ private Result<YqlQuery> findUpdateQuery(TableDescription source) {
140142
return Result.success(YqlQuery.skipMessages("update", "updateQueryId", source.getPrimaryKeys(), cdc));
141143
}
142144

143-
private Result<YqlQuery> findDeleteQuery(TableDescription source) {
145+
private Result<Supplier<YqlQuery>> findDeleteQuery(TableDescription source) {
144146
String queryId = cdc.getDeleteQueryId();
145147
if (queryId != null && xmlQueries.containsKey(queryId)) {
146148
XmlConfig.Query query = xmlQueries.get(queryId);
@@ -152,7 +154,7 @@ private Result<YqlQuery> findDeleteQuery(TableDescription source) {
152154
return Result.success(YqlQuery.skipMessages("erase", "deleteQueryId", source.getPrimaryKeys(), cdc));
153155
}
154156

155-
private Result<YqlQuery> validate(TableDescription source, String query, boolean keysOnly) {
157+
private Result<Supplier<YqlQuery>> validate(TableDescription source, String query, boolean keysOnly) {
156158
Result<DataQuery> parsed = ydb.parseQuery(query);
157159
if (!parsed.isSuccess()) {
158160
logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus());

src/main/java/tech/ydb/app/YqlQuery.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.HashMap;
1010
import java.util.List;
1111
import java.util.Map;
12+
import java.util.function.Supplier;
1213

1314
import com.fasterxml.jackson.databind.JsonNode;
1415
import org.slf4j.Logger;
@@ -57,9 +58,7 @@ public void addMessage(JsonNode key, JsonNode update) throws IOException {
5758
Integer keyIndex = keyColumns.get(name);
5859
members[idx] = readValue(key.get(keyIndex), type);
5960
} else {
60-
if (update != null) {
61-
members[idx] = readValue(update.get(name), type);
62-
}
61+
members[idx] = readValue(update != null ? update.get(name) : null, type);
6362
}
6463
}
6564

@@ -162,9 +161,9 @@ private Value<?> readValue(JsonNode node, Type type) throws IOException {
162161
throw new IOException("Can't read node value " + node + " with type " + type);
163162
}
164163

165-
public static YqlQuery skipMessages(String type, String config, List<String> keys, XmlConfig.Cdc xml) {
164+
public static Supplier<YqlQuery> skipMessages(String type, String config, List<String> keys, XmlConfig.Cdc xml) {
166165
final int batchSize = xml.getBatchSize();
167-
return new YqlQuery(null, keys, batchSize) {
166+
return () -> new YqlQuery(null, keys, batchSize) {
168167
@Override
169168
public void addMessage(JsonNode key, JsonNode update) throws IOException {
170169
batch.add(NullValue.of());
@@ -178,10 +177,10 @@ public Status execute(YdbService ydb) {
178177
};
179178
}
180179

181-
public static YqlQuery executeYql(String query, List<String> keys, String name, StructType type, XmlConfig.Cdc config) {
180+
public static Supplier<YqlQuery> executeYql(String query, List<String> keys, String name, StructType type, XmlConfig.Cdc config) {
182181
final int batchSize = config.getBatchSize();
183182
final int timeout = config.getTimeoutSeconds();
184-
return new YqlQuery(type, keys, batchSize) {
183+
return () -> new YqlQuery(type, keys, batchSize) {
185184
@Override
186185
public Status execute(YdbService ydb) {
187186
Params prm = Params.of(name, ListType.of(type).newValue(batch));

src/main/java/tech/ydb/app/YqlWriter.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.BlockingQueue;
1010
import java.util.concurrent.TimeUnit;
1111
import java.util.concurrent.atomic.AtomicLong;
12+
import java.util.function.Supplier;
1213

1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
@@ -27,7 +28,6 @@ public class YqlWriter implements AutoCloseable {
2728
private static final Logger logger = LoggerFactory.getLogger(YqlWriter.class);
2829

2930
private final YdbService ydb;
30-
private final CdcMsgParser parser;
3131
private final int errorThreshold;
3232

3333
private final List<Writer> writers;
@@ -38,9 +38,8 @@ public class YqlWriter implements AutoCloseable {
3838
private final AtomicLong lastPrinted = new AtomicLong();
3939
private final AtomicLong writtenCount = new AtomicLong();
4040

41-
public YqlWriter(YdbService ydb, CdcMsgParser parser, XmlConfig.Cdc config) {
41+
public YqlWriter(YdbService ydb, Supplier<CdcMsgParser> parser, XmlConfig.Cdc config) {
4242
this.ydb = ydb;
43-
this.parser = parser;
4443
this.errorThreshold = config.getErrorThreshold();
4544

4645
this.lastWrited = null;
@@ -49,7 +48,7 @@ public YqlWriter(YdbService ydb, CdcMsgParser parser, XmlConfig.Cdc config) {
4948

5049
for (int idx = 1; idx <= config.getThreadsCount(); idx++) {
5150
String name = "writer-" + config.getConsumer() + "[" + idx + "]";
52-
writers.add(new Writer(config.getBatchSize(), name));
51+
writers.add(new Writer(parser.get(), config.getBatchSize(), name));
5352
}
5453
}
5554

@@ -97,9 +96,11 @@ public void addMessage(long partitionId, Message msg) {
9796
private class Writer implements Runnable {
9897
private final BlockingQueue<Message> queue;
9998
private final Thread thread;
99+
private final CdcMsgParser parser;
100100
private volatile Status lastStatus = Status.SUCCESS;
101101

102-
public Writer(int batchSize, String threadName) {
102+
public Writer(CdcMsgParser parser, int batchSize, String threadName) {
103+
this.parser = parser;
103104
this.queue = new ArrayBlockingQueue<>(2 * batchSize);
104105
this.thread = new Thread(this, threadName);
105106
}
@@ -153,7 +154,9 @@ public void run() {
153154
long ms = now - printedAt;
154155
long written = writtenCount.getAndSet(0);
155156
double avg = 1000.0d * written / ms;
156-
logger.debug("writed {} rows, {} rps", written, String.format("%.2f", avg));
157+
String w = String.format("%7d", written);
158+
String a = String.format("%10.2f", avg);
159+
logger.debug("writed {} rows, {} rps", w, a);
157160
}
158161

159162
DeferredCommitter committer = DeferredCommitter.newInstance();

0 commit comments

Comments
 (0)