@@ -22,7 +22,7 @@
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
-import com.datastax.oss.driver.api.core.type.CqlVectorType;
+import com.datastax.oss.driver.api.core.type.VectorType;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.ListType;
import com.datastax.oss.driver.api.core.type.MapType;
@@ -76,7 +76,7 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List<Colu
log.info("Add collection schema {}={}", field.name(), collectionSchema);
break;
case ProtocolConstants.DataType.CUSTOM:
-                    if (cm.getType() instanceof CqlVectorType) {
+                    if (cm.getType() instanceof VectorType) {
Schema vectorSchema = dataTypeSchema(ksm, cm.getType());
subSchemas.put(field.name(), vectorSchema);
log.info("Add vector schema {}={}", field.name(), vectorSchema);
@@ -130,7 +130,7 @@ boolean isSupportedCqlType(DataType dataType) {
case ProtocolConstants.DataType.MAP:
return true;
case ProtocolConstants.DataType.CUSTOM:
-                return dataType instanceof CqlVectorType;
+                return dataType instanceof VectorType;
}
return false;
}
@@ -200,9 +200,9 @@ Schema dataTypeSchema(KeyspaceMetadata ksm, DataType dataType) {
MapType mapType = (MapType) dataType;
return org.apache.avro.Schema.createMap(dataTypeSchema(ksm, mapType.getValueType()));
case ProtocolConstants.DataType.CUSTOM:
-                if (dataType instanceof CqlVectorType) {
-                    CqlVectorType vectorType = (CqlVectorType) dataType;
-                    return org.apache.avro.Schema.createArray(dataTypeSchema(ksm, vectorType.getSubtype()));
+                if (dataType instanceof VectorType) {
+                    VectorType vectorType = (VectorType) dataType;
+                    return org.apache.avro.Schema.createArray(dataTypeSchema(ksm, vectorType.getElementType()));
}
default:
throw new UnsupportedOperationException("Ignoring unsupported type=" + dataType.asCql(false, true));
@@ -260,3 +260,4 @@ String stringify(DataType dataType, Object value) {
}
}
}
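For context on the rename running through this file: driver 4.17 exposes the CQL vector type as `VectorType` (4.16's `CqlVectorType` is gone), and the element accessor is `getElementType()` instead of `getSubtype()`. Below is a minimal, self-contained sketch of the Avro mapping the hunks above implement; `DataTypes.vectorOf(...)` and the float-only element mapping are illustrative assumptions, not the connector's actual code.

```java
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.VectorType;
import org.apache.avro.Schema;

public class VectorSchemaSketch {

    // A CQL vector<float, n> becomes an Avro array whose items use the
    // element type's schema (the diff delegates to dataTypeSchema instead).
    static Schema vectorToAvro(DataType dataType) {
        if (dataType instanceof VectorType) {
            VectorType vectorType = (VectorType) dataType;
            return Schema.createArray(elementSchema(vectorType.getElementType()));
        }
        throw new UnsupportedOperationException("Not a vector type: " + dataType);
    }

    // Minimal element mapping, covering only the common float case.
    static Schema elementSchema(DataType elementType) {
        if (DataTypes.FLOAT.equals(elementType)) {
            return Schema.create(Schema.Type.FLOAT);
        }
        throw new UnsupportedOperationException("Unsupported element type: " + elementType);
    }

    public static void main(String[] args) {
        Schema schema = vectorToAvro(DataTypes.vectorOf(DataTypes.FLOAT, 3));
        System.out.println(schema); // {"type":"array","items":"float"}
    }
}
```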

@@ -25,7 +25,7 @@
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
-import com.datastax.oss.driver.api.core.type.CqlVectorType;
+import com.datastax.oss.driver.api.core.type.VectorType;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.ListType;
import com.datastax.oss.driver.api.core.type.MapType;
@@ -171,12 +171,13 @@ public byte[] toConnectData(Row row) {
}
break;
case ProtocolConstants.DataType.CUSTOM: {
-                    if (cm.getType() instanceof CqlVectorType) {
+                    if (cm.getType() instanceof VectorType) {
+                        VectorType vectorType = (VectorType) cm.getType();
Schema vectorSchema = subSchemas.get(fieldName);
-                        CqlVector<?> vector = row.getCqlVector(fieldName);
+                        CqlVector<?> vector = row.getVector(fieldName, Number.class);
log.debug("field={} listSchema={} listValue={}", fieldName, vectorSchema, vector);
List<Object> vectorValue = new ArrayList<>();
-                        vector.getValues().forEach(vectorValue::add);
+                        vector.stream().forEach(vectorValue::add);
genericRecordBuilder.put(fieldName, buildArrayValue(vectorSchema, vectorValue));
}
}
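The read path changes in the same way: `Row.getCqlVector(name)` no longer exists in 4.17; `getVector(name, elementClass)` takes the element class explicitly, and `CqlVector` now implements `Iterable` and exposes `stream()` in place of `getValues()`. A sketch of the new read, assuming a `vector<float, n>` column (the class and method names around the driver calls are illustrative):

```java
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.CqlVector;

import java.util.ArrayList;
import java.util.List;

public class VectorReadSketch {

    // Collect a vector column's elements into a List, as the hunk above does.
    // The connector passes Number.class; Float.class is assumed here for a
    // vector<float, n> column.
    static List<Object> readVector(Row row, String fieldName) {
        CqlVector<Float> vector = row.getVector(fieldName, Float.class);
        List<Object> values = new ArrayList<>();
        vector.stream().forEach(values::add); // CqlVector is streamable in 4.17
        return values;
    }
}
```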
@@ -275,35 +276,35 @@ GenericRecord buildUDTValue(Schema udtSchema, UdtValue udtValue) {
genericRecord.put(field.toString(), udtValue.getBigInteger(field));
break;
case ProtocolConstants.DataType.LIST: {
ListType listType = (ListType) udtValue.getType(field);
List listValue = udtValue.getList(field, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType());
String path = typeName + "." + field.toString();
Schema elementSchema = subSchemas.get(path);
log.debug("path={} elementSchema={} listType={} listValue={}", path, elementSchema, listType, listValue);
genericRecord.put(field.toString(), buildArrayValue(elementSchema, listValue));
}
break;
case ProtocolConstants.DataType.SET: {
SetType setType = (SetType) udtValue.getType(field);
Set setValue = udtValue.getSet(field, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType());
String path = typeName + "." + field.toString();
Schema elementSchema = subSchemas.get(path);
log.debug("path={} elementSchema={} setType={} setValue={}", path, elementSchema, setType, setValue);
genericRecord.put(field.toString(), buildArrayValue(elementSchema, setValue));
}
break;
case ProtocolConstants.DataType.MAP: {
MapType mapType = (MapType) udtValue.getType(field);
Map<String, Object> mapValue = udtValue.getMap(field,
CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(),
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), Map.Entry::getValue));
String path = typeName + "." + field.toString();
Schema valueSchema = subSchemas.get(path);
log.debug("path={} valueSchema={} mapType={} mapValue={}", path, valueSchema, mapType, mapValue);
genericRecord.put(field.toString(), mapValue);
}
break;
default:
log.debug("Ignoring unsupported type field name={} type={}", field, dataType.asCql(false, true));
}
@@ -418,3 +419,4 @@ public IndexedRecord toRecord(CqlDuration value, Schema schema, LogicalType type
}
}
}

@@ -63,7 +63,7 @@
public class NativeJsonConverter extends AbstractNativeConverter<byte[]> {
private static final ObjectMapper mapper = new ObjectMapper();

-    private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+    private static final JsonNodeFactory jsonNodeFactory = new JsonNodeFactory(true);

public NativeJsonConverter(KeyspaceMetadata ksm, TableMetadata tm, List<ColumnMetadata> columns) {
super(ksm, tm, columns);
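The `JsonNodeFactory` change above is behavior-preserving: `new JsonNodeFactory(true)` configures exact `BigDecimal` handling just like the `withExactBigDecimals(true)` factory it replaces, presumably to move off a static factory method that newer Jackson releases deprecate. A small demonstration of what the flag controls (the class name is illustrative):

```java
import com.fasterxml.jackson.databind.node.JsonNodeFactory;

import java.math.BigDecimal;

public class ExactDecimalsDemo {
    public static void main(String[] args) {
        JsonNodeFactory exact = new JsonNodeFactory(true);       // as in the diff: keep scale
        JsonNodeFactory normalized = new JsonNodeFactory(false); // strips trailing zeros

        BigDecimal value = new BigDecimal("2.50");
        System.out.println(exact.numberNode(value));      // 2.50
        System.out.println(normalized.numberNode(value)); // 2.5
    }
}
```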
gradle.properties: 34 changes (17 additions, 17 deletions)
@@ -8,10 +8,10 @@ releasesRepoUrl=https://repo.datastax.com/artifactory/datastax-public-releases-l

# deps version
avroVersion=1.11.4
-lombokVersion=1.18.20
-ossDriverVersion=4.16.0
-cassandra3Version=3.11.10
-cassandra4Version=4.0.4
+lombokVersion=1.18.38
+ossDriverVersion=4.17.0
+cassandra3Version=3.11.19
+cassandra4Version=4.0.18
dse4Version=6.8.23

pulsarGroup=org.apache.pulsar
@@ -20,22 +20,22 @@ pulsarVersion=3.0.0
testPulsarImage=datastax/lunastreaming
testPulsarImageTag=2.10_3.4

-kafkaVersion=3.4.0
-vavrVersion=0.10.3
+kafkaVersion=3.4.1
+vavrVersion=0.10.6
testContainersVersion=1.19.1
-caffeineVersion=2.8.8
+caffeineVersion=2.9.3
guavaVersion=30.1-jre
-messagingConnectorsCommonsVersion=1.0.14
-slf4jVersion=1.7.30
+messagingConnectorsCommonsVersion=1.0.15
+slf4jVersion=1.7.36
# pulsar connector
-logbackVersion=1.2.9
-jacksonBomVersion=2.13.4.20221013
-jnrVersion=3.1.15
-nettyVersion=4.1.72.Final
-nettyTcNativeVersion=2.0.46.Final
-commonCompressVersion=1.21
+logbackVersion=1.2.13
+jacksonBomVersion=2.13.5
+jnrVersion=3.1.20
+nettyVersion=4.1.122.Final
+nettyTcNativeVersion=2.0.72.Final
+commonCompressVersion=1.27.1
gsonVersion=2.8.9
-jsonVersion=20230227
+jsonVersion=20250107

# cassandra settings for docker images
commitlog_sync_period_in_ms=2000
@@ -45,4 +45,4 @@ cdc_total_space_in_mb=70
dockerRepo=myrepo/

# CDC backfilling Client
-dsbulkVersion=1.10.0
+dsbulkVersion=1.11.0
@@ -19,13 +19,7 @@
import com.datastax.testcontainers.PulsarContainer;
import com.datastax.testcontainers.cassandra.CassandraContainer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
@@ -39,14 +33,11 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
-import org.testcontainers.images.builder.Transferable;

import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@@ -229,10 +220,12 @@ public void testUnorderedMutations() throws InterruptedException, IOException {
String pulsarServiceUrl = "pulsar://pulsar:" + pulsarContainer.BROKER_PORT;
Long testId = Math.abs(AgentTestUtil.random.nextLong());
String randomDataDir = System.getProperty("buildDir") + "/data-" + testId + "-";
-        try (CassandraContainer<?> cassandraContainer1 = createCassandraContainer(1, pulsarServiceUrl, testNetwork)
-                .withFileSystemBind(randomDataDir + "1", "/var/lib/cassandra");
-             CassandraContainer<?> cassandraContainer2 = createCassandraContainer(2, pulsarServiceUrl, testNetwork)
-                .withFileSystemBind(randomDataDir + "2", "/var/lib/cassandra")) {
+        try (
+                CassandraContainer<?> cassandraContainer1 = createCassandraContainer(1, pulsarServiceUrl, testNetwork)
+                        .withFileSystemBind(randomDataDir + "1", "/var/lib/cassandra");
+                CassandraContainer<?> cassandraContainer2 = createCassandraContainer(2, pulsarServiceUrl, testNetwork)
+                        .withFileSystemBind(randomDataDir + "2", "/var/lib/cassandra");
+        ) {
cassandraContainer1.start();
cassandraContainer2.start();

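The reformatted `try` block above leans on two standard try-with-resources rules: resources are closed in reverse declaration order (container 2 stops before container 1), and a trailing semicolon after the last resource is legal. A minimal illustration:

```java
public class TryWithResourcesDemo {
    public static void main(String[] args) throws Exception {
        try (
                AutoCloseable first = () -> System.out.println("close first");
                AutoCloseable second = () -> System.out.println("close second");
        ) {
            System.out.println("body");
        }
        // prints: body, close second, close first
    }
}
```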
@@ -30,6 +30,7 @@
import org.testcontainers.delegate.DatabaseDelegate;
import org.testcontainers.ext.ScriptUtils;
import org.testcontainers.ext.ScriptUtils.ScriptLoadException;
+import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
