Skip to content

RepoTree: get rid if implicit fetching #5324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions dvc/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
else:
if self.changed_cache(hash_info):
with tree.open(path_info, mode="rb") as fobj:
# if tree has fetch enabled, DVC out will be fetched on
# open and we do not need to read/copy any data
if not (
tree.isdvc(path_info, strict=False) and tree.fetch
):
self.tree.upload_fobj(fobj, cache_info)
self.tree.upload_fobj(fobj, cache_info)
callback = kwargs.get("download_callback")
if callback:
callback(1)
Expand Down Expand Up @@ -444,7 +439,9 @@ def save(self, path_info, tree, hash_info, save_link=True, **kwargs):
)

if not hash_info:
hash_info = tree.get_hash(path_info, **kwargs)
kw = kwargs.copy()
kw.pop("download_callback", None)
Comment on lines +442 to +443
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary, we'll start computing hash on-the-fly in the followup PRs.

hash_info = tree.get_hash(path_info, **kw)
if not hash_info:
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), path_info
Expand Down
4 changes: 1 addition & 3 deletions dvc/dependency/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def _make_repo(self, *, locked=True, **kwargs):
return external_repo(d["url"], rev=rev, **kwargs)

def _get_hash(self, locked=True):
# we want stream but not fetch, so DVC out directories are
# walked, but dir contents is not fetched
with self._make_repo(locked=locked, fetch=False, stream=True) as repo:
with self._make_repo(locked=locked) as repo:
path_info = PathInfo(repo.root_dir) / self.def_path
return repo.repo_tree.get_hash(path_info, follow_subrepos=False)

Expand Down
3 changes: 0 additions & 3 deletions dvc/external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ def make_repo(path, **_kwargs):
**kwargs,
)

if "fetch" not in repo_kwargs:
repo_kwargs["fetch"] = True

repo = Repo(**repo_kwargs)

try:
Expand Down
6 changes: 1 addition & 5 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ def __init__(
config=None,
url=None,
repo_factory=None,
fetch=None,
stream=None,
):
from dvc.cache import Cache
from dvc.config import Config
Expand All @@ -148,8 +146,6 @@ def __init__(

self.url = url
self._tree_conf = {
"stream": stream,
"fetch": fetch,
"repo_factory": repo_factory,
}

Expand Down Expand Up @@ -458,7 +454,7 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None):
"""Opens a specified resource as a file descriptor"""
from dvc.tree.repo import RepoTree

tree = RepoTree(self, stream=True, subrepos=True)
tree = RepoTree(self, subrepos=True)
path = PathInfo(self.root_dir) / path
try:
with self.state:
Expand Down
2 changes: 1 addition & 1 deletion dvc/repo/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def diff(self, a_rev="HEAD", b_rev=None, targets=None):

from dvc.tree.repo import RepoTree

repo_tree = RepoTree(self, stream=True)
repo_tree = RepoTree(self)

b_rev = b_rev if b_rev else "workspace"
results = {}
Expand Down
13 changes: 12 additions & 1 deletion dvc/repo/fetch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import os

from dvc.config import NoRemoteError
from dvc.exceptions import DownloadError
from dvc.exceptions import DownloadError, NoOutputOrStageError

from . import locked

Expand Down Expand Up @@ -98,6 +99,16 @@ def cb(result):
root = PathInfo(repo.root_dir)
for path in files:
path_info = root / path
try:
used = repo.used_cache(
[os.fspath(path_info)],
force=True,
jobs=jobs,
recursive=True,
)
cb(repo.cloud.pull(used, jobs))
Comment on lines +103 to +109
Copy link
Contributor Author

@efiop efiop Jan 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly speaking this is not really needed, as cache.save() will fetch everything anyway, but I've added this for now because logic around this (e.g. tests) expects specific exceptions and also because this might be faster than save(), as we didn't optimize it yet. Need to check perf first, maybe we can remove it right now.

