
remote: use .dir checksum existence to infer file contents existence #3632


Merged · 12 commits · Apr 14, 2020
76 changes: 69 additions & 7 deletions dvc/cache.py
@@ -71,9 +71,39 @@ def __init__(self, repo):
azure = _make_remote_property("azure")


class NamedCacheItem:
def __init__(self):
self.names = set()
self.children = defaultdict(NamedCacheItem)

def __eq__(self, other):
return self.names == other.names and self.children == other.children

def child_keys(self):
for key, child in self.children.items():
yield key
yield from child.child_keys()

def child_names(self):
for key, child in self.children.items():
yield key, child.names
yield from child.child_names()

def add(self, checksum, item):
self.children[checksum].update(item)

def update(self, item, suffix=""):
if suffix:
self.names.update(n + suffix for n in item.names)
else:
self.names.update(item.names)
for checksum, child_item in item.children.items():
self.children[checksum].update(child_item)


class NamedCache(object):
def __init__(self):
self._items = defaultdict(lambda: defaultdict(set))
self._items = defaultdict(lambda: defaultdict(NamedCacheItem))
self.external = defaultdict(set)

@classmethod
@@ -86,19 +116,51 @@ def __getitem__(self, key):
return self._items[key]

def add(self, scheme, checksum, name):
self._items[scheme][checksum].add(name)
"""Add a mapped name for the specified checksum."""
self._items[scheme][checksum].names.add(name)

def add_child_cache(self, checksum, cache, suffix=""):
"""Add/update child cache for the specified checksum."""
for scheme, src in cache._items.items():
dst = self._items[scheme][checksum].children
for child_checksum, item in src.items():
dst[child_checksum].update(item, suffix=suffix)

for repo_pair, files in cache.external.items():
self.external[repo_pair].update(files)

def add_external(self, url, rev, path):
self.external[url, rev].add(path)

def update(self, cache, suffix=""):
for scheme, src in cache._items.items():
dst = self._items[scheme]
for checksum, names in src.items():
if suffix:
dst[checksum].update(n + suffix for n in names)
else:
dst[checksum].update(names)
for checksum, item in src.items():
dst[checksum].update(item, suffix=suffix)

for repo_pair, files in cache.external.items():
self.external[repo_pair].update(files)

def scheme_keys(self, scheme):
"""Iterate over a flat list of all keys for the specified scheme,
including children.
"""
for key, item in self._items[scheme].items():
yield key
yield from item.child_keys()

def scheme_names(self, scheme):
"""Iterate over a flat list of checksum, names items for the specified
scheme, including children.
"""
for key, item in self._items[scheme].items():
yield key, item.names
yield from item.child_names()

def dir_keys(self, scheme):
return (
key for key, item in self._items[scheme].items() if item.children
)

def child_keys(self, scheme, checksum):
return self._items[scheme][checksum].child_keys()
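
To make the new structure easier to picture, here is a minimal sketch built only against the API added in this file. It is not part of the diff: the checksum strings are shortened placeholders rather than real md5s, and it assumes this revision of dvc is importable.

```python
from dvc.cache import NamedCache

# cache entries for the files inside one tracked directory
contents = NamedCache()
contents.add("local", "aaaa1111", "data/foo")
contents.add("local", "bbbb2222", "data/bar")

# top-level cache: the directory itself is referenced by its .dir checksum
used = NamedCache()
used.add("local", "cccc3333.dir", "data")
# attach the directory contents as children of the .dir checksum
used.add_child_cache("cccc3333.dir", contents)

list(used.dir_keys("local"))                    # ['cccc3333.dir']
list(used.child_keys("local", "cccc3333.dir"))  # ['aaaa1111', 'bbbb2222']
sorted(used.scheme_keys("local"))               # all three checksums, flattened
```

Note that the suffix argument of update() and add_child_cache() only decorates the mapped names (presumably to tag entries collected from other branches or tags); the checksum keys themselves are never modified.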
9 changes: 5 additions & 4 deletions dvc/data_cloud.py
@@ -88,12 +88,13 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False):
return downloaded_items_num

def _save_pulled_checksums(self, cache):
for checksum in cache["local"].keys():
for checksum in cache.scheme_keys("local"):
cache_file = self.repo.cache.local.checksum_to_path_info(checksum)
if self.repo.cache.local.exists(cache_file):
# We can safely save here, as existing corrupted files will be
# removed upon status, while files corrupted during download
# will not be moved from tmp_file (see `RemoteBASE.download()`)
# We can safely save here, as existing corrupted files will
# be removed upon status, while files corrupted during
# download will not be moved from tmp_file
# (see `RemoteBASE.download()`)
self.repo.state.save(cache_file, checksum)

def status(self, cache, jobs=None, remote=None, show_checksums=False):
4 changes: 3 additions & 1 deletion dvc/output/base.py
@@ -436,7 +436,9 @@ def get_used_cache(self, **kwargs):
if not self.is_dir_checksum:
return ret

ret.update(self._collect_used_dir_cache(**kwargs))
ret.add_child_cache(
self.checksum, self._collect_used_dir_cache(**kwargs),
)

return ret

12 changes: 8 additions & 4 deletions dvc/remote/base.py
@@ -732,14 +732,18 @@ def all(self, jobs=None, name=None):
)

def gc(self, named_cache, jobs=None):
logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs))
used = self.extract_used_local_checksums(named_cache)

if self.scheme != "":
used.update(named_cache[self.scheme])
used.update(named_cache.scheme_keys(self.scheme))

removed = False
for checksum in self.all(jobs, str(self.path_info)):
# checksums must be sorted to ensure we always remove .dir files first
for checksum in sorted(
self.all(jobs, str(self.path_info)),
key=self.is_dir_checksum,
reverse=True,
):
if checksum in used:
continue
path_info = self.checksum_to_path_info(checksum)
@@ -1247,7 +1251,7 @@ def _get_unpacked_dir_names(self, checksums):
return set()

def extract_used_local_checksums(self, named_cache):
used = set(named_cache["local"])
used = set(named_cache.scheme_keys("local"))
unpacked = self._get_unpacked_dir_names(used)
return used | unpacked
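
The sorted() call in gc() above leans on a small Python property: sorting by a boolean key with reverse=True moves the True entries to the front while keeping ties in their original order. A standalone sketch, with placeholder checksums and a simplified stand-in for is_dir_checksum (which in dvc checks for the ".dir" suffix):

```python
def is_dir_checksum(checksum):
    # simplified stand-in: dvc marks directory objects with a ".dir" suffix
    return checksum.endswith(".dir")

checksums = ["aaaa1111", "cccc3333.dir", "bbbb2222"]
ordered = sorted(checksums, key=is_dir_checksum, reverse=True)
print(ordered)  # ['cccc3333.dir', 'aaaa1111', 'bbbb2222']
```

Removing the .dir files first matters because, after this PR, a .dir file present on a remote is taken to imply that its contents are present as well; deleting the contents before the .dir file would briefly break that assumption for anyone fetching concurrently.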

160 changes: 136 additions & 24 deletions dvc/remote/local.py
@@ -2,9 +2,11 @@
import logging
import os
import stat
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed, ThreadPoolExecutor
from functools import partial

from funcy import concat

from shortuuid import uuid

from dvc.compat import fspath_py35
@@ -255,37 +257,102 @@ def status(
show_checksums=False,
download=False,
):
# Return flattened dict containing all status info
dir_status, file_status, _ = self._status(
named_cache,
remote,
jobs=jobs,
show_checksums=show_checksums,
download=download,
)
return dict(dir_status, **file_status)

def _status(
Review comment (project member), attached to this line:

A bit late to the party :) Excellent PR overall. I'll mention a few things I noticed while reviewing that might help later.

This function definitely wants some splitting, I would say: extract the presentation logic, for example, and the part that checks for a dir and excludes it, or maybe some other refactoring. But it's too long and complicated as it stands.

Overall, "internal-client" functions should be easy to read, even if it feels that the extraction does not make much sense (e.g. helpers that are used only in a single place). Think about how developers will read this: it's easier to read a story-like main function and go into the details if needed.

self,
named_cache,
remote,
jobs=None,
show_checksums=False,
download=False,
):
"""Return a tuple of (dir_status_info, file_status_info, dir_mapping).

dir_status_info contains status for .dir files, file_status_info
contains status for all other files, and dir_mapping is a dict of
{dir_path_info: set(file_path_info...)} which can be used to map
a .dir file to its file contents.
"""
logger.debug(
"Preparing to collect status from {}".format(remote.path_info)
)
md5s = list(named_cache[self.scheme])
md5s = set(named_cache.scheme_keys(self.scheme))

logger.debug("Collecting information from local cache...")
local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)
local_exists = frozenset(
self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)
)

# This is a performance optimization. We can safely assume that,
# if the resources that we want to fetch are already cached,
# there's no need to check the remote storage for the existence of
# those files.
if download and sorted(local_exists) == sorted(md5s):
if download and local_exists == md5s:
remote_exists = local_exists
else:
logger.debug("Collecting information from remote cache...")
remote_exists = list(
remote.cache_exists(
md5s, jobs=jobs, name=str(remote.path_info)
remote_exists = set()
dir_md5s = set(named_cache.dir_keys(self.scheme))
if dir_md5s:
# If a .dir checksum exists on the remote, assume the directory
# contents also exist on the remote
for dir_checksum in remote._cache_object_exists(dir_md5s):
file_checksums = list(
named_cache.child_keys(self.scheme, dir_checksum)
)
logger.debug(
"'{}' exists on remote, "
"assuming '{}' files also exist".format(
dir_checksum, len(file_checksums)
)
)
md5s.remove(dir_checksum)
remote_exists.add(dir_checksum)
md5s.difference_update(file_checksums)
remote_exists.update(file_checksums)
if md5s:
remote_exists.update(
remote.cache_exists(
md5s, jobs=jobs, name=str(remote.path_info)
)
)
)

ret = {
checksum: {"name": checksum if show_checksums else " ".join(names)}
for checksum, names in named_cache[self.scheme].items()
}
self._fill_statuses(ret, local_exists, remote_exists)
def make_names(checksum, names):
return {"name": checksum if show_checksums else " ".join(names)}

dir_status = {}
file_status = {}
dir_paths = {}
for checksum, item in named_cache[self.scheme].items():
if item.children:
dir_status[checksum] = make_names(checksum, item.names)
file_status.update(
{
child_checksum: make_names(child_checksum, child.names)
for child_checksum, child in item.children.items()
}
)
dir_paths[remote.checksum_to_path_info(checksum)] = frozenset(
map(remote.checksum_to_path_info, item.child_keys())
)
else:
file_status[checksum] = make_names(checksum, item.names)

self._fill_statuses(dir_status, local_exists, remote_exists)
self._fill_statuses(file_status, local_exists, remote_exists)

self._log_missing_caches(ret)
self._log_missing_caches(dict(dir_status, **file_status))

return ret
return dir_status, file_status, dir_paths

@staticmethod
def _fill_statuses(checksum_info_dir, local_exists, remote_exists):
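
The inference added to _status() above is easier to see stripped of the DVC plumbing. In this self-contained sketch, remote_has stands in for remote._cache_object_exists() and everything else is plain sets; the names and values are illustrative only:

```python
def plan_remote_queries(md5s, dir_children, remote_has):
    """Apply the .dir shortcut and return (known_to_exist, still_to_check).

    md5s: all checksums we need status for
    dir_children: {dir_checksum: set(child_checksums)}
    remote_has: callable(set) -> iterable of checksums present on the remote
    """
    md5s = set(md5s)
    known = set()
    for dir_checksum in remote_has(set(dir_children) & md5s):
        known.add(dir_checksum)
        md5s.discard(dir_checksum)
        # the .dir file exists remotely, so assume its contents do too
        known |= dir_children[dir_checksum]
        md5s -= dir_children[dir_checksum]
    return known, md5s  # `md5s` still needs a real per-file existence check


dir_children = {"cccc3333.dir": {"aaaa1111", "bbbb2222"}}
known, remaining = plan_remote_queries(
    {"cccc3333.dir", "aaaa1111", "bbbb2222", "dddd4444"},
    dir_children,
    lambda checksums: [c for c in checksums if c == "cccc3333.dir"],
)
# known == {"cccc3333.dir", "aaaa1111", "bbbb2222"}; remaining == {"dddd4444"}
```

In the real method the remaining set then goes through remote.cache_exists() as before, and the results are split into dir_status, file_status and dir_paths so that the upload code below can order its transfers.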
@@ -347,31 +414,76 @@ def _process(
if jobs is None:
jobs = remote.JOBS

status_info = self.status(
dir_status, file_status, dir_paths = self._status(
named_cache,
remote,
jobs=jobs,
show_checksums=show_checksums,
download=download,
)

plans = self._get_plans(download, remote, status_info, status)
dir_plans = self._get_plans(download, remote, dir_status, status)
file_plans = self._get_plans(download, remote, file_status, status)

if len(plans[0]) == 0:
if len(dir_plans[0]) + len(file_plans[0]) == 0:
return 0

if jobs > 1:
with ThreadPoolExecutor(max_workers=jobs) as executor:
fails = sum(executor.map(func, *plans))
else:
fails = sum(map(func, *plans))
with ThreadPoolExecutor(max_workers=jobs) as executor:
if download:
fails = sum(executor.map(func, *dir_plans))
fails += sum(executor.map(func, *file_plans))
else:
# for uploads, push files first, and any .dir files last

file_futures = {}
for from_info, to_info, name in zip(*file_plans):
file_futures[to_info] = executor.submit(
func, from_info, to_info, name
)
dir_futures = {}
for from_info, to_info, name in zip(*dir_plans):
wait_futures = {
future
for file_path, future in file_futures.items()
if file_path in dir_paths[to_info]
}
dir_futures[to_info] = executor.submit(
self._dir_upload,
func,
wait_futures,
from_info,
to_info,
name,
)
fails = sum(
future.result()
for future in concat(
file_futures.values(), dir_futures.values()
)
)

if fails:
if download:
raise DownloadError(fails)
raise UploadError(fails)

return len(plans[0])
return len(dir_plans[0]) + len(file_plans[0])

@staticmethod
def _dir_upload(func, futures, from_info, to_info, name):
for future in as_completed(futures):
if future.result():
# do not upload this .dir file if any file in this
# directory failed to upload
logger.debug(
"failed to upload full contents of '{}', "
"aborting .dir file upload".format(name)
)
logger.error(
"failed to upload '{}' to '{}'".format(from_info, to_info)
)
return 1
return func(from_info, to_info, name)
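
The upload branch of _process() above boils down to a two-stage schedule: every file upload is submitted first, and each .dir upload then waits on the futures of the files it contains, aborting if any of them failed. A trimmed sketch of that scheme, with a dummy upload() standing in for the real transfer function and made-up paths:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def upload(path):
    print("uploading", path)
    return 0  # number of failures for this transfer


def upload_dir_file(wait_futures, dir_path):
    # only push the .dir file once all of its contents made it to the remote
    for future in as_completed(wait_futures):
        if future.result():
            print("aborting .dir upload for", dir_path)
            return 1
    return upload(dir_path)


dir_contents = {"data.dir": ["data/foo", "data/bar"]}

with ThreadPoolExecutor(max_workers=4) as executor:
    file_futures = {
        path: executor.submit(upload, path)
        for paths in dir_contents.values()
        for path in paths
    }
    dir_futures = [
        executor.submit(
            upload_dir_file, [file_futures[p] for p in paths], dir_path
        )
        for dir_path, paths in dir_contents.items()
    ]
    fails = sum(f.result() for f in file_futures.values())
    fails += sum(f.result() for f in dir_futures)
```

Submitting every file upload before any .dir upload is what keeps this safe: a .dir task only ever waits on futures that are ahead of it in the executor queue, so it cannot deadlock even with a single worker.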

def push(self, named_cache, remote, jobs=None, show_checksums=False):
return self._process(
5 changes: 3 additions & 2 deletions dvc/repo/__init__.py
@@ -253,8 +253,9 @@ def used_cache(
`all_branches`/`all_tags`/`all_commits` to expand the scope.

Returns:
A dictionary with Schemes (representing output's location) as keys,
and a list with the outputs' `dumpd` as values.
A dictionary with Schemes (representing output's location) mapped
to items containing the output's `dumpd` names and the output's
children (if the given output is a directory).
"""
from dvc.cache import NamedCache

1 change: 0 additions & 1 deletion dvc/repo/fetch.py
@@ -6,7 +6,6 @@
from dvc.scm.base import CloneError
from dvc.path_info import PathInfo


logger = logging.getLogger(__name__)

