
Implement straight-to-remote functionality #5198


Merged
36 commits merged on Jan 19, 2021

Commits (36)
c57e74f
[WIP] Implement export-to-remote functionality
isidentical Dec 21, 2020
64a3f48
Redistribution of functionality from tree to remote
isidentical Jan 4, 2021
e081a45
Pass chunk size into the open_url
isidentical Jan 5, 2021
bd331c9
Move functionality away from remote to cache
isidentical Jan 5, 2021
faf4016
Basic progress bars for both transfer types
isidentical Jan 5, 2021
9cbce0c
Proper job calculation
isidentical Jan 5, 2021
c590729
Redesign .open() usage in order to work with different trees
isidentical Jan 5, 2021
69c5acc
Align the signatures of other open() functions to accept **kwargs
isidentical Jan 5, 2021
49c01f3
Implement chunked uploading to SSH
isidentical Jan 5, 2021
3af38da
Use local cache to compute directory hash (md5)
isidentical Jan 6, 2021
5d89eaf
S3: multipart upload test
isidentical Jan 6, 2021
457f1e8
Wrap local cache's hash_to_path with PathInfo
isidentical Jan 6, 2021
0cef7a9
Use .isdir() instead of a try/except statement for calculating the fr…
isidentical Jan 6, 2021
dc7179e
New CLI layout (add/import-url)
isidentical Jan 6, 2021
37fcb96
Adjust CLI tests to accept new arguments
isidentical Jan 7, 2021
372bb8f
tests: import_url/add func/unit tests
isidentical Jan 7, 2021
5d06ca4
Introduce cache.transfer()
isidentical Jan 7, 2021
811b489
Invalid CLI combination tests
isidentical Jan 7, 2021
3ba20f6
Store dir_info key in the transfer_directory
isidentical Jan 7, 2021
1ee02d9
LocalTree.upload_fobj
isidentical Jan 7, 2021
bacc198
Tests for utils
isidentical Jan 7, 2021
8a73d89
Lazy imports in some places
isidentical Jan 7, 2021
c77a8dd
--to-remote migration (and repo._transfer)
isidentical Jan 8, 2021
3e90a66
Use proper temporary location, more efficient pbar calc
isidentical Jan 8, 2021
859d41f
Directory transfer logic rewrite
isidentical Jan 8, 2021
e0902f6
Use shutil.copyfileobj()
isidentical Jan 11, 2021
c7eee36
Rebase...
isidentical Jan 11, 2021
ef6fd4b
Better tests
isidentical Jan 11, 2021
313ef1a
Inline transfer operation to .add() / .transfer()
isidentical Jan 11, 2021
41d0f1a
Open in the 'rb' mode for the tests
isidentical Jan 11, 2021
4fb69b3
More tests, better invalid option handling at repo level
isidentical Jan 11, 2021
4f05fc0
.read() a certain amount of bytes to validate the content
isidentical Jan 12, 2021
9bd778c
Better invalid argument handling
isidentical Jan 12, 2021
6bc2b2e
Better progress bars by using .getsize() API
isidentical Jan 14, 2021
d9e5385
For fallback upload, use the from_info.name instead of a temporary name
isidentical Jan 14, 2021
06e7035
Introduce --jobs for add/import-url
isidentical Jan 15, 2021
111 changes: 111 additions & 0 deletions dvc/cache/base.py
@@ -3,6 +3,7 @@
import json
import logging
import os
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from typing import Optional
@@ -247,6 +248,116 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):

self.tree.state.save(cache_info, hash_info)

def _transfer_file_as_whole(self, from_tree, from_info):
from dvc.utils import tmp_fname

# When we can't use the chunked upload, we have to first download
# and then calculate the hash as if it were a local file and then
# upload it.
local_tree = self.repo.cache.local.tree
local_info = local_tree.path_info / tmp_fname()

from_tree.download(from_info, local_info)
Contributor: This might not fit on the local filesystem 🙁 We could download a chunk and upload a chunk though and then assemble them. Or we could just forbid transferring files as a whole for now.

hash_info = local_tree.get_file_hash(local_info)

self.tree.upload(
local_info,
self.tree.hash_to_path_info(hash_info.value),
name=from_info.name,
)
return hash_info

def _transfer_file_as_chunked(self, from_tree, from_info):
from dvc.utils import tmp_fname
from dvc.utils.stream import HashedStreamReader

tmp_info = self.tree.path_info / tmp_fname()
with from_tree.open(
from_info, mode="rb", chunk_size=from_tree.CHUNK_SIZE
) as stream:
stream_reader = HashedStreamReader(stream)
# Since we don't know the hash beforehand, we'll
# upload it to a temporary location and then move
# it.
self.tree.upload_fobj(
stream_reader,
tmp_info,
total=from_tree.getsize(from_info),
desc=from_info.name,
)

hash_info = stream_reader.hash_info
self.tree.move(tmp_info, self.tree.hash_to_path_info(hash_info.value))
return hash_info

def _transfer_file(self, from_tree, from_info):
try:
hash_info = self._transfer_file_as_chunked(from_tree, from_info)
except RemoteActionNotImplemented:
hash_info = self._transfer_file_as_whole(from_tree, from_info)

return hash_info

def _transfer_directory_contents(self, from_tree, from_info, jobs, pbar):
rel_path_infos = {}
from_infos = from_tree.walk_files(from_info)

def create_tasks(executor, amount):
for entry_info in itertools.islice(from_infos, amount):
pbar.total += 1
task = executor.submit(
pbar.wrap_fn(self._transfer_file), from_tree, entry_info
)
rel_path_infos[task] = entry_info.relative_to(from_info)
yield task

pbar.total = 0
with ThreadPoolExecutor(max_workers=jobs) as executor:
tasks = set(create_tasks(executor, jobs * 5))

while tasks:
done, tasks = futures.wait(
tasks, return_when=futures.FIRST_COMPLETED
)
tasks.update(create_tasks(executor, len(done)))
for task in done:
yield rel_path_infos.pop(task), task.result()

def _transfer_directory(
self, from_tree, from_info, jobs, no_progress_bar=False
):
dir_info = DirInfo()

with Tqdm(total=1, unit="Files", disable=no_progress_bar) as pbar:
for entry_info, entry_hash in self._transfer_directory_contents(
from_tree, from_info, jobs, pbar
):
dir_info.trie[entry_info.parts] = entry_hash

local_cache = self.repo.cache.local
(
hash_info,
to_info,
) = local_cache._get_dir_info_hash( # pylint: disable=protected-access
dir_info
)

self.tree.upload(to_info, self.tree.hash_to_path_info(hash_info.value))
return hash_info

def transfer(self, from_tree, from_info, jobs=None, no_progress_bar=False):
jobs = jobs or min((from_tree.jobs, self.tree.jobs))

if from_tree.isdir(from_info):
return self._transfer_directory(
from_tree,
from_info,
jobs=jobs,
no_progress_bar=no_progress_bar,
)
else:
return self._transfer_file(from_tree, from_info)

def _cache_is_copy(self, path_info):
"""Checks whether cache uses copies."""
if self.cache_type_confirmed:
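The directory transfer above keeps a bounded window of in-flight futures instead of submitting one task per file up front. Below is a self-contained sketch of that windowing pattern with a stand-in worker; the jobs * 5 window size mirrors _transfer_directory_contents(), while the names and inputs are illustrative.

import itertools
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor


def transfer_one(name):
    # Stand-in for self._transfer_file(from_tree, entry_info).
    return name.upper()


def windowed_map(items, jobs=4):
    items = iter(items)

    def create_tasks(executor, amount):
        # Submit at most `amount` new tasks, drawing lazily from the iterator.
        for item in itertools.islice(items, amount):
            yield executor.submit(transfer_one, item)

    with ThreadPoolExecutor(max_workers=jobs) as executor:
        # Keep roughly jobs * 5 tasks in flight, topping the set up as tasks
        # complete, so a huge directory never floods the executor queue.
        tasks = set(create_tasks(executor, jobs * 5))
        while tasks:
            done, tasks = futures.wait(
                tasks, return_when=futures.FIRST_COMPLETED
            )
            tasks.update(create_tasks(executor, len(done)))
            for task in done:
                yield task.result()


print(sorted(windowed_map(["a", "b", "c", "d"])))  # ['A', 'B', 'C', 'D']

Keeping the window small also lets the progress bar's total grow incrementally, which is why pbar.total is incremented inside create_tasks() in the real code.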
33 changes: 33 additions & 0 deletions dvc/command/add.py
@@ -26,6 +26,10 @@ def run(self):
external=self.args.external,
glob=self.args.glob,
desc=self.args.desc,
out=self.args.out,
remote=self.args.remote,
to_remote=self.args.to_remote,
jobs=self.args.jobs,
)

except DvcException:
@@ -74,6 +78,35 @@ def add_parser(subparsers, parent_parser):
help="Specify name of the DVC-file this command will generate.",
metavar="<filename>",
)
parser.add_argument(
"-o",
"--out",
help="Destination path to put files to.",
metavar="<path>",
)
parser.add_argument(
"--to-remote",
action="store_true",
default=False,
help="Download it directly to the remote",
jorgeorpinel (Contributor), Jan 13, 2021: How about something like

Suggested change:
-        help="Download it directly to the remote",
+        help="Track an external target, but don't move it into the "
+        "workspace, nor cache it. Push it to remote storage "
+        "instead.",

May need formatting.

)
parser.add_argument(
"-r",
"--remote",
help="Remote storage to download to",
Comment on lines +93 to +96
Contributor: I think this should definitely be included in --to-remote (let it accept a remote name as an argument) to simplify the UI. add --to-remote --remote looks sort of repetitive anyway.

isidentical (Contributor, Author), Jan 13, 2021: We actually thought about it and decided that, for consistency with other commands that work on the remote, it would be better to just split it out.

Member: Also, in most cases it'll be just --to-remote, right?

Also, can an option in argparse be made to optionally accept a value? In that case we would have --to-remote [remote-name], where remote-name is optional (the default remote if not specified).

isidentical (Contributor, Author):

> also, in most cases it'll be just --to-remote, right?

Yes.

> also, can an option in argparse be made to optionally accept a value

Yes, the implementation is possible.

Member:

> yes, the implementation is possible.

Then I guess I don't have a strong preference. --remote is good for consistency, but I would not say it is critical; --to-remote already suggests it anyway.

jorgeorpinel (Contributor), Jan 13, 2021:

> also, in most cases it'll be just --to-remote, right?

I'm not sure that most users employ default remotes. Do we know this?

I see the consistency argument, but it's not that big a deal, and forcing the exact flag name causes other issues: e.g. a bare add --remote is meaningless (and forbidden, I think, or is it silently ignored?), plus it's an extra flag to document, explain, and maintain 😋

Member:

> I'm not sure that most users employ default remotes. Do we know this?

That's why we have default remotes at all. If we are not sure about this, we should reconsider the whole logic of how we work with remotes.

> add --remote is meaningless

Agreed, I would not do this. Maybe keep only --to-remote <optional-name>.

metavar="<name>",
)
parser.add_argument(
"-j",
"--jobs",
type=int,
help=(
"Number of jobs to run simultaneously. "
"The default value is 4 * cpu_count(). "
"For SSH remotes, the default is 4. "
),
metavar="<number>",
)
parser.add_argument(
"--desc",
type=str,
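The review thread above asks whether argparse can make --to-remote optionally accept a remote name. A minimal sketch of that alternative is below, using nargs="?" with const; it is not what this PR implements (the merged code keeps --to-remote as a plain flag plus a separate -r/--remote option), and the "upstream" remote name is made up.

import argparse

parser = argparse.ArgumentParser(prog="dvc add")
parser.add_argument(
    "--to-remote",
    nargs="?",       # the value is optional
    const=True,      # bare `--to-remote`: fall back to the default remote
    default=False,   # flag not given at all
    metavar="<name>",
    help="Transfer the target straight to remote storage.",
)

print(parser.parse_args([]).to_remote)                           # False
print(parser.parse_args(["--to-remote"]).to_remote)              # True
print(parser.parse_args(["--to-remote", "upstream"]).to_remote)  # upstream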
26 changes: 26 additions & 0 deletions dvc/command/imp_url.py
@@ -16,7 +16,10 @@ def run(self):
out=self.args.out,
fname=self.args.file,
no_exec=self.args.no_exec,
remote=self.args.remote,
to_remote=self.args.to_remote,
desc=self.args.desc,
jobs=self.args.jobs,
)
except DvcException:
logger.exception(
@@ -68,6 +71,29 @@ def add_parser(subparsers, parent_parser):
default=False,
help="Only create DVC-file without actually downloading it.",
)
import_parser.add_argument(
"--to-remote",
action="store_true",
default=False,
help="Download it directly to the remote",
)
import_parser.add_argument(
"-r",
"--remote",
help="Remote storage to download to",
metavar="<name>",
)
import_parser.add_argument(
"-j",
"--jobs",
type=int,
help=(
"Number of jobs to run simultaneously. "
"The default value is 4 * cpu_count(). "
"For SSH remotes, the default is 4. "
),
metavar="<number>",
)
import_parser.add_argument(
"--desc",
type=str,
18 changes: 18 additions & 0 deletions dvc/data_cloud.py
@@ -92,6 +92,24 @@ def pull(
show_checksums=show_checksums,
)

def transfer(self, source, jobs=None, remote=None, command=None):
"""Transfer data items in a cloud-agnostic way.

Args:
source (str): url for the source location.
jobs (int): number of jobs that can be running simultaneously.
remote (dvc.remote.base.BaseRemote): optional remote to compare
cache to. By default remote from core.remote config option
is used.
command (str): the command which is benefiting from this function
(to be used for reporting better error messages).
"""
from dvc.tree import get_cloud_tree

from_tree = get_cloud_tree(self.repo, url=source)
remote = self.get_remote(remote, command)
return remote.transfer(from_tree, from_tree.path_info, jobs=jobs)

def status(
self,
cache,
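For context, a rough sketch of how the new DataCloud.transfer() might be driven directly from Python. Assumptions: a DVC repo in the current directory, a configured remote named "myremote", and a placeholder source URL; normally this path is exercised through dvc add --to-remote or dvc import-url --to-remote rather than by calling the API yourself.

from dvc.repo import Repo

repo = Repo(".")  # assumes the working directory is a DVC repo
hash_info = repo.cloud.transfer(
    "s3://bucket/data.csv",   # placeholder source URL
    jobs=4,
    remote="myremote",        # placeholder remote name
    command="import-url",     # used only for nicer error messages
)
print(hash_info.value)  # hash under which the data now lives in the remote cache

Through the CLI the equivalent would look something like dvc import-url s3://bucket/data.csv --to-remote -r myremote -j 4 (again with a made-up URL and remote name).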
29 changes: 16 additions & 13 deletions dvc/istextfile.py
@@ -7,19 +7,7 @@
TEXT_CHARS = bytes(range(32, 127)) + b"\n\r\t\f\b"


def istextfile(fname, blocksize=512, tree=None):
""" Uses heuristics to guess whether the given file is text or binary,
by reading a single block of bytes from the file.
If more than 30% of the chars in the block are non-text, or there
are NUL ('\x00') bytes in the block, assume this is a binary file.
"""
if tree:
open_func = tree.open
else:
open_func = open
with open_func(fname, "rb") as fobj:
block = fobj.read(blocksize)

def istextblock(block):
if not block:
# An empty file is considered a valid text file
return True
@@ -32,3 +20,18 @@ def istextfile(fname, blocksize=512, tree=None):
# occurrences of TEXT_CHARS from the block
nontext = block.translate(None, TEXT_CHARS)
return float(len(nontext)) / len(block) <= 0.30


def istextfile(fname, blocksize=512, tree=None):
""" Uses heuristics to guess whether the given file is text or binary,
by reading a single block of bytes from the file.
If more than 30% of the chars in the block are non-text, or there
are NUL ('\x00') bytes in the block, assume this is a binary file.
"""
if tree:
open_func = tree.open
else:
open_func = open
with open_func(fname, "rb") as fobj:
block = fobj.read(blocksize)
return istextblock(block)
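Splitting istextblock() out of istextfile() lets callers that already hold a block of bytes (for example, the first chunk read off a stream during a straight-to-remote transfer) classify it without reopening the file. A quick illustration with made-up byte strings:

from dvc.istextfile import istextblock

print(istextblock(b"hello, world\n"))         # True: printable text
print(istextblock(b"\x00\x01\x02\xff" * 64))  # False: mostly non-text bytes
print(istextblock(b""))                       # True: an empty block counts as text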
5 changes: 5 additions & 0 deletions dvc/remote/base.py
@@ -472,6 +472,11 @@ def pull(self, cache, named_cache, jobs=None, show_checksums=False):

return ret

def transfer(self, from_tree, from_info, jobs=None, no_progress_bar=False):
return self.cache.transfer(
from_tree, from_info, jobs=jobs, no_progress_bar=no_progress_bar
)

@staticmethod
def _log_missing_caches(hash_info_dict):
missing_caches = [