except (NoOutputOrStageError, NoRemoteError):
pass
self.cache.local.save(
path_info,
repo.repo_tree,
Expand Down
4 changes: 1 addition & 3 deletions dvc/repo/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ def ls(
"""
from dvc.external_repo import external_repo

# use our own RepoTree instance instead of repo.repo_tree since we want to
# fetch directory listings, but don't want to fetch file contents.
with external_repo(url, rev, fetch=False, stream=True) as repo:
with external_repo(url, rev) as repo:
path_info = PathInfo(repo.root_dir)
if path:
path_info /= path
Expand Down
71 changes: 24 additions & 47 deletions dvc/tree/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dvc.utils import relpath

from ._metadata import Metadata
from .base import BaseTree, RemoteActionNotImplemented
from .base import BaseTree

if typing.TYPE_CHECKING:
from dvc.output.base import BaseOutput
Expand All @@ -21,22 +21,13 @@ class DvcTree(BaseTree): # pylint:disable=abstract-method

Args:
repo: DVC repo.
fetch: if True, uncached DVC outs will be fetched on `open()`.
stream: if True, uncached DVC outs will be streamed directly from
remote on `open()`.

`stream` takes precedence over `fetch`. If `stream` is enabled and
a remote does not support streaming, uncached DVC outs will be fetched
as a fallback.
"""

scheme = "local"
PARAM_CHECKSUM = "md5"

def __init__(self, repo, fetch=False, stream=False):
def __init__(self, repo):
super().__init__(repo, {"url": repo.root_dir})
self.fetch = fetch
self.stream = stream

def _find_outs(self, path, *args, **kwargs):
outs = self.repo.find_outs_by_path(path, *args, **kwargs)
Expand All @@ -54,9 +45,6 @@ def _get_granular_hash(
self, path_info: PathInfo, out: "BaseOutput", remote=None
):
assert isinstance(path_info, PathInfo)
if not self.fetch and not self.stream:
raise FileNotFoundError

# NOTE: use string paths here for performance reasons
key = tuple(relpath(path_info, out.path_info).split(os.sep))
out.get_dir_cache(remote=remote)
Expand All @@ -80,24 +68,20 @@ def open( # type: ignore

out = outs[0]
if out.changed_cache(filter_info=path):
if not self.fetch and not self.stream:
raise FileNotFoundError
from dvc.config import NoRemoteError

remote_obj = self.repo.cloud.get_remote(remote)
if self.stream:
if out.is_dir_checksum:
checksum = self._get_granular_hash(path, out).value
else:
checksum = out.hash_info.value
try:
remote_info = remote_obj.tree.hash_to_path_info(checksum)
return remote_obj.tree.open(
remote_info, mode=mode, encoding=encoding
)
except RemoteActionNotImplemented:
pass
cache_info = out.get_used_cache(filter_info=path, remote=remote)
self.repo.cloud.pull(cache_info, remote=remote)
try:
remote_obj = self.repo.cloud.get_remote(remote)
except NoRemoteError:
raise FileNotFoundError
if out.is_dir_checksum:
checksum = self._get_granular_hash(path, out).value
else:
checksum = out.hash_info.value
remote_info = remote_obj.tree.hash_to_path_info(checksum)
return remote_obj.tree.open(
remote_info, mode=mode, encoding=encoding
)

if out.is_dir_checksum:
checksum = self._get_granular_hash(path, out).value
Expand Down Expand Up @@ -143,24 +127,17 @@ def isfile(self, path): # pylint: disable=arguments-differ
except FileNotFoundError:
return False

def _fetch_dir(
self, out, filter_info=None, download_callback=None, **kwargs
):
def _fetch_dir(self, out, **kwargs):
# pull dir cache if needed
out.get_dir_cache(**kwargs)

# pull dir contents if needed
if self.fetch and out.changed_cache(filter_info=filter_info):
used_cache = out.get_used_cache(filter_info=filter_info)
downloaded = self.repo.cloud.pull(used_cache, **kwargs)
if download_callback:
download_callback(downloaded)

def _add_dir(self, top, trie, out, **kwargs):
if not self.fetch and not self.stream:
return
dir_cache = out.dir_cache
hash_info = out.cache.save_dir_info(dir_cache)
if hash_info != out.hash_info:
raise FileNotFoundError

self._fetch_dir(out, filter_info=top, **kwargs)
def _add_dir(self, trie, out, **kwargs):
self._fetch_dir(out, **kwargs)

base = out.path_info.parts
for key in out.dir_cache.trie.iterkeys(): # noqa: B301
Expand All @@ -172,7 +149,7 @@ def _walk(self, root, trie, topdown=True, **kwargs):

out = trie.get(root.parts)
if out and out.is_dir_checksum:
self._add_dir(root, trie, out, **kwargs)
self._add_dir(trie, out, **kwargs)

root_len = len(root.parts)
for key, out in trie.iteritems(prefix=root.parts): # noqa: B301
Expand Down Expand Up @@ -215,7 +192,7 @@ def walk(self, top, topdown=True, onerror=None, **kwargs):
trie[out.path_info.parts] = out

if out.is_dir_checksum and root.isin_or_eq(out.path_info):
self._add_dir(top, trie, out, **kwargs)
self._add_dir(trie, out, **kwargs)

yield from self._walk(root, trie, topdown=topdown, **kwargs)

Expand Down
20 changes: 3 additions & 17 deletions dvc/tree/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class RepoTree(BaseTree): # pylint:disable=abstract-method
PARAM_CHECKSUM = "md5"

def __init__(
self, repo, subrepos=False, repo_factory: RepoFactory = None, **kwargs
self, repo, subrepos=False, repo_factory: RepoFactory = None,
):
super().__init__(repo, {"url": repo.root_dir})

Expand All @@ -61,10 +61,8 @@ def __init__(
self._dvctrees = {}
"""Keep a dvctree instance of each repo."""

self._dvctree_configs = kwargs

if hasattr(repo, "dvc_dir"):
self._dvctrees[repo.root_dir] = DvcTree(repo, **kwargs)
self._dvctrees[repo.root_dir] = DvcTree(repo)

def _get_repo(self, path) -> Optional["Repo"]:
"""Returns repo that the path falls in, using prefix.
Expand Down Expand Up @@ -99,12 +97,8 @@ def _update(self, dirs, starting_repo):
scm=self.repo.scm,
rev=self.repo.get_rev(),
repo_factory=self.repo_factory,
fetch=self.fetch,
stream=self.stream,
)
self._dvctrees[repo.root_dir] = DvcTree(
repo, **self._dvctree_configs
)
self._dvctrees[repo.root_dir] = DvcTree(repo)
self._subrepos_trie[d] = repo

def _is_dvc_repo(self, dir_path):
Expand All @@ -131,14 +125,6 @@ def _get_tree_pair(self, path) -> Tuple[BaseTree, Optional[DvcTree]]:
dvc_tree = self._dvctrees.get(repo.root_dir)
return repo.tree, dvc_tree

@property
def fetch(self):
return self._dvctree_configs.get("fetch")

@property
def stream(self):
return self._dvctree_configs.get("stream")

def open(
self, path, mode="r", encoding="utf-8", **kwargs
): # pylint: disable=arguments-differ
Expand Down
20 changes: 20 additions & 0 deletions dvc/tree/webdav.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import logging
import os
import threading
Expand Down Expand Up @@ -122,6 +123,25 @@ def _client(self):

return client

def open(self, path_info, mode="r", encoding=None, **kwargs):
from webdav3.exceptions import RemoteResourceNotFound

assert mode in {"r", "rt", "rb"}

fobj = io.BytesIO()

try:
self._client.download_from(buff=fobj, remote_path=path_info.path)
except RemoteResourceNotFound as exc:
raise FileNotFoundError from exc

fobj.seek(0)

if "mode" == "rb":
return fobj

return io.TextIOWrapper(fobj, encoding=encoding)

# Checks whether file/directory exists at remote
def exists(self, path_info, use_dvcignore=True):
# Use webdav check to test for file existence
Expand Down
4 changes: 2 additions & 2 deletions tests/func/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dvc.cache import Cache
from dvc.config import NoRemoteError
from dvc.dvcfile import Dvcfile
from dvc.exceptions import CollectCacheError, DownloadError
from dvc.exceptions import DownloadError
from dvc.stage.exceptions import StagePathNotFoundError
from dvc.system import System
from dvc.utils.fs import makedirs, remove
Expand Down Expand Up @@ -289,7 +289,7 @@ def test_push_wildcard_from_bare_git_repo(
dvc_repo = make_tmp_dir("dvc-repo", scm=True, dvc=True)
with dvc_repo.chdir():
dvc_repo.dvc.imp(os.fspath(tmp_dir), "dirextra")
with pytest.raises(CollectCacheError):
with pytest.raises(FileNotFoundError):
dvc_repo.dvc.imp(os.fspath(tmp_dir), "dir123")


Expand Down
Loading