From 3b91530d69bb0dd4f8d77a2153b6ede225590361 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Fri, 21 Mar 2025 10:41:00 +0100 Subject: [PATCH 01/10] Bench - Add a benchmark for the thread filter mechanism --- .../scenarios/ThreadFilterBenchmark.java | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java new file mode 100644 index 000000000..26ce9298d --- /dev/null +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java @@ -0,0 +1,155 @@ +package com.datadoghq.profiler.stresstest.scenarios; + +import com.datadoghq.profiler.JavaProfiler; +import com.datadoghq.profiler.stresstest.Configuration; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +@State(Scope.Benchmark) +public class ThreadFilterBenchmark extends Configuration { + + private static final int NUM_THREADS = 15; + private ExecutorService executorService; + private JavaProfiler profiler; + private AtomicBoolean running; + private volatile CountDownLatch startLatch; + private volatile CountDownLatch stopLatch; + private AtomicLong operationCount; + private long startTime; + private PrintWriter logWriter; + + @Setup(Level.Trial) + public void setup() throws IOException { + System.out.println("Setting up benchmark..."); + executorService = Executors.newFixedThreadPool(NUM_THREADS); + profiler = JavaProfiler.getInstance(); + // Enable thread filter and wall clock profiling to see the threads in profiles + profiler.execute("start,wall=1ms,filter=0"); + running = new AtomicBoolean(true); + operationCount = new AtomicLong(0); + startTime = System.currentTimeMillis(); + + // Setup logging to file + try { + String logFile = "/tmp/thread_filter_benchmark.log"; + System.out.println("Attempting to create log file at: " + logFile); + logWriter = new PrintWriter(new FileWriter(logFile)); + logWriter.printf("Benchmark started at %d%n", startTime); + logWriter.flush(); + System.out.println("Successfully created and wrote to log file"); + } catch (IOException e) { + System.err.println("Failed to create log file: " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + @TearDown(Level.Trial) + public void tearDown() { + System.out.println("Tearing down benchmark..."); + running.set(false); + executorService.shutdown(); + try { + executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + profiler.stop(); + long endTime = System.currentTimeMillis(); + long totalOps = operationCount.get(); + double durationSecs = (endTime - startTime) / 1000.0; + double opsPerSec = totalOps / durationSecs; + + // Log final stats to both console and file + String stats = String.format("Thread Filter Stats:%n" + + "Total operations: %,d%n" + + "Duration: %.2f seconds%n" + + "Operations/second: %,.0f%n" + + "Operations/second/thread: %,.0f%n", + totalOps, durationSecs, opsPerSec, opsPerSec / NUM_THREADS); + + System.out.print(stats); + if (logWriter != null) { + try { + logWriter.print(stats); + logWriter.flush(); + logWriter.close(); + System.out.println("Successfully closed log file"); + } catch (Exception e) { + System.err.println("Error closing log file: " + e.getMessage()); + e.printStackTrace(); + } + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @Fork(1) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 2, time = 2) + @Threads(1) + public long threadFilterStress() throws InterruptedException { + System.out.println("Starting benchmark iteration..."); + startLatch = new CountDownLatch(NUM_THREADS); + stopLatch = new CountDownLatch(NUM_THREADS); + + // Start all worker threads + for (int i = 0; i < NUM_THREADS; i++) { + final int threadId = i; + executorService.submit(() -> { + startLatch.countDown(); + try { + startLatch.await(); // Wait for all threads to be ready + String startMsg = String.format("Thread %d started%n", threadId); + System.out.print(startMsg); + if (logWriter != null) { + logWriter.print(startMsg); + logWriter.flush(); + } + + while (running.get()) { + // Add and remove thread repeatedly as fast as possible + profiler.addThread(); + operationCount.incrementAndGet(); + profiler.removeThread(); + operationCount.incrementAndGet(); + + // Log progress every 1000 operations + if (operationCount.get() % 1000 == 0) { + String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); + System.out.print(progressMsg); + if (logWriter != null) { + logWriter.print(progressMsg); + logWriter.flush(); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + stopLatch.countDown(); + String finishMsg = String.format("Thread %d finished%n", threadId); + System.out.print(finishMsg); + if (logWriter != null) { + logWriter.print(finishMsg); + logWriter.flush(); + } + } + }); + } + + // Wait for all threads to finish + stopLatch.await(); + return operationCount.get(); + } +} \ No newline at end of file From 913d2b6750f029170897bbc46b5d807233b2af68 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Fri, 21 Mar 2025 10:41:00 +0100 Subject: [PATCH 02/10] Bench - Add a benchmark for the thread filter mechanism --- .../scenarios/ThreadFilterBenchmark.java | 177 ++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java new file mode 100644 index 000000000..a373e5ae4 --- /dev/null +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java @@ -0,0 +1,177 @@ +package com.datadoghq.profiler.stresstest.scenarios; + +import com.datadoghq.profiler.JavaProfiler; +import com.datadoghq.profiler.stresstest.Configuration; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +@State(Scope.Benchmark) +public class ThreadFilterBenchmark extends Configuration { + + private static final int NUM_THREADS = 15; + private ExecutorService executorService; + private JavaProfiler profiler; + private AtomicBoolean running; + private CountDownLatch startLatch; + private CountDownLatch stopLatch; + private AtomicLong operationCount; + private long startTime; + private PrintWriter logWriter; + + @Setup(Level.Trial) + public void setup() throws IOException { + System.out.println("Setting up benchmark..."); + System.out.println("Creating thread pool with " + NUM_THREADS + " threads"); + executorService = Executors.newFixedThreadPool(NUM_THREADS); + System.out.println("Getting profiler instance"); + profiler = JavaProfiler.getInstance(); + System.out.println("Starting profiler with wall=1ms,filter=0,file=/tmp/thread_filter_profile.jfr"); + profiler.execute("start,wall=1ms,filter=0,file=/tmp/thread_filter_profile.jfr"); + System.out.println("Started profiler with output file"); + running = new AtomicBoolean(true); + operationCount = new AtomicLong(0); + startTime = System.currentTimeMillis(); + System.out.println("Benchmark setup completed at " + startTime); + + try { + String logFile = "/tmp/thread_filter_benchmark.log"; + System.out.println("Attempting to create log file at: " + logFile); + logWriter = new PrintWriter(new FileWriter(logFile)); + logWriter.printf("Benchmark started at %d%n", startTime); + logWriter.flush(); + System.out.println("Successfully created and wrote to log file"); + } catch (IOException e) { + System.err.println("Failed to create log file: " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + @TearDown(Level.Trial) + public void tearDown() { + System.out.println("Tearing down benchmark..."); + running.set(false); + + // Wait for all threads to finish with a timeout + try { + if (stopLatch != null) { + if (!stopLatch.await(5, TimeUnit.SECONDS)) { + System.err.println("Warning: Some threads did not finish within timeout"); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Shutdown executor with timeout + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + System.err.println("Warning: Executor did not terminate"); + } + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + profiler.stop(); + long endTime = System.currentTimeMillis(); + long totalOps = operationCount.get(); + double durationSecs = (endTime - startTime) / 1000.0; + double opsPerSec = totalOps / durationSecs; + + String stats = String.format("Thread Filter Stats:%n" + + "Total operations: %,d%n" + + "Duration: %.2f seconds%n" + + "Operations/second: %,.0f%n" + + "Operations/second/thread: %,.0f%n", + totalOps, durationSecs, opsPerSec, opsPerSec / NUM_THREADS); + + System.out.print(stats); + if (logWriter != null) { + try { + logWriter.print(stats); + logWriter.flush(); + logWriter.close(); + System.out.println("Successfully closed log file"); + } catch (Exception e) { + System.err.println("Error closing log file: " + e.getMessage()); + e.printStackTrace(); + } + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @Fork(value = 1, warmups = 0) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 1, time = 2) + @Threads(Threads.MAX) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public long threadFilterStress() throws InterruptedException { + System.out.println("Starting benchmark iteration..."); + startLatch = new CountDownLatch(NUM_THREADS); + stopLatch = new CountDownLatch(NUM_THREADS); + + // Start all worker threads + for (int i = 0; i < NUM_THREADS; i++) { + final int threadId = i; + executorService.submit(() -> { + try { + startLatch.countDown(); + startLatch.await(5, TimeUnit.SECONDS); // Add timeout for thread startup + + String startMsg = String.format("Thread %d started%n", threadId); + System.out.print(startMsg); + if (logWriter != null) { + logWriter.print(startMsg); + logWriter.flush(); + } + + while (running.get()) { + profiler.addThread(); + operationCount.incrementAndGet(); + profiler.removeThread(); + operationCount.incrementAndGet(); + + if (operationCount.get() % 1000 == 0) { + String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); + System.out.print(progressMsg); + if (logWriter != null) { + logWriter.print(progressMsg); + logWriter.flush(); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + stopLatch.countDown(); + String finishMsg = String.format("Thread %d finished%n", threadId); + System.out.print(finishMsg); + if (logWriter != null) { + logWriter.print(finishMsg); + logWriter.flush(); + } + } + }); + } + + // Wait for all threads to finish with timeout + if (!stopLatch.await(5, TimeUnit.SECONDS)) { + System.err.println("Warning: Benchmark did not complete within timeout"); + } + + return operationCount.get(); + } +} \ No newline at end of file From 873fcb2dcd318754e8d0b0ff29ae373f3e1a1000 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Fri, 21 Mar 2025 19:17:28 +0100 Subject: [PATCH 03/10] thread filter - reverse lower bits The idea is to check if false sharing could be problematic. --- ddprof-lib/src/main/cpp/threadFilter.cpp | 49 ++++++++++++------- ddprof-lib/src/main/cpp/threadFilter.h | 18 +++++-- ddprof-lib/src/test/cpp/ddprof_ut.cpp | 60 ++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 20 deletions(-) diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 034aabf9b..8b8eef722 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -19,6 +19,8 @@ #include "os.h" #include #include +#include +#include void trackPage() { Counters::increment(THREAD_FILTER_PAGES, 1); @@ -87,7 +89,9 @@ void ThreadFilter::clear() { bool ThreadFilter::accept(int thread_id) { u64 *b = bitmap(thread_id); - return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f))); + if (b == NULL) return false; + u32 reversed = reverseBits(thread_id); + return word(b, thread_id) & (1ULL << (reversed & 0x3f)); } void ThreadFilter::add(int thread_id) { @@ -104,7 +108,8 @@ void ThreadFilter::add(int thread_id) { } } - u64 bit = 1ULL << (thread_id & 0x3f); + u32 reversed = reverseBits(thread_id); + u64 bit = 1ULL << (reversed & 0x3f); if (!(__sync_fetch_and_or(&word(b, thread_id), bit) & bit)) { atomicInc(_size); } @@ -116,26 +121,36 @@ void ThreadFilter::remove(int thread_id) { return; } - u64 bit = 1ULL << (thread_id & 0x3f); + u32 reversed = reverseBits(thread_id); + u64 bit = 1ULL << (reversed & 0x3f); if (__sync_fetch_and_and(&word(b, thread_id), ~bit) & bit) { atomicInc(_size, -1); } } -void ThreadFilter::collect(std::vector &v) { - for (int i = 0; i < _max_bitmaps; i++) { - u64 *b = _bitmap[i]; - if (b != NULL) { - int start_id = i * BITMAP_CAPACITY; - for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { - // Considering the functional impact, relaxed could be a reasonable - // order here - u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); - while (word != 0) { - v.push_back(start_id + j * 64 + __builtin_ctzl(word)); - word &= (word - 1); +void ThreadFilter::collect(std::vector& tids) { + tids.reserve(_size); // Pre-allocate space for efficiency + + // Iterate through the bitmap array + for (int i = 0; i < _max_bitmaps; i++) { + u64* b = _bitmap[i]; + if (b != NULL) { + int start_id = i * BITMAP_CAPACITY; + for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { + u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); + while (word != 0) { + int bit_pos = __builtin_ctzl(word); + // For each bit position, we need to find all thread IDs that would map to it + for (int k = 0; k < 64; k++) { + int thread_id = start_id + (j << 6) + k; + u32 reversed = reverseBits(thread_id); + if ((reversed & 0x3f) == bit_pos) { + tids.push_back(thread_id); + } + } + word &= (word - 1); + } + } } - } } - } } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 53358406b..03c53d08f 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -45,9 +45,21 @@ class ThreadFilter { __ATOMIC_ACQUIRE); } - u64 &word(u64 *bitmap, int thread_id) { - // todo: add thread safe APIs - return bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; + static u32 reverseBits(u32 n) { + u32 x = n & 0x3f; // isolate lower 6 bits + x = ((x & 0x01) << 5) | + ((x & 0x02) << 3) | + ((x & 0x04) << 1) | + ((x & 0x08) >> 1) | + ((x & 0x10) >> 3) | + ((x & 0x20) >> 5); + return (n & ~0x3f) | x; + } + + + // Map thread ID to word index + u64& word(u64 *bitmap, u32 thread_id) { + return bitmap[thread_id >> 6]; } public: diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index 93cbbe40f..020893e9f 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -11,6 +11,7 @@ #include "threadLocalData.h" #include "vmEntry.h" #include + #include // For std::sort ssize_t callback(char* ptr, int len) { return len; @@ -145,6 +146,65 @@ EXPECT_EQ(0, filter.size()); } + TEST(ThreadFilter, testThreadFilterCollect) { + ThreadFilter filter; + filter.init(""); + ASSERT_TRUE(filter.enabled()); + EXPECT_EQ(0, filter.size()); + + // Add some thread IDs that would map to different bit positions + std::vector test_tids = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + for (int tid : test_tids) { + filter.add(tid); + EXPECT_TRUE(filter.accept(tid)); + } + ASSERT_EQ(test_tids.size(), filter.size()); + + // Collect all thread IDs + std::vector collected_tids; + filter.collect(collected_tids); + + // Verify size matches + ASSERT_EQ(test_tids.size(), collected_tids.size()); + + // Sort both vectors for comparison + std::sort(test_tids.begin(), test_tids.end()); + std::sort(collected_tids.begin(), collected_tids.end()); + + // Verify all thread IDs are present + for (size_t i = 0; i < test_tids.size(); i++) { + EXPECT_EQ(test_tids[i], collected_tids[i]) + << "Mismatch at index " << i + << ": expected " << test_tids[i] + << ", got " << collected_tids[i]; + } + + // Test with a larger range of thread IDs + filter.clear(); + test_tids.clear(); + for (int i = 0; i < 100; i++) { + int tid = i * 64 + 1; // Use IDs that would map to different words + test_tids.push_back(tid); + filter.add(tid); + EXPECT_TRUE(filter.accept(tid)); + } + ASSERT_EQ(test_tids.size(), filter.size()); + + collected_tids.clear(); + filter.collect(collected_tids); + + ASSERT_EQ(test_tids.size(), collected_tids.size()); + std::sort(test_tids.begin(), test_tids.end()); + std::sort(collected_tids.begin(), collected_tids.end()); + + for (size_t i = 0; i < test_tids.size(); i++) { + EXPECT_EQ(test_tids[i], collected_tids[i]) + << "Mismatch at index " << i + << ": expected " << test_tids[i] + << ", got " << collected_tids[i]; + } + } + TEST(ThreadInfoTest, testThreadInfoCleanupAllDead) { ThreadInfo info; info.set(1, "main", 1); From 0f929170794ee796d93a923b4bc97947c71db3bd Mon Sep 17 00:00:00 2001 From: r1viollet Date: Thu, 3 Apr 2025 15:56:16 +0200 Subject: [PATCH 04/10] thread filter - adjust implementation --- ddprof-lib/src/main/cpp/flightRecorder.cpp | 1 - ddprof-lib/src/main/cpp/profiler.cpp | 1 + ddprof-lib/src/main/cpp/threadFilter.cpp | 206 +++++++----------- ddprof-lib/src/main/cpp/threadFilter.h | 98 +++------ ddprof-lib/src/test/cpp/ddprof_ut.cpp | 126 ++++++----- .../scenarios/ThreadFilterBenchmark.java | 44 +++- 6 files changed, 217 insertions(+), 259 deletions(-) diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 9b6fa3bd6..d753f5b6e 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1055,7 +1055,6 @@ void Recording::writeExecutionModes(Buffer *buf) { void Recording::writeThreads(Buffer *buf) { addThread(_tid); std::vector threads; - threads.reserve(_thread_set.size()); _thread_set.collect(threads); _thread_set.clear(); diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index b35448c9c..b9e38c969 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -106,6 +106,7 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { int tid = ProfiledThread::currentTid(); if (_thread_filter.enabled()) { + _thread_filter.ensureThreadRegistered(); _thread_filter.remove(tid); } updateThreadName(jvmti, jni, thread, true); diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 8b8eef722..61bcaf13b 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -1,156 +1,108 @@ -/* - * Copyright 2020 Andrei Pangin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +// todo copyright stuff +// as I rewrote all the implem #include "threadFilter.h" -#include "counters.h" -#include "os.h" -#include -#include -#include #include +#include +#include +#include -void trackPage() { - Counters::increment(THREAD_FILTER_PAGES, 1); - Counters::increment(THREAD_FILTER_BYTES, BITMAP_SIZE); -} +static ThreadFilter* global_filter = nullptr; +thread_local ThreadFilter::SlotID tls_slot_id = -1; // todo, use the signal safe stuff +static std::mutex slot_mutex; -ThreadFilter::ThreadFilter() { - _max_thread_id = OS::getMaxThreadId(128 * 1024); - _max_bitmaps = (_max_thread_id + BITMAP_SIZE - 1) / BITMAP_SIZE; - u32 capacity = _max_bitmaps * sizeof(u64 *); - _bitmap = (u64 **)OS::safeAlloc(capacity); - memset(_bitmap, 0, capacity); - _bitmap[0] = (u64 *)OS::safeAlloc(BITMAP_SIZE); - trackPage(); - _enabled = false; - _size = 0; +ThreadFilter::ThreadFilter() : _next_index(0) { + std::lock_guard lock(slot_mutex); + _slots.resize(128); // preallocate some slots } ThreadFilter::~ThreadFilter() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - OS::safeFree(_bitmap[i], BITMAP_SIZE); - } - } - if (_bitmap) { - OS::safeFree(_bitmap, _max_bitmaps * sizeof(u64 *)); - } + std::lock_guard lock(slot_mutex); + _slots.clear(); } -void ThreadFilter::init(const char *filter) { - if (filter == NULL) { - _enabled = false; - return; +ThreadFilter::SlotID ThreadFilter::registerThread() { + int index = _next_index.fetch_add(1, std::memory_order_relaxed); + if (index < static_cast(_slots.size())) { + return index; } - - char *end; - do { - int id = strtol(filter, &end, 0); - if (id <= 0) { - break; + // Lock required to safely grow the vector + { + std::lock_guard lock(slot_mutex); + size_t current_size = _slots.size(); + if (static_cast(index) >= current_size) { + _slots.resize(current_size * 2); } + } + return index; +} - if (*end == '-') { - int to = strtol(end + 1, &end, 0); - while (id <= to) { - add(id++); - } - } else { - add(id); +ThreadFilter::SlotID ThreadFilter::ensureThreadRegistered() { + if (tls_slot_id == -1) { + tls_slot_id = registerThread(); } - - filter = end + 1; - } while (*end); - - _enabled = true; + return tls_slot_id; } void ThreadFilter::clear() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - memset(_bitmap[i], 0, BITMAP_SIZE); + std::lock_guard lock(slot_mutex); + for (auto& slot : _slots) { + slot.value.store(-1, std::memory_order_relaxed); } - } - _size = 0; } -bool ThreadFilter::accept(int thread_id) { - u64 *b = bitmap(thread_id); - if (b == NULL) return false; - u32 reversed = reverseBits(thread_id); - return word(b, thread_id) & (1ULL << (reversed & 0x3f)); +bool ThreadFilter::accept(int tid) const { + if (!_enabled) return true; + SlotID id = tls_slot_id; + return id >= 0 && id < static_cast(_slots.size()) && _slots[id].value.load(std::memory_order_acquire) != -1; } -void ThreadFilter::add(int thread_id) { - u64 *b = bitmap(thread_id); - if (b == NULL) { - b = (u64 *)OS::safeAlloc(BITMAP_SIZE); - u64 *oldb = __sync_val_compare_and_swap( - &_bitmap[(u32)thread_id / BITMAP_CAPACITY], NULL, b); - if (oldb != NULL) { - OS::safeFree(b, BITMAP_SIZE); - b = oldb; - } else { - trackPage(); - } - } - - u32 reversed = reverseBits(thread_id); - u64 bit = 1ULL << (reversed & 0x3f); - if (!(__sync_fetch_and_or(&word(b, thread_id), bit) & bit)) { - atomicInc(_size); - } +void ThreadFilter::add(int tid) { + SlotID id = ensureThreadRegistered(); + _slots[id].value.store(tid, std::memory_order_relaxed); } -void ThreadFilter::remove(int thread_id) { - u64 *b = bitmap(thread_id); - if (b == NULL) { - return; - } - - u32 reversed = reverseBits(thread_id); - u64 bit = 1ULL << (reversed & 0x3f); - if (__sync_fetch_and_and(&word(b, thread_id), ~bit) & bit) { - atomicInc(_size, -1); - } +void ThreadFilter::remove(int /*tid*/) { + SlotID id = ensureThreadRegistered(); // we probably are already registered + _slots[id].value.store(-1, std::memory_order_relaxed); } -void ThreadFilter::collect(std::vector& tids) { - tids.reserve(_size); // Pre-allocate space for efficiency - - // Iterate through the bitmap array - for (int i = 0; i < _max_bitmaps; i++) { - u64* b = _bitmap[i]; - if (b != NULL) { - int start_id = i * BITMAP_CAPACITY; - for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { - u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); - while (word != 0) { - int bit_pos = __builtin_ctzl(word); - // For each bit position, we need to find all thread IDs that would map to it - for (int k = 0; k < 64; k++) { - int thread_id = start_id + (j << 6) + k; - u32 reversed = reverseBits(thread_id); - if ((reversed & 0x3f) == bit_pos) { - tids.push_back(thread_id); - } - } - word &= (word - 1); - } - } +void ThreadFilter::collect(std::vector& tids) const { + tids.clear(); + std::lock_guard lock(slot_mutex); + for (const auto& slot : _slots) { + int tid = slot.value.load(std::memory_order_acquire); + if (tid != -1) { + tids.push_back(tid); } } } + +void ThreadFilter::init(const char* filter) { + if (filter == nullptr) { + return; + } + // char* end; + // // todo understand this strange init + // do { + // int id = strtol(filter, &end, 0); + // if (id <= 0) { + // break; + // } + + // if (*end == '-') { + // int to = strtol(end + 1, &end, 0); + // while (id <= to) { + // add(id++); + // } + // } else { + // add(id); + // } + // filter = end + 1; + // } while (*end); + _enabled = true; +} + +bool ThreadFilter::enabled() const { + return _enabled; +} diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 747b334b6..af6869e0c 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -1,84 +1,40 @@ -/* - * Copyright 2020 Andrei Pangin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - #ifndef _THREADFILTER_H #define _THREADFILTER_H -#include "arch.h" +#include #include +#include -// The size of thread ID bitmap in bytes. Must be at least 64K to allow mmap() -const u32 BITMAP_SIZE = 65536; -// How many thread IDs one bitmap can hold -const u32 BITMAP_CAPACITY = BITMAP_SIZE * 8; - -// ThreadFilter query operations must be lock-free and signal-safe; -// update operations are mostly lock-free, except rare bitmap allocations class ThreadFilter { -private: - // Total number of bitmaps required to hold the entire range of thread IDs - u32 _max_thread_id; - u32 _max_bitmaps; - u64 **_bitmap; - bool _enabled; - volatile int _size; - - u64 *bitmap(int thread_id) { - if (thread_id >= _max_thread_id) { - return NULL; - } - return __atomic_load_n( - &(_bitmap[static_cast(thread_id) / BITMAP_CAPACITY]), - __ATOMIC_ACQUIRE); - } - - static u32 reverseBits(u32 n) { - u32 x = n & 0x3f; // isolate lower 6 bits - x = ((x & 0x01) << 5) | - ((x & 0x02) << 3) | - ((x & 0x04) << 1) | - ((x & 0x08) >> 1) | - ((x & 0x10) >> 3) | - ((x & 0x20) >> 5); - return (n & ~0x3f) | x; - } - - - // Map thread ID to word index - u64& word(u64 *bitmap, u32 thread_id) { - return bitmap[thread_id >> 6]; - } - public: - ThreadFilter(); - ThreadFilter(ThreadFilter &threadFilter) = delete; - ~ThreadFilter(); - - bool enabled() { return _enabled; } + using SlotID = int; - int size() { return _size; } + ThreadFilter(); + ~ThreadFilter(); - void init(const char *filter); - void clear(); + void init(const char* filter); + void clear(); + bool enabled() const; + bool accept(int thread_id) const; + void add(int thread_id); + void remove(int thread_id); // tid unused, for API consistency + void collect(std::vector& tids) const; + SlotID ensureThreadRegistered(); - bool accept(int thread_id); - void add(int thread_id); - void remove(int thread_id); - - void collect(std::vector &v); +private: + struct Slot { + std::atomic value{-1}; + Slot() = default; + Slot(const Slot&o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); } + Slot& operator=(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), + std::memory_order_relaxed); return *this; } + }; + + SlotID registerThread(); + + bool _enabled = false; + std::vector _slots; + std::atomic _next_index; }; #endif // _THREADFILTER_H diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index 020893e9f..f242d67fb 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -12,6 +12,8 @@ #include "vmEntry.h" #include #include // For std::sort + #include + #include ssize_t callback(char* ptr, int len) { return len; @@ -119,88 +121,94 @@ } TEST(ThreadFilter, testThreadFilter) { - int maxTid = OS::getMaxThreadId(); ThreadFilter filter; filter.init(""); ASSERT_TRUE(filter.enabled()); - EXPECT_EQ(0, filter.size()); - // increase step gradually to create different bit densities - int step = 1; - int size = 0; - for (int tid = 0; tid < maxTid - step - 1; tid += step, size++) { - EXPECT_FALSE(filter.accept(tid)); - filter.add(tid); - EXPECT_TRUE(filter.accept(tid)); - step++; + + const int num_threads = 10; + const int num_ops = 100; + std::vector threads; + std::atomic completed_threads{0}; + + // Each thread will add and remove its own thread ID multiple times + for (int i = 1; i <= num_threads; i++) { + threads.emplace_back([&filter, i, &completed_threads]() { + for (int j = 0; j < num_ops; j++) { + // Add thread ID + filter.add(i); + bool accepted = filter.accept(i); + if (!accepted) { + fprintf(stderr, "FAIL: Thread %d, op %d: accept(%d) returned false after add\n", i, j, i); + } + EXPECT_TRUE(accepted); + + // Remove thread ID + filter.remove(i); + accepted = filter.accept(i); + if (accepted) { + fprintf(stderr, "FAIL: Thread %d, op %d: accept(%d) returned true after remove\n", i, j, i); + } + EXPECT_FALSE(accepted); + } + completed_threads++; + }); } - ASSERT_EQ(size, filter.size()); + + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); + } + + // Verify all threads completed + ASSERT_EQ(completed_threads.load(), num_threads); + + // Collect and verify all thread IDs were properly removed std::vector tids; - tids.reserve(size); filter.collect(tids); - ASSERT_EQ(size, tids.size()); - for (int tid : tids) { - ASSERT_TRUE(filter.accept(tid)); - filter.remove(tid); - ASSERT_FALSE(filter.accept(tid)); - } - EXPECT_EQ(0, filter.size()); + ASSERT_EQ(tids.size(), 0); } TEST(ThreadFilter, testThreadFilterCollect) { ThreadFilter filter; filter.init(""); ASSERT_TRUE(filter.enabled()); - EXPECT_EQ(0, filter.size()); - // Add some thread IDs that would map to different bit positions - std::vector test_tids = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; - for (int tid : test_tids) { - filter.add(tid); - EXPECT_TRUE(filter.accept(tid)); + const int num_threads = 10; + std::vector threads; + std::atomic completed_threads{0}; + std::vector expected_tids; + + // Each thread will add its thread ID + for (int i = 1; i <= num_threads; i++) { + expected_tids.push_back(i); + threads.emplace_back([&filter, i, &completed_threads]() { + filter.add(i); + EXPECT_TRUE(filter.accept(i)); + completed_threads++; + }); } - ASSERT_EQ(test_tids.size(), filter.size()); - // Collect all thread IDs - std::vector collected_tids; - filter.collect(collected_tids); - - // Verify size matches - ASSERT_EQ(test_tids.size(), collected_tids.size()); - - // Sort both vectors for comparison - std::sort(test_tids.begin(), test_tids.end()); - std::sort(collected_tids.begin(), collected_tids.end()); - - // Verify all thread IDs are present - for (size_t i = 0; i < test_tids.size(); i++) { - EXPECT_EQ(test_tids[i], collected_tids[i]) - << "Mismatch at index " << i - << ": expected " << test_tids[i] - << ", got " << collected_tids[i]; + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); } - // Test with a larger range of thread IDs - filter.clear(); - test_tids.clear(); - for (int i = 0; i < 100; i++) { - int tid = i * 64 + 1; // Use IDs that would map to different words - test_tids.push_back(tid); - filter.add(tid); - EXPECT_TRUE(filter.accept(tid)); - } - ASSERT_EQ(test_tids.size(), filter.size()); + // Verify all threads completed + ASSERT_EQ(completed_threads.load(), num_threads); - collected_tids.clear(); + // Collect and verify all thread IDs are present + std::vector collected_tids; filter.collect(collected_tids); - ASSERT_EQ(test_tids.size(), collected_tids.size()); - std::sort(test_tids.begin(), test_tids.end()); + // Sort both vectors for comparison + std::sort(expected_tids.begin(), expected_tids.end()); std::sort(collected_tids.begin(), collected_tids.end()); - for (size_t i = 0; i < test_tids.size(); i++) { - EXPECT_EQ(test_tids[i], collected_tids[i]) + ASSERT_EQ(expected_tids.size(), collected_tids.size()); + for (size_t i = 0; i < expected_tids.size(); i++) { + EXPECT_EQ(expected_tids[i], collected_tids[i]) << "Mismatch at index " << i - << ": expected " << test_tids[i] + << ": expected " << expected_tids[i] << ", got " << collected_tids[i]; } } diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java index a373e5ae4..d29162e9b 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java @@ -11,6 +11,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicIntegerArray; @State(Scope.Benchmark) public class ThreadFilterBenchmark extends Configuration { @@ -24,6 +25,11 @@ public class ThreadFilterBenchmark extends Configuration { private AtomicLong operationCount; private long startTime; private PrintWriter logWriter; + private static final int ARRAY_SIZE = 1024; // Larger array to stress memory + private static final int[] sharedArray = new int[ARRAY_SIZE]; + private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE); + private static final int CACHE_LINE_SIZE = 64; // Typical cache line size + private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line @Setup(Level.Trial) public void setup() throws IOException { @@ -32,6 +38,12 @@ public void setup() throws IOException { executorService = Executors.newFixedThreadPool(NUM_THREADS); System.out.println("Getting profiler instance"); profiler = JavaProfiler.getInstance(); + System.out.println("Stopping any existing profiler session"); + try { + profiler.stop(); + } catch (Exception e) { + // Ignore if profiler wasn't started + } System.out.println("Starting profiler with wall=1ms,filter=0,file=/tmp/thread_filter_profile.jfr"); profiler.execute("start,wall=1ms,filter=0,file=/tmp/thread_filter_profile.jfr"); System.out.println("Started profiler with output file"); @@ -129,7 +141,7 @@ public long threadFilterStress() throws InterruptedException { executorService.submit(() -> { try { startLatch.countDown(); - startLatch.await(5, TimeUnit.SECONDS); // Add timeout for thread startup + startLatch.await(5, TimeUnit.SECONDS); String startMsg = String.format("Thread %d started%n", threadId); System.out.print(startMsg); @@ -139,9 +151,39 @@ public long threadFilterStress() throws InterruptedException { } while (running.get()) { + // Register thread profiler.addThread(); + + // Memory-intensive operations that would be sensitive to false sharing + for (int j = 0; j < ARRAY_SIZE; j += STRIDE) { + // Each thread writes to its own cache line + int baseIndex = (threadId * STRIDE) % ARRAY_SIZE; + for (int k = 0; k < STRIDE; k++) { + int index = (baseIndex + k) % ARRAY_SIZE; + // Write to shared array + sharedArray[index] = threadId; + // Read and modify + int value = sharedArray[index] + 1; + // Atomic operation + atomicArray.set(index, value); + } + } + operationCount.incrementAndGet(); + + // Remove thread profiler.removeThread(); + + // More memory operations + for (int j = 0; j < ARRAY_SIZE; j += STRIDE) { + int baseIndex = (threadId * STRIDE) % ARRAY_SIZE; + for (int k = 0; k < STRIDE; k++) { + int index = (baseIndex + k) % ARRAY_SIZE; + int value = atomicArray.get(index); + sharedArray[index] = value * 2; + } + } + operationCount.incrementAndGet(); if (operationCount.get() % 1000 == 0) { From 258f23e4816b5ba34a3243fd9f7869db0b7ef76a Mon Sep 17 00:00:00 2001 From: r1viollet Date: Mon, 28 Apr 2025 10:33:14 +0200 Subject: [PATCH 05/10] thread filter delegate slot management to the thread object --- ddprof-lib/src/main/cpp/flightRecorder.cpp | 4 +- ddprof-lib/src/main/cpp/javaApi.cpp | 34 ++++++--- ddprof-lib/src/main/cpp/profiler.cpp | 13 ++-- ddprof-lib/src/main/cpp/thread.h | 6 +- ddprof-lib/src/main/cpp/threadFilter.cpp | 72 ++++++++++--------- ddprof-lib/src/main/cpp/threadFilter.h | 13 ++-- ddprof-lib/src/main/cpp/wallClock.cpp | 16 ++++- .../com/datadoghq/profiler/JavaProfiler.java | 9 +-- ddprof-lib/src/test/cpp/ddprof_ut.cpp | 33 ++++++--- 9 files changed, 131 insertions(+), 69 deletions(-) diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index d753f5b6e..35f9391e7 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1419,7 +1419,9 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system, flushIfNeeded(buf); } -void Recording::addThread(int tid) { _thread_set.add(tid); } +void Recording::addThread(int tid) { + _thread_set.add(tid, 0); // todo: add slot_id management +} Error FlightRecorder::start(Arguments &args, bool reset) { const char *file = args.file(); diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 365d1dc45..37b5f5719 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -123,22 +123,40 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env, return (jlong)Profiler::instance()->total_samples(); } +// some duplication between add and remove, though we want to avoid having an extra branch in the hot path extern "C" DLLEXPORT void JNICALL -Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, - jobject unused, - jboolean enable) { - int tid = ProfiledThread::currentTid(); +Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); + if (tid < 0) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + thread_filter->add(tid, slot_id); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); if (tid < 0) { return; } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); - if (enable) { - thread_filter->add(tid); - } else { - thread_filter->remove(tid); + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; } + thread_filter->remove(slot_id); } + extern "C" DLLEXPORT jobject JNICALL Java_com_datadoghq_profiler_JavaProfiler_getContextPage0(JNIEnv *env, jobject unused, diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index b9e38c969..49bb5e3c3 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -103,10 +103,10 @@ void Profiler::addRuntimeStub(const void *address, int length, void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { ProfiledThread::initCurrentThread(); - - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.ensureThreadRegistered(); + current->setFilterSlotId(_thread_filter.registerThread()); _thread_filter.remove(tid); } updateThreadName(jvmti, jni, thread, true); @@ -116,9 +116,12 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { } void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int slot_id = current->filterSlotId(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + _thread_filter.unregisterThread(slot_id); + current->setFilterSlotId(-1); } updateThreadName(jvmti, jni, thread, true); diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index 085b217d0..a8464eb8a 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -42,10 +42,11 @@ class ProfiledThread : public ThreadLocalData { u32 _wall_epoch; u32 _call_trace_id; u32 _recording_epoch; + int _filter_slot_id; // Slot ID for thread filtering ProfiledThread(int buffer_pos, int tid) : ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0), - _wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {}; + _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {}; void releaseFromBuffer(); @@ -111,6 +112,9 @@ class ProfiledThread : public ThreadLocalData { } static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext); + + int filterSlotId() { return _filter_slot_id; } + void setFilterSlotId(int slotId) { _filter_slot_id = slotId; } }; #endif // _THREAD_H diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 61bcaf13b..aa29a11a7 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -7,41 +7,42 @@ #include #include -static ThreadFilter* global_filter = nullptr; -thread_local ThreadFilter::SlotID tls_slot_id = -1; // todo, use the signal safe stuff static std::mutex slot_mutex; -ThreadFilter::ThreadFilter() : _next_index(0) { + +ThreadFilter::ThreadFilter() : _next_index(0), _enabled(false) { std::lock_guard lock(slot_mutex); _slots.resize(128); // preallocate some slots + // Initialize all slots to -1 + for (auto& slot : _slots) { + slot.value.store(-1, std::memory_order_relaxed); + } } ThreadFilter::~ThreadFilter() { std::lock_guard lock(slot_mutex); _slots.clear(); + _free_list.clear(); // todo: implement this, not needed for benchmark } ThreadFilter::SlotID ThreadFilter::registerThread() { - int index = _next_index.fetch_add(1, std::memory_order_relaxed); - if (index < static_cast(_slots.size())) { - return index; - } - // Lock required to safely grow the vector - { - std::lock_guard lock(slot_mutex); - size_t current_size = _slots.size(); - if (static_cast(index) >= current_size) { - _slots.resize(current_size * 2); + int index = _next_index.fetch_add(1, std::memory_order_relaxed); + if (index < static_cast(_slots.size())) { + return index; } - } - return index; -} - -ThreadFilter::SlotID ThreadFilter::ensureThreadRegistered() { - if (tls_slot_id == -1) { - tls_slot_id = registerThread(); + // Lock required to safely grow the vector + { + std::lock_guard lock(slot_mutex); + size_t current_size = _slots.size(); + if (static_cast(index) >= current_size) { + _slots.resize(current_size * 2); + // Initialize new slots + for (size_t i = current_size; i < current_size * 2; ++i) { + _slots[i].value.store(-1, std::memory_order_relaxed); + } + } } - return tls_slot_id; + return index; } void ThreadFilter::clear() { @@ -51,29 +52,26 @@ void ThreadFilter::clear() { } } -bool ThreadFilter::accept(int tid) const { +bool ThreadFilter::accept(int slot_id) const { if (!_enabled) return true; - SlotID id = tls_slot_id; - return id >= 0 && id < static_cast(_slots.size()) && _slots[id].value.load(std::memory_order_acquire) != -1; + return slot_id >= 0 && slot_id < static_cast(_slots.size()) && _slots[slot_id].value.load(std::memory_order_acquire) != -1; } -void ThreadFilter::add(int tid) { - SlotID id = ensureThreadRegistered(); - _slots[id].value.store(tid, std::memory_order_relaxed); +void ThreadFilter::add(int tid, int slot_id) { + _slots[slot_id].value.store(tid, std::memory_order_relaxed); } -void ThreadFilter::remove(int /*tid*/) { - SlotID id = ensureThreadRegistered(); // we probably are already registered - _slots[id].value.store(-1, std::memory_order_relaxed); +void ThreadFilter::remove(int slot_id) { + _slots[slot_id].value.store(-1, std::memory_order_relaxed); } void ThreadFilter::collect(std::vector& tids) const { tids.clear(); std::lock_guard lock(slot_mutex); for (const auto& slot : _slots) { - int tid = slot.value.load(std::memory_order_acquire); - if (tid != -1) { - tids.push_back(tid); + int slot_tid = slot.value.load(std::memory_order_acquire); + if (slot_tid != -1) { + tids.push_back(slot_tid); } } } @@ -106,3 +104,11 @@ void ThreadFilter::init(const char* filter) { bool ThreadFilter::enabled() const { return _enabled; } + +// Implementation of unregisterThread - releases a slot by its ID +void ThreadFilter::unregisterThread(SlotID slot_id) { + if (slot_id >= 0 && slot_id < static_cast(_slots.size())) { + // Reset this slot to be available again + _slots[slot_id].value.store(-1, std::memory_order_relaxed); + } +} diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index af6869e0c..35dae8fa7 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -15,11 +15,13 @@ class ThreadFilter { void init(const char* filter); void clear(); bool enabled() const; - bool accept(int thread_id) const; - void add(int thread_id); - void remove(int thread_id); // tid unused, for API consistency + bool accept(int slot_id) const; + void add(int tid, int slot_id); + void remove(int slot_id); // tid unused, for API consistency void collect(std::vector& tids) const; - SlotID ensureThreadRegistered(); + + SlotID registerThread(); + void unregisterThread(SlotID slot_id); private: struct Slot { @@ -30,10 +32,9 @@ class ThreadFilter { std::memory_order_relaxed); return *this; } }; - SlotID registerThread(); - bool _enabled = false; std::vector _slots; + std::vector _free_list; std::atomic _next_index; }; diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index fa93fe66d..3f50e2110 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -26,6 +26,7 @@ #include "vmStructs.h" #include #include +#include // For std::sort and std::binary_search std::atomic BaseWallClock::_enabled{false}; @@ -174,6 +175,14 @@ void WallClockJVMTI::timerLoop() { bool do_filter = Profiler::instance()->threadFilter()->enabled(); int self = OS::threadId(); + // If filtering is enabled, collect the filtered TIDs first + std::vector filtered_tids; + if (do_filter) { + Profiler::instance()->threadFilter()->collect(filtered_tids); + // Sort the TIDs for efficient lookup + std::sort(filtered_tids.begin(), filtered_tids.end()); + } + for (int i = 0; i < threads_count; i++) { jthread thread = threads_ptr[i]; if (thread != nullptr) { @@ -182,7 +191,9 @@ void WallClockJVMTI::timerLoop() { continue; } int tid = nThread->osThreadId(); - if (tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) { + if (tid != self && (!do_filter || + // Use binary search to efficiently find if tid is in filtered_tids + std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { threads.push_back({nThread, thread}); } } @@ -225,12 +236,15 @@ void WallClockJVMTI::timerLoop() { void WallClockASGCT::timerLoop() { auto collectThreads = [&](std::vector& tids) { + // Get thread IDs from the filter if it's enabled + // Otherwise list all threads in the system if (Profiler::instance()->threadFilter()->enabled()) { Profiler::instance()->threadFilter()->collect(tids); } else { ThreadList *thread_list = OS::listThreads(); int tid = thread_list->next(); while (tid != -1) { + // Don't include the current thread if (tid != OS::threadId()) { tids.push_back(tid); } diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index b1a2795bf..0a38637b3 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -207,7 +207,7 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - filterThread0(true); + filterThread_add(); } /** @@ -215,10 +215,9 @@ public void addThread() { * 'filter' option must be enabled to use this method. */ public void removeThread() { - filterThread0(false); + filterThread_remove(); } - /** * Passing context identifier to a profiler. This ID is thread-local and is dumped in * the JFR output only. 0 is a reserved value for "no-context". @@ -446,7 +445,9 @@ public Map getDebugCounters() { private static native boolean init0(); private native void stop0() throws IllegalStateException; private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException; - private native void filterThread0(boolean enable); + + private native void filterThread_add(); + private native void filterThread_remove(); private static native int getTid0(); private static native ByteBuffer getContextPage0(int tid); diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index f242d67fb..e06a04a8d 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -134,19 +134,24 @@ for (int i = 1; i <= num_threads; i++) { threads.emplace_back([&filter, i, &completed_threads]() { for (int j = 0; j < num_ops; j++) { - // Add thread ID - filter.add(i); - bool accepted = filter.accept(i); + // Register a new slot for this thread + int slot_id = filter.registerThread(); + + // Add thread ID to slot + filter.add(i, slot_id); + bool accepted = filter.accept(slot_id); if (!accepted) { - fprintf(stderr, "FAIL: Thread %d, op %d: accept(%d) returned false after add\n", i, j, i); + fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned false after add\n", + i, j, slot_id, slot_id); } EXPECT_TRUE(accepted); // Remove thread ID - filter.remove(i); - accepted = filter.accept(i); + filter.remove(slot_id); + accepted = filter.accept(slot_id); if (accepted) { - fprintf(stderr, "FAIL: Thread %d, op %d: accept(%d) returned true after remove\n", i, j, i); + fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned true after remove\n", + i, j, slot_id, slot_id); } EXPECT_FALSE(accepted); } @@ -177,13 +182,21 @@ std::vector threads; std::atomic completed_threads{0}; std::vector expected_tids; + std::vector slots(num_threads); // Track slot IDs + + // Pre-register slots for each thread + for (int i = 0; i < num_threads; i++) { + slots[i] = filter.registerThread(); + } // Each thread will add its thread ID for (int i = 1; i <= num_threads; i++) { expected_tids.push_back(i); - threads.emplace_back([&filter, i, &completed_threads]() { - filter.add(i); - EXPECT_TRUE(filter.accept(i)); + int slot_id = slots[i-1]; // Use the pre-registered slot + + threads.emplace_back([&filter, i, slot_id, &completed_threads]() { + filter.add(i, slot_id); + EXPECT_TRUE(filter.accept(slot_id)); completed_threads++; }); } From 1d4efc094d1a429ec96f9f6c02e5d4dbf5fb89f2 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Mon, 28 Apr 2025 17:55:05 +0200 Subject: [PATCH 06/10] thread filter - refactor recording implementation remove the usage of the thread filter in the recording --- ddprof-lib/src/main/cpp/flightRecorder.cpp | 38 ++++-- ddprof-lib/src/main/cpp/flightRecorder.h | 10 +- ddprof-lib/src/main/cpp/javaApi.cpp | 4 +- ddprof-lib/src/main/cpp/threadFilter.cpp | 132 ++++++++++----------- ddprof-lib/src/main/cpp/threadFilter.h | 28 +++-- ddprof-lib/src/main/cpp/wallClock.cpp | 1 + 6 files changed, 114 insertions(+), 99 deletions(-) diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 35f9391e7..37b2af4f3 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -317,7 +317,7 @@ char *Recording::_jvm_flags = NULL; char *Recording::_java_command = NULL; Recording::Recording(int fd, Arguments &args) - : _fd(fd), _thread_set(), _method_map() { + : _fd(fd), _method_map() { args.save(_args); _chunk_start = lseek(_fd, 0, SEEK_END); @@ -329,6 +329,8 @@ Recording::Recording(int fd, Arguments &args) _bytes_written = 0; _tid = OS::threadId(); + _active_index.store(0, std::memory_order_relaxed); + VM::jvmti()->GetAvailableProcessors(&_available_processors); writeHeader(_buf); @@ -1053,10 +1055,18 @@ void Recording::writeExecutionModes(Buffer *buf) { } void Recording::writeThreads(Buffer *buf) { - addThread(_tid); - std::vector threads; - _thread_set.collect(threads); - _thread_set.clear(); + int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel); + // After flip: new samples go into the new active set + // We flush from old_index (the previous active set) + + std::unordered_set threads; + threads.insert(_tid); + + for (int i = 0; i < CONCURRENCY_LEVEL; ++i) { + // I can not use merge : cpp 17 + threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end()); + _thread_ids[i][old_index].clear(); + } Profiler *profiler = Profiler::instance(); ThreadInfo *t_info = &profiler->_thread_info; @@ -1065,15 +1075,15 @@ void Recording::writeThreads(Buffer *buf) { buf->putVar64(T_THREAD); buf->putVar64(threads.size()); - for (int i = 0; i < threads.size(); i++) { + for (auto tid : threads) { const char *thread_name; jlong thread_id; - std::pair, u64> info = t_info->get(threads[i]); + std::pair, u64> info = t_info->get(tid); if (info.first) { thread_name = info.first->c_str(); thread_id = info.second; } else { - snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]); + snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid); thread_name = name_buf; thread_id = 0; } @@ -1083,9 +1093,9 @@ void Recording::writeThreads(Buffer *buf) { (thread_id == 0 ? length + 1 : 2 * length) - 3 * 10; // 3x max varint length flushIfNeeded(buf, required); - buf->putVar64(threads[i]); + buf->putVar64(tid); buf->putUtf8(thread_name, length); - buf->putVar64(threads[i]); + buf->putVar64(tid); if (thread_id == 0) { buf->put8(0); } else { @@ -1419,8 +1429,10 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system, flushIfNeeded(buf); } -void Recording::addThread(int tid) { - _thread_set.add(tid, 0); // todo: add slot_id management +// assumption is that we hold the lock (with lock_index) +void Recording::addThread(int lock_index, int tid) { + int active = _active_index.load(std::memory_order_acquire); + _thread_ids[lock_index][active].insert(tid); } Error FlightRecorder::start(Arguments &args, bool reset) { @@ -1578,7 +1590,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id, break; } _rec->flushIfNeeded(buf); - _rec->addThread(tid); + _rec->addThread(lock_index, tid); } } diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index b80e0e20e..4ac0221d8 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -18,6 +18,7 @@ #define _FLIGHTRECORDER_H #include +#include #include #include @@ -127,9 +128,11 @@ class Recording { static char *_java_command; RecordingBuffer _buf[CONCURRENCY_LEVEL]; + std::unordered_set _thread_ids[CONCURRENCY_LEVEL][2]; + std::atomic _active_index{0}; // 0 or 1 globally + int _fd; off_t _chunk_start; - ThreadFilter _thread_set; MethodMap _method_map; Arguments _args; @@ -158,7 +161,7 @@ class Recording { public: Recording(int fd, Arguments &args); ~Recording(); - + void copyTo(int target_fd); off_t finishChunk(); @@ -256,7 +259,8 @@ class Recording { LockEvent *event); void recordCpuLoad(Buffer *buf, float proc_user, float proc_system, float machine_total); - void addThread(int tid); + + void addThread(int lock_index, int tid); }; class Lookup { diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 37b5f5719..779beacbe 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -129,7 +129,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env, jobject unused) { ProfiledThread *current = ProfiledThread::current(); int tid = current->tid(); - if (tid < 0) { + if (unlikely(tid < 0)) { return; } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); @@ -145,7 +145,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env, jobject unused) { ProfiledThread *current = ProfiledThread::current(); int tid = current->tid(); - if (tid < 0) { + if (unlikely(tid < 0)) { return; } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index aa29a11a7..84d38c9b8 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -1,114 +1,104 @@ -// todo copyright stuff -// as I rewrote all the implem +// Copyright (C) Datadog 2025 +// Implementation of thread filter management #include "threadFilter.h" -#include -#include -#include -#include - -static std::mutex slot_mutex; +#include +#include -ThreadFilter::ThreadFilter() : _next_index(0), _enabled(false) { - std::lock_guard lock(slot_mutex); - _slots.resize(128); // preallocate some slots - // Initialize all slots to -1 - for (auto& slot : _slots) { - slot.value.store(-1, std::memory_order_relaxed); - } +ThreadFilter::ThreadFilter() + : _next_index(0), _enabled(false) { + std::unique_lock lock(_slot_mutex); + _slots.emplace_back(); // Allocate first chunk + clear(); } ThreadFilter::~ThreadFilter() { - std::lock_guard lock(slot_mutex); + std::unique_lock lock(_slot_mutex); _slots.clear(); - _free_list.clear(); // todo: implement this, not needed for benchmark } ThreadFilter::SlotID ThreadFilter::registerThread() { - int index = _next_index.fetch_add(1, std::memory_order_relaxed); - if (index < static_cast(_slots.size())) { - return index; + SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed); + if (index >= kMaxThreads) { + return -1; } - // Lock required to safely grow the vector + const int outer_idx = index >> kChunkShift; { - std::lock_guard lock(slot_mutex); - size_t current_size = _slots.size(); - if (static_cast(index) >= current_size) { - _slots.resize(current_size * 2); - // Initialize new slots - for (size_t i = current_size; i < current_size * 2; ++i) { - _slots[i].value.store(-1, std::memory_order_relaxed); - } + if (outer_idx < static_cast(_slots.size())) { + return index; + } + } + + { + std::unique_lock write_lock(_slot_mutex); + while (outer_idx >= static_cast(_slots.size())) { + _slots.emplace_back(); } } + return index; } void ThreadFilter::clear() { - std::lock_guard lock(slot_mutex); - for (auto& slot : _slots) { - slot.value.store(-1, std::memory_order_relaxed); + for (auto& chunk : _slots) { + for (auto& slot : chunk) { + slot.value.store(-1, std::memory_order_relaxed); + } + } +} + +bool ThreadFilter::accept(SlotID slot_id) const { + if (!_enabled) { + return true; } + if (slot_id < 0) return false; + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + if (outer_idx >= static_cast(_slots.size())) return false; + return _slots[outer_idx][inner_idx].value.load(std::memory_order_acquire) != -1; } -bool ThreadFilter::accept(int slot_id) const { - if (!_enabled) return true; - return slot_id >= 0 && slot_id < static_cast(_slots.size()) && _slots[slot_id].value.load(std::memory_order_acquire) != -1; +void ThreadFilter::add(int tid, SlotID slot_id) { + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + _slots[outer_idx][inner_idx].value.store(tid, std::memory_order_relaxed); } -void ThreadFilter::add(int tid, int slot_id) { - _slots[slot_id].value.store(tid, std::memory_order_relaxed); +void ThreadFilter::remove(SlotID slot_id) { + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); } -void ThreadFilter::remove(int slot_id) { - _slots[slot_id].value.store(-1, std::memory_order_relaxed); +void ThreadFilter::unregisterThread(SlotID slot_id) { + if (slot_id < 0) return; + int outer_idx = slot_id >> kChunkShift; + int inner_idx = slot_id & kChunkMask; + _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); } void ThreadFilter::collect(std::vector& tids) const { tids.clear(); - std::lock_guard lock(slot_mutex); - for (const auto& slot : _slots) { - int slot_tid = slot.value.load(std::memory_order_acquire); - if (slot_tid != -1) { - tids.push_back(slot_tid); + // std::unique_lock lock(_slot_mutex); + for (const auto& chunk : _slots) { + for (const auto& slot : chunk) { + int slot_tid = slot.value.load(std::memory_order_acquire); + if (slot_tid != -1) { + tids.push_back(slot_tid); + } } } } void ThreadFilter::init(const char* filter) { - if (filter == nullptr) { + if (!filter) { return; } - // char* end; - // // todo understand this strange init - // do { - // int id = strtol(filter, &end, 0); - // if (id <= 0) { - // break; - // } - - // if (*end == '-') { - // int to = strtol(end + 1, &end, 0); - // while (id <= to) { - // add(id++); - // } - // } else { - // add(id); - // } - // filter = end + 1; - // } while (*end); + // TODO: Implement parsing of filter string if needed _enabled = true; } bool ThreadFilter::enabled() const { return _enabled; } - -// Implementation of unregisterThread - releases a slot by its ID -void ThreadFilter::unregisterThread(SlotID slot_id) { - if (slot_id >= 0 && slot_id < static_cast(_slots.size())) { - // Reset this slot to be available again - _slots[slot_id].value.store(-1, std::memory_order_relaxed); - } -} diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 35dae8fa7..b78a6c2ae 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -2,22 +2,27 @@ #define _THREADFILTER_H #include +#include #include #include +#include class ThreadFilter { public: using SlotID = int; - + static constexpr int kChunkSize = 128; + static constexpr int kChunkShift = 7; + static constexpr int kChunkMask = kChunkSize - 1; + static constexpr int kMaxThreads = 2048; ThreadFilter(); ~ThreadFilter(); void init(const char* filter); void clear(); bool enabled() const; - bool accept(int slot_id) const; - void add(int tid, int slot_id); - void remove(int slot_id); // tid unused, for API consistency + bool accept(SlotID slot_id) const; + void add(int tid, SlotID slot_id); + void remove(SlotID slot_id); void collect(std::vector& tids) const; SlotID registerThread(); @@ -27,15 +32,18 @@ class ThreadFilter { struct Slot { std::atomic value{-1}; Slot() = default; - Slot(const Slot&o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); } - Slot& operator=(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), - std::memory_order_relaxed); return *this; } + Slot(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); } + Slot& operator=(const Slot& o) { + value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } }; bool _enabled = false; - std::vector _slots; - std::vector _free_list; - std::atomic _next_index; + std::vector> _slots; + std::atomic _next_index; + + mutable std::mutex _slot_mutex; }; #endif // _THREADFILTER_H diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index 3f50e2110..94eac971b 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -235,6 +235,7 @@ void WallClockJVMTI::timerLoop() { } void WallClockASGCT::timerLoop() { + // todo: re-allocating the vector every time is not efficient auto collectThreads = [&](std::vector& tids) { // Get thread IDs from the filter if it's enabled // Otherwise list all threads in the system From 115a60a2258625e99e6b7ad59996287e3bce18c4 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Mon, 28 Apr 2025 18:43:28 +0200 Subject: [PATCH 07/10] thread filter - add a free list --- ddprof-lib/src/main/cpp/flightRecorder.h | 2 ++ ddprof-lib/src/main/cpp/threadFilter.cpp | 44 +++++++++++++++++++++++- ddprof-lib/src/main/cpp/threadFilter.h | 4 +++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index 4ac0221d8..cd292e2eb 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -128,6 +128,8 @@ class Recording { static char *_java_command; RecordingBuffer _buf[CONCURRENCY_LEVEL]; + // we have several sets to avoid lock contention + // we have a second dimension to allow a switch in the active set std::unordered_set _thread_ids[CONCURRENCY_LEVEL][2]; std::atomic _active_index{0}; // 0 or 1 globally diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 84d38c9b8..bf51e0981 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -17,12 +17,24 @@ ThreadFilter::~ThreadFilter() { std::unique_lock lock(_slot_mutex); _slots.clear(); } - ThreadFilter::SlotID ThreadFilter::registerThread() { + int top = _free_list_top.load(std::memory_order_acquire); + for (int i = 0; i < top; ++i) { + int value = _free_list[i].load(std::memory_order_relaxed); + if (value >= 0) { + int expected = value; + if (_free_list[i].compare_exchange_strong(expected, -1, std::memory_order_acq_rel)) { + return value; // Successfully claimed a free slot + } + // If CAS fails, someone else claimed it, continue scanning + } + } + SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed); if (index >= kMaxThreads) { return -1; } + const int outer_idx = index >> kChunkShift; { if (outer_idx < static_cast(_slots.size())) { @@ -46,6 +58,10 @@ void ThreadFilter::clear() { slot.value.store(-1, std::memory_order_relaxed); } } + for (int i = 0; i < kFreeListSize; ++i) { + _free_list[i].store(-1, std::memory_order_relaxed); + } + _free_list_top.store(0, std::memory_order_relaxed); } bool ThreadFilter::accept(SlotID slot_id) const { @@ -73,9 +89,35 @@ void ThreadFilter::remove(SlotID slot_id) { void ThreadFilter::unregisterThread(SlotID slot_id) { if (slot_id < 0) return; + int outer_idx = slot_id >> kChunkShift; int inner_idx = slot_id & kChunkMask; _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); + + constexpr int try_limit = 16; + + int top = _free_list_top.load(std::memory_order_acquire); + int limit = top < kFreeListSize ? top : kFreeListSize; + int tries = 0; + + for (int i = 0; i < limit && tries < try_limit; ++i) { + int value = _free_list[i].load(std::memory_order_relaxed); + if (value == -1) { + int expected = -1; + if (_free_list[i].compare_exchange_strong(expected, slot_id, std::memory_order_acq_rel)) { + return; // Successfully claimed empty spot + } + ++tries; // Only count actual CAS attempts + } + } + + // Fallback: append if no empty slot found + int pos = _free_list_top.fetch_add(1, std::memory_order_acq_rel); + if (pos < kFreeListSize) { + _free_list[pos].store(slot_id, std::memory_order_release); + } else { + // Free list overflow: ignore + } } void ThreadFilter::collect(std::vector& tids) const { diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index b78a6c2ae..24cb741e9 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -43,6 +43,10 @@ class ThreadFilter { std::vector> _slots; std::atomic _next_index; + static constexpr int kFreeListSize = 128; + std::atomic _free_list[kFreeListSize]; + std::atomic _free_list_top; // Points to next free slot + mutable std::mutex _slot_mutex; }; From efa094ee588ac0dcf83b57981929d38a7a62d375 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Wed, 25 Jun 2025 11:16:23 +0200 Subject: [PATCH 08/10] Thread filter - add a table with linear probing Remove usage of set to ensure we are signal safe. --- ddprof-lib/src/main/cpp/flightRecorder.cpp | 4 +- ddprof-lib/src/main/cpp/flightRecorder.h | 7 +- ddprof-lib/src/main/cpp/javaApi.cpp | 30 ++++++-- ddprof-lib/src/main/cpp/profiler.cpp | 5 +- ddprof-lib/src/main/cpp/threadIdTable.h | 70 +++++++++++++++++++ .../com/datadoghq/profiler/JavaProfiler.java | 10 +-- 6 files changed, 111 insertions(+), 15 deletions(-) create mode 100644 ddprof-lib/src/main/cpp/threadIdTable.h diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 37b2af4f3..fd1f9f885 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1063,8 +1063,8 @@ void Recording::writeThreads(Buffer *buf) { threads.insert(_tid); for (int i = 0; i < CONCURRENCY_LEVEL; ++i) { - // I can not use merge : cpp 17 - threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end()); + // Collect thread IDs from the fixed-size table into the main set + _thread_ids[i][old_index].collect(threads); _thread_ids[i][old_index].clear(); } diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index cd292e2eb..1df3e611a 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -35,6 +35,7 @@ #include "mutex.h" #include "objectSampler.h" #include "threadFilter.h" +#include "threadIdTable.h" #include "vmEntry.h" const u64 MAX_JLONG = 0x7fffffffffffffffULL; @@ -128,9 +129,9 @@ class Recording { static char *_java_command; RecordingBuffer _buf[CONCURRENCY_LEVEL]; - // we have several sets to avoid lock contention - // we have a second dimension to allow a switch in the active set - std::unordered_set _thread_ids[CONCURRENCY_LEVEL][2]; + // we have several tables to avoid lock contention + // we have a second dimension to allow a switch in the active table + ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2]; std::atomic _active_index{0}; // 0 or 1 globally int _fd; diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 779beacbe..da5d82747 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -125,8 +125,8 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env, // some duplication between add and remove, though we want to avoid having an extra branch in the hot path extern "C" DLLEXPORT void JNICALL -Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env, - jobject unused) { +Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, + jobject unused) { ProfiledThread *current = ProfiledThread::current(); int tid = current->tid(); if (unlikely(tid < 0)) { @@ -141,8 +141,8 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env, } extern "C" DLLEXPORT void JNICALL -Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env, - jobject unused) { +Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, + jobject unused) { ProfiledThread *current = ProfiledThread::current(); int tid = current->tid(); if (unlikely(tid < 0)) { @@ -156,6 +156,28 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env, thread_filter->remove(slot_id); } +// Backward compatibility for existing code +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, + jobject unused, + jboolean enable) { + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + + if (enable) { + thread_filter->add(tid, slot_id); + } else { + thread_filter->remove(slot_id); + } +} extern "C" DLLEXPORT jobject JNICALL Java_com_datadoghq_profiler_JavaProfiler_getContextPage0(JNIEnv *env, diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 49bb5e3c3..13f57d6e5 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -106,8 +106,9 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { ProfiledThread *current = ProfiledThread::current(); int tid = current->tid(); if (_thread_filter.enabled()) { - current->setFilterSlotId(_thread_filter.registerThread()); - _thread_filter.remove(tid); + int slot_id = _thread_filter.registerThread(); + current->setFilterSlotId(slot_id); + _thread_filter.remove(slot_id); // Remove from filtering initially } updateThreadName(jvmti, jni, thread, true); diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h new file mode 100644 index 000000000..9f62331db --- /dev/null +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -0,0 +1,70 @@ +/* + * Copyright The async-profiler authors + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021, 2025 Datadog, Inc + */ + +#ifndef _THREADIDTABLE_H +#define _THREADIDTABLE_H + +#include +#include + +// Signal-safe thread ID table with fixed size +class ThreadIdTable { +private: + static const int TABLE_SIZE = 256; // Should handle most realistic thread counts + std::atomic table[TABLE_SIZE]; + + int hash(int tid) const { + // Simple hash function - could be improved if needed + return tid % TABLE_SIZE; + } + +public: + ThreadIdTable() { + clear(); + } + + // Signal-safe insertion using atomic operations only + void insert(int tid) { + if (tid == 0) return; // Invalid thread ID, 0 is reserved for empty slots + + int start_slot = hash(tid); + for (int probe = 0; probe < TABLE_SIZE; probe++) { + int slot = (start_slot + probe) % TABLE_SIZE; + int expected = 0; + + // Try to claim empty slot + if (table[slot].compare_exchange_strong(expected, tid, std::memory_order_relaxed)) { + return; // Successfully inserted + } + + // Check if already present + if (table[slot].load(std::memory_order_relaxed) == tid) { + return; // Already exists + } + } + // Table full - thread ID will be lost, but this is rare and non-critical + // Could increment a counter here for diagnostics if needed + } + + // Clear the table (not signal-safe, called during buffer switch) + void clear() { + for (int i = 0; i < TABLE_SIZE; i++) { + table[i].store(0, std::memory_order_relaxed); + } + } + + // Collect all thread IDs into a set (not signal-safe, called during buffer switch) + void collect(std::unordered_set& result) { + for (int i = 0; i < TABLE_SIZE; i++) { + int tid = table[i].load(std::memory_order_relaxed); + if (tid != 0) { + result.insert(tid); + } + } + } +}; + +#endif // _THREADIDTABLE_H \ No newline at end of file diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index 0a38637b3..dbfc8c3be 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -207,7 +207,7 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - filterThread_add(); + filterThreadAdd0(); } /** @@ -215,7 +215,7 @@ public void addThread() { * 'filter' option must be enabled to use this method. */ public void removeThread() { - filterThread_remove(); + filterThreadRemove0(); } /** @@ -446,8 +446,10 @@ public Map getDebugCounters() { private native void stop0() throws IllegalStateException; private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException; - private native void filterThread_add(); - private native void filterThread_remove(); + private native void filterThreadAdd0(); + private native void filterThreadRemove0(); + // Backward compatibility for existing code + private native void filterThread0(boolean enable); private static native int getTid0(); private static native ByteBuffer getContextPage0(int tid); From 1be1c3ace22916d8f13953a914eaba4b9c9abc60 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Wed, 25 Jun 2025 16:18:26 +0200 Subject: [PATCH 09/10] Thread filter bench - Add parameters to run different configurations of the bench --- .../scenarios/ThreadFilterBenchmark.java | 143 ++++++++++++------ 1 file changed, 93 insertions(+), 50 deletions(-) diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java index d29162e9b..91300d56e 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java @@ -16,6 +16,12 @@ @State(Scope.Benchmark) public class ThreadFilterBenchmark extends Configuration { + @Param({"true", "false"}) // Parameterize the filter usage + public boolean useThreadFilters; + + @Param({"true"}) + public boolean useProfiler; + private static final int NUM_THREADS = 15; private ExecutorService executorService; private JavaProfiler profiler; @@ -24,32 +30,44 @@ public class ThreadFilterBenchmark extends Configuration { private CountDownLatch stopLatch; private AtomicLong operationCount; private long startTime; + private long stopTime; private PrintWriter logWriter; private static final int ARRAY_SIZE = 1024; // Larger array to stress memory private static final int[] sharedArray = new int[ARRAY_SIZE]; private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE); private static final int CACHE_LINE_SIZE = 64; // Typical cache line size private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line + private AtomicLong addThreadCount = new AtomicLong(0); + private AtomicLong removeThreadCount = new AtomicLong(0); @Setup(Level.Trial) public void setup() throws IOException { System.out.println("Setting up benchmark..."); + System.out.println("Thread filters enabled: " + useThreadFilters); System.out.println("Creating thread pool with " + NUM_THREADS + " threads"); executorService = Executors.newFixedThreadPool(NUM_THREADS); System.out.println("Getting profiler instance"); profiler = JavaProfiler.getInstance(); - System.out.println("Stopping any existing profiler session"); + + // Stop the profiler if it's already running try { profiler.stop(); - } catch (Exception e) { - // Ignore if profiler wasn't started + } catch (IllegalStateException e) { + System.out.println("Profiler was not active at setup."); + } + + if (useProfiler) { + String config = "start,wall=10ms,filter=1,file=/tmp/thread_filter_profile.jfr"; + System.out.println("Starting profiler with " + config); + profiler.execute(config); + System.out.println("Started profiler with output file"); + } else { + System.out.println("Profiler is disabled for this run."); } - System.out.println("Starting profiler with wall=1ms,filter=0,file=/tmp/thread_filter_profile.jfr"); - profiler.execute("start,wall=1ms,filter=0,file=/tmp/thread_filter_profile.jfr"); - System.out.println("Started profiler with output file"); running = new AtomicBoolean(true); operationCount = new AtomicLong(0); startTime = System.currentTimeMillis(); + stopTime = startTime + 30000; // Run for 30 seconds System.out.println("Benchmark setup completed at " + startTime); try { @@ -74,7 +92,7 @@ public void tearDown() { // Wait for all threads to finish with a timeout try { if (stopLatch != null) { - if (!stopLatch.await(5, TimeUnit.SECONDS)) { + if (!stopLatch.await(30, TimeUnit.SECONDS)) { System.err.println("Warning: Some threads did not finish within timeout"); } } @@ -85,9 +103,9 @@ public void tearDown() { // Shutdown executor with timeout executorService.shutdown(); try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { executorService.shutdownNow(); - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { System.err.println("Warning: Executor did not terminate"); } } @@ -96,18 +114,31 @@ public void tearDown() { Thread.currentThread().interrupt(); } - profiler.stop(); + // Stop the profiler if it's active and was started + if (useProfiler) { + try { + profiler.stop(); + System.out.println("Profiler stopped."); + } catch (IllegalStateException e) { + System.out.println("Profiler was not active at teardown."); + } + } + long endTime = System.currentTimeMillis(); long totalOps = operationCount.get(); double durationSecs = (endTime - startTime) / 1000.0; double opsPerSec = totalOps / durationSecs; + double addOpsPerSec = addThreadCount.get() / durationSecs; + double removeOpsPerSec = removeThreadCount.get() / durationSecs; String stats = String.format("Thread Filter Stats:%n" + "Total operations: %,d%n" + "Duration: %.2f seconds%n" + "Operations/second: %,.0f%n" + - "Operations/second/thread: %,.0f%n", - totalOps, durationSecs, opsPerSec, opsPerSec / NUM_THREADS); + "Operations/second/thread: %,.0f%n" + + "AddThread operations/second: %,.0f%n" + + "RemoveThread operations/second: %,.0f%n", + totalOps, durationSecs, opsPerSec, opsPerSec / NUM_THREADS, addOpsPerSec, removeOpsPerSec); System.out.print(stats); if (logWriter != null) { @@ -123,26 +154,32 @@ public void tearDown() { } } + public void setUseThreadFilters(boolean useThreadFilters) { + this.useThreadFilters = useThreadFilters; + } + @Benchmark @BenchmarkMode(Mode.Throughput) - @Fork(value = 1, warmups = 0) + @Fork(value = 1, warmups = 1) @Warmup(iterations = 1, time = 1) @Measurement(iterations = 1, time = 2) - @Threads(Threads.MAX) + @Threads(1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public long threadFilterStress() throws InterruptedException { System.out.println("Starting benchmark iteration..."); startLatch = new CountDownLatch(NUM_THREADS); stopLatch = new CountDownLatch(NUM_THREADS); - // Start all worker threads + // Start all worker threads[] for (int i = 0; i < NUM_THREADS; i++) { final int threadId = i; executorService.submit(() -> { try { startLatch.countDown(); - startLatch.await(5, TimeUnit.SECONDS); - + startLatch.await(30, TimeUnit.SECONDS); + if (useProfiler) { + profiler.addThread(); // Initial registration if profiler is on + } String startMsg = String.format("Thread %d started%n", threadId); System.out.print(startMsg); if (logWriter != null) { @@ -150,12 +187,15 @@ public long threadFilterStress() throws InterruptedException { logWriter.flush(); } - while (running.get()) { - // Register thread - profiler.addThread(); - + while (running.get() && System.currentTimeMillis() < stopTime) { // Memory-intensive operations that would be sensitive to false sharing for (int j = 0; j < ARRAY_SIZE; j += STRIDE) { + if (useThreadFilters && useProfiler) { + // Register thread at the start of each cache line operation + profiler.addThread(); + addThreadCount.incrementAndGet(); + } + // Each thread writes to its own cache line int baseIndex = (threadId * STRIDE) % ARRAY_SIZE; for (int k = 0; k < STRIDE; k++) { @@ -167,53 +207,56 @@ public long threadFilterStress() throws InterruptedException { // Atomic operation atomicArray.set(index, value); } + + if (useThreadFilters && useProfiler) { + // Remove thread after cache line operation + profiler.removeThread(); + removeThreadCount.incrementAndGet(); + } + operationCount.incrementAndGet(); } - - operationCount.incrementAndGet(); - - // Remove thread - profiler.removeThread(); - - // More memory operations + + // More memory operations with thread registration for (int j = 0; j < ARRAY_SIZE; j += STRIDE) { + if (useThreadFilters && useProfiler) { + // Register thread at the start of each cache line operation + profiler.addThread(); + addThreadCount.incrementAndGet(); + } + int baseIndex = (threadId * STRIDE) % ARRAY_SIZE; for (int k = 0; k < STRIDE; k++) { int index = (baseIndex + k) % ARRAY_SIZE; int value = atomicArray.get(index); sharedArray[index] = value * 2; } - } - - operationCount.incrementAndGet(); - - if (operationCount.get() % 1000 == 0) { - String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); - System.out.print(progressMsg); - if (logWriter != null) { - logWriter.print(progressMsg); - logWriter.flush(); + + if (useThreadFilters && useProfiler) { + // Remove thread after cache line operation + profiler.removeThread(); + removeThreadCount.incrementAndGet(); } + operationCount.incrementAndGet(); } + + // if (operationCount.get() % 1000 == 0) { + // String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); + // System.out.print(progressMsg); + // if (logWriter != null) { + // logWriter.print(progressMsg); + // logWriter.flush(); + // } + // } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { stopLatch.countDown(); - String finishMsg = String.format("Thread %d finished%n", threadId); - System.out.print(finishMsg); - if (logWriter != null) { - logWriter.print(finishMsg); - logWriter.flush(); - } } }); } - // Wait for all threads to finish with timeout - if (!stopLatch.await(5, TimeUnit.SECONDS)) { - System.err.println("Warning: Benchmark did not complete within timeout"); - } - + stopLatch.await(); return operationCount.get(); } -} \ No newline at end of file +} \ No newline at end of file From 60ef0601ab11807674bb5be7287671b74c67e27e Mon Sep 17 00:00:00 2001 From: r1viollet Date: Wed, 25 Jun 2025 18:08:05 +0200 Subject: [PATCH 10/10] Minor thread filter adjustement remove the logs --- .../stresstest/scenarios/ThreadFilterBenchmark.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java index 3e6c8e163..cd933fdf1 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/ThreadFilterBenchmark.java @@ -40,6 +40,7 @@ public class ThreadFilterBenchmark extends Configuration { @Setup(Level.Trial) public void setup() throws IOException { System.out.println("Setting up benchmark..."); + System.out.println("Thread filters enabled: " + useThreadFilters); System.out.println("Creating thread pool with " + NUM_THREADS + " threads"); executorService = Executors.newFixedThreadPool(NUM_THREADS); System.out.println("Getting profiler instance"); @@ -110,6 +111,7 @@ public void tearDown() { // Stop the profiler if it's active try { profiler.stop(); + System.out.println("Profiler stopped."); } catch (IllegalStateException e) { System.out.println("Profiler was not active at teardown."); } @@ -144,6 +146,10 @@ public void tearDown() { } } + public void setUseThreadFilters(boolean useThreadFilters) { + this.useThreadFilters = useThreadFilters; + } + @Benchmark @BenchmarkMode(Mode.Throughput) @Fork(value = 1, warmups = 1) @@ -156,7 +162,7 @@ public long threadFilterStress() throws InterruptedException { startLatch = new CountDownLatch(NUM_THREADS); stopLatch = new CountDownLatch(NUM_THREADS); - // Start all worker threads + // Start all worker threads[] for (int i = 0; i < NUM_THREADS; i++) { final int threadId = i; executorService.submit(() -> { @@ -242,4 +248,4 @@ public long threadFilterStress() throws InterruptedException { stopLatch.await(); return operationCount.get(); } -} +} \ No newline at end of file