dvc: add size/nfiles for deps/outs #4836

Merged · 3 commits · Nov 4, 2020

39 changes: 28 additions & 11 deletions dvc/cache/base.py
@@ -227,7 +227,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
# we need to update path and cache, since in case of reflink,
# or copy cache type moving original file results in updates on
# next executed command, which causes md5 recalculation
self.tree.state.save(path_info, hash_info.value)
self.tree.state.save(path_info, hash_info)
else:
if self.changed_cache(hash_info):
with tree.open(path_info, mode="rb") as fobj:
@@ -241,7 +241,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
if callback:
callback(1)

self.tree.state.save(cache_info, hash_info.value)
self.tree.state.save(cache_info, hash_info)

def _cache_is_copy(self, path_info):
"""Checks whether cache uses copies."""
@@ -287,6 +287,7 @@ def _get_dir_info_hash(self, dir_info):
hash_info = self.tree.get_file_hash(to_info)
hash_info.value += self.tree.CHECKSUM_DIR_SUFFIX
hash_info.dir_info = self._to_dict(dir_info)
hash_info.nfiles = len(dir_info)

return hash_info, to_info

@@ -305,7 +306,7 @@ def save_dir_info(self, dir_info, hash_info=None):
self.tree.makedirs(new_info.parent)
self.tree.move(tmp_info, new_info, mode=self.CACHE_MODE)

self.tree.state.save(new_info, hi.value)
self.tree.state.save(new_info, hi)

return hi

@@ -326,10 +327,10 @@ def _save_dir(self, path_info, tree, hash_info, save_link=True, **kwargs):
if save_link:
self.tree.state.save_link(path_info)
if self.tree.exists(path_info):
self.tree.state.save(path_info, hi.value)
self.tree.state.save(path_info, hi)

cache_info = self.tree.hash_to_path_info(hi.value)
self.tree.state.save(cache_info, hi.value)
self.tree.state.save(cache_info, hi)

@use_state
def save(self, path_info, tree, hash_info, save_link=True, **kwargs):
@@ -461,7 +462,7 @@ def _checkout_file(

self.link(cache_info, path_info)
self.tree.state.save_link(path_info)
self.tree.state.save(path_info, hash_info.value)
self.tree.state.save(path_info, hash_info)
if progress_callback:
progress_callback(str(path_info))

@@ -501,7 +502,7 @@ def _checkout_dir(
modified = True
self.safe_remove(entry_info, force=force)
self.link(entry_cache_info, entry_info)
self.tree.state.save(entry_info, entry_hash)
self.tree.state.save(entry_info, entry_hash_info)
if progress_callback:
progress_callback(str(entry_info))

@@ -511,7 +512,7 @@ def _checkout_dir(
)

self.tree.state.save_link(path_info)
self.tree.state.save(path_info, hash_info.value)
self.tree.state.save(path_info, hash_info)

# relink is not modified, assume it as nochange
return added, not added and modified and not relink
@@ -690,9 +691,20 @@ def _merge_dirs(self, ancestor_info, our_info, their_info):

# Sorting the list by path to ensure reproducibility
return sorted(
self._from_dict(merged), key=itemgetter(self.tree.PARAM_RELPATH)
self._from_dict(merged), key=itemgetter(self.tree.PARAM_RELPATH),
)

def _get_dir_size(self, dir_info):
def _getsize(entry):
return self.tree.getsize(
self.tree.hash_to_path_info(entry[self.tree.PARAM_CHECKSUM])
)

try:
return sum(_getsize(entry) for entry in dir_info)
except FileNotFoundError:
return None

def merge(self, ancestor_info, our_info, their_info):
assert our_info
assert their_info
@@ -706,7 +718,9 @@ def merge(self, ancestor_info, our_info, their_info):
their = self.get_dir_cache(their_info)

merged = self._merge_dirs(ancestor, our, their)
return self.save_dir_info(merged)
hash_info = self.save_dir_info(merged)
hash_info.size = self._get_dir_size(merged)
return hash_info

@use_state
def get_hash(self, tree, path_info):
@@ -715,9 +729,12 @@ def get_hash(self, tree, path_info):
assert hash_info.name == self.tree.PARAM_CHECKSUM
return hash_info

return self.save_dir_info(hash_info.dir_info, hash_info)
hi = self.save_dir_info(hash_info.dir_info, hash_info)
hi.size = hash_info.size
return hi

def set_dir_info(self, hash_info):
assert hash_info.isdir

hash_info.dir_info = self._to_dict(self.get_dir_cache(hash_info))
hash_info.nfiles = len(hash_info.dir_info)
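
The new `_get_dir_size` helper above totals the on-disk size of a directory's cached files, while `nfiles` is simply the length of the `.dir` manifest. A standalone sketch of that pattern, assuming a list-of-dicts manifest like the one used in `dvc/cache/base.py` and a hypothetical `hash_to_cache_path` helper standing in for `self.tree.hash_to_path_info()`:

```python
import os


def dir_size_and_nfiles(dir_info, hash_to_cache_path):
    # One manifest entry per file, as in set_dir_info() above.
    nfiles = len(dir_info)
    try:
        # Sum the sizes of the cached files referenced by the manifest;
        # "md5" is the local tree's checksum key (self.tree.PARAM_CHECKSUM).
        size = sum(
            os.path.getsize(hash_to_cache_path(entry["md5"]))
            for entry in dir_info
        )
    except FileNotFoundError:
        # A missing cache file means the total cannot be computed reliably.
        size = None
    return size, nfiles
```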
3 changes: 2 additions & 1 deletion dvc/cache/local.py
Original file line number Diff line number Diff line change
@@ -451,7 +451,8 @@ def pull(self, named_cache, remote, jobs=None, show_checksums=False):
# be removed upon status, while files corrupted during
# download will not be moved from tmp_file
# (see `BaseTree.download()`)
self.tree.state.save(cache_file, checksum)
hash_info = HashInfo(self.tree.PARAM_CHECKSUM, checksum)
self.tree.state.save(cache_file, hash_info)

return ret

28 changes: 24 additions & 4 deletions dvc/hash_info.py
@@ -1,26 +1,46 @@
from collections import OrderedDict
from dataclasses import dataclass, field

HASH_DIR_SUFFIX = ".dir"


@dataclass
class HashInfo:
PARAM_SIZE = "size"
PARAM_NFILES = "nfiles"

name: str
value: str
dir_info: dict = field(default=None, compare=False)
size: int = field(default=None, compare=False)
nfiles: int = field(default=None, compare=False)

def __bool__(self):
return bool(self.value)

@classmethod
def from_dict(cls, d):
if not d:
_d = d.copy() if d else {}
size = _d.pop(cls.PARAM_SIZE, None)
nfiles = _d.pop(cls.PARAM_NFILES, None)

if not _d:
return cls(None, None)
((name, value),) = d.items()
return cls(name, value)

((name, value),) = _d.items()
return cls(name, value, size=size, nfiles=nfiles)

def to_dict(self):
return {self.name: self.value} if self else {}
ret = OrderedDict()
if not self:
return ret

ret[self.name] = self.value
if self.size is not None:
ret[self.PARAM_SIZE] = self.size
if self.nfiles is not None:
ret[self.PARAM_NFILES] = self.nfiles
return ret

@property
def isdir(self):
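
With the fields above, `HashInfo.to_dict()` now emits `size`/`nfiles` next to the checksum, and `from_dict()` strips them back out before unpacking the single checksum pair. A quick round-trip sketch, using made-up values and assuming the dataclass exactly as shown in this diff:

```python
from dvc.hash_info import HashInfo

hi = HashInfo(
    "md5", "1f69c66028c35a5de4e9d1a454753c0a.dir", size=67561, nfiles=2
)
d = hi.to_dict()
# OrderedDict([('md5', '1f69c66028c35a5de4e9d1a454753c0a.dir'),
#              ('size', 67561), ('nfiles', 2)])

# size/nfiles are declared with compare=False, so equality is still decided
# by (name, value) only.
assert HashInfo.from_dict(d) == hi
```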
3 changes: 3 additions & 0 deletions dvc/output/__init__.py
@@ -4,6 +4,7 @@
from funcy import collecting, project
from voluptuous import And, Any, Coerce, Length, Lower, Required, SetTo

from dvc.hash_info import HashInfo
from dvc.output.base import BaseOutput
from dvc.output.gs import GSOutput
from dvc.output.hdfs import HDFSOutput
@@ -59,6 +60,8 @@
SCHEMA[BaseOutput.PARAM_METRIC] = BaseOutput.METRIC_SCHEMA
SCHEMA[BaseOutput.PARAM_PLOT] = bool
SCHEMA[BaseOutput.PARAM_PERSIST] = bool
SCHEMA[HashInfo.PARAM_SIZE] = int
SCHEMA[HashInfo.PARAM_NFILES] = int


def _get(
11 changes: 9 additions & 2 deletions dvc/output/base.py
@@ -530,8 +530,15 @@ def _check_can_merge(self, out):
my = self.dumpd()
other = out.dumpd()

my.pop(self.tree.PARAM_CHECKSUM)
other.pop(self.tree.PARAM_CHECKSUM)
ignored = [
self.tree.PARAM_CHECKSUM,
HashInfo.PARAM_SIZE,
HashInfo.PARAM_NFILES,
]

for opt in ignored:
my.pop(opt, None)
other.pop(opt, None)

if my != other:
raise MergeError(
8 changes: 7 additions & 1 deletion dvc/schema.py
@@ -1,6 +1,7 @@
from voluptuous import Any, Optional, Required, Schema

from dvc import dependency, output
from dvc.hash_info import HashInfo
from dvc.output import CHECKSUMS_SCHEMA, BaseOutput
from dvc.parsing import FOREACH_KWD, IN_KWD, SET_KWD, USE_KWD, VARS_KWD
from dvc.stage.params import StageParams
@@ -18,7 +19,12 @@
StageParams.PARAM_ALWAYS_CHANGED: bool,
}

DATA_SCHEMA = {**CHECKSUMS_SCHEMA, Required("path"): str}
DATA_SCHEMA = {
**CHECKSUMS_SCHEMA,
Required("path"): str,
HashInfo.PARAM_SIZE: int,
HashInfo.PARAM_NFILES: int,
}
LOCK_FILE_STAGE_SCHEMA = {
Required(StageParams.PARAM_CMD): str,
StageParams.PARAM_DEPS: [DATA_SCHEMA],
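
With this change, a lockfile dep/out entry may carry `size` and `nfiles` alongside its checksum. A minimal validation sketch, assuming the extended `DATA_SCHEMA` from this diff and an `md5` key coming from `CHECKSUMS_SCHEMA` (the values are made up):

```python
from voluptuous import Schema

from dvc.schema import DATA_SCHEMA

entry = {
    "path": "data/features",
    "md5": "3338d2c21bdb521cda0ba4add89e1cb0.dir",
    "size": 67561,
    "nfiles": 2,
}

# Both new keys are optional plain ints; a pre-PR entry without them
# still validates unchanged.
Schema(DATA_SCHEMA)(entry)
```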
6 changes: 5 additions & 1 deletion dvc/stage/cache.py
@@ -25,9 +25,13 @@ def __init__(self, stage):


def _get_cache_hash(cache, key=False):
from dvc.hash_info import HashInfo

if key:
cache["outs"] = [out["path"] for out in cache.get("outs", [])]
return dict_sha256(cache)
return dict_sha256(
cache, exclude=[HashInfo.PARAM_SIZE, HashInfo.PARAM_NFILES]
)


def _get_stage_hash(stage):
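
Excluding `size`/`nfiles` here keeps run-cache keys stable: a stage cached before this PR hashes to the same value afterwards. An illustrative sketch of the idea (not dvc's actual `dict_sha256` implementation):

```python
import hashlib
import json


def sketch_dict_sha256(d, exclude=()):
    # Recursively drop excluded keys before hashing so that purely
    # informational fields never influence the digest.
    def prune(obj):
        if isinstance(obj, dict):
            return {k: prune(v) for k, v in obj.items() if k not in exclude}
        if isinstance(obj, list):
            return [prune(v) for v in obj]
        return obj

    payload = json.dumps(prune(d), sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()
```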
18 changes: 9 additions & 9 deletions dvc/stage/serialize.py
@@ -134,18 +134,18 @@ def to_pipeline_file(stage: "PipelineStage"):
def to_single_stage_lockfile(stage: "Stage") -> dict:
assert stage.cmd

def _dumpd(item):
ret = [
(item.PARAM_PATH, item.def_path),
*item.hash_info.to_dict().items(),
]

return OrderedDict(ret)
Contributor Author commented on lines +137 to +143:

This should eventually turn into a special Dumper for outputs or into Output.dumpd_lock (or something like that). We've discussed it before.


res = OrderedDict([("cmd", stage.cmd)])
params, deps = split_params_deps(stage)
deps, outs = [
[
OrderedDict(
[
(PARAM_PATH, item.def_path),
*item.hash_info.to_dict().items(),
]
)
for item in sort_by_path(items)
]
[_dumpd(item) for item in sort_by_path(items)]
for items in [deps, stage.outs]
]
params = _serialize_params_values(params)
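
For the refactor the reviewer suggests above, one possible shape is to move lockfile serialization onto the output itself. `Output.dumpd_lock` does not exist in this PR; the sketch below is purely hypothetical and only mirrors what `_dumpd()` does in `to_single_stage_lockfile()`:

```python
from collections import OrderedDict


class BaseOutput:
    PARAM_PATH = "path"

    # Hypothetical method name taken from the review comment; "def_path"
    # and "hash_info" are the attributes _dumpd() already relies on.
    def dumpd_lock(self):
        return OrderedDict(
            [
                (self.PARAM_PATH, self.def_path),
                *self.hash_info.to_dict().items(),
            ]
        )
```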
5 changes: 5 additions & 0 deletions dvc/stage/utils.py
@@ -8,6 +8,7 @@
from dvc.utils.fs import path_isin

from ..dependency import ParamsDependency
from ..hash_info import HashInfo
from ..tree.local import LocalTree
from ..tree.s3 import S3Tree
from ..utils import dict_md5, format_link, relpath
@@ -136,6 +137,8 @@ def stage_dump_eq(stage_cls, old_d, new_d):
for out in outs:
out.pop(LocalTree.PARAM_CHECKSUM, None)
out.pop(S3Tree.PARAM_CHECKSUM, None)
out.pop(HashInfo.PARAM_SIZE, None)
out.pop(HashInfo.PARAM_NFILES, None)

# outs and deps are lists of dicts. To check equality, we need to make
# them independent of the order, so, we convert them to dicts.
@@ -171,6 +174,8 @@ def compute_md5(stage):
stage.PARAM_FROZEN,
BaseOutput.PARAM_METRIC,
BaseOutput.PARAM_PERSIST,
HashInfo.PARAM_SIZE,
HashInfo.PARAM_NFILES,
],
)
