Skip to content

Commit d9f7fe9

Browse files
authored
CNDB-14359: Add tracking of dropped mutations by table (#1790)
### What is the issue Fixes #14359
1 parent 9de81c5 commit d9f7fe9

File tree

5 files changed

+225
-0
lines changed

5 files changed

+225
-0
lines changed

src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,12 @@ public DroppedMessageMetrics(Verb verb)
8888
crossNodeDroppedLatency = Metrics.timer(createMetricName(TYPE, "CrossNodeDroppedLatency", scope));
8989
}
9090
}
91+
92+
public DroppedMessageMetrics(String type, String scope)
93+
{
94+
MetricNameFactory factory = new DefaultNameFactory(type, scope);
95+
dropped = Metrics.meter(factory.createMetricName("Dropped"));
96+
internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
97+
crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
98+
}
9199
}

src/java/org/apache/cassandra/metrics/MessagingMetrics.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ private static final class DroppedForVerb
9898

9999
// total dropped message counts for server lifetime
100100
private final Map<Verb, DroppedForVerb> droppedMessages = new ConcurrentHashMap<>();
101+
102+
// dropped mutations by table
103+
private final Map<String, DroppedMessageMetrics> droppedMutationsByTable = new ConcurrentHashMap<>();
101104

102105
public MessagingMetrics()
103106
{
@@ -160,6 +163,24 @@ public void recordTotalMessageProcessingTime(Verb verb, InetAddressAndPort from,
160163
public void recordDroppedMessage(Message<?> message, long timeElapsed, TimeUnit timeUnit)
161164
{
162165
recordDroppedMessage(message.verb(), timeElapsed, timeUnit, message.isCrossNode());
166+
167+
if (message.verb() == Verb.MUTATION_REQ && message.payload instanceof org.apache.cassandra.db.Mutation)
168+
{
169+
org.apache.cassandra.db.Mutation mutation = (org.apache.cassandra.db.Mutation) message.payload;
170+
for (org.apache.cassandra.db.partitions.PartitionUpdate update : mutation.getPartitionUpdates())
171+
{
172+
String tableKey = update.metadata().keyspace + '.' + update.metadata().name;
173+
DroppedMessageMetrics tableMetrics = droppedMutationsByTable.get(tableKey);
174+
if (tableMetrics == null)
175+
tableMetrics = droppedMutationsByTable.computeIfAbsent(tableKey,
176+
k -> new DroppedMessageMetrics("DroppedMutations", k));
177+
tableMetrics.dropped.mark();
178+
if (message.isCrossNode())
179+
tableMetrics.crossNodeDroppedLatency.update(timeElapsed, timeUnit);
180+
else
181+
tableMetrics.internalDroppedLatency.update(timeElapsed, timeUnit);
182+
}
183+
}
163184
}
164185

165186
public void recordDroppedMessage(Verb verb, long timeElapsed, TimeUnit timeUnit, boolean isCrossNode)
@@ -200,6 +221,14 @@ public Map<String, Integer> getDroppedMessages()
200221
map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount());
201222
return map;
202223
}
224+
225+
public Map<String, Long> getDroppedMutationsByTable()
226+
{
227+
Map<String, Long> map = new HashMap<>(droppedMutationsByTable.size());
228+
for (Map.Entry<String, DroppedMessageMetrics> entry : droppedMutationsByTable.entrySet())
229+
map.put(entry.getKey(), entry.getValue().dropped.getCount());
230+
return map;
231+
}
203232

204233
private void logDroppedMessages()
205234
{
@@ -238,5 +267,6 @@ public int resetAndConsumeDroppedErrors(Consumer<String> messageConsumer)
238267
public void resetDroppedMessages()
239268
{
240269
droppedMessages.replaceAll((u, v) -> new DroppedForVerb(new DroppedMessageMetrics(u)));
270+
droppedMutationsByTable.clear();
241271
}
242272
}

src/java/org/apache/cassandra/net/MessagingServiceMBean.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public interface MessagingServiceMBean
9999
* dropped message counts for server lifetime
100100
*/
101101
public Map<String, Integer> getDroppedMessages();
102+
103+
/**
104+
* dropped mutation counts by table for server lifetime
105+
*/
106+
public Map<String, Long> getDroppedMutationsByTable();
102107

