From c288701d39ad463c2212b1b0418ba8c1397fe07e Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sun, 19 Sep 2021 12:31:22 -0500 Subject: [PATCH 1/2] Force reschedule in adjust_pressure! Co-authored-by: krynju --- src/sch/eager.jl | 1 + test/thunk.jl | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/src/sch/eager.jl b/src/sch/eager.jl index fdc1004d8..ddec116ac 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -44,6 +44,7 @@ function adjust_pressure!(h::SchedulerHandle, proc::Processor, pressure) end function _adjust_pressure!(ctx, state, task, tid, (pid, proc, pressure)) state.worker_pressure[pid][proc] += pressure + put!(state.chan, RescheduleSignal()) nothing end diff --git a/test/thunk.jl b/test/thunk.jl index d55caab79..9c3eff8f9 100644 --- a/test/thunk.jl +++ b/test/thunk.jl @@ -240,4 +240,12 @@ end @test a.f.scope isa NodeScope end end + @testset "parent fetch child, one thread" begin + # Issue #282 + + s = p -> p == Dagger.ThreadProc(1, 1) + f = (x) -> 10 + x + g = (x) -> fetch(Dagger.spawn(f, x; proclist=s)) + fetch(Dagger.spawn(g, 10; proclist=s)) + end end From 91ba2294e502c576744f659d747276799f7a01c6 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Wed, 22 Sep 2021 09:58:23 -0500 Subject: [PATCH 2/2] Only reschedule on negative pressure changes Co-authored-by: krynju --- src/sch/eager.jl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/sch/eager.jl b/src/sch/eager.jl index ddec116ac..10b3b819f 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -32,8 +32,9 @@ function init_eager() end end -"Adjusts the scheduler's cached pressure indicator for the specified worker by -the specified amount." +"Adjusts the scheduler's cached pressure indicators for the specified worker by +the specified amount, and signals the scheduler to try scheduling again if +pressure decreased." function adjust_pressure!(h::SchedulerHandle, proc::Processor, pressure) uid = Dagger.get_tls().sch_uid lock(TASK_SYNC) do @@ -44,7 +45,9 @@ function adjust_pressure!(h::SchedulerHandle, proc::Processor, pressure) end function _adjust_pressure!(ctx, state, task, tid, (pid, proc, pressure)) state.worker_pressure[pid][proc] += pressure - put!(state.chan, RescheduleSignal()) + if pressure < 0 + put!(state.chan, RescheduleSignal()) + end nothing end