Skip to content

Commit c33c129

Browse files
committed
fixed issue in ydb table scan when the queue is full at the end of the scan
1 parent 486dbca commit c33c129

File tree

2 files changed

+22
-16
lines changed

2 files changed

+22
-16
lines changed

NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ cd /home/zinal/Software/spark-3.3.3-bin-hadoop3
3636
```sql
3737
create schema ydb1.spark;
3838
create table ydb1.spark.test1(a integer not null, b bigint, c varchar(100)) tblproperties('primary_key'='b,a');
39+
40+
CREATE TABLE ydb1.fhrw2 TBLPROPERTIES('primary_key'='h2,unique_key') AS SELECT hash(unique_key) AS h2, x.* FROM ydb1.fhrw0 x;
3941
```
4042

4143

src/main/java/tech/ydb/spark/connector/YdbScanViaReadTable.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,22 @@ private void configureRanges(ReadTableSettings.Builder rtsb) {
274274
}
275275
}
276276

277+
private void putToQueue(QueueItem qi) {
278+
while (true) {
279+
try {
280+
queue.add(qi);
281+
break; // exit the "queue put" retry loop
282+
} catch (IllegalStateException ise) {
283+
// The unlikely case of thread interrupt should not prevent us
284+
// from putting an item into the queue
285+
try {
286+
Thread.sleep(35L);
287+
} catch (InterruptedException ix) {
288+
}
289+
}
290+
}
291+
}
292+
277293
static final class QueueItem {
278294

279295
final ResultSetReader reader;
@@ -298,21 +314,9 @@ public void run() {
298314
try {
299315
stream.start(part -> {
300316
final ResultSetReader rsr = part.getResultSetReader();
301-
while (true) {
302-
try {
303-
queue.add(new QueueItem(rsr));
304-
LOG.debug("Added portion of {} rows for table {} to the queue.",
305-
rsr.getRowCount(), tablePath);
306-
return; // exit the "queue put" retry loop - and lambda too
307-
} catch (IllegalStateException ise) {
308-
// The unlikely case of interrupt should not prevent us
309-
// from putting an item into the queue
310-
try {
311-
Thread.sleep(35L);
312-
} catch (InterruptedException ix) {
313-
}
314-
}
315-
}
317+
putToQueue(new QueueItem(rsr));
318+
LOG.debug("Added portion of {} rows for table {} to the queue.",
319+
rsr.getRowCount(), tablePath);
316320
}).join().expectSuccess();
317321
} catch (Exception ex) {
318322
boolean needReport = true;
@@ -327,7 +331,7 @@ public void run() {
327331
setIssue(ex);
328332
}
329333
}
330-
queue.add(END_OF_SCAN);
334+
putToQueue(END_OF_SCAN);
331335
LOG.debug("Completed background scan for table {}, range {}", tablePath, keyRange);
332336
}
333337
}

0 commit comments

Comments
 (0)