Skip to content

Commit 51ada6d

Browse files
authored
Implement straight-to-remote functionality (#5198)
* Implement export-to-remote functionality * Redistribution of functionality from tree to remote * Pass chunk size into the open_url * Move functionality away from remote to cache * Basic progress bars for both transfer types * Proper job calculation * Redesign .open() usage in order to work with different trees * Align the signatures of other open() functions to accept **kwargs * Implement chunked uploading to the ssh * Use local cache to compute directory hash (md5) * S3: multipart upload test * Wrap local cache's hash_to_path with PathInfo * Use .isdir() instead of a try/except statement for calculating the from_infos * New CLI layout (add/import-url) * Adjust CLI tests to accept new arguments * tests: import_url/add func/unit tests * Introduce cache.transfer() * Invalid CLI combination tests * Store dir_info key in the transfer_directory * LocalTree.upload_fobj * Tests for utils * Lazy imports on some places * --to-remote migration (and repo._transfer) * Use proper temporary location, more efficient pbar calc * Directory transfer logic rewrite * Use shutil.copyfileobj() * Rebase... * Better tests * Inline transfer operation to .add() / .transfer() * Open in the 'rb' mode for the tests * More tests, better invalid option handling at repo level * .read() certain amount of bytes to validate the content * Better invalid argument handling * Better progress bars by using .getsize() API * For fallback upload, use the from_info.name instead of temporary name * introduce --jobs for add/import-url
1 parent 55df6b8 commit 51ada6d

27 files changed

+719
-41
lines changed

dvc/cache/base.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import os
6+
from concurrent import futures
67
from concurrent.futures import ThreadPoolExecutor
78
from copy import copy
89
from typing import Optional
@@ -247,6 +248,116 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
247248

248249
self.tree.state.save(cache_info, hash_info)
249250

251+
def _transfer_file_as_whole(self, from_tree, from_info):
    """Transfer a single file by fully staging it locally first.

    Fallback for source trees that do not support chunked/streamed
    uploads: download the file into the local cache's tree under a
    temporary name, hash it there, then upload it to its final
    content-addressed location in this cache.

    Returns the HashInfo computed for the staged file.
    """
    from dvc.utils import tmp_fname

    # When we can't use the chunked upload, we have to first download
    # and then calculate the hash as if it were a local file and then
    # upload it.
    local_tree = self.repo.cache.local.tree
    local_info = local_tree.path_info / tmp_fname()

    try:
        from_tree.download(from_info, local_info)
        hash_info = local_tree.get_file_hash(local_info)

        self.tree.upload(
            local_info,
            self.tree.hash_to_path_info(hash_info.value),
            name=from_info.name,
        )
    finally:
        # Don't leave the staged temporary copy behind in the local
        # cache directory (it was previously leaked on every call).
        if local_tree.exists(local_info):
            local_tree.remove(local_info)
    return hash_info
269+
270+
def _transfer_file_as_chunked(self, from_tree, from_info):
    """Stream a single file straight into this cache.

    The source is read in chunks and hashed on the fly while being
    uploaded, so no full local copy is made.  Because the hash is only
    known once the stream has been consumed, the upload goes to a
    temporary destination first and is then moved into place.

    Returns the HashInfo computed while streaming.
    """
    from dvc.utils import tmp_fname
    from dvc.utils.stream import HashedStreamReader

    staging_info = self.tree.path_info / tmp_fname()
    with from_tree.open(
        from_info, mode="rb", chunk_size=from_tree.CHUNK_SIZE
    ) as stream:
        reader = HashedStreamReader(stream)
        # The final (content-addressed) path depends on the hash,
        # which we only learn after streaming — hence the temporary
        # upload target.
        self.tree.upload_fobj(
            reader,
            staging_info,
            total=from_tree.getsize(from_info),
            desc=from_info.name,
        )

    hash_info = reader.hash_info
    self.tree.move(staging_info, self.tree.hash_to_path_info(hash_info.value))
    return hash_info
292+
293+
def _transfer_file(self, from_tree, from_info):
    """Transfer one file into the cache, preferring a streamed upload.

    Falls back to a full download-then-upload round trip when chunked
    uploading is not implemented for the destination tree.
    """
    try:
        return self._transfer_file_as_chunked(from_tree, from_info)
    except RemoteActionNotImplemented:
        return self._transfer_file_as_whole(from_tree, from_info)
300+
301+
def _transfer_directory_contents(self, from_tree, from_info, jobs, pbar):
    """Transfer every file under *from_info* concurrently.

    Generator yielding ``(relative_path_info, hash_info)`` pairs, one
    per transferred file.  *pbar* is a progress bar whose ``total`` is
    grown incrementally as new files are discovered by the walk.
    """
    # Maps each in-flight future to its source path relative to the
    # directory root, so results can be yielded by relative path.
    rel_path_infos = {}
    from_infos = from_tree.walk_files(from_info)

    def create_tasks(executor, amount):
        # Pull at most *amount* more entries off the (lazy, shared)
        # walk iterator and schedule their transfer, bumping the
        # progress bar's total as we go.
        for entry_info in itertools.islice(from_infos, amount):
            pbar.total += 1
            task = executor.submit(
                pbar.wrap_fn(self._transfer_file), from_tree, entry_info
            )
            rel_path_infos[task] = entry_info.relative_to(from_info)
            yield task

    pbar.total = 0
    with ThreadPoolExecutor(max_workers=jobs) as executor:
        # Keep a bounded window of jobs * 5 tasks in flight so the
        # directory walk is consumed lazily rather than all at once.
        tasks = set(create_tasks(executor, jobs * 5))

        while tasks:
            done, tasks = futures.wait(
                tasks, return_when=futures.FIRST_COMPLETED
            )
            # Refill the window with as many tasks as just finished.
            tasks.update(create_tasks(executor, len(done)))
            for task in done:
                yield rel_path_infos.pop(task), task.result()
325+
326+
def _transfer_directory(
    self, from_tree, from_info, jobs, no_progress_bar=False
):
    """Transfer a whole directory and register it as one dir entry.

    Transfers each file under *from_info* concurrently, collects the
    per-file hashes into a DirInfo trie, then uploads the resulting
    directory manifest to this cache.  Returns the directory's
    HashInfo.
    """
    dir_info = DirInfo()

    with Tqdm(total=1, unit="Files", disable=no_progress_bar) as pbar:
        for entry_info, entry_hash in self._transfer_directory_contents(
            from_tree, from_info, jobs, pbar
        ):
            dir_info.trie[entry_info.parts] = entry_hash

    # The manifest file is produced via the local cache, which knows
    # how to serialize and hash a DirInfo (hence the protected call).
    local_cache = self.repo.cache.local
    (
        hash_info,
        to_info,
    ) = local_cache._get_dir_info_hash(  # pylint: disable=protected-access
        dir_info
    )

    self.tree.upload(to_info, self.tree.hash_to_path_info(hash_info.value))
    return hash_info
347+
348+
def transfer(self, from_tree, from_info, jobs=None, no_progress_bar=False):
    """Transfer *from_info* (file or directory) into this cache.

    When *jobs* is falsy, the smaller of the two trees' default job
    counts is used.  Returns the HashInfo of the transferred entry.
    """
    effective_jobs = jobs or min((from_tree.jobs, self.tree.jobs))

    if not from_tree.isdir(from_info):
        return self._transfer_file(from_tree, from_info)

    return self._transfer_directory(
        from_tree,
        from_info,
        jobs=effective_jobs,
        no_progress_bar=no_progress_bar,
    )
360+
250361
def _cache_is_copy(self, path_info):
251362
"""Checks whether cache uses copies."""
252363
if self.cache_type_confirmed:

dvc/command/add.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ def run(self):
2626
external=self.args.external,
2727
glob=self.args.glob,
2828
desc=self.args.desc,
29+
out=self.args.out,
30+
remote=self.args.remote,
31+
to_remote=self.args.to_remote,
32+
jobs=self.args.jobs,
2933
)
3034

