Skip to content

Refactor thread filter mechanisms #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ char *Recording::_jvm_flags = NULL;
char *Recording::_java_command = NULL;

Recording::Recording(int fd, Arguments &args)
: _fd(fd), _thread_set(), _method_map() {
: _fd(fd), _method_map() {

args.save(_args);
_chunk_start = lseek(_fd, 0, SEEK_END);
Expand All @@ -330,6 +330,8 @@ Recording::Recording(int fd, Arguments &args)
_bytes_written = 0;

_tid = OS::threadId();
_active_index.store(0, std::memory_order_relaxed);

VM::jvmti()->GetAvailableProcessors(&_available_processors);

writeHeader(_buf);
Expand Down Expand Up @@ -1059,11 +1061,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
}

void Recording::writeThreads(Buffer *buf) {
addThread(_tid);
Copy link
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK to remove this call? We should make sure the recording thread is still included, since events can be associated with it: recording info, settings, and config all point to the `_tid` thread.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I still do that (with `threads.insert(_tid);`).
I had to move it down a few lines.

std::vector<int> threads;
threads.reserve(_thread_set.size());
_thread_set.collect(threads);
_thread_set.clear();
int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel);
// After flip: new samples go into the new active set
// We flush from old_index (the previous active set)

std::unordered_set<int> threads;
threads.insert(_tid);

for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
// Collect thread IDs from the fixed-size table into the main set
_thread_ids[i][old_index].collect(threads);
_thread_ids[i][old_index].clear();
}

Profiler *profiler = Profiler::instance();
ThreadInfo *t_info = &profiler->_thread_info;
Expand All @@ -1072,15 +1081,15 @@ void Recording::writeThreads(Buffer *buf) {

buf->putVar64(T_THREAD);
buf->putVar64(threads.size());
for (int i = 0; i < threads.size(); i++) {
for (auto tid : threads) {
const char *thread_name;
jlong thread_id;
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]);
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid);
if (info.first) {
thread_name = info.first->c_str();
thread_id = info.second;
} else {
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]);
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid);
thread_name = name_buf;
thread_id = 0;
}
Expand All @@ -1090,9 +1099,9 @@ void Recording::writeThreads(Buffer *buf) {
(thread_id == 0 ? length + 1 : 2 * length) -
3 * 10; // 3x max varint length
flushIfNeeded(buf, required);
buf->putVar64(threads[i]);
buf->putVar64(tid);
buf->putUtf8(thread_name, length);
buf->putVar64(threads[i]);
buf->putVar64(tid);
if (thread_id == 0) {
buf->put8(0);
} else {
Expand Down Expand Up @@ -1442,7 +1451,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
flushIfNeeded(buf);
}

void Recording::addThread(int tid) { _thread_set.add(tid); }
// Assumes the caller already holds the lock identified by lock_index
void Recording::addThread(int lock_index, int tid) {
int active = _active_index.load(std::memory_order_acquire);
_thread_ids[lock_index][active].insert(tid);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder: does the index ordering matter here?
E.g., if we used [active][lock_index] instead, could that make data locality better or worse?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not acceptable: no allocations can take place here.

}

Error FlightRecorder::start(Arguments &args, bool reset) {
const char *file = args.file();
Expand Down Expand Up @@ -1599,7 +1612,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
break;
}
_rec->flushIfNeeded(buf);
_rec->addThread(tid);
_rec->addThread(lock_index, tid);
}
}

Expand Down
13 changes: 10 additions & 3 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _FLIGHTRECORDER_H

#include <map>
#include <unordered_set>

#include <limits.h>
#include <string.h>
Expand All @@ -34,6 +35,7 @@
#include "mutex.h"
#include "objectSampler.h"
#include "threadFilter.h"
#include "threadIdTable.h"
#include "vmEntry.h"

const u64 MAX_JLONG = 0x7fffffffffffffffULL;
Expand Down Expand Up @@ -127,9 +129,13 @@ class Recording {
static char *_java_command;

RecordingBuffer _buf[CONCURRENCY_LEVEL];
// One table per concurrency slot (CONCURRENCY_LEVEL) to avoid lock contention.
// The second dimension holds two tables so that the active one can be switched (see _active_index).
ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2];
std::atomic<int> _active_index{0}; // 0 or 1 globally

int _fd;
off_t _chunk_start;
ThreadFilter _thread_set;
MethodMap _method_map;

Arguments _args;
Expand Down Expand Up @@ -158,7 +164,7 @@ class Recording {
public:
Recording(int fd, Arguments &args);
~Recording();

void copyTo(int target_fd);
off_t finishChunk();

Expand Down Expand Up @@ -258,7 +264,8 @@ class Recording {
LockEvent *event);
void recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
float machine_total);
void addThread(int tid);

void addThread(int lock_index, int tid);
};

class Lookup {
Expand Down
69 changes: 44 additions & 25 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,59 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env,
return (jlong)Profiler::instance()->total_samples();
}

