diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 179bc3170fa52..cb79a4ec530ef 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -47,6 +47,7 @@ function _threadsfor(iter, lbody, schedule) range = iter.args[2] quote local threadsfor_fun + local threadsfor_fun_dynamic let range = $(esc(range)) function threadsfor_fun(onethread=false) r = range # Load into local variable @@ -85,6 +86,15 @@ function _threadsfor(iter, lbody, schedule) $(esc(lbody)) end end + idx = Atomic{UInt}(1) + function threadsfor_fun_dynamic() + r = range # Load into local variable + lenr = length(r) + while (i = atomic_add!(idx, UInt(1))) <= lenr + local $(esc(lidx)) = @inbounds r[i] + $(esc(lbody)) + end + end end if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0 $(if schedule === :static @@ -94,7 +104,11 @@ function _threadsfor(iter, lbody, schedule) :(Base.invokelatest(threadsfor_fun, true)) end) else - threading_run(threadsfor_fun) + $(if schedule === :dynamic + :(threading_run(threadsfor_fun_dynamic)) + else + :(threading_run(threadsfor_fun)) + end) end nothing end @@ -110,15 +124,20 @@ A barrier is placed at the end of the loop which waits for all tasks to finish execution. The `schedule` argument can be used to request a particular scheduling policy. -The only currently supported value is `:static`, which creates one task per thread +Currently supported values are `:static` and `:dynamic`. `:static` creates one task per thread and divides the iterations equally among them. Specifying `:static` is an error if used from inside another `@threads` loop or from a thread other than 1. +`:dynamic` creates one task per thread if possible and gets one new item from the iterations +only if previous item is processed until all items are processed. The default schedule (used when no `schedule` argument is present) is subject to change. !!! compat "Julia 1.5" The `schedule` argument is available as of Julia 1.5. +!!! compat "Julia 1.7" + The `schedule` argument supports `:dynamic` as of Julia 1.7. + See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads), [`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed), `BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg). @@ -133,7 +152,7 @@ macro threads(args...) # for now only allow quoted symbols sched = nothing end - if sched !== :static + if sched ∉ [:static, :dynamic] throw(ArgumentError("unsupported schedule argument in @threads")) end elseif na == 1 diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 860f0e03e2f5e..09c1d6e6d2640 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -724,16 +724,28 @@ let a = zeros(nthreads()) @test a == [1:nthreads();] end -# static schedule -function _atthreads_static_schedule() - ids = zeros(Int, nthreads()) - Threads.@threads :static for i = 1:nthreads() - ids[i] = Threads.threadid() +@testset "@threads schedule option" begin + # static schedule + function _atthreads_static_schedule() + ids = zeros(Int, nthreads()) + Threads.@threads :static for i = 1:nthreads() + ids[i] = Threads.threadid() + end + return ids + end + @test _atthreads_static_schedule() == [1:nthreads();] + @test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end + + # dynamic schedule + function _atthreads_dynamic_schedule() + ids = zeros(Int, nthreads()) + Threads.@threads :dynamic for i = 1:nthreads() + ids[i] = i + end + return ids end - return ids + @test _atthreads_dynamic_schedule() == [1:nthreads();] end -@test _atthreads_static_schedule() == [1:nthreads();] -@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end try @macroexpand @threads(for i = 1:10, j = 1:10; end)