Skip to content

Commit 5f03be1

Browse files
committed
issue 386 - handle cluster token overlap issue gracefully
1 parent a02ed1e commit 5f03be1

File tree

3 files changed

+194
-42
lines changed

3 files changed

+194
-42
lines changed

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.datastax.cdm.job.IJobSessionFactory.JobType;
3131
import com.datastax.cdm.properties.KnownProperties;
3232
import com.datastax.cdm.properties.PropertyHelper;
33+
import com.datastax.cdm.schema.ClusterConfigurationException;
3334
import com.datastax.cdm.schema.CqlTable;
3435
import com.datastax.oss.driver.api.core.CqlSession;
3536
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
@@ -55,45 +56,53 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
5556
return;
5657
}
5758

58-
rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
59-
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
60-
61-
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
62-
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());
63-
64-
CqlTable cqlTableOrigin, cqlTableTarget = null;
65-
this.originSession = new EnhancedSession(propertyHelper, originSession, true);
66-
cqlTableOrigin = this.originSession.getCqlTable();
67-
cqlTableOrigin.setFeatureMap(featureMap);
68-
69-
boolean allFeaturesValid = true;
70-
if (targetSession != null) {
71-
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
72-
cqlTableTarget = this.targetSession.getCqlTable();
73-
cqlTableOrigin.setOtherCqlTable(cqlTableTarget);
74-
cqlTableTarget.setOtherCqlTable(cqlTableOrigin);
75-
cqlTableTarget.setFeatureMap(featureMap);
76-
for (Feature f : featureMap.values()) {
77-
if (!f.initializeAndValidate(cqlTableOrigin, cqlTableTarget)) {
78-
allFeaturesValid = false;
79-
logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
59+
try {
60+
rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
61+
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
62+
63+
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
64+
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());
65+
66+
CqlTable cqlTableOrigin, cqlTableTarget = null;
67+
this.originSession = new EnhancedSession(propertyHelper, originSession, true);
68+
cqlTableOrigin = this.originSession.getCqlTable();
69+
cqlTableOrigin.setFeatureMap(featureMap);
70+
71+
boolean allFeaturesValid = true;
72+
if (targetSession != null) {
73+
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
74+
cqlTableTarget = this.targetSession.getCqlTable();
75+
cqlTableOrigin.setOtherCqlTable(cqlTableTarget);
76+
cqlTableTarget.setOtherCqlTable(cqlTableOrigin);
77+
cqlTableTarget.setFeatureMap(featureMap);
78+
for (Feature f : featureMap.values()) {
79+
if (!f.initializeAndValidate(cqlTableOrigin, cqlTableTarget)) {
80+
allFeaturesValid = false;
81+
logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
82+
}
8083
}
81-
}
8284

83-
PKFactory pkFactory = new PKFactory(propertyHelper, cqlTableOrigin, cqlTableTarget);
84-
this.originSession.setPKFactory(pkFactory);
85-
this.targetSession.setPKFactory(pkFactory);
86-
}
85+
PKFactory pkFactory = new PKFactory(propertyHelper, cqlTableOrigin, cqlTableTarget);
86+
this.originSession.setPKFactory(pkFactory);
87+
this.targetSession.setPKFactory(pkFactory);
88+
}
8789

88-
if (!allFeaturesValid) {
89-
throw new RuntimeException("One or more features are not valid. Please check the configuration.");
90-
}
90+
if (!allFeaturesValid) {
91+
throw new RuntimeException("One or more features are not valid. Please check the configuration.");
92+
}
9193

92-
this.guardrailFeature = (Guardrail) cqlTableOrigin.getFeature(Featureset.GUARDRAIL_CHECK);
93-
if (!guardrailFeature.initializeAndValidate(cqlTableOrigin, null)) {
94-
allFeaturesValid = false;
95-
logger.error("Feature {} is not valid. Please check the configuration.",
96-
guardrailFeature.getClass().getName());
94+
this.guardrailFeature = (Guardrail) cqlTableOrigin.getFeature(Featureset.GUARDRAIL_CHECK);
95+
if (!guardrailFeature.initializeAndValidate(cqlTableOrigin, null)) {
96+
allFeaturesValid = false;
97+
logger.error("Feature {} is not valid. Please check the configuration.",
98+
guardrailFeature.getClass().getName());
99+
}
100+
} catch (ClusterConfigurationException e) {
101+
logger.error("Cluster configuration error may be present & detected: {}", e.getMessage());
102+
logger.error("Please check your Cassandra cluster for token overlap issues. This usually happens when multiple nodes in the cluster were started simultaneously when the cluster was originally built.");
103+
logger.error("You can verify this by running 'nodetool describering <keyspace>' and checking for overlapping token ranges.");
104+
logger.error("In general, to fix token overlap in a cluster: 1) Rebuild the entire cluster by removing nodes 2) Re-add nodes one at a time 3) Run nodetool cleanup on each node 4) Verify with 'nodetool describering <keyspace>'");
105+
throw e;
97106
}
98107
}
99108

