Skip to content

Commit 9df4370

Browse files
committed
dvc: introduce local build cache
This patch introduces `.dvc/cache/stages` that is used to store previous runs and their results, which could then be reused later when we stumble upon the same command with the same deps and outs. Format of build cache entries is single-line json, which is readable by humans and might also be used for lock files discussed in #1871. Related to #1871 Local part of #1234
1 parent 907853b commit 9df4370

File tree

10 files changed

+223
-17
lines changed

10 files changed

+223
-17
lines changed

dvc/repo/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def __init__(self, root_dir=None):
7272
from dvc.repo.params import Params
7373
from dvc.scm.tree import WorkingTree
7474
from dvc.utils.fs import makedirs
75+
from dvc.stage.cache import StageCache
7576

7677
root_dir = self.find_root(root_dir)
7778

@@ -104,6 +105,8 @@ def __init__(self, root_dir=None):
104105
self.cache = Cache(self)
105106
self.cloud = DataCloud(self)
106107

108+
self.stage_cache = StageCache(self.cache.local.cache_dir)
109+
107110
self.metrics = Metrics(self)
108111
self.params = Params(self)
109112

dvc/repo/reproduce.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,7 @@ def reproduce(
9797

9898

9999
def _reproduce_stages(
100-
G,
101-
stages,
102-
downstream=False,
103-
ignore_build_cache=False,
104-
single_item=False,
105-
**kwargs
100+
G, stages, downstream=False, single_item=False, **kwargs
106101
):
107102
r"""Derive the evaluation of the given node for the given graph.
108103
@@ -172,7 +167,7 @@ def _reproduce_stages(
172167
try:
173168
ret = _reproduce_stage(stage, **kwargs)
174169

175-
if len(ret) != 0 and ignore_build_cache:
170+
if len(ret) != 0 and kwargs.get("ignore_build_cache", False):
176171
# NOTE: we are walking our pipeline from the top to the
177172
# bottom. If one stage is changed, it will be reproduced,
178173
# which tells us that we should force reproducing all of

dvc/repo/run.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ def run(self, fname=None, no_exec=False, **kwargs):
6868
raise OutputDuplicationError(exc.output, set(exc.stages) - {stage})
6969

7070
if not no_exec:
71-
stage.run(no_commit=kwargs.get("no_commit", False))
71+
stage.run(
72+
no_commit=kwargs.get("no_commit", False),
73+
ignore_build_cache=kwargs.get("ignore_build_cache", False),
74+
)
7275
dvcfile.dump(stage, update_pipeline=True)
7376
return stage

dvc/stage/__init__.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,8 @@ def save(self):
491491

492492
self.md5 = self._compute_md5()
493493

494+
self.repo.stage_cache.save(self)
495+
494496
@staticmethod
495497
def _changed_entries(entries):
496498
return [
@@ -617,7 +619,9 @@ def _run(self):
617619
raise StageCmdFailedError(self)
618620

619621
@rwlocked(read=["deps"], write=["outs"])
620-
def run(self, dry=False, no_commit=False, force=False):
622+
def run(
623+
self, dry=False, no_commit=False, force=False, ignore_build_cache=False
624+
):
621625
if (self.cmd or self.is_import) and not self.locked and not dry:
622626
self.remove_outs(ignore_remove=False, force=False)
623627

@@ -650,16 +654,20 @@ def run(self, dry=False, no_commit=False, force=False):
650654
self.check_missing_outputs()
651655

652656
else:
653-
logger.info("Running command:\n\t{}".format(self.cmd))
654657
if not dry:
658+
if not force and not ignore_build_cache:
659+
self.repo.stage_cache.restore(self)
660+
655661
if (
656662
not force
657663
and not self.is_callback
658664
and not self.always_changed
659665
and self._already_cached()
660666
):
667+
logger.info("Stage is cached, skipping.")
661668
self.checkout()
662669
else:
670+
logger.info("Running command:\n\t{}".format(self.cmd))
663671
self._run()
664672

665673
if not dry:

dvc/stage/cache.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import os
2+
import yaml
3+
import logging
4+
5+
from voluptuous import Schema, Required, Invalid
6+
7+
from dvc.utils.fs import makedirs
8+
from dvc.utils import relpath, dict_sha256
9+
10+
logger = logging.getLogger(__name__)
11+
12+
SCHEMA = Schema(
13+
{
14+
Required("cmd"): str,
15+
Required("deps"): {str: str},
16+
Required("outs"): {str: str},
17+
}
18+
)
19+
20+
21+
def _get_cache_hash(cache, key=False):
22+
return dict_sha256(
23+
{
24+
"cmd": cache["cmd"],
25+
"deps": cache["deps"],
26+
"outs": list(cache["outs"].keys()) if key else cache["outs"],
27+
}
28+
)
29+
30+
31+
def _get_stage_hash(stage):
32+
if not stage.cmd or not stage.deps or not stage.outs:
33+
return None
34+
35+
for dep in stage.deps:
36+
if dep.scheme != "local" or not dep.def_path or not dep.get_checksum():
37+
return None
38+
39+
for out in stage.outs:
40+
if out.scheme != "local" or not out.def_path or out.persist:
41+
return None
42+
43+
return _get_cache_hash(_create_cache(stage), key=True)
44+
45+
46+
def _create_cache(stage):
47+
return {
48+
"cmd": stage.cmd,
49+
"deps": {dep.def_path: dep.get_checksum() for dep in stage.deps},
50+
"outs": {out.def_path: out.get_checksum() for out in stage.outs},
51+
}
52+
53+
54+
class StageCache:
55+
def __init__(self, cache_dir):
56+
self.cache_dir = os.path.join(cache_dir, "stages")
57+
58+
def _get_cache_dir(self, key):
59+
return os.path.join(self.cache_dir, key[:2], key)
60+
61+
def _get_cache_path(self, key, value):
62+
return os.path.join(self._get_cache_dir(key), value)
63+
64+
def _load_cache(self, key, value):
65+
path = self._get_cache_path(key, value)
66+
67+
try:
68+
with open(path, "r") as fobj:
69+
return SCHEMA(yaml.safe_load(fobj))
70+
except FileNotFoundError:
71+
return None
72+
except (yaml.error.YAMLError, Invalid):
73+
logger.warning("corrupted cache file '%s'.", relpath(path))
74+
os.unlink(path)
75+
return None
76+
77+
def _load(self, stage):
78+
key = _get_stage_hash(stage)
79+
if not key:
80+
return None
81+
82+
cache_dir = self._get_cache_dir(key)
83+
if not os.path.exists(cache_dir):
84+
return None
85+
86+
for value in os.listdir(cache_dir):
87+
cache = self._load_cache(key, value)
88+
if cache:
89+
return cache
90+
91+
return None
92+
93+
def save(self, stage):
94+
cache_key = _get_stage_hash(stage)
95+
if not cache_key:
96+
return
97+
98+
cache = _create_cache(stage)
99+
cache_value = _get_cache_hash(cache)
100+
101+
if self._load_cache(cache_key, cache_value):
102+
return
103+
104+
# sanity check
105+
SCHEMA(cache)
106+
107+
path = self._get_cache_path(cache_key, cache_value)
108+
dpath = os.path.dirname(path)
109+
makedirs(dpath, exist_ok=True)
110+
with open(path, "w+") as fobj:
111+
yaml.dump(cache, fobj)
112+
113+
def restore(self, stage):
114+
cache = self._load(stage)
115+
if not cache:
116+
return
117+
118+
deps = {dep.def_path: dep for dep in stage.deps}
119+
for def_path, checksum in cache["deps"].items():
120+
deps[def_path].checksum = checksum
121+
122+
outs = {out.def_path: out for out in stage.outs}
123+
for def_path, checksum in cache["outs"].items():
124+
outs[def_path].checksum = checksum

dvc/utils/__init__.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def file_md5(fname):
7676
return (None, None)
7777

7878

79-
def bytes_md5(byts):
80-
hasher = hashlib.md5()
79+
def bytes_hash(byts, typ):
80+
hasher = getattr(hashlib, typ)()
8181
hasher.update(byts)
8282
return hasher.hexdigest()
8383

@@ -100,10 +100,18 @@ def dict_filter(d, exclude=()):
100100
return d
101101

102102

103-
def dict_md5(d, exclude=()):
103+
def dict_hash(d, typ, exclude=()):
104104
filtered = dict_filter(d, exclude)
105105
byts = json.dumps(filtered, sort_keys=True).encode("utf-8")
106-
return bytes_md5(byts)
106+
return bytes_hash(byts, typ)
107+
108+
109+
def dict_md5(d, **kwargs):
110+
return dict_hash(d, "md5", **kwargs)
111+
112+
113+
def dict_sha256(d, **kwargs):
114+
return dict_hash(d, "sha256", **kwargs)
107115

108116

109117
def _split(list_to_split, chunk_size):

tests/func/test_gc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import shutil
23
import os
34

45
import configobj
@@ -341,6 +342,7 @@ def test_gc_not_collect_pipeline_tracked_files(tmp_dir, dvc, run_copy):
341342
tmp_dir.gen("bar", "bar")
342343

343344
run_copy("foo", "foo2", name="copy")
345+
shutil.rmtree(dvc.stage_cache.cache_dir)
344346
assert _count_files(dvc.cache.local.cache_dir) == 1
345347
dvc.gc(workspace=True, force=True)
346348
assert _count_files(dvc.cache.local.cache_dir) == 1

tests/func/test_repro.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,7 +1295,9 @@ def test(self):
12951295
["repro", self._get_stage_target(self.stage), "--no-commit"]
12961296
)
12971297
self.assertEqual(ret, 0)
1298-
self.assertFalse(os.path.exists(self.dvc.cache.local.cache_dir))
1298+
self.assertEqual(
1299+
os.listdir(self.dvc.cache.local.cache_dir), ["stages"]
1300+
)
12991301

13001302

13011303
class TestReproAlreadyCached(TestRepro):

tests/unit/test_stage.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import signal
23
import subprocess
34
import threading
@@ -51,8 +52,6 @@ def test_meta_ignored():
5152

5253
class TestPathConversion(TestCase):
5354
def test(self):
54-
import os
55-
5655
stage = Stage(None, "path")
5756

5857
stage.wdir = os.path.join("..", "..")
@@ -103,3 +102,39 @@ def test_always_changed(dvc):
103102
with dvc.lock:
104103
assert stage.changed()
105104
assert stage.status()["path"] == ["always changed"]
105+
106+
107+
def test_stage_cache(tmp_dir, dvc, run_copy, mocker):
108+
tmp_dir.gen("dep", "dep")
109+
stage = run_copy("dep", "out")
110+
111+
with dvc.lock, dvc.state:
112+
stage.remove(remove_outs=True, force=True)
113+
114+
assert not (tmp_dir / "out").exists()
115+
assert not (tmp_dir / "out.dvc").exists()
116+
117+
cache_dir = os.path.join(
118+
dvc.stage_cache.cache_dir,
119+
"ec",
120+
"ec5b6d8dea9136dbb62d93a95c777f87e6c54b0a6bee839554acb99fdf23d2b1",
121+
)
122+
cache_file = os.path.join(
123+
cache_dir,
124+
"09f9eb17fdb1ee7f8566b3c57394cee060eaf28075244bc6058612ac91fdf04a",
125+
)
126+
127+
assert os.path.isdir(cache_dir)
128+
assert os.listdir(cache_dir) == [os.path.basename(cache_file)]
129+
assert os.path.isfile(cache_file)
130+
131+
run_spy = mocker.spy(stage, "_run")
132+
checkout_spy = mocker.spy(stage, "checkout")
133+
with dvc.lock, dvc.state:
134+
stage.run()
135+
136+
assert not run_spy.called
137+
assert checkout_spy.call_count == 1
138+
139+
assert (tmp_dir / "out").exists()
140+
assert (tmp_dir / "out").read_text() == "dep"

tests/unit/utils/test_utils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from dvc.path_info import PathInfo
77
from dvc.utils import (
88
file_md5,
9+
dict_sha256,
910
resolve_output,
1011
fix_env,
1112
relpath,
@@ -155,3 +156,28 @@ def test_hint_on_lockfile():
155156
with pytest.raises(Exception) as exc:
156157
assert parse_target("pipelines.lock:name")
157158
assert "pipelines.yaml:name" in str(exc.value)
159+
160+
161+
@pytest.mark.parametrize(
162+
"d,sha",
163+
[
164+
(
165+
{
166+
"cmd": "echo content > out",
167+
"deps": {"dep": "2254342becceafbd04538e0a38696791"},
168+
"outs": {"out": "f75b8179e4bbe7e2b4a074dcef62de95"},
169+
},
170+
"f472eda60f09660a4750e8b3208cf90b3a3b24e5f42e0371d829710e9464d74a",
171+
),
172+
(
173+
{
174+
"cmd": "echo content > out",
175+
"deps": {"dep": "2254342becceafbd04538e0a38696791"},
176+
"outs": ["out"],
177+
},
178+
"a239b67073bd58affcdb81fff3305d1726c6e7f9c86f3d4fca0e92e8147dc7b0",
179+
),
180+
],
181+
)
182+
def test_dict_sha256(d, sha):
183+
assert dict_sha256(d) == sha

0 commit comments

Comments
 (0)