Skip to content

Commit 5281fdd

Browse files
Merge remote-tracking branch 'datastax/main' into revive-fused-adc
2 parents 7a37a55 + 42ae0f3 commit 5281fdd

File tree

59 files changed

+1975
-328
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1975
-328
lines changed

src/java/org/apache/cassandra/db/marshal/AbstractType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.cassandra.cql3.CQL3Type;
3838
import org.apache.cassandra.cql3.ColumnSpecification;
3939
import org.apache.cassandra.cql3.Term;
40+
import org.apache.cassandra.db.rows.Cell;
4041
import org.apache.cassandra.exceptions.InvalidColumnTypeException;
4142
import org.apache.cassandra.exceptions.SyntaxException;
4243
import org.apache.cassandra.io.util.DataInputPlus;

src/java/org/apache/cassandra/db/virtual/TableMetricTables.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,11 @@ public static Collection<VirtualTable> getAll(String name)
7171
new LatencyTableMetric(name, "local_read_latency", t -> t.readLatency.tableOrKeyspaceMetric().latency),
7272
new LatencyTableMetric(name, "local_scan_latency", t -> t.rangeLatency.tableOrKeyspaceMetric().latency),
7373
new LatencyTableMetric(name, "coordinator_read_latency", t -> t.coordinatorReadLatency.tableOrKeyspaceTimer()),
74+
new LatencyTableMetric(name, "coordinator_cas_read_latency", t -> t.coordinatorCasReadLatency.tableOrKeyspaceTimer()),
7475
new LatencyTableMetric(name, "coordinator_scan_latency", t -> t.coordinatorScanLatency.tableOrKeyspaceTimer()),
7576
new LatencyTableMetric(name, "local_write_latency", t -> t.writeLatency.tableOrKeyspaceMetric().latency),
7677
new LatencyTableMetric(name, "coordinator_write_latency", t -> t.coordinatorWriteLatency.tableOrKeyspaceTimer()),
78+
new LatencyTableMetric(name, "coordinator_cas_write_latency", t -> t.coordinatorCasWriteLatency.tableOrKeyspaceTimer()),
7779
new HistogramTableMetric(name, "tombstones_per_read", t -> t.tombstoneScannedHistogram.tableOrKeyspaceHistogram()),
7880
new HistogramTableMetric(name, "rows_per_read", t -> t.liveScannedHistogram.tableOrKeyspaceHistogram()),
7981
new StorageTableMetric(name, "disk_usage", (TableMetrics t) -> t.totalDiskSpaceUsed),

src/java/org/apache/cassandra/index/sai/IndexContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ public int getIntOption(String name, int defaultValue)
614614
}
615615
catch (NumberFormatException e)
616616
{
617-
logger.error("Failed to parse index configuration " + name + " = " + value + " as integer");
617+
logger.error("Failed to parse index configuration {} = {} as integer", name, value);
618618
return defaultValue;
619619
}
620620
}
@@ -1003,7 +1003,7 @@ public long indexFileCacheSize()
10031003
public IndexFeatureSet indexFeatureSet()
10041004
{
10051005
IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator(version);
1006-
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(set -> accumulator.accumulate(set));
1006+
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(accumulator::accumulate);
10071007
return accumulator.complete();
10081008
}
10091009
}

src/java/org/apache/cassandra/index/sai/QueryContext.java

Lines changed: 116 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,31 @@ public class QueryContext
4444

