Commit 482bc4b

committed: separated implementation classes into their own package
1 parent a50cd12

17 files changed (+162 lines, -97 lines)

src/main/java/tech/ydb/spark/connector/YdbCatalog.java

Lines changed: 4 additions & 0 deletions

@@ -32,6 +32,10 @@
 import tech.ydb.scheme.SchemeClient;
 import tech.ydb.scheme.description.DescribePathResult;
 import tech.ydb.scheme.description.ListDirectoryResult;
+import tech.ydb.spark.connector.impl.YdbAlterTable;
+import tech.ydb.spark.connector.impl.YdbConnector;
+import tech.ydb.spark.connector.impl.YdbCreateTable;
+import tech.ydb.spark.connector.impl.YdbRegistry;
 import tech.ydb.table.SessionRetryContext;
 import tech.ydb.table.description.TableDescription;
 import tech.ydb.table.description.TableIndex;

src/main/java/tech/ydb/spark/connector/YdbExtensions.java

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+package tech.ydb.spark.connector;
+
+import org.apache.spark.sql.SparkSessionExtensions;
+import org.apache.spark.sql.SparkSessionExtensionsProvider;
+import scala.runtime.BoxedUnit;
+
+/**
+ *
+ * @author zinal
+ */
+public class YdbExtensions implements SparkSessionExtensionsProvider {
+
+    @Override
+    public BoxedUnit apply(SparkSessionExtensions ext) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+}
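
A SparkSessionExtensionsProvider is the hook Spark uses to let a library inject custom rules into a session. Once the apply() stub is actually implemented, the provider would normally be activated through the standard spark.sql.extensions configuration key. A minimal usage sketch, assuming nothing beyond this commit:

import org.apache.spark.sql.SparkSession;

// Sketch only: activating the (currently stubbed) extensions provider.
// As written in this commit, apply() would throw UnsupportedOperationException.
SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .config("spark.sql.extensions", "tech.ydb.spark.connector.YdbExtensions")
        .getOrCreate();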

src/main/java/tech/ydb/spark/connector/YdbOptions.java

Lines changed: 1 addition & 1 deletion

@@ -167,7 +167,7 @@ public abstract class YdbOptions {
      * @param referenced properties for the connection to be found or created
      * @return true, if properties connectionMatches, false otherwise
      */
-    public static boolean connectionMatches(Map<String, String> existing, Map<String, String> referenced) {
+    public static boolean optionsMatches(Map<String, String> existing, Map<String, String> referenced) {
         for (String propName : CONN_IDENTITY) {
             String v1 = existing.get(propName);
             String v2 = referenced.get(propName);
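
The rename from connectionMatches to optionsMatches reflects that the method compares option maps rather than live connections: it checks only the CONN_IDENTITY subset of properties, so two option sets that agree on connection identity can share one connector. A hypothetical caller sketch (getConnectOptions() and createConnector() are illustrative names, not part of this diff):

// Hypothetical connector-cache lookup built on optionsMatches().
Map<String, String> existing = connector.getConnectOptions(); // assumed accessor
if (YdbOptions.optionsMatches(existing, referenced)) {
    return connector;               // identity options agree: reuse the connection
}
return createConnector(referenced); // hypothetical factory for a new connection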

src/main/java/tech/ydb/spark/connector/YdbPartitionReader.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

src/main/java/tech/ydb/spark/connector/YdbPartitionReaderFactory.java

Lines changed: 48 additions & 1 deletion

@@ -1,17 +1,24 @@
 package tech.ydb.spark.connector;
 
+import java.io.IOException;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 
+import tech.ydb.spark.connector.impl.YdbScanImpl;
+
 /**
  * Partition reader factory delivers the scan options to partition reader instances.
  *
  * @author zinal
 */
 public class YdbPartitionReaderFactory implements PartitionReaderFactory {
 
+    private static final org.slf4j.Logger LOG =
+            org.slf4j.LoggerFactory.getLogger(YdbPartitionReaderFactory.class);
+
     private static final long serialVersionUID = 1L;
 
     private final YdbScanOptions options;

@@ -22,6 +29,46 @@ public YdbPartitionReaderFactory(YdbScanOptions options) {
 
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
-        return new YdbPartitionReader(options, (YdbTablePartition) partition);
+        return new YdbReader(options, (YdbTablePartition) partition);
+    }
+
+    static class YdbReader implements PartitionReader<InternalRow> {
+
+        private final YdbScanOptions options;
+        private final YdbTablePartition partition;
+        private YdbScanImpl scan;
+
+        YdbReader(YdbScanOptions options, YdbTablePartition partition) {
+            this.options = options;
+            this.partition = partition;
+        }
+
+        @Override
+        public boolean next() throws IOException {
+            if (scan == null) {
+                LOG.debug("Preparing scan for table {} at partition {}",
+                        options.getTablePath(), partition);
+                scan = new YdbScanImpl(options, partition.getRange());
+                scan.prepare();
+                LOG.debug("Scan prepared, ready to fetch...");
+            }
+            return scan.next();
+        }
+
+        @Override
+        public InternalRow get() {
+            return scan.get();
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (scan != null) {
+                LOG.debug("Closing the scan.");
+                scan.close();
+            }
+            scan = null;
+        }
+
     }
+
 }
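
The nested YdbReader replaces the deleted YdbPartitionReader class and creates its YdbScanImpl lazily on the first next() call, so a partition that is never read never opens a YDB scan stream. For reference, Spark consumes any PartitionReader through the same next()/get()/close() loop, roughly like this (simplified sketch, not actual Spark source):

// Simplified consumer loop; the lazy prepare() above runs on the first next().
try (PartitionReader<InternalRow> reader = factory.createReader(partition)) {
    while (reader.next()) {
        InternalRow row = reader.get();
        // ... hand the row to the downstream operator
    }
} // close() releases the underlying YDB scan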

src/main/java/tech/ydb/spark/connector/YdbTable.java

Lines changed: 22 additions & 1 deletion

@@ -9,6 +9,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.Table;

@@ -18,12 +19,14 @@
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
+import tech.ydb.spark.connector.impl.YdbConnector;
 import tech.ydb.table.description.KeyRange;
 import tech.ydb.table.description.TableColumn;
 import tech.ydb.table.description.TableDescription;

@@ -35,7 +38,7 @@
  *
  * @author zinal
 */
-public class YdbTable implements Table, SupportsRead, SupportsWrite {
+public class YdbTable implements Table, SupportsRead, SupportsWrite, SupportsDelete {
 
     private static final org.slf4j.Logger LOG
             = org.slf4j.LoggerFactory.getLogger(YdbTable.class);

@@ -270,6 +273,24 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
         return new YdbWriteBuilder(this, info);
     }
 
+    @Override
+    public boolean canDeleteWhere(Filter[] filters) {
+        // We prefer per-row operations
+        return false;
+    }
+
+    @Override
+    public void deleteWhere(Filter[] filters) {
+        // Should not be called, as canDeleteWhere() returns false.
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public boolean truncateTable() {
+        // TODO: implementation
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
     final YdbConnector getConnector() {
         return connector;
     }
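
Returning false from canDeleteWhere() tells Spark that the table will not perform a metadata-only delete for these filters; by the SupportsDelete contract, deleteWhere() is then never invoked, which is why it only throws. The decision flow is roughly the following (simplified sketch, not actual Spark source):

// Simplified view of the SupportsDelete contract.
if (table.canDeleteWhere(filters)) {
    table.deleteWhere(filters);   // coarse, metadata-level delete
} else {
    // Spark must fall back to another strategy (e.g. row-level writes)
    // or reject the DELETE statement for this table.
}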

src/main/java/tech/ydb/spark/connector/YdbTableProvider.java

Lines changed: 2 additions & 0 deletions

@@ -14,6 +14,8 @@
 import tech.ydb.core.Result;
 import tech.ydb.core.Status;
 import tech.ydb.core.StatusCode;
+import tech.ydb.spark.connector.impl.YdbConnector;
+import tech.ydb.spark.connector.impl.YdbRegistry;
 import tech.ydb.table.description.TableDescription;
 import tech.ydb.table.description.TableIndex;
 import tech.ydb.table.settings.DescribeTableSettings;

src/main/java/tech/ydb/spark/connector/YdbWrite.java

Lines changed: 5 additions & 2 deletions

@@ -12,12 +12,15 @@
 import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
 
+import tech.ydb.spark.connector.impl.YdbWriterImpl;
+
 /**
  * YDB table writer: orchestration and partition writer factory.
  *
  * @author zinal
 */
-public class YdbWrite implements Serializable, WriteBuilder, Write, BatchWrite, DataWriterFactory {
+public class YdbWrite implements Serializable,
+        WriteBuilder, Write, BatchWrite, DataWriterFactory {
 
     private static final long serialVersionUID = 1L;
 

@@ -60,7 +63,7 @@ public void abort(WriterCommitMessage[] messages) {
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
-        return new YdbWriterBasic(options);
+        return new YdbWriterImpl(options);
     }
 
 }
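
YdbWrite implements the whole DSv2 write chain (WriteBuilder, Write, BatchWrite, DataWriterFactory) in one serializable class, so the object built on the driver is the same one that creates writers on the executors. Spark's batch write lifecycle is roughly (simplified sketch):

// Each step below resolves to the same YdbWrite instance.
Write write = writeBuilder.build();                    // WriteBuilder -> Write
BatchWrite batch = write.toBatch();                    // Write -> BatchWrite
DataWriterFactory factory = batch.createBatchWriterFactory(physicalInfo);
// On each executor task:
DataWriter<InternalRow> writer = factory.createWriter(partitionId, taskId);
// writer.write(row) per row, then writer.commit() or writer.abort()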

src/main/java/tech/ydb/spark/connector/YdbAlterTable.java renamed to src/main/java/tech/ydb/spark/connector/impl/YdbAlterTable.java

Lines changed: 12 additions & 8 deletions

@@ -1,4 +1,4 @@
-package tech.ydb.spark.connector;
+package tech.ydb.spark.connector.impl;
 
 import java.util.Arrays;
 import java.util.HashMap;

@@ -10,6 +10,10 @@
 import org.apache.spark.sql.connector.catalog.TableChange;
 
 import tech.ydb.core.Status;
+import tech.ydb.spark.connector.YdbFieldInfo;
+import tech.ydb.spark.connector.YdbFieldType;
+import tech.ydb.spark.connector.YdbOptions;
+import tech.ydb.spark.connector.YdbTypes;
 import tech.ydb.table.Session;
 import tech.ydb.table.description.TableColumn;
 import tech.ydb.table.description.TableDescription;

@@ -22,7 +26,7 @@
  *
  * @author zinal
*/
-class YdbAlterTable extends YdbPropertyHelper {
+public class YdbAlterTable extends YdbPropertyHelper {
 
     final YdbTypes types;
     final String tablePath;

@@ -31,7 +35,7 @@ class YdbAlterTable extends YdbPropertyHelper {
     final Map<String, YdbFieldInfo> addColumns = new HashMap<>();
     final Set<String> removeColumns = new HashSet<>();
 
-    YdbAlterTable(YdbConnector connector, String tablePath) {
+    public YdbAlterTable(YdbConnector connector, String tablePath) {
         super(null);
         this.types = connector.getDefaultTypes();
         this.tablePath = tablePath;

@@ -43,7 +47,7 @@ class YdbAlterTable extends YdbPropertyHelper {
         }
     }
 
-    void prepare(TableChange.AddColumn change) {
+    public void prepare(TableChange.AddColumn change) {
         YdbFieldType yft = types.mapTypeSpark2Ydb(change.dataType());
         if (null == yft) {
             throw new UnsupportedOperationException("Unsupported data type for column: " + change.dataType());

@@ -68,7 +72,7 @@ void prepare(TableChange.AddColumn change) {
         }
     }
 
-    void prepare(TableChange.DeleteColumn change) {
+    public void prepare(TableChange.DeleteColumn change) {
         if (change.fieldNames().length != 1) {
             throw new UnsupportedOperationException("Illegal field name value: "
                     + Arrays.toString(change.fieldNames()));

@@ -88,7 +92,7 @@ void prepare(TableChange.DeleteColumn change) {
         }
     }
 
-    void prepare(TableChange.SetProperty change) {
+    public void prepare(TableChange.SetProperty change) {
         String property = change.property();
         if (!YdbOptions.TABLE_UPDATABLE.contains(property.toUpperCase())) {
             throw new UnsupportedOperationException("Unsupported property for table alteration: "

@@ -97,7 +101,7 @@ void prepare(TableChange.SetProperty change) {
         properties.put(property.toLowerCase(), change.value());
     }
 
-    void prepare(TableChange.RemoveProperty change) {
+    public void prepare(TableChange.RemoveProperty change) {
         String property = change.property();
         if (!YdbOptions.TABLE_UPDATABLE.contains(property.toUpperCase())) {
             throw new UnsupportedOperationException("Unsupported property for table alteration: "

@@ -142,7 +146,7 @@ private void applyProperty(String name, String value, PartitioningSettings ps) {
         }
     }
 
-    CompletableFuture<Status> run(Session session) {
+    public CompletableFuture<Status> run(Session session) {
         final AlterTableSettings settings = new AlterTableSettings();
         for (YdbFieldInfo yfi : addColumns.values()) {
             if (yfi.isNullable()) {
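
Now that YdbAlterTable and its prepare()/run() methods are public, callers in the base package (such as the catalog's alterTable path) can keep the two-phase pattern: validate every TableChange eagerly via prepare(), then issue a single ALTER TABLE via run(). A hypothetical caller sketch (getRetryCtx() is an assumed accessor for the connector's SessionRetryContext):

// Hypothetical caller; prepare() validates eagerly, run() issues one ALTER.
YdbAlterTable alter = new YdbAlterTable(connector, tablePath);
for (TableChange change : changes) {
    if (change instanceof TableChange.AddColumn) {
        alter.prepare((TableChange.AddColumn) change);
    } else if (change instanceof TableChange.DeleteColumn) {
        alter.prepare((TableChange.DeleteColumn) change);
    } else if (change instanceof TableChange.SetProperty) {
        alter.prepare((TableChange.SetProperty) change);
    } else if (change instanceof TableChange.RemoveProperty) {
        alter.prepare((TableChange.RemoveProperty) change);
    }
}
connector.getRetryCtx()                               // assumed accessor
        .supplyStatus(alter::run)                     // SessionRetryContext API
        .join()
        .expectSuccess("Failed to alter table " + tablePath);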

src/main/java/tech/ydb/spark/connector/YdbConnector.java renamed to src/main/java/tech/ydb/spark/connector/impl/YdbConnector.java

Lines changed: 5 additions & 1 deletion

@@ -1,4 +1,4 @@
-package tech.ydb.spark.connector;
+package tech.ydb.spark.connector.impl;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;

@@ -13,6 +13,10 @@
 import tech.ydb.core.grpc.GrpcTransport;
 import tech.ydb.core.grpc.GrpcTransportBuilder;
 import tech.ydb.scheme.SchemeClient;
+import tech.ydb.spark.connector.YdbAuthMode;
+import tech.ydb.spark.connector.YdbIngestMethod;
+import tech.ydb.spark.connector.YdbOptions;
+import tech.ydb.spark.connector.YdbTypes;
 import tech.ydb.table.SessionRetryContext;
 import tech.ydb.table.TableClient;