Skip to content
Open
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [2.4.12] - (Unreleased)

### Added

- Symlink support to `ReadTarFS` [#426](https://github.com/PyFilesystem/pyfilesystem2/pull/426). Closes [#409](https://github.com/PyFilesystem/pyfilesystem2/issues/409).

### Changed

- Start testing on PyPy. Due to [#342](https://github.com/PyFilesystem/pyfilesystem2/issues/342)
Expand Down
194 changes: 149 additions & 45 deletions fs/tarfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import tarfile
import typing
from collections import OrderedDict
from typing import cast, IO

import six
from six.moves import map

from . import errors
from .base import FS
Expand All @@ -22,7 +22,18 @@
from .opener import open_fs
from .permissions import Permissions
from ._url_tools import url_quote
from .path import relpath, basename, isbase, normpath, parts, frombase
from .path import (
dirname,
join,
relpath,
basename,
isbase,
normpath,
parts,
frombase,
recursepath,
relativefrom,
)
from .wrapfs import WrapFS

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -157,8 +168,7 @@ def __init__(

@six.python_2_unicode_compatible
class WriteTarFS(WrapFS):
"""A writable tar file.
"""
"""A writable tar file."""

def __init__(
self,
Expand Down Expand Up @@ -234,8 +244,7 @@ def write_tar(

@six.python_2_unicode_compatible
class ReadTarFS(FS):
"""A readable tar file.
"""
"""A readable tar file."""

_meta = {
"case_insensitive": True,
Expand All @@ -257,6 +266,8 @@ class ReadTarFS(FS):
tarfile.SYMTYPE: ResourceType.symlink,
tarfile.CONTTYPE: ResourceType.file,
tarfile.LNKTYPE: ResourceType.symlink,
# this is how we mark implicit directories
tarfile.DIRTYPE + b"i": ResourceType.directory,
}

@errors.CreateFailed.catch_all
Expand All @@ -277,24 +288,74 @@ def _directory_entries(self):
"""Lazy directory cache."""
if self._directory_cache is None:
_decode = self._decode
_encode = self._encode

# collect all directory entries and remove slashes
_directory_entries = (
(_decode(info.name).strip("/"), info) for info in self._tar
)

def _list_tar():
for name, info in _directory_entries:
try:
_name = normpath(name)
except IllegalBackReference:
# Back references outside root, must be up to no good.
pass
else:
if _name:
yield _name, info

self._directory_cache = OrderedDict(_list_tar())
# build the cache first before updating it to reduce chances
# of data races
_cache = OrderedDict()
for name, info in _directory_entries:
# check for any invalid back references
try:
_name = normpath(name)
except IllegalBackReference:
continue

# add all implicit dirnames if not in the cache already
for partial_name in map(relpath, recursepath(_name)):
dirinfo = tarfile.TarInfo(_encode(partial_name))
dirinfo.type = tarfile.DIRTYPE
_cache.setdefault(partial_name, dirinfo)

# add the entry itself, potentially overwriting implicit entries
_cache[_name] = info

self._directory_cache = _cache
return self._directory_cache

def _follow_symlink(self, entry):
    """Follow a symlink `TarInfo` to find a concrete entry.

    Arguments:
        entry (tarfile.TarInfo): the archive member to start from. It is
            returned unchanged when it is not a symlink.

    Returns:
        tarfile.TarInfo: the first non-symlink entry reached, or ``None``
        if the symlink is dangling, points outside of the archive root,
        or is part of a cycle of symlinks.

    """
    seen = set()
    _entry = entry
    while _entry.issym():
        # the link target is relative to the directory holding the link
        try:
            linkname = normpath(
                join(dirname(self._decode(_entry.name)), self._decode(_entry.linkname))
            )
        except IllegalBackReference:
            # the target climbs above the archive root: nothing inside the
            # archive can satisfy it, so treat the link as dangling
            return None
        resolved = self._resolve(linkname)
        if resolved is None:
            return None
        seen.add(_entry)
        _entry = self._directory_entries[resolved]
        # if we already saw this symlink, then we are following cyclic
        # symlinks and we should break the loop
        if _entry in seen:
            return None

    return _entry

def _resolve(self, path):
    """Replace path components that are symlinks with concrete components.

    Arguments:
        path (str): a normalized, relative path inside the archive.

    Returns:
        str: a path that is a key of the directory cache (or the empty
        root path, which is returned as-is), or ``None`` when the path
        could not be resolved to an existing entry in the archive.

    """
    if path in self._directory_entries or not path:
        return path
    # walk the prefixes from the longest to the shortest, looking for the
    # deepest component that is a symlink
    for prefix in map(relpath, reversed(recursepath(path))):
        suffix = relativefrom(prefix, path)
        entry = self._directory_entries.get(prefix)
        if entry is not None and entry.issym():
            target = self._follow_symlink(entry)
            if target is None:
                # dangling or cyclic symlink in the middle of the path:
                # there is nothing to resolve the remainder against
                return None
            return self._resolve(join(self._decode(target.name), suffix))
    return None

def __repr__(self):
# type: () -> Text
return "ReadTarFS({!r})".format(self._file)
Expand Down Expand Up @@ -329,27 +390,34 @@ def getinfo(self, path, namespaces=None):
namespaces = namespaces or ()
raw_info = {} # type: Dict[Text, Dict[Text, object]]

# special case for root
if not _path:
raw_info["basic"] = {"name": "", "is_dir": True}
if "details" in namespaces:
raw_info["details"] = {"type": int(ResourceType.directory)}

else:
try:
implicit = False
member = self._directory_entries[_path]
except KeyError:
if not self.isdir(_path):
raise errors.ResourceNotFound(path)
implicit = True
member = tarfile.TarInfo(_path)
member.type = tarfile.DIRTYPE
_realpath = self._resolve(_path)
if _realpath is None:
raise errors.ResourceNotFound(path)

implicit = False
member = self._directory_entries[_realpath]

raw_info["basic"] = {
"name": basename(self._decode(member.name)),
"is_dir": member.isdir(),
"is_dir": self.isdir(_path), # is_dir should follow symlinks
}

if "link" in namespaces:
if member.issym():
target = normpath(join(
dirname(self._decode(member.name)),
self._decode(member.linkname),
)) # type: Optional[Text]
else:
target = None
raw_info["link"] = {"target": target}
if "details" in namespaces:
raw_info["details"] = {
"size": member.size,
Expand Down Expand Up @@ -379,16 +447,29 @@ def getinfo(self, path, namespaces=None):

def isdir(self, path):
    """Check whether ``path`` is a directory, following symlinks.

    Returns ``True`` for the root, ``False`` when the path does not
    resolve to an entry or resolves to a dangling symlink, and otherwise
    whether the concrete entry behind the path is a directory.
    """
    _path = relpath(self.validatepath(path))
    if not _path:
        # the root is always a directory (``getinfo`` reports it the same
        # way); the directory cache has no root key for an empty archive,
        # so it must not be looked up there
        return True
    realpath = self._resolve(_path)
    if realpath is None:
        return False
    entry = self._follow_symlink(self._directory_entries[realpath])
    return entry is not None and entry.isdir()

def isfile(self, path):
    """Check whether ``path`` is a regular file, following symlinks.

    Returns ``False`` for the root, for paths that do not resolve to an
    entry, and for dangling symlinks; otherwise returns whether the
    concrete entry behind the path is a file.
    """
    _path = relpath(self.validatepath(path))
    if not _path:
        # the root is a directory, never a file; the directory cache has
        # no root key for an empty archive, so it must not be looked up
        return False
    realpath = self._resolve(_path)
    if realpath is None:
        return False
    entry = self._follow_symlink(self._directory_entries[realpath])
    return entry is not None and entry.isfile()

def islink(self, path):
    """Check whether ``path`` is a symbolic link inside the archive.

    Symlinks in the parent components of ``path`` are resolved first; the
    final component itself is not followed. Returns ``False`` when the
    path does not resolve to an entry.
    """
    _path = relpath(self.validatepath(path))
    realpath = self._resolve(_path)
    if realpath is None:
        return False
    # ``_resolve`` hands back the empty root path without checking the
    # cache, and an empty archive has no root key: look up with ``get``
    # so that the root reports ``False`` instead of raising ``KeyError``
    entry = self._directory_entries.get(realpath)
    return entry is not None and entry.issym()

def setinfo(self, path, info):
Expand All @@ -400,13 +481,28 @@ def listdir(self, path):
# type: (Text) -> List[Text]
_path = relpath(self.validatepath(path))

if not self.gettype(path) is ResourceType.directory:
raise errors.DirectoryExpected(path)
# check the given path exists
realpath = self._resolve(_path)
if realpath is None:
raise errors.ResourceNotFound(path)
elif realpath:
target = self._follow_symlink(self._directory_entries[realpath])
# check the path is either a symlink mapping to a directory or a directory
if target is None:
raise errors.ResourceNotFound(path)
elif not target.isdir():
raise errors.DirectoryExpected(path)
else:
base = target.name
else:
base = ""

# find all entries in the actual directory
children = (
frombase(_path, n) for n in self._directory_entries if isbase(_path, n)
frombase(base, n) for n in self._directory_entries if isbase(base, n)
)
content = (parts(child)[1] for child in children if relpath(child))

return list(OrderedDict.fromkeys(content))

def makedir(
Expand All @@ -423,19 +519,27 @@ def openbin(self, path, mode="r", buffering=-1, **options):
# type: (Text, Text, int, **Any) -> BinaryIO
_path = relpath(self.validatepath(path))

# check the requested mode is only a reading mode
if "w" in mode or "+" in mode or "a" in mode:
raise errors.ResourceReadOnly(path)

try:
member = self._directory_entries[_path]
except KeyError:
six.raise_from(errors.ResourceNotFound(path), None)
# check the path actually resolves after following symlink components
_realpath = self._resolve(_path)
if _realpath is None:
raise errors.ResourceNotFound(path)

if not member.isfile():
raise errors.FileExpected(path)
# get the entry at the resolved path and follow all symlinks
entry = self._follow_symlink(self._directory_entries[_realpath])
if entry is None:
raise errors.ResourceNotFound(path)

rw = RawWrapper(cast(IO, self._tar.extractfile(member)))
# TarFile.extractfile returns None if the entry is neither a regular
# file nor a link
reader = self._tar.extractfile(self._directory_entries[_realpath])
if reader is None:
raise errors.FileExpected(path)

rw = RawWrapper(reader)
if six.PY2: # Patch nonexistent file.flush in Python2

def _flush():
Expand Down
Loading