/*
 * Copyright 2025, Datadog, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

#include "callTraceHashTable.h"
#include "counters.h"
#include "os.h"
#include <string.h>

#define COMMA ,

static const u32 INITIAL_CAPACITY = 65536;
static const u32 CALL_TRACE_CHUNK = 8 * 1024 * 1024;
static const u64 OVERFLOW_TRACE_ID = 0x7fffffffffffffffULL; // Max 64-bit signed value

// Define the sentinel value for CallTraceSample
CallTrace* const CallTraceSample::PREPARING = reinterpret_cast<CallTrace*>(1);

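// LongHashTable is an open-addressing hash table allocated as one contiguous
// region: the header below is followed in memory by the key array and then
// the value array (see keys() and values()). Tables are chained via _prev so
// the table can grow without relocating existing entries. The padding fields
// appear intended to keep the read-mostly header fields and the frequently
// updated _size counter on separate cache lines, avoiding false sharing.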
class LongHashTable {
private:
  LongHashTable *_prev;
  void *_padding0;
  u32 _capacity;
  u32 _padding1[15];
  volatile u32 _size;
  u32 _padding2[15];

  static size_t getSize(u32 capacity) {
    size_t size = sizeof(LongHashTable) +
                  (sizeof(u64) + sizeof(CallTraceSample)) * capacity;
    return (size + OS::page_mask) & ~OS::page_mask;
  }

public:
  LongHashTable() : _prev(NULL), _padding0(NULL), _capacity(0), _size(0) {
    memset(_padding1, 0, sizeof(_padding1));
    memset(_padding2, 0, sizeof(_padding2));
  }

  static LongHashTable *allocate(LongHashTable *prev, u32 capacity) {
    LongHashTable *table = (LongHashTable *)OS::safeAlloc(getSize(capacity));
    if (table != NULL) {
      table->_prev = prev;
      table->_capacity = capacity;
      // The reset is not strictly necessary because the anonymous mmap
      // already returns zeroed memory. However, it silences a false positive
      // and should not have a performance impact.
      table->clear();
    }
    return table;
  }

  LongHashTable *destroy() {
    LongHashTable *prev = _prev;
    OS::safeFree(this, getSize(_capacity));
    return prev;
  }

  LongHashTable *prev() { return _prev; }

  u32 capacity() { return _capacity; }

  u32 size() { return _size; }

  u32 incSize() { return __sync_add_and_fetch(&_size, 1); }

  u64 *keys() { return (u64 *)(this + 1); }

  CallTraceSample *values() { return (CallTraceSample *)(keys() + _capacity); }

  void clear() {
    memset(keys(), 0, (sizeof(u64) + sizeof(CallTraceSample)) * _capacity);
    _size = 0;
  }
};
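// Memory layout of an allocated table (a single page-aligned region):
//
//   [ LongHashTable header | u64 keys[capacity] | CallTraceSample values[capacity] ]
//
// keys() and values() derive their pointers from `this`, so the arrays need
// no extra allocations or pointer fields.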

CallTrace CallTraceHashTable::_overflow_trace = {false, 1, OVERFLOW_TRACE_ID, {BCI_ERROR, LP64_ONLY(0 COMMA) (jmethodID)"storage_overflow"}};

CallTraceHashTable::CallTraceHashTable() : _allocator(CALL_TRACE_CHUNK) {
  _instance_id = 0; // Will be set externally via setInstanceId()
  _current_table = LongHashTable::allocate(NULL, INITIAL_CAPACITY);
  _overflow = 0;
}

CallTraceHashTable::~CallTraceHashTable() {
  while (_current_table != NULL) {
    _current_table = _current_table->destroy();
  }
}

void CallTraceHashTable::clear() {
  if (_current_table != NULL) {
    while (_current_table->prev() != NULL) {
      _current_table = _current_table->destroy();
    }
    _current_table->clear();
  }
  _allocator.clear();
  _overflow = 0;
}

// Adaptation of MurmurHash64A by Austin Appleby
u64 CallTraceHashTable::calcHash(int num_frames, ASGCT_CallFrame *frames,
                                 bool truncated) {
  const u64 M = 0xc6a4a7935bd1e995ULL;
  const int R = 47;

  int len = num_frames * sizeof(ASGCT_CallFrame);
  u64 h = len * M * (truncated ? 1 : 2);
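  // Folding the truncated flag into the seed makes a truncated trace hash
  // differently from a complete trace with the same frames.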

  const u64 *data = (const u64 *)frames;
  const u64 *end = data + len / sizeof(u64);

  while (data != end) {
    u64 k = *data++;
    k *= M;
    k ^= k >> R;
    k *= M;
    h ^= k;
    h *= M;
  }

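  // Mix in a trailing 4-byte chunk, which can occur when
  // sizeof(ASGCT_CallFrame) is not a multiple of 8 (e.g. on 32-bit builds).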
  if (len & 4) {
    h ^= *(u32 *)data;
    h *= M;
  }

  h ^= h >> R;
  h *= M;
  h ^= h >> R;

  return h;
}

CallTrace *CallTraceHashTable::storeCallTrace(int num_frames,
                                              ASGCT_CallFrame *frames,
                                              bool truncated, u64 trace_id) {
  const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame);
  const size_t total_size = header_size + num_frames * sizeof(ASGCT_CallFrame);
  CallTrace *buf = (CallTrace *)_allocator.alloc(total_size);
  if (buf != NULL) {
    buf->num_frames = num_frames;
    // Do not use memcpy inside signal handler
    for (int i = 0; i < num_frames; i++) {
      buf->frames[i] = frames[i];
    }
    buf->truncated = truncated;
    buf->trace_id = trace_id;
    Counters::increment(CALLTRACE_STORAGE_BYTES, total_size);
    Counters::increment(CALLTRACE_STORAGE_TRACES);
  }
  return buf;
}

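// Lookup-only probe over a single table; used to migrate entries from an
// older table in the chain. Returns NULL if the hash is not present.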
CallTrace *CallTraceHashTable::findCallTrace(LongHashTable *table, u64 hash) {
  u64 *keys = table->keys();
  u32 capacity = table->capacity();
  u32 slot = hash & (capacity - 1);
  u32 step = 0;

  while (keys[slot] != hash) {
    if (keys[slot] == 0) {
      return NULL;
    }
    if (++step >= capacity) {
      return NULL;
    }
    slot = (slot + step) & (capacity - 1);
  }

  return table->values()[slot].trace;
}

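// Slot lifecycle inside put(): a writer CASes the key from 0 to the hash,
// marks the value PREPARING, and finally publishes the trace pointer.
// Concurrent readers that observe PREPARING spin briefly until the trace
// is published.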
u64 CallTraceHashTable::put(int num_frames, ASGCT_CallFrame *frames,
                            bool truncated, u64 weight) {
  // Synchronization is handled at the CallTraceStorage facade level

  u64 hash = calcHash(num_frames, frames, truncated);

  LongHashTable *table = _current_table;
  if (table == NULL) {
    // Table allocation failed or was cleared - drop sample
    return 0;
  }

  u64 *keys = table->keys();
  u32 capacity = table->capacity();
  u32 slot = hash & (capacity - 1);
  u32 step = 0;
  while (true) {
    u64 key_value = __atomic_load_n(&keys[slot], __ATOMIC_RELAXED);
    if (key_value == hash) {
      // Hash matches - check if another thread is preparing this slot
      CallTrace* current_trace = table->values()[slot].acquireTrace();
      if (current_trace == CallTraceSample::PREPARING) {
        // Another thread is preparing this slot - wait briefly and retry
        for (volatile int i = 0; i < 50; i++) {
          // Busy wait for preparation to complete
        }
        continue; // Retry the same slot
      } else if (current_trace != nullptr) {
        // Trace is ready, use it
        return current_trace->trace_id;
      } else {
        // Trace is NULL but hash exists - shouldn't happen, but handle gracefully
        return 0;
      }
    }
    if (key_value == 0) {
      u64 expected = 0;
      if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        continue; // lost the race for this slot - re-read the key and handle whatever was stored
      }

      // Mark the slot as being prepared so other threads know to wait
      if (!table->values()[slot].markPreparing()) {
        // Failed to mark as preparing (shouldn't happen), clear key and retry
        __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
        continue;
      }

      // Increment the table size, and if the load factor exceeds 0.75, reserve
      // a new table
      u32 current_size = table->incSize();
      if (current_size == capacity * 3 / 4) {
        LongHashTable *new_table = LongHashTable::allocate(table, capacity * 2);
        if (new_table != NULL) {
          // Use atomic CAS to safely update _current_table
          __atomic_compare_exchange_n(&_current_table, &table, new_table, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
        }
      }

      // Migrate from a previous table to save space
      CallTrace *trace =
          table->prev() == NULL ? NULL : findCallTrace(table->prev(), hash);
      if (trace == NULL) {
        // Generate a unique trace ID: upper 32 bits = instance_id, lower 32 bits = slot.
        // The 64-bit space keeps collisions unlikely and stays compatible with the JFR constant pool.
        u64 trace_id = (_instance_id << 32) | slot;
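        // e.g. instance_id 3, slot 0x2a => trace_id 0x000000030000002a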
        trace = storeCallTrace(num_frames, frames, truncated, trace_id);
        if (trace == NULL) {
          // Allocation failure - reset the value before releasing the key so
          // that a thread re-claiming the slot cannot have its PREPARING
          // marker overwritten by this cleanup
          table->values()[slot].setTrace(nullptr);
          __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
          return 0;
        }
      }
      // Note: for migrated traces we preserve the original trace_id assigned
      // when they were first created.
      // Set the actual trace (this changes the state from PREPARING to ready)
      table->values()[slot].setTrace(trace);

      // Clear the matching slot in the previous table so the trace is not
      // written out to the constant pool multiple times. The previous table
      // has a different capacity, so the key must be found by probing, not
      // by reusing this table's slot index.
      LongHashTable *prev_table = table->prev();
      if (prev_table != NULL) {
        u64 *prev_keys = prev_table->keys();
        u32 prev_capacity = prev_table->capacity();
        u32 prev_slot = hash & (prev_capacity - 1);
        for (u32 prev_step = 0; prev_keys[prev_slot] != 0 && prev_step < prev_capacity; prev_step++) {
          if (prev_keys[prev_slot] == hash) {
            __atomic_store_n(&prev_keys[prev_slot], 0, __ATOMIC_RELEASE);
            break;
          }
          prev_slot = (prev_slot + prev_step + 1) & (prev_capacity - 1);
        }
      }

      // Return immediately since we just created/set up this trace
      return trace->trace_id;
    }

    if (++step >= capacity) {
      // Very unlikely case of a table overflow
      atomicInc(_overflow);
      return OVERFLOW_TRACE_ID;
    }
    // Triangular probing: the step grows by one on each iteration, which
    // disperses clustered keys better than a fixed-stride linear probe
    slot = (slot + step) & (capacity - 1);
  }
}

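// Collects every live trace for JFR serialization and copies the traces
// whose IDs appear in trace_ids_to_preserve into `target`, keeping their
// original IDs stable while the rest of the storage is recycled.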
void CallTraceHashTable::collectAndCopySelective(std::unordered_set<CallTrace *> &traces,
                                                 const std::unordered_set<u64> &trace_ids_to_preserve,
                                                 CallTraceHashTable* target) {
  for (LongHashTable *table = _current_table; table != NULL; table = table->prev()) {
    u64 *keys = table->keys();
    CallTraceSample *values = table->values();
    u32 capacity = table->capacity();
    for (u32 slot = 0; slot < capacity; slot++) {
      if (keys[slot] != 0) {
        CallTrace *trace = values[slot].acquireTrace();
        if (trace != NULL) {
          // Always collect for JFR output - the trace carries its own ID
          traces.insert(trace);

          // Copy the trace to the target table, keeping its original ID, if
          // it is marked for preservation
          if (trace_ids_to_preserve.find(trace->trace_id) != trace_ids_to_preserve.end()) {
            target->putWithExistingId(trace, 1);
          }
        }
      }
    }
  }

  // Handle the overflow trace
  if (_overflow > 0) {
    traces.insert(&_overflow_trace);
    if (trace_ids_to_preserve.find(OVERFLOW_TRACE_ID) != trace_ids_to_preserve.end()) {
      // The overflow trace is static, so preserving it just means carrying
      // the overflow flag over to the target
      atomicInc(target->_overflow);
    }
  }
}

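// Like put(), but reuses the source trace's existing ID instead of minting a
// new one. This path runs outside signal handlers (so memcpy is safe) and
// skips the PREPARING protocol, relying on facade-level synchronization.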
void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight) {
  // Synchronization is handled at the CallTraceStorage facade level

  u64 hash = calcHash(source_trace->num_frames, source_trace->frames, source_trace->truncated);

  LongHashTable *table = _current_table;
  if (table == NULL) {
    // Table allocation failed or was cleared - drop sample
    return;
  }

  u64 *keys = table->keys();
  u32 capacity = table->capacity();
  u32 slot = hash & (capacity - 1);
  u32 step = 0;

  // Look for an existing entry or an empty slot
  while (true) {
    u64 key_value = __atomic_load_n(&keys[slot], __ATOMIC_RELAXED);
    if (key_value == hash) {
      // Found existing entry - just use it
      break;
    }
    if (key_value == 0) {
      // Found empty slot - claim it
      u64 expected = 0;
      if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        continue; // lost the race for this slot - re-read the key and handle whatever was stored
      }

      // Create a copy of the source trace, preserving its exact ID
      const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame);
      const size_t total_size = header_size + source_trace->num_frames * sizeof(ASGCT_CallFrame);
      CallTrace* copied_trace = (CallTrace*)_allocator.alloc(total_size);
      if (copied_trace != NULL) {
        copied_trace->truncated = source_trace->truncated;
        copied_trace->num_frames = source_trace->num_frames;
        copied_trace->trace_id = source_trace->trace_id; // Preserve exact trace ID
        // Safe to use memcpy since this is not called from a signal handler
        memcpy(copied_trace->frames, source_trace->frames, source_trace->num_frames * sizeof(ASGCT_CallFrame));
        table->values()[slot].setTrace(copied_trace);
        Counters::increment(CALLTRACE_STORAGE_BYTES, total_size);
        Counters::increment(CALLTRACE_STORAGE_TRACES);
      } else {
        // Allocation failure - clear the key we claimed and return
        __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
        return;
      }

      // Check whether the table needs to be expanded
      u32 current_size = table->incSize();
      if (current_size == capacity * 3 / 4) {
        LongHashTable *new_table = LongHashTable::allocate(table, capacity * 2);
        if (new_table != NULL) {
          // Use atomic CAS to safely update _current_table
          __atomic_compare_exchange_n(&_current_table, &table, new_table, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
        }
      }
      break;
    }

    if (++step >= capacity) {
      // Table is completely full - drop the sample instead of spinning forever
      return;
    }
    slot = (slot + 1) & (capacity - 1);
  }
}