95
95
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
96
96
mutable struct Worker
97
97
id:: Int
98
- msg_lock:: Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
99
98
del_msgs:: Array{Any,1}
100
99
add_msgs:: Array{Any,1}
101
100
gcflag:: Bool
102
101
state:: WorkerState
103
- c_state:: Threads. Condition # wait for state changes, lock for state
104
- ct_time:: Float64 # creation time
105
- conn_func:: Any # used to setup connections lazily
102
+ c_state:: Condition # wait for state changes
103
+ ct_time:: Float64 # creation time
104
+ conn_func:: Any # used to setup connections lazily
106
105
107
106
r_stream:: IO
108
107
w_stream:: IO
@@ -134,7 +133,7 @@ mutable struct Worker
134
133
if haskey (map_pid_wrkr, id)
135
134
return map_pid_wrkr[id]
136
135
end
137
- w= new (id, Threads . ReentrantLock (), [], [], false , W_CREATED, Threads . Condition (), time (), conn_func)
136
+ w= new (id, [], [], false , W_CREATED, Condition (), time (), conn_func)
138
137
w. initialized = Event ()
139
138
register_worker (w)
140
139
w
@@ -144,16 +143,12 @@ mutable struct Worker
144
143
end
145
144
146
145
function set_worker_state (w, state)
147
- lock (w. c_state) do
148
- w. state = state
149
- notify (w. c_state; all= true )
150
- end
146
+ w. state = state
147
+ notify (w. c_state; all= true )
151
148
end
152
149
153
150
function check_worker_state (w:: Worker )
154
- lock (w. c_state)
155
151
if w. state === W_CREATED
156
- unlock (w. c_state)
157
152
if ! isclusterlazy ()
158
153
if PGRP. topology === :all_to_all
159
154
# Since higher pids connect with lower pids, the remote worker
@@ -173,8 +168,6 @@ function check_worker_state(w::Worker)
173
168
errormonitor (t)
174
169
wait_for_conn (w)
175
170
end
176
- else
177
- unlock (w. c_state)
178
171
end
179
172
end
180
173
@@ -193,25 +186,13 @@ function exec_conn_func(w::Worker)
193
186
end
194
187
195
188
function wait_for_conn (w)
196
- lock (w. c_state)
197
189
if w. state === W_CREATED
198
- unlock (w. c_state)
199
190
timeout = worker_timeout () - (time () - w. ct_time)
200
191
timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
201
192
202
- T = Threads. @spawn begin
203
- sleep ($ timeout)
204
- lock (w. c_state) do
205
- notify (w. c_state; all= true )
206
- end
207
- end
208
- errormonitor (T)
209
- lock (w. c_state) do
210
- wait (w. c_state)
211
- w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
212
- end
213
- else
214
- unlock (w. c_state)
193
+ @async (sleep (timeout); notify (w. c_state; all= true ))
194
+ wait (w. c_state)
195
+ w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
215
196
end
216
197
nothing
217
198
end
@@ -490,10 +471,6 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
490
471
# The `launch` method should add an object of type WorkerConfig for every
491
472
# worker launched. It provides information required on how to connect
492
473
# to it.
493
-
494
- # FIXME : launched should be a Channel, launch_ntfy should be a Threads.Condition
495
- # but both are part of the public interface. This means we currently can't use
496
- # `Threads.@spawn` in the code below.
497
474
launched = WorkerConfig[]
498
475
launch_ntfy = Condition ()
499
476
@@ -506,10 +483,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
506
483
while true
507
484
if isempty (launched)
508
485
istaskdone (t_launch) && break
509
- @async begin
510
- sleep (1 )
511
- notify (launch_ntfy)
512
- end
486
+ @async (sleep (1 ); notify (launch_ntfy))
513
487
wait (launch_ntfy)
514
488
end
515
489
@@ -662,12 +636,7 @@ function create_worker(manager, wconfig)
662
636
# require the value of config.connect_at which is set only upon connection completion
663
637
for jw in PGRP. workers
664
638
if (jw. id != 1 ) && (jw. id < w. id)
665
- # wait for wl to join
666
- lock (jw. c_state) do
667
- if jw. state === W_CREATED
668
- wait (jw. c_state)
669
- end
670
- end
639
+ (jw. state === W_CREATED) && wait (jw. c_state)
671
640
push! (join_list, jw)
672
641
end
673
642
end
@@ -690,12 +659,7 @@ function create_worker(manager, wconfig)
690
659
end
691
660
692
661
for wl in wlist
693
- lock (wl. c_state) do
694
- if wl. state === W_CREATED
695
- # wait for wl to join
696
- wait (wl. c_state)
697
- end
698
- end
662
+ (wl. state === W_CREATED) && wait (wl. c_state)
699
663
push! (join_list, wl)
700
664
end
701
665
end
@@ -712,11 +676,7 @@ function create_worker(manager, wconfig)
712
676
@async manage (w. manager, w. id, w. config, :register )
713
677
# wait for rr_ntfy_join with timeout
714
678
timedout = false
715
- @async begin
716
- sleep ($ timeout)
717
- timedout = true
718
- put! (rr_ntfy_join, 1 )
719
- end
679
+ @async (sleep ($ timeout); timedout = true ; put! (rr_ntfy_join, 1 ))
720
680
wait (rr_ntfy_join)
721
681
if timedout
722
682
error (" worker did not connect within $timeout seconds" )
0 commit comments