
Commit 0292fe2

Rerun failed fragile test
The patch is a bit dirty: tasks to rerun are added after the 'stop worker'
marker, so the old worker is stopped and a new one is spawned to handle those
tasks. See the new FIXME comment for more information.

How to verify:

 | # Ensure a test is marked as fragile.
 | $ grep tarantoolctl.test.lua test/app-tap/suite.ini
 | fragile = tarantoolctl.test.lua ; gh-5059
 |
 | # Let the test fail randomly.
 | $ git diff
 | --- a/test/app-tap/tarantoolctl.test.lua
 | +++ b/test/app-tap/tarantoolctl.test.lua
 | @@ -643,4 +643,24 @@ test:test('filter_xlog', function(test)
 |      end
 |  end)
 |
 | -os.exit(test:check() == true and 0 or -1)
 | +local function string_to_unsigned(str)
 | +    local byte_list = {string.byte(str, 1, #str)}
 | +    local res = 0
 | +    for i = 1, #byte_list do
 | +        res = bit.lshift(res, 8)
 | +        res = bit.bor(res, byte_list[i])
 | +    end
 | +    -- fix bitop's numbers range: signed 32-bit to unsigned 32-bit
 | +    -- http://bitop.luajit.org/semantics.html
 | +    res = (res < 0) and -res + 0x7fffffff or res
 | +    return res
 | +end
 | +
 | +local RND_SEED_LEN = 4
 | +local fh = require('fio').open('/dev/urandom', {'O_RDONLY'})
 | +local seed_raw = fh:read(RND_SEED_LEN)
 | +fh:close()
 | +local seed = string_to_unsigned(seed_raw)
 | +math.randomseed(seed)
 | +
 | +os.exit(test:check() == true and math.random(0, 1) or 1)
 |
 | # Run several times to observe actual behaviour.
 | $ (cd test && ./test-run.py app-tap/tarantoolctl.test.lua)

Part of tarantool/tarantool#5050
1 parent f0a3bd3 commit 0292fe2
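
The queue behaviour described in the commit message can be pictured with a
minimal standalone sketch (plain Python, not test-run code; the queue, the
STOP marker and run_worker() are illustrative stand-ins): the rerun task is
enqueued behind the 'stop worker' marker, so the worker that reads the marker
exits and a fresh worker has to be spawned to pick the rerun up.

import queue

STOP = object()  # stand-in for test-run's 'stop worker' marker

task_queue = queue.Queue()
task_queue.put('app-tap/tarantoolctl.test.lua')
task_queue.put(STOP)

def run_worker(name):
    # Consume tasks until the stop marker is read, then exit.
    while True:
        task = task_queue.get()
        if task is STOP:
            print('%s: stop marker read, exiting' % name)
            return
        print('%s: running %s' % (name, task))

run_worker('worker-1')   # runs the task, then stops on the marker

# The rerun lands behind the marker, so the stopped worker never sees it...
task_queue.put('app-tap/tarantoolctl.test.lua')
task_queue.put(STOP)     # toy model only: give the next worker a marker too

run_worker('worker-2')   # ...and a new worker has to be spawned for it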

2 files changed: +76 -3 lines

dispatcher.py

Lines changed: 39 additions & 3 deletions
@@ -130,13 +130,15 @@ def init_listeners(self):
             not args.valgrind
         watch_fail = not lib.Options().args.is_force
 
+        rerun_watcher = listeners.RerunWatcher(self.can_add_task, self.add_task)
         log_output_watcher = listeners.LogOutputWatcher()
         self.statistics = listeners.StatisticsWatcher(
             log_output_watcher.get_logfile)
         self.artifacts = listeners.ArtifactsWatcher(
             log_output_watcher.get_logfile)
         output_watcher = listeners.OutputWatcher()
-        self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
+        self.listeners = [rerun_watcher, self.statistics, log_output_watcher,
+                          output_watcher, self.artifacts]
         if watch_fail:
             self.fail_watcher = listeners.FailWatcher(
                 self.terminate_all_workers)
@@ -171,6 +173,8 @@ def find_nonempty_task_queue_disp(self):
         for task_queue_disp in task_queue_disps_rnd:
             if not task_queue_disp.is_parallel:
                 continue
+            # Don't add a worker when a task list is exhausted.
+            # Some tasks may be running at the moment.
             if task_queue_disp.done:
                 continue
             return task_queue_disp
@@ -179,7 +183,18 @@ def find_nonempty_task_queue_disp(self):
         for task_queue_disp in task_queue_disps_rnd:
             if len(task_queue_disp.worker_ids) > 0:
                 continue
-            if task_queue_disp.done:
+            # Allow adding a worker even if there was a point when
+            # all tasks were taken. It is possible that a task
+            # will be added afterwards.
+            #
+            # FIXME: It is a kind of hack. A worker may read the
+            # 'stop worker' marker and stop, but then we'll start
+            # a new worker, which will handle reruns. We should
+            # keep a worker running while there is a possibility
+            # that a task will be added again to the queue.
+            #
+            # NB: Don't forget to handle the 'not is_force' case.
+            if not task_queue_disp.undone_tasks():
                 continue
             return task_queue_disp
         return None
@@ -234,6 +249,20 @@ def del_worker(self, worker_id):
                 self.processes.remove(process)
                 break
 
+    def can_add_task(self, worker_id):
+        """Whether the given task group (determined by the worker id)
+        allows adding a task dynamically (during task processing).
+
+        It is a prerequisite to call <add_task>().
+        """
+        task_queue_disp = self.get_task_queue_disp(worker_id)
+        return task_queue_disp.can_add_task()
+
+    def add_task(self, worker_id, task_id):
+        task_queue_disp = self.get_task_queue_disp(worker_id)
+        task_queue_disp.add_task(task_id)
+
     def mark_task_done(self, worker_id, task_id):
         task_queue_disp = self.get_task_queue_disp(worker_id)
         task_queue_disp.mark_task_done(task_id)
@@ -277,7 +306,8 @@ def wait(self):
 
             objs = self.invoke_listeners(inputs, ready_inputs)
             for obj in objs:
-                if isinstance(obj, WorkerTaskResult):
+                if isinstance(obj, WorkerTaskResult) and \
+                        obj.short_status != 'transient fail':
                     self.mark_task_done(obj.worker_id, obj.task_id)
                 elif isinstance(obj, WorkerDone):
                     self.del_worker(obj.worker_id)
@@ -409,6 +439,12 @@ def del_worker(self, worker_id):
         # with add-del workers
         self.done = True
 
+    def can_add_task(self):
+        return self.key.endswith('_fragile')
+
+    def add_task(self, task_id):
+        self.task_queue.put(task_id)
+
     def mark_task_done(self, task_id):
         self.done_task_ids.add(task_id)
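
Why results with short_status == 'transient fail' are skipped in wait() above:
marking them done would make the task group look finished while the task has
just been re-queued for another run. A simplified model of that accounting
(not the actual dispatcher; Result and TaskGroup below are made up for
illustration):

class Result:
    def __init__(self, task_id, short_status):
        self.task_id = task_id
        self.short_status = short_status

class TaskGroup:
    """Toy stand-in for a task queue dispatcher."""
    def __init__(self, task_ids):
        self.task_ids = set(task_ids)
        self.done_task_ids = set()

    def mark_task_done(self, task_id):
        self.done_task_ids.add(task_id)

    def undone_tasks(self):
        # Mirrors the undone_tasks() check used above to decide whether
        # spawning another worker still makes sense.
        return self.task_ids - self.done_task_ids

group = TaskGroup(['app-tap/tarantoolctl.test.lua'])

# First run fails and is re-queued ('transient fail'), the rerun passes.
for res in [Result('app-tap/tarantoolctl.test.lua', 'transient fail'),
            Result('app-tap/tarantoolctl.test.lua', 'pass')]:
    if res.short_status != 'transient fail':
        group.mark_task_done(res.task_id)

print(group.undone_tasks())  # set(): the task becomes done only on its rerun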

listeners.py

Lines changed: 37 additions & 0 deletions
@@ -28,6 +28,43 @@ def process_timeout(self, delta_seconds):
         pass
 
 
+class RerunWatcher(BaseWatcher):
+    def __init__(self, can_add_task, add_task):
+        self._can_add_task = can_add_task
+        self._add_task = add_task
+        self._fail_counters = dict()
+        self._fail_limit = 3
+
+    def _count_fail(self, obj):
+        if obj.task_id not in self._fail_counters:
+            self._fail_counters[obj.task_id] = 0
+        self._fail_counters[obj.task_id] += 1
+
+    def _can_rerun(self, task_id):
+        return self._fail_counters[task_id] < self._fail_limit
+
+    def process_result(self, obj):
+        if not isinstance(obj, WorkerTaskResult):
+            return
+
+        if obj.short_status != 'fail':
+            return
+
+        if not self._can_add_task(obj.worker_id):
+            return
+
+        self._count_fail(obj)
+
+        if self._can_rerun(obj.task_id):
+            color_stdout('Schedule task %s to run again\n' % str(obj.task_id),
+                         schema='test_var')
+            obj.short_status = 'transient fail'
+            self._add_task(obj.worker_id, obj.task_id)
+        else:
+            color_stdout('Task %s reaches fail limit (%d)\n' %
+                         (str(obj.task_id), self._fail_limit), schema='test_fail')
+
+
 class StatisticsWatcher(BaseWatcher):
     def __init__(self, get_logfile):
         self.stats = dict()
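
How the counter caps reruns can be seen in a self-contained sketch
(FakeResult and the module-level state below are illustrative; the real logic
lives in RerunWatcher above): with a fail limit of 3, a task that keeps
failing is re-queued twice and then reported as having reached the limit.

class FakeResult:
    # Stand-in for WorkerTaskResult in this sketch.
    def __init__(self, worker_id, task_id, short_status):
        self.worker_id = worker_id
        self.task_id = task_id
        self.short_status = short_status

FAIL_LIMIT = 3
fail_counters = {}
rerun_queue = []

def process_result(obj):
    if obj.short_status != 'fail':
        return
    fail_counters[obj.task_id] = fail_counters.get(obj.task_id, 0) + 1
    if fail_counters[obj.task_id] < FAIL_LIMIT:
        obj.short_status = 'transient fail'   # keeps mark_task_done() away
        rerun_queue.append(obj.task_id)
    else:
        print('Task %s reached the fail limit (%d)' % (obj.task_id, FAIL_LIMIT))

# Three failures in a row: the first two trigger a rerun, the third gives up.
for _ in range(3):
    process_result(FakeResult(1, 'app-tap/tarantoolctl.test.lua', 'fail'))

print(rerun_queue)   # the task was scheduled for two reruns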