4545
private final LongAdder sstablesHit = new LongAdder();
4646
private final LongAdder segmentsHit = new LongAdder();
47-
private final LongAdder partitionsRead = new LongAdder();
48-
private final LongAdder rowsPreFiltered = new LongAdder();
49-
private final LongAdder rowsFiltered = new LongAdder();
47+
48+
/**
49+
* The partition/row keys that will be used to fetch rows from the base table.
50+
* They will be either partition keys in AA, or row keys in the later row-aware disk formats.
51+
*/
52+
private final LongAdder keysFetched = new LongAdder();
53+
54+
/** The number of live partitions fetched from the storage engine, before post-filtering. */
55+
private final LongAdder partitionsFetched = new LongAdder();
56+
57+
/** The number of live partitions returned to the coordinator, after post-filtering. */
58+
private final LongAdder partitionsReturned = new LongAdder();
59+
60+
/** The number of deleted partitions that are fetched. */
61+
private final LongAdder partitionTombstonesFetched = new LongAdder();
62+
63+
/** The number of live rows fetched from the storage engine, before post-filtering. */
64+
private final LongAdder rowsFetched = new LongAdder();
65+
66+
/** The number of live rows returned to the coordinator, after post-filtering. */
67+
private final LongAdder rowsReturned = new LongAdder();
68+
69+
/** The number of deleted individual rows or ranges of rows that are fetched. */
70+
private final LongAdder rowTombstonesFetched = new LongAdder();
71+
5072
private final LongAdder trieSegmentsHit = new LongAdder();
5173

5274
private final LongAdder bkdPostingListsHit = new LongAdder();
@@ -64,8 +86,6 @@ public class QueryContext
6486

6587
private float annRerankFloor = 0.0f; // only called from single-threaded setup code
6688

67-
private final LongAdder shadowedPrimaryKeyCount = new LongAdder();
68-
6989
// Determines the order of using indexes for filtering and sorting.
7090
// Null means the query execution order hasn't been decided yet.
7191
private FilterSortOrder filterSortOrder = null;
@@ -92,49 +112,81 @@ public void addSstablesHit(long val)
92112
{
93113
sstablesHit.add(val);
94114
}
115+
95116
public void addSegmentsHit(long val) {
96117
segmentsHit.add(val);
97118
}
98-
public void addPartitionsRead(long val)
119+
120+
public void addKeysFetched(long val)
121+
{
122+
keysFetched.add(val);
123+
}
124+
125+
public void addPartitionsFetched(long val)
126+
{
127+
partitionsFetched.add(val);
128+
}
129+
130+
public void addPartitionsReturned(long val)
99131
{
100-
partitionsRead.add(val);
132+
partitionsReturned.add(val);
101133
}
102-
public void addRowsFiltered(long val)
134+
135+
public void addPartitionTombstonesFetched(long val)
103136
{
104-
rowsFiltered.add(val);
137+
partitionTombstonesFetched.add(val);
105138
}
106-
public void addRowsPreFiltered(long val)
139+
140+
public void addRowsFetched(long val)
107141
{
108-
rowsPreFiltered.add(val);
142+
rowsFetched.add(val);
109143
}
144+
145+
public void addRowsReturned(long val)
146+
{
147+
rowsReturned.add(val);
148+
}
149+
150+
public void addRowTombstonesFetched(long val)
151+
{
152+
rowTombstonesFetched.add(val);
153+
}
154+
110155
public void addTrieSegmentsHit(long val)
111156
{
112157
trieSegmentsHit.add(val);
113158
}
159+
114160
public void addBkdPostingListsHit(long val)
115161
{
116162
bkdPostingListsHit.add(val);
117163
}
164+
118165
public void addBkdSegmentsHit(long val)
119166
{
120167
bkdSegmentsHit.add(val);
121168
}
169+
122170
public void addBkdPostingsSkips(long val)
123171
{
124172
bkdPostingsSkips.add(val);
125173
}
174+
126175
public void addBkdPostingsDecodes(long val)
127176
{
128177
bkdPostingsDecodes.add(val);
129178
}
179+
130180
public void addTriePostingsSkips(long val)
131181
{
132182
triePostingsSkips.add(val);
133183
}
184+
134185
public void addTriePostingsDecodes(long val)
135186
{
136187
triePostingsDecodes.add(val);
137188
}
189+
138190
public void addQueryTimeouts(long val)
139191
{
140192
queryTimeouts.add(val);
@@ -156,53 +208,86 @@ public long sstablesHit()
156208
{
157209
return sstablesHit.longValue();
158210
}
211+
159212
public long segmentsHit() {
160213
return segmentsHit.longValue();
161214
}
162-
public long partitionsRead()
215+
216+
public long keysFetched()
163217
{
164-
return partitionsRead.longValue();
218+
return keysFetched.longValue();
165219
}
166-
public long rowsFiltered()
220+
221+
public long partitionsFetched()
167222
{
168-
return rowsFiltered.longValue();
223+
return partitionsFetched.longValue();
169224
}
170-
public long rowsPreFiltered()
225+
226+
public long partitionsReturned()
227+
{
228+
return partitionsReturned.longValue();
229+
}
230+
231+
public long partitionTombstonesFetched()
232+
{
233+
return partitionTombstonesFetched.longValue();
234+
}
235+
236+
public long rowsFetched()
237+
{
238+
return rowsFetched.longValue();
239+
}
240+
241+
public long rowsReturned()
242+
{
243+
return rowsReturned.longValue();
244+
}
245+
246+
public long rowTombstonesFetched()
171247
{
172-
return rowsPreFiltered.longValue();
248+
return rowTombstonesFetched.longValue();
173249
}
250+
174251
public long trieSegmentsHit()
175252
{
176253
return trieSegmentsHit.longValue();
177254
}
255+
178256
public long bkdPostingListsHit()
179257
{
180258
return bkdPostingListsHit.longValue();
181259
}
260+
182261
public long bkdSegmentsHit()
183262
{
184263
return bkdSegmentsHit.longValue();
185264
}
265+
186266
public long bkdPostingsSkips()
187267
{
188268
return bkdPostingsSkips.longValue();
189269
}
270+
190271
public long bkdPostingsDecodes()
191272
{
192273
return bkdPostingsDecodes.longValue();
193274
}
275+
194276
public long triePostingsSkips()
195277
{
196278
return triePostingsSkips.longValue();
197279
}
280+
198281
public long triePostingsDecodes()
199282
{
200283
return triePostingsDecodes.longValue();
201284
}
285+
202286
public long queryTimeouts()
203287
{
204288
return queryTimeouts.longValue();
205289
}
290+
206291
public long annGraphSearchLatency()
207292
{
208293
return annGraphSearchLatency.longValue();
@@ -222,19 +307,6 @@ public void checkpoint()
222307
}
223308
}
224309

