Skip to content

Commit d08dfea

Browse files
authored
Add ThreadFilterBenchmark (#228)
1 parent 996bcbf commit d08dfea

File tree

1 file changed

+247
-0
lines changed

1 file changed

+247
-0
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package com.datadoghq.profiler.stresstest.scenarios;
2+
3+
import com.datadoghq.profiler.JavaProfiler;
4+
import com.datadoghq.profiler.stresstest.Configuration;
5+
import org.openjdk.jmh.annotations.*;
6+
import org.openjdk.jmh.infra.Blackhole;
7+
8+
import java.io.FileWriter;
9+
import java.io.IOException;
10+
import java.io.PrintWriter;
11+
import java.util.concurrent.*;
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
import java.util.concurrent.atomic.AtomicLong;
14+
import java.util.concurrent.atomic.AtomicIntegerArray;
15+
16+
@State(Scope.Benchmark)
17+
public class ThreadFilterBenchmark extends Configuration {
18+
19+
private static final int NUM_THREADS = 4;
20+
private ExecutorService executorService;
21+
private JavaProfiler profiler;
22+
private AtomicBoolean running;
23+
private CountDownLatch startLatch;
24+
private CountDownLatch stopLatch;
25+
private AtomicLong operationCount;
26+
private long startTime;
27+
private long stopTime;
28+
private PrintWriter logWriter;
29+
private static final int ARRAY_SIZE = 1024; // Larger array to stress memory
30+
private static final int[] sharedArray = new int[ARRAY_SIZE];
31+
private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE);
32+
private static final int CACHE_LINE_SIZE = 64; // Typical cache line size
33+
private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line
34+
private boolean useThreadFilters = true; // Flag to control the use of thread filters
35+
private AtomicLong addThreadCount = new AtomicLong(0);
36+
private AtomicLong removeThreadCount = new AtomicLong(0);
37+
38+
@Setup(Level.Trial)
39+
public void setup() throws IOException {
40+
System.out.println("Setting up benchmark...");
41+
System.out.println("Creating thread pool with " + NUM_THREADS + " threads");
42+
executorService = Executors.newFixedThreadPool(NUM_THREADS);
43+
System.out.println("Getting profiler instance");
44+
profiler = JavaProfiler.getInstance();
45+
46+
// Stop the profiler if it's already running
47+
try {
48+
profiler.stop();
49+
} catch (IllegalStateException e) {
50+
System.out.println("Profiler was not active at setup.");
51+
}
52+
53+
String config = "start,wall=10ms,filter=1,file=/tmp/thread_filter_profile.jfr";
54+
System.out.println("Starting profiler with " + config);
55+
profiler.execute(config);
56+
System.out.println("Started profiler with output file");
57+
running = new AtomicBoolean(true);
58+
operationCount = new AtomicLong(0);
59+
startTime = System.currentTimeMillis();
60+
stopTime = startTime + 30000; // Run for 30 seconds
61+
System.out.println("Benchmark setup completed at " + startTime);
62+
63+
try {
64+
String logFile = "/tmp/thread_filter_benchmark.log";
65+
System.out.println("Attempting to create log file at: " + logFile);
66+
logWriter = new PrintWriter(new FileWriter(logFile));
67+
logWriter.printf("Benchmark started at %d%n", startTime);
68+
logWriter.flush();
69+
System.out.println("Successfully created and wrote to log file");
70+
} catch (IOException e) {
71+
System.err.println("Failed to create log file: " + e.getMessage());
72+
e.printStackTrace();
73+
throw e;
74+
}
75+
}
76+
77+
@TearDown(Level.Trial)
78+
public void tearDown() {
79+
System.out.println("Tearing down benchmark...");
80+
running.set(false);
81+
82+
// Wait for all threads to finish with a timeout
83+
try {
84+
if (stopLatch != null) {
85+
if (!stopLatch.await(30, TimeUnit.SECONDS)) {
86+
System.err.println("Warning: Some threads did not finish within timeout");
87+
}
88+
}
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
}
92+
93+
// Shutdown executor with timeout
94+
executorService.shutdown();
95+
try {
96+
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
97+
executorService.shutdownNow();
98+
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
99+
System.err.println("Warning: Executor did not terminate");
100+
}
101+
}
102+
} catch (InterruptedException e) {
103+
executorService.shutdownNow();
104+
Thread.currentThread().interrupt();
105+
}
106+
107+
// Stop the profiler if it's active
108+
try {
109+
profiler.stop();
110+
} catch (IllegalStateException e) {
111+
System.out.println("Profiler was not active at teardown.");
112+
}
113+
114+
long endTime = System.currentTimeMillis();
115+
long totalOps = operationCount.get();
116+
double durationSecs = (endTime - startTime) / 1000.0;
117+
double opsPerSec = totalOps / durationSecs;
118+
double addOpsPerSec = addThreadCount.get() / durationSecs;
119+
double removeOpsPerSec = removeThreadCount.get() / durationSecs;
120+
121+
String stats = String.format("Thread Filter Stats:%n" +
122+
"Total operations: %,d%n" +
123+
"Duration: %.2f seconds%n" +
124+
"Operations/second: %,.0f%n" +
125+
"Operations/second/thread: %,.0f%n" +
126+
"AddThread operations/second: %,.0f%n" +
127+
"RemoveThread operations/second: %,.0f%n",
128+
totalOps, durationSecs, opsPerSec, opsPerSec / NUM_THREADS, addOpsPerSec, removeOpsPerSec);
129+
130+
System.out.print(stats);
131+
if (logWriter != null) {
132+
try {
133+
logWriter.print(stats);
134+
logWriter.flush();
135+
logWriter.close();
136+
System.out.println("Successfully closed log file");
137+
} catch (Exception e) {
138+
System.err.println("Error closing log file: " + e.getMessage());
139+
e.printStackTrace();
140+
}
141+
}
142+
}
143+
144+
public void setUseThreadFilters(boolean useThreadFilters) {
145+
this.useThreadFilters = useThreadFilters;
146+
}
147+
148+
@Benchmark
149+
@BenchmarkMode(Mode.Throughput)
150+
@Fork(value = 1, warmups = 0)
151+
@Warmup(iterations = 1, time = 1)
152+
@Measurement(iterations = 1, time = 2)
153+
@Threads(1)
154+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
155+
public long threadFilterStress() throws InterruptedException {
156+
System.out.println("Starting benchmark iteration...");
157+
startLatch = new CountDownLatch(NUM_THREADS);
158+
stopLatch = new CountDownLatch(NUM_THREADS);
159+
160+
// Start all worker threads
161+
for (int i = 0; i < NUM_THREADS; i++) {
162+
final int threadId = i;
163+
executorService.submit(() -> {
164+
try {
165+
startLatch.countDown();
166+
startLatch.await(30, TimeUnit.SECONDS);
167+
168+
String startMsg = String.format("Thread %d started%n", threadId);
169+
System.out.print(startMsg);
170+
if (logWriter != null) {
171+
logWriter.print(startMsg);
172+
logWriter.flush();
173+
}
174+
175+
while (running.get() && System.currentTimeMillis() < stopTime) {
176+
// Memory-intensive operations that would be sensitive to false sharing
177+
for (int j = 0; j < ARRAY_SIZE; j += STRIDE) {
178+
if (useThreadFilters) {
179+
// Register thread at the start of each cache line operation
180+
profiler.addThread();
181+
addThreadCount.incrementAndGet();
182+
}
183+
184+
// Each thread writes to its own cache line
185+
int baseIndex = (threadId * STRIDE) % ARRAY_SIZE;
186+
for (int k = 0; k < STRIDE; k++) {
187+
int index = (baseIndex + k) % ARRAY_SIZE;
188+
// Write to shared array
189+
sharedArray[index] = threadId;
190+
// Read and modify
191+
int value = sharedArray[index] + 1;
192+
// Atomic operation
193+
atomicArray.set(index, value);
194+
}
195+
196+
if (useThreadFilters) {
197+
// Remove thread after cache line operation
198+
profiler.removeThread();
199+
removeThreadCount.incrementAndGet();
200+
}
201+
operationCount.incrementAndGet();
202+
}
203+
204+
// More memory operations with thread registration
205+
for (int j = 0; j < ARRAY_SIZE; j += STRIDE) {
206+
if (useThreadFilters) {
207+
// Register thread at the start of each cache line operation
208+
profiler.addThread();
209+
addThreadCount.incrementAndGet();
210+
}
211+
212+
int baseIndex = (threadId * STRIDE) % ARRAY_SIZE;
213+
for (int k = 0; k < STRIDE; k++) {
214+
int index = (baseIndex + k) % ARRAY_SIZE;
215+
int value = atomicArray.get(index);
216+
sharedArray[index] = value * 2;
217+
}
218+
219+
if (useThreadFilters) {
220+
// Remove thread after cache line operation
221+
profiler.removeThread();
222+
removeThreadCount.incrementAndGet();
223+
}
224+
operationCount.incrementAndGet();
225+
}
226+
227+
if (operationCount.get() % 1000 == 0) {
228+
String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get());
229+
System.out.print(progressMsg);
230+
if (logWriter != null) {
231+
logWriter.print(progressMsg);
232+
logWriter.flush();
233+
}
234+
}
235+
}
236+
} catch (InterruptedException e) {
237+
Thread.currentThread().interrupt();
238+
} finally {
239+
stopLatch.countDown();
240+
}
241+
});
242+
}
243+
244+
stopLatch.await();
245+
return operationCount.get();
246+
}
247+
}

0 commit comments

Comments
 (0)