src/main/java/com/datastax/cdm/schema/ClusterConfigurationException.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.cdm.schema;
17+
18+
/**
 * Unchecked exception signalling a problem with the Cassandra cluster configuration,
 * such as token overlap or other metadata-related issues.
 */
public class ClusterConfigurationException extends RuntimeException {

    /** Explicit serial version: {@link RuntimeException} is {@link java.io.Serializable}. */
    private static final long serialVersionUID = 1L;

    /**
     * Constructs a new exception with the specified detail message.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     */
    public ClusterConfigurationException(String message) {
        super(message);
    }

    /**
     * Constructs a new exception with the specified detail message and cause.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     * @param cause   the underlying failure, later available through {@link #getCause()}
     */
    public ClusterConfigurationException(String message, Throwable cause) {
        super(message, cause);
    }
}

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 108 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package com.datastax.cdm.schema;
1717

18+
import java.math.BigInteger;
1819
import java.time.Instant;
1920
import java.util.ArrayList;
21+
import java.util.Collections;
2022
import java.util.HashMap;
2123
import java.util.List;
2224
import java.util.Map;
@@ -42,7 +44,9 @@
4244
import com.datastax.oss.driver.api.core.ConsistencyLevel;
4345
import com.datastax.oss.driver.api.core.CqlIdentifier;
4446
import com.datastax.oss.driver.api.core.CqlSession;
47+
import com.datastax.oss.driver.api.core.cql.ResultSet;
4548
import com.datastax.oss.driver.api.core.cql.Row;
49+
import com.datastax.oss.driver.api.core.metadata.TokenMap;
4650
import com.datastax.oss.driver.api.core.metadata.Metadata;
4751
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
4852
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
@@ -431,12 +435,33 @@ protected Metadata fetchMetadataFromSession(CqlSession cqlSession) {
431435

432436
private void setCqlMetadata(CqlSession cqlSession) {
433437
Metadata metadata = fetchMetadataFromSession(cqlSession);
434-
435-
String partitionerName = metadata.getTokenMap().get().getPartitionerName();
436-
if (null != partitionerName && partitionerName.endsWith("RandomPartitioner"))
437-
this.hasRandomPartitioner = true;
438-
else
439-
this.hasRandomPartitioner = false;
438+
439+
// Check for token overlap in the specific keyspace
440+
if (hasTokenOverlap(cqlSession, this.keyspaceName)) {
441+
throw new ClusterConfigurationException(
442+
"Token overlap detected in keyspace '" + this.keyspaceName + "'. This usually happens when multiple nodes " +
443+
"were started simultaneously. To fix: 1) Restart nodes one at a time 2) Run 'nodetool cleanup' " +
444+
"on each node 3) Verify with 'nodetool describering " + this.keyspaceName + "'.");
445+
}
446+
447+
// Add proper Optional handling for token map
448+
Optional<TokenMap> tokenMapOpt = metadata.getTokenMap();
449+
if (!tokenMapOpt.isPresent()) {
450+
throw new ClusterConfigurationException(
451+
"Token map is not available. This could indicate a cluster configuration issue.");
452+
}
453+
454+
try {
455+
String partitionerName = tokenMapOpt.get().getPartitionerName();
456+
if (null != partitionerName && partitionerName.endsWith("RandomPartitioner"))
457+
this.hasRandomPartitioner = true;
458+
else
459+
this.hasRandomPartitioner = false;
460+
} catch (Exception e) {
461+
throw new ClusterConfigurationException(
462+
"Error accessing token map: " + e.getMessage() +
463+
". This may indicate token overlap in the Cassandra cluster. Check your cluster configuration.", e);
464+
}
440465

441466
Optional<KeyspaceMetadata> keyspaceMetadataOpt = metadata.getKeyspace(formatName(this.keyspaceName));
442467
if (!keyspaceMetadataOpt.isPresent()) {
@@ -563,5 +588,81 @@ protected static ConsistencyLevel mapToConsistencyLevel(String level) {
563588

564589
return retVal;
565590
}
566-
591+
592+
/**
593+
* Checks if the specified keyspace has token overlap issues by querying the system.size_estimates table.
594+
*
595+
* @param cqlSession The CQL session to use for executing the query
596+
* @param keyspaceName The name of the keyspace to check
597+
* @return true if token overlap is detected, false otherwise
598+
*/
599+
private boolean hasTokenOverlap(CqlSession cqlSession, String keyspaceName) {
600+
try {
601+
// Execute query to check for token ranges for the specific keyspace
602+
String query = "SELECT start_token, end_token FROM system.size_estimates WHERE keyspace_name = ?";
603+
ResultSet rs = cqlSession.execute(query, keyspaceName);
604+
605+
// Create a list to store token ranges for the keyspace
606+
List<TokenRange> ranges = new ArrayList<>();
607+
608+
// Process the results
609+
for (Row row : rs) {
610+
BigInteger startToken = new BigInteger(row.getString("start_token"));
611+
BigInteger endToken = new BigInteger(row.getString("end_token"));
612+
ranges.add(new TokenRange(startToken, endToken));
613+
}
614+
615+
// Check for overlaps
616+
if (hasOverlappingTokens(ranges)) {
617+
logger.error("Token overlap detected in keyspace: {}", keyspaceName);
618+
return true;
619+
}
620+
621+
return false;
622+
} catch (Exception e) {
623+
logger.warn("Could not check for token overlap in keyspace {}: {}", keyspaceName, e.getMessage());
624+
return false;
625+
}
626+
}
627+
628+
/**
629+
* Determines if there are overlapping token ranges in the provided list.
630+
*
631+
* @param ranges List of token ranges to check
632+
* @return true if any ranges overlap, false otherwise
633+
*/
634+
private boolean hasOverlappingTokens(List<TokenRange> ranges) {
635+
// Sort ranges by start token
636+
Collections.sort(ranges);
637+
638+
// Check for overlaps
639+
for (int i = 0; i < ranges.size() - 1; i++) {
640+
TokenRange current = ranges.get(i);
641+
TokenRange next = ranges.get(i + 1);
642+
643+
if (current.endToken.compareTo(next.startToken) > 0) {
644+
return true;
645+
}
646+
}
647+
648+
return false;
649+
}
650+
651+
/**
652+
* Helper class to represent a token range with Comparable implementation for sorting.
653+
*/
654+
private static class TokenRange implements Comparable<TokenRange> {
655+
BigInteger startToken;
656+
BigInteger endToken;
657+
658+
TokenRange(BigInteger startToken, BigInteger endToken) {
659+
this.startToken = startToken;
660+
this.endToken = endToken;
661+
}
662+
663+
@Override
664+
public int compareTo(TokenRange other) {
665+
return this.startToken.compareTo(other.startToken);
666+
}
667+
}
567668
}

0 commit comments

Comments
 (0)