diff --git a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py
index 79afa08d15..4381acb2ae 100644
--- a/src/zarr/storage/_obstore.py
+++ b/src/zarr/storage/_obstore.py
@@ -106,10 +106,27 @@ async def get(
                 )
                 return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
             elif isinstance(byte_range, SuffixByteRequest):
-                resp = await obs.get_async(
-                    self.store, key, options={"range": {"suffix": byte_range.suffix}}
-                )
-                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+                # some object stores (Azure) don't support suffix requests. In this
+                # case, our workaround is to first get the length of the object and then
+                # manually request the byte range at the end.
+                try:
+                    resp = await obs.get_async(
+                        self.store, key, options={"range": {"suffix": byte_range.suffix}}
+                    )
+                    return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+                except obs.exceptions.NotSupportedError:
+                    head_resp = await obs.head_async(self.store, key)
+                    file_size = head_resp["size"]
+                    # Clamp to the object size: a suffix longer than the object means
+                    # "the whole object", and a negative start offset is invalid.
+                    suffix_len = min(byte_range.suffix, file_size)
+                    buffer = await obs.get_range_async(
+                        self.store,
+                        key,
+                        start=file_size - suffix_len,
+                        length=suffix_len,
+                    )
+                    return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
             else:
                 raise ValueError(f"Unexpected byte_range, got {byte_range}")
         except _ALLOWED_EXCEPTIONS:
@@ -265,10 +282,29 @@ class _OtherRequest(TypedDict):
     path: str
     """The path to request from."""
 
-    range: OffsetRange | SuffixRange | None
+    range: OffsetRange | None
+    # Note: suffix requests are handled separately because some object stores (Azure)
+    # don't support them
     """The range request type."""
 
 
+class _SuffixRequest(TypedDict):
+    """Suffix range requests.
+
+    These requests cannot be concurrent on the Rust side, and each needs its own call
+    to `obstore.get_async`, passing in the `range` parameter.
+    """
+
+    original_request_index: int
+    """The positional index in the original key_ranges input"""
+
+    path: str
+    """The path to request from."""
+
+    range: SuffixRange
+    """The suffix range."""
+
+
 class _Response(TypedDict):
     """A response buffer associated with the original index that it should be restored to."""
 
@@ -317,7 +353,7 @@ async def _make_other_request(
     prototype: BufferPrototype,
     semaphore: asyncio.Semaphore,
 ) -> list[_Response]:
-    """Make suffix or offset requests.
+    """Make offset or full-file requests.
 
     We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
     futures can be gathered together.
@@ -339,6 +375,48 @@ async def _make_other_request(
     ]
 
 
+async def _make_suffix_request(
+    store: _UpstreamObjectStore,
+    request: _SuffixRequest,
+    prototype: BufferPrototype,
+    semaphore: asyncio.Semaphore,
+) -> list[_Response]:
+    """Make suffix requests.
+
+    This is separated out from `_make_other_request` because some object stores (Azure)
+    don't support suffix requests. In this case, our workaround is to first get the
+    length of the object and then manually request the byte range at the end.
+
+    We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
+    futures can be gathered together.
+    """
+    import obstore as obs
+
+    async with semaphore:
+        try:
+            resp = await obs.get_async(store, request["path"], options={"range": request["range"]})
+            buffer = await resp.bytes_async()
+        except obs.exceptions.NotSupportedError:
+            head_resp = await obs.head_async(store, request["path"])
+            file_size = head_resp["size"]
+            # Clamp to the object size: a suffix longer than the object means
+            # "the whole object", and a negative start offset is invalid.
+            suffix_len = min(request["range"]["suffix"], file_size)
+            buffer = await obs.get_range_async(
+                store,
+                request["path"],
+                start=file_size - suffix_len,
+                length=suffix_len,
+            )
+
+    return [
+        {
+            "original_request_index": request["original_request_index"],
+            "buffer": prototype.buffer.from_bytes(buffer),  # type: ignore[arg-type]
+        }
+    ]
+
+
 async def _get_partial_values(
     store: _UpstreamObjectStore,
     prototype: BufferPrototype,
@@ -358,6 +436,7 @@ async def _get_partial_values(
     key_ranges = list(key_ranges)
     per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list)
     other_requests: list[_OtherRequest] = []
+    suffix_requests: list[_SuffixRequest] = []
 
     for idx, (path, byte_range) in enumerate(key_ranges):
         if byte_range is None:
@@ -381,7 +460,7 @@ async def _get_partial_values(
                 }
             )
         elif isinstance(byte_range, SuffixByteRequest):
-            other_requests.append(
+            suffix_requests.append(
                 {
                     "original_request_index": idx,
                     "path": path,
@@ -402,6 +481,9 @@ async def _get_partial_values(
     for request in other_requests:
         futs.append(_make_other_request(store, request, prototype, semaphore=semaphore))  # noqa: PERF401
 
+    for suffix_request in suffix_requests:
+        futs.append(_make_suffix_request(store, suffix_request, prototype, semaphore=semaphore))  # noqa: PERF401
+
     buffers: list[Buffer | None] = [None] * len(key_ranges)
 
     for responses in await asyncio.gather(*futs):