46
46
from listeners import StatisticsWatcher
47
47
48
48
49
- class TcpPortDispatcher :
50
- """ Helper class holds available and occupied TCP port ranges. This ranges
51
- intended to distributes between workers.
52
- """
53
- def __init__ (self , range_count ):
54
- lowest_port = 3000
55
- highest_port = 59999
56
- port_count = highest_port - lowest_port + 1
57
- range_size = port_count // range_count
58
-
59
- self .available_ranges = set ()
60
- for i in range (range_count ):
61
- start_port = lowest_port + i * range_size
62
- end_port = start_port + range_size - 1
63
- tcp_port_range = (start_port , end_port )
64
- self .available_ranges .add (tcp_port_range )
65
-
66
- self .acquired_ranges = dict ()
67
-
68
- def acquire_range (self , _id ):
69
- tcp_port_range = self .available_ranges .pop ()
70
- self .acquired_ranges [_id ] = tcp_port_range
71
- return tcp_port_range
72
-
73
- def release_range (self , _id ):
74
- tcp_port_range = self .acquired_ranges .pop (_id )
75
- self .available_ranges .add (tcp_port_range )
76
-
77
-
78
49
class Dispatcher :
79
50
"""Run specified count of worker processes ('max_workers_cnt' arg), pass
80
51
task IDs (via 'task_queue'), receive results and output (via
@@ -136,8 +107,6 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
136
107
self .worker_id_to_pid = dict ()
137
108
138
109
self .randomize = randomize
139
- self .tcp_port_dispatcher = TcpPortDispatcher (
140
- range_count = max_workers_cnt )
141
110
142
111
def terminate_all_workers (self ):
143
112
for process in self .processes :
@@ -235,10 +204,7 @@ def add_worker(self):
235
204
# find_nonempty_task_queue_disp()
236
205
if self .workers_cnt >= self .max_workers_cnt :
237
206
return False
238
- tcp_port_range = self .tcp_port_dispatcher .acquire_range (
239
- self .worker_next_id )
240
- process = task_queue_disp .add_worker (self .worker_next_id ,
241
- tcp_port_range )
207
+ process = task_queue_disp .add_worker (self .worker_next_id )
242
208
self .processes .append (process )
243
209
self .pids .append (process .pid )
244
210
self .pid_to_worker_id [process .pid ] = self .worker_next_id
@@ -255,7 +221,6 @@ def del_worker(self, worker_id):
255
221
task_queue_disp = self .get_task_queue_disp (worker_id )
256
222
task_queue_disp .del_worker (worker_id )
257
223
self .workers_cnt -= 1
258
- self .tcp_port_dispatcher .release_range (worker_id )
259
224
260
225
self .pids .remove (pid )
261
226
del self .worker_id_to_pid [worker_id ]
@@ -412,24 +377,22 @@ def __init__(self, key, task_group, randomize):
412
377
self .done = False
413
378
self .done_task_ids = set ()
414
379
415
- def _run_worker (self , worker_id , tcp_port_range ):
380
+ def _run_worker (self , worker_id ):
416
381
"""Entry function for worker processes."""
417
382
os .environ ['TEST_RUN_WORKER_ID' ] = str (worker_id )
418
- os .environ ['TEST_RUN_TCP_PORT_START' ] = str (tcp_port_range [0 ])
419
- os .environ ['TEST_RUN_TCP_PORT_END' ] = str (tcp_port_range [1 ])
420
383
color_stdout .queue = self .result_queue
421
384
worker = self .gen_worker (worker_id )
422
385
sampler .set_queue (self .result_queue , worker_id , worker .name )
423
386
worker .run_all (self .task_queue , self .result_queue )
424
387
425
- def add_worker (self , worker_id , tcp_port_range ):
388
+ def add_worker (self , worker_id ):
426
389
# Note: each of our workers should consume only one None, but for the
427
390
# case of abnormal circumstances we listen for processes termination
428
391
# (method 'check_for_dead_processes') and for time w/o output from
429
392
# workers (class 'HangWatcher').
430
393
self .task_queue .put (None ) # 'stop worker' marker
431
394
432
- entry = functools .partial (self ._run_worker , worker_id , tcp_port_range )
395
+ entry = functools .partial (self ._run_worker , worker_id )
433
396
434
397
self .worker_ids .add (worker_id )
435
398
process = multiprocessing .Process (target = entry )
0 commit comments