-
-
Notifications
You must be signed in to change notification settings - Fork 5.6k
[Distributed] Worker local race condition between put! and fetch for Futures #42339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9a9821d
81b3075
656546d
8d8712f
7c0dff0
64922da
389088b
7635442
50c4cd2
92188ba
306b6e4
07bbd7c
38b6419
c5045cf
0040080
87d30f6
4b634d3
2c78ed9
c93f743
c59408e
4b8d7da
3b68488
a115a32
83ecafa
67da4d5
59d910a
4d73d33
19fd304
b9eaef6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef | |
where::Int | ||
whence::Int | ||
id::Int | ||
v::Union{Some{Any}, Nothing} | ||
lock::ReentrantLock | ||
@atomic v::Union{Some{Any}, Nothing} | ||
|
||
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = | ||
(r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r)) | ||
(r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r)) | ||
|
||
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances | ||
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances | ||
end | ||
|
||
""" | ||
|
@@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef) | |
found = getkey(client_refs, r, nothing) | ||
if found !== nothing | ||
@assert r.where > 0 | ||
if isa(r, Future) && found.v === nothing && r.v !== nothing | ||
# we have recd the value from another source, probably a deserialized ref, send a del_client message | ||
send_del_client(r) | ||
found.v = r.v | ||
if isa(r, Future) | ||
# this is only for copying the reference from Future to RemoteRef (just created) | ||
fv_cache = @atomic :acquire found.v | ||
rv_cache = @atomic :monotonic r.v | ||
if fv_cache === nothing && rv_cache !== nothing | ||
# we have recd the value from another source, probably a deserialized ref, send a del_client message | ||
send_del_client(r) | ||
@lock found.lock begin | ||
@atomicreplace found.v nothing => rv_cache | ||
end | ||
end | ||
end | ||
return found::typeof(r) | ||
end | ||
|
@@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef) | |
send_del_client_no_lock(r) | ||
else | ||
# send_del_client only if the reference has not been set | ||
r.v === nothing && send_del_client_no_lock(r) | ||
r.v = nothing | ||
v_cache = @atomic :monotonic r.v | ||
v_cache === nothing && send_del_client_no_lock(r) | ||
@atomic :monotonic r.v = nothing | ||
end | ||
r.where = 0 | ||
finally | ||
|
@@ -201,7 +210,8 @@ isready(f) # will not block | |
``` | ||
""" | ||
function isready(rr::Future) | ||
rr.v === nothing || return true | ||
v_cache = @atomic rr.v | ||
v_cache === nothing || return true | ||
|
||
rid = remoteref_id(rr) | ||
return if rr.where == myid() | ||
|
@@ -354,26 +364,33 @@ end | |
|
||
channel_type(rr::RemoteChannel{T}) where {T} = T | ||
|
||
serialize(s::ClusterSerializer, f::Future) = serialize(s, f, f.v === nothing) | ||
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) | ||
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) | ||
if addclient | ||
function serialize(s::ClusterSerializer, f::Future) | ||
v_cache = @atomic f.v | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems a bit aggressive to make serialize/deserialize full atomic barriers (instead of their defaults). @tkf any thoughts too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi this is just one leftover
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... Can There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking closer, the issue here is that we need a lock around There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put a lock on |
||
if v_cache === nothing | ||
p = worker_id_from_socket(s.io) | ||
(p !== rr.where) && send_add_client(rr, p) | ||
(p !== f.where) && send_add_client(f, p) | ||
end | ||
fc = Future((f.where, f.whence, f.id, v_cache)) # copy to be used for serialization (contains a reset lock) | ||
invoke(serialize, Tuple{ClusterSerializer, Any}, s, fc) | ||
end | ||
|
||
function serialize(s::ClusterSerializer, rr::RemoteChannel) | ||
p = worker_id_from_socket(s.io) | ||
(p !== rr.where) && send_add_client(rr, p) | ||
invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr) | ||
end | ||
|
||
function deserialize(s::ClusterSerializer, t::Type{<:Future}) | ||
f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) | ||
f2 = Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table | ||
fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy | ||
f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table | ||
|
||
# 1) send_add_client() is not executed when the ref is being serialized | ||
# to where it exists, hence do it here. | ||
# 2) If we have received a 'fetch'ed Future or if the Future ctor found an | ||
# already 'fetch'ed instance in client_refs (Issue #25847), we should not | ||
# track it in the backing RemoteValue store. | ||
if f2.where == myid() && f2.v === nothing | ||
f2v_cache = @atomic f2.v | ||
if f2.where == myid() && f2v_cache === nothing | ||
add_client(remoteref_id(f2), myid()) | ||
end | ||
f2 | ||
|
@@ -567,7 +584,7 @@ end | |
|
||
Wait for a value to become available for the specified [`Future`](@ref). | ||
""" | ||
wait(r::Future) = (r.v !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) | ||
wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) | ||
|
||
""" | ||
wait(r::RemoteChannel, args...) | ||
|
@@ -584,11 +601,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r | |
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. | ||
""" | ||
function fetch(r::Future) | ||
r.v !== nothing && return something(r.v) | ||
v = call_on_owner(fetch_ref, r) | ||
r.v = Some(v) | ||
v_cache = @atomic r.v | ||
v_cache !== nothing && return something(v_cache) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we lock There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. didn't add a lock here since it's supposed to be a quick cache check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant here exactly, as in after the quick lookup. For (my own) reference on performance: julia> mutable struct AtomicCounter
@atomic x::Int
end
julia> const ac = AtomicCounter(0);
julia> const r = Some(0);
julia> const r2 = Ref(0);
# measure operation overhead
julia> @btime (r2[] = something(r))
1.696 ns (0 allocations: 0 bytes)
# measure atomic load overhead
julia> @btime @atomic ac.x
1.696 ns (0 allocations: 0 bytes)
julia> @btime @atomicreplace ac.x 0 => 0
8.372 ns (0 allocations: 0 bytes)
julia> @btime @atomicreplace :monotonic ac.x 0 => 0
8.372 ns (0 allocations: 0 bytes)
julia> const lk2 = Base.ThreadSynchronizer();
julia> @btime (lock(lk2); unlock(lk2))
20.072 ns (0 allocations: 0 bytes)
julia> @btime (Base.iolock_begin(); Base.iolock_end())
18.390 ns (0 allocations: 0 bytes)
julia> Base.iolock_begin(); @btime (Base.iolock_begin(); Base.iolock_end()); Base.iolock_end()
12.467 ns (0 allocations: 0 bytes)
julia> const lk3 = Libc.malloc(40)
Ptr{Nothing} @0x0000555e99ac70c0
julia> ccall(:uv_mutex_init, Cint, (Ptr{Cvoid},), lk3)
0
julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
20.407 ns (0 allocations: 0 bytes)
julia> ccall(:uv_mutex_init_recursive, Cint, (Ptr{Cvoid},), lk3)
0
julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
23.412 ns (0 allocations: 0 bytes)
0
julia> ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3)
0
julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
12.392 ns (0 allocations: 0 bytes)
0
julia> const lk = ReentrantLock()
julia> @btime (lock(lk); unlock(lk))
56.520 ns (0 allocations: 0 bytes)
julia> lock(lk)
julia> @btime (lock(lk); unlock(lk))
12.405 ns (0 allocations: 0 bytes) (note: that last lock There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I put the whole thing in a I'm not sure what else could I improve here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, yeah, I see how it is complicated by the fact that we might be the owner, so we are waiting for someone else to set the value before we can return it here. But it seems like if this fails in the remote case, it is because there was a thread-synchronization error, which could happen because this lock was not being held across here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the call on owner case won't fail due to this lock, because it fetches on a remote channel this This looks safe to me as it is right now because of that, but I haven't dwelled that deep into the remote scenarios |
||
if r.where == myid() | ||
rv, v_cache = @lock r.lock begin | ||
v_cache = @atomic :monotonic r.v | ||
rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing | ||
rv, v_cache | ||
end | ||
|
||
if v_cache !== nothing | ||
return something(v_cache) | ||
else | ||
v_local = fetch(rv.c) | ||
end | ||
else | ||
v_local = call_on_owner(fetch_ref, r) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could probably be locked, so that |
||
end | ||
|
||
v_cache = @atomic r.v | ||
|
||
if v_cache === nothing # call_on_owner case | ||
v_old, status = @lock r.lock begin | ||
@atomicreplace r.v nothing => Some(v_local) | ||
end | ||
# status == true - when value obtained through call_on_owner | ||
# status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated | ||
# why? local put! performs caching and putting into channel under r.lock | ||
|
||
# for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v | ||
v_cache = status ? v_local : v_old | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @vtjnash Just not sure here. So when a fetch gets the value from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure here. Since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's correct. The |
||
end | ||
|
||
send_del_client(r) | ||
v | ||
something(v_cache) | ||
end | ||
|
||
fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) | ||
|
@@ -612,12 +659,30 @@ A `put!` on an already set `Future` throws an `Exception`. | |
All asynchronous remote calls return `Future`s and set the | ||
value to the return value of the call upon completion. | ||
""" | ||
function put!(rr::Future, v) | ||
rr.v !== nothing && error("Future can be set only once") | ||
call_on_owner(put_future, rr, v, myid()) | ||
rr.v = Some(v) | ||
rr | ||
function put!(r::Future, v) | ||
if r.where == myid() | ||
rid = remoteref_id(r) | ||
rv = lookup_ref(rid) | ||
isready(rv) && error("Future can be set only once") | ||
@lock r.lock begin | ||
put!(rv, v) # this notifies the tasks waiting on the channel in fetch | ||
set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached | ||
end | ||
del_client(rid, myid()) | ||
else | ||
@lock r.lock begin # same idea as above if there were any local tasks fetching on this Future | ||
call_on_owner(put_future, r, v, myid()) | ||
set_future_cache(r, v) | ||
end | ||
end | ||
r | ||
end | ||
|
||
function set_future_cache(r::Future, v) | ||
_, ok = @atomicreplace r.v nothing => Some(v) | ||
ok || error("internal consistency error detected for Future") | ||
end | ||
|
||
function put_future(rid, v, caller) | ||
krynju marked this conversation as resolved.
Show resolved
Hide resolved
|
||
rv = lookup_ref(rid) | ||
isready(rv) && error("Future can be set only once") | ||
|
Uh oh!
There was an error while loading. Please reload this page.