Skip to content

Commit e0ac246

Browse files
committed
Exterminate the last remnants of false sharing
1 parent 2b8ebde commit e0ac246

File tree

5 files changed

+151
-132
lines changed

5 files changed

+151
-132
lines changed

ddprof-lib/src/main/cpp/threadFilter.cpp

Lines changed: 64 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
#include <algorithm>
99
#include <cstring>
1010

11+
ThreadFilter::ShardHead ThreadFilter::_free_heads[ThreadFilter::kShardCount] {};
12+
1113
ThreadFilter::ThreadFilter() : _enabled(false) {
1214
// Initialize chunk pointers to null (lazy allocation)
1315
for (int i = 0; i < kMaxChunks; ++i) {
1416
_chunks[i].store(nullptr, std::memory_order_relaxed);
1517
}
1618
_free_list = std::make_unique<FreeListNode[]>(kFreeListSize);
17-
19+
1820
// Initialize the first chunk
1921
initializeChunk(0);
2022
clear();
@@ -30,14 +32,14 @@ ThreadFilter::~ThreadFilter() {
3032

3133
void ThreadFilter::initializeChunk(int chunk_idx) {
3234
if (chunk_idx >= kMaxChunks) return;
33-
35+
3436
// Check if chunk already exists
3537
ChunkStorage* existing = _chunks[chunk_idx].load(std::memory_order_acquire);
3638
if (existing != nullptr) return;
37-
39+
3840
// Allocate new chunk
3941
ChunkStorage* new_chunk = new ChunkStorage();
40-
42+
4143
// Try to install it atomically
4244
ChunkStorage* expected = nullptr;
4345
if (_chunks[chunk_idx].compare_exchange_strong(expected, new_chunk, std::memory_order_acq_rel)) {
@@ -68,24 +70,24 @@ ThreadFilter::SlotID ThreadFilter::registerThread() {
6870
}
6971

7072
const int chunk_idx = index >> kChunkShift;
71-
73+
7274
// Ensure the chunk is initialized (lock-free)
7375
if (chunk_idx >= _num_chunks.load(std::memory_order_acquire)) {
7476
// Update the chunk count atomically
7577
int expected_chunks = chunk_idx;
7678
int desired_chunks = chunk_idx + 1;
77-
while (!_num_chunks.compare_exchange_weak(expected_chunks, desired_chunks,
79+
while (!_num_chunks.compare_exchange_weak(expected_chunks, desired_chunks,
7880
std::memory_order_acq_rel)) {
7981
if (expected_chunks > chunk_idx) {
8082
break; // Another thread already updated it
8183
}
8284
desired_chunks = expected_chunks + 1;
8385
}
8486
}
85-
87+
8688
// Initialize the chunk if needed
8789
initializeChunk(chunk_idx);
88-
90+
8991
return index;
9092
}
9193

@@ -100,64 +102,61 @@ void ThreadFilter::clear() {
100102
}
101103
}
102104
}
103-
105+
104106
// Clear the free list
105107
for (int i = 0; i < kFreeListSize; ++i) {
106108
_free_list[i].value.store(-1, std::memory_order_relaxed);
107109
_free_list[i].next.store(-1, std::memory_order_relaxed);
108110
}
109-
_free_list_head.store(-1, std::memory_order_relaxed);
110-
_active_slots.store(0, std::memory_order_relaxed);
111+
112+
// Reset the free heads for each shard
113+
for (int s = 0; s < kShardCount; ++s) {
114+
_free_heads[s].head.store(-1, std::memory_order_relaxed);
115+
}
111116
}
112117

113118
bool ThreadFilter::accept(SlotID slot_id) const {
114119
if (!_enabled) {
115120
return true;
116121
}
117122
if (slot_id < 0) return false;
118-
123+
119124
int chunk_idx = slot_id >> kChunkShift;
120125
int slot_idx = slot_id & kChunkMask;
121-
126+
122127
if (chunk_idx >= kMaxChunks) return false;
123128
ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed);
124129
if (chunk == nullptr) return false; // Fail-fast if not allocated
125-
130+
126131
return chunk->slots[slot_idx].value.load(std::memory_order_acquire) != -1;
127132
}
128133

129134
void ThreadFilter::add(int tid, SlotID slot_id) {
130135
if (slot_id < 0) return;
131-
136+
132137
int chunk_idx = slot_id >> kChunkShift;
133138
int slot_idx = slot_id & kChunkMask;
134-
139+
135140
if (chunk_idx >= kMaxChunks) return;
136141
ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed);
137142
if (chunk == nullptr) return; // Fail-fast if not allocated
138-
139-
// Store the tid and increment active slots if this was previously empty
140-
int old_value = chunk->slots[slot_idx].value.exchange(tid, std::memory_order_acq_rel);
141-
if (old_value == -1) {
142-
_active_slots.fetch_add(1, std::memory_order_relaxed);
143-
}
143+
144+
// Store the tid
145+
chunk->slots[slot_idx].value.store(tid, std::memory_order_release);
144146
}
145147

