Skip to content

Commit f23b0bf

Browse files
authored
CNDB-14353: fetch non-compacting sstables instead of live sstables for UCS (#1861)
This is to fix a race in UCS: 1. sstable-1 is compacting 2. 2nd compaction thread fetches live sstable which includes sstable-1 3. sstable-1 is now compacted, no longer as compacting in Tracker 4. 2nd compaction thread continues: UCS#getCompactionArenas fetches list of compacting sstables which doesn't include sstable-1 5. sstable-1 is now included for next compaction but failed to create txn because it's already compacted ### What is the issue #14353: UCS failed to create txn because sstable is already compacted on non-compactor ### What does this PR fix and why was it fixed use non-compacting sstables instead of live sstables, so there won't be race
1 parent cbb5495 commit f23b0bf

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
5858
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
5959
import org.apache.cassandra.db.lifecycle.PartialLifecycleTransaction;
60+
import org.apache.cassandra.db.lifecycle.SSTableSet;
6061
import org.apache.cassandra.dht.Range;
6162
import org.apache.cassandra.dht.Token;
6263
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -217,7 +218,7 @@ public synchronized CompactionTasks getUserDefinedTasks(Collection<? extends Com
217218
/// same effect as compacting all of the sstables in the arena together in one operation.
218219
public synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregates()
219220
{
220-
return getMaximalAggregates(realm.getLiveSSTables());
221+
return getMaximalAggregates(Sets.newHashSet(realm.getSSTables(SSTableSet.NONCOMPACTING)));
221222
}
222223

223224
public synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregates(Collection<? extends CompactionSSTable> sstables)
@@ -1209,7 +1210,7 @@ public Level getLevel(int index, double min, double max)
12091210
@VisibleForTesting
12101211
Map<Arena, List<Level>> getLevels()
12111212
{
1212-
return getLevels(realm.getLiveSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
1213+
return getLevels(Sets.newHashSet(realm.getSSTables(SSTableSet.NONCOMPACTING)), UnifiedCompactionStrategy::isSuitableForCompaction);
12131214
}
12141215

12151216
private static boolean isSuitableForCompaction(CompactionSSTable sstable, boolean isCompacting)
@@ -1224,8 +1225,7 @@ Iterable<? extends CompactionSSTable> getSuitableSSTables()
12241225

12251226
Iterable<? extends CompactionSSTable> getFilteredSSTables(BiPredicate<CompactionSSTable, Boolean> predicate)
12261227
{
1227-
Set<? extends CompactionSSTable> compacting = realm.getCompactingSSTables();
1228-
return Iterables.filter(realm.getLiveSSTables(), s -> predicate.test(s, compacting.contains(s)));
1228+
return Iterables.filter(realm.getSSTables(SSTableSet.NONCOMPACTING), s -> predicate.test(s, false));
12291229
}
12301230

12311231
/// Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
@@ -1314,7 +1314,7 @@ public Map<Arena, List<Level>> getLevels(Collection<? extends CompactionSSTable>
13141314
@Override
13151315
public Map<String, String> getMaxOverlapsMap()
13161316
{
1317-
final Set<? extends CompactionSSTable> liveSSTables = realm.getLiveSSTables();
1317+
final Set<? extends CompactionSSTable> liveSSTables = Sets.newHashSet(realm.getSSTables(SSTableSet.NONCOMPACTING));
13181318
Map<UnifiedCompactionStrategy.Arena, List<UnifiedCompactionStrategy.Level>> arenas =
13191319
getLevels(liveSSTables, (i1, i2) -> true); // take all sstables
13201320

test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,52 @@ private void testGetMultipleBucketsOneArenaNonOverlappingAggregates(Map<Integer,
400400
System.out.println(strategy.getMaxOverlapsMap());
401401
}
402402

403+
@Test
404+
public void testGetLevelsWithCompactingSSTables()
405+
{
406+
final int m = 2; // minimal sorted run size in MB m
407+
long minimalSizeBytes = m << 20;
408+
409+
Controller controller = Mockito.mock(Controller.class);
410+
when(controller.getMinSstableSizeBytes()).thenReturn(minimalSizeBytes);
411+
when(controller.getNumShards(anyDouble())).thenReturn(1);
412+
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
413+
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
414+
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
415+
when(controller.maxSSTablesToCompact()).thenReturn(1000);
416+
when(controller.prioritize(anyList())).thenAnswer(answ -> answ.getArgument(0));
417+
when(controller.getReservedThreads()).thenReturn(Integer.MAX_VALUE);
418+
when(controller.getReservationsType()).thenReturn(Reservations.Type.PER_LEVEL);
419+
when(controller.parallelizeOutputShards()).thenReturn(true);
420+
when(controller.getScalingParameter(anyInt())).thenReturn(2);
421+
when(controller.getFanout(anyInt())).thenCallRealMethod();
422+
when(controller.getThreshold(anyInt())).thenCallRealMethod();
423+
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
424+
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
425+
when(controller.random()).thenCallRealMethod();
426+
427+
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(strategyFactory, controller);
428+
429+
int sstableCount = 4;
430+
List<SSTableReader> sstables = mockSSTables(sstableCount, 100, 0, System.currentTimeMillis(), 0, true, null);
431+
dataTracker.addInitialSSTables(sstables);
432+
433+
// all sstables are non-compacting
434+
assertEquals(sstableCount, strategy.getLevels().values().stream().flatMap(Collection::stream).mapToInt(l -> l.sstables.size()).sum());
435+
436+
// mark one sstable as compacting
437+
try (LifecycleTransaction txn = dataTracker.tryModify(List.of(sstables.get(0)), OperationType.COMPACTION))
438+
{
439+
assertEquals(sstableCount - 1, strategy.getLevels().values().stream().flatMap(Collection::stream).mapToInt(l -> l.sstables.size()).sum());
440+
441+
txn.obsoleteOriginals();
442+
txn.abort(); // unmark compacting
443+
}
444+
445+
// all sstables are non-compacting
446+
assertEquals(sstableCount, strategy.getLevels().values().stream().flatMap(Collection::stream).mapToInt(l -> l.sstables.size()).sum());
447+
}
448+
403449
private BufferDecoratedKey key(long token)
404450
{
405451
return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(token), ByteBuffer.allocate(0));

0 commit comments

Comments
 (0)