// Add and remove deliberately duplicate some code: sharing one implementation would put an extra branch on the hot path
// JNI entry point for JavaProfiler.filterThreadAdd0: adds the calling thread
// to the profiler's thread filter.
//
// The call is a no-op when the calling thread has no ProfiledThread record,
// reports a negative tid, or has no filter slot (slot id -1).
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env,
                                                          jobject unused) {
  ProfiledThread *current = ProfiledThread::current();
  // NOTE(review): current() presumably returns null for a thread that was
  // never set up via ProfiledThread::initCurrentThread(); bail out rather
  // than dereference it - confirm against ProfiledThread.
  if (unlikely(current == nullptr)) {
    return;
  }
  int tid = current->tid();
  if (unlikely(tid < 0)) {
    return;
  }
  ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
  int slot_id = current->filterSlotId();
  if (unlikely(slot_id == -1)) {
    // No filter slot is recorded for this thread, so there is nothing to mark.
    return;
  }
  thread_filter->add(tid, slot_id);
}

// JNI entry point for JavaProfiler.filterThreadRemove0: removes the calling
// thread's slot from the profiler's thread filter.
//
// The call is a no-op when the calling thread has no ProfiledThread record,
// reports a negative tid, or has no filter slot (slot id -1).
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env,
                                                             jobject unused) {
  ProfiledThread *current = ProfiledThread::current();
  // NOTE(review): current() presumably returns null for a thread that was
  // never set up via ProfiledThread::initCurrentThread(); bail out rather
  // than dereference it - confirm against ProfiledThread.
  if (unlikely(current == nullptr)) {
    return;
  }
  // The tid is only used as a validity check here; removal is keyed by slot.
  int tid = current->tid();
  if (unlikely(tid < 0)) {
    return;
  }
  ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
  int slot_id = current->filterSlotId();
  if (unlikely(slot_id == -1)) {
    // No filter slot is recorded for this thread, so there is nothing to clear.
    return;
  }
  thread_filter->remove(slot_id);
}

// Backward compatibility for existing code
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env,
jobject unused,
jboolean enable) {
int tid = ProfiledThread::currentTid();
if (tid < 0) {
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
int slot_id = current->filterSlotId();
if (unlikely(slot_id == -1)) {
return;
}

if (enable) {
thread_filter->add(tid);
thread_filter->add(tid, slot_id);
} else {
thread_filter->remove(tid);
thread_filter->remove(slot_id);
}
}

Expand Down Expand Up @@ -406,24 +446,3 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env,
jobject unused) {
return true;
}

// JNI entry point for ActiveBitmap.bitmapAddressFor0: returns, as a jlong,
// the pointer that ThreadFilter::bitmapAddressFor() yields for the given tid.
extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env,
                                                           jclass unused,
                                                           jint tid) {
  ThreadFilter *filter = Profiler::instance()->threadFilter();
  u64 *word_addr = filter->bitmapAddressFor((int)tid);
  // The raw address is handed to Java as a 64-bit integer.
  return (jlong)word_addr;
}

// JNI entry point for ActiveBitmap.isActive0: reports whether the profiler's
// thread filter accepts the given tid (JNI_TRUE) or not (JNI_FALSE).
extern "C" DLLEXPORT jboolean JNICALL
Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env,
                                                   jclass unused,
                                                   jint tid) {
  ThreadFilter *filter = Profiler::instance()->threadFilter();
  if (filter->accept((int)tid)) {
    return JNI_TRUE;
  }
  return JNI_FALSE;
}

// JNI entry point for ActiveBitmap.getActiveCountAddr0: returns, as a jlong,
// whatever ThreadFilter::addressOfSize() yields for the profiler's filter.
extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env,
                                                             jclass unused) {
  ThreadFilter *filter = Profiler::instance()->threadFilter();
  return (jlong)filter->addressOfSize();
}
15 changes: 10 additions & 5 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ void Profiler::addRuntimeStub(const void *address, int length,

void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
ProfiledThread::initCurrentThread();

int tid = ProfiledThread::currentTid();
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (_thread_filter.enabled()) {
_thread_filter.remove(tid);
int slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
_thread_filter.remove(slot_id); // Remove from filtering initially
}
updateThreadName(jvmti, jni, thread, true);

Expand All @@ -116,9 +118,12 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
}

void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
int tid = ProfiledThread::currentTid();
ProfiledThread *current = ProfiledThread::current();
int slot_id = current->filterSlotId();
int tid = current->tid();
if (_thread_filter.enabled()) {
_thread_filter.remove(tid);
_thread_filter.unregisterThread(slot_id);
current->setFilterSlotId(-1);
}
updateThreadName(jvmti, jni, thread, true);

Expand Down
6 changes: 5 additions & 1 deletion ddprof-lib/src/main/cpp/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ class ProfiledThread : public ThreadLocalData {
u32 _wall_epoch;
u32 _call_trace_id;
u32 _recording_epoch;
int _filter_slot_id; // Slot ID for thread filtering
UnwindFailures _unwind_failures;

ProfiledThread(int buffer_pos, int tid)
: ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0),
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {};
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {};

void releaseFromBuffer();

Expand Down Expand Up @@ -120,6 +121,9 @@ class ProfiledThread : public ThreadLocalData {
}

static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext);

// Thread-filter slot recorded for this thread; the constructor sets it to -1,
// which the JNI filter entry points treat as "no slot".
int filterSlotId() {
  return _filter_slot_id;
}

// Stores the thread-filter slot for this thread. Profiler::onThreadStart
// passes the value returned by registerThread(); onThreadEnd resets it to -1.
void setFilterSlotId(int slotId) {
  _filter_slot_id = slotId;
}
};

#endif // _THREAD_H
Loading
Loading