From 9a9821dc027f535cda84385c8ae84b03e763edbd Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 22 Sep 2021 00:33:20 +0200 Subject: [PATCH 01/28] add local_lock to Future, use it in fetch and put! --- stdlib/Distributed/src/remotecall.jl | 35 +++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 75caf7f3065b7..12c57c95186b1 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -27,11 +27,12 @@ mutable struct Future <: AbstractRemoteRef whence::Int id::Int v::Union{Some{Any}, Nothing} + local_lock::Threads.Condition Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = - (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r)) + (r = new(w, rrid.whence, rrid.id, v, Threads.Condition()); return test_existing_ref(r)) - Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances + Future(t::NTuple{4, Any}) = new(t[1], t[2], t[3], t[4], Threads.Condition()) # Useful for creating dummy, zeroed-out instances end """ @@ -582,8 +583,20 @@ is an exception, throws a [`RemoteException`](@ref) which captures the remote ex """ function fetch(r::Future) r.v !== nothing && return something(r.v) - v = call_on_owner(fetch_ref, r) - r.v = Some(v) + if r.where == myid() + lock(r.local_lock) + try + while r.v === nothing + wait(r.local_lock) + end + finally + unlock(r.local_lock) + end + return something(r.v) + else + v = call_on_owner(fetch_ref, r) + r.v = Some(v) + end send_del_client(r) v end @@ -611,8 +624,18 @@ value to the return value of the call upon completion. """ function put!(rr::Future, v) rr.v !== nothing && error("Future can be set only once") - call_on_owner(put_future, rr, v, myid()) - rr.v = Some(v) + if rr.where == myid() + lock(rr.local_lock) + try + rr.v = Some(v) + notify(rr.local_lock) + finally + unlock(rr.local_lock) + end + else + call_on_owner(put_future, rr, v, myid()) + rr.v = Some(v) + end rr end function put_future(rid, v, caller) From 81b3075a891efb5dd524b0e62e4b27880096c60e Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 22 Sep 2021 09:07:38 +0200 Subject: [PATCH 02/28] add corrections to the remote/clientref logic --- stdlib/Distributed/src/remotecall.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 12c57c95186b1..c3f68dce99700 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -596,8 +596,8 @@ function fetch(r::Future) else v = call_on_owner(fetch_ref, r) r.v = Some(v) + send_del_client(r) end - send_del_client(r) v end @@ -627,6 +627,7 @@ function put!(rr::Future, v) if rr.where == myid() lock(rr.local_lock) try + call_on_owner(put_future, rr, v, myid()) rr.v = Some(v) notify(rr.local_lock) finally From 656546d6f4d0575753e7a4ff4553d5dd64bde5d8 Mon Sep 17 00:00:00 2001 From: krynju Date: Tue, 28 Sep 2021 21:02:30 +0200 Subject: [PATCH 03/28] add additional check and ordering --- stdlib/Distributed/src/remotecall.jl | 42 +++++++++++----------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index c3f68dce99700..d13bd2fdaa30f 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -27,12 +27,11 @@ mutable struct Future <: AbstractRemoteRef whence::Int id::Int v::Union{Some{Any}, Nothing} - local_lock::Threads.Condition Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = - (r = new(w, rrid.whence, rrid.id, v, Threads.Condition()); return test_existing_ref(r)) + (r = new(w, rrid.whence, rrid.id, v); return test_existing_ref(r)) - Future(t::NTuple{4, Any}) = new(t[1], t[2], t[3], t[4], Threads.Condition()) # Useful for creating dummy, zeroed-out instances + Future(t::NTuple{4, Any}) = new(t[1], t[2], t[3], t[4]) # Useful for creating dummy, zeroed-out instances end """ @@ -584,15 +583,14 @@ is an exception, throws a [`RemoteException`](@ref) which captures the remote ex function fetch(r::Future) r.v !== nothing && return something(r.v) if r.where == myid() - lock(r.local_lock) - try - while r.v === nothing - wait(r.local_lock) - end - finally - unlock(r.local_lock) + rv = lookup_ref(remoteref_id(r)) + @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" + + if r.v !== nothing + send_del_client(r) + return something(r.v) end - return something(r.v) + v = fetch(rv.c) else v = call_on_owner(fetch_ref, r) r.v = Some(v) @@ -601,7 +599,11 @@ function fetch(r::Future) v end -fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) +fetch_ref(rid, args...) = begin + rv=lookup_ref(rid) + @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" + fetch(rv.c, args...) +end """ fetch(c::RemoteChannel) @@ -624,23 +626,13 @@ value to the return value of the call upon completion. """ function put!(rr::Future, v) rr.v !== nothing && error("Future can be set only once") - if rr.where == myid() - lock(rr.local_lock) - try - call_on_owner(put_future, rr, v, myid()) - rr.v = Some(v) - notify(rr.local_lock) - finally - unlock(rr.local_lock) - end - else - call_on_owner(put_future, rr, v, myid()) - rr.v = Some(v) - end + rr.v = Some(v) + call_on_owner(put_future, rr, v, myid()) rr end function put_future(rid, v, caller) rv = lookup_ref(rid) + @debug "put; rid=$(objectid(rid)); rv=$(objectid(rv))" isready(rv) && error("Future can be set only once") put!(rv, v) # The caller has the value and hence can be removed from the remote store. From 8d8712f67db4bb24d3e6382e67d0dabc8c607bf3 Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 29 Sep 2021 08:39:29 +0200 Subject: [PATCH 04/28] add some comments --- stdlib/Distributed/src/remotecall.jl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) mode change 100644 => 100755 stdlib/Distributed/src/remotecall.jl diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl old mode 100644 new mode 100755 index d13bd2fdaa30f..c2745f11a9e14 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -583,14 +583,21 @@ is an exception, throws a [`RemoteException`](@ref) which captures the remote ex function fetch(r::Future) r.v !== nothing && return something(r.v) if r.where == myid() - rv = lookup_ref(remoteref_id(r)) + # this lookup_ref serves as a synchronization point (lock on client_refs) + # will either return the correct rv (if done before del_client in put!) + # or a wrong rv (if done after del_client) + # for the case of the wrong rv we should check r.v again and return without waiting on rv + # r.v will be populated at that time if the rv is wrong, because caching is done before del_client + # in case of a correct rv it's safe to fetch on it and we do it after the additional check + rv = lookup_ref(remoteref_id(r)) + @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" - if r.v !== nothing + if r.v !== nothing # check again, because the put! might have already cached send_del_client(r) return something(r.v) end - v = fetch(rv.c) + v = fetch(rv.c) # fetch on channel, because at this point we know we have the correct rv, which will eventually be put! else v = call_on_owner(fetch_ref, r) r.v = Some(v) @@ -626,10 +633,11 @@ value to the return value of the call upon completion. """ function put!(rr::Future, v) rr.v !== nothing && error("Future can be set only once") - rr.v = Some(v) + rr.v = Some(v) # cache before put_future call_on_owner(put_future, rr, v, myid()) rr end + function put_future(rid, v, caller) rv = lookup_ref(rid) @debug "put; rid=$(objectid(rid)); rv=$(objectid(rv))" From 7c0dff0ffca162b7718e73843a7a960770fdebe9 Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 29 Sep 2021 08:59:49 +0200 Subject: [PATCH 05/28] fix whitespace --- stdlib/Distributed/src/remotecall.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index c2745f11a9e14..71278fc5909c4 100755 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -589,7 +589,7 @@ function fetch(r::Future) # for the case of the wrong rv we should check r.v again and return without waiting on rv # r.v will be populated at that time if the rv is wrong, because caching is done before del_client # in case of a correct rv it's safe to fetch on it and we do it after the additional check - rv = lookup_ref(remoteref_id(r)) + rv = lookup_ref(remoteref_id(r)) @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" From 64922da9f31b18a6bb9b72cb98baa320188ff7a4 Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 29 Sep 2021 21:39:31 +0200 Subject: [PATCH 06/28] add atomic and remove del_client from put --- stdlib/Distributed/src/remotecall.jl | 47 +++++++++++++++------------- 1 file changed, 25 insertions(+), 22 deletions(-) mode change 100755 => 100644 stdlib/Distributed/src/remotecall.jl diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl old mode 100755 new mode 100644 index 71278fc5909c4..ad53eb3af0773 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -26,7 +26,7 @@ mutable struct Future <: AbstractRemoteRef where::Int whence::Int id::Int - v::Union{Some{Any}, Nothing} + @atomic v::Union{Some{Any}, Nothing} Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = (r = new(w, rrid.whence, rrid.id, v); return test_existing_ref(r)) @@ -72,7 +72,7 @@ function test_existing_ref(r::AbstractRemoteRef) if isa(r, Future) && found.v === nothing && r.v !== nothing # we have recd the value from another source, probably a deserialized ref, send a del_client message send_del_client(r) - found.v = r.v + @atomic :sequentially_consistent found.v = r.v end return found::typeof(r) end @@ -92,7 +92,7 @@ function finalize_ref(r::AbstractRemoteRef) else # send_del_client only if the reference has not been set r.v === nothing && send_del_client_no_lock(r) - r.v = nothing + @atomic :sequentially_consistent r.v = nothing end r.where = 0 finally @@ -581,26 +581,28 @@ Further calls to `fetch` on the same reference return the cached value. If the r is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) - r.v !== nothing && return something(r.v) - if r.where == myid() - # this lookup_ref serves as a synchronization point (lock on client_refs) - # will either return the correct rv (if done before del_client in put!) - # or a wrong rv (if done after del_client) - # for the case of the wrong rv we should check r.v again and return without waiting on rv - # r.v will be populated at that time if the rv is wrong, because caching is done before del_client - # in case of a correct rv it's safe to fetch on it and we do it after the additional check - rv = lookup_ref(remoteref_id(r)) - @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" + v = @atomic :sequentially_consistent r.v - if r.v !== nothing # check again, because the put! might have already cached - send_del_client(r) - return something(r.v) + v !== nothing && return something(v) + if r.where ==myid() + rv = lookup_ref(remoteref_id(r)) + v = @atomic :sequentially_consistent r.v + if v !== nothing + return something(v) + else + fetch(rv.c) end - v = fetch(rv.c) # fetch on channel, because at this point we know we have the correct rv, which will eventually be put! else v = call_on_owner(fetch_ref, r) - r.v = Some(v) + end + if r.where == myid() + lock(client_refs) do + @atomic :sequentially_consistent r.v = Some(v) + del_client(r) + end + else + @atomic :sequentially_consistent r.v = Some(v) send_del_client(r) end v @@ -609,7 +611,9 @@ end fetch_ref(rid, args...) = begin rv=lookup_ref(rid) @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" - fetch(rv.c, args...) + f = fetch(rv.c, args...) + @debug "endfet; rid=$(objectid(rid)); rv=$(objectid(rv))" + f end """ @@ -632,8 +636,8 @@ All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ function put!(rr::Future, v) - rr.v !== nothing && error("Future can be set only once") - rr.v = Some(v) # cache before put_future + _, ok = @atomicreplace rr.v nothing => Some(v) + ok || error("Future can be set only once") call_on_owner(put_future, rr, v, myid()) rr end @@ -644,7 +648,6 @@ function put_future(rid, v, caller) isready(rv) && error("Future can be set only once") put!(rv, v) # The caller has the value and hence can be removed from the remote store. - del_client(rid, caller) nothing end From 389088b1ca91f108fd1e4e5669c340b6157c5377 Mon Sep 17 00:00:00 2001 From: krynju Date: Thu, 30 Sep 2021 10:04:10 +0200 Subject: [PATCH 07/28] add check in lock --- stdlib/Distributed/src/remotecall.jl | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index ad53eb3af0773..7d200242c7025 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -581,30 +581,25 @@ Further calls to `fetch` on the same reference return the cached value. If the r is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) - v = @atomic :sequentially_consistent r.v - v !== nothing && return something(v) + if r.where ==myid() - rv = lookup_ref(remoteref_id(r)) - v = @atomic :sequentially_consistent r.v + (rv, v) = lock(client_refs) do + v = @atomic :sequentially_consistent r.v + rv = v === nothing ? lookup_ref(remoteref_id(r)) : nothing + rv, v + end if v !== nothing return something(v) else - fetch(rv.c) + v = fetch(rv.c) end else v = call_on_owner(fetch_ref, r) end - if r.where == myid() - lock(client_refs) do - @atomic :sequentially_consistent r.v = Some(v) - del_client(r) - end - else - @atomic :sequentially_consistent r.v = Some(v) - send_del_client(r) - end + @atomic :sequentially_consistent r.v = Some(v) + send_del_client(r) v end @@ -648,6 +643,7 @@ function put_future(rid, v, caller) isready(rv) && error("Future can be set only once") put!(rv, v) # The caller has the value and hence can be removed from the remote store. + del_client(rid, caller) nothing end From 7635442516e675075560bd81bc912dd761d95e56 Mon Sep 17 00:00:00 2001 From: krynju Date: Fri, 1 Oct 2021 15:46:58 +0200 Subject: [PATCH 08/28] add cleanup --- stdlib/Distributed/src/remotecall.jl | 36 ++++++++++------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 7d200242c7025..ea4ba67260b74 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -71,13 +71,11 @@ function test_existing_ref(r::AbstractRemoteRef) @assert r.where > 0 if isa(r, Future) && found.v === nothing && r.v !== nothing # we have recd the value from another source, probably a deserialized ref, send a del_client message + @atomic found.v = r.v send_del_client(r) - @atomic :sequentially_consistent found.v = r.v end return found::typeof(r) end - - client_refs[r] = nothing finalizer(finalize_ref, r) return r end @@ -91,8 +89,9 @@ function finalize_ref(r::AbstractRemoteRef) send_del_client_no_lock(r) else # send_del_client only if the reference has not been set + r.v === nothing && send_del_client_no_lock(r) - @atomic :sequentially_consistent r.v = nothing + @atomic r.v = nothing end r.where = 0 finally @@ -581,35 +580,25 @@ Further calls to `fetch` on the same reference return the cached value. If the r is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) - v = @atomic :sequentially_consistent r.v - v !== nothing && return something(v) - - if r.where ==myid() - (rv, v) = lock(client_refs) do - v = @atomic :sequentially_consistent r.v - rv = v === nothing ? lookup_ref(remoteref_id(r)) : nothing - rv, v + r.v !== nothing && return something(r.v) + if r.where == myid() + rv = lock(client_refs) do + r.v === nothing ? lookup_ref(remoteref_id(r)) : nothing end - if v !== nothing - return something(v) + if r.v !== nothing + return something(r.v) else v = fetch(rv.c) end else v = call_on_owner(fetch_ref, r) end - @atomic :sequentially_consistent r.v = Some(v) + @atomic r.v = Some(v) send_del_client(r) - v + r.v end -fetch_ref(rid, args...) = begin - rv=lookup_ref(rid) - @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))" - f = fetch(rv.c, args...) - @debug "endfet; rid=$(objectid(rid)); rv=$(objectid(rv))" - f -end +fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) """ fetch(c::RemoteChannel) @@ -639,7 +628,6 @@ end function put_future(rid, v, caller) rv = lookup_ref(rid) - @debug "put; rid=$(objectid(rid)); rv=$(objectid(rv))" isready(rv) && error("Future can be set only once") put!(rv, v) # The caller has the value and hence can be removed from the remote store. From 50c4cd264f393a4cccec080ed8aa95dad57863f8 Mon Sep 17 00:00:00 2001 From: krynju Date: Fri, 1 Oct 2021 16:35:16 +0200 Subject: [PATCH 09/28] fix a return mistake --- stdlib/Distributed/src/remotecall.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index ea4ba67260b74..ce385f876dfc8 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -595,7 +595,7 @@ function fetch(r::Future) end @atomic r.v = Some(v) send_del_client(r) - r.v + something(r.v) end fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) From 92188ba5ae5a4a4572683e0af41b6fc28c498915 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 2 Oct 2021 10:30:03 +0200 Subject: [PATCH 10/28] add one more ordering adjustment --- stdlib/Distributed/src/remotecall.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index ce385f876dfc8..9b27aec93e193 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -622,7 +622,7 @@ value to the return value of the call upon completion. function put!(rr::Future, v) _, ok = @atomicreplace rr.v nothing => Some(v) ok || error("Future can be set only once") - call_on_owner(put_future, rr, v, myid()) + call_on_owner(put_future, rr, something(rr.v), myid()) rr end From 306b6e488c673445a0b8dd280584dff4fb7c3bf6 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 2 Oct 2021 10:47:24 +0200 Subject: [PATCH 11/28] fix accidentaly removed line --- stdlib/Distributed/src/remotecall.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 9b27aec93e193..45d119f1575b9 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -76,6 +76,8 @@ function test_existing_ref(r::AbstractRemoteRef) end return found::typeof(r) end + + client_refs[r] = nothing finalizer(finalize_ref, r) return r end From 07bbd7c097669a6fd4f43b7cce5eacc9931f8c38 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 2 Oct 2021 10:49:08 +0200 Subject: [PATCH 12/28] fix unnecessary changes --- stdlib/Distributed/src/remotecall.jl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 45d119f1575b9..663fd23122d56 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -29,9 +29,9 @@ mutable struct Future <: AbstractRemoteRef @atomic v::Union{Some{Any}, Nothing} Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = - (r = new(w, rrid.whence, rrid.id, v); return test_existing_ref(r)) + (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r)) - Future(t::NTuple{4, Any}) = new(t[1], t[2], t[3], t[4]) # Useful for creating dummy, zeroed-out instances + Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances end """ @@ -91,7 +91,6 @@ function finalize_ref(r::AbstractRemoteRef) send_del_client_no_lock(r) else # send_del_client only if the reference has not been set - r.v === nothing && send_del_client_no_lock(r) @atomic r.v = nothing end From 38b64193bd5a7db1144c9ed01839b038d050f034 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 9 Oct 2021 14:50:15 +0200 Subject: [PATCH 13/28] add @atomic back to loads + acquire/release --- stdlib/Distributed/src/remotecall.jl | 55 ++++++++++++++++++---------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 663fd23122d56..7086fb6d47f1c 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -69,10 +69,14 @@ function test_existing_ref(r::AbstractRemoteRef) found = getkey(client_refs, r, nothing) if found !== nothing @assert r.where > 0 - if isa(r, Future) && found.v === nothing && r.v !== nothing - # we have recd the value from another source, probably a deserialized ref, send a del_client message - @atomic found.v = r.v - send_del_client(r) + if isa(r, Future) + fv_cache = @atomic :acquire found.v + rv_cache = @atomic :acquire r.v + if fv_cache === nothing && rv_cache !== nothing + # we have recd the value from another source, probably a deserialized ref, send a del_client message + @atomic :release found.v = rv_cache + send_del_client(r) + end end return found::typeof(r) end @@ -91,8 +95,9 @@ function finalize_ref(r::AbstractRemoteRef) send_del_client_no_lock(r) else # send_del_client only if the reference has not been set - r.v === nothing && send_del_client_no_lock(r) - @atomic r.v = nothing + v_cache = @atomic :acquire r.v + v_cache === nothing && send_del_client_no_lock(r) + @atomic :release r.v = nothing end r.where = 0 finally @@ -201,7 +206,8 @@ isready(f) # will not block ``` """ function isready(rr::Future) - rr.v === nothing || return true + v_cache = @atomic :acquire rr.v + v_cache === nothing || return true rid = remoteref_id(rr) return if rr.where == myid() @@ -564,7 +570,7 @@ end Wait for a value to become available for the specified [`Future`](@ref). """ -wait(r::Future) = (r.v !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) +wait(r::Future) = (v_cache = @atomic :acquire r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) """ wait(r::RemoteChannel, args...) @@ -581,22 +587,28 @@ Further calls to `fetch` on the same reference return the cached value. If the r is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) - r.v !== nothing && return something(r.v) + v_cache = @atomic :acquire r.v + v_cache !== nothing && return something(v_cache) + if r.where == myid() - rv = lock(client_refs) do - r.v === nothing ? lookup_ref(remoteref_id(r)) : nothing + v_cache, rv = lock(client_refs) do + v_cache = @atomic :acquire r.v + v_cache, v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing end - if r.v !== nothing - return something(r.v) + + if v_cache !== nothing + return something(v_cache) else - v = fetch(rv.c) + v_local = fetch(rv.c) end else - v = call_on_owner(fetch_ref, r) + v_local = call_on_owner(fetch_ref, r) end - @atomic r.v = Some(v) + + @atomic :release r.v = Some(v_local) send_del_client(r) - something(r.v) + v_cache = @atomic :acquire r.v + something(v_cache) end fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) @@ -621,10 +633,15 @@ All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ function put!(rr::Future, v) + rr.where == myid() && set_future_cache(rr, v) + call_on_owner(put_future, rr, v, myid()) + rr.where != myid() && set_future_cache(rr, v) + rr +end + +function set_future_cache(rr::Future, v) _, ok = @atomicreplace rr.v nothing => Some(v) ok || error("Future can be set only once") - call_on_owner(put_future, rr, something(rr.v), myid()) - rr end function put_future(rid, v, caller) From c5045cf308a428ab86429a617c75d0a820d36d87 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 9 Oct 2021 14:55:47 +0200 Subject: [PATCH 14/28] fix whitespace --- stdlib/Distributed/src/remotecall.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 7086fb6d47f1c..04981c9f8092e 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -589,7 +589,7 @@ is an exception, throws a [`RemoteException`](@ref) which captures the remote ex function fetch(r::Future) v_cache = @atomic :acquire r.v v_cache !== nothing && return something(v_cache) - + if r.where == myid() v_cache, rv = lock(client_refs) do v_cache = @atomic :acquire r.v From 00400802fc5344e1a976fd189b732da97e45be2a Mon Sep 17 00:00:00 2001 From: krynju Date: Thu, 21 Oct 2021 23:27:26 +0200 Subject: [PATCH 15/28] review things applied (mostly) --- stdlib/Distributed/src/remotecall.jl | 59 ++++++++++++++------- stdlib/Distributed/test/distributed_exec.jl | 3 ++ 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 04981c9f8092e..9802212972c64 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef where::Int whence::Int id::Int + lock::ReentrantLock @atomic v::Union{Some{Any}, Nothing} Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = - (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r)) + (r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r)) - Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances + Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances end """ @@ -71,11 +72,11 @@ function test_existing_ref(r::AbstractRemoteRef) @assert r.where > 0 if isa(r, Future) fv_cache = @atomic :acquire found.v - rv_cache = @atomic :acquire r.v + rv_cache = @atomic :monotonic r.v if fv_cache === nothing && rv_cache !== nothing # we have recd the value from another source, probably a deserialized ref, send a del_client message - @atomic :release found.v = rv_cache send_del_client(r) + @atomic :release found.v = rv_cache end end return found::typeof(r) @@ -206,7 +207,7 @@ isready(f) # will not block ``` """ function isready(rr::Future) - v_cache = @atomic :acquire rr.v + v_cache = @atomic rr.v v_cache === nothing || return true rid = remoteref_id(rr) @@ -570,7 +571,7 @@ end Wait for a value to become available for the specified [`Future`](@ref). """ -wait(r::Future) = (v_cache = @atomic :acquire r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) +wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) """ wait(r::RemoteChannel, args...) @@ -587,12 +588,12 @@ Further calls to `fetch` on the same reference return the cached value. If the r is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) - v_cache = @atomic :acquire r.v + v_cache = @atomic r.v v_cache !== nothing && return something(v_cache) if r.where == myid() - v_cache, rv = lock(client_refs) do - v_cache = @atomic :acquire r.v + v_cache, rv = lock(r.lock) do + v_cache = @atomic :monotonic r.v v_cache, v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing end @@ -605,9 +606,19 @@ function fetch(r::Future) v_local = call_on_owner(fetch_ref, r) end - @atomic :release r.v = Some(v_local) + v_cache = @atomic r.v + + if v_cache === nothing # call_on_owner case + v_old, status = lock(r.lock) do + @atomicreplace r.v nothing => Some(v_local) + end + # status == true - when value obtained through call_on_owner + # status == false - other siuations replace fails, because by this time the cache will always be populated + # why? put! performs caching and putting into channel under r.lock + v_cache = status ? v_local : v_old + end + send_del_client(r) - v_cache = @atomic :acquire r.v something(v_cache) end @@ -632,15 +643,27 @@ A `put!` on an already set `Future` throws an `Exception`. All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ -function put!(rr::Future, v) - rr.where == myid() && set_future_cache(rr, v) - call_on_owner(put_future, rr, v, myid()) - rr.where != myid() && set_future_cache(rr, v) - rr +function put!(r::Future, v) + if r.where == myid() + rid = remoteref_id(r) + rv = lookup_ref(rid) + isready(rv) && error("Future can be set only once") + lock(r.lock) do + put!(rv, v) + set_future_cache(r, v) + end + del_client(rid, myid()) + else + lock(r.lock) do + call_on_owner(put_future, r, v, myid()) + set_future_cache(r, v) + end + end + r end -function set_future_cache(rr::Future, v) - _, ok = @atomicreplace rr.v nothing => Some(v) +function set_future_cache(r::Future, v) + _, ok = @atomicreplace r.v nothing => Some(v) ok || error("Future can be set only once") end diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index a113f5e38703e..6c415fe089e93 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -325,6 +325,9 @@ function test_regular_io_ser(ref::Distributed.AbstractRemoteRef) v = getfield(ref2, fld) if isa(v, Number) @test v === zero(typeof(v)) + elseif fld == :lock + @test v isa ReentrantLock + @test !islocked(v) elseif v !== nothing error(string("Add test for field ", fld)) end From 87d30f69fa0cf51e4a93ae6d80503110af06d39b Mon Sep 17 00:00:00 2001 From: krynju Date: Fri, 22 Oct 2021 19:31:45 +0200 Subject: [PATCH 16/28] add some comments --- stdlib/Distributed/src/remotecall.jl | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 9802212972c64..ea9051259d4f7 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -612,9 +612,11 @@ function fetch(r::Future) v_old, status = lock(r.lock) do @atomicreplace r.v nothing => Some(v_local) end - # status == true - when value obtained through call_on_owner - # status == false - other siuations replace fails, because by this time the cache will always be populated + # status == true - when value obtained through call_on_owner, put! done from a different worker + # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated # why? put! performs caching and putting into channel under r.lock + + # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v v_cache = status ? v_local : v_old end @@ -649,12 +651,12 @@ function put!(r::Future, v) rv = lookup_ref(rid) isready(rv) && error("Future can be set only once") lock(r.lock) do - put!(rv, v) - set_future_cache(r, v) + put!(rv, v) # this notifies the tasks waiting on the channel in fetch + set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached end del_client(rid, myid()) else - lock(r.lock) do + lock(r.lock) do # same idea as above if there were any local tasks fetching on this Future call_on_owner(put_future, r, v, myid()) set_future_cache(r, v) end From 4b634d33916df00ce40914d33aa88304e82b2017 Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 3 Nov 2021 18:53:45 +0100 Subject: [PATCH 17/28] fix comment --- stdlib/Distributed/src/remotecall.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index ea9051259d4f7..2afc331a26cc3 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -612,7 +612,7 @@ function fetch(r::Future) v_old, status = lock(r.lock) do @atomicreplace r.v nothing => Some(v_local) end - # status == true - when value obtained through call_on_owner, put! done from a different worker + # status == true - when value obtained through call_on_owner, put! called from a different worker # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated # why? put! performs caching and putting into channel under r.lock From 2c78ed94b1cae5c4b86fb449fa545b795955c20a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krystian=20Guli=C5=84ski?= Date: Sat, 6 Nov 2021 00:21:31 +0100 Subject: [PATCH 18/28] Apply suggestions from code review Co-authored-by: Jameson Nash --- stdlib/Distributed/src/remotecall.jl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 2afc331a26cc3..b28991be5327b 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -96,9 +96,9 @@ function finalize_ref(r::AbstractRemoteRef) send_del_client_no_lock(r) else # send_del_client only if the reference has not been set - v_cache = @atomic :acquire r.v + v_cache = @atomic :monotonic r.v v_cache === nothing && send_del_client_no_lock(r) - @atomic :release r.v = nothing + @atomic :monotonic r.v = nothing end r.where = 0 finally @@ -592,10 +592,10 @@ function fetch(r::Future) v_cache !== nothing && return something(v_cache) if r.where == myid() - v_cache, rv = lock(r.lock) do + @lock(r.lock, begin v_cache = @atomic :monotonic r.v - v_cache, v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing - end + rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing + end) if v_cache !== nothing return something(v_cache) @@ -609,8 +609,8 @@ function fetch(r::Future) v_cache = @atomic r.v if v_cache === nothing # call_on_owner case - v_old, status = lock(r.lock) do - @atomicreplace r.v nothing => Some(v_local) + @lock r.lock begin + v_old, status = @atomicreplace r.v nothing => Some(v_local) end # status == true - when value obtained through call_on_owner, put! called from a different worker # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated @@ -650,13 +650,13 @@ function put!(r::Future, v) rid = remoteref_id(r) rv = lookup_ref(rid) isready(rv) && error("Future can be set only once") - lock(r.lock) do + @lock r.lock begin put!(rv, v) # this notifies the tasks waiting on the channel in fetch set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached end del_client(rid, myid()) else - lock(r.lock) do # same idea as above if there were any local tasks fetching on this Future + @lock r.lock begin # same idea as above if there were any local tasks fetching on this Future call_on_owner(put_future, r, v, myid()) set_future_cache(r, v) end @@ -666,7 +666,7 @@ end function set_future_cache(r::Future, v) _, ok = @atomicreplace r.v nothing => Some(v) - ok || error("Future can be set only once") + @assert ok "internal consistency error detected for Future" end function put_future(rid, v, caller) From c93f743447ae7140bc87f8c90bd390d0f95c7054 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 6 Nov 2021 12:20:01 +0100 Subject: [PATCH 19/28] fix the @lock issues --- stdlib/Distributed/src/remotecall.jl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index b28991be5327b..8c145b813c52f 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -592,10 +592,11 @@ function fetch(r::Future) v_cache !== nothing && return something(v_cache) if r.where == myid() - @lock(r.lock, begin + rv, v_cache = @lock r.lock begin v_cache = @atomic :monotonic r.v rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing - end) + rv, v_cache + end if v_cache !== nothing return something(v_cache) @@ -609,8 +610,8 @@ function fetch(r::Future) v_cache = @atomic r.v if v_cache === nothing # call_on_owner case - @lock r.lock begin - v_old, status = @atomicreplace r.v nothing => Some(v_local) + v_old, status = @lock r.lock begin + @atomicreplace r.v nothing => Some(v_local) end # status == true - when value obtained through call_on_owner, put! called from a different worker # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated From c59408e598b92bd08692ae169cdd1511ac6535e7 Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 6 Nov 2021 17:24:15 +0100 Subject: [PATCH 20/28] small adjustments --- stdlib/Distributed/src/remotecall.jl | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 8c145b813c52f..c0a09aa14baaf 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -71,6 +71,7 @@ function test_existing_ref(r::AbstractRemoteRef) if found !== nothing @assert r.where > 0 if isa(r, Future) + # this is only for copying the reference from Future to RemoteRef (just created) fv_cache = @atomic :acquire found.v rv_cache = @atomic :monotonic r.v if fv_cache === nothing && rv_cache !== nothing @@ -358,7 +359,10 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T -serialize(s::ClusterSerializer, f::Future) = serialize(s, f, f.v === nothing) +function serialize(s::ClusterSerializer, f::Future) + v_cache = @atomic f.v + serialize(s, f, v_cache === nothing) +end serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) if addclient @@ -370,14 +374,16 @@ end function deserialize(s::ClusterSerializer, t::Type{<:Future}) f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) - f2 = Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table + fv_cache = @atomic f.v + f2 = Future(f.where, RRID(f.whence, f.id), fv_cache) # ctor adds to client_refs table # 1) send_add_client() is not executed when the ref is being serialized # to where it exists, hence do it here. # 2) If we have received a 'fetch'ed Future or if the Future ctor found an # already 'fetch'ed instance in client_refs (Issue #25847), we should not # track it in the backing RemoteValue store. - if f2.where == myid() && f2.v === nothing + f2v_cache = @atomic f2.v + if f2.where == myid() && f2v_cache === nothing add_client(remoteref_id(f2), myid()) end f2 @@ -613,9 +619,9 @@ function fetch(r::Future) v_old, status = @lock r.lock begin @atomicreplace r.v nothing => Some(v_local) end - # status == true - when value obtained through call_on_owner, put! called from a different worker + # status == true - when value obtained through call_on_owner # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated - # why? put! performs caching and putting into channel under r.lock + # why? local put! performs caching and putting into channel under r.lock # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v v_cache = status ? v_local : v_old From 4b8d7da79090d8d213e13b66dcc71c96085a1182 Mon Sep 17 00:00:00 2001 From: krynju Date: Wed, 17 Nov 2021 23:14:51 +0100 Subject: [PATCH 21/28] serialize adjustments --- stdlib/Distributed/src/remotecall.jl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index c0a09aa14baaf..68c7db289ea75 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -77,7 +77,7 @@ function test_existing_ref(r::AbstractRemoteRef) if fv_cache === nothing && rv_cache !== nothing # we have recd the value from another source, probably a deserialized ref, send a del_client message send_del_client(r) - @atomic :release found.v = rv_cache + @atomicreplace found.v nothing => rv_cache end end return found::typeof(r) @@ -360,8 +360,10 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) - v_cache = @atomic f.v - serialize(s, f, v_cache === nothing) + @lock f.lock begin + v_cache = @atomic f.v + serialize(s, f, v_cache === nothing) + end end serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) From 3b68488d34ab1eeb16b98be2ab80d24d19483d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krystian=20Guli=C5=84ski?= Date: Thu, 18 Nov 2021 00:10:05 +0100 Subject: [PATCH 22/28] Update stdlib/Distributed/src/remotecall.jl Co-authored-by: Jameson Nash --- stdlib/Distributed/src/remotecall.jl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 68c7db289ea75..da6eace4d1f8a 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -360,10 +360,7 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) - @lock f.lock begin - v_cache = @atomic f.v - serialize(s, f, v_cache === nothing) - end + @lock f.lock serialize(s, f, f.v === nothing) end serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) From a115a32a2c8f4b36244bb12aa0694453686932f7 Mon Sep 17 00:00:00 2001 From: krynju Date: Thu, 18 Nov 2021 17:04:50 +0100 Subject: [PATCH 23/28] revert lock on serialize --- stdlib/Distributed/src/remotecall.jl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index da6eace4d1f8a..5ec30dbd52101 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -77,7 +77,9 @@ function test_existing_ref(r::AbstractRemoteRef) if fv_cache === nothing && rv_cache !== nothing # we have recd the value from another source, probably a deserialized ref, send a del_client message send_del_client(r) - @atomicreplace found.v nothing => rv_cache + @lock found.lock begin + @atomicreplace found.v nothing => rv_cache + end end end return found::typeof(r) @@ -360,7 +362,8 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) - @lock f.lock serialize(s, f, f.v === nothing) + v_cache = @atomic f.v + serialize(s, f, v_cache === nothing) end serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) From 83ecafaf251ff6c47cd0f993ba72a6964ed5f8a6 Mon Sep 17 00:00:00 2001 From: krynju Date: Thu, 18 Nov 2021 19:01:14 +0100 Subject: [PATCH 24/28] change assert to error, seems like the isready check doesn't check properly due to the channel being removed at some point --- stdlib/Distributed/src/remotecall.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 5ec30dbd52101..218fbe0eda84f 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -675,7 +675,7 @@ end function set_future_cache(r::Future, v) _, ok = @atomicreplace r.v nothing => Some(v) - @assert ok "internal consistency error detected for Future" + ok || error("internal consistency error detected for Future") end function put_future(rid, v, caller) From 67da4d51156de2c4fc5a6be64ec1d7b8f44d8e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krystian=20Guli=C5=84ski?= Date: Sat, 20 Nov 2021 09:32:46 +0100 Subject: [PATCH 25/28] Update stdlib/Distributed/src/remotecall.jl Co-authored-by: Jameson Nash --- stdlib/Distributed/src/remotecall.jl | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 218fbe0eda84f..c2287565cdd5a 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -362,12 +362,22 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) - v_cache = @atomic f.v - serialize(s, f, v_cache === nothing) + serialize_type(s, typeof(f)) + serialize(s, f.where) + serialize(s, remoteref_id(f)) + value = @atomic f.v + if value === nothing + p = worker_id_from_socket(s.io) + (p !== rr.where) && send_add_client(rr, p) + end + serialize(s, value) +end +function serialize(s::ClusterSerializer, f::RemoteChannel) + p = worker_id_from_socket(s.io) + (p !== rr.where) && send_add_client(rr, p) + invoke(serialize, Tuple{AbstractSerializer, Any}, s, f) end -serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) -function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) - if addclient +serialize(s::AbstractSerializer, ::AbstractLock) = error("Locks cannot be serialized") p = worker_id_from_socket(s.io) (p !== rr.where) && send_add_client(rr, p) end From 59d910a8f1ae129a866bc18f015cb001f889e58c Mon Sep 17 00:00:00 2001 From: krynju Date: Sat, 20 Nov 2021 16:55:10 +0100 Subject: [PATCH 26/28] Revert "Update stdlib/Distributed/src/remotecall.jl" This reverts commit 67da4d51156de2c4fc5a6be64ec1d7b8f44d8e95. --- stdlib/Distributed/src/remotecall.jl | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index c2287565cdd5a..218fbe0eda84f 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -362,22 +362,12 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) - serialize_type(s, typeof(f)) - serialize(s, f.where) - serialize(s, remoteref_id(f)) - value = @atomic f.v - if value === nothing - p = worker_id_from_socket(s.io) - (p !== rr.where) && send_add_client(rr, p) - end - serialize(s, value) -end -function serialize(s::ClusterSerializer, f::RemoteChannel) - p = worker_id_from_socket(s.io) - (p !== rr.where) && send_add_client(rr, p) - invoke(serialize, Tuple{AbstractSerializer, Any}, s, f) + v_cache = @atomic f.v + serialize(s, f, v_cache === nothing) end -serialize(s::AbstractSerializer, ::AbstractLock) = error("Locks cannot be serialized") +serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) +function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) + if addclient p = worker_id_from_socket(s.io) (p !== rr.where) && send_add_client(rr, p) end From 4d73d3318dbe5693a82c41a2262a7bb8cfa0ee76 Mon Sep 17 00:00:00 2001 From: krynju Date: Sun, 21 Nov 2021 10:19:37 +0100 Subject: [PATCH 27/28] serialize a copy of the future --- stdlib/Distributed/src/remotecall.jl | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 218fbe0eda84f..4dddd6f811cad 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -363,21 +363,29 @@ channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) v_cache = @atomic f.v - serialize(s, f, v_cache === nothing) -end -serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) -function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) - if addclient + if v_cache === nothing p = worker_id_from_socket(s.io) - (p !== rr.where) && send_add_client(rr, p) + (p !== f.where) && send_add_client(f, p) end + invoke(serialize, Tuple{ClusterSerializer, Any}, s, f.where) + invoke(serialize, Tuple{ClusterSerializer, Any}, s, f.whence) + invoke(serialize, Tuple{ClusterSerializer, Any}, s, f.id) + invoke(serialize, Tuple{ClusterSerializer, Any}, s, v_cache) +end + +function serialize(s::ClusterSerializer, rr::RemoteChannel) + p = worker_id_from_socket(s.io) + (p !== rr.where) && send_add_client(rr, p) invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr) end function deserialize(s::ClusterSerializer, t::Type{<:Future}) - f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) - fv_cache = @atomic f.v - f2 = Future(f.where, RRID(f.whence, f.id), fv_cache) # ctor adds to client_refs table + where = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Int) + whence = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Int) + id = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Int) + v_cache = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Union{Some{Any}, Nothing}) + + f2 = Future(where, RRID(whence, id), v_cache) # ctor adds to client_refs table # 1) send_add_client() is not executed when the ref is being serialized # to where it exists, hence do it here. From 19fd304d2ab0cfa952753abd751607d09e94b171 Mon Sep 17 00:00:00 2001 From: krynju Date: Sun, 21 Nov 2021 10:28:10 +0100 Subject: [PATCH 28/28] future copy serialization --- stdlib/Distributed/src/remotecall.jl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 4dddd6f811cad..c8f21d7791b12 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -367,10 +367,8 @@ function serialize(s::ClusterSerializer, f::Future) p = worker_id_from_socket(s.io) (p !== f.where) && send_add_client(f, p) end - invoke(serialize, Tuple{ClusterSerializer, Any}, s, f.where) - invoke(serialize, Tuple{ClusterSerializer, Any}, s, f.whence) - invoke(serialize, Tuple{ClusterSerializer, Any}, s, f.id) - invoke(serialize, Tuple{ClusterSerializer, Any}, s, v_cache) + fc = Future((f.where, f.whence, f.id, v_cache)) # copy to be used for serialization (contains a reset lock) + invoke(serialize, Tuple{ClusterSerializer, Any}, s, fc) end function serialize(s::ClusterSerializer, rr::RemoteChannel) @@ -380,12 +378,8 @@ function serialize(s::ClusterSerializer, rr::RemoteChannel) end function deserialize(s::ClusterSerializer, t::Type{<:Future}) - where = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Int) - whence = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Int) - id = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Int) - v_cache = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, Union{Some{Any}, Nothing}) - - f2 = Future(where, RRID(whence, id), v_cache) # ctor adds to client_refs table + fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy + f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table # 1) send_add_client() is not executed when the ref is being serialized # to where it exists, hence do it here.