Skip to content

Commit 3f9e3ca

Browse files
committed
refactor threadpool to become more eventlet.GreenPool like
and to not directly subclass anything from threading. Also don't use threading.Thread but thread.start_thread as we are wrapping the thread anyway.
1 parent fa97df9 commit 3f9e3ca

File tree

5 files changed

+64
-51
lines changed

5 files changed

+64
-51
lines changed

execnet/multi.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -237,20 +237,20 @@ def safe_terminate(timeout, list_of_paired_functions):
237237
workerpool = WorkerPool(len(list_of_paired_functions)*2)
238238

239239
def termkill(termfunc, killfunc):
240-
termreply = workerpool.dispatch(termfunc)
240+
termreply = workerpool.spawn(termfunc)
241241
try:
242242
termreply.get(timeout=timeout)
243243
except IOError:
244244
killfunc()
245245

246246
replylist = []
247247
for termfunc, killfunc in list_of_paired_functions:
248-
reply = workerpool.dispatch(termkill, termfunc, killfunc)
248+
reply = workerpool.spawn(termkill, termfunc, killfunc)
249249
replylist.append(reply)
250250
for reply in replylist:
251251
reply.get()
252252
workerpool.shutdown()
253-
workerpool.join()
253+
workerpool.waitall()
254254

255255

256256
default_group = Group()

execnet/threadpool.py

+38-22
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
(c) 2009, holger krekel
55
"""
66
import threading
7+
try:
8+
import thread
9+
except ImportError:
10+
import _thread as thread
11+
712
import time
813
import sys
914

@@ -22,7 +27,7 @@
2227
class Reply(object):
2328
""" reply instances provide access to the result
2429
of a function execution that got dispatched
25-
through WorkerPool.dispatch()
30+
through WorkerPool.spawn()
2631
"""
2732
_excinfo = None
2833
def __init__(self, task):
@@ -54,12 +59,11 @@ def get(self, timeout=None):
5459
reraise(excinfo[0], excinfo[1], excinfo[2]) # noqa
5560
return result
5661

57-
class WorkerThread(threading.Thread):
62+
class WorkerThread:
5863
def __init__(self, pool):
59-
threading.Thread.__init__(self)
6064
self._queue = queue.Queue()
6165
self._pool = pool
62-
self.setDaemon(1)
66+
self._finishevent = threading.Event()
6367

6468
def _run_once(self):
6569
reply = self._queue.get()
@@ -79,16 +83,23 @@ def _run_once(self):
7983
# at this point, reply, task and all other local variables go away
8084
return True
8185

86+
def start(self):
87+
self.id = thread.start_new_thread(self.run, ())
88+
89+
@property
90+
def dead(self):
91+
return self._finishevent.isSet()
92+
8293
def run(self):
8394
try:
8495
while self._run_once():
85-
self._pool._ready[self] = True
96+
self._pool._ready.add(self)
8697
finally:
87-
del self._pool._alive[self]
8898
try:
89-
del self._pool._ready[self]
99+
self._pool._ready.remove(self)
90100
except KeyError:
91101
pass
102+
self._finishevent.set()
92103

93104
def send(self, task):
94105
reply = Reply(task)
@@ -98,8 +109,11 @@ def send(self, task):
98109
def stop(self):
99110
self._queue.put(SystemExit)
100111

112+
def join(self, timeout=None):
113+
self._finishevent.wait(timeout)
114+
101115
class WorkerPool(object):
102-
""" A WorkerPool allows to dispatch function executions
116+
""" A WorkerPool allows to spawn function executions
103117
to threads. Each Worker Thread is reused for multiple
104118
function executions. The dispatching operation
105119
takes care to create and dispatch to existing
@@ -116,29 +130,30 @@ def __init__(self, maxthreads=None):
116130
create up to `maxthreads` worker threads.
117131
"""
118132
self.maxthreads = maxthreads
119-
self._ready = {}
120-
self._alive = {}
133+
self._running = set()
134+
self._ready = set()
121135

