Skip to content

Thread filter optim #238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ char *Recording::_jvm_flags = NULL;
char *Recording::_java_command = NULL;

Recording::Recording(int fd, Arguments &args)
: _fd(fd), _thread_set(), _method_map() {
: _fd(fd), _method_map() {

args.save(_args);
_chunk_start = lseek(_fd, 0, SEEK_END);
Expand All @@ -330,6 +330,8 @@ Recording::Recording(int fd, Arguments &args)
_bytes_written = 0;

_tid = OS::threadId();
_active_index.store(0, std::memory_order_relaxed);

VM::jvmti()->GetAvailableProcessors(&_available_processors);

writeHeader(_buf);
Expand Down Expand Up @@ -1059,11 +1061,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
}

void Recording::writeThreads(Buffer *buf) {
addThread(_tid);
std::vector<int> threads;
threads.reserve(_thread_set.size());
_thread_set.collect(threads);
_thread_set.clear();
int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel);
// After flip: new samples go into the new active set
// We flush from old_index (the previous active set)

std::unordered_set<int> threads;
threads.insert(_tid);

for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
// Collect thread IDs from the fixed-size table into the main set
_thread_ids[i][old_index].collect(threads);
_thread_ids[i][old_index].clear();
}

Profiler *profiler = Profiler::instance();
ThreadInfo *t_info = &profiler->_thread_info;
Expand All @@ -1072,15 +1081,15 @@ void Recording::writeThreads(Buffer *buf) {

buf->putVar64(T_THREAD);
buf->putVar64(threads.size());
for (int i = 0; i < threads.size(); i++) {
for (auto tid : threads) {
const char *thread_name;
jlong thread_id;
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]);
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid);
if (info.first) {
thread_name = info.first->c_str();
thread_id = info.second;
} else {
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]);
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid);
thread_name = name_buf;
thread_id = 0;
}
Expand All @@ -1090,9 +1099,9 @@ void Recording::writeThreads(Buffer *buf) {
(thread_id == 0 ? length + 1 : 2 * length) -
3 * 10; // 3x max varint length
flushIfNeeded(buf, required);
buf->putVar64(threads[i]);
buf->putVar64(tid);
buf->putUtf8(thread_name, length);
buf->putVar64(threads[i]);
buf->putVar64(tid);
if (thread_id == 0) {
buf->put8(0);
} else {
Expand Down Expand Up @@ -1442,7 +1451,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
flushIfNeeded(buf);
}

void Recording::addThread(int tid) { _thread_set.add(tid); }
// Records `tid` in the currently active thread-id table of the shard selected
// by `lock_index`.
//
// Precondition: the caller already holds the lock associated with
// `lock_index`; this function does no locking of its own.
//
// NOTE(review): writeThreads() flips _active_index and then collects/clears the
// previously active tables. An insert that read the old index just before the
// flip may still be in flight at that point - confirm the per-shard lock (or
// the table itself) makes that safe.
void Recording::addThread(int lock_index, int tid) {
  // Acquire-load of the 0/1 selector picks which of the two tables of this
  // shard currently receives new thread ids.
  auto &active_table =
      _thread_ids[lock_index][_active_index.load(std::memory_order_acquire)];
  active_table.insert(tid);
}

Error FlightRecorder::start(Arguments &args, bool reset) {
const char *file = args.file();
Expand Down Expand Up @@ -1599,7 +1612,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
break;
}
_rec->flushIfNeeded(buf);
_rec->addThread(tid);
_rec->addThread(lock_index, tid);
}
}

Expand Down
11 changes: 9 additions & 2 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _FLIGHTRECORDER_H

#include <map>
#include <unordered_set>

#include <limits.h>
#include <string.h>
Expand All @@ -34,6 +35,7 @@
#include "mutex.h"
#include "objectSampler.h"
#include "threadFilter.h"
#include "threadIdTable.h"
#include "vmEntry.h"

const u64 MAX_JLONG = 0x7fffffffffffffffULL;
Expand Down Expand Up @@ -127,9 +129,13 @@ class Recording {
static char *_java_command;

RecordingBuffer _buf[CONCURRENCY_LEVEL];
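// One set of tables per concurrency slot, to avoid lock contention between
// recording threads. The second dimension holds two tables per slot so the
// active one can be switched (see _active_index) while the other is flushed.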
ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2];
std::atomic<int> _active_index{0}; // 0 or 1 globally

int _fd;
off_t _chunk_start;
ThreadFilter _thread_set;
MethodMap _method_map;

Arguments _args;
Expand Down Expand Up @@ -258,7 +264,8 @@ class Recording {
LockEvent *event);
void recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
float machine_total);
void addThread(int tid);

void addThread(int lock_index, int tid);
};

class Lookup {
Expand Down
111 changes: 86 additions & 25 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,101 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env,
return (jlong)Profiler::instance()->total_samples();
}

// JNI entry point: adds the calling thread to the profiler's thread filter.
//
// The add and remove entry points deliberately duplicate their preamble
// instead of sharing a helper that takes a flag: this is a hot path and the
// extra branch is not wanted here.
//
// Does nothing when the calling thread has no ProfiledThread, reports a
// negative tid, the thread filter is disabled, or no filter slot could be
// registered for it.
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env,
                                                          jobject unused) {
  ProfiledThread *self = ProfiledThread::current();
  if (unlikely(self == nullptr)) {
    // Not expected to happen; trips in debug builds, silently ignored otherwise.
    assert(false);
    return;
  }

  const int tid = self->tid();
  if (unlikely(tid < 0)) {
    return;
  }

  ThreadFilter *filter = Profiler::instance()->threadFilter();
  if (unlikely(!filter->enabled())) {
    return;
  }

  int slot = self->filterSlotId();
  if (unlikely(slot == -1)) {
    // No slot assigned yet (e.g., the main thread): register lazily and
    // remember the result on the thread, even when registration failed.
    slot = filter->registerThread();
    self->setFilterSlotId(slot);
    if (unlikely(slot == -1)) {
      return; // Failed to register thread
    }
  }

  filter->add(tid, slot);
}

// JNI entry point: removes the calling thread from the profiler's thread
// filter.
//
// Does nothing when the calling thread has no ProfiledThread, reports a
// negative tid, the thread filter is disabled, or the thread was never
// assigned a filter slot (in which case there is nothing to remove).
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env,
                                                             jobject unused) {
  ProfiledThread *self = ProfiledThread::current();
  if (unlikely(self == nullptr)) {
    // Not expected to happen; trips in debug builds, silently ignored otherwise.
    assert(false);
    return;
  }

  // The tid is only validated here; removal itself is keyed by slot id.
  const int tid = self->tid();
  if (unlikely(tid < 0)) {
    return;
  }

  ThreadFilter *filter = Profiler::instance()->threadFilter();
  if (unlikely(!filter->enabled())) {
    return;
  }

  const int slot = self->filterSlotId();
  if (unlikely(slot == -1)) {
    // Never registered with the filter, so there is no entry to clear.
    return;
  }

  filter->remove(slot);
}

// Backward compatibility for existing code
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env,
jobject unused,
jboolean enable) {
int tid = ProfiledThread::currentTid();
if (tid < 0) {
ProfiledThread *current = ProfiledThread::current();
if (unlikely(current == nullptr)) {
assert(false);
return;
}
int tid = current->tid();
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
if (unlikely(!thread_filter->enabled())) {
return;
}

int slot_id = current->filterSlotId();
if (unlikely(slot_id == -1)) {
if (enable) {
// Thread doesn't have a slot ID yet, so register it
slot_id = thread_filter->registerThread();
current->setFilterSlotId(slot_id);
} else {
// Thread doesn't have a slot ID yet - nothing to remove
return;
}
}

if (unlikely(slot_id == -1)) {
return; // Failed to register thread
}

if (enable) {
thread_filter->add(tid);
thread_filter->add(tid, slot_id);
} else {
thread_filter->remove(tid);
thread_filter->remove(slot_id);
}
}

Expand Down Expand Up @@ -406,24 +488,3 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env,
jobject unused) {
return true;
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env,
jclass unused,
jint tid) {
u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid);
return (jlong)bitmap;
}

extern "C" DLLEXPORT jboolean JNICALL
Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env,
jclass unused,
jint tid) {
return Profiler::instance()->threadFilter()->accept((int)tid) ? JNI_TRUE : JNI_FALSE;
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env,
jclass unused) {
return (jlong)Profiler::instance()->threadFilter()->addressOfSize();
}
39 changes: 29 additions & 10 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ void Profiler::addRuntimeStub(const void *address, int length,

void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
ProfiledThread::initCurrentThread();

int tid = ProfiledThread::currentTid();
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (_thread_filter.enabled()) {
_thread_filter.remove(tid);
int slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
_thread_filter.remove(slot_id); // Remove from filtering initially
}
updateThreadName(jvmti, jni, thread, true);

Expand All @@ -116,16 +118,33 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
}

void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
int tid = ProfiledThread::currentTid();
if (_thread_filter.enabled()) {
_thread_filter.remove(tid);
ProfiledThread *current = ProfiledThread::current();
int tid = -1;

if (current != nullptr) {
// ProfiledThread is alive - do full cleanup and use efficient tid access
int slot_id = current->filterSlotId();
tid = current->tid();

if (_thread_filter.enabled()) {
_thread_filter.unregisterThread(slot_id);
current->setFilterSlotId(-1);
}

ProfiledThread::release();
} else {
// ProfiledThread already cleaned up - try to get tid from JVMTI as fallback
tid = VMThread::nativeThreadId(jni, thread);
if (tid < 0) {
// No ProfiledThread AND can't get tid from JVMTI - nothing we can do
return;
}
}
updateThreadName(jvmti, jni, thread, true);


// These can run if we have a valid tid
updateThreadName(jvmti, jni, thread, false); // false = not self
_cpu_engine->unregisterThread(tid);
// unregister here because JNI callers generally don't know about thread exits
_wall_engine->unregisterThread(tid);
ProfiledThread::release();
}

int Profiler::registerThread(int tid) {
Expand Down
6 changes: 5 additions & 1 deletion ddprof-lib/src/main/cpp/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ class ProfiledThread : public ThreadLocalData {
u32 _wall_epoch;
u32 _call_trace_id;
u32 _recording_epoch;
int _filter_slot_id; // Slot ID for thread filtering
UnwindFailures _unwind_failures;

ProfiledThread(int buffer_pos, int tid)
: ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0),
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {};
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {};

void releaseFromBuffer();

Expand Down Expand Up @@ -120,6 +121,9 @@ class ProfiledThread : public ThreadLocalData {
}

static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext);

// Returns this thread's thread-filter slot id; the constructor initializes it
// to -1, which callers treat as "no slot assigned".
int filterSlotId() { return _filter_slot_id; }
// Stores the thread-filter slot id for this thread (-1 marks "no slot").
void setFilterSlotId(int slotId) { _filter_slot_id = slotId; }
};

#endif // _THREAD_H
Loading
Loading