225-
public void addShadowed(long count)
226-
{
227-
shadowedPrimaryKeyCount.add(count);
228-
}
229-
230-
/**
231-
* @return shadowed primary keys, in ascending order
232-
*/
233-
public long getShadowedPrimaryKeyCount()
234-
{
235-
return shadowedPrimaryKeyCount.longValue();
236-
}
237-
238310
public float getAnnRerankFloor()
239311
{
240312
return annRerankFloor;
@@ -277,9 +349,13 @@ public static class Snapshot
277349
public final long totalQueryTimeNs;
278350
public final long sstablesHit;
279351
public final long segmentsHit;
280-
public final long partitionsRead;
281-
public final long rowsFiltered;
282-
public final long rowsPreFiltered;
352+
public final long keysFetched;
353+
public final long partitionsFetched;
354+
public final long partitionsReturned;
355+
public final long partitionTombstonesFetched;
356+
public final long rowsFetched;
357+
public final long rowsReturned;
358+
public final long rowTombstonesFetched;
283359
public final long trieSegmentsHit;
284360
public final long bkdPostingListsHit;
285361
public final long bkdSegmentsHit;
@@ -289,7 +365,6 @@ public static class Snapshot
289365
public final long triePostingsDecodes;
290366
public final long queryTimeouts;
291367
public final long annGraphSearchLatency;
292-
public final long shadowedPrimaryKeyCount;
293368
public final FilterSortOrder filterSortOrder;
294369

295370
/**
@@ -302,9 +377,13 @@ private Snapshot(QueryContext context)
302377
totalQueryTimeNs = context.totalQueryTimeNs();
303378
sstablesHit = context.sstablesHit();
304379
segmentsHit = context.segmentsHit();
305-
partitionsRead = context.partitionsRead();
306-
rowsFiltered = context.rowsFiltered();
307-
rowsPreFiltered = context.rowsPreFiltered();
380+
keysFetched = context.keysFetched();
381+
partitionsFetched = context.partitionsFetched();
382+
partitionsReturned = context.partitionsReturned();
383+
partitionTombstonesFetched = context.partitionTombstonesFetched();
384+
rowsFetched = context.rowsFetched();
385+
rowsReturned = context.rowsReturned();
386+
rowTombstonesFetched = context.rowTombstonesFetched();
308387
trieSegmentsHit = context.trieSegmentsHit();
309388
bkdPostingListsHit = context.bkdPostingListsHit();
310389
bkdSegmentsHit = context.bkdSegmentsHit();
@@ -314,7 +393,6 @@ private Snapshot(QueryContext context)
314393
triePostingsDecodes = context.triePostingsDecodes();
315394
queryTimeouts = context.queryTimeouts();
316395
annGraphSearchLatency = context.annGraphSearchLatency();
317-
shadowedPrimaryKeyCount = context.getShadowedPrimaryKeyCount();
318396
filterSortOrder = context.filterSortOrder();
319397
}
320398
}

src/java/org/apache/cassandra/index/sai/SSTableContext.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ private SSTableContext(SSTableContext copy)
6565
this.primaryKeyMapFactory = copy.primaryKeyMapFactory;
6666
}
6767

68-
@SuppressWarnings("resource")
6968
public static SSTableContext create(SSTableReader sstable, IndexComponents.ForRead perSSTableComponents)
7069
{
7170
var onDiskFormat = perSSTableComponents.onDiskFormat();

src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public void rangeTombstone(RangeTombstone tombstone)
246246

247247
private void forEach(Consumer<Index.Indexer> action)
248248
{
249-
indexers.forEach(action::accept);
249+
indexers.forEach(action);
250250
}
251251
};
252252
}
@@ -336,7 +336,7 @@ public void handleNotification(INotification notification, Object sender)
336336

337337
// Avoid validation for index files just written following Memtable flush. ZCS streaming should
338338
// validate index checksum.
339-
boolean validate = notice.fromStreaming() || !notice.memtable().isPresent();
339+
boolean validate = notice.fromStreaming() || notice.memtable().isEmpty();
340340
onSSTableChanged(Collections.emptySet(), notice.added, indices, validate);
341341
}
342342
else if (notification instanceof SSTableListChangedNotification)
@@ -436,7 +436,7 @@ public int totalIndexBuildsInProgress()
436436
*/
437437
public int totalQueryableIndexCount()
438438
{
439-
return (int) indices.stream().filter(i -> baseCfs.indexManager.isIndexQueryable(i)).count();
439+
return (int) indices.stream().filter(baseCfs.indexManager::isIndexQueryable).count();
440440
}
441441

442442
/**
@@ -513,7 +513,7 @@ public void unsafeReload()
513513
public void reset()
514514
{
515515
contextManager.clear();
516-
indices.forEach(index -> index.makeIndexNonQueryable());
516+
indices.forEach(StorageAttachedIndex::makeIndexNonQueryable);
517517
onSSTableChanged(baseCfs.getLiveSSTables(), Collections.emptySet(), indices, false);
518518
}
519519
}

src/java/org/apache/cassandra/index/sai/disk/PerSSTableWriter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,14 @@
2121

2222
import com.google.common.base.Stopwatch;
2323

24-
import org.apache.cassandra.db.DecoratedKey;
25-
import org.apache.cassandra.db.rows.Unfiltered;
2624
import org.apache.cassandra.index.sai.utils.PrimaryKey;
2725

2826
/**
2927
* Writes all SSTable-attached index token and offset structures.
3028
*/
3129
public interface PerSSTableWriter
3230
{
33-
public static final PerSSTableWriter NONE = (key) -> {};
31+
PerSSTableWriter NONE = key -> {};
3432

3533
default void startPartition(long position) throws IOException
3634
{}

0 commit comments

Comments
 (0)