Skip to content

fix: Special-case suffix requests in obstore backend to support Azure #2994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 85 additions & 7 deletions src/zarr/storage/_obstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,25 @@
)
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
elif isinstance(byte_range, SuffixByteRequest):
resp = await obs.get_async(
self.store, key, options={"range": {"suffix": byte_range.suffix}}
)
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
# some object stores (Azure) don't support suffix requests. In this
# case, our workaround is to first get the length of the object and then
# manually request the byte range at the end.
try:
resp = await obs.get_async(
self.store, key, options={"range": {"suffix": byte_range.suffix}}
)
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
except obs.exceptions.NotSupportedError:
head_resp = await obs.head_async(self.store, key)
file_size = head_resp["size"]
suffix_len = byte_range.suffix
buffer = await obs.get_range_async(

Check warning on line 121 in src/zarr/storage/_obstore.py

View check run for this annotation

Codecov / codecov/patch

src/zarr/storage/_obstore.py#L117-L121

Added lines #L117 - L121 were not covered by tests
self.store,
key,
start=file_size - suffix_len,
length=suffix_len,
)
return prototype.buffer.from_bytes(buffer) # type: ignore[arg-type]

Check warning on line 127 in src/zarr/storage/_obstore.py

View check run for this annotation

Codecov / codecov/patch

src/zarr/storage/_obstore.py#L127

Added line #L127 was not covered by tests
else:
raise ValueError(f"Unexpected byte_range, got {byte_range}")
except _ALLOWED_EXCEPTIONS:
Expand Down Expand Up @@ -265,10 +280,29 @@
path: str
"""The path to request from."""

range: OffsetRange | SuffixRange | None
range: OffsetRange | None
# Note: suffix requests are handled separately because some object stores (Azure)
# don't support them
"""The range request type."""


class _SuffixRequest(TypedDict):
    """Suffix range requests.

    Kept apart from the other request types because some object stores (Azure) don't
    support suffix requests and need a fallback.

    These requests cannot be concurrent on the Rust side, and each needs its own call
    to `obstore.get_async`, passing in the `range` parameter.
    """

    original_request_index: int
    """The positional index in the original key_ranges input"""

    path: str
    """The path to request from."""

    range: SuffixRange
    """The suffix range."""


class _Response(TypedDict):
"""A response buffer associated with the original index that it should be restored to."""

Expand Down Expand Up @@ -317,7 +351,7 @@
prototype: BufferPrototype,
semaphore: asyncio.Semaphore,
) -> list[_Response]:
"""Make suffix or offset requests.
"""Make offset or full-file requests.

We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
futures can be gathered together.
Expand All @@ -339,6 +373,46 @@
]


async def _make_suffix_request(
    store: _UpstreamObjectStore,
    request: _SuffixRequest,
    prototype: BufferPrototype,
    semaphore: asyncio.Semaphore,
) -> list[_Response]:
    """Make suffix requests.

    This is separated out from `_make_other_request` because some object stores (Azure)
    don't support suffix requests. In this case, our workaround is to first get the
    length of the object and then manually request the byte range at the end.

    When the requested suffix is longer than the object, the fallback reads the whole
    object instead of computing a negative start offset, mirroring HTTP suffix-range
    semantics.

    We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
    futures can be gathered together.

    Parameters
    ----------
    store
        The obstore store to read from.
    request
        The suffix request: the path, the suffix range, and the index of the request
        in the caller's original input.
    prototype
        Buffer prototype whose `buffer.from_bytes` wraps the fetched bytes.
    semaphore
        Semaphore held for the duration of the request.

    Returns
    -------
    list[_Response]
        A single-element list pairing the fetched buffer with the original request
        index.
    """
    import obstore as obs

    path = request["path"]

    async with semaphore:
        try:
            resp = await obs.get_async(store, path, options={"range": request["range"]})
            buffer = await resp.bytes_async()
        except obs.exceptions.NotSupportedError:
            # The store cannot serve suffix ranges: look up the object size and
            # translate the suffix into an explicit (start, length) range.
            head_resp = await obs.head_async(store, path)
            file_size = head_resp["size"]
            # Clamp to the object size so that `start` can never go negative when the
            # caller asks for more trailing bytes than the object contains.
            suffix_len = min(request["range"]["suffix"], file_size)
            buffer = await obs.get_range_async(
                store,
                path,
                start=file_size - suffix_len,
                length=suffix_len,
            )

    return [
        {
            "original_request_index": request["original_request_index"],
            "buffer": prototype.buffer.from_bytes(buffer),  # type: ignore[arg-type]
        }
    ]


async def _get_partial_values(
store: _UpstreamObjectStore,
prototype: BufferPrototype,
Expand All @@ -358,6 +432,7 @@
key_ranges = list(key_ranges)
per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list)
other_requests: list[_OtherRequest] = []
suffix_requests: list[_SuffixRequest] = []

for idx, (path, byte_range) in enumerate(key_ranges):
if byte_range is None:
Expand All @@ -381,7 +456,7 @@
}
)
elif isinstance(byte_range, SuffixByteRequest):
other_requests.append(
suffix_requests.append(
{
"original_request_index": idx,
"path": path,
Expand All @@ -402,6 +477,9 @@
for request in other_requests:
futs.append(_make_other_request(store, request, prototype, semaphore=semaphore)) # noqa: PERF401

for suffix_request in suffix_requests:
futs.append(_make_suffix_request(store, suffix_request, prototype, semaphore=semaphore)) # noqa: PERF401

buffers: list[Buffer | None] = [None] * len(key_ranges)

for responses in await asyncio.gather(*futs):
Expand Down