Skip to content

Refactor thread filter mechanisms #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ char *Recording::_jvm_flags = NULL;
char *Recording::_java_command = NULL;

Recording::Recording(int fd, Arguments &args)
: _fd(fd), _thread_set(), _method_map() {
: _fd(fd), _method_map() {

args.save(_args);
_chunk_start = lseek(_fd, 0, SEEK_END);
Expand All @@ -329,6 +329,8 @@ Recording::Recording(int fd, Arguments &args)
_bytes_written = 0;

_tid = OS::threadId();
_active_index.store(0, std::memory_order_relaxed);

VM::jvmti()->GetAvailableProcessors(&_available_processors);

writeHeader(_buf);
Expand Down Expand Up @@ -1053,11 +1055,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
}

void Recording::writeThreads(Buffer *buf) {
addThread(_tid);
Copy link
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to remove this call? We should make sure that the recording thread is also included as events can be associated with it - recording info, settings, config - all point to _tid thread.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the recording thread is still included (via the `threads.insert(_tid);` call).
I had to move it down a few lines.

std::vector<int> threads;
threads.reserve(_thread_set.size());
_thread_set.collect(threads);
_thread_set.clear();
int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel);
// After flip: new samples go into the new active set
// We flush from old_index (the previous active set)

std::unordered_set<int> threads;
threads.insert(_tid);

for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
// cannot use std::unordered_set::merge here: it requires C++17
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated - but we might try to bump the accepted C++ level to 17. It should be possible ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end());
_thread_ids[i][old_index].clear();
}

Profiler *profiler = Profiler::instance();
ThreadInfo *t_info = &profiler->_thread_info;
Expand All @@ -1066,15 +1075,15 @@ void Recording::writeThreads(Buffer *buf) {

buf->putVar64(T_THREAD);
buf->putVar64(threads.size());
for (int i = 0; i < threads.size(); i++) {
for (auto tid : threads) {
const char *thread_name;
jlong thread_id;
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]);
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid);
if (info.first) {
thread_name = info.first->c_str();
thread_id = info.second;
} else {
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]);
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid);
thread_name = name_buf;
thread_id = 0;
}
Expand All @@ -1084,9 +1093,9 @@ void Recording::writeThreads(Buffer *buf) {
(thread_id == 0 ? length + 1 : 2 * length) -
3 * 10; // 3x max varint length
flushIfNeeded(buf, required);
buf->putVar64(threads[i]);
buf->putVar64(tid);
buf->putUtf8(thread_name, length);
buf->putVar64(threads[i]);
buf->putVar64(tid);
if (thread_id == 0) {
buf->put8(0);
} else {
Expand Down Expand Up @@ -1420,7 +1429,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
flushIfNeeded(buf);
}

void Recording::addThread(int tid) { _thread_set.add(tid); }
// assumption is that we hold the lock (with lock_index)
void Recording::addThread(int lock_index, int tid) {
int active = _active_index.load(std::memory_order_acquire);
_thread_ids[lock_index][active].insert(tid);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder - does the index ordering matter here?
E.g., if we indexed as [active][lock_index] instead, would that make data locality better or worse?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not acceptable: no allocations can take place here, and inserting into a std::unordered_set may allocate.

}

Error FlightRecorder::start(Arguments &args, bool reset) {
const char *file = args.file();
Expand Down Expand Up @@ -1577,7 +1590,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
break;
}
_rec->flushIfNeeded(buf);
_rec->addThread(tid);
_rec->addThread(lock_index, tid);
}
}

Expand Down
12 changes: 9 additions & 3 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _FLIGHTRECORDER_H

#include <map>
#include <unordered_set>

#include <limits.h>
#include <string.h>
Expand Down Expand Up @@ -127,9 +128,13 @@ class Recording {
static char *_java_command;

RecordingBuffer _buf[CONCURRENCY_LEVEL];
// we have several sets to avoid lock contention
// we have a second dimension to allow a switch in the active set
std::unordered_set<int> _thread_ids[CONCURRENCY_LEVEL][2];
std::atomic<int> _active_index{0}; // 0 or 1 globally
Copy link
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder - can this be factored out to a separate type?


int _fd;
off_t _chunk_start;
ThreadFilter _thread_set;
MethodMap _method_map;

Arguments _args;
Expand Down Expand Up @@ -158,7 +163,7 @@ class Recording {
public:
Recording(int fd, Arguments &args);
~Recording();

void copyTo(int target_fd);
off_t finishChunk();

Expand Down Expand Up @@ -256,7 +261,8 @@ class Recording {
LockEvent *event);
void recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
float machine_total);
void addThread(int tid);

void addThread(int lock_index, int tid);
};

class Lookup {
Expand Down
36 changes: 27 additions & 9 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,40 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env,
return (jlong)Profiler::instance()->total_samples();
}

// some duplication between add and remove, though we want to avoid having an extra branch in the hot path
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env,
jobject unused,
jboolean enable) {
int tid = ProfiledThread::currentTid();
if (tid < 0) {
Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is actually a typo here (`_1add`): this will not be linked with the Java native method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably due to JNI name mangling, where an underscore in the Java method name is escaped as `_1` in the native function name (I had to iterate a few times before I figured out how to link).

jobject unused) {
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
int slot_id = current->filterSlotId();
if (unlikely(slot_id == -1)) {
return;
}
thread_filter->add(tid, slot_id);
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env,
jobject unused) {
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
if (enable) {
thread_filter->add(tid);
} else {
thread_filter->remove(tid);
int slot_id = current->filterSlotId();
if (unlikely(slot_id == -1)) {
return;
}
thread_filter->remove(slot_id);
}


extern "C" DLLEXPORT jobject JNICALL
Java_com_datadoghq_profiler_JavaProfiler_getContextPage0(JNIEnv *env,
jobject unused,
Expand Down
12 changes: 8 additions & 4 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ void Profiler::addRuntimeStub(const void *address, int length,

void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
ProfiledThread::initCurrentThread();

int tid = ProfiledThread::currentTid();
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (_thread_filter.enabled()) {
current->setFilterSlotId(_thread_filter.registerThread());
_thread_filter.remove(tid);
}
updateThreadName(jvmti, jni, thread, true);
Expand All @@ -115,9 +116,12 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
}

void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
int tid = ProfiledThread::currentTid();
ProfiledThread *current = ProfiledThread::current();
int slot_id = current->filterSlotId();
int tid = current->tid();
if (_thread_filter.enabled()) {
_thread_filter.remove(tid);
_thread_filter.unregisterThread(slot_id);
current->setFilterSlotId(-1);
}
updateThreadName(jvmti, jni, thread, true);

Expand Down
6 changes: 5 additions & 1 deletion ddprof-lib/src/main/cpp/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ class ProfiledThread : public ThreadLocalData {
u32 _wall_epoch;
u32 _call_trace_id;
u32 _recording_epoch;
int _filter_slot_id; // Slot ID for thread filtering

ProfiledThread(int buffer_pos, int tid)
: ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0),
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {};
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {};

void releaseFromBuffer();

Expand Down Expand Up @@ -111,6 +112,9 @@ class ProfiledThread : public ThreadLocalData {
}

static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext);

// Returns the thread-filter slot ID stored on this ProfiledThread.
// The constructor initializes it to -1, which callers treat as "no slot assigned".
int filterSlotId() { return _filter_slot_id; }
// Stores the thread-filter slot ID for this ProfiledThread.
// Passing -1 marks the thread as having no slot assigned.
void setFilterSlotId(int slotId) { _filter_slot_id = slotId; }
};

#endif // _THREAD_H
Loading