Skip to content

GH-125413: Move pathlib.Path.copy() implementation alongside Path.info #129856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 1 addition & 159 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
"""

import functools
import io
import posixpath
from errno import EINVAL
from glob import _PathGlobber, _no_recurse_symlinks
from pathlib._os import copyfileobj
from pathlib._os import magic_open, CopyReader, CopyWriter


@functools.cache
Expand All @@ -41,162 +39,6 @@ def _explode_path(path):
return path, names


def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
"""
try:
return io.open(path, mode, buffering, encoding, errors, newline)
except TypeError:
pass
cls = type(path)
text = 'b' not in mode
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
if text:
try:
attr = getattr(cls, f'__open_{mode}__')
except AttributeError:
pass
else:
return attr(path, buffering, encoding, errors, newline)

try:
attr = getattr(cls, f'__open_{mode}b__')
except AttributeError:
pass
else:
stream = attr(path, buffering)
if text:
stream = io.TextIOWrapper(stream, encoding, errors, newline)
return stream

raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")


class CopyReader:
"""
Class that implements the "read" part of copying between path objects.
An instance of this class is available from the ReadablePath._copy_reader
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_readable_metakeys = frozenset()

def _read_metadata(self, metakeys, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError


class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_writable_metakeys = frozenset()

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError

def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, metakeys)
elif source.is_dir():
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
else:
self._create_file(source, metakeys)
return self._path

def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst._copy_writer._create_symlink(src, metakeys)
elif src.is_dir():
dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst._copy_writer._create_file(src, metakeys)
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, source):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if source == self._path:
err = OSError(EINVAL, "Source and target are the same path")
elif source in self._path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(source)
err.filename2 = str(self._path)
raise err


class JoinablePath:
"""Base class for pure path objects.

Expand Down
145 changes: 5 additions & 140 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from errno import *
from glob import _StringGlobber, _no_recurse_symlinks
from itertools import chain
from stat import S_IMODE, S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from _collections_abc import Sequence

try:
Expand All @@ -19,8 +19,8 @@
except ImportError:
grp = None

from pathlib._os import copyfile, PathInfo, DirEntryInfo
from pathlib._abc import CopyReader, CopyWriter, JoinablePath, ReadablePath, WritablePath
from pathlib._os import LocalCopyReader, LocalCopyWriter, PathInfo, DirEntryInfo
from pathlib._abc import JoinablePath, ReadablePath, WritablePath


__all__ = [
Expand Down Expand Up @@ -65,141 +65,6 @@ def __repr__(self):
return "<{}.parents>".format(type(self._path).__name__)


class _LocalCopyReader(CopyReader):
"""This object implements the "read" part of copying local paths. Don't
try to construct it yourself.
"""
__slots__ = ()

_readable_metakeys = {'mode', 'times_ns'}
if hasattr(os.stat_result, 'st_flags'):
_readable_metakeys.add('flags')
if hasattr(os, 'listxattr'):
_readable_metakeys.add('xattrs')
_readable_metakeys = frozenset(_readable_metakeys)

def _read_metadata(self, metakeys, *, follow_symlinks=True):
metadata = {}
if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
st = self._path.stat(follow_symlinks=follow_symlinks)
if 'mode' in metakeys:
metadata['mode'] = S_IMODE(st.st_mode)
if 'times_ns' in metakeys:
metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
if 'flags' in metakeys:
metadata['flags'] = st.st_flags
if 'xattrs' in metakeys:
try:
metadata['xattrs'] = [
(attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
except OSError as err:
if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
return metadata


class _LocalCopyWriter(CopyWriter):
"""This object implements the "write" part of copying local paths. Don't
try to construct it yourself.
"""
__slots__ = ()

_writable_metakeys = _LocalCopyReader._readable_metakeys

def _write_metadata(self, metadata, *, follow_symlinks=True):
def _nop(*args, ns=None, follow_symlinks=None):
pass

if follow_symlinks:
# use the real function if it exists
def lookup(name):
return getattr(os, name, _nop)
else:
# use the real function only if it exists
# *and* it supports follow_symlinks
def lookup(name):
fn = getattr(os, name, _nop)
if fn in os.supports_follow_symlinks:
return fn
return _nop

times_ns = metadata.get('times_ns')
if times_ns is not None:
lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
xattrs = metadata.get('xattrs')
if xattrs is not None:
for attr, value in xattrs:
try:
os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
mode = metadata.get('mode')
if mode is not None:
try:
lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass
flags = metadata.get('flags')
if flags is not None:
try:
lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise

if copyfile:
# Use fast OS routine for local file copying where available.
def _create_file(self, source, metakeys):
"""Copy the given file to the given target."""
try:
source = os.fspath(source)
except TypeError:
if not isinstance(source, WritablePath):
raise
super()._create_file(source, metakeys)
else:
copyfile(source, os.fspath(self._path))

if os.name == 'nt':
# Windows: symlink target might not exist yet if we're copying several
# files, so ensure we pass is_dir to os.symlink().
def _create_symlink(self, source, metakeys):
"""Copy the given symlink to the given target."""
self._path.symlink_to(source.readlink(), source.is_dir())
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
try:
if not self._path.samefile(source):
return
except (OSError, ValueError):
return
err = OSError(EINVAL, "Source and target are the same file")
err.filename = str(source)
err.filename2 = str(self._path)
raise err


class PurePath(JoinablePath):
"""Base class for manipulating paths without I/O.

Expand Down Expand Up @@ -1190,8 +1055,8 @@ def replace(self, target):
os.replace(self, target)
return self.with_segments(target)

_copy_reader = property(_LocalCopyReader)
_copy_writer = property(_LocalCopyWriter)
_copy_reader = property(LocalCopyReader)
_copy_writer = property(LocalCopyWriter)

def move(self, target):
"""
Expand Down
Loading
Loading