`@spawn` does not work with nested function call
Description
While trying to work around #478, I came up with the following version, which still fails, but for reasons related to `@spawn` in `@spawn lock(() -> fit!(agg(), data), lck)`.
using Dagger: @spawn, @shard
using Distributed

# add two further Julia processes, which could run on other machines
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere executes code on all machines
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both threads and worker processes as processors
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
df = DataFrame()
# @sync waits until all enclosed calls to @spawn are ready
@sync for i in 1:1000
    data = @spawn rand(10000)
    # This creates a lock per worker. If the task is run on
    # a worker, the correct lock is automatically picked up.
    # Needed for multi-threaded access to data.
    lck = @shard ReentrantLock()
    for agg in aggregators
        res = @spawn lock(() -> fit!(agg(), data), lck)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end
df.result .= fetch.(df.result)
This results in the following error:
julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: RemoteException
Root Exception:
On worker 2:
MethodError: no method matching iterate(::Dagger.EagerThunk)
Closest candidates are:
iterate(::DataStructures.TrieIterator)
@ DataStructures ~/.julia/packages/DataStructures/t9DKl/src/trie.jl:112
iterate(::DataStructures.TrieIterator, ::Any)
@ DataStructures ~/.julia/packages/DataStructures/t9DKl/src/trie.jl:112
iterate(::Pkg.Types.Manifest, ::Int64)
@ Pkg ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Pkg/src/Types.jl:314
...
Stacktrace:
[1] fit!
@ ~/.julia/packages/OnlineStatsBase/FMY19/src/OnlineStatsBase.jl:137
[2] #3
@ ./REPL[8]:5
[3] lock
@ ./lock.jl:229
[4] #invokelatest#2
@ ./essentials.jl:892 [inlined]
[5] invokelatest
@ ./essentials.jl:889 [inlined]
[6] #41
@ ~/.julia/packages/Dagger/Tx54v/src/threadproc.jl:20
Stacktrace:
[1] wait
@ ./task.jl:352 [inlined]
[2] fetch
@ ./task.jl:372 [inlined]
[3] #execute!#40
@ ~/.julia/packages/Dagger/Tx54v/src/threadproc.jl:26
[4] execute!
@ ~/.julia/packages/Dagger/Tx54v/src/threadproc.jl:13
[5] #167
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1611 [inlined]
[6] #21
@ ~/.julia/packages/Dagger/Tx54v/src/options.jl:18 [inlined]
[7] #1
@ ~/.julia/packages/ScopedValues/Kvcrb/src/ScopedValues.jl:163
[8] with_logstate
@ ./logging.jl:515
[9] with_logger
@ ./logging.jl:627 [inlined]
[10] enter_scope
@ ~/.julia/packages/ScopedValues/Kvcrb/src/payloadlogger.jl:17 [inlined]
[11] with
@ ~/.julia/packages/ScopedValues/Kvcrb/src/ScopedValues.jl:162
[12] with_options
@ ~/.julia/packages/Dagger/Tx54v/src/options.jl:17
[13] do_task
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1609
[14] #143
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk: Thunk(id=9, lock(#3, Dagger.Shard(Dict{Dagger.Processor, Dagger.Chunk}(OSProc(1) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(1, 6, 0x0000000000000060), OSProc(1), ProcessScope: worker == 1, false), OSProc(2) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(2, 1, 0x0000000000000060), OSProc(2), ProcessScope: worker == 2, false), OSProc(3) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(3, 0, 0x0000000000000060), OSProc(3), ProcessScope: worker == 3, false)))))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
[2] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
[3] #fetch#73
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
[4] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
[5] _broadcast_getindex_evalf
@ ./broadcast.jl:709 [inlined]
[6] _broadcast_getindex
@ ./broadcast.jl:682 [inlined]
[7] getindex
@ ./broadcast.jl:636 [inlined]
[8] copy
@ ./broadcast.jl:942 [inlined]
[9] materialize
@ ./broadcast.jl:903 [inlined]
[10] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
[11] materialize!
@ ./broadcast.jl:914 [inlined]
[12] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ Base.Broadcast ./broadcast.jl:911
[13] top-level scope
@ REPL[9]:1
Some type information was truncated. Use `show(err)` to see complete types.
This might relate to #357, but I am unsure.
Activity
jpsamaroo changed the title from "BUG? `@spawn` does not work with nested function call" to "`@spawn` does not work with nested function call"
jpsamaroo commented on Mar 6, 2024
This is not exactly a bug per se, as we've always only unwrapped `EagerThunk` dependencies when passed as explicit arguments to `Dagger.@spawn`. Yet, it is also desirable to allow something like this, where the `EagerThunk` is embedded within a closure. I have a branch where I've worked on this, but it never materialized, since we need to both pull such implicit arguments out of the closure and put them back into the same closure, which I couldn't figure out how to do.

Anyway, I agree that it's worth doing this, but it will take some time to implement, and it's not my top priority. In the meantime, either use `Dagger.spawn` or instead pass it as an explicit argument.
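For reference, here is a minimal sketch of the explicit-argument workaround, assuming the reproducer's setup above; `fit_locked` is a hypothetical helper introduced only for illustration and is not part of Dagger's API:

# Define the helper on all workers; it receives the already-unwrapped
# data array and the per-worker lock resolved from the Shard.
@everywhere fit_locked(agg, data, lck) = lock(() -> fit!(agg(), data), lck)

# Because `data` is now an explicit argument of the spawned call,
# Dagger unwraps the EagerThunk before the task runs.
res = Dagger.@spawn fit_locked(agg, data, lck)

# The function form takes explicit arguments as well.
res = Dagger.spawn(fit_locked, agg, data, lck)

Either form can replace the `res = @spawn lock(...)` line in the reproducer's inner loop; the rest of the example, including the final `fetch.`, should then work unchanged.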