diff --git a/Lib/pathlib/_abc.py b/Lib/pathlib/_abc.py index 65d91e4d67b463..800d1b4503d78d 100644 --- a/Lib/pathlib/_abc.py +++ b/Lib/pathlib/_abc.py @@ -12,11 +12,9 @@ """ import functools -import io import posixpath -from errno import EINVAL from glob import _PathGlobber, _no_recurse_symlinks -from pathlib._os import copyfileobj +from pathlib._os import magic_open, CopyReader, CopyWriter @functools.cache @@ -41,162 +39,6 @@ def _explode_path(path): return path, names -def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None, - newline=None): - """ - Open the file pointed to by this path and return a file object, as - the built-in open() function does. - """ - try: - return io.open(path, mode, buffering, encoding, errors, newline) - except TypeError: - pass - cls = type(path) - text = 'b' not in mode - mode = ''.join(sorted(c for c in mode if c not in 'bt')) - if text: - try: - attr = getattr(cls, f'__open_{mode}__') - except AttributeError: - pass - else: - return attr(path, buffering, encoding, errors, newline) - - try: - attr = getattr(cls, f'__open_{mode}b__') - except AttributeError: - pass - else: - stream = attr(path, buffering) - if text: - stream = io.TextIOWrapper(stream, encoding, errors, newline) - return stream - - raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}") - - -class CopyReader: - """ - Class that implements the "read" part of copying between path objects. - An instance of this class is available from the ReadablePath._copy_reader - property. - """ - __slots__ = ('_path',) - - def __init__(self, path): - self._path = path - - _readable_metakeys = frozenset() - - def _read_metadata(self, metakeys, *, follow_symlinks=True): - """ - Returns path metadata as a dict with string keys. - """ - raise NotImplementedError - - -class CopyWriter: - """ - Class that implements the "write" part of copying between path objects. An - instance of this class is available from the WritablePath._copy_writer - property. - """ - __slots__ = ('_path',) - - def __init__(self, path): - self._path = path - - _writable_metakeys = frozenset() - - def _write_metadata(self, metadata, *, follow_symlinks=True): - """ - Sets path metadata from the given dict with string keys. - """ - raise NotImplementedError - - def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata): - self._ensure_distinct_path(source) - if preserve_metadata: - metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys - else: - metakeys = None - if not follow_symlinks and source.is_symlink(): - self._create_symlink(source, metakeys) - elif source.is_dir(): - self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok) - else: - self._create_file(source, metakeys) - return self._path - - def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok): - """Copy the given directory to our path.""" - children = list(source.iterdir()) - self._path.mkdir(exist_ok=dirs_exist_ok) - for src in children: - dst = self._path.joinpath(src.name) - if not follow_symlinks and src.is_symlink(): - dst._copy_writer._create_symlink(src, metakeys) - elif src.is_dir(): - dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok) - else: - dst._copy_writer._create_file(src, metakeys) - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys) - if metadata: - self._write_metadata(metadata) - - def _create_file(self, source, metakeys): - """Copy the given file to our path.""" - self._ensure_different_file(source) - with magic_open(source, 'rb') as source_f: - try: - with magic_open(self._path, 'wb') as target_f: - copyfileobj(source_f, target_f) - except IsADirectoryError as e: - if not self._path.exists(): - # Raise a less confusing exception. - raise FileNotFoundError( - f'Directory does not exist: {self._path}') from e - raise - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys) - if metadata: - self._write_metadata(metadata) - - def _create_symlink(self, source, metakeys): - """Copy the given symbolic link to our path.""" - self._path.symlink_to(source.readlink()) - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False) - if metadata: - self._write_metadata(metadata, follow_symlinks=False) - - def _ensure_different_file(self, source): - """ - Raise OSError(EINVAL) if both paths refer to the same file. - """ - pass - - def _ensure_distinct_path(self, source): - """ - Raise OSError(EINVAL) if the other path is within this path. - """ - # Note: there is no straightforward, foolproof algorithm to determine - # if one directory is within another (a particularly perverse example - # would be a single network share mounted in one location via NFS, and - # in another location via CIFS), so we simply checks whether the - # other path is lexically equal to, or within, this path. - if source == self._path: - err = OSError(EINVAL, "Source and target are the same path") - elif source in self._path.parents: - err = OSError(EINVAL, "Source path is a parent of target path") - else: - return - err.filename = str(source) - err.filename2 = str(self._path) - raise err - - class JoinablePath: """Base class for pure path objects. diff --git a/Lib/pathlib/_local.py b/Lib/pathlib/_local.py index 6cdcd448991c8c..956c1920bf6d78 100644 --- a/Lib/pathlib/_local.py +++ b/Lib/pathlib/_local.py @@ -7,7 +7,7 @@ from errno import * from glob import _StringGlobber, _no_recurse_symlinks from itertools import chain -from stat import S_IMODE, S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO +from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO from _collections_abc import Sequence try: @@ -19,8 +19,8 @@ except ImportError: grp = None -from pathlib._os import copyfile, PathInfo, DirEntryInfo -from pathlib._abc import CopyReader, CopyWriter, JoinablePath, ReadablePath, WritablePath +from pathlib._os import LocalCopyReader, LocalCopyWriter, PathInfo, DirEntryInfo +from pathlib._abc import JoinablePath, ReadablePath, WritablePath __all__ = [ @@ -65,141 +65,6 @@ def __repr__(self): return "<{}.parents>".format(type(self._path).__name__) -class _LocalCopyReader(CopyReader): - """This object implements the "read" part of copying local paths. Don't - try to construct it yourself. - """ - __slots__ = () - - _readable_metakeys = {'mode', 'times_ns'} - if hasattr(os.stat_result, 'st_flags'): - _readable_metakeys.add('flags') - if hasattr(os, 'listxattr'): - _readable_metakeys.add('xattrs') - _readable_metakeys = frozenset(_readable_metakeys) - - def _read_metadata(self, metakeys, *, follow_symlinks=True): - metadata = {} - if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys: - st = self._path.stat(follow_symlinks=follow_symlinks) - if 'mode' in metakeys: - metadata['mode'] = S_IMODE(st.st_mode) - if 'times_ns' in metakeys: - metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns - if 'flags' in metakeys: - metadata['flags'] = st.st_flags - if 'xattrs' in metakeys: - try: - metadata['xattrs'] = [ - (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) - for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] - except OSError as err: - if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): - raise - return metadata - - -class _LocalCopyWriter(CopyWriter): - """This object implements the "write" part of copying local paths. Don't - try to construct it yourself. - """ - __slots__ = () - - _writable_metakeys = _LocalCopyReader._readable_metakeys - - def _write_metadata(self, metadata, *, follow_symlinks=True): - def _nop(*args, ns=None, follow_symlinks=None): - pass - - if follow_symlinks: - # use the real function if it exists - def lookup(name): - return getattr(os, name, _nop) - else: - # use the real function only if it exists - # *and* it supports follow_symlinks - def lookup(name): - fn = getattr(os, name, _nop) - if fn in os.supports_follow_symlinks: - return fn - return _nop - - times_ns = metadata.get('times_ns') - if times_ns is not None: - lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks) - # We must copy extended attributes before the file is (potentially) - # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. - xattrs = metadata.get('xattrs') - if xattrs is not None: - for attr, value in xattrs: - try: - os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks) - except OSError as e: - if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): - raise - mode = metadata.get('mode') - if mode is not None: - try: - lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks) - except NotImplementedError: - # if we got a NotImplementedError, it's because - # * follow_symlinks=False, - # * lchown() is unavailable, and - # * either - # * fchownat() is unavailable or - # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. - # (it returned ENOSUP.) - # therefore we're out of options--we simply cannot chown the - # symlink. give up, suppress the error. - # (which is what shutil always did in this circumstance.) - pass - flags = metadata.get('flags') - if flags is not None: - try: - lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks) - except OSError as why: - if why.errno not in (EOPNOTSUPP, ENOTSUP): - raise - - if copyfile: - # Use fast OS routine for local file copying where available. - def _create_file(self, source, metakeys): - """Copy the given file to the given target.""" - try: - source = os.fspath(source) - except TypeError: - if not isinstance(source, WritablePath): - raise - super()._create_file(source, metakeys) - else: - copyfile(source, os.fspath(self._path)) - - if os.name == 'nt': - # Windows: symlink target might not exist yet if we're copying several - # files, so ensure we pass is_dir to os.symlink(). - def _create_symlink(self, source, metakeys): - """Copy the given symlink to the given target.""" - self._path.symlink_to(source.readlink(), source.is_dir()) - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False) - if metadata: - self._write_metadata(metadata, follow_symlinks=False) - - def _ensure_different_file(self, source): - """ - Raise OSError(EINVAL) if both paths refer to the same file. - """ - try: - if not self._path.samefile(source): - return - except (OSError, ValueError): - return - err = OSError(EINVAL, "Source and target are the same file") - err.filename = str(source) - err.filename2 = str(self._path) - raise err - - class PurePath(JoinablePath): """Base class for manipulating paths without I/O. @@ -1190,8 +1055,8 @@ def replace(self, target): os.replace(self, target) return self.with_segments(target) - _copy_reader = property(_LocalCopyReader) - _copy_writer = property(_LocalCopyWriter) + _copy_reader = property(LocalCopyReader) + _copy_writer = property(LocalCopyWriter) def move(self, target): """ diff --git a/Lib/pathlib/_os.py b/Lib/pathlib/_os.py index c2febb773cd83a..c0c81ada858cdf 100644 --- a/Lib/pathlib/_os.py +++ b/Lib/pathlib/_os.py @@ -3,7 +3,8 @@ """ from errno import * -from stat import S_ISDIR, S_ISREG, S_ISLNK +from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE +import io import os import sys try: @@ -165,6 +166,295 @@ def copyfileobj(source_f, target_f): write_target(buf) +def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None, + newline=None): + """ + Open the file pointed to by this path and return a file object, as + the built-in open() function does. + """ + try: + return io.open(path, mode, buffering, encoding, errors, newline) + except TypeError: + pass + cls = type(path) + text = 'b' not in mode + mode = ''.join(sorted(c for c in mode if c not in 'bt')) + if text: + try: + attr = getattr(cls, f'__open_{mode}__') + except AttributeError: + pass + else: + return attr(path, buffering, encoding, errors, newline) + + try: + attr = getattr(cls, f'__open_{mode}b__') + except AttributeError: + pass + else: + stream = attr(path, buffering) + if text: + stream = io.TextIOWrapper(stream, encoding, errors, newline) + return stream + + raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}") + + +class CopyReader: + """ + Class that implements the "read" part of copying between path objects. + An instance of this class is available from the ReadablePath._copy_reader + property. + """ + __slots__ = ('_path',) + + def __init__(self, path): + self._path = path + + _readable_metakeys = frozenset() + + def _read_metadata(self, metakeys, *, follow_symlinks=True): + """ + Returns path metadata as a dict with string keys. + """ + raise NotImplementedError + + +class CopyWriter: + """ + Class that implements the "write" part of copying between path objects. An + instance of this class is available from the WritablePath._copy_writer + property. + """ + __slots__ = ('_path',) + + def __init__(self, path): + self._path = path + + _writable_metakeys = frozenset() + + def _write_metadata(self, metadata, *, follow_symlinks=True): + """ + Sets path metadata from the given dict with string keys. + """ + raise NotImplementedError + + def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata): + self._ensure_distinct_path(source) + if preserve_metadata: + metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys + else: + metakeys = None + if not follow_symlinks and source.is_symlink(): + self._create_symlink(source, metakeys) + elif source.is_dir(): + self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok) + else: + self._create_file(source, metakeys) + return self._path + + def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok): + """Copy the given directory to our path.""" + children = list(source.iterdir()) + self._path.mkdir(exist_ok=dirs_exist_ok) + for src in children: + dst = self._path.joinpath(src.name) + if not follow_symlinks and src.is_symlink(): + dst._copy_writer._create_symlink(src, metakeys) + elif src.is_dir(): + dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok) + else: + dst._copy_writer._create_file(src, metakeys) + if metakeys: + metadata = source._copy_reader._read_metadata(metakeys) + if metadata: + self._write_metadata(metadata) + + def _create_file(self, source, metakeys): + """Copy the given file to our path.""" + self._ensure_different_file(source) + with magic_open(source, 'rb') as source_f: + try: + with magic_open(self._path, 'wb') as target_f: + copyfileobj(source_f, target_f) + except IsADirectoryError as e: + if not self._path.exists(): + # Raise a less confusing exception. + raise FileNotFoundError( + f'Directory does not exist: {self._path}') from e + raise + if metakeys: + metadata = source._copy_reader._read_metadata(metakeys) + if metadata: + self._write_metadata(metadata) + + def _create_symlink(self, source, metakeys): + """Copy the given symbolic link to our path.""" + self._path.symlink_to(source.readlink()) + if metakeys: + metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False) + if metadata: + self._write_metadata(metadata, follow_symlinks=False) + + def _ensure_different_file(self, source): + """ + Raise OSError(EINVAL) if both paths refer to the same file. + """ + pass + + def _ensure_distinct_path(self, source): + """ + Raise OSError(EINVAL) if the other path is within this path. + """ + # Note: there is no straightforward, foolproof algorithm to determine + # if one directory is within another (a particularly perverse example + # would be a single network share mounted in one location via NFS, and + # in another location via CIFS), so we simply checks whether the + # other path is lexically equal to, or within, this path. + if source == self._path: + err = OSError(EINVAL, "Source and target are the same path") + elif source in self._path.parents: + err = OSError(EINVAL, "Source path is a parent of target path") + else: + return + err.filename = str(source) + err.filename2 = str(self._path) + raise err + + +class LocalCopyReader(CopyReader): + """This object implements the "read" part of copying local paths. Don't + try to construct it yourself. + """ + __slots__ = () + + _readable_metakeys = {'mode', 'times_ns'} + if hasattr(os.stat_result, 'st_flags'): + _readable_metakeys.add('flags') + if hasattr(os, 'listxattr'): + _readable_metakeys.add('xattrs') + _readable_metakeys = frozenset(_readable_metakeys) + + def _read_metadata(self, metakeys, *, follow_symlinks=True): + metadata = {} + if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys: + st = self._path.stat(follow_symlinks=follow_symlinks) + if 'mode' in metakeys: + metadata['mode'] = S_IMODE(st.st_mode) + if 'times_ns' in metakeys: + metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns + if 'flags' in metakeys: + metadata['flags'] = st.st_flags + if 'xattrs' in metakeys: + try: + metadata['xattrs'] = [ + (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) + for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] + except OSError as err: + if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): + raise + return metadata + + +class LocalCopyWriter(CopyWriter): + """This object implements the "write" part of copying local paths. Don't + try to construct it yourself. + """ + __slots__ = () + + _writable_metakeys = LocalCopyReader._readable_metakeys + + def _write_metadata(self, metadata, *, follow_symlinks=True): + def _nop(*args, ns=None, follow_symlinks=None): + pass + + if follow_symlinks: + # use the real function if it exists + def lookup(name): + return getattr(os, name, _nop) + else: + # use the real function only if it exists + # *and* it supports follow_symlinks + def lookup(name): + fn = getattr(os, name, _nop) + if fn in os.supports_follow_symlinks: + return fn + return _nop + + times_ns = metadata.get('times_ns') + if times_ns is not None: + lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks) + # We must copy extended attributes before the file is (potentially) + # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. + xattrs = metadata.get('xattrs') + if xattrs is not None: + for attr, value in xattrs: + try: + os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks) + except OSError as e: + if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): + raise + mode = metadata.get('mode') + if mode is not None: + try: + lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks) + except NotImplementedError: + # if we got a NotImplementedError, it's because + # * follow_symlinks=False, + # * lchown() is unavailable, and + # * either + # * fchownat() is unavailable or + # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. + # (it returned ENOSUP.) + # therefore we're out of options--we simply cannot chown the + # symlink. give up, suppress the error. + # (which is what shutil always did in this circumstance.) + pass + flags = metadata.get('flags') + if flags is not None: + try: + lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks) + except OSError as why: + if why.errno not in (EOPNOTSUPP, ENOTSUP): + raise + + if copyfile: + # Use fast OS routine for local file copying where available. + def _create_file(self, source, metakeys): + """Copy the given file to the given target.""" + try: + source = os.fspath(source) + except TypeError: + super()._create_file(source, metakeys) + else: + copyfile(source, os.fspath(self._path)) + + if os.name == 'nt': + # Windows: symlink target might not exist yet if we're copying several + # files, so ensure we pass is_dir to os.symlink(). + def _create_symlink(self, source, metakeys): + """Copy the given symlink to the given target.""" + self._path.symlink_to(source.readlink(), source.is_dir()) + if metakeys: + metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False) + if metadata: + self._write_metadata(metadata, follow_symlinks=False) + + def _ensure_different_file(self, source): + """ + Raise OSError(EINVAL) if both paths refer to the same file. + """ + try: + if not self._path.samefile(source): + return + except (OSError, ValueError): + return + err = OSError(EINVAL, "Source and target are the same file") + err.filename = str(source) + err.filename2 = str(self._path) + raise err + + class _PathInfoBase: __slots__ = ()