3 changes: 3 additions & 0 deletions RELEASE.md
@@ -1,5 +1,8 @@
# Release Notes

## [5.6.0] - 2025-09-10
- Added support for detecting token collision scenarios in the cluster and presenting users with an appropriate error message.

## [5.5.1] - 2025-08-01
- Fixed an issue where empty text fields were not migrated (introduced in 5.4.0). `Null` fields are still skipped, but empty strings no longer are.
- Filtered rows are now logged at the LOG4J `TRACE` level to avoid filling the logs. Users can enable `TRACE` level logging if such logs are needed.
83 changes: 48 additions & 35 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -30,6 +30,7 @@
import com.datastax.cdm.job.IJobSessionFactory.JobType;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.cdm.schema.ClusterConfigurationException;
import com.datastax.cdm.schema.CqlTable;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
@@ -55,45 +56,57 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
return;
}

rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));

logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

CqlTable cqlTableOrigin, cqlTableTarget = null;
this.originSession = new EnhancedSession(propertyHelper, originSession, true);
cqlTableOrigin = this.originSession.getCqlTable();
cqlTableOrigin.setFeatureMap(featureMap);

boolean allFeaturesValid = true;
if (targetSession != null) {
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
cqlTableTarget = this.targetSession.getCqlTable();
cqlTableOrigin.setOtherCqlTable(cqlTableTarget);
cqlTableTarget.setOtherCqlTable(cqlTableOrigin);
cqlTableTarget.setFeatureMap(featureMap);
for (Feature f : featureMap.values()) {
if (!f.initializeAndValidate(cqlTableOrigin, cqlTableTarget)) {
allFeaturesValid = false;
logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
try {
rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));

logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

CqlTable cqlTableOrigin, cqlTableTarget = null;
this.originSession = new EnhancedSession(propertyHelper, originSession, true);
cqlTableOrigin = this.originSession.getCqlTable();
cqlTableOrigin.setFeatureMap(featureMap);

boolean allFeaturesValid = true;
if (targetSession != null) {
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
cqlTableTarget = this.targetSession.getCqlTable();
cqlTableOrigin.setOtherCqlTable(cqlTableTarget);
cqlTableTarget.setOtherCqlTable(cqlTableOrigin);
cqlTableTarget.setFeatureMap(featureMap);
for (Feature f : featureMap.values()) {
if (!f.initializeAndValidate(cqlTableOrigin, cqlTableTarget)) {
allFeaturesValid = false;
logger.error("Feature {} is not valid. Please check the configuration.",
f.getClass().getName());
}
}
}

PKFactory pkFactory = new PKFactory(propertyHelper, cqlTableOrigin, cqlTableTarget);
this.originSession.setPKFactory(pkFactory);
this.targetSession.setPKFactory(pkFactory);
}
PKFactory pkFactory = new PKFactory(propertyHelper, cqlTableOrigin, cqlTableTarget);
this.originSession.setPKFactory(pkFactory);
this.targetSession.setPKFactory(pkFactory);
}

if (!allFeaturesValid) {
throw new RuntimeException("One or more features are not valid. Please check the configuration.");
}
if (!allFeaturesValid) {
throw new RuntimeException("One or more features are not valid. Please check the configuration.");
}

this.guardrailFeature = (Guardrail) cqlTableOrigin.getFeature(Featureset.GUARDRAIL_CHECK);
if (!guardrailFeature.initializeAndValidate(cqlTableOrigin, null)) {
allFeaturesValid = false;
logger.error("Feature {} is not valid. Please check the configuration.",
guardrailFeature.getClass().getName());
this.guardrailFeature = (Guardrail) cqlTableOrigin.getFeature(Featureset.GUARDRAIL_CHECK);
if (!guardrailFeature.initializeAndValidate(cqlTableOrigin, null)) {
allFeaturesValid = false;
logger.error("Feature {} is not valid. Please check the configuration.",
guardrailFeature.getClass().getName());
}
} catch (ClusterConfigurationException e) {
logger.error("Cluster configuration error may be present & detected: {}", e.getMessage());
logger.error(
"Please check your Cassandra cluster for token overlap issues. This usually happens when multiple nodes in the cluster were started simultaneously when the cluster was originally built.");
logger.error(
"You can verify this by running 'nodetool describering <keyspace>' and checking for overlapping token ranges.");
logger.error(
"In general, to fix token overlap in a cluster: 1) Rebuild the entire cluster by removing nodes 2) Re-add nodes one at a time 3) Run nodetool cleanup on each node 4) Verify with 'nodetool describering <keyspace>'");
throw e;
}
}

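For illustration, since the constructor now logs guidance and rethrows ClusterConfigurationException, a caller can fail fast before any migration work starts. A minimal sketch of that handling, assuming a hypothetical launcher class and a stubbed buildJobSession() helper (neither is part of this change):

import com.datastax.cdm.schema.ClusterConfigurationException;

public class JobLauncherSketch {

    public static void main(String[] args) {
        try {
            // Stand-in for constructing an AbstractJobSession subclass against real sessions.
            buildJobSession();
        } catch (ClusterConfigurationException e) {
            // Surface the token-overlap guidance and stop instead of running against a broken ring.
            System.err.println("Aborting migration: " + e.getMessage());
            System.exit(1);
        }
    }

    // Hypothetical stub that simulates the failure path exercised by the new try/catch above.
    private static void buildJobSession() {
        throw new ClusterConfigurationException("Token overlap detected in keyspace 'test'.");
    }
}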
45 changes: 45 additions & 0 deletions src/main/java/com/datastax/cdm/schema/ClusterConfigurationException.java
@@ -0,0 +1,45 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.schema;

/**
* Exception thrown when there are issues with the Cassandra cluster configuration, such as token overlap problems or
* other metadata-related issues.
*/
public class ClusterConfigurationException extends RuntimeException {

/**
* Constructs a new exception with the specified detail message.
*
* @param message
* the detail message
*/
public ClusterConfigurationException(String message) {
super(message);
}

/**
* Constructs a new exception with the specified detail message and cause.
*
* @param message
* the detail message
* @param cause
* the cause of the exception
*/
public ClusterConfigurationException(String message, Throwable cause) {
super(message, cause);
}
}
134 changes: 129 additions & 5 deletions src/main/java/com/datastax/cdm/schema/CqlTable.java
@@ -15,8 +15,10 @@
*/
package com.datastax.cdm.schema;

import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,8 +44,10 @@
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
@@ -432,11 +436,34 @@ protected Metadata fetchMetadataFromSession(CqlSession cqlSession) {
private void setCqlMetadata(CqlSession cqlSession) {
Metadata metadata = fetchMetadataFromSession(cqlSession);

String partitionerName = metadata.getTokenMap().get().getPartitionerName();
if (null != partitionerName && partitionerName.endsWith("RandomPartitioner"))
this.hasRandomPartitioner = true;
else
this.hasRandomPartitioner = false;
// Check for token overlap in the specific keyspace
// Only run this check if the keyspace name is provided
if (this.keyspaceName != null && !this.keyspaceName.isEmpty()
&& hasTokenOverlap(cqlSession, this.keyspaceName)) {
throw new ClusterConfigurationException("Token overlap detected in keyspace '" + this.keyspaceName
+ "'. This usually happens when multiple nodes "
+ "were started simultaneously. To fix: 1) Restart nodes one at a time 2) Run 'nodetool cleanup' "
+ "on each node 3) Verify with 'nodetool describering " + this.keyspaceName + "'.");
}

// Add proper Optional handling for token map
Optional<TokenMap> tokenMapOpt = metadata.getTokenMap();
if (!tokenMapOpt.isPresent()) {
throw new ClusterConfigurationException(
"Token map is not available. This could indicate a cluster configuration issue.");
}

try {
String partitionerName = tokenMapOpt.get().getPartitionerName();
if (null != partitionerName && partitionerName.endsWith("RandomPartitioner"))
this.hasRandomPartitioner = true;
else
this.hasRandomPartitioner = false;
} catch (Exception e) {
throw new ClusterConfigurationException("Error accessing token map: " + e.getMessage()
+ ". This may indicate token overlap in the Cassandra cluster. Check your cluster configuration.",
e);
}

Optional<KeyspaceMetadata> keyspaceMetadataOpt = metadata.getKeyspace(formatName(this.keyspaceName));
if (!keyspaceMetadataOpt.isPresent()) {
@@ -564,4 +591,101 @@ protected static ConsistencyLevel mapToConsistencyLevel(String level) {
return retVal;
}

/**
* Checks if the specified keyspace has token overlap issues by querying the system.size_estimates table.
*
* @param cqlSession
* The CQL session to use for executing the query
* @param keyspaceName
* The name of the keyspace to check
*
* @return true if token overlap is detected, false otherwise
*/
private boolean hasTokenOverlap(CqlSession cqlSession, String keyspaceName) {
// Return false if the session is null, or the keyspace name is null or empty
if (cqlSession == null || keyspaceName == null || keyspaceName.isEmpty()) {
return false;
}

try {
// Execute query to check for token ranges for the specific keyspace
String query = "SELECT start_token, end_token FROM system.size_estimates WHERE keyspace_name = ?";
ResultSet rs = cqlSession.execute(query, keyspaceName);

// Add null check for ResultSet to handle potential driver issues
if (rs == null) {
logger.warn("Unable to query system.size_estimates for keyspace {}: ResultSet is null", keyspaceName);
return false;
}

// Create a list to store token ranges for the keyspace
List<TokenRange> ranges = new ArrayList<>();

// Process the results
for (Row row : rs) {
BigInteger startToken = new BigInteger(row.getString("start_token"));
BigInteger endToken = new BigInteger(row.getString("end_token"));
ranges.add(new TokenRange(startToken, endToken));
}

// Check for overlaps
if (hasOverlappingTokens(ranges)) {
logger.error("Token overlap detected in keyspace: {}", keyspaceName);
return true;
}

return false;
} catch (Exception e) {
logger.warn("Could not check for token overlap in keyspace {}: {}", keyspaceName, e.getMessage());
return false;
}
}

/**
* Determines if there are overlapping token ranges in the provided list.
*
* @param ranges
* List of token ranges to check
*
* @return true if any ranges overlap, false otherwise
*/
private boolean hasOverlappingTokens(List<TokenRange> ranges) {
// Return false if ranges is null, empty, or has fewer than two entries (no overlap possible)
if (ranges == null || ranges.isEmpty() || ranges.size() < 2) {
return false;
}

// Sort ranges by start token
Collections.sort(ranges);

// Check for overlaps
for (int i = 0; i < ranges.size() - 1; i++) {
TokenRange current = ranges.get(i);
TokenRange next = ranges.get(i + 1);

if (current.endToken.compareTo(next.startToken) > 0) {
return true;
}
}

return false;
}

/**
* Helper class to represent a token range with Comparable implementation for sorting.
*/
private static class TokenRange implements Comparable<TokenRange> {
BigInteger startToken;
BigInteger endToken;

TokenRange(BigInteger startToken, BigInteger endToken) {
this.startToken = startToken;
this.endToken = endToken;
}

@Override
public int compareTo(TokenRange other) {
return this.startToken.compareTo(other.startToken);
}
}
}
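To make the adjacent-range comparison in hasOverlappingTokens concrete, here is a self-contained sketch with hard-coded sample tokens; the class and values are illustrative only and not part of this change:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TokenOverlapSketch {
    public static void main(String[] args) {
        // Each BigInteger[] is {startToken, endToken}; (100, 250) and (200, 300) overlap.
        List<BigInteger[]> ranges = new ArrayList<>();
        ranges.add(new BigInteger[] { BigInteger.valueOf(300), BigInteger.valueOf(400) });
        ranges.add(new BigInteger[] { BigInteger.valueOf(100), BigInteger.valueOf(250) });
        ranges.add(new BigInteger[] { BigInteger.valueOf(200), BigInteger.valueOf(300) });

        // Same rule as hasOverlappingTokens: sort by start token, then flag any range whose
        // end token is greater than the next range's start token.
        ranges.sort(Comparator.comparing((BigInteger[] r) -> r[0]));
        boolean overlap = false;
        for (int i = 0; i < ranges.size() - 1; i++) {
            if (ranges.get(i)[1].compareTo(ranges.get(i + 1)[0]) > 0) {
                overlap = true;
            }
        }
        System.out.println("Overlap detected: " + overlap); // prints "Overlap detected: true"
    }
}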
@@ -0,0 +1,64 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.job;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;

import com.datastax.cdm.schema.ClusterConfigurationException;

@ExtendWith(MockitoExtension.class)
public class AbstractJobSessionErrorHandlingTest {

@Mock
private Logger mockLogger;

/**
* Simple test to verify error logging messages when ClusterConfigurationException is caught
*/
@Test
void testClusterConfigurationExceptionHandling() {
// Create an exception with a test message
ClusterConfigurationException tokenOverlapException = new ClusterConfigurationException(
"Token overlap detected in keyspace 'test'. This usually happens when multiple nodes were started simultaneously.");

// Log the appropriate error messages
mockLogger.error("Cluster configuration error detected: {}", tokenOverlapException.getMessage());
mockLogger.error(
"Please check your Cassandra cluster for token overlap issues. This usually happens when multiple nodes in the cluster were started simultaneously.");
mockLogger.error(
"You can verify this by running 'nodetool describering <keyspace>' and checking for overlapping token ranges.");
mockLogger.error(
"To fix token overlap: 1) Restart nodes one at a time 2) Run nodetool cleanup on each node 3) Verify with nodetool describering");

// Verify that the logger was called with the expected messages
verify(mockLogger).error(eq("Cluster configuration error detected: {}"), anyString());
verify(mockLogger).error(eq(
"Please check your Cassandra cluster for token overlap issues. This usually happens when multiple nodes in the cluster were started simultaneously."));
verify(mockLogger).error(eq(
"You can verify this by running 'nodetool describering <keyspace>' and checking for overlapping token ranges."));
verify(mockLogger).error(eq(
"To fix token overlap: 1) Restart nodes one at a time 2) Run nodetool cleanup on each node 3) Verify with nodetool describering"));
}
}
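As a complementary illustration (not part of this change), the comparison rule itself could be checked with plain assertions; the overlaps helper below is hypothetical and simply mirrors the adjacent-range check in CqlTable:

package com.datastax.cdm.job;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.math.BigInteger;

import org.junit.jupiter.api.Test;

public class TokenOverlapRuleSketchTest {

    // Hypothetical helper mirroring the rule used in CqlTable.hasOverlappingTokens.
    private boolean overlaps(BigInteger currentEnd, BigInteger nextStart) {
        return currentEnd.compareTo(nextStart) > 0;
    }

    @Test
    void touchingRangesAreNotFlagged() {
        // A range ending exactly where the next one starts is not an overlap.
        assertFalse(overlaps(BigInteger.valueOf(100), BigInteger.valueOf(100)));
    }

    @Test
    void crossingRangesAreFlagged() {
        // A range ending past the next range's start is an overlap.
        assertTrue(overlaps(BigInteger.valueOf(250), BigInteger.valueOf(200)));
    }
}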