diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 12472a29..ab233e64 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -318,7 +318,7 @@ char *Recording::_jvm_flags = NULL; char *Recording::_java_command = NULL; Recording::Recording(int fd, Arguments &args) - : _fd(fd), _thread_set(), _method_map() { + : _fd(fd), _method_map() { args.save(_args); _chunk_start = lseek(_fd, 0, SEEK_END); @@ -330,6 +330,8 @@ Recording::Recording(int fd, Arguments &args) _bytes_written = 0; _tid = OS::threadId(); + _active_index.store(0, std::memory_order_relaxed); + VM::jvmti()->GetAvailableProcessors(&_available_processors); writeHeader(_buf); @@ -1059,11 +1061,18 @@ void Recording::writeExecutionModes(Buffer *buf) { } void Recording::writeThreads(Buffer *buf) { - addThread(_tid); - std::vector threads; - threads.reserve(_thread_set.size()); - _thread_set.collect(threads); - _thread_set.clear(); + int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel); + // After flip: new samples go into the new active set + // We flush from old_index (the previous active set) + + std::unordered_set threads; + threads.insert(_tid); + + for (int i = 0; i < CONCURRENCY_LEVEL; ++i) { + // Collect thread IDs from the fixed-size table into the main set + _thread_ids[i][old_index].collect(threads); + _thread_ids[i][old_index].clear(); + } Profiler *profiler = Profiler::instance(); ThreadInfo *t_info = &profiler->_thread_info; @@ -1072,15 +1081,15 @@ void Recording::writeThreads(Buffer *buf) { buf->putVar64(T_THREAD); buf->putVar64(threads.size()); - for (int i = 0; i < threads.size(); i++) { + for (auto tid : threads) { const char *thread_name; jlong thread_id; - std::pair, u64> info = t_info->get(threads[i]); + std::pair, u64> info = t_info->get(tid); if (info.first) { thread_name = info.first->c_str(); thread_id = info.second; } else { - snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]); + snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid); thread_name = name_buf; thread_id = 0; } @@ -1090,9 +1099,9 @@ void Recording::writeThreads(Buffer *buf) { (thread_id == 0 ? length + 1 : 2 * length) - 3 * 10; // 3x max varint length flushIfNeeded(buf, required); - buf->putVar64(threads[i]); + buf->putVar64(tid); buf->putUtf8(thread_name, length); - buf->putVar64(threads[i]); + buf->putVar64(tid); if (thread_id == 0) { buf->put8(0); } else { @@ -1442,7 +1451,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system, flushIfNeeded(buf); } -void Recording::addThread(int tid) { _thread_set.add(tid); } +// assumption is that we hold the lock (with lock_index) +void Recording::addThread(int lock_index, int tid) { + int active = _active_index.load(std::memory_order_acquire); + _thread_ids[lock_index][active].insert(tid); +} Error FlightRecorder::start(Arguments &args, bool reset) { const char *file = args.file(); @@ -1599,7 +1612,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id, break; } _rec->flushIfNeeded(buf); - _rec->addThread(tid); + _rec->addThread(lock_index, tid); } } diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index 22c4b3d3..81ed85bc 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -18,6 +18,7 @@ #define _FLIGHTRECORDER_H #include +#include #include #include @@ -34,6 +35,7 @@ #include "mutex.h" #include "objectSampler.h" #include "threadFilter.h" +#include "threadIdTable.h" #include "vmEntry.h" const u64 MAX_JLONG = 0x7fffffffffffffffULL; @@ -127,9 +129,13 @@ class Recording { static char *_java_command; RecordingBuffer _buf[CONCURRENCY_LEVEL]; + // we have several tables to avoid lock contention + // we have a second dimension to allow a switch in the active table + ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2]; + std::atomic _active_index{0}; // 0 or 1 globally + int _fd; off_t _chunk_start; - ThreadFilter _thread_set; MethodMap _method_map; Arguments _args; @@ -158,7 +164,7 @@ class Recording { public: Recording(int fd, Arguments &args); ~Recording(); - + void copyTo(int target_fd); off_t finishChunk(); @@ -258,7 +264,8 @@ class Recording { LockEvent *event); void recordCpuLoad(Buffer *buf, float proc_user, float proc_system, float machine_total); - void addThread(int tid); + + void addThread(int lock_index, int tid); }; class Lookup { diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 017f23c4..4ee835f0 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -123,19 +123,59 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env, return (jlong)Profiler::instance()->total_samples(); } +// some duplication between add and remove, though we want to avoid having an extra branch in the hot path +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + thread_filter->add(tid, slot_id); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + thread_filter->remove(slot_id); +} + +// Backward compatibility for existing code extern "C" DLLEXPORT void JNICALL Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, jobject unused, jboolean enable) { - int tid = ProfiledThread::currentTid(); - if (tid < 0) { + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); + if (unlikely(tid < 0)) { return; } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + if (enable) { - thread_filter->add(tid); + thread_filter->add(tid, slot_id); } else { - thread_filter->remove(tid); + thread_filter->remove(slot_id); } } @@ -406,24 +446,3 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env, jobject unused) { return true; } - -extern "C" DLLEXPORT jlong JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env, - jclass unused, - jint tid) { - u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid); - return (jlong)bitmap; -} - -extern "C" DLLEXPORT jboolean JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env, - jclass unused, - jint tid) { - return Profiler::instance()->threadFilter()->accept((int)tid) ? JNI_TRUE : JNI_FALSE; -} - -extern "C" DLLEXPORT jlong JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env, - jclass unused) { - return (jlong)Profiler::instance()->threadFilter()->addressOfSize(); -} diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 4a76de0c..1f4c3aaa 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -104,10 +104,12 @@ void Profiler::addRuntimeStub(const void *address, int length, void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { ProfiledThread::initCurrentThread(); - - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + int slot_id = _thread_filter.registerThread(); + current->setFilterSlotId(slot_id); + _thread_filter.remove(slot_id); // Remove from filtering initially } updateThreadName(jvmti, jni, thread, true); @@ -116,9 +118,12 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { } void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int slot_id = current->filterSlotId(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + _thread_filter.unregisterThread(slot_id); + current->setFilterSlotId(-1); } updateThreadName(jvmti, jni, thread, true); diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index 264ad6b8..bba5145a 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -43,11 +43,12 @@ class ProfiledThread : public ThreadLocalData { u32 _wall_epoch; u32 _call_trace_id; u32 _recording_epoch; + int _filter_slot_id; // Slot ID for thread filtering UnwindFailures _unwind_failures; ProfiledThread(int buffer_pos, int tid) : ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0), - _wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {}; + _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {}; void releaseFromBuffer(); @@ -120,6 +121,9 @@ class ProfiledThread : public ThreadLocalData { } static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext); + + int filterSlotId() { return _filter_slot_id; } + void setFilterSlotId(int slotId) { _filter_slot_id = slotId; } }; #endif // _THREAD_H diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 31713ae5..6f179519 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -1,175 +1,147 @@ -/* - * Copyright 2020 Andrei Pangin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +// Copyright (C) Datadog 2025 +// Implementation of thread filter management #include "threadFilter.h" -#include "counters.h" -#include "os.h" -#include "reverse_bits.h" -#include -#include -#include - -void trackPage() { - Counters::increment(THREAD_FILTER_PAGES, 1); - Counters::increment(THREAD_FILTER_BYTES, BITMAP_SIZE); -} -ThreadFilter::ThreadFilter() { - _max_thread_id = OS::getMaxThreadId(128 * 1024); - _max_bitmaps = (_max_thread_id + BITMAP_SIZE - 1) / BITMAP_SIZE; - u32 capacity = _max_bitmaps * sizeof(u64 *); - _bitmap = (u64 **)OS::safeAlloc(capacity); - memset(_bitmap, 0, capacity); - _bitmap[0] = (u64 *)OS::safeAlloc(BITMAP_SIZE); - trackPage(); - _enabled = false; - _size = 0; +#include +#include + +ThreadFilter::ThreadFilter() + : _next_index(0), _enabled(false) { + std::unique_lock lock(_slot_mutex); + _slots.emplace_back(); // Allocate first chunk + clear(); } ThreadFilter::~ThreadFilter() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - OS::safeFree(_bitmap[i], BITMAP_SIZE); - } - } - if (_bitmap) { - OS::safeFree(_bitmap, _max_bitmaps * sizeof(u64 *)); - } + std::unique_lock lock(_slot_mutex); + _slots.clear(); } -void ThreadFilter::init(const char *filter) { - if (filter == NULL) { - _enabled = false; - return; - } - - char *end; - do { - int id = strtol(filter, &end, 0); - if (id <= 0) { - break; +ThreadFilter::SlotID ThreadFilter::registerThread() { + int top = _free_list_top.load(std::memory_order_acquire); + for (int i = 0; i < top; ++i) { + int value = _free_list[i].load(std::memory_order_relaxed); + if (value >= 0) { + int expected = value; + if (_free_list[i].compare_exchange_strong(expected, -1, std::memory_order_acq_rel)) { + return value; // Successfully claimed a free slot + } + // If CAS fails, someone else claimed it, continue scanning + } } - if (*end == '-') { - int to = strtol(end + 1, &end, 0); - while (id <= to) { - add(id++); - } - } else { - add(id); + SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed); + if (index >= kMaxThreads) { + return -1; } - filter = end + 1; - } while (*end); + const int outer_idx = index >> kChunkShift; + { + if (outer_idx < static_cast(_slots.size())) { + return index; + } + } - _enabled = true; + { + std::unique_lock write_lock(_slot_mutex); + while (outer_idx >= static_cast(_slots.size())) { + _slots.emplace_back(); + } + } + + return index; } void ThreadFilter::clear() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - memset(_bitmap[i], 0, BITMAP_SIZE); + for (auto& chunk : _slots) { + for (auto& slot : chunk) { + slot.value.store(-1, std::memory_order_relaxed); + } } - } - _size = 0; -} - -// The mapping has to be reversible: f(f(x)) == x -int ThreadFilter::mapThreadId(int thread_id) { - // We want to map the thread_id inside the same bitmap - static_assert(BITMAP_SIZE >= (u16)0xffff, "Potential verflow"); - u16 lower16 = (u16)(thread_id & 0xffff); - lower16 = reverse16(lower16); - int tid = (thread_id & ~0xffff) | lower16; - return tid; + for (int i = 0; i < kFreeListSize; ++i) { + _free_list[i].store(-1, std::memory_order_relaxed); + } + _free_list_top.store(0, std::memory_order_relaxed); } -// Get bitmap that contains the thread id, create one if it does not exist -u64* ThreadFilter::getBitmapFor(int thread_id) { - int index = thread_id / BITMAP_CAPACITY; - assert(index >= 0 && index < (int)_max_bitmaps); - u64* b = _bitmap[index]; - if (b == NULL) { - b = (u64 *)OS::safeAlloc(BITMAP_SIZE); - u64 *oldb = __sync_val_compare_and_swap( - &_bitmap[index], NULL, b); - if (oldb != NULL) { - OS::safeFree(b, BITMAP_SIZE); - b = oldb; - } else { - trackPage(); +bool ThreadFilter::accept(SlotID slot_id) const { + if (!_enabled) { + return true; } - } - return b; + if (slot_id < 0) return false; + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + if (outer_idx >= static_cast(_slots.size())) return false; + return _slots[outer_idx][inner_idx].value.load(std::memory_order_acquire) != -1; } -u64* ThreadFilter::bitmapAddressFor(int thread_id) { - u64* b = getBitmapFor(thread_id); - thread_id = mapThreadId(thread_id); - assert(b == bitmap(thread_id)); - return wordAddress(b, thread_id); +void ThreadFilter::add(int tid, SlotID slot_id) { + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + _slots[outer_idx][inner_idx].value.store(tid, std::memory_order_relaxed); } -bool ThreadFilter::accept(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f))); +void ThreadFilter::remove(SlotID slot_id) { + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); } -void ThreadFilter::add(int thread_id) { - u64 *b = getBitmapFor(thread_id); - assert (b != NULL); - thread_id = mapThreadId(thread_id); - assert(b == bitmap(thread_id)); - u64 bit = 1ULL << (thread_id & 0x3f); - if (!(__sync_fetch_and_or(&word(b, thread_id), bit) & bit)) { - atomicInc(_size); - } -} +void ThreadFilter::unregisterThread(SlotID slot_id) { + if (slot_id < 0) return; + + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); + + constexpr int try_limit = 16; + + int top = _free_list_top.load(std::memory_order_acquire); + int limit = top < kFreeListSize ? top : kFreeListSize; + int tries = 0; + + for (int i = 0; i < limit && tries < try_limit; ++i) { + int value = _free_list[i].load(std::memory_order_relaxed); + if (value == -1) { + int expected = -1; + if (_free_list[i].compare_exchange_strong(expected, slot_id, std::memory_order_acq_rel)) { + return; // Successfully claimed empty spot + } + ++tries; // Only count actual CAS attempts + } + } -void ThreadFilter::remove(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - if (b == NULL) { - return; - } - - u64 bit = 1ULL << (thread_id & 0x3f); - if (__sync_fetch_and_and(&word(b, thread_id), ~bit) & bit) { - atomicInc(_size, -1); - } + // Fallback: append if no empty slot found + int pos = _free_list_top.fetch_add(1, std::memory_order_acq_rel); + if (pos < kFreeListSize) { + _free_list[pos].store(slot_id, std::memory_order_release); + } else { + // Free list overflow: ignore + } } -void ThreadFilter::collect(std::vector &v) { - for (int i = 0; i < _max_bitmaps; i++) { - u64 *b = _bitmap[i]; - if (b != NULL) { - int start_id = i * BITMAP_CAPACITY; - for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { - // Considering the functional impact, relaxed could be a reasonable - // order here - u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); - while (word != 0) { - int tid = start_id + j * 64 + __builtin_ctzl(word); - // restore thread id - tid = mapThreadId(tid); - v.push_back(tid); - word &= (word - 1); +void ThreadFilter::collect(std::vector& tids) const { + tids.clear(); + // std::unique_lock lock(_slot_mutex); + for (const auto& chunk : _slots) { + for (const auto& slot : chunk) { + int slot_tid = slot.value.load(std::memory_order_acquire); + if (slot_tid != -1) { + tids.push_back(slot_tid); + } } - } } - } +} + +void ThreadFilter::init(const char* filter) { + if (!filter) { + return; + } + // TODO: Implement parsing of filter string if needed + _enabled = true; +} + +bool ThreadFilter::enabled() const { + return _enabled; } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 8db82f9f..24cb741e 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -1,81 +1,53 @@ -/* - * Copyright 2020 Andrei Pangin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - #ifndef _THREADFILTER_H #define _THREADFILTER_H -#include "arch_dd.h" +#include +#include #include +#include +#include -// The size of thread ID bitmap in bytes. Must be at least 64K to allow mmap() -const u32 BITMAP_SIZE = 65536; -// How many thread IDs one bitmap can hold -const u32 BITMAP_CAPACITY = BITMAP_SIZE * 8; - -// ThreadFilter query operations must be lock-free and signal-safe; -// update operations are mostly lock-free, except rare bitmap allocations class ThreadFilter { -private: - // Total number of bitmaps required to hold the entire range of thread IDs - u32 _max_thread_id; - u32 _max_bitmaps; - u64 **_bitmap; - bool _enabled; - volatile int _size; - - u64 *bitmap(int thread_id) { - if (thread_id >= _max_thread_id) { - return NULL; - } - return __atomic_load_n( - &(_bitmap[static_cast(thread_id) / BITMAP_CAPACITY]), - __ATOMIC_ACQUIRE); - } - - static int mapThreadId(int thread_id); - - u64 &word(u64 *bitmap, int thread_id) { - // todo: add thread safe APIs - return bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; - } - - u64* const wordAddress(u64 *bitmap, int thread_id) const { - return &bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; - } - - u64* getBitmapFor(int thread_id); public: - ThreadFilter(); - ThreadFilter(ThreadFilter &threadFilter) = delete; - ~ThreadFilter(); - - bool enabled() const { return _enabled; } - - int size() const { return _size; } - const volatile int* addressOfSize() const { return &_size; } + using SlotID = int; + static constexpr int kChunkSize = 128; + static constexpr int kChunkShift = 7; + static constexpr int kChunkMask = kChunkSize - 1; + static constexpr int kMaxThreads = 2048; + ThreadFilter(); + ~ThreadFilter(); + + void init(const char* filter); + void clear(); + bool enabled() const; + bool accept(SlotID slot_id) const; + void add(int tid, SlotID slot_id); + void remove(SlotID slot_id); + void collect(std::vector& tids) const; + + SlotID registerThread(); + void unregisterThread(SlotID slot_id); - void init(const char *filter); - void clear(); - - bool accept(int thread_id); - void add(int thread_id); - void remove(int thread_id); - u64* bitmapAddressFor(int thread_id); - - void collect(std::vector &v); +private: + struct Slot { + std::atomic value{-1}; + Slot() = default; + Slot(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); } + Slot& operator=(const Slot& o) { + value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + }; + + bool _enabled = false; + std::vector> _slots; + std::atomic _next_index; + + static constexpr int kFreeListSize = 128; + std::atomic _free_list[kFreeListSize]; + std::atomic _free_list_top; // Points to next free slot + + mutable std::mutex _slot_mutex; }; #endif // _THREADFILTER_H diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h new file mode 100644 index 00000000..9f62331d --- /dev/null +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -0,0 +1,70 @@ +/* + * Copyright The async-profiler authors + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021, 2025 Datadog, Inc + */ + +#ifndef _THREADIDTABLE_H +#define _THREADIDTABLE_H + +#include +#include + +// Signal-safe thread ID table with fixed size +class ThreadIdTable { +private: + static const int TABLE_SIZE = 256; // Should handle most realistic thread counts + std::atomic table[TABLE_SIZE]; + + int hash(int tid) const { + // Simple hash function - could be improved if needed + return tid % TABLE_SIZE; + } + +public: + ThreadIdTable() { + clear(); + } + + // Signal-safe insertion using atomic operations only + void insert(int tid) { + if (tid == 0) return; // Invalid thread ID, 0 is reserved for empty slots + + int start_slot = hash(tid); + for (int probe = 0; probe < TABLE_SIZE; probe++) { + int slot = (start_slot + probe) % TABLE_SIZE; + int expected = 0; + + // Try to claim empty slot + if (table[slot].compare_exchange_strong(expected, tid, std::memory_order_relaxed)) { + return; // Successfully inserted + } + + // Check if already present + if (table[slot].load(std::memory_order_relaxed) == tid) { + return; // Already exists + } + } + // Table full - thread ID will be lost, but this is rare and non-critical + // Could increment a counter here for diagnostics if needed + } + + // Clear the table (not signal-safe, called during buffer switch) + void clear() { + for (int i = 0; i < TABLE_SIZE; i++) { + table[i].store(0, std::memory_order_relaxed); + } + } + + // Collect all thread IDs into a set (not signal-safe, called during buffer switch) + void collect(std::unordered_set& result) { + for (int i = 0; i < TABLE_SIZE; i++) { + int tid = table[i].load(std::memory_order_relaxed); + if (tid != 0) { + result.insert(tid); + } + } + } +}; + +#endif // _THREADIDTABLE_H \ No newline at end of file diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index d703b81d..6f6a5c9a 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -26,6 +26,7 @@ #include "vmStructs_dd.h" #include #include +#include // For std::sort and std::binary_search std::atomic BaseWallClock::_enabled{false}; @@ -174,6 +175,14 @@ void WallClockJVMTI::timerLoop() { bool do_filter = Profiler::instance()->threadFilter()->enabled(); int self = OS::threadId(); + // If filtering is enabled, collect the filtered TIDs first + std::vector filtered_tids; + if (do_filter) { + Profiler::instance()->threadFilter()->collect(filtered_tids); + // Sort the TIDs for efficient lookup + std::sort(filtered_tids.begin(), filtered_tids.end()); + } + for (int i = 0; i < threads_count; i++) { jthread thread = threads_ptr[i]; if (thread != nullptr) { @@ -182,7 +191,9 @@ void WallClockJVMTI::timerLoop() { continue; } int tid = nThread->osThreadId(); - if (tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) { + if (tid != self && (!do_filter || + // Use binary search to efficiently find if tid is in filtered_tids + std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { threads.push_back({nThread, thread}); } } @@ -224,13 +235,17 @@ void WallClockJVMTI::timerLoop() { } void WallClockASGCT::timerLoop() { + // todo: re-allocating the vector every time is not efficient auto collectThreads = [&](std::vector& tids) { + // Get thread IDs from the filter if it's enabled + // Otherwise list all threads in the system if (Profiler::instance()->threadFilter()->enabled()) { Profiler::instance()->threadFilter()->collect(tids); } else { ThreadList *thread_list = OS::listThreads(); int tid = thread_list->next(); while (tid != -1) { + // Don't include the current thread if (tid != OS::threadId()) { tids.push_back(tid); } diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index 851c4baa..e3ea0662 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -108,7 +108,6 @@ public static synchronized JavaProfiler getInstance(String libLocation, String s throw new IOException("Failed to load Datadog Java profiler library", result.error); } init0(); - ActiveBitmap.initialize(); profiler.initializeContextStorage(); instance = profiler; @@ -209,11 +208,7 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - if (UNSAFE != null) { - ActiveBitmap.setActive(TID.get(), true); - } else { - filterThread0(true); - } + filterThreadAdd0(); } /** @@ -221,14 +216,9 @@ public void addThread() { * 'filter' option must be enabled to use this method. */ public void removeThread() { - if (UNSAFE != null) { - ActiveBitmap.setActive(TID.get(), false); - } else { - filterThread0(false); - } + filterThreadRemove0(); } - /** * Passing context identifier to a profiler. This ID is thread-local and is dumped in * the JFR output only. 0 is a reserved value for "no-context". @@ -478,6 +468,10 @@ public Map getDebugCounters() { private static native boolean init0(); private native void stop0() throws IllegalStateException; private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException; + + private native void filterThreadAdd0(); + private native void filterThreadRemove0(); + // Backward compatibility for existing code private native void filterThread0(boolean enable); private static native int getTid0(); diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index 6c5c8b93..88d2e657 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -14,6 +14,9 @@ #include #include #include + #include // For std::sort + #include + #include ssize_t callback(char* ptr, int len) { return len; @@ -121,31 +124,109 @@ } TEST(ThreadFilter, testThreadFilter) { - int maxTid = OS::getMaxThreadId(); ThreadFilter filter; filter.init(""); ASSERT_TRUE(filter.enabled()); - EXPECT_EQ(0, filter.size()); - // increase step gradually to create different bit densities - int step = 1; - int size = 0; - for (int tid = 0; tid < maxTid - step - 1; tid += step, size++) { - EXPECT_FALSE(filter.accept(tid)); - filter.add(tid); - EXPECT_TRUE(filter.accept(tid)); - step++; + + const int num_threads = 10; + const int num_ops = 100; + std::vector threads; + std::atomic completed_threads{0}; + + // Each thread will add and remove its own thread ID multiple times + for (int i = 1; i <= num_threads; i++) { + threads.emplace_back([&filter, i, &completed_threads]() { + for (int j = 0; j < num_ops; j++) { + // Register a new slot for this thread + int slot_id = filter.registerThread(); + + // Add thread ID to slot + filter.add(i, slot_id); + bool accepted = filter.accept(slot_id); + if (!accepted) { + fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned false after add\n", + i, j, slot_id, slot_id); + } + EXPECT_TRUE(accepted); + + // Remove thread ID + filter.remove(slot_id); + accepted = filter.accept(slot_id); + if (accepted) { + fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned true after remove\n", + i, j, slot_id, slot_id); + } + EXPECT_FALSE(accepted); + } + completed_threads++; + }); + } + + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); } - ASSERT_EQ(size, filter.size()); + + // Verify all threads completed + ASSERT_EQ(completed_threads.load(), num_threads); + + // Collect and verify all thread IDs were properly removed std::vector tids; - tids.reserve(size); filter.collect(tids); - ASSERT_EQ(size, tids.size()); - for (int tid : tids) { - ASSERT_TRUE(filter.accept(tid)); - filter.remove(tid); - ASSERT_FALSE(filter.accept(tid)); + ASSERT_EQ(tids.size(), 0); + } + + TEST(ThreadFilter, testThreadFilterCollect) { + ThreadFilter filter; + filter.init(""); + ASSERT_TRUE(filter.enabled()); + + const int num_threads = 10; + std::vector threads; + std::atomic completed_threads{0}; + std::vector expected_tids; + std::vector slots(num_threads); // Track slot IDs + + // Pre-register slots for each thread + for (int i = 0; i < num_threads; i++) { + slots[i] = filter.registerThread(); + } + + // Each thread will add its thread ID + for (int i = 1; i <= num_threads; i++) { + expected_tids.push_back(i); + int slot_id = slots[i-1]; // Use the pre-registered slot + + threads.emplace_back([&filter, i, slot_id, &completed_threads]() { + filter.add(i, slot_id); + EXPECT_TRUE(filter.accept(slot_id)); + completed_threads++; + }); + } + + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); + } + + // Verify all threads completed + ASSERT_EQ(completed_threads.load(), num_threads); + + // Collect and verify all thread IDs are present + std::vector collected_tids; + filter.collect(collected_tids); + + // Sort both vectors for comparison + std::sort(expected_tids.begin(), expected_tids.end()); + std::sort(collected_tids.begin(), collected_tids.end()); + + ASSERT_EQ(expected_tids.size(), collected_tids.size()); + for (size_t i = 0; i < expected_tids.size(); i++) { + EXPECT_EQ(expected_tids[i], collected_tids[i]) + << "Mismatch at index " << i + << ": expected " << expected_tids[i] + << ", got " << collected_tids[i]; } - EXPECT_EQ(0, filter.size()); } TEST(ThreadInfoTest, testThreadInfoCleanupAllDead) { diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java index b0ec1dae..cd933fdf 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java @@ -16,7 +16,10 @@ @State(Scope.Benchmark) public class ThreadFilterBenchmark extends Configuration { - private static final int NUM_THREADS = 4; + @Param({"true", "false"}) // Parameterize the filter usage + public boolean useThreadFilters; + + private static final int NUM_THREADS = 15; private ExecutorService executorService; private JavaProfiler profiler; private AtomicBoolean running; @@ -31,13 +34,13 @@ public class ThreadFilterBenchmark extends Configuration { private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE); private static final int CACHE_LINE_SIZE = 64; // Typical cache line size private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line - private boolean useThreadFilters = true; // Flag to control the use of thread filters private AtomicLong addThreadCount = new AtomicLong(0); private AtomicLong removeThreadCount = new AtomicLong(0); @Setup(Level.Trial) public void setup() throws IOException { System.out.println("Setting up benchmark..."); + System.out.println("Thread filters enabled: " + useThreadFilters); System.out.println("Creating thread pool with " + NUM_THREADS + " threads"); executorService = Executors.newFixedThreadPool(NUM_THREADS); System.out.println("Getting profiler instance"); @@ -54,6 +57,7 @@ public void setup() throws IOException { System.out.println("Starting profiler with " + config); profiler.execute(config); System.out.println("Started profiler with output file"); + running = new AtomicBoolean(true); operationCount = new AtomicLong(0); startTime = System.currentTimeMillis(); @@ -107,6 +111,7 @@ public void tearDown() { // Stop the profiler if it's active try { profiler.stop(); + System.out.println("Profiler stopped."); } catch (IllegalStateException e) { System.out.println("Profiler was not active at teardown."); } @@ -147,7 +152,7 @@ public void setUseThreadFilters(boolean useThreadFilters) { @Benchmark @BenchmarkMode(Mode.Throughput) - @Fork(value = 1, warmups = 0) + @Fork(value = 1, warmups = 1) @Warmup(iterations = 1, time = 1) @Measurement(iterations = 1, time = 2) @Threads(1) @@ -157,14 +162,13 @@ public long threadFilterStress() throws InterruptedException { startLatch = new CountDownLatch(NUM_THREADS); stopLatch = new CountDownLatch(NUM_THREADS); - // Start all worker threads + // Start all worker threads[] for (int i = 0; i < NUM_THREADS; i++) { final int threadId = i; executorService.submit(() -> { try { startLatch.countDown(); startLatch.await(30, TimeUnit.SECONDS); - String startMsg = String.format("Thread %d started%n", threadId); System.out.print(startMsg); if (logWriter != null) { @@ -224,14 +228,14 @@ public long threadFilterStress() throws InterruptedException { operationCount.incrementAndGet(); } - if (operationCount.get() % 1000 == 0) { - String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); - System.out.print(progressMsg); - if (logWriter != null) { - logWriter.print(progressMsg); - logWriter.flush(); - } - } + // if (operationCount.get() % 1000 == 0) { + // String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); + // System.out.print(progressMsg); + // if (logWriter != null) { + // logWriter.print(progressMsg); + // logWriter.flush(); + // } + // } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -244,4 +248,4 @@ public long threadFilterStress() throws InterruptedException { stopLatch.await(); return operationCount.get(); } -} +} \ No newline at end of file