Skip to content

Commit efa094e

Browse files
committed
Thread filter - add a table with linear probing
Remove usage of set to ensure we are signal safe.
1 parent 115a60a commit efa094e

File tree

6 files changed

+111
-15
lines changed

6 files changed

+111
-15
lines changed

ddprof-lib/src/main/cpp/flightRecorder.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,8 +1063,8 @@ void Recording::writeThreads(Buffer *buf) {
10631063
threads.insert(_tid);
10641064

10651065
for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
1066-
// I can not use merge : cpp 17
1067-
threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end());
1066+
// Collect thread IDs from the fixed-size table into the main set
1067+
_thread_ids[i][old_index].collect(threads);
10681068
_thread_ids[i][old_index].clear();
10691069
}
10701070

ddprof-lib/src/main/cpp/flightRecorder.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "mutex.h"
3636
#include "objectSampler.h"
3737
#include "threadFilter.h"
38+
#include "threadIdTable.h"
3839
#include "vmEntry.h"
3940

4041
const u64 MAX_JLONG = 0x7fffffffffffffffULL;
@@ -128,9 +129,9 @@ class Recording {
128129
static char *_java_command;
129130

130131
RecordingBuffer _buf[CONCURRENCY_LEVEL];
131-
// we have several sets to avoid lock contention
132-
// we have a second dimension to allow a switch in the active set
133-
std::unordered_set<int> _thread_ids[CONCURRENCY_LEVEL][2];
132+
// we have several tables to avoid lock contention
133+
// we have a second dimension to allow a switch in the active table
134+
ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2];
134135
std::atomic<int> _active_index{0}; // 0 or 1 globally
135136

136137
int _fd;

ddprof-lib/src/main/cpp/javaApi.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env,
125125

126126
// some duplication between add and remove, though we want to avoid having an extra branch in the hot path
127127
extern "C" DLLEXPORT void JNICALL
128-
Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env,
129-
jobject unused) {
128+
Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env,
129+
jobject unused) {
130130
ProfiledThread *current = ProfiledThread::current();
131131
int tid = current->tid();
132132
if (unlikely(tid < 0)) {
@@ -141,8 +141,8 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env,
141141
}
142142

143143
extern "C" DLLEXPORT void JNICALL
144-
Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env,
145-
jobject unused) {
144+
Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env,
145+
jobject unused) {
146146
ProfiledThread *current = ProfiledThread::current();
147147
int tid = current->tid();
148148
if (unlikely(tid < 0)) {
@@ -156,6 +156,28 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env,
156156
thread_filter->remove(slot_id);
157157
}
158158

159+
// Backward compatibility for existing code
160+
extern "C" DLLEXPORT void JNICALL
161+
Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env,
162+
jobject unused,
163+
jboolean enable) {
164+
ProfiledThread *current = ProfiledThread::current();
165+
int tid = current->tid();
166+
if (unlikely(tid < 0)) {
167+
return;
168+
}
169+
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
170+
int slot_id = current->filterSlotId();
171+
if (unlikely(slot_id == -1)) {
172+
return;
173+
}
174+
175+
if (enable) {
176+
thread_filter->add(tid, slot_id);
177+
} else {
178+
thread_filter->remove(slot_id);
179+
}
180+
}
159181

160182
extern "C" DLLEXPORT jobject JNICALL
161183
Java_com_datadoghq_profiler_JavaProfiler_getContextPage0(JNIEnv *env,

ddprof-lib/src/main/cpp/profiler.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
106106
ProfiledThread *current = ProfiledThread::current();
107107
int tid = current->tid();
108108
if (_thread_filter.enabled()) {
109-
current->setFilterSlotId(_thread_filter.registerThread());
110-
_thread_filter.remove(tid);
109+
int slot_id = _thread_filter.registerThread();
110+
current->setFilterSlotId(slot_id);
111+
_thread_filter.remove(slot_id); // Remove from filtering initially
111112
}
112113
updateThreadName(jvmti, jni, thread, true);
113114

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright The async-profiler authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright 2021, 2025 Datadog, Inc
5+
*/
6+
7+
#ifndef _THREADIDTABLE_H
8+
#define _THREADIDTABLE_H
9+
10+
#include <atomic>
11+
#include <unordered_set>
12+
13+
// Signal-safe thread ID table with fixed size
14+
class ThreadIdTable {
15+
private:
16+
static const int TABLE_SIZE = 256; // Should handle most realistic thread counts
17+
std::atomic<int> table[TABLE_SIZE];
18+
19+
int hash(int tid) const {
20+
// Simple hash function - could be improved if needed
21+
return tid % TABLE_SIZE;
22+
}
23+
24+
public:
25+
ThreadIdTable() {
26+
clear();
27+
}
28+
29+
// Signal-safe insertion using atomic operations only
30+
void insert(int tid) {
31+
if (tid == 0) return; // Invalid thread ID, 0 is reserved for empty slots
32+
33+
int start_slot = hash(tid);
34+
for (int probe = 0; probe < TABLE_SIZE; probe++) {
35+
int slot = (start_slot + probe) % TABLE_SIZE;
36+
int expected = 0;
37+
38+
// Try to claim empty slot
39+
if (table[slot].compare_exchange_strong(expected, tid, std::memory_order_relaxed)) {
40+
return; // Successfully inserted
41+
}
42+
43+
// Check if already present
44+
if (table[slot].load(std::memory_order_relaxed) == tid) {
45+
return; // Already exists
46+
}
47+
}
48+
// Table full - thread ID will be lost, but this is rare and non-critical
49+
// Could increment a counter here for diagnostics if needed
50+
}
51+
52+
// Clear the table (not signal-safe, called during buffer switch)
53+
void clear() {
54+
for (int i = 0; i < TABLE_SIZE; i++) {
55+
table[i].store(0, std::memory_order_relaxed);
56+
}
57+
}
58+
59+
// Collect all thread IDs into a set (not signal-safe, called during buffer switch)
60+
void collect(std::unordered_set<int>& result) {
61+
for (int i = 0; i < TABLE_SIZE; i++) {
62+
int tid = table[i].load(std::memory_order_relaxed);
63+
if (tid != 0) {
64+
result.insert(tid);
65+
}
66+
}
67+
}
68+
};
69+
70+
#endif // _THREADIDTABLE_H

ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,15 +207,15 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit)
207207
* 'filter' option must be enabled to use this method.
208208
*/
209209
public void addThread() {
210-
filterThread_add();
210+
filterThreadAdd0();
211211
}
212212

213213
/**
214214
* Remove the given thread to the set of profiled threads.
215215
* 'filter' option must be enabled to use this method.
216216
*/
217217
public void removeThread() {
218-
filterThread_remove();
218+
filterThreadRemove0();
219219
}
220220

221221
/**
@@ -446,8 +446,10 @@ public Map<String, Long> getDebugCounters() {
446446
private native void stop0() throws IllegalStateException;
447447
private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException;
448448

449-
private native void filterThread_add();
450-
private native void filterThread_remove();
449+
private native void filterThreadAdd0();
450+
private native void filterThreadRemove0();
451+
// Backward compatibility for existing code
452+
private native void filterThread0(boolean enable);
451453

452454
private static native int getTid0();
453455
private static native ByteBuffer getContextPage0(int tid);

0 commit comments

Comments
 (0)