122-
def dispatch(self, func, *args, **kwargs):
136+
def spawn(self, func, *args, **kwargs):
123137
""" return Reply object for the asynchronous dispatch
124138
of the given func(*args, **kwargs) in a
125139
separate worker thread.
126140
"""
127141
if self._shuttingdown:
128142
raise IOError("WorkerPool is already shutting down")
129143
try:
130-
thread, _ = self._ready.popitem()
144+
thread = self._ready.pop()
131145
except KeyError: # pop from empty list
132-
if self.maxthreads and len(self._alive) >= self.maxthreads:
133-
raise IOError("can't create more than %d threads." %
146+
if self.maxthreads and len(self._running) >= self.maxthreads:
147+
raise IOError("maximum of %d threads are busy, "
148+
"can't create more." %
134149
(self.maxthreads,))
135150
thread = self._newthread()
136151
return thread.send((func, args, kwargs))
137152

138153
def _newthread(self):
139154
thread = WorkerThread(self)
140-
self._alive[thread] = True
141155
thread.start()
156+
self._running.add(thread)
142157
return thread
143158

144159
def shutdown(self):
@@ -147,22 +162,23 @@ def shutdown(self):
147162
"""
148163
if not self._shuttingdown:
149164
self._shuttingdown = True
150-
for t in list(self._alive):
165+
for t in self._running:
151166
t.stop()
152167

153-
def join(self, timeout=None):
168+
def waitall(self, timeout=None):
154169
""" wait until all worker threads have terminated. """
155170
deadline = delta = None
156171
if timeout is not None:
157172
deadline = time.time() + timeout
158-
for thread in list(self._alive):
173+
while self._running:
174+
thread = self._running.pop()
159175
if deadline:
160176
delta = deadline - time.time()
161177
if delta <= 0:
162178
raise IOError("timeout while joining threads")
163179
thread.join(timeout=delta)
164-
if thread.isAlive():
165-
raise IOError("timeout while joining threads")
180+
if not thread.dead:
181+
raise IOError("timeout while joining thread %s" % thread.id)
166182

167183
if __name__ == '__channelexec__':
168184
maxthreads = channel.receive() # noqa
@@ -176,7 +192,7 @@ def join(self, timeout=None):
176192
if task is None:
177193
gw._trace("thread-dispatcher got None, exiting")
178194
execpool.shutdown()
179-
execpool.join()
195+
execpool.waitall()
180196
raise gw._StopExecLoop
181197
gw._trace("dispatching exec task to thread pool")
182-
execpool.dispatch(gw.executetask, task)
198+
execpool.spawn(gw.executetask, task)

testing/test_termination.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def doit():
1818
return 17
1919

2020
pool = WorkerPool()
21-
reply = pool.dispatch(doit)
21+
reply = pool.spawn(doit)
2222
x = reply.get(timeout=5.0)
2323
assert x == 17
2424

@@ -102,7 +102,7 @@ def flush(self):
102102
popen = subprocess.Popen([str(anypython), str(p)], stdout=subprocess.PIPE)
103103
# sync with start-up
104104
popen.stdout.readline()
105-
reply = WorkerPool(1).dispatch(popen.communicate)
105+
reply = WorkerPool(1).spawn(popen.communicate)
106106
reply.get(timeout=50)
107107
out, err = capfd.readouterr()
108108
lines = [x for x in err.splitlines()

testing/test_threadpool.py

+19-24
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,21 @@ def f(i):
1414
while q.qsize():
1515
py.std.time.sleep(0.01)
1616
for i in range(num):
17-
pool.dispatch(f, i)
17+
pool.spawn(f, i)
1818
for i in range(num):
1919
q.get()
20-
assert len(pool._alive) == 4
20+
assert len(pool._running) == 4
2121
pool.shutdown()
22-
# XXX I replaced the following join() with a time.sleep(1), which seems
23-
# to fix the test on Windows, and doesn't break it on Linux... Completely
24-
# unsure what the idea is, though, so it would be nice if someone with some
25-
# more understanding of what happens here would either fix this better, or
26-
# remove this comment...
27-
# pool.join(timeout=1.0)
28-
py.std.time.sleep(1)
29-
assert len(pool._alive) == 0
30-
assert len(pool._ready) == 0
22+
pool.waitall(timeout=1.0)
23+
#py.std.time.sleep(1) helps on windows?
24+
assert len(pool._running) == 0
25+
assert len(pool._running) == 0
3126

3227
def test_get():
3328
pool = WorkerPool()
3429
def f():
3530
return 42
36-
reply = pool.dispatch(f)
31+
reply = pool.spawn(f)
3732
result = reply.get()
3833
assert result == 42
3934

@@ -42,15 +37,15 @@ def test_get_timeout():
4237
def f():
4338
py.std.time.sleep(0.2)
4439
return 42
45-
reply = pool.dispatch(f)
40+
reply = pool.spawn(f)
4641
with py.test.raises(IOError):
4742
reply.get(timeout=0.01)
4843

4944
def test_get_excinfo():
5045
pool = WorkerPool()
5146
def f():
5247
raise ValueError("42")
53-
reply = pool.dispatch(f)
48+
reply = pool.spawn(f)
5449
with py.test.raises(ValueError):
5550
reply.get(1.0)
5651
with pytest.raises(EOFError):
@@ -61,34 +56,34 @@ def test_maxthreads():
6156
def f():
6257
py.std.time.sleep(0.5)
6358
try:
64-
pool.dispatch(f)
65-
py.test.raises(IOError, pool.dispatch, f)
59+
pool.spawn(f)
60+
py.test.raises(IOError, pool.spawn, f)
6661
finally:
6762
pool.shutdown()
6863

69-
def test_join_timeout():
64+
def test_waitall_timeout():
7065
pool = WorkerPool()
7166
q = queue.Queue()
7267
def f():
7368
q.get()
74-
reply = pool.dispatch(f)
69+
reply = pool.spawn(f)
7570
pool.shutdown()
76-
py.test.raises(IOError, pool.join, 0.01)
71+
py.test.raises(IOError, pool.waitall, 0.01)
7772
q.put(None)
7873
reply.get(timeout=1.0)
79-
pool.join(timeout=0.1)
74+
pool.waitall(timeout=0.1)
8075

8176
@py.test.mark.skipif("not hasattr(os, 'dup')")
8277
def test_pool_clean_shutdown():
8378
capture = py.io.StdCaptureFD()
8479
pool = WorkerPool()
8580
def f():
8681
pass
87-
pool.dispatch(f)
88-
pool.dispatch(f)
82+
pool.spawn(f)
83+
pool.spawn(f)
8984
pool.shutdown()
90-
pool.join(timeout=1.0)
91-
assert not pool._alive
85+
pool.waitall(timeout=1.0)
86+
assert not pool._running
9287
assert not pool._ready
9388
out, err = capture.reset()
9489
print(out)

tox.ini

+2
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ commands=
3131
[pytest]
3232
timeout = 20
3333
addopts = -rxXs
34+
rsyncdirs = execnet testing
35+

0 commit comments

Comments
 (0)