Skip to content

Commit 4bd6d9f

Browse files
authored
fs: merge upload_fobj and upload (#6570)
* fs: merge upload_fobj and upload
* Update dvc/fs/base.py
1 parent ef017be commit 4bd6d9f

File tree

12 files changed, with 56 additions (+56) and 51 deletions (-51).

12 files changed

+56
-51
lines changed

dvc/fs/base.py

Lines changed: 32 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -217,44 +217,50 @@ def is_dir_hash(cls, hash_):
217217
return False
218218
return hash_.endswith(cls.CHECKSUM_DIR_SUFFIX)
219219

220-
def upload(self, from_info, to_info, name=None, no_progress_bar=False):
221-
if not hasattr(self, "_upload"):
222-
raise RemoteActionNotImplemented("upload", self.scheme)
220+
def upload(
221+
self,
222+
from_info,
223+
to_info,
224+
total=None,
225+
desc=None,
226+
no_progress_bar=False,
227+
**pbar_args,
228+
):
229+
is_file_obj = hasattr(from_info, "read")
230+
method = "upload_fobj" if is_file_obj else "_upload"
231+
if not hasattr(self, method):
232+
raise RemoteActionNotImplemented(method, self.scheme)
223233

224234
if to_info.scheme != self.scheme:
225235
raise NotImplementedError
226236

227-
if from_info.scheme != "local":
228-
raise NotImplementedError
229-
230237
logger.debug("Uploading '%s' to '%s'", from_info, to_info)
238+
if is_file_obj:
239+
with Tqdm.wrapattr(
240+
from_info,
241+
"read",
242+
disable=no_progress_bar,
243+
bytes=True,
244+
total=total,
245+
desc=desc,
246+
**pbar_args,
247+
) as wrapped:
248+
# `size` is used to provide hints to the WebdavFileSystem
249+
# for legacy servers.
250+
# pylint: disable=no-member
251+
return self.upload_fobj(wrapped, to_info, size=total)
231252

232-
name = name or from_info.name
253+
if from_info.scheme != "local":
254+
raise NotImplementedError
233255

234-
self._upload( # noqa, pylint: disable=no-member
256+
name = desc or from_info.name
257+
return self._upload( # noqa, pylint: disable=no-member
235258
from_info.fspath,
236259
to_info,
237260
name=name,
238261
no_progress_bar=no_progress_bar,
239-
)
240-
241-
def upload_fobj(
242-
self, fobj, to_info, no_progress_bar=False, size=None, **pbar_args
243-
):
244-
if not hasattr(self, "_upload_fobj"):
245-
raise RemoteActionNotImplemented("upload_fobj", self.scheme)
246-
247-
with Tqdm.wrapattr(
248-
fobj,
249-
"read",
250-
disable=no_progress_bar,
251-
bytes=True,
252-
total=size,
253262
**pbar_args,
254-
) as wrapped:
255-
self._upload_fobj( # pylint: disable=no-member
256-
wrapped, to_info, size=size
257-
)
263+
)
258264

259265
def download(
260266
self,

dvc/fs/fsspec_wrapper.py

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,7 @@ def makedirs(self, path_info, **kwargs):
130130
self._with_bucket(path_info), exist_ok=kwargs.pop("exist_ok", True)
131131
)
132132

133-
def _upload_fobj(self, fobj, to_info, size=None):
133+
def upload_fobj(self, fobj, to_info, **kwargs):
134134
self.makedirs(to_info.parent)
135135
with self.open(to_info, "wb") as fdest:
136136
shutil.copyfileobj(fobj, fdest, length=fdest.blocksize)
@@ -141,13 +141,15 @@ def _upload(
141141
self.makedirs(to_info.parent)
142142
size = os.path.getsize(from_file)
143143
with open(from_file, "rb") as fobj:
144-
self.upload_fobj(
144+
with Tqdm.wrapattr(
145145
fobj,
146-
to_info,
147-
size=size,
146+
"read",
147+
disable=no_progress_bar,
148+
bytes=True,
149+
total=size,
148150
desc=name,
149-
no_progress_bar=no_progress_bar,
150-
)
151+
) as wrapped:
152+
self.upload_fobj(wrapped, to_info, size=size)
151153
self.fs.invalidate_cache(self._with_bucket(to_info.parent))
152154

153155
def _download(

dvc/fs/gdrive.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -298,7 +298,7 @@ def _with_bucket(self, path):
298298

299299
return super()._with_bucket(path)
300300

301-
def _upload_fobj(self, fobj, to_info, size: int = None):
301+
def upload_fobj(self, fobj, to_info, **kwargs):
302302
rpath = self._with_bucket(to_info)
303303
self.makedirs(os.path.dirname(rpath))
304-
return self.fs.upload_fobj(fobj, rpath, size=size)
304+
return self.fs.upload_fobj(fobj, rpath, **kwargs)

dvc/fs/hdfs.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -244,7 +244,7 @@ def checksum(self, path_info):
244244
size=self.getsize(path_info),
245245
)
246246

247-
def _upload_fobj(self, fobj, to_info, **kwargs):
247+
def upload_fobj(self, fobj, to_info, **kwargs):
248248
with self.hdfs(to_info) as hdfs:
249249
with hdfs.open_output_stream(to_info.path) as fdest:
250250
shutil.copyfileobj(fobj, fdest, self.BLOCK_SIZE)

dvc/fs/local.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -108,7 +108,7 @@ def copy(self, from_info, to_info):
108108
self.remove(tmp_info)
109109
raise
110110

111-
def _upload_fobj(self, fobj, to_info, **kwargs):
111+
def upload_fobj(self, fobj, to_info, **kwargs):
112112
self.makedirs(to_info.parent)
113113
tmp_info = to_info.parent / tmp_fname("")
114114
try:

dvc/fs/ssh.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -120,9 +120,9 @@ def fs(self):
120120
# Ensure that if an interrupt happens during the transfer, we don't
121121
# pollute the cache.
122122

123-
def _upload_fobj(self, fobj, to_info, *args, **kwargs):
123+
def upload_fobj(self, fobj, to_info, **kwargs):
124124
with as_atomic(self, to_info) as tmp_file:
125-
super()._upload_fobj(fobj, tmp_file, *args, **kwargs)
125+
super().upload_fobj(fobj, tmp_file, **kwargs)
126126

127127
def _upload(self, from_file, to_info, *args, **kwargs):
128128
with as_atomic(self, to_info) as tmp_file:

dvc/fs/utils.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,8 @@ def transfer(
3333
return from_fs.download_file(from_info, to_info)
3434

3535
with from_fs.open(from_info, mode="rb") as fobj:
36-
return to_fs.upload_fobj(fobj, to_info)
36+
size = from_fs.getsize(from_info)
37+
return to_fs.upload(fobj, to_info, total=size)
3738
except OSError as exc:
3839
# If the target file already exists, we are going to simply
3940
# ignore the exception (#4992).

dvc/fs/webdav.py

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import os
32
import threading
43
from functools import lru_cache
54

@@ -71,15 +70,13 @@ def fs(self):
7170

7271
return WebdavFileSystem(**self.fs_args)
7372

74-
def _upload_fobj(self, fobj, to_info, size: int = None):
73+
def upload_fobj(self, fobj, to_info, **kwargs):
7574
rpath = self.translate_path_info(to_info)
76-
self.makedirs(os.path.dirname(rpath))
75+
size = kwargs.get("size")
7776
# using upload_fileobj to directly upload fileobj rather than buffering
7877
# and using overwrite=True to avoid check for an extra exists call,
7978
# as caller should ensure that the file does not exist beforehand.
80-
return self.fs.client.upload_fileobj(
81-
fobj, rpath, overwrite=True, size=size
82-
)
79+
return self.fs.upload_fileobj(fobj, rpath, overwrite=True, size=size)
8380

8481
@lru_cache(512)
8582
def translate_path_info(self, path):

dvc/fs/webhdfs.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -141,7 +141,7 @@ def move(self, from_info, to_info):
141141
self.hdfs_client.makedirs(to_info.parent.path)
142142
self.hdfs_client.rename(from_info.path, to_info.path)
143143

144-
def _upload_fobj(self, fobj, to_info, **kwargs):
144+
def upload_fobj(self, fobj, to_info, **kwargs):
145145
with self.hdfs_client.write(to_info.path) as fdest:
146146
shutil.copyfileobj(fobj, fdest)
147147

dvc/objects/db/reference.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ def _add_file(
7070
ref_fobj = io.BytesIO(ref_file.to_bytes())
7171
ref_fobj.seek(0)
7272
try:
73-
self.fs.upload_fobj(ref_fobj, to_info)
73+
self.fs.upload(ref_fobj, to_info)
7474
except OSError as exc:
7575
if isinstance(exc, FileExistsError) or (
7676
os.name == "nt"

dvc/objects/stage.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,9 +34,8 @@ def _upload_file(path_info, fs, odb, upload_odb):
3434
tmp_info = upload_odb.path_info / tmp_fname()
3535
with fs.open(path_info, mode="rb", chunk_size=fs.CHUNK_SIZE) as stream:
3636
stream = HashedStreamReader(stream)
37-
upload_odb.fs.upload_fobj(
38-
stream, tmp_info, desc=path_info.name, size=fs.getsize(path_info)
39-
)
37+
size = fs.getsize(path_info)
38+
upload_odb.fs.upload(stream, tmp_info, desc=path_info.name, total=size)
4039

4140
odb.add(tmp_info, upload_odb.fs, stream.hash_info)
4241
return path_info, odb.get(stream.hash_info)

tests/func/test_fs.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -405,7 +405,7 @@ def test_fs_makedirs_on_upload_and_copy(dvc, cloud):
405405
fs = cls(**config)
406406

407407
with io.BytesIO(b"foo") as stream:
408-
fs.upload_fobj(stream, cloud / "dir" / "foo")
408+
fs.upload(stream, cloud / "dir" / "foo")
409409

410410
assert fs.isdir(cloud / "dir")
411411
assert fs.exists(cloud / "dir" / "foo")

0 commit comments

Comments
 (0)