From 94b2559432a4dc6ef2a71a68667f459e9d6bc1ef Mon Sep 17 00:00:00 2001 From: r1viollet Date: Mon, 7 Jul 2025 09:49:19 +0200 Subject: [PATCH 01/13] Thread filter optim - Reserve padded slots - Introduce a register / unregister to retrieve slots - manage a free list --- ddprof-lib/src/main/cpp/flightRecorder.cpp | 39 +- ddprof-lib/src/main/cpp/flightRecorder.h | 13 +- ddprof-lib/src/main/cpp/javaApi.cpp | 92 +++-- ddprof-lib/src/main/cpp/profiler.cpp | 15 +- ddprof-lib/src/main/cpp/thread.h | 6 +- ddprof-lib/src/main/cpp/threadFilter.cpp | 359 +++++++++++------- ddprof-lib/src/main/cpp/threadFilter.h | 132 +++---- ddprof-lib/src/main/cpp/threadIdTable.h | 70 ++++ ddprof-lib/src/main/cpp/wallClock.cpp | 38 +- .../com/datadoghq/profiler/JavaProfiler.java | 18 +- ddprof-lib/src/test/cpp/ddprof_ut.cpp | 117 +++++- .../counters/ThreadFilterBenchmark.java | 32 +- 12 files changed, 617 insertions(+), 314 deletions(-) create mode 100644 ddprof-lib/src/main/cpp/threadIdTable.h diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 12472a297..ab233e643 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -318,7 +318,7 @@ char *Recording::_jvm_flags = NULL; char *Recording::_java_command = NULL; Recording::Recording(int fd, Arguments &args) - : _fd(fd), _thread_set(), _method_map() { + : _fd(fd), _method_map() { args.save(_args); _chunk_start = lseek(_fd, 0, SEEK_END); @@ -330,6 +330,8 @@ Recording::Recording(int fd, Arguments &args) _bytes_written = 0; _tid = OS::threadId(); + _active_index.store(0, std::memory_order_relaxed); + VM::jvmti()->GetAvailableProcessors(&_available_processors); writeHeader(_buf); @@ -1059,11 +1061,18 @@ void Recording::writeExecutionModes(Buffer *buf) { } void Recording::writeThreads(Buffer *buf) { - addThread(_tid); - std::vector threads; - threads.reserve(_thread_set.size()); - _thread_set.collect(threads); - _thread_set.clear(); + int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel); + // After flip: new samples go into the new active set + // We flush from old_index (the previous active set) + + std::unordered_set threads; + threads.insert(_tid); + + for (int i = 0; i < CONCURRENCY_LEVEL; ++i) { + // Collect thread IDs from the fixed-size table into the main set + _thread_ids[i][old_index].collect(threads); + _thread_ids[i][old_index].clear(); + } Profiler *profiler = Profiler::instance(); ThreadInfo *t_info = &profiler->_thread_info; @@ -1072,15 +1081,15 @@ void Recording::writeThreads(Buffer *buf) { buf->putVar64(T_THREAD); buf->putVar64(threads.size()); - for (int i = 0; i < threads.size(); i++) { + for (auto tid : threads) { const char *thread_name; jlong thread_id; - std::pair, u64> info = t_info->get(threads[i]); + std::pair, u64> info = t_info->get(tid); if (info.first) { thread_name = info.first->c_str(); thread_id = info.second; } else { - snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]); + snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid); thread_name = name_buf; thread_id = 0; } @@ -1090,9 +1099,9 @@ void Recording::writeThreads(Buffer *buf) { (thread_id == 0 ? length + 1 : 2 * length) - 3 * 10; // 3x max varint length flushIfNeeded(buf, required); - buf->putVar64(threads[i]); + buf->putVar64(tid); buf->putUtf8(thread_name, length); - buf->putVar64(threads[i]); + buf->putVar64(tid); if (thread_id == 0) { buf->put8(0); } else { @@ -1442,7 +1451,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system, flushIfNeeded(buf); } -void Recording::addThread(int tid) { _thread_set.add(tid); } +// assumption is that we hold the lock (with lock_index) +void Recording::addThread(int lock_index, int tid) { + int active = _active_index.load(std::memory_order_acquire); + _thread_ids[lock_index][active].insert(tid); +} Error FlightRecorder::start(Arguments &args, bool reset) { const char *file = args.file(); @@ -1599,7 +1612,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id, break; } _rec->flushIfNeeded(buf); - _rec->addThread(tid); + _rec->addThread(lock_index, tid); } } diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index 22c4b3d33..81ed85bce 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -18,6 +18,7 @@ #define _FLIGHTRECORDER_H #include +#include #include #include @@ -34,6 +35,7 @@ #include "mutex.h" #include "objectSampler.h" #include "threadFilter.h" +#include "threadIdTable.h" #include "vmEntry.h" const u64 MAX_JLONG = 0x7fffffffffffffffULL; @@ -127,9 +129,13 @@ class Recording { static char *_java_command; RecordingBuffer _buf[CONCURRENCY_LEVEL]; + // we have several tables to avoid lock contention + // we have a second dimension to allow a switch in the active table + ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2]; + std::atomic _active_index{0}; // 0 or 1 globally + int _fd; off_t _chunk_start; - ThreadFilter _thread_set; MethodMap _method_map; Arguments _args; @@ -158,7 +164,7 @@ class Recording { public: Recording(int fd, Arguments &args); ~Recording(); - + void copyTo(int target_fd); off_t finishChunk(); @@ -258,7 +264,8 @@ class Recording { LockEvent *event); void recordCpuLoad(Buffer *buf, float proc_user, float proc_system, float machine_total); - void addThread(int tid); + + void addThread(int lock_index, int tid); }; class Lookup { diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 017f23c45..8b3e0ff0b 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -123,19 +123,82 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env, return (jlong)Profiler::instance()->total_samples(); } +// some duplication between add and remove, though we want to avoid having an extra branch in the hot path +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + if (unlikely(current == nullptr)) { + printf("[DEBUG] ProfiledThread::current() returned null in addThread() - thread not initialized\n"); + fflush(stdout); + return; + } + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + if (unlikely(!thread_filter->enabled())) { + return; + } + + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + thread_filter->add(tid, slot_id); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + if (unlikely(current == nullptr)) { + return; + } + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + if (unlikely(!thread_filter->enabled())) { + return; + } + + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + thread_filter->remove(slot_id); +} + +// Backward compatibility for existing code extern "C" DLLEXPORT void JNICALL Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, jobject unused, jboolean enable) { - int tid = ProfiledThread::currentTid(); - if (tid < 0) { + ProfiledThread *current = ProfiledThread::current(); + if (unlikely(current == nullptr)) { + return; + } + int tid = current->tid(); + if (unlikely(tid < 0)) { return; } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + if (unlikely(!thread_filter->enabled())) { + return; + } + + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + return; + } + if (enable) { - thread_filter->add(tid); + thread_filter->add(tid, slot_id); } else { - thread_filter->remove(tid); + thread_filter->remove(slot_id); } } @@ -406,24 +469,3 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env, jobject unused) { return true; } - -extern "C" DLLEXPORT jlong JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env, - jclass unused, - jint tid) { - u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid); - return (jlong)bitmap; -} - -extern "C" DLLEXPORT jboolean JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env, - jclass unused, - jint tid) { - return Profiler::instance()->threadFilter()->accept((int)tid) ? JNI_TRUE : JNI_FALSE; -} - -extern "C" DLLEXPORT jlong JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env, - jclass unused) { - return (jlong)Profiler::instance()->threadFilter()->addressOfSize(); -} diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 30a4576b7..3c2d3988e 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -104,10 +104,12 @@ void Profiler::addRuntimeStub(const void *address, int length, void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { ProfiledThread::initCurrentThread(); - - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + int slot_id = _thread_filter.registerThread(); + current->setFilterSlotId(slot_id); + _thread_filter.remove(slot_id); // Remove from filtering initially } updateThreadName(jvmti, jni, thread, true); @@ -116,9 +118,12 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { } void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int slot_id = current->filterSlotId(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + _thread_filter.unregisterThread(slot_id); + current->setFilterSlotId(-1); } updateThreadName(jvmti, jni, thread, true); diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index 264ad6b8b..bba5145a2 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -43,11 +43,12 @@ class ProfiledThread : public ThreadLocalData { u32 _wall_epoch; u32 _call_trace_id; u32 _recording_epoch; + int _filter_slot_id; // Slot ID for thread filtering UnwindFailures _unwind_failures; ProfiledThread(int buffer_pos, int tid) : ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0), - _wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {}; + _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {}; void releaseFromBuffer(); @@ -120,6 +121,9 @@ class ProfiledThread : public ThreadLocalData { } static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext); + + int filterSlotId() { return _filter_slot_id; } + void setFilterSlotId(int slotId) { _filter_slot_id = slotId; } }; #endif // _THREAD_H diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 13e6c2ae0..dfc3d09e6 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -1,175 +1,254 @@ -/* - * Copyright 2020 Andrei Pangin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +// Copyright (C) Datadog 2025 +// High-performance lock-free thread filter implementation #include "threadFilter.h" -#include "counters.h" -#include "os.h" -#include "reverse_bits.h" -#include -#include -#include - -void trackPage() { - Counters::increment(THREAD_FILTER_PAGES, 1); - Counters::increment(THREAD_FILTER_BYTES, BITMAP_SIZE); -} -ThreadFilter::ThreadFilter() { - _max_thread_id = OS::getMaxThreadId(128 * 1024); - _max_bitmaps = (_max_thread_id + BITMAP_SIZE - 1) / BITMAP_SIZE; - u32 capacity = _max_bitmaps * sizeof(u64 *); - _bitmap = (u64 **)OS::safeAlloc(capacity); - memset(_bitmap, 0, capacity); - _bitmap[0] = (u64 *)OS::safeAlloc(BITMAP_SIZE); - trackPage(); - _enabled = false; - _size = 0; +#include +#include +#include + +ThreadFilter::ThreadFilter() : _enabled(false) { + // Initialize chunk pointers to null (lazy allocation) + for (int i = 0; i < kMaxChunks; ++i) { + _chunks[i].store(nullptr, std::memory_order_relaxed); + } + _free_list = std::make_unique(kFreeListSize); + + // Initialize the first chunk + initializeChunk(0); + clear(); } ThreadFilter::~ThreadFilter() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - OS::safeFree(_bitmap[i], BITMAP_SIZE); - } - } - if (_bitmap) { - OS::safeFree(_bitmap, _max_bitmaps * sizeof(u64 *)); - } + // Clean up allocated chunks + for (int i = 0; i < kMaxChunks; ++i) { + ChunkStorage* chunk = _chunks[i].load(std::memory_order_relaxed); + delete chunk; + } } -void ThreadFilter::init(const char *filter) { - if (filter == NULL) { - _enabled = false; - return; - } - - char *end; - do { - int id = strtol(filter, &end, 0); - if (id <= 0) { - break; +void ThreadFilter::initializeChunk(int chunk_idx) { + if (chunk_idx >= kMaxChunks) return; + + // Check if chunk already exists + ChunkStorage* existing = _chunks[chunk_idx].load(std::memory_order_acquire); + if (existing != nullptr) return; + + // Allocate new chunk + ChunkStorage* new_chunk = new ChunkStorage(); + + // Try to install it atomically + ChunkStorage* expected = nullptr; + if (_chunks[chunk_idx].compare_exchange_strong(expected, new_chunk, std::memory_order_acq_rel)) { + // Successfully installed - initialize all slots + for (auto& slot : new_chunk->slots) { + slot.value.store(-1, std::memory_order_relaxed); + } + new_chunk->initialized.store(true, std::memory_order_release); + } else { + // Another thread beat us to it - clean up our allocation + delete new_chunk; } +} - if (*end == '-') { - int to = strtol(end + 1, &end, 0); - while (id <= to) { - add(id++); - } - } else { - add(id); +ThreadFilter::SlotID ThreadFilter::registerThread() { + // First, try to get a slot from the free list (lock-free stack) + SlotID reused_slot = popFromFreeList(); + if (reused_slot >= 0) { + return reused_slot; } - filter = end + 1; - } while (*end); + // Allocate a new slot + SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed); + if (index >= kMaxThreads) { + // Revert the increment and return failure + _next_index.fetch_sub(1, std::memory_order_relaxed); + return -1; + } - _enabled = true; + const int chunk_idx = index >> kChunkShift; + + // Ensure the chunk is initialized (lock-free) + if (chunk_idx >= _num_chunks.load(std::memory_order_acquire)) { + // Update the chunk count atomically + int expected_chunks = chunk_idx; + int desired_chunks = chunk_idx + 1; + while (!_num_chunks.compare_exchange_weak(expected_chunks, desired_chunks, + std::memory_order_acq_rel)) { + if (expected_chunks > chunk_idx) { + break; // Another thread already updated it + } + desired_chunks = expected_chunks + 1; + } + } + + // Initialize the chunk if needed + initializeChunk(chunk_idx); + + return index; } void ThreadFilter::clear() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - memset(_bitmap[i], 0, BITMAP_SIZE); + // Clear all initialized chunks + int num_chunks = _num_chunks.load(std::memory_order_relaxed); + for (int i = 0; i < num_chunks; ++i) { + ChunkStorage* chunk = _chunks[i].load(std::memory_order_relaxed); + if (chunk != nullptr) { + for (auto& slot : chunk->slots) { + slot.value.store(-1, std::memory_order_relaxed); + } + } + } + + // Clear the free list + for (int i = 0; i < kFreeListSize; ++i) { + _free_list[i].value.store(-1, std::memory_order_relaxed); + _free_list[i].next.store(-1, std::memory_order_relaxed); } - } - _size = 0; + _free_list_head.store(-1, std::memory_order_relaxed); + _active_slots.store(0, std::memory_order_relaxed); } -// The mapping has to be reversible: f(f(x)) == x -int ThreadFilter::mapThreadId(int thread_id) { - // We want to map the thread_id inside the same bitmap - static_assert(BITMAP_SIZE >= (u16)0xffff, "Potential verflow"); - u16 lower16 = (u16)(thread_id & 0xffff); - lower16 = reverse16(lower16); - int tid = (thread_id & ~0xffff) | lower16; - return tid; +bool ThreadFilter::accept(SlotID slot_id) const { + if (!_enabled) { + return true; + } + if (slot_id < 0) return false; + + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + + if (chunk_idx >= kMaxChunks) return false; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); + if (chunk == nullptr) return false; // Fail-fast if not allocated + + return chunk->slots[slot_idx].value.load(std::memory_order_acquire) != -1; } -// Get bitmap that contains the thread id, create one if it does not exist -u64* ThreadFilter::getBitmapFor(int thread_id) { - int index = thread_id / BITMAP_CAPACITY; - assert(index >= 0 && index < (int)_max_bitmaps); - u64* b = _bitmap[index]; - if (b == NULL) { - b = (u64 *)OS::safeAlloc(BITMAP_SIZE); - u64 *oldb = __sync_val_compare_and_swap( - &_bitmap[index], NULL, b); - if (oldb != NULL) { - OS::safeFree(b, BITMAP_SIZE); - b = oldb; - } else { - trackPage(); +void ThreadFilter::add(int tid, SlotID slot_id) { + if (slot_id < 0) return; + + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + + if (chunk_idx >= kMaxChunks) return; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); + if (chunk == nullptr) return; // Fail-fast if not allocated + + // Store the tid and increment active slots if this was previously empty + int old_value = chunk->slots[slot_idx].value.exchange(tid, std::memory_order_acq_rel); + if (old_value == -1) { + _active_slots.fetch_add(1, std::memory_order_relaxed); } - } - return b; } -u64* ThreadFilter::bitmapAddressFor(int thread_id) { - u64* b = getBitmapFor(thread_id); - thread_id = mapThreadId(thread_id); - assert(b == bitmap(thread_id)); - return wordAddress(b, thread_id); +void ThreadFilter::remove(SlotID slot_id) { + if (slot_id < 0) return; + + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + + if (chunk_idx >= kMaxChunks) return; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); + if (chunk == nullptr) return; // Fail-fast if not allocated + + // Remove the tid and decrement active slots if this was previously occupied + int old_value = chunk->slots[slot_idx].value.exchange(-1, std::memory_order_acq_rel); + if (old_value != -1) { + _active_slots.fetch_sub(1, std::memory_order_relaxed); + } } -bool ThreadFilter::accept(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f))); +void ThreadFilter::unregisterThread(SlotID slot_id) { + if (slot_id < 0) return; + + // Clear the slot first + remove(slot_id); + + // Add to free list for reuse + pushToFreeList(slot_id); } -void ThreadFilter::add(int thread_id) { - u64 *b = getBitmapFor(thread_id); - assert (b != NULL); - thread_id = mapThreadId(thread_id); - assert(b == bitmap(thread_id)); - u64 bit = 1ULL << (thread_id & 0x3f); - if (!(__atomic_fetch_or(&word(b, thread_id), bit, __ATOMIC_RELAXED) & bit)) { - atomicInc(_size); - } +bool ThreadFilter::pushToFreeList(SlotID slot_id) { + // Lock-free Treiber stack push + for (int i = 0; i < kFreeListSize; ++i) { + int expected = -1; + if (_free_list[i].value.compare_exchange_strong(expected, slot_id, std::memory_order_acq_rel)) { + // Successfully stored in this slot + int old_head = _free_list_head.load(std::memory_order_acquire); + do { + _free_list[i].next.store(old_head, std::memory_order_relaxed); + } while (!_free_list_head.compare_exchange_weak(old_head, i, std::memory_order_acq_rel)); + return true; + } + } + return false; // Free list full, slot is lost but this is rare } -void ThreadFilter::remove(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - if (b == NULL) { - return; - } - - u64 bit = 1ULL << (thread_id & 0x3f); - if (__atomic_fetch_and(&word(b, thread_id), ~bit, __ATOMIC_RELAXED) & bit) { - atomicInc(_size, -1); - } +ThreadFilter::SlotID ThreadFilter::popFromFreeList() { + // Lock-free Treiber stack pop + while (true) { + int head = _free_list_head.load(std::memory_order_acquire); + if (head == -1) { + return -1; // Empty list + } + + int slot_id = _free_list[head].value.load(std::memory_order_acquire); + int next = _free_list[head].next.load(std::memory_order_acquire); + + // Try to update the head + if (_free_list_head.compare_exchange_weak(head, next, std::memory_order_acq_rel)) { + // Clear the node + _free_list[head].value.store(-1, std::memory_order_relaxed); + _free_list[head].next.store(-1, std::memory_order_relaxed); + return slot_id; + } + // Retry if another thread modified the head + } } -void ThreadFilter::collect(std::vector &v) { - for (int i = 0; i < _max_bitmaps; i++) { - u64 *b = _bitmap[i]; - if (b != NULL) { - int start_id = i * BITMAP_CAPACITY; - for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { - // Considering the functional impact, relaxed could be a reasonable - // order here - u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); - while (word != 0) { - int tid = start_id + j * 64 + __builtin_ctzl(word); - // restore thread id - tid = mapThreadId(tid); - v.push_back(tid); - word &= (word - 1); +void ThreadFilter::collect(std::vector& tids) const { + tids.clear(); + + // Early exit if no active slots + int active_count = _active_slots.load(std::memory_order_relaxed); + if (active_count == 0) { + return; + } + + // Reserve space for efficiency + tids.reserve(active_count); + + // Scan only initialized chunks + int num_chunks = _num_chunks.load(std::memory_order_relaxed); + for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) { + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); + if (chunk == nullptr) { + continue; // Skip unallocated chunks + } + + for (const auto& slot : chunk->slots) { + int slot_tid = slot.value.load(std::memory_order_acquire); + if (slot_tid != -1) { + tids.push_back(slot_tid); + } } - } } - } + + // Optional: shrink if we over-reserved significantly + if (tids.capacity() > tids.size() * 2) { + tids.shrink_to_fit(); + } +} + +void ThreadFilter::init(const char* filter) { + if (!filter) { + return; + } + // TODO: Implement parsing of filter string if needed + _enabled = true; +} + +bool ThreadFilter::enabled() const { + return _enabled; } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 7454be576..1cf42ee56 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -1,85 +1,77 @@ -/* - * Copyright 2020 Andrei Pangin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - #ifndef _THREADFILTER_H #define _THREADFILTER_H -#include "arch_dd.h" +#include +#include #include +#include +#include -// The size of thread ID bitmap in bytes. Must be at least 64K to allow mmap() -const u32 BITMAP_SIZE = 65536; -// How many thread IDs one bitmap can hold -const u32 BITMAP_CAPACITY = BITMAP_SIZE * 8; - -// ThreadFilter query operations must be lock-free and signal-safe; -// update operations are mostly lock-free, except rare bitmap allocations class ThreadFilter { -private: - // Total number of bitmaps required to hold the entire range of thread IDs - u32 _max_thread_id; - u32 _max_bitmaps; - u64 **_bitmap; - bool _enabled; - volatile int _size; - - u64 *bitmap(int thread_id) { - if (thread_id >= _max_thread_id) { - return NULL; - } - return __atomic_load_n( - &(_bitmap[static_cast(thread_id) / BITMAP_CAPACITY]), - __ATOMIC_ACQUIRE); - } - - static int mapThreadId(int thread_id); - - u64 &word(u64 *bitmap, int thread_id) { - // todo: add thread safe APIs - return bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; - } - - u64* const wordAddress(u64 *bitmap, int thread_id) const { - return &bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; - } - - u64* getBitmapFor(int thread_id); public: - ThreadFilter(); - ThreadFilter(ThreadFilter &threadFilter) = delete; - ~ThreadFilter(); + using SlotID = int; - bool enabled() const { return _enabled; } + // Optimized limits for reasonable memory usage + static constexpr int kChunkSize = 256; + static constexpr int kChunkShift = 8; // log2(256) + static constexpr int kChunkMask = kChunkSize - 1; + static constexpr int kMaxThreads = 2048; + static constexpr int kMaxChunks = (kMaxThreads + kChunkSize - 1) / kChunkSize; // = 8 chunks - int size() const { return _size; } - const volatile int* addressOfSize() const { return &_size; } + ThreadFilter(); + ~ThreadFilter(); - void init(const char *filter); - void clear(); + void init(const char* filter); + void clear(); + bool enabled() const; + bool accept(SlotID slot_id) const; + void add(int tid, SlotID slot_id); + void remove(SlotID slot_id); + void collect(std::vector& tids) const; - inline bool isValid(int thread_id) { - return thread_id >= 0 && thread_id < _max_thread_id; - } + SlotID registerThread(); + void unregisterThread(SlotID slot_id); - bool accept(int thread_id); - void add(int thread_id); - void remove(int thread_id); - u64* bitmapAddressFor(int thread_id); - - void collect(std::vector &v); +private: + // Optimized slot structure with padding to avoid false sharing + struct alignas(64) Slot { + std::atomic value{-1}; + char padding[60]; // Pad to cache line size + }; + + // Lock-free free list using a stack-like structure + struct FreeListNode { + std::atomic value{-1}; + std::atomic next{-1}; + }; + + // Pre-allocated chunk storage to eliminate mutex contention + struct ChunkStorage { + std::array slots; + std::atomic initialized{false}; + }; + + bool _enabled = false; + + // Lazily allocated storage for chunks + std::atomic _chunks[kMaxChunks]; + std::atomic _num_chunks{1}; + + // Lock-free slot allocation + std::atomic _next_index{0}; + + // High-performance free list using Treiber stack + static constexpr int kFreeListSize = 1024; // Increased from 128 + std::unique_ptr _free_list; + std::atomic _free_list_head{-1}; + + // Active slot tracking for efficient collect() + std::atomic _active_slots{0}; + + // Helper methods for lock-free operations + void initializeChunk(int chunk_idx); + bool pushToFreeList(SlotID slot_id); + SlotID popFromFreeList(); }; #endif // _THREADFILTER_H diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h new file mode 100644 index 000000000..9f62331db --- /dev/null +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -0,0 +1,70 @@ +/* + * Copyright The async-profiler authors + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021, 2025 Datadog, Inc + */ + +#ifndef _THREADIDTABLE_H +#define _THREADIDTABLE_H + +#include +#include + +// Signal-safe thread ID table with fixed size +class ThreadIdTable { +private: + static const int TABLE_SIZE = 256; // Should handle most realistic thread counts + std::atomic table[TABLE_SIZE]; + + int hash(int tid) const { + // Simple hash function - could be improved if needed + return tid % TABLE_SIZE; + } + +public: + ThreadIdTable() { + clear(); + } + + // Signal-safe insertion using atomic operations only + void insert(int tid) { + if (tid == 0) return; // Invalid thread ID, 0 is reserved for empty slots + + int start_slot = hash(tid); + for (int probe = 0; probe < TABLE_SIZE; probe++) { + int slot = (start_slot + probe) % TABLE_SIZE; + int expected = 0; + + // Try to claim empty slot + if (table[slot].compare_exchange_strong(expected, tid, std::memory_order_relaxed)) { + return; // Successfully inserted + } + + // Check if already present + if (table[slot].load(std::memory_order_relaxed) == tid) { + return; // Already exists + } + } + // Table full - thread ID will be lost, but this is rare and non-critical + // Could increment a counter here for diagnostics if needed + } + + // Clear the table (not signal-safe, called during buffer switch) + void clear() { + for (int i = 0; i < TABLE_SIZE; i++) { + table[i].store(0, std::memory_order_relaxed); + } + } + + // Collect all thread IDs into a set (not signal-safe, called during buffer switch) + void collect(std::unordered_set& result) { + for (int i = 0; i < TABLE_SIZE; i++) { + int tid = table[i].load(std::memory_order_relaxed); + if (tid != 0) { + result.insert(tid); + } + } + } +}; + +#endif // _THREADIDTABLE_H \ No newline at end of file diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index d261cdcec..8d8738270 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -26,6 +26,7 @@ #include "vmStructs_dd.h" #include #include +#include // For std::sort and std::binary_search std::atomic BaseWallClock::_enabled{false}; @@ -196,20 +197,27 @@ void WallClockJVMTI::timerLoop() { bool do_filter = threadFilter->enabled(); int self = OS::threadId(); - for (int i = 0; i < threads_count; i++) { - jthread thread = threads_ptr[i]; - if (thread != nullptr) { - ddprof::VMThread* nThread = static_cast(VMThread::fromJavaThread(jni, thread)); - if (nThread == nullptr) { - continue; - } - int tid = nThread->osThreadId(); - if (!threadFilter->isValid(tid)) { - continue; - } + // If filtering is enabled, collect the filtered TIDs first + std::vector filtered_tids; + if (do_filter) { + Profiler::instance()->threadFilter()->collect(filtered_tids); + // Sort the TIDs for efficient lookup + std::sort(filtered_tids.begin(), filtered_tids.end()); + } - if (tid != self && (!do_filter || threadFilter->accept(tid))) { - threads.push_back({nThread, thread, tid}); + for (int i = 0; i < threads_count; i++) { + jthread thread = threads_ptr[i]; + if (thread != nullptr) { + ddprof::VMThread* nThread = static_cast(VMThread::fromJavaThread(jni, thread)); + if (nThread == nullptr) { + continue; + } + int tid = nThread->osThreadId(); + if (tid != self && (!do_filter || + // Use binary search to efficiently find if tid is in filtered_tids + std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { + threads.push_back({nThread, thread}); + } } } } @@ -264,13 +272,17 @@ void WallClockJVMTI::timerLoop() { } void WallClockASGCT::timerLoop() { + // todo: re-allocating the vector every time is not efficient auto collectThreads = [&](std::vector& tids) { + // Get thread IDs from the filter if it's enabled + // Otherwise list all threads in the system if (Profiler::instance()->threadFilter()->enabled()) { Profiler::instance()->threadFilter()->collect(tids); } else { ThreadList *thread_list = OS::listThreads(); int tid = thread_list->next(); while (tid != -1) { + // Don't include the current thread if (tid != OS::threadId()) { tids.push_back(tid); } diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index a7cda6bc4..fbc96555e 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -109,7 +109,6 @@ public static synchronized JavaProfiler getInstance(String libLocation, String s throw new IOException("Failed to load Datadog Java profiler library", result.error); } init0(); - ActiveBitmap.initialize(); profiler.initializeContextStorage(); instance = profiler; @@ -210,11 +209,7 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - if (UNSAFE != null) { - ActiveBitmap.setActive(TID.get(), true); - } else { - filterThread0(true); - } + filterThreadAdd0(); } /** @@ -222,14 +217,9 @@ public void addThread() { * 'filter' option must be enabled to use this method. */ public void removeThread() { - if (UNSAFE != null) { - ActiveBitmap.setActive(TID.get(), false); - } else { - filterThread0(false); - } + filterThreadRemove0(); } - /** * Passing context identifier to a profiler. This ID is thread-local and is dumped in * the JFR output only. 0 is a reserved value for "no-context". @@ -479,6 +469,10 @@ public Map getDebugCounters() { private static native boolean init0(); private native void stop0() throws IllegalStateException; private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException; + + private native void filterThreadAdd0(); + private native void filterThreadRemove0(); + // Backward compatibility for existing code private native void filterThread0(boolean enable); private static native int getTid0(); diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index a754c9b02..88d2e657e 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -14,6 +14,9 @@ #include #include #include + #include // For std::sort + #include + #include ssize_t callback(char* ptr, int len) { return len; @@ -121,31 +124,109 @@ } TEST(ThreadFilter, testThreadFilter) { - int maxTid = OS::getMaxThreadId(); ThreadFilter filter; filter.init(""); ASSERT_TRUE(filter.enabled()); - EXPECT_EQ(0, filter.size()); - // increase step gradually to create different bit densities - int step = 1; - int size = 0; - for (int tid = 1; tid < maxTid - step - 1; tid += step, size++) { - EXPECT_FALSE(filter.accept(tid)); - filter.add(tid); - EXPECT_TRUE(filter.accept(tid)); - step++; + + const int num_threads = 10; + const int num_ops = 100; + std::vector threads; + std::atomic completed_threads{0}; + + // Each thread will add and remove its own thread ID multiple times + for (int i = 1; i <= num_threads; i++) { + threads.emplace_back([&filter, i, &completed_threads]() { + for (int j = 0; j < num_ops; j++) { + // Register a new slot for this thread + int slot_id = filter.registerThread(); + + // Add thread ID to slot + filter.add(i, slot_id); + bool accepted = filter.accept(slot_id); + if (!accepted) { + fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned false after add\n", + i, j, slot_id, slot_id); + } + EXPECT_TRUE(accepted); + + // Remove thread ID + filter.remove(slot_id); + accepted = filter.accept(slot_id); + if (accepted) { + fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned true after remove\n", + i, j, slot_id, slot_id); + } + EXPECT_FALSE(accepted); + } + completed_threads++; + }); + } + + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); } - ASSERT_EQ(size, filter.size()); + + // Verify all threads completed + ASSERT_EQ(completed_threads.load(), num_threads); + + // Collect and verify all thread IDs were properly removed std::vector tids; - tids.reserve(size); filter.collect(tids); - ASSERT_EQ(size, tids.size()); - for (int tid : tids) { - ASSERT_TRUE(filter.accept(tid)); - filter.remove(tid); - ASSERT_FALSE(filter.accept(tid)); + ASSERT_EQ(tids.size(), 0); + } + + TEST(ThreadFilter, testThreadFilterCollect) { + ThreadFilter filter; + filter.init(""); + ASSERT_TRUE(filter.enabled()); + + const int num_threads = 10; + std::vector threads; + std::atomic completed_threads{0}; + std::vector expected_tids; + std::vector slots(num_threads); // Track slot IDs + + // Pre-register slots for each thread + for (int i = 0; i < num_threads; i++) { + slots[i] = filter.registerThread(); + } + + // Each thread will add its thread ID + for (int i = 1; i <= num_threads; i++) { + expected_tids.push_back(i); + int slot_id = slots[i-1]; // Use the pre-registered slot + + threads.emplace_back([&filter, i, slot_id, &completed_threads]() { + filter.add(i, slot_id); + EXPECT_TRUE(filter.accept(slot_id)); + completed_threads++; + }); + } + + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); + } + + // Verify all threads completed + ASSERT_EQ(completed_threads.load(), num_threads); + + // Collect and verify all thread IDs are present + std::vector collected_tids; + filter.collect(collected_tids); + + // Sort both vectors for comparison + std::sort(expected_tids.begin(), expected_tids.end()); + std::sort(collected_tids.begin(), collected_tids.end()); + + ASSERT_EQ(expected_tids.size(), collected_tids.size()); + for (size_t i = 0; i < expected_tids.size(); i++) { + EXPECT_EQ(expected_tids[i], collected_tids[i]) + << "Mismatch at index " << i + << ": expected " << expected_tids[i] + << ", got " << collected_tids[i]; } - EXPECT_EQ(0, filter.size()); } TEST(ThreadInfoTest, testThreadInfoCleanupAllDead) { diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java index 246cbd1dd..08b84a96a 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java @@ -15,7 +15,10 @@ @State(Scope.Benchmark) public class ThreadFilterBenchmark extends Configuration { - private static final int NUM_THREADS = 4; + @Param({"true", "false"}) // Parameterize the filter usage + public boolean useThreadFilters; + + private static final int NUM_THREADS = 15; private ExecutorService executorService; private JavaProfiler profiler; private AtomicBoolean running; @@ -30,13 +33,13 @@ public class ThreadFilterBenchmark extends Configuration { private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE); private static final int CACHE_LINE_SIZE = 64; // Typical cache line size private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line - private boolean useThreadFilters = true; // Flag to control the use of thread filters private AtomicLong addThreadCount = new AtomicLong(0); private AtomicLong removeThreadCount = new AtomicLong(0); @Setup(Level.Trial) public void setup() throws IOException { System.out.println("Setting up benchmark..."); + System.out.println("Thread filters enabled: " + useThreadFilters); System.out.println("Creating thread pool with " + NUM_THREADS + " threads"); executorService = Executors.newFixedThreadPool(NUM_THREADS); System.out.println("Getting profiler instance"); @@ -53,6 +56,7 @@ public void setup() throws IOException { System.out.println("Starting profiler with " + config); profiler.execute(config); System.out.println("Started profiler with output file"); + running = new AtomicBoolean(true); operationCount = new AtomicLong(0); startTime = System.currentTimeMillis(); @@ -106,6 +110,7 @@ public void tearDown() { // Stop the profiler if it's active try { profiler.stop(); + System.out.println("Profiler stopped."); } catch (IllegalStateException e) { System.out.println("Profiler was not active at teardown."); } @@ -146,7 +151,7 @@ public void setUseThreadFilters(boolean useThreadFilters) { @Benchmark @BenchmarkMode(Mode.Throughput) - @Fork(value = 1, warmups = 0) + @Fork(value = 1, warmups = 1) @Warmup(iterations = 1, time = 1) @Measurement(iterations = 1, time = 2) @Threads(1) @@ -156,14 +161,13 @@ public long threadFilterStress() throws InterruptedException { startLatch = new CountDownLatch(NUM_THREADS); stopLatch = new CountDownLatch(NUM_THREADS); - // Start all worker threads + // Start all worker threads[] for (int i = 0; i < NUM_THREADS; i++) { final int threadId = i; executorService.submit(() -> { try { startLatch.countDown(); startLatch.await(30, TimeUnit.SECONDS); - String startMsg = String.format("Thread %d started%n", threadId); System.out.print(startMsg); if (logWriter != null) { @@ -223,14 +227,14 @@ public long threadFilterStress() throws InterruptedException { operationCount.incrementAndGet(); } - if (operationCount.get() % 1000 == 0) { - String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); - System.out.print(progressMsg); - if (logWriter != null) { - logWriter.print(progressMsg); - logWriter.flush(); - } - } + // if (operationCount.get() % 1000 == 0) { + // String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); + // System.out.print(progressMsg); + // if (logWriter != null) { + // logWriter.print(progressMsg); + // logWriter.flush(); + // } + // } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -243,4 +247,4 @@ public long threadFilterStress() throws InterruptedException { stopLatch.await(); return operationCount.get(); } -} +} \ No newline at end of file From 51cb97f6862499cc565a44070f7b2c8b3a2f2966 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Mon, 7 Jul 2025 11:24:13 +0200 Subject: [PATCH 02/13] Add an automatic register in case we failed to register the thread --- ddprof-lib/src/main/cpp/javaApi.cpp | 22 ++++++++++++++++++++-- ddprof-lib/src/main/cpp/threadFilter.cpp | 8 +++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 8b3e0ff0b..c1db99e2d 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -144,7 +144,13 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, int slot_id = current->filterSlotId(); if (unlikely(slot_id == -1)) { - return; + // Thread doesn't have a slot ID yet (e.g., main thread), so register it + slot_id = thread_filter->registerThread(); + current->setFilterSlotId(slot_id); + } + + if (unlikely(slot_id == -1)) { + return; // Failed to register thread } thread_filter->add(tid, slot_id); } @@ -167,6 +173,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, int slot_id = current->filterSlotId(); if (unlikely(slot_id == -1)) { + // Thread doesn't have a slot ID yet - nothing to remove return; } thread_filter->remove(slot_id); @@ -192,7 +199,18 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, int slot_id = current->filterSlotId(); if (unlikely(slot_id == -1)) { - return; + if (enable) { + // Thread doesn't have a slot ID yet, so register it + slot_id = thread_filter->registerThread(); + current->setFilterSlotId(slot_id); + } else { + // Thread doesn't have a slot ID yet - nothing to remove + return; + } + } + + if (unlikely(slot_id == -1)) { + return; // Failed to register thread } if (enable) { diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index dfc3d09e6..147c8bf9c 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -6,6 +6,7 @@ #include #include #include +#include ThreadFilter::ThreadFilter() : _enabled(false) { // Initialize chunk pointers to null (lazy allocation) @@ -243,9 +244,14 @@ void ThreadFilter::collect(std::vector& tids) const { void ThreadFilter::init(const char* filter) { if (!filter) { + _enabled = false; return; } - // TODO: Implement parsing of filter string if needed + + // Simple logic: any filter value (including "0") enables filtering + // Only explicitly registered threads via addThread() will be sampled + // Previously we had a syntax where we could manually force some thread IDs. + // This is no longer supported. _enabled = true; } From cc02c1e99a35f3da4faabf5ced5e68c216e52e27 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Thu, 10 Jul 2025 12:12:55 +0200 Subject: [PATCH 03/13] Exterminate the last remnants of false sharing --- ddprof-lib/src/main/cpp/threadFilter.cpp | 123 ++++++++++++----------- ddprof-lib/src/main/cpp/threadFilter.h | 14 +-- 2 files changed, 71 insertions(+), 66 deletions(-) diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 147c8bf9c..dd763a2e9 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -8,13 +8,15 @@ #include #include +ThreadFilter::ShardHead ThreadFilter::_free_heads[ThreadFilter::kShardCount] {}; + ThreadFilter::ThreadFilter() : _enabled(false) { // Initialize chunk pointers to null (lazy allocation) for (int i = 0; i < kMaxChunks; ++i) { _chunks[i].store(nullptr, std::memory_order_relaxed); } _free_list = std::make_unique(kFreeListSize); - + // Initialize the first chunk initializeChunk(0); clear(); @@ -30,14 +32,14 @@ ThreadFilter::~ThreadFilter() { void ThreadFilter::initializeChunk(int chunk_idx) { if (chunk_idx >= kMaxChunks) return; - + // Check if chunk already exists ChunkStorage* existing = _chunks[chunk_idx].load(std::memory_order_acquire); if (existing != nullptr) return; - + // Allocate new chunk ChunkStorage* new_chunk = new ChunkStorage(); - + // Try to install it atomically ChunkStorage* expected = nullptr; if (_chunks[chunk_idx].compare_exchange_strong(expected, new_chunk, std::memory_order_acq_rel)) { @@ -68,13 +70,13 @@ ThreadFilter::SlotID ThreadFilter::registerThread() { } const int chunk_idx = index >> kChunkShift; - + // Ensure the chunk is initialized (lock-free) if (chunk_idx >= _num_chunks.load(std::memory_order_acquire)) { // Update the chunk count atomically int expected_chunks = chunk_idx; int desired_chunks = chunk_idx + 1; - while (!_num_chunks.compare_exchange_weak(expected_chunks, desired_chunks, + while (!_num_chunks.compare_exchange_weak(expected_chunks, desired_chunks, std::memory_order_acq_rel)) { if (expected_chunks > chunk_idx) { break; // Another thread already updated it @@ -82,10 +84,10 @@ ThreadFilter::SlotID ThreadFilter::registerThread() { desired_chunks = expected_chunks + 1; } } - + // Initialize the chunk if needed initializeChunk(chunk_idx); - + return index; } @@ -100,14 +102,17 @@ void ThreadFilter::clear() { } } } - + // Clear the free list for (int i = 0; i < kFreeListSize; ++i) { _free_list[i].value.store(-1, std::memory_order_relaxed); _free_list[i].next.store(-1, std::memory_order_relaxed); } - _free_list_head.store(-1, std::memory_order_relaxed); - _active_slots.store(0, std::memory_order_relaxed); + + // Reset the free heads for each shard + for (int s = 0; s < kShardCount; ++s) { + _free_heads[s].head.store(-1, std::memory_order_relaxed); + } } bool ThreadFilter::accept(SlotID slot_id) const { @@ -115,49 +120,43 @@ bool ThreadFilter::accept(SlotID slot_id) const { return true; } if (slot_id < 0) return false; - + int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - + if (chunk_idx >= kMaxChunks) return false; ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (chunk == nullptr) return false; // Fail-fast if not allocated - + return chunk->slots[slot_idx].value.load(std::memory_order_acquire) != -1; } void ThreadFilter::add(int tid, SlotID slot_id) { if (slot_id < 0) return; - + int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - + if (chunk_idx >= kMaxChunks) return; ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (chunk == nullptr) return; // Fail-fast if not allocated - - // Store the tid and increment active slots if this was previously empty - int old_value = chunk->slots[slot_idx].value.exchange(tid, std::memory_order_acq_rel); - if (old_value == -1) { - _active_slots.fetch_add(1, std::memory_order_relaxed); - } + + // Store the tid + chunk->slots[slot_idx].value.store(tid, std::memory_order_release); } void ThreadFilter::remove(SlotID slot_id) { if (slot_id < 0) return; - + int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - + if (chunk_idx >= kMaxChunks) return; ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (chunk == nullptr) return; // Fail-fast if not allocated - - // Remove the tid and decrement active slots if this was previously occupied - int old_value = chunk->slots[slot_idx].value.exchange(-1, std::memory_order_acq_rel); - if (old_value != -1) { - _active_slots.fetch_sub(1, std::memory_order_relaxed); - } + + // Remove the tid + chunk->slots[slot_idx].value.store(-1, std::memory_order_acq_rel); } void ThreadFilter::unregisterThread(SlotID slot_id) { @@ -171,15 +170,20 @@ void ThreadFilter::unregisterThread(SlotID slot_id) { } bool ThreadFilter::pushToFreeList(SlotID slot_id) { - // Lock-free Treiber stack push + // Lock-free sharded Treiber stack push + const int shard = shardOfSlot(slot_id); + auto& head = _free_heads[shard].head; // private cache-line + for (int i = 0; i < kFreeListSize; ++i) { int expected = -1; - if (_free_list[i].value.compare_exchange_strong(expected, slot_id, std::memory_order_acq_rel)) { - // Successfully stored in this slot - int old_head = _free_list_head.load(std::memory_order_acquire); + if (_free_list[i].value.compare_exchange_strong( + expected, slot_id, std::memory_order_acq_rel)) { + // Link node into this shard’s Treiber stack + int old_head = head.load(std::memory_order_acquire); do { _free_list[i].next.store(old_head, std::memory_order_relaxed); - } while (!_free_list_head.compare_exchange_weak(old_head, i, std::memory_order_acq_rel)); + } while (!head.compare_exchange_weak(old_head, i, + std::memory_order_acq_rel, std::memory_order_relaxed)); return true; } } @@ -187,38 +191,39 @@ bool ThreadFilter::pushToFreeList(SlotID slot_id) { } ThreadFilter::SlotID ThreadFilter::popFromFreeList() { - // Lock-free Treiber stack pop - while (true) { - int head = _free_list_head.load(std::memory_order_acquire); - if (head == -1) { - return -1; // Empty list - } - - int slot_id = _free_list[head].value.load(std::memory_order_acquire); - int next = _free_list[head].next.load(std::memory_order_acquire); - - // Try to update the head - if (_free_list_head.compare_exchange_weak(head, next, std::memory_order_acq_rel)) { - // Clear the node - _free_list[head].value.store(-1, std::memory_order_relaxed); - _free_list[head].next.store(-1, std::memory_order_relaxed); - return slot_id; + // Lock-free sharded Treiber stack pop + int hash = static_cast(std::hash{}(std::this_thread::get_id())); + int start = shardOf(hash); + + for (int pass = 0; pass < kShardCount; ++pass) { + int s = (start + pass) & (kShardCount - 1); + auto& head = _free_heads[s].head; + + while (true) { + int node = head.load(std::memory_order_acquire); + if (node == -1) break; // shard empty → try next + + int next = _free_list[node].next.load(std::memory_order_relaxed); + if (head.compare_exchange_weak(node, next, + std::memory_order_acq_rel, + std::memory_order_relaxed)) + { + int id = _free_list[node].value.exchange(-1, + std::memory_order_relaxed); + _free_list[node].next.store(-1, std::memory_order_relaxed); + return id; + } } - // Retry if another thread modified the head } + return -1; // Empty list } void ThreadFilter::collect(std::vector& tids) const { tids.clear(); - // Early exit if no active slots - int active_count = _active_slots.load(std::memory_order_relaxed); - if (active_count == 0) { - return; - } - // Reserve space for efficiency - tids.reserve(active_count); + // The eventual resize is not the bottleneck, so we reserve a reasonable size + tids.reserve(512); // Scan only initialized chunks int num_chunks = _num_chunks.load(std::memory_order_relaxed); diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 1cf42ee56..197e5987d 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -17,7 +17,9 @@ class ThreadFilter { static constexpr int kChunkMask = kChunkSize - 1; static constexpr int kMaxThreads = 2048; static constexpr int kMaxChunks = (kMaxThreads + kChunkSize - 1) / kChunkSize; // = 8 chunks - +// High-performance free list using Treiber stack, 64 shards + static constexpr int kFreeListSize = 1024; + static constexpr int kShardCount = 64; // power-of-two ThreadFilter(); ~ThreadFilter(); @@ -59,15 +61,13 @@ class ThreadFilter { // Lock-free slot allocation std::atomic _next_index{0}; - - // High-performance free list using Treiber stack - static constexpr int kFreeListSize = 1024; // Increased from 128 std::unique_ptr _free_list; - std::atomic _free_list_head{-1}; - // Active slot tracking for efficient collect() - std::atomic _active_slots{0}; + struct alignas(64) ShardHead { std::atomic head{-1}; }; +static ShardHead _free_heads[kShardCount]; // one cache-line each + static inline int shardOf(int tid) { return tid & (kShardCount - 1); } + static inline int shardOfSlot(int s){ return s & (kShardCount - 1); } // Helper methods for lock-free operations void initializeChunk(int chunk_idx); bool pushToFreeList(SlotID slot_id); From 50a8d5f19c4f31ff8ef923a9daf1465ac1649093 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Mon, 21 Jul 2025 14:07:23 +0200 Subject: [PATCH 04/13] Minor tweaks --- ddprof-lib/src/main/cpp/flightRecorder.h | 2 +- ddprof-lib/src/main/cpp/threadFilter.cpp | 6 ++++++ ddprof-lib/src/main/cpp/threadIdTable.h | 5 ++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index 81ed85bce..541c4db1c 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -164,7 +164,7 @@ class Recording { public: Recording(int fd, Arguments &args); ~Recording(); - + void copyTo(int target_fd); off_t finishChunk(); diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index dd763a2e9..d37f56ed0 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -125,6 +125,8 @@ bool ThreadFilter::accept(SlotID slot_id) const { int slot_idx = slot_id & kChunkMask; if (chunk_idx >= kMaxChunks) return false; + if (slot_idx >= kChunkSize) return false; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (chunk == nullptr) return false; // Fail-fast if not allocated @@ -138,6 +140,8 @@ void ThreadFilter::add(int tid, SlotID slot_id) { int slot_idx = slot_id & kChunkMask; if (chunk_idx >= kMaxChunks) return; + if (slot_idx >= kChunkSize) return; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (chunk == nullptr) return; // Fail-fast if not allocated @@ -152,6 +156,8 @@ void ThreadFilter::remove(SlotID slot_id) { int slot_idx = slot_id & kChunkMask; if (chunk_idx >= kMaxChunks) return; + if (slot_idx >= kChunkSize) return; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (chunk == nullptr) return; // Fail-fast if not allocated diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h index 9f62331db..1856c6522 100644 --- a/ddprof-lib/src/main/cpp/threadIdTable.h +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -13,7 +13,10 @@ // Signal-safe thread ID table with fixed size class ThreadIdTable { private: - static const int TABLE_SIZE = 256; // Should handle most realistic thread counts + // We have 256 slots per concurrency level (currently 16) + // This should cater for 4096 threads - if it turns out to be too small, we + // can increase it or make it configurable + static const int TABLE_SIZE = 256; std::atomic table[TABLE_SIZE]; int hash(int tid) const { From 30d32c04d083fbeb844437259e59adb1be4764c8 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Mon, 21 Jul 2025 17:09:11 +0200 Subject: [PATCH 05/13] Merge cleanup --- ddprof-lib/src/main/cpp/wallClock.cpp | 39 +++++++++++++-------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index 8d8738270..0b16d0db5 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -197,27 +197,26 @@ void WallClockJVMTI::timerLoop() { bool do_filter = threadFilter->enabled(); int self = OS::threadId(); - // If filtering is enabled, collect the filtered TIDs first - std::vector filtered_tids; - if (do_filter) { - Profiler::instance()->threadFilter()->collect(filtered_tids); - // Sort the TIDs for efficient lookup - std::sort(filtered_tids.begin(), filtered_tids.end()); - } + // If filtering is enabled, collect the filtered TIDs first + std::vector filtered_tids; + if (do_filter) { + Profiler::instance()->threadFilter()->collect(filtered_tids); + // Sort the TIDs for efficient lookup + std::sort(filtered_tids.begin(), filtered_tids.end()); + } - for (int i = 0; i < threads_count; i++) { - jthread thread = threads_ptr[i]; - if (thread != nullptr) { - ddprof::VMThread* nThread = static_cast(VMThread::fromJavaThread(jni, thread)); - if (nThread == nullptr) { - continue; - } - int tid = nThread->osThreadId(); - if (tid != self && (!do_filter || - // Use binary search to efficiently find if tid is in filtered_tids - std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { - threads.push_back({nThread, thread}); - } + for (int i = 0; i < threads_count; i++) { + jthread thread = threads_ptr[i]; + if (thread != nullptr) { + ddprof::VMThread* nThread = static_cast(VMThread::fromJavaThread(jni, thread)); + if (nThread == nullptr) { + continue; + } + int tid = nThread->osThreadId(); + if (tid != self && (!do_filter || + // Use binary search to efficiently find if tid is in filtered_tids + std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { + threads.push_back({nThread, thread}); } } } From 28e23ee8f0730d68c95e9be94a81d19280432ed3 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Fri, 8 Aug 2025 19:18:24 +0200 Subject: [PATCH 06/13] Adjust ThreadEnd hook If the TLS cleanup fires before the JVMTI hook, we want to ensure that we don't crash while retrieving the ProfiledThread - Add a check on validity of ProfiledThread --- ddprof-lib/src/main/cpp/profiler.cpp | 32 ++++++++++++++++++-------- ddprof-lib/src/main/cpp/threadFilter.h | 4 ++-- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 3a03a74e0..96a4758bc 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -119,18 +119,32 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { ProfiledThread *current = ProfiledThread::current(); - int slot_id = current->filterSlotId(); - int tid = current->tid(); - if (_thread_filter.enabled()) { - _thread_filter.unregisterThread(slot_id); - current->setFilterSlotId(-1); + int tid = -1; + + if (current != nullptr) { + // ProfiledThread is alive - do full cleanup and use efficient tid access + int slot_id = current->filterSlotId(); + tid = current->tid(); + + if (_thread_filter.enabled()) { + _thread_filter.unregisterThread(slot_id); + current->setFilterSlotId(-1); + } + + ProfiledThread::release(); + } else { + // ProfiledThread already cleaned up - try to get tid from JVMTI as fallback + tid = VMThread::nativeThreadId(jni, thread); + if (tid < 0) { + // No ProfiledThread AND can't get tid from JVMTI - nothing we can do + return; + } } - updateThreadName(jvmti, jni, thread, true); - + + // These should always run if we have a valid tid + updateThreadName(jvmti, jni, thread, false); // false = not self _cpu_engine->unregisterThread(tid); - // unregister here because JNI callers generally don't know about thread exits _wall_engine->unregisterThread(tid); - ProfiledThread::release(); } int Profiler::registerThread(int tid) { diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 197e5987d..eb2eaaa1f 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -17,7 +17,7 @@ class ThreadFilter { static constexpr int kChunkMask = kChunkSize - 1; static constexpr int kMaxThreads = 2048; static constexpr int kMaxChunks = (kMaxThreads + kChunkSize - 1) / kChunkSize; // = 8 chunks -// High-performance free list using Treiber stack, 64 shards + // High-performance free list using Treiber stack, 64 shards static constexpr int kFreeListSize = 1024; static constexpr int kShardCount = 64; // power-of-two ThreadFilter(); @@ -64,7 +64,7 @@ class ThreadFilter { std::unique_ptr _free_list; struct alignas(64) ShardHead { std::atomic head{-1}; }; -static ShardHead _free_heads[kShardCount]; // one cache-line each + static ShardHead _free_heads[kShardCount]; // one cache-line each static inline int shardOf(int tid) { return tid & (kShardCount - 1); } static inline int shardOfSlot(int s){ return s & (kShardCount - 1); } From 3d31cc7e0f996ed2d3888a3e871e9d0318f72603 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Tue, 19 Aug 2025 18:05:34 +0200 Subject: [PATCH 07/13] Profiler thread - Ensure we init before swap --- ddprof-lib/src/main/cpp/threadFilter.cpp | 11 +++++------ ddprof-lib/src/main/cpp/threadFilter.h | 1 - 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index d37f56ed0..825fdbeb7 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -37,17 +37,16 @@ void ThreadFilter::initializeChunk(int chunk_idx) { ChunkStorage* existing = _chunks[chunk_idx].load(std::memory_order_acquire); if (existing != nullptr) return; - // Allocate new chunk + // Allocate and initialize new chunk completely before swapping ChunkStorage* new_chunk = new ChunkStorage(); + for (auto& slot : new_chunk->slots) { + slot.value.store(-1, std::memory_order_relaxed); + } // Try to install it atomically ChunkStorage* expected = nullptr; if (_chunks[chunk_idx].compare_exchange_strong(expected, new_chunk, std::memory_order_acq_rel)) { - // Successfully installed - initialize all slots - for (auto& slot : new_chunk->slots) { - slot.value.store(-1, std::memory_order_relaxed); - } - new_chunk->initialized.store(true, std::memory_order_release); + // Successfully installed } else { // Another thread beat us to it - clean up our allocation delete new_chunk; diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index eb2eaaa1f..9151f199e 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -50,7 +50,6 @@ class ThreadFilter { // Pre-allocated chunk storage to eliminate mutex contention struct ChunkStorage { std::array slots; - std::atomic initialized{false}; }; bool _enabled = false; From dfd44de08b91141d6f961fb7c52f65d1e5f392b3 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Tue, 19 Aug 2025 18:36:31 +0200 Subject: [PATCH 08/13] Thread filter bench - Start the profiler to ensure we have valid thread objects - add asserts around missing thread object - remove print (replacing with an assert) --- ddprof-lib/src/main/cpp/javaApi.cpp | 5 +++-- ddprof-lib/src/main/cpp/profiler.cpp | 2 +- .../scenarios/throughput/ThreadFilterBenchmark.java | 12 ++++++++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index c1db99e2d..3bff4e8de 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -129,8 +129,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, jobject unused) { ProfiledThread *current = ProfiledThread::current(); if (unlikely(current == nullptr)) { - printf("[DEBUG] ProfiledThread::current() returned null in addThread() - thread not initialized\n"); - fflush(stdout); + assert(false); return; } int tid = current->tid(); @@ -160,6 +159,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, jobject unused) { ProfiledThread *current = ProfiledThread::current(); if (unlikely(current == nullptr)) { + assert(false); return; } int tid = current->tid(); @@ -186,6 +186,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, jboolean enable) { ProfiledThread *current = ProfiledThread::current(); if (unlikely(current == nullptr)) { + assert(false); return; } int tid = current->tid(); diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 96a4758bc..135b523a7 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -141,7 +141,7 @@ void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { } } - // These should always run if we have a valid tid + // These can run if we have a valid tid updateThreadName(jvmti, jni, thread, false); // false = not self _cpu_engine->unregisterThread(tid); _wall_engine->unregisterThread(tid); diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java index bc03e0ca7..f56bd258b 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java @@ -13,6 +13,7 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; @@ -39,6 +40,17 @@ public class ThreadFilterBenchmark extends Configuration { public void setup() throws IOException { profiler = JavaProfiler.getInstance(); workloadNum = Long.parseLong(workload); + // Start profiling to enable JVMTI ThreadStart callbacks + // Add JFR file parameter to satisfy profiler requirements + profiler.execute("start," + command + ",jfr,file=/tmp/thread-filter-benchmark.jfr"); + } + + @TearDown(Level.Trial) + public void tearDown() throws IOException { + // Stop profiling and clean up + if (profiler != null) { + profiler.execute("stop"); + } } @Benchmark From 2633224226cf72625afc82ff93ff8f1414a40066 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Thu, 21 Aug 2025 10:59:45 +0200 Subject: [PATCH 09/13] Thread filter optims - minor fixes - Adjust ordering or free list init - Honour the enable / disable flag - Remove bound checks --- ddprof-lib/src/main/cpp/threadFilter.cpp | 102 ++++++++++++----------- ddprof-lib/src/main/cpp/threadFilter.h | 9 +- 2 files changed, 59 insertions(+), 52 deletions(-) diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 825fdbeb7..f3d49dc88 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -8,6 +8,15 @@ #include #include +// Branch prediction hints for hot paths +#ifdef __GNUC__ +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + ThreadFilter::ShardHead ThreadFilter::_free_heads[ThreadFilter::kShardCount] {}; ThreadFilter::ThreadFilter() : _enabled(false) { @@ -19,13 +28,26 @@ ThreadFilter::ThreadFilter() : _enabled(false) { // Initialize the first chunk initializeChunk(0); - clear(); + // ordering is fine because we are not enabled yet + initFreeList(); } ThreadFilter::~ThreadFilter() { - // Clean up allocated chunks + // Make the filter inert for any concurrent readers + _enabled.store(false, std::memory_order_release); + // Reset free-list heads and nodes first + for (int s = 0; s < kShardCount; ++s) { + _free_heads[s].head.store(-1, std::memory_order_relaxed); + } + for (int i = 0; i < kFreeListSize; ++i) { + _free_list[i].value.store(-1, std::memory_order_relaxed); + _free_list[i].next.store(-1, std::memory_order_relaxed); + } + // Publish 0 chunks to stop range scans (collect) + _num_chunks.store(0, std::memory_order_release); + // Detach and delete chunks for (int i = 0; i < kMaxChunks; ++i) { - ChunkStorage* chunk = _chunks[i].load(std::memory_order_relaxed); + ChunkStorage* chunk = _chunks[i].exchange(nullptr, std::memory_order_acq_rel); delete chunk; } } @@ -54,6 +76,11 @@ void ThreadFilter::initializeChunk(int chunk_idx) { } ThreadFilter::SlotID ThreadFilter::registerThread() { + // If disabled, block new registrations + if (!_enabled.load(std::memory_order_acquire)) { + return -1; + } + // First, try to get a slot from the free list (lock-free stack) SlotID reused_slot = popFromFreeList(); if (reused_slot >= 0) { @@ -90,19 +117,8 @@ ThreadFilter::SlotID ThreadFilter::registerThread() { return index; } -void ThreadFilter::clear() { - // Clear all initialized chunks - int num_chunks = _num_chunks.load(std::memory_order_relaxed); - for (int i = 0; i < num_chunks; ++i) { - ChunkStorage* chunk = _chunks[i].load(std::memory_order_relaxed); - if (chunk != nullptr) { - for (auto& slot : chunk->slots) { - slot.value.store(-1, std::memory_order_relaxed); - } - } - } - - // Clear the free list +void ThreadFilter::initFreeList() { + // Initialize the free list storage for (int i = 0; i < kFreeListSize; ++i) { _free_list[i].value.store(-1, std::memory_order_relaxed); _free_list[i].next.store(-1, std::memory_order_relaxed); @@ -115,21 +131,21 @@ void ThreadFilter::clear() { } bool ThreadFilter::accept(SlotID slot_id) const { - if (!_enabled) { + // Fast path: if disabled, accept everything (relaxed to avoid fences on hot path) + if (unlikely(!_enabled.load(std::memory_order_relaxed))) { return true; } - if (slot_id < 0) return false; + if (unlikely(slot_id < 0)) return false; int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - if (chunk_idx >= kMaxChunks) return false; - if (slot_idx >= kChunkSize) return false; - + // Fast path: assume valid slot_id from registerThread() ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); - if (chunk == nullptr) return false; // Fail-fast if not allocated - - return chunk->slots[slot_idx].value.load(std::memory_order_acquire) != -1; + if (likely(chunk != nullptr)) { + return chunk->slots[slot_idx].value.load(std::memory_order_acquire) != -1; + } + return false; } void ThreadFilter::add(int tid, SlotID slot_id) { @@ -138,14 +154,11 @@ void ThreadFilter::add(int tid, SlotID slot_id) { int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - if (chunk_idx >= kMaxChunks) return; - if (slot_idx >= kChunkSize) return; - + // Fast path: assume valid slot_id from registerThread() ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); - if (chunk == nullptr) return; // Fail-fast if not allocated - - // Store the tid - chunk->slots[slot_idx].value.store(tid, std::memory_order_release); + if (likely(chunk != nullptr)) { + chunk->slots[slot_idx].value.store(tid, std::memory_order_release); + } } void ThreadFilter::remove(SlotID slot_id) { @@ -154,14 +167,11 @@ void ThreadFilter::remove(SlotID slot_id) { int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - if (chunk_idx >= kMaxChunks) return; - if (slot_idx >= kChunkSize) return; - + // Fast path: assume valid slot_id from registerThread() ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); - if (chunk == nullptr) return; // Fail-fast if not allocated - - // Remove the tid - chunk->slots[slot_idx].value.store(-1, std::memory_order_acq_rel); + if (likely(chunk != nullptr)) { + chunk->slots[slot_idx].value.store(-1, std::memory_order_release); + } } void ThreadFilter::unregisterThread(SlotID slot_id) { @@ -184,11 +194,12 @@ bool ThreadFilter::pushToFreeList(SlotID slot_id) { if (_free_list[i].value.compare_exchange_strong( expected, slot_id, std::memory_order_acq_rel)) { // Link node into this shard’s Treiber stack - int old_head = head.load(std::memory_order_acquire); + int old_head; do { + old_head = head.load(std::memory_order_acquire); _free_list[i].next.store(old_head, std::memory_order_relaxed); } while (!head.compare_exchange_weak(old_head, i, - std::memory_order_acq_rel, std::memory_order_relaxed)); + std::memory_order_release, std::memory_order_relaxed)); return true; } } @@ -210,7 +221,7 @@ ThreadFilter::SlotID ThreadFilter::popFromFreeList() { int next = _free_list[node].next.load(std::memory_order_relaxed); if (head.compare_exchange_weak(node, next, - std::memory_order_acq_rel, + std::memory_order_release, std::memory_order_relaxed)) { int id = _free_list[node].value.exchange(-1, @@ -253,18 +264,13 @@ void ThreadFilter::collect(std::vector& tids) const { } void ThreadFilter::init(const char* filter) { - if (!filter) { - _enabled = false; - return; - } - // Simple logic: any filter value (including "0") enables filtering // Only explicitly registered threads via addThread() will be sampled // Previously we had a syntax where we could manually force some thread IDs. // This is no longer supported. - _enabled = true; + _enabled.store(filter != nullptr, std::memory_order_release); } bool ThreadFilter::enabled() const { - return _enabled; + return _enabled.load(std::memory_order_acquire); } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 9151f199e..14fab0e41 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -18,13 +18,14 @@ class ThreadFilter { static constexpr int kMaxThreads = 2048; static constexpr int kMaxChunks = (kMaxThreads + kChunkSize - 1) / kChunkSize; // = 8 chunks // High-performance free list using Treiber stack, 64 shards - static constexpr int kFreeListSize = 1024; - static constexpr int kShardCount = 64; // power-of-two + static constexpr int kFreeListSize = 1024; // power-of-two for fast modulo + static constexpr int kShardCount = 64; // power-of-two for fast modulo + static constexpr int kFreeListMask = kFreeListSize - 1; // For fast modulo ThreadFilter(); ~ThreadFilter(); void init(const char* filter); - void clear(); + void initFreeList(); bool enabled() const; bool accept(SlotID slot_id) const; void add(int tid, SlotID slot_id); @@ -52,7 +53,7 @@ class ThreadFilter { std::array slots; }; - bool _enabled = false; + std::atomic _enabled{false}; // Lazily allocated storage for chunks std::atomic _chunks[kMaxChunks]; From d967e7180054c4a6c641fd72829f542dcf05afa8 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Thu, 21 Aug 2025 11:44:03 +0200 Subject: [PATCH 10/13] Thread filter - Expand test coverage --- ddprof-lib/src/main/cpp/threadFilter.cpp | 29 +- ddprof-lib/src/main/cpp/threadFilter.h | 17 +- ddprof-lib/src/test/cpp/ddprof_ut.cpp | 106 ----- ddprof-lib/src/test/cpp/threadFilter_ut.cpp | 451 ++++++++++++++++++++ 4 files changed, 490 insertions(+), 113 deletions(-) create mode 100644 ddprof-lib/src/test/cpp/threadFilter_ut.cpp diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index f3d49dc88..5491c82b0 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -1,5 +1,23 @@ -// Copyright (C) Datadog 2025 +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // High-performance lock-free thread filter implementation +// +// PERFORMANCE CONTRACT: +// - add(), remove(), accept() are optimized for signal-safe hot paths +// - These methods assume slot_id comes from registerThread() (undefined behavior otherwise) #include "threadFilter.h" @@ -149,6 +167,8 @@ bool ThreadFilter::accept(SlotID slot_id) const { } void ThreadFilter::add(int tid, SlotID slot_id) { + // PRECONDITION: slot_id must be from registerThread() or negative + // Undefined behavior for invalid positive slot_ids (performance optimization) if (slot_id < 0) return; int chunk_idx = slot_id >> kChunkShift; @@ -162,12 +182,13 @@ void ThreadFilter::add(int tid, SlotID slot_id) { } void ThreadFilter::remove(SlotID slot_id) { + // PRECONDITION: slot_id must be from registerThread() or negative + // Undefined behavior for invalid positive slot_ids (performance optimization) if (slot_id < 0) return; int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; - // Fast path: assume valid slot_id from registerThread() ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); if (likely(chunk != nullptr)) { chunk->slots[slot_idx].value.store(-1, std::memory_order_release); @@ -176,11 +197,7 @@ void ThreadFilter::remove(SlotID slot_id) { void ThreadFilter::unregisterThread(SlotID slot_id) { if (slot_id < 0) return; - - // Clear the slot first remove(slot_id); - - // Add to free list for reuse pushToFreeList(slot_id); } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 14fab0e41..f743f7e1f 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -1,3 +1,18 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef _THREADFILTER_H #define _THREADFILTER_H @@ -20,13 +35,13 @@ class ThreadFilter { // High-performance free list using Treiber stack, 64 shards static constexpr int kFreeListSize = 1024; // power-of-two for fast modulo static constexpr int kShardCount = 64; // power-of-two for fast modulo - static constexpr int kFreeListMask = kFreeListSize - 1; // For fast modulo ThreadFilter(); ~ThreadFilter(); void init(const char* filter); void initFreeList(); bool enabled() const; + // Hot path methods - slot_id MUST be from registerThread(), undefined behavior otherwise bool accept(SlotID slot_id) const; void add(int tid, SlotID slot_id); void remove(SlotID slot_id); diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index 88d2e657e..e8e3a4e4b 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -123,112 +123,6 @@ EXPECT_EQ(2048, Contexts::getMaxPages(2097152)); } - TEST(ThreadFilter, testThreadFilter) { - ThreadFilter filter; - filter.init(""); - ASSERT_TRUE(filter.enabled()); - - const int num_threads = 10; - const int num_ops = 100; - std::vector threads; - std::atomic completed_threads{0}; - - // Each thread will add and remove its own thread ID multiple times - for (int i = 1; i <= num_threads; i++) { - threads.emplace_back([&filter, i, &completed_threads]() { - for (int j = 0; j < num_ops; j++) { - // Register a new slot for this thread - int slot_id = filter.registerThread(); - - // Add thread ID to slot - filter.add(i, slot_id); - bool accepted = filter.accept(slot_id); - if (!accepted) { - fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned false after add\n", - i, j, slot_id, slot_id); - } - EXPECT_TRUE(accepted); - - // Remove thread ID - filter.remove(slot_id); - accepted = filter.accept(slot_id); - if (accepted) { - fprintf(stderr, "FAIL: Thread %d, op %d, slot %d: accept(slot=%d) returned true after remove\n", - i, j, slot_id, slot_id); - } - EXPECT_FALSE(accepted); - } - completed_threads++; - }); - } - - // Wait for all threads to complete - for (auto& t : threads) { - t.join(); - } - - // Verify all threads completed - ASSERT_EQ(completed_threads.load(), num_threads); - - // Collect and verify all thread IDs were properly removed - std::vector tids; - filter.collect(tids); - ASSERT_EQ(tids.size(), 0); - } - - TEST(ThreadFilter, testThreadFilterCollect) { - ThreadFilter filter; - filter.init(""); - ASSERT_TRUE(filter.enabled()); - - const int num_threads = 10; - std::vector threads; - std::atomic completed_threads{0}; - std::vector expected_tids; - std::vector slots(num_threads); // Track slot IDs - - // Pre-register slots for each thread - for (int i = 0; i < num_threads; i++) { - slots[i] = filter.registerThread(); - } - - // Each thread will add its thread ID - for (int i = 1; i <= num_threads; i++) { - expected_tids.push_back(i); - int slot_id = slots[i-1]; // Use the pre-registered slot - - threads.emplace_back([&filter, i, slot_id, &completed_threads]() { - filter.add(i, slot_id); - EXPECT_TRUE(filter.accept(slot_id)); - completed_threads++; - }); - } - - // Wait for all threads to complete - for (auto& t : threads) { - t.join(); - } - - // Verify all threads completed - ASSERT_EQ(completed_threads.load(), num_threads); - - // Collect and verify all thread IDs are present - std::vector collected_tids; - filter.collect(collected_tids); - - // Sort both vectors for comparison - std::sort(expected_tids.begin(), expected_tids.end()); - std::sort(collected_tids.begin(), collected_tids.end()); - - ASSERT_EQ(expected_tids.size(), collected_tids.size()); - for (size_t i = 0; i < expected_tids.size(); i++) { - EXPECT_EQ(expected_tids[i], collected_tids[i]) - << "Mismatch at index " << i - << ": expected " << expected_tids[i] - << ", got " << collected_tids[i]; - } - } - TEST(ThreadInfoTest, testThreadInfoCleanupAllDead) { ThreadInfo info; info.set(1, "main", 1); diff --git a/ddprof-lib/src/test/cpp/threadFilter_ut.cpp b/ddprof-lib/src/test/cpp/threadFilter_ut.cpp new file mode 100644 index 000000000..8cbeec991 --- /dev/null +++ b/ddprof-lib/src/test/cpp/threadFilter_ut.cpp @@ -0,0 +1,451 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "threadFilter.h" +#include +#include +#include +#include +#include +#include + +class ThreadFilterTest : public ::testing::Test { +protected: + void SetUp() override { + filter = std::make_unique(); + filter->init(""); // Enable filtering + } + + void TearDown() override { + filter.reset(); + } + + std::unique_ptr filter; +}; + +// Basic functionality tests +TEST_F(ThreadFilterTest, BasicRegisterAndAccept) { + EXPECT_TRUE(filter->enabled()); + + int slot_id = filter->registerThread(); + EXPECT_GE(slot_id, 0); + + // Initially should not accept (no tid added) + EXPECT_FALSE(filter->accept(slot_id)); + + // Add tid and test accept + filter->add(1234, slot_id); + EXPECT_TRUE(filter->accept(slot_id)); + + // Remove and test + filter->remove(slot_id); + EXPECT_FALSE(filter->accept(slot_id)); +} + +TEST_F(ThreadFilterTest, DisabledFilterAcceptsAll) { + ThreadFilter disabled_filter; + disabled_filter.init(nullptr); // Disabled + + EXPECT_FALSE(disabled_filter.enabled()); + EXPECT_TRUE(disabled_filter.accept(-1)); + EXPECT_TRUE(disabled_filter.accept(0)); + EXPECT_TRUE(disabled_filter.accept(999999)); +} + +TEST_F(ThreadFilterTest, InvalidSlotHandling) { + // Test invalid slot IDs for accept() - still safe due to negative check + EXPECT_FALSE(filter->accept(-1)); + EXPECT_FALSE(filter->accept(-999)); + + // These should not crash + filter->add(1234, -1); + filter->remove(-1); + filter->unregisterThread(-1); + filter->unregisterThread(-999); +} + +TEST_F(ThreadFilterTest, ValidSlotIDContract) { + // Verify that all slot IDs returned by registerThread() are valid + std::vector slot_ids; + + for (int i = 0; i < 100; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0) << "registerThread() returned invalid slot_id: " << slot_id; + ASSERT_LT(slot_id, ThreadFilter::kMaxThreads) << "slot_id out of range: " << slot_id; + + slot_ids.push_back(slot_id); + + // These operations should always be safe with slot_ids from registerThread() + filter->add(i + 10000, slot_id); + EXPECT_TRUE(filter->accept(slot_id)); + filter->remove(slot_id); + EXPECT_FALSE(filter->accept(slot_id)); + } + + // Verify slot IDs are unique (no duplicates) + std::set unique_slots(slot_ids.begin(), slot_ids.end()); + EXPECT_EQ(unique_slots.size(), slot_ids.size()) << "registerThread() returned duplicate slot IDs"; +} + +// Edge case: Maximum capacity +TEST_F(ThreadFilterTest, MaxCapacityReached) { + std::vector slot_ids; + + // Register up to the maximum + for (int i = 0; i < ThreadFilter::kMaxThreads; i++) { + int slot_id = filter->registerThread(); + if (slot_id >= 0) { + slot_ids.push_back(slot_id); + filter->add(i + 1000, slot_id); // Use unique tids + } + } + + fprintf(stderr, "Successfully registered %zu slots (max=%d)\n", + slot_ids.size(), ThreadFilter::kMaxThreads); + + // Should have registered all slots + EXPECT_EQ(slot_ids.size(), ThreadFilter::kMaxThreads); + + // Next registration should fail + int overflow_slot = filter->registerThread(); + EXPECT_EQ(overflow_slot, -1); + + // Verify all registered slots work + std::vector collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), ThreadFilter::kMaxThreads); + + // Verify all tids are unique + std::set unique_tids(collected_tids.begin(), collected_tids.end()); + EXPECT_EQ(unique_tids.size(), ThreadFilter::kMaxThreads); +} + +// Edge case: Recovery after max capacity +TEST_F(ThreadFilterTest, RecoveryAfterMaxCapacity) { + std::vector slot_ids; + + // Fill to capacity + for (int i = 0; i < ThreadFilter::kMaxThreads; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + filter->add(i + 2000, slot_id); + } + + // Should fail to register more + EXPECT_EQ(filter->registerThread(), -1); + + // Unregister half the slots + int slots_to_free = ThreadFilter::kMaxThreads / 2; + for (int i = 0; i < slots_to_free; i++) { + filter->unregisterThread(slot_ids[i]); + slot_ids[i] = -1; // Mark as freed + } + + // Should be able to register new slots again + std::vector new_slot_ids; + for (int i = 0; i < slots_to_free; i++) { + int slot_id = filter->registerThread(); + EXPECT_GE(slot_id, 0) << "Failed to register slot " << i << " after freeing"; + new_slot_ids.push_back(slot_id); + filter->add(i + 3000, slot_id); + } + + // Verify we can still register up to capacity + EXPECT_EQ(new_slot_ids.size(), slots_to_free); + + // Should fail again when at capacity + EXPECT_EQ(filter->registerThread(), -1); + + // Verify collect works correctly + std::vector collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), ThreadFilter::kMaxThreads); +} + +// Free list stress test +TEST_F(ThreadFilterTest, FreeListStressTest) { + const int iterations = 1000; + const int batch_size = 100; + + for (int iter = 0; iter < iterations; iter++) { + std::vector slot_ids; + + // Register a batch + for (int i = 0; i < batch_size; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + filter->add(iter * batch_size + i, slot_id); + } + + // Verify all work + for (int slot_id : slot_ids) { + EXPECT_TRUE(filter->accept(slot_id)); + } + + // Unregister all + for (int slot_id : slot_ids) { + filter->unregisterThread(slot_id); + } + + // Verify cleanup + std::vector tids; + filter->collect(tids); + EXPECT_EQ(tids.size(), 0) << "Iteration " << iter << " left " << tids.size() << " tids"; + } +} + +// Multi-threaded edge case testing +TEST_F(ThreadFilterTest, ConcurrentMaxCapacityStress) { + const int num_threads = 8; + const int slots_per_thread = ThreadFilter::kMaxThreads / num_threads; + + std::vector threads; + std::atomic successful_registrations{0}; + std::atomic failed_registrations{0}; + std::vector> thread_slots(num_threads); + + // Each thread tries to register its share of slots + for (int t = 0; t < num_threads; t++) { + threads.emplace_back([&, t]() { + for (int i = 0; i < slots_per_thread + 10; i++) { // Try to over-register + int slot_id = filter->registerThread(); + if (slot_id >= 0) { + thread_slots[t].push_back(slot_id); + filter->add(t * 1000 + i, slot_id); + successful_registrations++; + } else { + failed_registrations++; + } + } + }); + } + + // Wait for all threads + for (auto& t : threads) { + t.join(); + } + + fprintf(stderr, "Successful: %d, Failed: %d, Total attempted: %d\n", + successful_registrations.load(), failed_registrations.load(), + num_threads * (slots_per_thread + 10)); + + // Should have registered exactly kMaxThreads + EXPECT_EQ(successful_registrations.load(), ThreadFilter::kMaxThreads); + EXPECT_GT(failed_registrations.load(), 0); // Some should have failed + + // Verify collect works + std::vector collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), ThreadFilter::kMaxThreads); +} + +// Chunk boundary testing +TEST_F(ThreadFilterTest, ChunkBoundaryBehavior) { + std::vector slot_ids; + + // Register enough slots to span multiple chunks + int slots_to_register = ThreadFilter::kChunkSize * 3 + 10; // 3+ chunks + + for (int i = 0; i < slots_to_register; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0) << "Failed at slot " << i; + slot_ids.push_back(slot_id); + filter->add(i + 5000, slot_id); + } + + // Verify all chunks work correctly + for (int i = 0; i < slots_to_register; i++) { + EXPECT_TRUE(filter->accept(slot_ids[i])) << "Slot " << slot_ids[i] << " (index " << i << ") not accepted"; + } + + // Test collect across chunks + std::vector collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), slots_to_register); + + // Verify tids are correct + std::sort(collected_tids.begin(), collected_tids.end()); + for (int i = 0; i < slots_to_register; i++) { + EXPECT_EQ(collected_tids[i], i + 5000) << "TID mismatch at position " << i; + } +} + +// Race condition testing for add/remove/accept +TEST_F(ThreadFilterTest, ConcurrentAddRemoveAccept) { + const int num_threads = 4; + const int operations_per_thread = 10000; + + // Pre-register slots for each thread + std::vector slot_ids(num_threads); + for (int i = 0; i < num_threads; i++) { + slot_ids[i] = filter->registerThread(); + ASSERT_GE(slot_ids[i], 0); + } + + std::vector threads; + std::atomic total_operations{0}; + + for (int t = 0; t < num_threads; t++) { + threads.emplace_back([&, t]() { + int slot_id = slot_ids[t]; + int tid = t + 6000; + + for (int i = 0; i < operations_per_thread; i++) { + // Add + filter->add(tid, slot_id); + + // Should accept + bool accepted = filter->accept(slot_id); + if (!accepted) { + fprintf(stderr, "Thread %d: accept failed after add (op %d)\n", t, i); + } + + // Remove + filter->remove(slot_id); + + // Should not accept + accepted = filter->accept(slot_id); + if (accepted) { + fprintf(stderr, "Thread %d: accept succeeded after remove (op %d)\n", t, i); + } + + total_operations++; + } + }); + } + + // Wait for all threads + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(total_operations.load(), num_threads * operations_per_thread); + + // Final state should be empty + std::vector final_tids; + filter->collect(final_tids); + EXPECT_EQ(final_tids.size(), 0); +} + +// Free list exhaustion and recovery +TEST_F(ThreadFilterTest, FreeListExhaustionRecovery) { + // Fill up the free list by registering and unregistering + std::vector slot_ids; + + // Register many slots + for (int i = 0; i < ThreadFilter::kFreeListSize + 100; i++) { + int slot_id = filter->registerThread(); + if (slot_id >= 0) { + slot_ids.push_back(slot_id); + filter->add(i + 7000, slot_id); + } + } + + fprintf(stderr, "Registered %zu slots\n", slot_ids.size()); + + // Unregister all (this should fill the free list) + for (int slot_id : slot_ids) { + filter->unregisterThread(slot_id); + } + + // Try to register new slots - should reuse from free list + std::vector new_slot_ids; + for (int i = 0; i < 100; i++) { + int slot_id = filter->registerThread(); + EXPECT_GE(slot_id, 0) << "Failed to reuse slot " << i; + new_slot_ids.push_back(slot_id); + filter->add(i + 8000, slot_id); + } + + // Verify reused slots work + for (int slot_id : new_slot_ids) { + EXPECT_TRUE(filter->accept(slot_id)); + } + + std::vector final_tids; + filter->collect(final_tids); + EXPECT_EQ(final_tids.size(), new_slot_ids.size()); +} + +// Performance regression test - only run in release builds +#ifdef NDEBUG +TEST_F(ThreadFilterTest, PerformanceRegression) { + const int num_operations = 100000; + + // Pre-register slots + std::vector slot_ids; + for (int i = 0; i < 100; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + } + + auto start = std::chrono::high_resolution_clock::now(); + + // Perform many add/accept/remove operations + for (int i = 0; i < num_operations; i++) { + int slot_id = slot_ids[i % slot_ids.size()]; + filter->add(i, slot_id); + bool accepted = filter->accept(slot_id); + EXPECT_TRUE(accepted); + filter->remove(slot_id); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + fprintf(stderr, "Performance: %d operations in %ld microseconds (%.2f ns/op)\n", + num_operations, duration.count(), + (double)duration.count() * 1000.0 / num_operations); + + // Should be fast - less than 200ns per operation is reasonable for this complex test + EXPECT_LT(duration.count() * 1000.0 / num_operations, 200.0); // 200ns per op max +} +#endif // NDEBUG + +// Collect behavior with mixed states +TEST_F(ThreadFilterTest, CollectMixedStates) { + std::vector slot_ids; + std::vector expected_tids; + + // Register slots and add some tids, leave others empty + for (int i = 0; i < 50; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + + if (i % 3 == 0) { // Add tid to every 3rd slot + filter->add(i + 9000, slot_id); + expected_tids.push_back(i + 9000); + } + // Leave other slots empty + } + + // Collect should only return slots with tids + std::vector collected_tids; + filter->collect(collected_tids); + + std::sort(expected_tids.begin(), expected_tids.end()); + std::sort(collected_tids.begin(), collected_tids.end()); + + EXPECT_EQ(collected_tids.size(), expected_tids.size()); + for (size_t i = 0; i < expected_tids.size(); i++) { + EXPECT_EQ(collected_tids[i], expected_tids[i]); + } +} \ No newline at end of file From 68035f67f06262db05901706bc0104c7c2dcc96b Mon Sep 17 00:00:00 2001 From: r1viollet Date: Thu, 21 Aug 2025 12:07:57 +0200 Subject: [PATCH 11/13] threadIDTable - add testing for the fixed size table --- ddprof-lib/src/main/cpp/branch_hints.h | 44 ++++ ddprof-lib/src/main/cpp/javaApi.cpp | 1 + ddprof-lib/src/main/cpp/threadFilter.cpp | 10 +- ddprof-lib/src/main/cpp/threadIdTable.h | 36 ++- ddprof-lib/src/test/cpp/branch_hints_ut.cpp | 72 +++++ ddprof-lib/src/test/cpp/threadIdTable_ut.cpp | 263 +++++++++++++++++++ 6 files changed, 406 insertions(+), 20 deletions(-) create mode 100644 ddprof-lib/src/main/cpp/branch_hints.h create mode 100644 ddprof-lib/src/test/cpp/branch_hints_ut.cpp create mode 100644 ddprof-lib/src/test/cpp/threadIdTable_ut.cpp diff --git a/ddprof-lib/src/main/cpp/branch_hints.h b/ddprof-lib/src/main/cpp/branch_hints.h new file mode 100644 index 000000000..99212f5ec --- /dev/null +++ b/ddprof-lib/src/main/cpp/branch_hints.h @@ -0,0 +1,44 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _BRANCH_HINTS_H +#define _BRANCH_HINTS_H + +/** + * Branch prediction hints for performance optimization. + * + * These macros help the compiler generate more efficient code by providing + * hints about which branches are more likely to be taken. + * + * Usage: + * if (likely(common_condition)) { ... } + * if (unlikely(error_condition)) { ... } + * + * Implementation: + * - GCC/Clang: Uses __builtin_expect for branch prediction hints + * - Other compilers: No-op (just returns the condition) + */ +// todo: in c++ 20, we can use [[likely]] and [[unlikely]] + +#if defined(__GNUC__) || defined(__clang__) +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + +#endif // _BRANCH_HINTS_H \ No newline at end of file diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 3bff4e8de..525b01f3e 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -16,6 +16,7 @@ #include +#include "branch_hints.h" #include "context.h" #include "counters.h" #include "engine.h" diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 5491c82b0..ac56306e0 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -20,21 +20,13 @@ // - These methods assume slot_id comes from registerThread() (undefined behavior otherwise) #include "threadFilter.h" +#include "branch_hints.h" #include #include #include #include -// Branch prediction hints for hot paths -#ifdef __GNUC__ -#define likely(x) __builtin_expect(!!(x), 1) -#define unlikely(x) __builtin_expect(!!(x), 0) -#else -#define likely(x) (x) -#define unlikely(x) (x) -#endif - ThreadFilter::ShardHead ThreadFilter::_free_heads[ThreadFilter::kShardCount] {}; ThreadFilter::ThreadFilter() : _enabled(false) { diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h index 1856c6522..42b77d05f 100644 --- a/ddprof-lib/src/main/cpp/threadIdTable.h +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -1,7 +1,17 @@ /* - * Copyright The async-profiler authors - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2021, 2025 Datadog, Inc + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #ifndef _THREADIDTABLE_H @@ -9,19 +19,23 @@ #include #include +#include "branch_hints.h" -// Signal-safe thread ID table with fixed size +// Simple fixed size thread ID table class ThreadIdTable { private: // We have 256 slots per concurrency level (currently 16) // This should cater for 4096 threads - if it turns out to be too small, we // can increase it or make it configurable - static const int TABLE_SIZE = 256; + static const int TABLE_SIZE = 256; // power of 2 + static const int TABLE_MASK = TABLE_SIZE - 1; // For fast bit masking std::atomic table[TABLE_SIZE]; int hash(int tid) const { - // Simple hash function - could be improved if needed - return tid % TABLE_SIZE; + // Improved hash function with bit mixing to reduce clustering + unsigned int utid = static_cast(tid); + utid ^= utid >> 16; // Mix high and low bits + return utid & TABLE_MASK; // Fast bit masking instead of modulo } public: @@ -31,11 +45,11 @@ class ThreadIdTable { // Signal-safe insertion using atomic operations only void insert(int tid) { - if (tid == 0) return; // Invalid thread ID, 0 is reserved for empty slots + if (unlikely(tid == 0)) return; // Invalid thread ID, 0 is reserved for empty slots int start_slot = hash(tid); for (int probe = 0; probe < TABLE_SIZE; probe++) { - int slot = (start_slot + probe) % TABLE_SIZE; + int slot = (start_slot + probe) & TABLE_MASK; // Fast bit masking int expected = 0; // Try to claim empty slot @@ -43,8 +57,8 @@ class ThreadIdTable { return; // Successfully inserted } - // Check if already present - if (table[slot].load(std::memory_order_relaxed) == tid) { + // Check if already present (common case - threads insert multiple times) + if (likely(table[slot].load(std::memory_order_relaxed) == tid)) { return; // Already exists } } diff --git a/ddprof-lib/src/test/cpp/branch_hints_ut.cpp b/ddprof-lib/src/test/cpp/branch_hints_ut.cpp new file mode 100644 index 000000000..b4bb260a7 --- /dev/null +++ b/ddprof-lib/src/test/cpp/branch_hints_ut.cpp @@ -0,0 +1,72 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "branch_hints.h" + +class BranchHintsTest : public ::testing::Test { +protected: + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(BranchHintsTest, BasicFunctionality) { + // Test that macros work and don't crash + int x = 42; + + if (likely(x == 42)) { + EXPECT_EQ(x, 42); + } + + if (unlikely(x == 0)) { + FAIL() << "This should not be reached"; + } + + // Test that the macros return the expected boolean values + EXPECT_TRUE(likely(true)); + EXPECT_FALSE(likely(false)); + EXPECT_TRUE(unlikely(true)); + EXPECT_FALSE(unlikely(false)); +} + +TEST_F(BranchHintsTest, ImplementationInfo) { + // Report what implementation is being used + #if defined(__GNUC__) || defined(__clang__) + fprintf(stderr, "Using __builtin_expect for branch hints\n"); + #else + fprintf(stderr, "Using no-op branch hints (fallback)\n"); + #endif + + // This test always passes - it's just for diagnostics + EXPECT_TRUE(true); +} + +TEST_F(BranchHintsTest, MacroExpansion) { + // Test that macros expand to valid expressions + volatile bool condition = true; + + // These should compile and work correctly + bool likely_result = likely(condition); + bool unlikely_result = unlikely(condition); + + EXPECT_EQ(likely_result, condition); + EXPECT_EQ(unlikely_result, condition); + + // Test with complex expressions + int a = 10, b = 20; + EXPECT_TRUE(likely(a < b)); + EXPECT_FALSE(unlikely(a > b)); +} \ No newline at end of file diff --git a/ddprof-lib/src/test/cpp/threadIdTable_ut.cpp b/ddprof-lib/src/test/cpp/threadIdTable_ut.cpp new file mode 100644 index 000000000..59f99d441 --- /dev/null +++ b/ddprof-lib/src/test/cpp/threadIdTable_ut.cpp @@ -0,0 +1,263 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "threadIdTable.h" +#include +#include +#include +#include +#include +#include + +class ThreadIdTableTest : public ::testing::Test { +protected: + void SetUp() override { + table = std::make_unique(); + } + + void TearDown() override { + table.reset(); + } + + std::unique_ptr table; +}; + +// Basic functionality tests +TEST_F(ThreadIdTableTest, BasicInsertAndCollect) { + // Insert some thread IDs + table->insert(1001); + table->insert(1002); + table->insert(1003); + + // Collect and verify + std::unordered_set result; + table->collect(result); + + EXPECT_EQ(result.size(), 3); + EXPECT_TRUE(result.count(1001)); + EXPECT_TRUE(result.count(1002)); + EXPECT_TRUE(result.count(1003)); +} + +TEST_F(ThreadIdTableTest, InvalidThreadIdHandling) { + // Invalid thread ID (0) should be ignored + table->insert(0); + + std::unordered_set result; + table->collect(result); + EXPECT_EQ(result.size(), 0); + + // Negative thread IDs should still work (they're valid Linux thread IDs) + table->insert(-1); + table->collect(result); + EXPECT_EQ(result.size(), 1); + EXPECT_TRUE(result.count(-1)); +} + +TEST_F(ThreadIdTableTest, DuplicateInsertions) { + // Insert same thread ID multiple times + table->insert(2001); + table->insert(2001); + table->insert(2001); + + std::unordered_set result; + table->collect(result); + + // Should only appear once + EXPECT_EQ(result.size(), 1); + EXPECT_TRUE(result.count(2001)); +} + +TEST_F(ThreadIdTableTest, ClearFunctionality) { + // Insert some thread IDs + table->insert(3001); + table->insert(3002); + table->insert(3003); + + // Verify they're there + std::unordered_set result; + table->collect(result); + EXPECT_EQ(result.size(), 3); + + // Clear and verify empty + table->clear(); + result.clear(); + table->collect(result); + EXPECT_EQ(result.size(), 0); +} + +// Hash collision testing +TEST_F(ThreadIdTableTest, HashCollisions) { + // Create thread IDs that will hash to the same slot + // TABLE_SIZE = 256, so tids with same (tid % 256) will collide + std::vector colliding_tids; + int base_tid = 1000; + + // Generate 10 thread IDs that hash to the same slot + for (int i = 0; i < 10; i++) { + int tid = base_tid + (i * 256); // All will hash to same slot + colliding_tids.push_back(tid); + table->insert(tid); + } + + // All should be stored (linear probing should handle collisions) + std::unordered_set result; + table->collect(result); + + EXPECT_EQ(result.size(), colliding_tids.size()); + for (int tid : colliding_tids) { + EXPECT_TRUE(result.count(tid)) << "Missing tid: " << tid; + } +} + +// Capacity testing +TEST_F(ThreadIdTableTest, TableCapacityLimits) { + std::vector inserted_tids; + + // Try to insert more than TABLE_SIZE (256) unique thread IDs + for (int i = 1; i <= 300; i++) { // More than TABLE_SIZE + table->insert(i + 10000); // Use high numbers to avoid conflicts + inserted_tids.push_back(i + 10000); + } + + // Collect and see how many were actually stored + std::unordered_set result; + table->collect(result); + + fprintf(stderr, "Inserted %zu tids, collected %zu tids\n", + inserted_tids.size(), result.size()); + + // Should have stored at most TABLE_SIZE (256) + EXPECT_LE(result.size(), 256); + + // All collected tids should be from our inserted set + for (int tid : result) { + EXPECT_TRUE(std::find(inserted_tids.begin(), inserted_tids.end(), tid) != inserted_tids.end()) + << "Unexpected tid in result: " << tid; + } +} + +// Concurrent access testing (signal safety) +TEST_F(ThreadIdTableTest, ConcurrentInsertions) { + const int num_threads = 8; + const int tids_per_thread = 50; + + std::vector threads; + std::atomic successful_insertions{0}; + std::vector> thread_tids(num_threads); + + // Each thread inserts its own set of thread IDs + for (int t = 0; t < num_threads; t++) { + threads.emplace_back([&, t]() { + for (int i = 0; i < tids_per_thread; i++) { + int tid = t * 1000 + i + 20000; // Unique per thread + thread_tids[t].push_back(tid); + table->insert(tid); + successful_insertions++; + } + }); + } + + // Wait for all threads + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(successful_insertions.load(), num_threads * tids_per_thread); + + // Collect and verify + std::unordered_set result; + table->collect(result); + + // Should have all unique thread IDs (or at least most of them) + std::set all_expected_tids; + for (const auto& thread_tids_vec : thread_tids) { + for (int tid : thread_tids_vec) { + all_expected_tids.insert(tid); + } + } + + fprintf(stderr, "Expected %zu unique tids, collected %zu tids\n", + all_expected_tids.size(), result.size()); + + // Table has fixed capacity of 256, so with 400 unique tids, we expect exactly 256 + EXPECT_EQ(result.size(), std::min(all_expected_tids.size(), (size_t)256)); + + // All collected tids should be valid + for (int tid : result) { + EXPECT_TRUE(all_expected_tids.count(tid)) << "Unexpected tid: " << tid; + } +} + +// Edge case: Realistic thread ID patterns +TEST_F(ThreadIdTableTest, RealisticThreadIds) { + // Linux thread IDs are typically large numbers + std::vector realistic_tids = { + 12345, 12346, 12347, // Sequential + 98765, 98766, 98767, // Another sequence + 1234567, 1234568, // Large numbers + 2147483647, // Max int + -1, -2, -3 // Negative (valid in some contexts) + }; + + for (int tid : realistic_tids) { + table->insert(tid); + } + + std::unordered_set result; + table->collect(result); + + // Should have all except tid=0 if any + size_t expected_size = realistic_tids.size(); + EXPECT_EQ(result.size(), expected_size); + + for (int tid : realistic_tids) { + if (tid != 0) { // 0 is invalid and ignored + EXPECT_TRUE(result.count(tid)) << "Missing realistic tid: " << tid; + } + } +} + +// Performance test (release builds only) +#ifdef NDEBUG +TEST_F(ThreadIdTableTest, PerformanceRegression) { + const int num_operations = 100000; + std::vector tids; + + // Pre-generate thread IDs + for (int i = 0; i < 100; i++) { + tids.push_back(i + 30000); + } + + auto start = std::chrono::high_resolution_clock::now(); + + // Perform many insertions + for (int i = 0; i < num_operations; i++) { + table->insert(tids[i % tids.size()]); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + fprintf(stderr, "ThreadIdTable Performance: %d operations in %ld microseconds (%.2f ns/op)\n", + num_operations, duration.count(), + (double)duration.count() * 1000.0 / num_operations); + + // Should be very fast for signal-safe operations + EXPECT_LT(duration.count() * 1000.0 / num_operations, 50.0); // 50ns per op max +} +#endif // NDEBUG \ No newline at end of file From 658097dc375ef4f90fbbe6ab3e54464f2584fd95 Mon Sep 17 00:00:00 2001 From: r1viollet Date: Thu, 21 Aug 2025 12:12:07 +0200 Subject: [PATCH 12/13] threadFilter bench - Minor cleanup of print lines --- .../scenarios/counters/ThreadFilterBenchmark.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java index eb0df4d9e..fde05206c 100644 --- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java @@ -218,15 +218,6 @@ public long threadFilterStress() throws InterruptedException { } operationCount.incrementAndGet(); } - - // if (operationCount.get() % 1000 == 0) { - // String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get()); - // System.out.print(progressMsg); - // if (logWriter != null) { - // logWriter.print(progressMsg); - // logWriter.flush(); - // } - // } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); From e78a6b24ba563da7bfaa0beffd11db1fbf42888f Mon Sep 17 00:00:00 2001 From: r1viollet Date: Thu, 21 Aug 2025 15:56:59 +0200 Subject: [PATCH 13/13] ThreadFilter optim - fixes - Fix removal of self in timerloop init it was not using a slotID but a thread ID - Add assertion to find other potential issues --- ddprof-lib/src/main/cpp/branch_hints.h | 44 ------------- ddprof-lib/src/main/cpp/javaApi.cpp | 2 +- ddprof-lib/src/main/cpp/threadFilter.cpp | 15 ++++- ddprof-lib/src/main/cpp/threadIdTable.h | 2 +- ddprof-lib/src/main/cpp/wallClock.h | 11 +++- ddprof-lib/src/test/cpp/branch_hints_ut.cpp | 72 --------------------- 6 files changed, 24 insertions(+), 122 deletions(-) delete mode 100644 ddprof-lib/src/main/cpp/branch_hints.h delete mode 100644 ddprof-lib/src/test/cpp/branch_hints_ut.cpp diff --git a/ddprof-lib/src/main/cpp/branch_hints.h b/ddprof-lib/src/main/cpp/branch_hints.h deleted file mode 100644 index 99212f5ec..000000000 --- a/ddprof-lib/src/main/cpp/branch_hints.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2025 Datadog, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef _BRANCH_HINTS_H -#define _BRANCH_HINTS_H - -/** - * Branch prediction hints for performance optimization. - * - * These macros help the compiler generate more efficient code by providing - * hints about which branches are more likely to be taken. - * - * Usage: - * if (likely(common_condition)) { ... } - * if (unlikely(error_condition)) { ... } - * - * Implementation: - * - GCC/Clang: Uses __builtin_expect for branch prediction hints - * - Other compilers: No-op (just returns the condition) - */ -// todo: in c++ 20, we can use [[likely]] and [[unlikely]] - -#if defined(__GNUC__) || defined(__clang__) -#define likely(x) __builtin_expect(!!(x), 1) -#define unlikely(x) __builtin_expect(!!(x), 0) -#else -#define likely(x) (x) -#define unlikely(x) (x) -#endif - -#endif // _BRANCH_HINTS_H \ No newline at end of file diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 525b01f3e..4afbb1479 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -16,7 +16,7 @@ #include -#include "branch_hints.h" +#include "arch_dd.h" #include "context.h" #include "counters.h" #include "engine.h" diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index ac56306e0..60ea2a409 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -20,9 +20,11 @@ // - These methods assume slot_id comes from registerThread() (undefined behavior otherwise) #include "threadFilter.h" -#include "branch_hints.h" +#include "arch_dd.h" +#include #include +#include #include #include #include @@ -181,10 +183,17 @@ void ThreadFilter::remove(SlotID slot_id) { int chunk_idx = slot_id >> kChunkShift; int slot_idx = slot_id & kChunkMask; + if (unlikely(chunk_idx >= kMaxChunks)) { + assert(false && "Invalid slot_id in ThreadFilter::remove - should not happen after wall clock fix"); + return; + } + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_relaxed); - if (likely(chunk != nullptr)) { - chunk->slots[slot_idx].value.store(-1, std::memory_order_release); + if (unlikely(chunk == nullptr)) { + return; } + + chunk->slots[slot_idx].value.store(-1, std::memory_order_release); } void ThreadFilter::unregisterThread(SlotID slot_id) { diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h index 42b77d05f..f7d3b58ef 100644 --- a/ddprof-lib/src/main/cpp/threadIdTable.h +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -19,7 +19,7 @@ #include #include -#include "branch_hints.h" +#include "arch_dd.h" // Simple fixed size thread ID table class ThreadIdTable { diff --git a/ddprof-lib/src/main/cpp/wallClock.h b/ddprof-lib/src/main/cpp/wallClock.h index 0ead37fac..0e52115f7 100644 --- a/ddprof-lib/src/main/cpp/wallClock.h +++ b/ddprof-lib/src/main/cpp/wallClock.h @@ -21,6 +21,7 @@ #include "os.h" #include "profiler.h" #include "reservoirSampler.h" +#include "thread.h" #include "threadFilter.h" #include "threadState.h" #include "tsc.h" @@ -67,7 +68,15 @@ class BaseWallClock : public Engine { threads.reserve(reservoirSize); int self = OS::threadId(); ThreadFilter* thread_filter = Profiler::instance()->threadFilter(); - thread_filter->remove(self); + + // We don't want to profile ourselves in wall time + ProfiledThread* current = ProfiledThread::current(); + if (current != nullptr) { + int slot_id = current->filterSlotId(); + if (slot_id != -1) { + thread_filter->remove(slot_id); + } + } u64 startTime = TSC::ticks(); WallClockEpochEvent epoch(startTime); diff --git a/ddprof-lib/src/test/cpp/branch_hints_ut.cpp b/ddprof-lib/src/test/cpp/branch_hints_ut.cpp deleted file mode 100644 index b4bb260a7..000000000 --- a/ddprof-lib/src/test/cpp/branch_hints_ut.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2025 Datadog, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include "branch_hints.h" - -class BranchHintsTest : public ::testing::Test { -protected: - void SetUp() override {} - void TearDown() override {} -}; - -TEST_F(BranchHintsTest, BasicFunctionality) { - // Test that macros work and don't crash - int x = 42; - - if (likely(x == 42)) { - EXPECT_EQ(x, 42); - } - - if (unlikely(x == 0)) { - FAIL() << "This should not be reached"; - } - - // Test that the macros return the expected boolean values - EXPECT_TRUE(likely(true)); - EXPECT_FALSE(likely(false)); - EXPECT_TRUE(unlikely(true)); - EXPECT_FALSE(unlikely(false)); -} - -TEST_F(BranchHintsTest, ImplementationInfo) { - // Report what implementation is being used - #if defined(__GNUC__) || defined(__clang__) - fprintf(stderr, "Using __builtin_expect for branch hints\n"); - #else - fprintf(stderr, "Using no-op branch hints (fallback)\n"); - #endif - - // This test always passes - it's just for diagnostics - EXPECT_TRUE(true); -} - -TEST_F(BranchHintsTest, MacroExpansion) { - // Test that macros expand to valid expressions - volatile bool condition = true; - - // These should compile and work correctly - bool likely_result = likely(condition); - bool unlikely_result = unlikely(condition); - - EXPECT_EQ(likely_result, condition); - EXPECT_EQ(unlikely_result, condition); - - // Test with complex expressions - int a = 10, b = 20; - EXPECT_TRUE(likely(a < b)); - EXPECT_FALSE(unlikely(a > b)); -} \ No newline at end of file