
Commit 55f7fa6

jbachorik and claude committed
Implement 64-bit trace ID system with double-buffered storage and liveness tracking
This commit is a major architectural overhaul of the call trace storage system: it stabilizes trace IDs, prevents ID collisions, and adds liveness tracking capabilities.

## Key Changes

### 64-bit Trace ID System
- Upgrade all trace ID references from u32 to u64 throughout the codebase
- Fix the trace ID truncation bug in the recordJVMTISample() return type
- Implement instance-based ID generation (upper 32 bits = instance ID, lower 32 bits = slot; see the sketch below)
- Maintain JFR constant pool compatibility with 64-bit IDs
- Virtually eliminate trace ID collisions thanks to the 64-bit ID space

### Double-Buffered Call Trace Storage
- Extract hash table logic into a dedicated CallTraceHashTable class
- Implement an active/standby storage pattern for lock-free swapping
- Add a CallTraceStorage facade for coordination and liveness integration
- Enable selective trace preservation during storage transitions

### Liveness Tracking Integration
- Register callback-based liveness checkers in call trace storage
- Implement selective trace preservation during storage swaps
- Automatically clean up dead traces to reduce memory usage
- Pre-allocate containers to avoid malloc() in hot paths

### Thread Safety and Performance
- Enhance synchronization with shared/exclusive locking patterns
- Improve concurrency by allowing multiple concurrent put() operations
- Optimize memory allocation patterns to reduce fragmentation
- Add atomic operations for thread-safe hash table operations

### Testing Infrastructure
- Add comprehensive C++ unit tests (9 for CallTraceStorage, 1 for LivenessTracker)
- Include thread safety and race condition tests
- Add Java integration tests for end-to-end liveness tracking validation
- Test trace ID preservation across storage swaps

## Files Added
- ddprof-lib/src/main/cpp/callTraceHashTable.{h,cpp}: New dedicated hash table implementation
- ddprof-lib/src/test/cpp/test_callTraceStorage.cpp: Comprehensive C++ unit tests
- ddprof-lib/src/test/cpp/test_livenessTracker.cpp: Liveness tracker regression tests
- ddprof-test/src/test/java/.../LivenessTrackingTest.java: Java integration tests

## Files Modified
- Core profiler files: profiler.{h,cpp}, objectSampler.cpp, wallClock.{h,cpp}
- Storage system: callTraceStorage.{h,cpp} (major refactoring)
- JFR integration: flightRecorder.{h,cpp}
- Liveness tracking: livenessTracker.{h,cpp}
- Thread utilities: thread.h

## Validation
- All C++ unit tests pass (35 tests total)
- Build verification successful (debug and release)
- Code formatting applied
- Trace ID preservation validated: "Original trace ID: 42949686479, Preserved trace ID: 42949686479"

This change improves trace stability and memory efficiency, and lays the foundation for advanced liveness tracking features while maintaining full backward compatibility.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
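The instance/slot ID split described above is easiest to see in a minimal, self-contained sketch. The helper names (`makeTraceId`, `traceInstance`, `traceSlot`) are illustrative only; the committed code builds the ID inline in `CallTraceHashTable::put()` as `(_instance_id << 32) | slot`.

```cpp
#include <cstdint>
#include <cstdio>

// Illustrative helpers only: compose/decompose a 64-bit trace ID where the
// upper 32 bits carry the storage instance ID and the lower 32 bits the slot.
static inline uint64_t makeTraceId(uint64_t instance_id, uint32_t slot) {
  return (instance_id << 32) | slot;
}
static inline uint32_t traceInstance(uint64_t trace_id) {
  return (uint32_t)(trace_id >> 32);
}
static inline uint32_t traceSlot(uint64_t trace_id) {
  return (uint32_t)(trace_id & 0xffffffffULL);
}

int main() {
  // Instance 10, slot 13519 -> 42949686479, the value quoted in the
  // validation section above.
  uint64_t id = makeTraceId(10, 13519);
  printf("%llu = instance %u, slot %u\n",
         (unsigned long long)id, traceInstance(id), traceSlot(id));
  return 0;
}
```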
1 parent 9b2ac93 · commit 55f7fa6

18 files changed: +1532 −447 lines
ddprof-lib/src/main/cpp/callTraceHashTable.cpp: 365 additions & 0 deletions
@@ -0,0 +1,365 @@
/*
 * Copyright 2025, Datadog, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

#include "callTraceHashTable.h"
#include "counters.h"
#include "os.h"
#include <string.h>

#define COMMA ,

static const u32 INITIAL_CAPACITY = 65536;
static const u32 CALL_TRACE_CHUNK = 8 * 1024 * 1024;
static const u64 OVERFLOW_TRACE_ID = 0x7fffffffffffffffULL; // Max 64-bit signed value

// Define the sentinel value for CallTraceSample
CallTrace* const CallTraceSample::PREPARING = reinterpret_cast<CallTrace*>(1);
class LongHashTable {
private:
  LongHashTable *_prev;
  void *_padding0;
  u32 _capacity;
  u32 _padding1[15];
  volatile u32 _size;
  u32 _padding2[15];

  static size_t getSize(u32 capacity) {
    size_t size = sizeof(LongHashTable) +
                  (sizeof(u64) + sizeof(CallTraceSample)) * capacity;
    return (size + OS::page_mask) & ~OS::page_mask;
  }

public:
  LongHashTable() : _prev(NULL), _padding0(NULL), _capacity(0), _size(0) {
    memset(_padding1, 0, sizeof(_padding1));
    memset(_padding2, 0, sizeof(_padding2));
  }

  static LongHashTable *allocate(LongHashTable *prev, u32 capacity) {
    LongHashTable *table = (LongHashTable *)OS::safeAlloc(getSize(capacity));
    if (table != NULL) {
      table->_prev = prev;
      table->_capacity = capacity;
      // The reset is not strictly needed because the anonymous mmap already
      // zeroes the memory. However, it silences a false positive and should
      // not have a performance impact.
      table->clear();
    }
    return table;
  }

  LongHashTable *destroy() {
    LongHashTable *prev = _prev;
    OS::safeFree(this, getSize(_capacity));
    return prev;
  }

  LongHashTable *prev() { return _prev; }

  u32 capacity() { return _capacity; }

  u32 size() { return _size; }

  u32 incSize() { return __sync_add_and_fetch(&_size, 1); }

  u64 *keys() { return (u64 *)(this + 1); }

  CallTraceSample *values() { return (CallTraceSample *)(keys() + _capacity); }

  void clear() {
    memset(keys(), 0, (sizeof(u64) + sizeof(CallTraceSample)) * _capacity);
    _size = 0;
  }
};
CallTrace CallTraceHashTable::_overflow_trace = {false, 1, OVERFLOW_TRACE_ID, {BCI_ERROR, LP64_ONLY(0 COMMA) (jmethodID)"storage_overflow"}};

CallTraceHashTable::CallTraceHashTable() : _allocator(CALL_TRACE_CHUNK) {
  _instance_id = 0; // Will be set externally via setInstanceId()
  _current_table = LongHashTable::allocate(NULL, INITIAL_CAPACITY);
  _overflow = 0;
}

CallTraceHashTable::~CallTraceHashTable() {
  while (_current_table != NULL) {
    _current_table = _current_table->destroy();
  }
}

void CallTraceHashTable::clear() {
  if (_current_table != NULL) {
    while (_current_table->prev() != NULL) {
      _current_table = _current_table->destroy();
    }
    _current_table->clear();
  }
  _allocator.clear();
  _overflow = 0;
}
// Adaptation of MurmurHash64A by Austin Appleby
u64 CallTraceHashTable::calcHash(int num_frames, ASGCT_CallFrame *frames,
                                 bool truncated) {
  const u64 M = 0xc6a4a7935bd1e995ULL;
  const int R = 47;

  int len = num_frames * sizeof(ASGCT_CallFrame);
  u64 h = len * M * (truncated ? 1 : 2);

  const u64 *data = (const u64 *)frames;
  const u64 *end = data + len / sizeof(u64);

  while (data != end) {
    u64 k = *data++;
    k *= M;
    k ^= k >> R;
    k *= M;
    h ^= k;
    h *= M;
  }

  if (len & 4) {
    h ^= *(u32 *)data;
    h *= M;
  }

  h ^= h >> R;
  h *= M;
  h ^= h >> R;

  return h;
}
CallTrace *CallTraceHashTable::storeCallTrace(int num_frames,
                                              ASGCT_CallFrame *frames,
                                              bool truncated, u64 trace_id) {
  const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame);
  const size_t total_size = header_size + num_frames * sizeof(ASGCT_CallFrame);
  CallTrace *buf = (CallTrace *)_allocator.alloc(total_size);
  if (buf != NULL) {
    buf->num_frames = num_frames;
    // Do not use memcpy inside signal handler
    for (int i = 0; i < num_frames; i++) {
      buf->frames[i] = frames[i];
    }
    buf->truncated = truncated;
    buf->trace_id = trace_id;
  }
  return buf;
}

CallTrace *CallTraceHashTable::findCallTrace(LongHashTable *table, u64 hash) {
  u64 *keys = table->keys();
  u32 capacity = table->capacity();
  u32 slot = hash & (capacity - 1);
  u32 step = 0;

  while (keys[slot] != hash) {
    if (keys[slot] == 0) {
      return NULL;
    }
    if (++step >= capacity) {
      return NULL;
    }
    slot = (slot + step) & (capacity - 1);
  }

  return table->values()[slot].trace;
}
u64 CallTraceHashTable::put(int num_frames, ASGCT_CallFrame *frames,
                            bool truncated, u64 weight) {
  // Synchronization is now handled at CallTraceStorage facade level

  u64 hash = calcHash(num_frames, frames, truncated);

  LongHashTable *table = _current_table;
  if (table == NULL) {
    // Table allocation failed or was cleared - drop sample
    return 0;
  }

  u64 *keys = table->keys();
  u32 capacity = table->capacity();
  u32 slot = hash & (capacity - 1);
  u32 step = 0;
  while (true) {
    u64 key_value = __atomic_load_n(&keys[slot], __ATOMIC_RELAXED);
    if (key_value == hash) {
      // Hash matches - check if another thread is preparing this slot
      CallTrace* current_trace = table->values()[slot].acquireTrace();
      if (current_trace == CallTraceSample::PREPARING) {
        // Another thread is preparing this slot - wait briefly and retry
        for (volatile int i = 0; i < 50; i++) {
          // Busy wait for preparation to complete
        }
        continue; // Retry the same slot
      } else if (current_trace != nullptr) {
        // Trace is ready, use it
        return current_trace->trace_id;
      } else {
        // Trace is NULL but hash exists - shouldn't happen, but handle gracefully
        return 0;
      }
    }
    if (key_value == 0) {
      u64 expected = 0;
      if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        continue; // another thread claimed it, go to next slot
      }

      // Mark the slot as being prepared so other threads know to wait
      if (!table->values()[slot].markPreparing()) {
        // Failed to mark as preparing (shouldn't happen), clear key and retry
        __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
        continue;
      }

      // Increment the table size, and if the load factor reaches 0.75, reserve
      // a new table
      u32 current_size = table->incSize();
      if (current_size == capacity * 3 / 4) {
        LongHashTable *new_table = LongHashTable::allocate(table, capacity * 2);
        if (new_table != NULL) {
          // Use atomic CAS to safely update _current_table
          __atomic_compare_exchange_n(&_current_table, &table, new_table, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
        }
      }

      // Migrate from a previous table to save space
      CallTrace *trace =
          table->prev() == NULL ? NULL : findCallTrace(table->prev(), hash);
      if (trace == NULL) {
        // Generate unique trace ID: upper 32 bits = instance_id, lower 32 bits = slot
        // 64-bit provides massive collision space and JFR constant pool compatibility
        u64 trace_id = (_instance_id << 32) | slot;
        trace = storeCallTrace(num_frames, frames, truncated, trace_id);
        if (trace == NULL) {
          // Allocation failure - clear the key we claimed and reset trace to NULL
          __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
          table->values()[slot].setTrace(nullptr);
          return 0;
        }
      }
      // Note: For migrated traces, we preserve their original trace_id from when they were first created
      // Set the actual trace (this changes state from PREPARING to ready)
      table->values()[slot].setTrace(trace);

      // Clear the slot in the prev table so that it is not written out to the
      // constant pool multiple times
      LongHashTable *prev_table = table->prev();
      if (prev_table != NULL) {
        __atomic_store_n(&prev_table->keys()[slot], 0, __ATOMIC_RELEASE);
      }

      // Return immediately since we just created/set up this trace
      return trace->trace_id;
    }

    if (++step >= capacity) {
      // Very unlikely case of a table overflow
      atomicInc(_overflow);
      return OVERFLOW_TRACE_ID;
    }
    // Improved version of linear probing
    slot = (slot + step) & (capacity - 1);
  }
}
void CallTraceHashTable::collectAndCopySelective(std::unordered_set<CallTrace *> &traces,
                                                 const std::unordered_set<u64> &trace_ids_to_preserve,
                                                 CallTraceHashTable* target) {
  for (LongHashTable *table = _current_table; table != NULL; table = table->prev()) {
    u64 *keys = table->keys();
    CallTraceSample *values = table->values();
    u32 capacity = table->capacity();
    for (u32 slot = 0; slot < capacity; slot++) {
      if (keys[slot] != 0) {
        CallTrace *trace = values[slot].acquireTrace();
        if (trace != NULL) {
          // Always collect for JFR output - trace contains its own ID
          traces.insert(trace);

          // Copy to target if this trace should be preserved, keeping the original trace ID
          if (trace_ids_to_preserve.find(trace->trace_id) != trace_ids_to_preserve.end()) {
            target->putWithExistingId(trace, 1);
          }
        }
      }
    }
  }

  // Handle overflow trace
  if (_overflow > 0) {
    traces.insert(&_overflow_trace);
    if (trace_ids_to_preserve.find(OVERFLOW_TRACE_ID) != trace_ids_to_preserve.end()) {
      // Copy overflow trace to target - it's a static trace so just increment overflow counter
      atomicInc(target->_overflow);
    }
  }
}
void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight) {
  // Synchronization is now handled at CallTraceStorage facade level

  u64 hash = calcHash(source_trace->num_frames, source_trace->frames, source_trace->truncated);

  LongHashTable *table = _current_table;
  if (table == NULL) {
    // Table allocation failed or was cleared - drop sample
    return;
  }

  u64 *keys = table->keys();
  u32 capacity = table->capacity();
  u32 slot = hash & (capacity - 1);

  // Look for existing entry or empty slot
  while (true) {
    u64 key_value = __atomic_load_n(&keys[slot], __ATOMIC_RELAXED);
    if (key_value == hash) {
      // Found existing entry - just use it
      break;
    }
    if (key_value == 0) {
      // Found empty slot - claim it
      u64 expected = 0;
      if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        continue; // another thread claimed it, try next slot
      }

      // Create a copy of the source trace preserving its exact ID
      const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame);
      const size_t total_size = header_size + source_trace->num_frames * sizeof(ASGCT_CallFrame);
      CallTrace* copied_trace = (CallTrace*)_allocator.alloc(total_size);
      if (copied_trace != NULL) {
        copied_trace->truncated = source_trace->truncated;
        copied_trace->num_frames = source_trace->num_frames;
        copied_trace->trace_id = source_trace->trace_id; // Preserve exact trace ID
        // Safe to use memcpy since this is not called from signal handler
        memcpy(copied_trace->frames, source_trace->frames, source_trace->num_frames * sizeof(ASGCT_CallFrame));
        table->values()[slot].setTrace(copied_trace);
      } else {
        // Allocation failure - clear the key we claimed and return
        __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
        return;
      }

      // Check if we need to expand the table
      u32 current_size = table->incSize();
      if (current_size == capacity * 3 / 4) {
        LongHashTable *new_table = LongHashTable::allocate(table, capacity * 2);
        if (new_table != NULL) {
          // Use atomic CAS to safely update _current_table
          __atomic_compare_exchange_n(&_current_table, &table, new_table, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
        }
      }
      break;
    }

    slot = (slot + 1) & (capacity - 1);
  }
}
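As a rough usage sketch (not the CallTraceStorage facade code from this commit), the selective copy above could drive a storage swap roughly as follows, assuming callTraceHashTable.h declares CallTrace, CallTraceHashTable, and u64 as they are used in the file above; the function name `swapActiveTable` and the swap policy are hypothetical.

```cpp
// Illustrative sketch only - not the CallTraceStorage implementation from this
// commit. Assumes callTraceHashTable.h provides CallTrace, CallTraceHashTable
// and u64 as used in the file above.
#include "callTraceHashTable.h"
#include <unordered_set>

// Hypothetical swap step: collect every trace from the active table for JFR
// output, copy only the still-live ones into the standby table (keeping their
// original 64-bit trace IDs via putWithExistingId), then reuse the standby
// table as the new active one.
static CallTraceHashTable* swapActiveTable(CallTraceHashTable* active,
                                           CallTraceHashTable* standby,
                                           const std::unordered_set<u64>& live_trace_ids,
                                           std::unordered_set<CallTrace*>& jfr_traces) {
  active->collectAndCopySelective(jfr_traces, live_trace_ids, standby);
  active->clear();  // dead traces go away together with the old table contents
  return standby;   // the caller now treats this table as the active storage
}
```

The real facade also coordinates locking and instance IDs; the sketch only shows the data flow through collectAndCopySelective().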
