Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 92 additions & 19 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ function task_done_hook(t::Task)

if err && !handled && Threads.threadid() == 1
if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
active_repl_backend.backend_task._state === task_state_runnable && isempty(FirstWorkqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result) # this terminates the task
end
Expand All @@ -677,7 +677,7 @@ function task_done_hook(t::Task)
# issue #19467
if Threads.threadid() == 1 &&
isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
active_repl_backend.backend_task._state === task_state_runnable && isempty(FirstWorkqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, e)
else
Expand Down Expand Up @@ -740,10 +740,37 @@ function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
return W
end

const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
function trypop!(W::IntrusiveLinkedListSynchronized)
lock(W.lock)
try
if isempty(W.queue)
return nothing
else
return pop!(W.queue)
end
finally
unlock(W.lock)
end
end

function trypopfirst!(W::IntrusiveLinkedListSynchronized)
lock(W.lock)
try
if isempty(W.queue)
return nothing
else
return popfirst!(W.queue)
end
finally
unlock(W.lock)
end
end


const Workqueue = IntrusiveLinkedListSynchronized{Task}
global Workqueues::Vector{Workqueue} = [Workqueue()]
const FirstWorkqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
const Workqueues_lock = Threads.SpinLock()
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable

function workqueue_for(tid::Int)
qs = Workqueues
Expand All @@ -760,7 +787,7 @@ function workqueue_for(tid::Int)
global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
end
if !isassigned(qs, tid)
@inbounds qs[tid] = StickyWorkqueue()
@inbounds qs[tid] = Workqueue()
end
return @inbounds qs[tid]
end
Expand All @@ -784,16 +811,21 @@ function enq_work(t::Task)
push!(workqueue_for(tid), t)
else
tp = Threads.threadpool(t)
if Threads.threadpoolsize(tp) == 1
# There's only one thread in the task's assigned thread pool;
# use its work queue.
tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
mytp = Threads.threadpool()
if tp == mytp
tid = Threads.threadid()
push!(workqueue_for(tid), t)
else
# Otherwise, put the task in the multiqueue.
Partr.multiq_insert(t, t.priority)
tid = 0
# pick a tid in the threadpool to push work to,
# must use the sticky-wq
N = Threads.threadpoolsize(tp)
offset = (tp === :interactive) ? 0 : Threads.threadpoolsize(:interactive)
if N != 1
tid = offset + Partr.cong(UInt32(N), Partr.unbias_cong(UInt32(N))) # TODO cache unbias_cong
else
tid = offset + 1
end
push!(workqueue_for(tid), t)
end
end
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
Expand Down Expand Up @@ -950,9 +982,14 @@ function ensure_rescheduled(othertask::Task)
nothing
end

function trypoptask(W::StickyWorkqueue)
function trypoptask(W::Workqueue)
while !isempty(W)
t = popfirst!(W)
t = trypopfirst!(W)
if !(t isa Task)
continue
end
t::Task

if t._state !== task_state_runnable
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
Expand All @@ -964,12 +1001,48 @@ function trypoptask(W::StickyWorkqueue)
end
return t
end
return Partr.multiq_deletemin()

# For now we don't steal across threadpools
tid = Threads.threadid() # store in workqueue?
tp = Threads.threadpool(tid)
N = Threads.threadpoolsize(tp)
offset = (tp === :interactive) ? 0 : Threads.threadpoolsize(:interactive)
if N == 1
return nothing # No steal possible
end
attempts = 0
while attempts < N
othertid = offset + Partr.cong(UInt32(N), Partr.unbias_cong(UInt32(N))) # TODO cache unbias_cong
if othertid == tid
continue
end
t = trypop!(workqueue_for(othertid)) # Steal from the back
if t isa Task
return t
end
attempts += 1
end
# TODO linear scan?

return nothing
end

checktaskempty = Partr.multiq_check_empty
function checktaskempty()
tid = Threads.threadid()
tp = Threads.threadpool(tid)
N = Threads.threadpoolsize(tp)
offset = (tp === :interactive) ? 0 : Threads.threadpoolsize(:interactive)

for tid in 1:N
W = workqueue_for(offset+tid)
if !isempty(W)
return false
end
end
return true
end

@noinline function poptask(W::StickyWorkqueue)
@noinline function poptask(W::Workqueue)
task = trypoptask(W)
if !(task isa Task)
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
Expand Down