146148
void ThreadFilter::remove(SlotID slot_id) {
147149
if (slot_id < 0) return;
148-
150+
149151
int chunk_idx = slot_id >> kChunkShift;
150152
int slot_idx = slot_id & kChunkMask;
151-
153+
152154
if (chunk_idx >= kMaxChunks) return;
153155
ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed);
154156
if (chunk == nullptr) return; // Fail-fast if not allocated
155-
156-
// Remove the tid and decrement active slots if this was previously occupied
157-
int old_value = chunk->slots[slot_idx].value.exchange(-1, std::memory_order_acq_rel);
158-
if (old_value != -1) {
159-
_active_slots.fetch_sub(1, std::memory_order_relaxed);
160-
}
157+
158+
// Remove the tid
159+
chunk->slots[slot_idx].value.store(-1, std::memory_order_release); // acq_rel is not a valid order for store(); release pairs with the acquire load in accept()
161160
}
162161

163162
void ThreadFilter::unregisterThread(SlotID slot_id) {
@@ -171,54 +170,60 @@ void ThreadFilter::unregisterThread(SlotID slot_id) {
171170
}
172171

173172
bool ThreadFilter::pushToFreeList(SlotID slot_id) {
174-
// Lock-free Treiber stack push
173+
// Lock-free sharded Treiber stack push
174+
const int shard = shardOfSlot(slot_id);
175+
auto& head = _free_heads[shard].head; // private cache-line
176+
175177
for (int i = 0; i < kFreeListSize; ++i) {
176178
int expected = -1;
177-
if (_free_list[i].value.compare_exchange_strong(expected, slot_id, std::memory_order_acq_rel)) {
178-
// Successfully stored in this slot
179-
int old_head = _free_list_head.load(std::memory_order_acquire);
179+
if (_free_list[i].value.compare_exchange_strong(
180+
expected, slot_id, std::memory_order_acq_rel)) {
181+
// Link node into this shard’s Treiber stack
182+
int old_head = head.load(std::memory_order_acquire);
180183
do {
181184
_free_list[i].next.store(old_head, std::memory_order_relaxed);
182-
} while (!_free_list_head.compare_exchange_weak(old_head, i, std::memory_order_acq_rel));
185+
} while (!head.compare_exchange_weak(old_head, i,
186+
std::memory_order_acq_rel, std::memory_order_relaxed));
183187
return true;
184188
}
185189
}
186190
return false; // Free list full, slot is lost but this is rare
187191
}
188192

189193
ThreadFilter::SlotID ThreadFilter::popFromFreeList() {
190-
// Lock-free Treiber stack pop
191-
while (true) {
192-
int head = _free_list_head.load(std::memory_order_acquire);
193-
if (head == -1) {
194-
return -1; // Empty list
195-
}
196-
197-
int slot_id = _free_list[head].value.load(std::memory_order_acquire);
198-
int next = _free_list[head].next.load(std::memory_order_acquire);
199-
200-
// Try to update the head
201-
if (_free_list_head.compare_exchange_weak(head, next, std::memory_order_acq_rel)) {
202-
// Clear the node
203-
_free_list[head].value.store(-1, std::memory_order_relaxed);
204-
_free_list[head].next.store(-1, std::memory_order_relaxed);
205-
return slot_id;
194+
// Lock-free sharded Treiber stack pop. NOTE(review): shard heads are plain node indices with no version tag, and popped nodes are reused by pushToFreeList(), so the head CAS below looks exposed to ABA — confirm.
195+
int hash = static_cast<int>(std::hash<std::thread::id>{}(std::this_thread::get_id()));
196+
int start = shardOf(hash);
197+
198+
for (int pass = 0; pass < kShardCount; ++pass) {
199+
int s = (start + pass) & (kShardCount - 1);
200+
auto& head = _free_heads[s].head;
201+
202+
while (true) {
203+
int node = head.load(std::memory_order_acquire);
204+
if (node == -1) break; // shard empty → try next
205+
206+
int next = _free_list[node].next.load(std::memory_order_relaxed);
207+
if (head.compare_exchange_weak(node, next,
208+
std::memory_order_acq_rel,
209+
std::memory_order_relaxed))
210+
{
211+
int id = _free_list[node].value.exchange(-1,
212+
std::memory_order_relaxed);
213+
_free_list[node].next.store(-1, std::memory_order_relaxed);
214+
return id;
215+
}
206216
}
207-
// Retry if another thread modified the head
208217
}
218+
return -1; // Empty list
209219
}
210220

211221
void ThreadFilter::collect(std::vector<int>& tids) const {
212222
tids.clear();
213223

214-
// Early exit if no active slots
215-
int active_count = _active_slots.load(std::memory_order_relaxed);
216-
if (active_count == 0) {
217-
return;
218-
}
219-
220224
// Reserve space for efficiency
221-
tids.reserve(active_count);
225+
// The eventual resize is not the bottleneck, so we reserve a reasonable size
226+
tids.reserve(512);
222227

223228
// Scan only initialized chunks
224229
int num_chunks = _num_chunks.load(std::memory_order_relaxed);

ddprof-lib/src/main/cpp/threadFilter.h

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ class ThreadFilter {
1717
static constexpr int kChunkMask = kChunkSize - 1;
1818
static constexpr int kMaxThreads = 2048;
1919
static constexpr int kMaxChunks = (kMaxThreads + kChunkSize - 1) / kChunkSize; // = 8 chunks
20-
20+
// High-performance free list using Treiber stack, 64 shards
21+
static constexpr int kFreeListSize = 1024;
22+
static constexpr int kShardCount = 64; // power-of-two
23+
2124
ThreadFilter();
2225
~ThreadFilter();
2326

@@ -52,21 +55,21 @@ class ThreadFilter {
5255
};
5356

5457
bool _enabled = false;
55-
58+
5659
// Lazily allocated storage for chunks
5760
std::atomic<ChunkStorage*> _chunks[kMaxChunks];
5861
std::atomic<int> _num_chunks{1};
59-
62+
6063
// Lock-free slot allocation
6164
std::atomic<SlotID> _next_index{0};
62-
63-
// High-performance free list using Treiber stack
64-
static constexpr int kFreeListSize = 1024; // Increased from 128
65+
6566
std::unique_ptr<FreeListNode[]> _free_list;
66-
std::atomic<int> _free_list_head{-1};
67-
68-
// Active slot tracking for efficient collect()
69-
std::atomic<int> _active_slots{0};
67+
68+
struct alignas(64) ShardHead { std::atomic<int> head{-1}; };
69+
static ShardHead _free_heads[kShardCount]; // one cache-line each; NOTE(review): static, so shared by every ThreadFilter, while _free_list (the nodes these heads index) is per-instance — confirm only one instance ever exists
70+
71+
static inline int shardOf(int tid) { return tid & (kShardCount - 1); }
72+
static inline int shardOfSlot(int s){ return s & (kShardCount - 1); }
7073

7174
// Helper methods for lock-free operations
7275
void initializeChunk(int chunk_idx);

ddprof-lib/src/main/cpp/threadIdTable.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class ThreadIdTable {
3232

3333
int start_slot = hash(tid);
3434
for (int probe = 0; probe < TABLE_SIZE; probe++) {
35-
int slot = (start_slot + probe) % TABLE_SIZE;
35+
int slot = (start_slot + (probe * 7)) % TABLE_SIZE; // NOTE(review): a stride of 7 visits every slot only if TABLE_SIZE is not a multiple of 7 — TABLE_SIZE is not visible here, confirm
3636
int expected = 0;
3737

3838
// Try to claim empty slot

ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/Main.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.openjdk.jmh.runner.options.OptionsBuilder;
99
import org.openjdk.jmh.runner.options.TimeValue;
1010

11+
import java.util.Arrays;
1112
import java.util.Collection;
1213
import java.util.concurrent.TimeUnit;
1314

@@ -17,11 +18,9 @@ public class Main {
1718

1819
public static void main(String... args) throws Exception {
1920
String filter = "*";
20-
if (args.length == 1) {
21+
if (args.length >= 1) {
2122
filter = args[0];
22-
} else if (args.length > 1) {
23-
System.err.println("Usage: java -jar ddprof-stresstest.jar [scenario filter]");
24-
System.exit(1);
23+
args = Arrays.copyOfRange(args, 1, args.length);
2524
}
2625
CommandLineOptions commandLineOptions = new CommandLineOptions(args);
2726
Mode mode = Mode.AverageTime;

0 commit comments

Comments
 (0)