Skip to content

Commit d62a54c

Browse files
authored
push/pull: properly collect run cache (#3768)
Fixes #1234
1 parent c922da4 commit d62a54c

File tree

5 files changed

+48
-25
lines changed

5 files changed

+48
-25
lines changed

dvc/data_cloud.py

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,7 @@ def _init_remote(self, name):
4848
return Remote(self.repo, name=name)
4949

5050
def push(
51-
self,
52-
cache,
53-
jobs=None,
54-
remote=None,
55-
show_checksums=False,
56-
run_cache=False,
51+
self, cache, jobs=None, remote=None, show_checksums=False,
5752
):
5853
"""Push data items in a cloud-agnostic way.
5954
@@ -67,20 +62,12 @@ def push(
6762
"""
6863
remote = self.get_remote(remote, "push")
6964

70-
if run_cache:
71-
self.repo.stage_cache.push(remote)
72-
7365
return self.repo.cache.local.push(
7466
cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
7567
)
7668

7769
def pull(
78-
self,
79-
cache,
80-
jobs=None,
81-
remote=None,
82-
show_checksums=False,
83-
run_cache=False,
70+
self, cache, jobs=None, remote=None, show_checksums=False,
8471
):
8572
"""Pull data items in a cloud-agnostic way.
8673
@@ -94,9 +81,6 @@ def pull(
9481
"""
9582
remote = self.get_remote(remote, "pull")
9683

97-
if run_cache:
98-
self.repo.stage_cache.pull(remote)
99-
10084
downloaded_items_num = self.repo.cache.local.pull(
10185
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
10286
)

dvc/repo/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ def used_cache(
248248
force=False,
249249
jobs=None,
250250
recursive=False,
251+
used_run_cache=None,
251252
):
252253
"""Get the stages related to the given target and collect
253254
the `info` of its outputs.
@@ -291,6 +292,12 @@ def used_cache(
291292
)
292293
cache.update(used_cache, suffix=suffix)
293294

295+
if used_run_cache:
296+
used_cache = self.stage_cache.get_used_cache(
297+
used_run_cache, remote=remote, force=force, jobs=jobs,
298+
)
299+
cache.update(used_cache)
300+
294301
return cache
295302

296303
def _collect_graph(self, stages):

dvc/repo/fetch.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def _fetch(
3434
config.NoRemoteError: thrown when downloading only local files and no
3535
remote is configured
3636
"""
37+
38+
used_run_cache = self.stage_cache.pull(remote) if run_cache else []
39+
3740
used = self.used_cache(
3841
targets,
3942
all_branches=all_branches,
@@ -44,18 +47,15 @@ def _fetch(
4447
remote=remote,
4548
jobs=jobs,
4649
recursive=recursive,
50+
used_run_cache=used_run_cache,
4751
)
4852

4953
downloaded = 0
5054
failed = 0
5155

5256
try:
5357
downloaded += self.cloud.pull(
54-
used,
55-
jobs,
56-
remote=remote,
57-
show_checksums=show_checksums,
58-
run_cache=run_cache,
58+
used, jobs, remote=remote, show_checksums=show_checksums,
5959
)
6060
except NoRemoteError:
6161
if not used.external and used["local"]:

dvc/repo/push.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ def push(
1414
all_commits=False,
1515
run_cache=False,
1616
):
17+
used_run_cache = self.stage_cache.push(remote) if run_cache else []
18+
1719
used = self.used_cache(
1820
targets,
1921
all_branches=all_branches,
@@ -24,6 +26,7 @@ def push(
2426
remote=remote,
2527
jobs=jobs,
2628
recursive=recursive,
29+
used_run_cache=used_run_cache,
2730
)
2831

29-
return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
32+
return self.cloud.push(used, jobs, remote=remote)

dvc/stage/cache.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ def restore(self, stage):
106106

107107
@staticmethod
108108
def _transfer(func, from_remote, to_remote):
109+
ret = []
110+
109111
runs = from_remote.path_info / "runs"
110112
if not from_remote.exists(runs):
111-
return
113+
return []
112114

113115
for src in from_remote.walk_files(runs):
114116
rel = src.relative_to(from_remote.path_info)
@@ -118,9 +120,36 @@ def _transfer(func, from_remote, to_remote):
118120
if to_remote.exists(key) and first(to_remote.walk_files(key)):
119121
continue
120122
func(src, dst)
123+
ret.append((src.parent.name, src.name))
124+
125+
return ret
121126

122127
def push(self, remote):
128+
remote = self.repo.cloud.get_remote(remote)
123129
return self._transfer(remote.upload, self.repo.cache.local, remote)
124130

125131
def pull(self, remote):
132+
remote = self.repo.cloud.get_remote(remote)
126133
return self._transfer(remote.download, remote, self.repo.cache.local)
134+
135+
def get_used_cache(self, used_run_cache, *args, **kwargs):
136+
from dvc.cache import NamedCache
137+
from dvc.stage import create_stage, PipelineStage
138+
139+
cache = NamedCache()
140+
141+
for key, value in used_run_cache:
142+
entry = self._load_cache(key, value)
143+
if not entry:
144+
continue
145+
stage = create_stage(
146+
PipelineStage,
147+
repo=self.repo,
148+
path="dvc.yaml",
149+
cmd=entry["cmd"],
150+
deps=[dep["path"] for dep in entry["deps"]],
151+
outs=[out["path"] for out in entry["outs"]],
152+
)
153+
StageLoader.fill_from_lock(stage, entry)
154+
cache.update(stage.get_used_cache(*args, **kwargs))
155+
return cache

0 commit comments

Comments
 (0)