Skip to content

Commit 1d4efc0

Browse files
committed
thread filter - refactor recording implementation
remove the usage of the thread filter in the recording
1 parent 258f23e commit 1d4efc0

File tree

6 files changed

+114
-99
lines changed

6 files changed

+114
-99
lines changed

ddprof-lib/src/main/cpp/flightRecorder.cpp

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ char *Recording::_jvm_flags = NULL;
317317
char *Recording::_java_command = NULL;
318318

319319
Recording::Recording(int fd, Arguments &args)
320-
: _fd(fd), _thread_set(), _method_map() {
320+
: _fd(fd), _method_map() {
321321

322322
args.save(_args);
323323
_chunk_start = lseek(_fd, 0, SEEK_END);
@@ -329,6 +329,8 @@ Recording::Recording(int fd, Arguments &args)
329329
_bytes_written = 0;
330330

331331
_tid = OS::threadId();
332+
_active_index.store(0, std::memory_order_relaxed);
333+
332334
VM::jvmti()->GetAvailableProcessors(&_available_processors);
333335

334336
writeHeader(_buf);
@@ -1053,10 +1055,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
10531055
}
10541056

10551057
void Recording::writeThreads(Buffer *buf) {
1056-
addThread(_tid);
1057-
std::vector<int> threads;
1058-
_thread_set.collect(threads);
1059-
_thread_set.clear();
1058+
int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel);
1059+
// After flip: new samples go into the new active set
1060+
// We flush from old_index (the previous active set)
1061+
1062+
std::unordered_set<int> threads;
1063+
threads.insert(_tid);
1064+
1065+
for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
1066+
// std::unordered_set::merge is a C++17 feature, so copy the elements with insert() instead
1067+
threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end());
1068+
_thread_ids[i][old_index].clear();
1069+
}
10601070

10611071
Profiler *profiler = Profiler::instance();
10621072
ThreadInfo *t_info = &profiler->_thread_info;
@@ -1065,15 +1075,15 @@ void Recording::writeThreads(Buffer *buf) {
10651075

10661076
buf->putVar64(T_THREAD);
10671077
buf->putVar64(threads.size());
1068-
for (int i = 0; i < threads.size(); i++) {
1078+
for (auto tid : threads) {
10691079
const char *thread_name;
10701080
jlong thread_id;
1071-
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]);
1081+
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid);
10721082
if (info.first) {
10731083
thread_name = info.first->c_str();
10741084
thread_id = info.second;
10751085
} else {
1076-
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]);
1086+
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid);
10771087
thread_name = name_buf;
10781088
thread_id = 0;
10791089
}
@@ -1083,9 +1093,9 @@ void Recording::writeThreads(Buffer *buf) {
10831093
(thread_id == 0 ? length + 1 : 2 * length) -
10841094
3 * 10; // 3x max varint length
10851095
flushIfNeeded(buf, required);
1086-
buf->putVar64(threads[i]);
1096+
buf->putVar64(tid);
10871097
buf->putUtf8(thread_name, length);
1088-
buf->putVar64(threads[i]);
1098+
buf->putVar64(tid);
10891099
if (thread_id == 0) {
10901100
buf->put8(0);
10911101
} else {
@@ -1419,8 +1429,10 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
14191429
flushIfNeeded(buf);
14201430
}
14211431

1422-
void Recording::addThread(int tid) {
1423-
_thread_set.add(tid, 0); // todo: add slot_id management
1432+
// Assumes the caller already holds the lock identified by lock_index
1433+
void Recording::addThread(int lock_index, int tid) {
1434+
int active = _active_index.load(std::memory_order_acquire);
1435+
_thread_ids[lock_index][active].insert(tid);
14241436
}
14251437

14261438
Error FlightRecorder::start(Arguments &args, bool reset) {
@@ -1578,7 +1590,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
15781590
break;
15791591
}
15801592
_rec->flushIfNeeded(buf);
1581-
_rec->addThread(tid);
1593+
_rec->addThread(lock_index, tid);
15821594
}
15831595
}
15841596

ddprof-lib/src/main/cpp/flightRecorder.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define _FLIGHTRECORDER_H
1919

2020
#include <map>
21+
#include <unordered_set>
2122

2223
#include <limits.h>
2324
#include <string.h>
@@ -127,9 +128,11 @@ class Recording {
127128
static char *_java_command;
128129

129130
RecordingBuffer _buf[CONCURRENCY_LEVEL];
131+
std::unordered_set<int> _thread_ids[CONCURRENCY_LEVEL][2];
132+
std::atomic<int> _active_index{0}; // 0 or 1 globally
133+
130134
int _fd;
131135
off_t _chunk_start;
132-
ThreadFilter _thread_set;
133136
MethodMap _method_map;
134137

135138
Arguments _args;
@@ -158,7 +161,7 @@ class Recording {
158161
public:
159162
Recording(int fd, Arguments &args);
160163
~Recording();
161-
164+
162165
void copyTo(int target_fd);
163166
off_t finishChunk();
164167

@@ -256,7 +259,8 @@ class Recording {
256259
LockEvent *event);
257260
void recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
258261
float machine_total);
259-
void addThread(int tid);
262+
263+
void addThread(int lock_index, int tid);
260264
};
261265

