Skip to content

Commit f9175da

Browse files
committed
WIP 3
1 parent e417204 commit f9175da

File tree

7 files changed

+639
-38
lines changed

7 files changed

+639
-38
lines changed

ddprof-lib/src/main/cpp/lockFree.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2025, Datadog, Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#ifndef _LOCKFREE_H
7+
#define _LOCKFREE_H
8+
9+
#include <atomic>
10+
#include <cstddef>
11+
12+
/**
13+
* Lock-free atomic primitives and utilities.
14+
*
15+
* This header provides building blocks for lock-free data structures:
16+
* - PaddedAtomic: Cache-line padded atomics to prevent false sharing
17+
* - Future: Atomic counters, sequence locks, etc.
18+
*
19+
* For complete synchronization classes (SpinLock, mutexes), see spinLock.h
20+
*/
21+
22+
// Cache line size for preventing false sharing (typical for x86/ARM)
23+
// Note: This duplicates DEFAULT_CACHE_LINE_SIZE from arch_dd.h for standalone use
24+
constexpr size_t CACHE_LINE_SIZE = 64;
25+
26+
/**
27+
* Atomic value padded to its own cache line to prevent false sharing.
28+
*
29+
* Use this when you have an array of atomics that are frequently accessed
30+
* by different threads. Without padding, atomics in adjacent array elements
31+
* may share a cache line, causing false sharing that degrades performance.
32+
*
33+
* False sharing occurs when:
34+
* - Thread A modifies atomic at index 0
35+
* - Thread B modifies atomic at index 1
36+
* - Both atomics are on the same cache line
37+
* - CPU must invalidate entire cache line, forcing both threads to reload
38+
*
39+
* Example usage:
40+
* static PaddedAtomic<uint64_t> counters[128]; // Each counter on own cache line
41+
* counters[i].value.fetch_add(1, std::memory_order_relaxed);
42+
*
43+
* @tparam T The atomic value type (e.g., uint64_t, int, bool)
44+
*/
45+
template<typename T>
46+
struct alignas(CACHE_LINE_SIZE) PaddedAtomic {
47+
std::atomic<T> value;
48+
// Padding is automatic due to alignas - ensures this struct occupies full cache line
49+
};
50+
51+
#endif // _LOCKFREE_H

ddprof-lib/src/main/cpp/os_linux_dd.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifdef __linux__
22