3135
except DvcException:
@@ -74,6 +78,35 @@ def add_parser(subparsers, parent_parser):
7478
help="Specify name of the DVC-file this command will generate.",
7579
metavar="<filename>",
7680
)
81+
parser.add_argument(
82+
"-o",
83+
"--out",
84+
help="Destination path to put files to.",
85+
metavar="<path>",
86+
)
87+
parser.add_argument(
88+
"--to-remote",
89+
action="store_true",
90+
default=False,
91+
help="Download it directly to the remote",
92+
)
93+
parser.add_argument(
94+
"-r",
95+
"--remote",
96+
help="Remote storage to download to",
97+
metavar="<name>",
98+
)
99+
parser.add_argument(
100+
"-j",
101+
"--jobs",
102+
type=int,
103+
help=(
104+
"Number of jobs to run simultaneously. "
105+
"The default value is 4 * cpu_count(). "
106+
"For SSH remotes, the default is 4. "
107+
),
108+
metavar="<number>",
109+
)
77110
parser.add_argument(
78111
"--desc",
79112
type=str,

dvc/command/imp_url.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ def run(self):
1616
out=self.args.out,
1717
fname=self.args.file,
1818
no_exec=self.args.no_exec,
19+
remote=self.args.remote,
20+
to_remote=self.args.to_remote,
1921
desc=self.args.desc,
22+
jobs=self.args.jobs,
2023
)
2124
except DvcException:
2225
logger.exception(
@@ -68,6 +71,29 @@ def add_parser(subparsers, parent_parser):
6871
default=False,
6972
help="Only create DVC-file without actually downloading it.",
7073
)
74+
import_parser.add_argument(
75+
"--to-remote",
76+
action="store_true",
77+
default=False,
78+
help="Download it directly to the remote",
79+
)
80+
import_parser.add_argument(
81+
"-r",
82+
"--remote",
83+
help="Remote storage to download to",
84+
metavar="<name>",
85+
)
86+
import_parser.add_argument(
87+
"-j",
88+
"--jobs",
89+
type=int,
90+
help=(
91+
"Number of jobs to run simultaneously. "
92+
"The default value is 4 * cpu_count(). "
93+
"For SSH remotes, the default is 4. "
94+
),
95+
metavar="<number>",
96+
)
7197
import_parser.add_argument(
7298
"--desc",
7399
type=str,

dvc/data_cloud.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,24 @@ def pull(
9292
show_checksums=show_checksums,
9393
)
9494

95+
def transfer(self, source, jobs=None, remote=None, command=None):
    """Transfer data items in a cloud-agnostic way.

    Args:
        source (str): url of the source location.
        jobs (int): number of jobs that can run simultaneously.
        remote (dvc.remote.base.BaseRemote): optional remote to
            transfer to.  By default the remote from the core.remote
            config option is used.
        command (bool): the command which is benefitting from this
            function (used for better error messages).
    """
    from dvc.tree import get_cloud_tree

    source_tree = get_cloud_tree(self.repo, url=source)
    target_remote = self.get_remote(remote, command)
    return target_remote.transfer(
        source_tree, source_tree.path_info, jobs=jobs
    )
112+
95113
def status(
96114
self,
97115
cache,

dvc/istextfile.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,7 @@
77
TEXT_CHARS = bytes(range(32, 127)) + b"\n\r\t\f\b"
88

99

10-
def istextfile(fname, blocksize=512, tree=None):
11-
""" Uses heuristics to guess whether the given file is text or binary,
12-
by reading a single block of bytes from the file.
13-
If more than 30% of the chars in the block are non-text, or there
14-
are NUL ('\x00') bytes in the block, assume this is a binary file.
15-
"""
16-
if tree:
17-
open_func = tree.open
18-
else:
19-
open_func = open
20-
with open_func(fname, "rb") as fobj:
21-
block = fobj.read(blocksize)
22-
10+
def istextblock(block):
2311
if not block:
2412
# An empty file is considered a valid text file
2513
return True
@@ -32,3 +20,18 @@ def istextfile(fname, blocksize=512, tree=None):
3220
# occurrences of TEXT_CHARS from the block
3321
nontext = block.translate(None, TEXT_CHARS)
3422
return float(len(nontext)) / len(block) <= 0.30
23+
24+
25+
def istextfile(fname, blocksize=512, tree=None):
    """Heuristically guess whether the given file is text or binary.

    Reads a single block of up to *blocksize* bytes — via *tree* when
    given, otherwise the builtin ``open`` — and delegates the
    text-vs-binary decision to ``istextblock``.
    """
    open_func = tree.open if tree else open
    with open_func(fname, "rb") as fobj:
        return istextblock(fobj.read(blocksize))

dvc/remote/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,11 @@ def pull(self, cache, named_cache, jobs=None, show_checksums=False):
472472

473473
return ret
474474

475+
def transfer(self, from_tree, from_info, jobs=None, no_progress_bar=False):
    """Transfer *from_info* from *from_tree* straight into this
    remote's cache, delegating to the cache's transfer logic and
    returning its resulting HashInfo."""
    return self.cache.transfer(
        from_tree, from_info, jobs=jobs, no_progress_bar=no_progress_bar
    )
479+
475480
@staticmethod
476481
def _log_missing_caches(hash_info_dict):
477482
missing_caches = [

0 commit comments

Comments
 (0)