Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from fsspec.exceptions import BlocksizeMismatchError
from fsspec.implementations.cache_mapper import create_cache_mapper
from fsspec.implementations.cache_metadata import CacheMetadata
from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractBufferedFile
from fsspec.transaction import Transaction
from fsspec.utils import infer_compression
Expand Down Expand Up @@ -433,7 +434,9 @@ def __getattribute__(self, item):
"open",
"cat",
"cat_file",
"_cat_file",
"cat_ranges",
"_cat_ranges",
"get",
"read_block",
"tail",
Expand Down Expand Up @@ -835,14 +838,55 @@ def pipe(self, path, value=None, **kwargs):
else:
raise ValueError("path must be str or dict")

async def _cat_file(self, path, start=None, end=None, **kwargs):
    """Return bytes of ``path``, read from the local cache copy.

    If no local copy is found, the whole remote file is first downloaded
    with ``self.fs._get_file`` into the last storage directory
    (``self.storage[-1]``) and then read from there.

    Parameters
    ----------
    path: str
        Remote path; the protocol is stripped before the cache lookup.
    start, end: int or None
        Byte limits of the read, interpreted like the bounds of a Python
        slice over the file contents: ``None`` means start/end of file,
        negative values count back from the end of the file, and
        out-of-range values are clamped.
    kwargs:
        Passed on to ``self.fs._get_file`` when a download is needed.

    Returns
    -------
    bytes
        The requested byte range; empty if the range is empty.
    """
    logger.debug("async cat_file %s", path)
    path = self._strip_protocol(path)
    sha = self._mapper(path)
    fn = self._check_file(path)

    if not fn:
        # Cache miss: fetch the complete file so later reads are local.
        fn = os.path.join(self.storage[-1], sha)
        await self.fs._get_file(path, fn, **kwargs)

    with open(fn, "rb") as f:  # noqa ASYNC230
        if start is None and end is None:
            return f.read()
        # Resolve negative / out-of-range offsets against the real file
        # size. Passing a negative size to read() would silently return
        # everything up to EOF, and seek() rejects negative offsets.
        size = os.fstat(f.fileno()).st_size
        first, last, _ = slice(start, end).indices(size)
        f.seek(first)
        return f.read(max(last - first, 0))

async def _cat_ranges(
    self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
):
    """Read byte ranges from several files via their local cache copies.

    Every path without a local copy is downloaded once with
    ``self.fs._get`` into the last storage directory
    (``self.storage[-1]``); all ranges are then read from the local files
    with ``LocalFileSystem().cat_ranges``.

    Parameters
    ----------
    paths: list of str
        Remote paths, one per requested range. A path may appear more
        than once.
    starts, ends: passed unchanged to ``LocalFileSystem().cat_ranges``
    max_gap: passed unchanged to ``LocalFileSystem().cat_ranges``
    on_error: str
        Forwarded both to the download and to the local range read.
    kwargs:
        Passed on to ``LocalFileSystem().cat_ranges``.

    Returns
    -------
    Whatever ``LocalFileSystem().cat_ranges`` returns for the local paths.
    """
    logger.debug("async cat ranges %s", paths)
    lpaths = []
    # remote path -> local target, for files that still need downloading.
    # Keyed by path so a repeated, uncached path is fetched only once and
    # every occurrence resolves to the same local file.
    pending = {}
    for p in paths:
        fn = self._check_file(p)
        if not fn:
            # Any falsy result counts as a cache miss, not only None.
            fn = pending.get(p)
            if fn is None:
                fn = os.path.join(self.storage[-1], self._mapper(p))
                pending[p] = fn
        lpaths.append(fn)
    if pending:
        await self.fs._get(
            list(pending), list(pending.values()), on_error=on_error
        )

    return LocalFileSystem().cat_ranges(
        lpaths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
    )

def cat_ranges(
self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
):
logger.debug("cat ranges %s", paths)
lpaths = [self._check_file(p) for p in paths]
rpaths = [p for l, p in zip(lpaths, paths) if l is False]
lpaths = [l for l, p in zip(lpaths, paths) if l is False]
self.fs.get(rpaths, lpaths)
return super().cat_ranges(
return LocalFileSystem().cat_ranges(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this need to be changed? Switching this back fixes the issue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original was pulling the bytes from remote, which is exactly what the cache filesystem is supposed to prevent, and what this PR was for. Are the file(s) really not in the local store?

However, I can see that this code is wrong. Can you try adding the following line before the one highlighted:

paths = [self._check_file(p) for p in paths]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, that fixes it!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
)

Expand Down Expand Up @@ -940,7 +984,7 @@ def discard(self):

def commit(self):
    """Upload the local file ``self.fn`` to ``self.path`` using ``self.fs.put``."""
    # The local copy is intentionally left in place: it is still in the cache.
    self.fs.put(self.fn, self.path, **self.kwargs)

@property
def name(self):
Expand Down
6 changes: 3 additions & 3 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ def __init__(
https://docs.aiohttp.org/en/stable/client_reference.html
For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
get_client: Callable[..., aiohttp.ClientSession]
A callable which takes keyword arguments and constructs
an aiohttp.ClientSession. It's state will be managed by
A callable, which takes keyword arguments and constructs
an aiohttp.ClientSession. Its state will be managed by
the HTTPFileSystem class.
storage_options: key-value
Any other parameters passed on to requests
cache_type, cache_options: defaults used in open
cache_type, cache_options: defaults used in open()
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
Expand Down
Loading