33
#include "os_dd.h"
4+
#include "thread.h"
45
#include "common.h"
56
#include <signal.h>
67
#include <unistd.h>
@@ -164,7 +165,7 @@ int ddprof::OS::getThreadCount() {
164165
if (!status) {
165166
return -1;
166167
}
167-
168+
168169
char line[256];
169170
int thread_count = -1;
170171
while (fgets(line, sizeof(line), status)) {
@@ -292,7 +293,15 @@ static void* threadDirectoryWatcherLoop(void* arg) {
292293
int tid = atoi(event->name);
293294
if (tid > 0) {
294295
if (event->mask & (IN_CREATE | IN_MOVED_TO)) {
295-
if (g_on_new_thread) g_on_new_thread(tid);
296+
// Small delay (20ms) to allow JVMTI ThreadStart callback to register Java threads
297+
// This virtually eliminates the race condition between thread creation and JVMTI callback
298+
struct timespec delay = {0, 20000000}; // 20ms
299+
nanosleep(&delay, nullptr);
300+
301+
// Skip sending signal to likely Java threads
302+
if (!ProfiledThread::isLikelyJavaThread(tid) && g_on_new_thread) {
303+
g_on_new_thread(tid);
304+
}
296305
} else if (event->mask & (IN_DELETE | IN_MOVED_FROM)) {
297306
if (g_on_dead_thread) g_on_dead_thread(tid);
298307
}

ddprof-lib/src/main/cpp/thread.cpp

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "arch_dd.h"
2+
#include "lockFree.h"
13
#include "thread.h"
24
#include "os_dd.h"
35
#include "profiler.h"
@@ -11,6 +13,15 @@
1113
// TLS priming signal number
1214
static int g_tls_prime_signal = -1;
1315

16+
// Define ProfiledThread static members for Java thread tracking
17+
PaddedAtomic<uint64_t> ProfiledThread::_java_thread_bitset[ProfiledThread::JAVA_THREAD_BITSET_WORDS];
18+
19+
void ProfiledThread::initJavaThreadBitset() {
20+
for (size_t i = 0; i < JAVA_THREAD_BITSET_WORDS; i++) {
21+
_java_thread_bitset[i].value.store(0, std::memory_order_relaxed);
22+
}
23+
}
24+
1425
pthread_key_t ProfiledThread::_tls_key;
1526
int ProfiledThread::_buffer_size = 0;
1627
volatile int ProfiledThread::_running_buffer_pos = 0;
@@ -35,7 +46,8 @@ inline void ProfiledThread::freeKey(void *key) {
3546
// Buffer-allocated: reset and return to buffer for reuse
3647
tls_ref->releaseFromBuffer();
3748
} else {
38-
// Non-buffer (JVMTI-allocated): delete the instance
49+
// Non-buffer (JVMTI-allocated): unregister Java thread and delete the instance
50+
ProfiledThread::unregisterJavaThread(tls_ref->_tid);
3951
delete tls_ref;
4052
}
4153
}
@@ -54,6 +66,9 @@ void ProfiledThread::initCurrentThread() {
5466
int tid = OS::threadId();
5567
ProfiledThread *tls = ProfiledThread::forTid(tid);
5668
pthread_setspecific(_tls_key, (const void *)tls);
69+
70+
// Register this thread as a Java thread for TLS priming optimization
71+
ProfiledThread::registerJavaThread(tid);
5772
}
5873

5974
void ProfiledThread::initExistingThreads() {
@@ -131,6 +146,9 @@ void ProfiledThread::doInitExistingThreads() {
131146
return; // Avoid double initialization
132147
}
133148

149+
// Initialize Java thread bitset
150+
initJavaThreadBitset();
151+
134152
// Register fork handler to prevent issues in forked child processes
135153
ensureTlsForkHandlerRegistered();
136154

@@ -152,17 +170,17 @@ void ProfiledThread::doInitExistingThreads() {
152170
// Set DD_PROFILER_TLS_WATCHER=1 to enable for native thread priming
153171
// Supports both environment variable and system property (for JMH forked JVMs)
154172
const char* watcher_env = std::getenv("DD_PROFILER_TLS_WATCHER");
155-
bool watcher_enabled = (watcher_env != nullptr && std::strcmp(watcher_env, "1") == 0);
156-
157-
// If not set via environment variable, check system property (for JMH compatibility)
158-
if (!watcher_enabled) {
159-
char* watcher_prop = nullptr;
160-
jvmtiEnv *jvmti = VM::jvmti();
161-
if (jvmti != nullptr && jvmti->GetSystemProperty("DD_PROFILER_TLS_WATCHER", &watcher_prop) == 0 && watcher_prop != nullptr) {
162-
watcher_enabled = (std::strcmp(watcher_prop, "1") == 0);
163-
jvmti->Deallocate((unsigned char*)watcher_prop);
164-
}
165-
}
173+
bool watcher_enabled = false; //(watcher_env == nullptr || std::strcmp(watcher_env, "1") == 0);
174+
175+
// // If not set via environment variable, check system property (for JMH compatibility)
176+
// if (watcher_enabled) {
177+
// char* watcher_prop = nullptr;
178+
// jvmtiEnv *jvmti = VM::jvmti();
179+
// if (jvmti != nullptr && jvmti->GetSystemProperty("DD_PROFILER_TLS_WATCHER", &watcher_prop) == 0 && watcher_prop != nullptr) {
180+
// watcher_enabled = (std::strcmp(watcher_prop, "1") != 0);
181+
// jvmti->Deallocate((unsigned char*)watcher_prop);
182+
// }
183+
// }
166184

167185
if (watcher_enabled) {
168186
// Start thread directory watcher to prime new threads (no mass-priming of existing threads)
@@ -390,3 +408,31 @@ void ProfiledThread::simpleTlsSignalHandler(int signo) {
390408
initCurrentThreadWithBuffer();
391409
}
392410
}
411+
412+
void ProfiledThread::registerJavaThread(int tid) {
413+
// Apply Knuth multiplicative hash for better distribution
414+
size_t hash = static_cast<size_t>(tid) * KNUTH_MULTIPLICATIVE_CONSTANT;
415+
size_t bit_index = hash % JAVA_THREAD_BITSET_SIZE;
416+
size_t word_index = bit_index / 64;
417+
uint64_t bit_mask = 1ULL << (bit_index % 64);
418+
_java_thread_bitset[word_index].value.fetch_or(bit_mask, std::memory_order_release);
419+
}
420+
421+
bool ProfiledThread::isLikelyJavaThread(int tid) {
422+
// Apply Knuth multiplicative hash for better distribution
423+
size_t hash = static_cast<size_t>(tid) * KNUTH_MULTIPLICATIVE_CONSTANT;
424+
size_t bit_index = hash % JAVA_THREAD_BITSET_SIZE;
425+
size_t word_index = bit_index / 64;
426+
uint64_t bit_mask = 1ULL << (bit_index % 64);
427+
uint64_t word = _java_thread_bitset[word_index].value.load(std::memory_order_acquire);
428+
return (word & bit_mask) != 0;
429+
}
430+
431+
void ProfiledThread::unregisterJavaThread(int tid) {
432+
// Apply Knuth multiplicative hash for better distribution
433+
size_t hash = static_cast<size_t>(tid) * KNUTH_MULTIPLICATIVE_CONSTANT;
434+
size_t bit_index = hash % JAVA_THREAD_BITSET_SIZE;
435+
size_t word_index = bit_index / 64;
436+
uint64_t bit_mask = 1ULL << (bit_index % 64);
437+
_java_thread_bitset[word_index].value.fetch_and(~bit_mask, std::memory_order_release);
438+
}

ddprof-lib/src/main/cpp/thread.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "os_dd.h"
1010
#include "threadLocalData.h"
1111
#include "unwindStats.h"
12+
#include "lockFree.h"
1213
#include <atomic>
1314
#include <cstdint>
1415
#include <jvmti.h>
@@ -184,8 +185,18 @@ class ProfiledThread : public ThreadLocalData {
184185
inline bool isContextTlsInitialized() {
185186
return _ctx_tls_initialized;
186187
}
187-
188+
189+
// Java thread tracking for TLS priming optimization
190+
static void registerJavaThread(int tid);
191+
static void unregisterJavaThread(int tid);
192+
static bool isLikelyJavaThread(int tid);
193+
188194
private:
195+
// Lock-free bitset for Java thread tracking
196+
static constexpr size_t JAVA_THREAD_BITSET_SIZE = 8192;
197+
static constexpr size_t JAVA_THREAD_BITSET_WORDS = JAVA_THREAD_BITSET_SIZE / 64;
198+
static PaddedAtomic<uint64_t> _java_thread_bitset[JAVA_THREAD_BITSET_WORDS];
199+
static void initJavaThreadBitset();
189200
// Atomic flag for signal handler reentrancy protection within the same thread
190201
// Must be atomic because a signal handler can interrupt normal execution mid-instruction,
191202
// and both contexts may attempt to enter the critical section. Without atomic exchange(),

0 commit comments

Comments
 (0)