262266
class Lookup {

ddprof-lib/src/main/cpp/javaApi.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env,
129129
jobject unused) {
130130
ProfiledThread *current = ProfiledThread::current();
131131
int tid = current->tid();
132-
if (tid < 0) {
132+
if (unlikely(tid < 0)) {
133133
return;
134134
}
135135
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
@@ -145,7 +145,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env,
145145
jobject unused) {
146146
ProfiledThread *current = ProfiledThread::current();
147147
int tid = current->tid();
148-
if (tid < 0) {
148+
if (unlikely(tid < 0)) {
149149
return;
150150
}
151151
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
Lines changed: 61 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,114 +1,104 @@
1-
// todo copyright stuff
2-
// as I rewrote all the implem
1+
// Copyright (C) Datadog 2025
2+
// Implementation of thread filter management
33

44
#include "threadFilter.h"
5-
#include <mutex>
6-
#include <thread>
7-
#include <cstdlib>
8-
#include <cstring>
9-
10-
static std::mutex slot_mutex;
115

6+
#include <cstdlib>
7+
#include <thread>
128

13-
ThreadFilter::ThreadFilter() : _next_index(0), _enabled(false) {
14-
std::lock_guard<std::mutex> lock(slot_mutex);
15-
_slots.resize(128); // preallocate some slots
16-
// Initialize all slots to -1
17-
for (auto& slot : _slots) {
18-
slot.value.store(-1, std::memory_order_relaxed);
19-
}
9+
ThreadFilter::ThreadFilter()
10+
: _next_index(0), _enabled(false) {
11+
std::unique_lock<std::mutex> lock(_slot_mutex);
12+
_slots.emplace_back(); // Allocate first chunk
13+
clear();
2014
}
2115

2216
ThreadFilter::~ThreadFilter() {
23-
std::lock_guard<std::mutex> lock(slot_mutex);
17+
std::unique_lock<std::mutex> lock(_slot_mutex);
2418
_slots.clear();
25-
_free_list.clear(); // todo: implement this, not needed for benchmark
2619
}
2720

2821
ThreadFilter::SlotID ThreadFilter::registerThread() {
29-
int index = _next_index.fetch_add(1, std::memory_order_relaxed);
30-
if (index < static_cast<int>(_slots.size())) {
31-
return index;
22+
SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed);
23+
if (index >= kMaxThreads) {
24+
return -1;
3225
}
33-
// Lock required to safely grow the vector
26+
const int outer_idx = index >> kChunkShift;
3427
{
35-
std::lock_guard<std::mutex> lock(slot_mutex);
36-
size_t current_size = _slots.size();
37-
if (static_cast<size_t>(index) >= current_size) {
38-
_slots.resize(current_size * 2);
39-
// Initialize new slots
40-
for (size_t i = current_size; i < current_size * 2; ++i) {
41-
_slots[i].value.store(-1, std::memory_order_relaxed);
42-
}
28+
if (outer_idx < static_cast<int>(_slots.size())) {
29+
return index;
30+
}
31+
}
32+
33+
{
34+
std::unique_lock<std::mutex> write_lock(_slot_mutex);
35+
while (outer_idx >= static_cast<int>(_slots.size())) {
36+
_slots.emplace_back();
4337
}
4438
}
39+
4540
return index;
4641
}
4742

4843
void ThreadFilter::clear() {
49-
std::lock_guard<std::mutex> lock(slot_mutex);
50-
for (auto& slot : _slots) {
51-
slot.value.store(-1, std::memory_order_relaxed);
44+
for (auto& chunk : _slots) {
45+
for (auto& slot : chunk) {
46+
slot.value.store(-1, std::memory_order_relaxed);
47+
}
48+
}
49+
}
50+
51+
bool ThreadFilter::accept(SlotID slot_id) const {
52+
if (!_enabled) {
53+
return true;
5254
}
55+
if (slot_id < 0) return false;
56+
int outer_idx = slot_id >> kChunkShift;
57+
int inner_idx = slot_id & kChunkMask;
58+
if (outer_idx >= static_cast<int>(_slots.size())) return false;
59+
return _slots[outer_idx][inner_idx].value.load(std::memory_order_acquire) != -1;
5360
}
5461

55-
bool ThreadFilter::accept(int slot_id) const {
56-
if (!_enabled) return true;
57-
return slot_id >= 0 && slot_id < static_cast<int>(_slots.size()) && _slots[slot_id].value.load(std::memory_order_acquire) != -1;
62+
void ThreadFilter::add(int tid, SlotID slot_id) {
63+
int outer_idx = slot_id >> kChunkShift;
64+
int inner_idx = slot_id & kChunkMask;
65+
_slots[outer_idx][inner_idx].value.store(tid, std::memory_order_relaxed);
5866
}
5967

60-
void ThreadFilter::add(int tid, int slot_id) {
61-
_slots[slot_id].value.store(tid, std::memory_order_relaxed);
68+
void ThreadFilter::remove(SlotID slot_id) {
69+
int outer_idx = slot_id >> kChunkShift;
70+
int inner_idx = slot_id & kChunkMask;
71+
_slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed);
6272
}
6373

64-
void ThreadFilter::remove(int slot_id) {
65-
_slots[slot_id].value.store(-1, std::memory_order_relaxed);
74+
void ThreadFilter::unregisterThread(SlotID slot_id) {
75+
if (slot_id < 0) return;
76+
int outer_idx = slot_id >> kChunkShift;
77+
int inner_idx = slot_id & kChunkMask;
78+
_slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed);
6679
}
6780

6881
void ThreadFilter::collect(std::vector<int>& tids) const {
6982
tids.clear();
70-
std::lock_guard<std::mutex> lock(slot_mutex);
71-
for (const auto& slot : _slots) {
72-
int slot_tid = slot.value.load(std::memory_order_acquire);
73-
if (slot_tid != -1) {
74-
tids.push_back(slot_tid);
83+
// std::unique_lock<std::mutex> lock(_slot_mutex);
84+
for (const auto& chunk : _slots) {
85+
for (const auto& slot : chunk) {
86+
int slot_tid = slot.value.load(std::memory_order_acquire);
87+
if (slot_tid != -1) {
88+
tids.push_back(slot_tid);
89+
}
7590
}
7691
}
7792
}
7893

7994
void ThreadFilter::init(const char* filter) {
80-
if (filter == nullptr) {
95+
if (!filter) {
8196
return;
8297
}
83-
// char* end;
84-
// // todo understand this strange init
85-
// do {
86-
// int id = strtol(filter, &end, 0);
87-
// if (id <= 0) {
88-
// break;
89-
// }
90-
91-
// if (*end == '-') {
92-
// int to = strtol(end + 1, &end, 0);
93-
// while (id <= to) {
94-
// add(id++);
95-
// }
96-
// } else {
97-
// add(id);
98-
// }
99-
// filter = end + 1;
100-
// } while (*end);
98+
// TODO: Implement parsing of filter string if needed
10199
_enabled = true;
102100
}
103101

104102
bool ThreadFilter::enabled() const {
105103
return _enabled;
106104
}
107-
108-
// Implementation of unregisterThread - releases a slot by its ID
109-
void ThreadFilter::unregisterThread(SlotID slot_id) {
110-
if (slot_id >= 0 && slot_id < static_cast<int>(_slots.size())) {
111-
// Reset this slot to be available again
112-
_slots[slot_id].value.store(-1, std::memory_order_relaxed);
113-
}
114-
}

ddprof-lib/src/main/cpp/threadFilter.h

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,27 @@
22
#define _THREADFILTER_H
33

44
#include <atomic>
5+
#include <array>
56
#include <vector>
67
#include <cstdint>
8+
#include <mutex>
79

810
class ThreadFilter {
911
public:
1012
using SlotID = int;
11-
13+
static constexpr int kChunkSize = 128;
14+
static constexpr int kChunkShift = 7;
15+
static constexpr int kChunkMask = kChunkSize - 1;
16+
static constexpr int kMaxThreads = 2048;
1217
ThreadFilter();
1318
~ThreadFilter();
1419

1520
void init(const char* filter);
1621
void clear();
1722
bool enabled() const;
18-
bool accept(int slot_id) const;
19-
void add(int tid, int slot_id);
20-
void remove(int slot_id); // tid unused, for API consistency
23+
bool accept(SlotID slot_id) const;
24+
void add(int tid, SlotID slot_id);
25+
void remove(SlotID slot_id);
2126
void collect(std::vector<int>& tids) const;
2227

2328
SlotID registerThread();
@@ -27,15 +32,18 @@ class ThreadFilter {
2732
struct Slot {
2833
std::atomic<int> value{-1};
2934
Slot() = default;
30-
Slot(const Slot&o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); }
31-
Slot& operator=(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed),
32-
std::memory_order_relaxed); return *this; }
35+
Slot(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); }
36+
Slot& operator=(const Slot& o) {
37+
value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed);
38+
return *this;
39+
}
3340
};
3441

3542
bool _enabled = false;
36-
std::vector<Slot> _slots;
37-
std::vector<Slot> _free_list;
38-
std::atomic<int> _next_index;
43+
std::vector<std::array<Slot, kChunkSize>> _slots;
44+
std::atomic<SlotID> _next_index;
45+
46+
mutable std::mutex _slot_mutex;
3947
};
4048

4149
#endif // _THREADFILTER_H

ddprof-lib/src/main/cpp/wallClock.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ void WallClockJVMTI::timerLoop() {
235235
}
236236

237237
void WallClockASGCT::timerLoop() {
238+
// TODO: re-allocating the vector every time is inefficient
238239
auto collectThreads = [&](std::vector<int>& tids) {
239240
// Get thread IDs from the filter if it's enabled
240241
// Otherwise list all threads in the system

0 commit comments

Comments
 (0)