diff --git a/src/jltypes.c b/src/jltypes.c
index 9428bf6a91092..61e57adae78e0 100644
--- a/src/jltypes.c
+++ b/src/jltypes.c
@@ -2692,7 +2692,7 @@ void jl_init_types(void) JL_GC_DISABLED
                         NULL,
                         jl_any_type,
                         jl_emptysvec,
-                        jl_perm_symsvec(15,
+                        jl_perm_symsvec(16,
                                         "next",
                                         "queue",
                                         "storage",
@@ -2707,8 +2707,9 @@ void jl_init_types(void) JL_GC_DISABLED
                                         "_state",
                                         "sticky",
                                         "_isexception",
-                                        "priority"),
-                        jl_svec(15,
+                                        "priority",
+                                        "cpu_time_ns"),
+                        jl_svec(16,
                                 jl_any_type,
                                 jl_any_type,
                                 jl_any_type,
@@ -2723,7 +2724,8 @@ void jl_init_types(void) JL_GC_DISABLED
                                 jl_uint8_type,
                                 jl_bool_type,
                                 jl_bool_type,
-                                jl_uint16_type),
+                                jl_uint16_type,
+                                jl_uint64_type),
                         jl_emptysvec, 0, 1, 6);
     jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type);
diff --git a/src/julia.h b/src/julia.h
index 03efa773d026c..b280f3690c56d 100644
--- a/src/julia.h
+++ b/src/julia.h
@@ -1928,8 +1928,12 @@ typedef struct _jl_task_t {
     _Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with
     // multiqueue priority
     uint16_t priority;
+    // TODO: int32 of ms instead?
+    uint64_t cpu_time_ns; // time this task has spent running; updated when it yields
 
 // hidden state:
+    // timestamp this task was last scheduled (TODO: int32 of ms instead?)
+    uint64_t last_scheduled_ns;
     // id of owning thread - does not need to be defined until the task runs
     _Atomic(int16_t) tid;
     // threadpool id
diff --git a/src/task.c b/src/task.c
index 7373de937b9ae..5fa0a9ba7b1c4 100644
--- a/src/task.c
+++ b/src/task.c
@@ -294,6 +294,8 @@ void JL_NORETURN jl_finish_task(jl_task_t *t)
 {
     jl_task_t *ct = jl_current_task;
     JL_PROBE_RT_FINISH_TASK(ct);
+    ct->cpu_time_ns += jl_hrtime() - ct->last_scheduled_ns;
+    ct->last_scheduled_ns = jl_hrtime(); // restamp so the terminal jl_switch does not re-add this slice
     JL_SIGATOMIC_BEGIN();
     if (jl_atomic_load_relaxed(&t->_isexception))
         jl_atomic_store_release(&t->_state, JL_TASK_STATE_FAILED);
@@ -640,6 +642,7 @@ JL_DLLEXPORT void jl_switch(void) JL_NOTSAFEPOINT_LEAVE JL_NOTSAFEPOINT_ENTER
         jl_error("cannot switch to task running on another thread");
 
     JL_PROBE_RT_PAUSE_TASK(ct);
+    ct->cpu_time_ns += jl_hrtime() - ct->last_scheduled_ns;
 
     // Store old values on the stack and reset
     sig_atomic_t defer_signal = ptls->defer_signal;
@@ -688,6 +691,8 @@ JL_DLLEXPORT void jl_switch(void) JL_NOTSAFEPOINT_LEAVE JL_NOTSAFEPOINT_ENTER
         jl_sigint_safepoint(ptls);
 
     JL_PROBE_RT_RUN_TASK(ct);
+    ct->last_scheduled_ns = jl_hrtime();
+
     jl_gc_unsafe_leave(ptls, gc_state);
 }
 
@@ -942,6 +947,8 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion
     t->reentrant_timing = 0;
     t->reentrant_inference = 0;
     t->inference_start_time = 0;
+    t->last_scheduled_ns = 0; // start_task stamps the real time before the body runs
+    t->cpu_time_ns = 0;
 
 #ifdef COPY_STACKS
     if (!t->copy_stack) {
@@ -1077,6 +1084,7 @@ CFI_NORETURN
 
     ct->started = 1;
     JL_PROBE_RT_START_TASK(ct);
+    ct->last_scheduled_ns = jl_hrtime();
     if (jl_atomic_load_relaxed(&ct->_isexception)) {
         record_backtrace(ptls, 0);
         jl_push_excstack(&ct->excstack, ct->result,
@@ -1530,6 +1538,8 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi)
     ct->reentrant_timing = 0;
     ct->reentrant_inference = 0;
     ct->inference_start_time = 0;
+    ct->last_scheduled_ns = jl_hrtime(); // root task is running now; 0 would charge it the whole hrtime epoch on first yield
+    ct->cpu_time_ns = 0;
     ptls->root_task = ct;
     jl_atomic_store_relaxed(&ptls->current_task, ct);
     JL_GC_PROMISE_ROOTED(ct);
diff --git a/test/threads_exec.jl b/test/threads_exec.jl
index 68ba9377cf955..8269443d05153 100644
--- a/test/threads_exec.jl
+++ b/test/threads_exec.jl
@@ -3,6 +3,8 @@
 using Test
 using Base.Threads
 using Base.Threads: SpinLock, threadpoolsize
+using LinearAlgebra: peakflops
+using Dates: Nanosecond, now
 
 # for cfunction_closure
 include("testenv.jl")
@@ -1067,3 +1069,33 @@ end
 popfirst!(LOAD_PATH)
 end
 end
+
+@testset "CPU time counter" begin
+    t = Threads.@spawn begin
+        peakflops()
+    end
+    wait(t)
+    @test t.cpu_time_ns > 0
+end
+
+@testset "CPU time counter: lots of spawns" begin
+    # (Dates names are imported at the top of the file; `using` is illegal inside a testset's local scope)
+    # create more tasks than we have cores
+    # the CPU time each task gets should be less
+    # than the wall time
+    @sync begin
+        for i in 1:100
+            start_time = now()
+            task = @spawn begin
+                peakflops()
+            end
+            @spawn begin
+                wait(task)
+                end_time = now()
+                wall_time_delta = end_time - start_time
+                cpu_time = Nanosecond(task.cpu_time_ns)
+                @test cpu_time < wall_time_delta
+            end
+        end
+    end
+end