103108
/**
104109
* Total number of timeouts happened on this node

src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,12 @@ public Map<String, Integer> getDroppedMessages()
222222
{
223223
return metrics.getDroppedMessages();
224224
}
225+
226+
@Override
227+
public Map<String, Long> getDroppedMutationsByTable()
228+
{
229+
return metrics.getDroppedMutationsByTable();
230+
}
225231

226232
@Override
227233
public long getTotalTimeouts()

test/unit/org/apache/cassandra/net/MessagingServiceTest.java

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,14 @@
4343
import org.apache.cassandra.config.CassandraRelevantProperties;
4444
import org.apache.cassandra.config.DatabaseDescriptor;
4545
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
46+
import org.apache.cassandra.db.Mutation;
47+
import org.apache.cassandra.db.RowUpdateBuilder;
4648
import org.apache.cassandra.db.commitlog.CommitLog;
4749
import org.apache.cassandra.exceptions.ConfigurationException;
4850
import org.apache.cassandra.locator.InetAddressAndPort;
4951
import org.apache.cassandra.metrics.MessagingMetrics;
52+
import org.apache.cassandra.schema.MockSchema;
53+
import org.apache.cassandra.schema.TableMetadata;
5054
import org.apache.cassandra.utils.FBUtilities;
5155
import org.caffinitas.ohc.histo.EstimatedHistogram;
5256

@@ -499,4 +503,176 @@ private static void sendMessages(int numOfMessages, Verb verb) throws UnknownHos
499503
MessagingService.instance().send(Message.out(verb, noPayload), address);
500504
}
501505
}
506+
507+
@Test
508+
public void testDroppedMutationsTrackedByTable()
509+
{
510+
// Reset metrics to ensure clean state
511+
messagingService.metrics.resetDroppedMessages();
512+
513+
// Create table metadata for a test table with unique name
514+
TableMetadata metadata = MockSchema.newTableMetadata("test_ks_drops", "test_table_drops");
515+
String tableKey = metadata.keyspace + "." + metadata.name;
516+
517+
// Create a mutation for the table
518+
Mutation mutation = new RowUpdateBuilder(metadata, 0, "key1")
519+
.clustering("col1")
520+
.add("value", "test_value")
521+
.build();
522+
523+
// Create a message with the mutation
524+
Message<Mutation> message = Message.builder(Verb.MUTATION_REQ, mutation).build();
525+
526+
// Record the mutation as dropped
527+
messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
528+
529+
// Verify the table-specific metric was updated
530+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
531+
assertNotNull(droppedByTable);
532+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey));
533+
534+
// Drop another mutation for the same table
535+
messagingService.metrics.recordDroppedMessage(message, 200, MILLISECONDS);
536+
537+
// Verify the counter accumulated
538+
droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
539+
assertEquals(Long.valueOf(2L), droppedByTable.get(tableKey));
540+
}
541+
542+
@Test
543+
public void testMultipleTablesTrackedIndependently()
544+
{
545+
// Reset metrics to ensure clean state
546+
messagingService.metrics.resetDroppedMessages();
547+
548+
// Create metadata for multiple tables
549+
TableMetadata table1 = MockSchema.newTableMetadata("ks1", "table1");
550+
TableMetadata table2 = MockSchema.newTableMetadata("ks1", "table2");
551+
TableMetadata table3 = MockSchema.newTableMetadata("ks2", "table1");
552+
553+
String tableKey1 = table1.keyspace + "." + table1.name;
554+
String tableKey2 = table2.keyspace + "." + table2.name;
555+
String tableKey3 = table3.keyspace + "." + table3.name;
556+
557+
// Create mutations for each table
558+
Mutation mutation1 = new RowUpdateBuilder(table1, 0, "key1").clustering("col1").add("value", "val1").build();
559+
Mutation mutation2 = new RowUpdateBuilder(table2, 0, "key2").clustering("col1").add("value", "val2").build();
560+
Mutation mutation3 = new RowUpdateBuilder(table3, 0, "key3").clustering("col1").add("value", "val3").build();
561+
562+
// Drop mutations for different tables
563+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation1).build(), 100, MILLISECONDS);
564+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation1).build(), 100, MILLISECONDS);
565+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation2).build(), 100, MILLISECONDS);
566+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation3).build(), 100, MILLISECONDS);
567+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation3).build(), 100, MILLISECONDS);
568+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation3).build(), 100, MILLISECONDS);
569+
570+
// Verify each table tracked independently
571+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
572+
assertEquals(Long.valueOf(2L), droppedByTable.get(tableKey1));
573+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey2));
574+
assertEquals(Long.valueOf(3L), droppedByTable.get(tableKey3));
575+
}
576+
577+
@Test
578+
public void testCrossNodeVsInternalLatencyPerTable()
579+
{
580+
// Reset metrics to ensure clean state
581+
messagingService.metrics.resetDroppedMessages();
582+
583+
TableMetadata metadata = MockSchema.newTableMetadata("test_ks", "test_table");
584+
String tableKey = metadata.keyspace + "." + metadata.name;
585+
586+
Mutation mutation = new RowUpdateBuilder(metadata, 0, "key1")
587+
.clustering("col1")
588+
.add("value", "test_value")
589+
.build();
590+
591+
Message<Mutation> crossNodeMessage = Message.builder(Verb.MUTATION_REQ, mutation)
592+
.from(InetAddressAndPort.getLocalHost())
593+
.build();
594+
Message<Mutation> internalMessage = Message.builder(Verb.MUTATION_REQ, mutation).build();
595+
596+
// Record cross-node dropped mutations
597+
messagingService.metrics.recordDroppedMessage(crossNodeMessage, 100, MILLISECONDS);
598+
messagingService.metrics.recordDroppedMessage(crossNodeMessage, 150, MILLISECONDS);
599+
600+
// Record internal dropped mutations
601+
messagingService.metrics.recordDroppedMessage(internalMessage, 50, MILLISECONDS);
602+
603+
// Verify the table-specific metric was updated for both types
604+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
605+
assertEquals(Long.valueOf(3L), droppedByTable.get(tableKey));
606+
}
607+
608+
@Test
609+
public void testMultiplePartitionUpdatesInSingleMutation()
610+
{
611+
// Reset metrics to ensure clean state
612+
messagingService.metrics.resetDroppedMessages();
613+
614+
// Create two different tables in the same keyspace with unique names
615+
TableMetadata table1 = MockSchema.newTableMetadata("ks_multi_part", "table1_multi");
616+
TableMetadata table2 = MockSchema.newTableMetadata("ks_multi_part", "table2_multi");
617+
618+
String tableKey1 = table1.keyspace + "." + table1.name;
619+
String tableKey2 = table2.keyspace + "." + table2.name;
620+
621+
// Create mutations for each table with the same partition key
622+
Mutation mutation1 = new RowUpdateBuilder(table1, 0, "key1").clustering("col1").add("value", "val1").build();
623+
Mutation mutation2 = new RowUpdateBuilder(table2, 0, "key1").clustering("col1").add("value", "val2").build();
624+
625+
// Merge mutations into a single batch mutation (requires same keyspace and key)
626+
Mutation batchMutation = Mutation.merge(Arrays.asList(mutation1, mutation2));
627+
628+
// Drop the batch mutation
629+
Message<Mutation> message = Message.builder(Verb.MUTATION_REQ, batchMutation).build();
630+
messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
631+
632+
// Verify both tables are tracked (each partition update counted)
633+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
634+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey1));
635+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey2));
636+
}
637+
638+
@Test
639+
public void testMBeanExposesDroppedMutationsByTable()
640+
{
641+
// Reset metrics to ensure clean state
642+
messagingService.metrics.resetDroppedMessages();
643+
644+
// Create table metadata with unique name
645+
TableMetadata metadata = MockSchema.newTableMetadata("test_ks_mbean", "test_table_mbean");
646+
String tableKey = metadata.keyspace + "." + metadata.name;
647+
648+
// Create and drop a mutation
649+
Mutation mutation = new RowUpdateBuilder(metadata, 0, "key1")
650+
.clustering("col1")
651+
.add("value", "test_value")
652+
.build();
653+
Message<Mutation> message = Message.builder(Verb.MUTATION_REQ, mutation).build();
654+
messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
655+
656+
// Access via MBean interface
657+
MessagingServiceMBean mbean = new MessagingServiceMBeanImpl(true, messagingService.versions, messagingService.metrics);
658+
Map<String, Long> droppedByTable = mbean.getDroppedMutationsByTable();
659+
660+
// Verify the data is accessible through MBean
661+
assertNotNull(droppedByTable);
662+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey));
663+
}
664+
665+
@Test
666+
public void testEmptyDroppedMutationsByTableMap()
667+
{
668+
// Reset metrics to ensure clean state
669+
messagingService.metrics.resetDroppedMessages();
670+
671+
// Get dropped mutations by table when no mutations have been dropped
672+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
673+
674+
// Verify it returns an empty map (not null)
675+
assertNotNull(droppedByTable);
676+
assertEquals(0, droppedByTable.size());
677+
}
502678
}

0 commit comments

Comments
 (0)