Skip to content

Commit fa54c1a

Browse files
karajan1001skshetry
authored andcommitted
Solve run_all flaky tests
1. Modify run all to include currently running exps. 2. bump dvc-task to 0.1.5
1 parent 9fbbfd0 commit fa54c1a

File tree

5 files changed

+21
-5
lines changed

5 files changed

+21
-5
lines changed

dvc/repo/experiments/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import time
55
from typing import Dict, Iterable, Optional
66

7-
from funcy import cached_property, first
7+
from funcy import cached_property, chain, first
88

99
from dvc.exceptions import DvcException
1010
from dvc.ui import ui
@@ -171,7 +171,18 @@ def reproduce_celery(
171171
) -> Dict[str, str]:
172172
results: Dict[str, str] = {}
173173
if entries is None:
174-
entries = list(self.celery_queue.iter_queued())
174+
entries = list(
175+
chain(
176+
self.celery_queue.iter_active(),
177+
self.celery_queue.iter_queued(),
178+
)
179+
)
180+
181+
logger.debug(
182+
"reproduce all these entries '%s'",
183+
entries,
184+
)
185+
175186
if not entries:
176187
return results
177188

dvc/repo/experiments/queue/celery.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ def _iter_queued(self) -> Generator[_MessageEntry, None, None]:
182182
continue
183183
args, kwargs, _embed = msg.decode()
184184
entry_dict = kwargs.get("entry_dict", args[0])
185+
logger.debug("Found queued task %s", entry_dict["stash_rev"])
185186
yield _MessageEntry(msg, QueueEntry.from_dict(entry_dict))
186187

187188
def _iter_processed(self) -> Generator[_MessageEntry, None, None]:
@@ -198,6 +199,7 @@ def _iter_active_tasks(self) -> Generator[_TaskEntry, None, None]:
198199
task_id = msg.headers["id"]
199200
result: AsyncResult = AsyncResult(task_id)
200201
if not result.ready():
202+
logger.debug("Found active task %s", entry.stash_rev)
201203
yield _TaskEntry(result, entry)
202204

203205
def _iter_done_tasks(self) -> Generator[_TaskEntry, None, None]:
@@ -206,6 +208,7 @@ def _iter_done_tasks(self) -> Generator[_TaskEntry, None, None]:
206208
task_id = msg.headers["id"]
207209
result: AsyncResult = AsyncResult(task_id)
208210
if result.ready():
211+
logger.debug("Found done task %s", entry.stash_rev)
209212
yield _TaskEntry(result, entry)
210213

211214
def iter_active(self) -> Generator[QueueEntry, None, None]:

dvc/repo/experiments/run.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ def run(
2929
of `repro` for that experiment.
3030
"""
3131
if run_all:
32-
entries = list(repo.experiments.celery_queue.iter_queued())
33-
return repo.experiments.reproduce_celery(entries, jobs=jobs)
32+
return repo.experiments.reproduce_celery(jobs=jobs)
3433

3534
hydra_sweep = None
3635
if params:

tests/func/experiments/test_experiments.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ def test_run_celery(tmp_dir, scm, dvc, exp_stage, mocker):
536536
repro_spy = mocker.spy(dvc.experiments, "reproduce_celery")
537537
results = dvc.experiments.run(run_all=True)
538538
assert len(results) == 2
539-
repro_spy.assert_called_once_with(entries=mocker.ANY, jobs=1)
539+
repro_spy.assert_called_once_with(jobs=1)
540540

541541
expected = {"foo: 2", "foo: 3"}
542542
metrics = set()

tests/func/experiments/test_show.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,9 @@ def _get_rev_isotimestamp(rev):
646646
result1 = dvc.experiments.run(exp_stage.addressing, params=["foo=2"])
647647
rev1 = first(result1)
648648
ref_info1 = first(exp_refs_by_rev(scm, rev1))
649+
650+
# at least 1 second gap between these experiments to make sure
651+
# the previous experiment to be regarded as branch_base
649652
time.sleep(1)
650653
result2 = dvc.experiments.run(exp_stage.addressing, params=["foo=3"])
651654
rev2 = first(result2)

0 commit comments

Comments
 (0)