Skip to content

repo: Support streaming and pulling files on RepoTree/DvcTree.open() #3810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def unprotect(self):
if self.exists:
self.remote.unprotect(self.path_info)

def _collect_used_dir_cache(
def collect_used_dir_cache(
self, remote=None, force=False, jobs=None, filter_info=None
):
"""Get a list of `info`s related to the given directory.
Expand Down Expand Up @@ -449,7 +449,7 @@ def get_used_cache(self, **kwargs):
return ret

ret.add_child_cache(
self.checksum, self._collect_used_dir_cache(**kwargs),
self.checksum, self.collect_used_dir_cache(**kwargs),
)

return ret
Expand Down
64 changes: 17 additions & 47 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
from funcy import cached_property, cat, first

from dvc.config import Config
from dvc.exceptions import (
FileMissingError,
IsADirectoryError,
NotDvcRepoError,
OutputNotFoundError,
)
from dvc.exceptions import FileMissingError
from dvc.exceptions import IsADirectoryError as DvcIsADirectoryError
from dvc.exceptions import NotDvcRepoError, OutputNotFoundError
from dvc.ignore import CleanTree
from dvc.path_info import PathInfo
from dvc.remote.base import RemoteActionNotImplemented
from dvc.repo.tree import RepoTree
from dvc.utils.fs import path_isin

from ..utils import parse_target
Expand Down Expand Up @@ -486,48 +483,21 @@ def is_dvc_internal(self, path):
@contextmanager
def open_by_relpath(self, path, remote=None, mode="r", encoding=None):
"""Opens a specified resource as a file descriptor"""
cause = None
try:
out = self.find_out_by_relpath(path)
except OutputNotFoundError as exc:
out = None
cause = exc

if out and out.use_cache:
try:
with self._open_cached(out, remote, mode, encoding) as fd:
yield fd
return
except FileNotFoundError as exc:
raise FileMissingError(path) from exc

abs_path = os.path.join(self.root_dir, path)
if os.path.exists(abs_path):
with open(abs_path, mode=mode, encoding=encoding) as fd:
yield fd
return

raise FileMissingError(path) from cause

def _open_cached(self, out, remote=None, mode="r", encoding=None):
if out.isdir():
raise IsADirectoryError("Can't open a dir")

cache_file = self.cache.local.checksum_to_path_info(out.checksum)

if os.path.exists(cache_file):
return open(cache_file, mode=mode, encoding=encoding)

tree = RepoTree(self, stream=True)
path = os.path.join(self.root_dir, path)
try:
remote_obj = self.cloud.get_remote(remote)
remote_info = remote_obj.checksum_to_path_info(out.checksum)
return remote_obj.open(remote_info, mode=mode, encoding=encoding)
except RemoteActionNotImplemented:
with self.state:
cache_info = out.get_used_cache(remote=remote)
self.cloud.pull(cache_info, remote=remote)

return open(cache_file, mode=mode, encoding=encoding)
with tree.open(
os.path.join(self.root_dir, path),
mode=mode,
encoding=encoding,
remote=remote,
) as fobj:
yield fobj
except FileNotFoundError as exc:
raise FileMissingError(path) from exc
except IsADirectoryError as exc:
raise DvcIsADirectoryError from exc

def close(self):
self.scm.close()
Expand Down
180 changes: 164 additions & 16 deletions dvc/repo/tree.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
import errno
import logging
import os

from dvc.exceptions import OutputNotFoundError
from dvc.path_info import PathInfo
from dvc.remote.base import RemoteActionNotImplemented
from dvc.scm.tree import BaseTree, WorkingTree

logger = logging.getLogger(__name__)


class DvcTree(BaseTree):
def __init__(self, repo):
"""DVC repo tree.

Args:
repo: DVC repo.
fetch: if True, uncached DVC outs will be fetched on `open()`.
stream: if True, uncached DVC outs will be streamed directly from
remote on `open()`.

`stream` takes precedence over `fetch`. If `stream` is enabled and
a remote does not support streaming, uncached DVC outs will be fetched
as a fallback.
"""

def __init__(self, repo, fetch=False, stream=False):
self.repo = repo
self.fetch = fetch
self.stream = stream

def _find_outs(self, path, *args, **kwargs):
outs = self.repo.find_outs_by_path(path, *args, **kwargs)
Expand All @@ -22,14 +40,14 @@ def _is_cached(out):

return outs

def open(self, path, mode="r", encoding="utf-8"):
def open(self, path, mode="r", encoding="utf-8", remote=None):
try:
outs = self._find_outs(path, strict=False)
except OutputNotFoundError as exc:
raise FileNotFoundError from exc

if len(outs) != 1 or outs[0].is_dir_checksum:
raise OSError(errno.EISDIR)
raise IsADirectoryError

out = outs[0]
# temporary hack to make cache use WorkingTree and not GitTree, because
Expand All @@ -38,7 +56,23 @@ def open(self, path, mode="r", encoding="utf-8"):
self.repo.tree = WorkingTree(self.repo.root_dir)
try:
if out.changed_cache():
raise FileNotFoundError
if not self.fetch and not self.stream:
raise FileNotFoundError

remote_obj = self.repo.cloud.get_remote(remote)
if self.stream:
try:
remote_info = remote_obj.checksum_to_path_info(
out.checksum
)
return remote_obj.open(
remote_info, mode=mode, encoding=encoding
)
except RemoteActionNotImplemented:
pass
with self.repo.state:
cache_info = out.get_used_cache(remote=remote)
self.repo.cloud.pull(cache_info, remote=remote)
finally:
self.repo.tree = saved_tree

Expand Down Expand Up @@ -78,14 +112,15 @@ def _walk(self, root, trie, topdown=True):
continue

name = key[root_len]
if len(key) > root_len + 1 or out.is_dir_checksum:
if len(key) > root_len + 1 or (out and out.is_dir_checksum):
dirs.add(name)
continue

files.append(name)

if topdown:
yield root.fspath, list(dirs), files
dirs = list(dirs)
yield root.fspath, dirs, files

for dname in dirs:
yield from self._walk(root / dname, trie)
Expand All @@ -111,6 +146,15 @@ def walk(self, top, topdown=True):
for out in outs:
trie[out.path_info.parts] = out

if out.is_dir_checksum and (self.fetch or self.stream):
# will pull dir cache if needed
with self.repo.state:
cache = out.collect_used_dir_cache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be slow to collect. You only need out.dir_cache, so maybe it is worth creating extracting some method to fetch .dir for it into an out method. But I'm ok with keeping it as is for now.

for _, names in cache.scheme_names(out.scheme):
for name in names:
path_info = out.path_info.parent / name
trie[path_info.parts] = None

yield from self._walk(root, trie, topdown=topdown)

def isdvc(self, path):
Expand All @@ -125,17 +169,121 @@ def isexec(self, path):


class RepoTree(BaseTree):
def __init__(self, repo):
"""DVC + git-tracked files tree.

Args:
repo: DVC or git repo.

Any kwargs will be passed to `DvcTree()`.
"""

def __init__(self, repo, **kwargs):
self.repo = repo
self.dvctree = DvcTree(repo)
if hasattr(repo, "dvc_dir"):
self.dvctree = DvcTree(repo, **kwargs)
else:
# git-only erepo's do not need dvctree
self.dvctree = None

def open(self, path, mode="r", encoding="utf-8", **kwargs):
if self.dvctree and self.dvctree.exists(path):
try:
return self.dvctree.open(
path, mode=mode, encoding=encoding, **kwargs
)
except FileNotFoundError:
if self.isdvc(path):
raise
return self.repo.tree.open(path, mode=mode, encoding=encoding)

def exists(self, path):
return self.repo.tree.exists(path) or (
self.dvctree and self.dvctree.exists(path)
)

def isdir(self, path):
return self.repo.tree.isdir(path) or (
self.dvctree and self.dvctree.isdir(path)
)

def isdvc(self, path):
return self.dvctree is not None and self.dvctree.isdvc(path)

def isfile(self, path):
return self.repo.tree.isfile(path) or (
self.dvctree and self.dvctree.isfile(path)
)

def isexec(self, path):
if self.dvctree and self.dvctree.exists(path):
return self.dvctree.isexec(path)
return self.repo.tree.isexec(path)

def open(self, *args, **kwargs):
def _walk_one(self, walk):
try:
return self.dvctree.open(*args, **kwargs)
except FileNotFoundError:
pass
root, dirs, files = next(walk)
except StopIteration:
return
yield root, dirs, files
for _ in dirs:
yield from self._walk_one(walk)

def _walk(self, dvc_walk, repo_walk):
try:
_, dvc_dirs, dvc_fnames = next(dvc_walk)
repo_root, repo_dirs, repo_fnames = next(repo_walk)
except StopIteration:
return

# separate subdirs into shared dirs, dvc-only dirs, repo-only dirs
dvc_set = set(dvc_dirs)
repo_set = set(repo_dirs)
dvc_only = list(dvc_set - repo_set)
repo_only = list(repo_set - dvc_set)
shared = list(dvc_set & repo_set)
dirs = shared + dvc_only + repo_only

# merge file lists
files = set(dvc_fnames)
for filename in repo_fnames:
files.add(filename)

yield repo_root, dirs, list(files)

# set dir order for next recursion level - shared dirs first so that
# next() for both generators recurses into the same shared directory
dvc_dirs[:] = [dirname for dirname in dirs if dirname in dvc_set]
repo_dirs[:] = [dirname for dirname in dirs if dirname in repo_set]

for dirname in dirs:
if dirname in shared:
yield from self._walk(dvc_walk, repo_walk)
elif dirname in dvc_set:
yield from self._walk_one(dvc_walk)
elif dirname in repo_set:
yield from self._walk_one(repo_walk)

return self.repo.tree.open(*args, **kwargs)
def walk(self, top, topdown=True):
"""Walk and merge both DVC and repo trees."""
assert topdown

def exists(self, path):
return self.repo.tree.exists(path) or self.dvctree.exists(path)
if not self.exists(top):
raise FileNotFoundError

if not self.isdir(top):
raise NotADirectoryError

dvc_exists = self.dvctree and self.dvctree.exists(top)
repo_exists = self.repo.tree.exists(top)
if dvc_exists and not repo_exists:
yield from self.dvctree.walk(top, topdown=topdown)
return
if repo_exists and not dvc_exists:
yield from self.repo.tree.walk(top, topdown=topdown)
return
if not dvc_exists and not repo_exists:
raise FileNotFoundError

dvc_walk = self.dvctree.walk(top, topdown=topdown)
repo_walk = self.repo.tree.walk(top, topdown=topdown)
yield from self._walk(dvc_walk, repo_walk)
Loading