Skip to content
10 changes: 6 additions & 4 deletions src/jltypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -2692,7 +2692,7 @@ void jl_init_types(void) JL_GC_DISABLED
NULL,
jl_any_type,
jl_emptysvec,
jl_perm_symsvec(15,
jl_perm_symsvec(16,
"next",
"queue",
"storage",
Expand All @@ -2707,8 +2707,9 @@ void jl_init_types(void) JL_GC_DISABLED
"_state",
"sticky",
"_isexception",
"priority"),
jl_svec(15,
"priority",
"cpu_time_ns"),
jl_svec(16,
jl_any_type,
jl_any_type,
jl_any_type,
Expand All @@ -2723,7 +2724,8 @@ void jl_init_types(void) JL_GC_DISABLED
jl_uint8_type,
jl_bool_type,
jl_bool_type,
jl_uint16_type),
jl_uint16_type,
jl_uint64_type),
jl_emptysvec,
0, 1, 6);
jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type);
Expand Down
4 changes: 4 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1928,8 +1928,12 @@ typedef struct _jl_task_t {
_Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with
// multiqueue priority
uint16_t priority;
// TODO: int32 of ms instead?
uint64_t cpu_time_ns; // time this task has spent running; updated when it yields

// hidden state:
// timestamp this task was last scheduled (TODO: int32 of ms instead?)
uint64_t last_scheduled_ns;
// id of owning thread - does not need to be defined until the task runs
_Atomic(int16_t) tid;
// threadpool id
Expand Down
9 changes: 9 additions & 0 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ void JL_NORETURN jl_finish_task(jl_task_t *t)
{
jl_task_t *ct = jl_current_task;
JL_PROBE_RT_FINISH_TASK(ct);
ct->cpu_time_ns += jl_hrtime() - ct->last_scheduled_ns;
JL_SIGATOMIC_BEGIN();
if (jl_atomic_load_relaxed(&t->_isexception))
jl_atomic_store_release(&t->_state, JL_TASK_STATE_FAILED);
Expand Down Expand Up @@ -640,6 +641,7 @@ JL_DLLEXPORT void jl_switch(void) JL_NOTSAFEPOINT_LEAVE JL_NOTSAFEPOINT_ENTER
jl_error("cannot switch to task running on another thread");

JL_PROBE_RT_PAUSE_TASK(ct);
ct->cpu_time_ns += jl_hrtime() - ct->last_scheduled_ns;

// Store old values on the stack and reset
sig_atomic_t defer_signal = ptls->defer_signal;
Expand Down Expand Up @@ -688,6 +690,8 @@ JL_DLLEXPORT void jl_switch(void) JL_NOTSAFEPOINT_LEAVE JL_NOTSAFEPOINT_ENTER
jl_sigint_safepoint(ptls);

JL_PROBE_RT_RUN_TASK(ct);
ct->last_scheduled_ns = jl_hrtime();

jl_gc_unsafe_leave(ptls, gc_state);
}

Expand Down Expand Up @@ -942,6 +946,8 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion
t->reentrant_timing = 0;
t->reentrant_inference = 0;
t->inference_start_time = 0;
t->last_scheduled_ns = 0;
t->cpu_time_ns = 0;

#ifdef COPY_STACKS
if (!t->copy_stack) {
Expand Down Expand Up @@ -1077,6 +1083,7 @@ CFI_NORETURN

ct->started = 1;
JL_PROBE_RT_START_TASK(ct);
ct->last_scheduled_ns = jl_hrtime();
if (jl_atomic_load_relaxed(&ct->_isexception)) {
record_backtrace(ptls, 0);
jl_push_excstack(&ct->excstack, ct->result,
Expand Down Expand Up @@ -1530,6 +1537,8 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi)
ct->reentrant_timing = 0;
ct->reentrant_inference = 0;
ct->inference_start_time = 0;
ct->last_scheduled_ns = 0;
ct->cpu_time_ns = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this PR was shockingly simple! 👌👌

We'll have to figure out all the details Jameson mentioned, but it looks great to me other than that!

ptls->root_task = ct;
jl_atomic_store_relaxed(&ptls->current_task, ct);
JL_GC_PROMISE_ROOTED(ct);
Expand Down
31 changes: 31 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Test
using Base.Threads
using Base.Threads: SpinLock, threadpoolsize
using LinearAlgebra: peakflops

# for cfunction_closure
include("testenv.jl")
Expand Down Expand Up @@ -1067,3 +1068,33 @@ end
popfirst!(LOAD_PATH)
end
end

@testset "CPU time counter" begin
t = Threads.@spawn begin
peakflops()
end
wait(t)
@test t.cpu_time_ns > 0
end

@testset "CPU time counter: lots of spawns" begin
using Base.Threads, Dates
# create more tasks than we have cores
# the CPU time each task gets should be less
# than the wall time
@sync begin
for i in 1:100
start_time = now()
task = @spawn begin
peakflops()
end
@spawn begin
wait(task)
end_time = now()
wall_time_delta = end_time - start_time
cpu_time = Nanosecond(task.cpu_time_ns)
@test cpu_time < wall_time_delta
end
end
end
end