Skip to content

migrate use of upload_fobj to use transfer #6558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions dvc/fs/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from typing import TYPE_CHECKING
from io import BytesIO
from typing import TYPE_CHECKING, BinaryIO, Union

from .local import LocalFileSystem

Expand All @@ -18,9 +19,25 @@ def transfer(
to_fs: "BaseFileSystem",
to_info: "DvcPath",
move: bool = False,
content: Union[bytes, BinaryIO] = None,
) -> None:
use_move = isinstance(from_fs, type(to_fs)) and move
try:
if content:
Copy link
Collaborator Author

@skshetry skshetry Sep 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit overloaded, but we are going with it for now.

if isinstance(content, bytes):
fobj: BinaryIO = BytesIO(content)
size = len(content)
else:
fobj = content
size = from_fs.getsize(from_info)

desc = (
from_info.name
if isinstance(from_info, from_fs.PATH_CLS)
else from_info
)
return to_fs.upload_fobj(fobj, to_info, size=size, desc=desc)

if use_move:
return to_fs.move(from_info, to_info)

Expand All @@ -33,7 +50,8 @@ def transfer(
return from_fs.download_file(from_info, to_info)

with from_fs.open(from_info, mode="rb") as fobj:
return to_fs.upload_fobj(fobj, to_info)
size = from_fs.getsize(from_info)
return to_fs.upload_fobj(fobj, to_info, size=size)
except OSError as exc:
# If the target file already exists, we are going to simply
# ignore the exception (#4992).
Expand Down
24 changes: 8 additions & 16 deletions dvc/objects/db/reference.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import io
import logging
import os
from typing import TYPE_CHECKING, Dict

from dvc.scheme import Schemes
Expand Down Expand Up @@ -60,25 +58,19 @@ def _add_file(
hash_info: "HashInfo",
move: bool = False,
):
from dvc import fs

self.makedirs(to_info.parent)
if hash_info.isdir:
return super()._add_file(
from_fs, from_info, to_info, hash_info, move
from_fs, from_info, to_info, hash_info, move=move
)

ref_file = ReferenceHashFile(from_info, from_fs, hash_info)
self._obj_cache[hash_info] = ref_file
ref_fobj = io.BytesIO(ref_file.to_bytes())
ref_fobj.seek(0)
try:
self.fs.upload_fobj(ref_fobj, to_info)
except OSError as exc:
if isinstance(exc, FileExistsError) or (
os.name == "nt"
and exc.__context__
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", to_info)
else:
raise
content = ref_file.to_bytes()
fs.utils.transfer(
from_fs, from_info, self.fs, to_info, move=move, content=content
)
Comment on lines +72 to +74
Copy link
Collaborator Author

@skshetry skshetry Sep 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla, what do you think of this? Does transfer fit here, or should it use more primitive functions like pipe_file?
(although I want to use as little of the API as possible)

Thinking about it again, I have doubts about this approach; I only did it to simplify the exception handling. 😅

Copy link
Contributor

@pmrowla pmrowla Sep 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upload_fobj for reference objects isn't really a file transfer; it's just a write() of the serialized reference data from memory.

It seems to me like we should still have a distinction between a file transfer and a direct binary data write/upload, since in this case there really isn't a from_fs and from_info that we are transferring.


One alternative would maybe be adding something like ReferenceHashFile.to_memfile() where it returns a memfs://some_tmp_pathname path to a MemoryFileSystem entry (containing the serialized data). And then we could do an actual "file transfer" using the memfs and memfs_path_info without needing the content field?

(but this seems a bit more convoluted than just having an fs.utils.write method)

Copy link
Collaborator Author

@skshetry skshetry Sep 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, initially I started with two methods, but in the end fs.utils.write is going to be equivalent to upload_fobj, so I don't see much difference there. With transfer, I tried to push the problem of determining the size for callback support into transfer itself, rather than putting that burden on the caller.

We do the following when transferring files straight to the remote, which is just a file transfer at the end:

 with fs.open(path_info, mode="rb", chunk_size=fs.CHUNK_SIZE) as stream:
         stream = HashedStreamReader(stream)
         upload_odb.fs.upload_fobj(
             stream, tmp_info, desc=path_info.name, size=fs.getsize(path_info)
         )

But maybe the best thing to do here is put that burden on the caller itself, rather than trying to generalize it in transfer.

if from_fs.scheme != Schemes.LOCAL:
self._fs_cache[ReferenceHashFile.config_tuple(from_fs)] = from_fs
11 changes: 5 additions & 6 deletions dvc/objects/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@


def _upload_file(path_info, fs, odb, upload_odb):
from dvc.fs.utils import transfer
from dvc.utils import tmp_fname
from dvc.utils.stream import HashedStreamReader

tmp_info = upload_odb.path_info / tmp_fname()
with fs.open(path_info, mode="rb", chunk_size=fs.CHUNK_SIZE) as stream:
stream = HashedStreamReader(stream)
upload_odb.fs.upload_fobj(
stream, tmp_info, desc=path_info.name, size=fs.getsize(path_info)
)
wrapped = HashedStreamReader(stream)
transfer(fs, path_info, upload_odb.fs, tmp_info, content=wrapped)

odb.add(tmp_info, upload_odb.fs, stream.hash_info)
return path_info, odb.get(stream.hash_info)
odb.add(tmp_info, upload_odb.fs, wrapped.hash_info)
return path_info, odb.get(wrapped.hash_info)


def _get_file_hash(path_info, fs, name):
Expand Down