
async zarr.create_array fails second call through async.run() #2878


Open
rpitre-wetdog opened this issue Mar 1, 2025 · 4 comments
Labels
bug Potential issues with the zarr-python library


@rpitre-wetdog

Zarr version

3.0.0

Numcodecs version

0.15.0

Python Version

3.12.8

Operating System

MacOs Sonoma 14.2.1

Installation

using conda

Description

I have an async function that uses the async zarr create_array to create an array in an S3 bucket through an FsspecStore. I call this function from synchronous code using asyncio.run(). The first call works without problem. On the second call, create_array fails with this error:
Error: An HTTP Client raised an unhandled exception: Event loop is closed

It seems that some resource is still linked to the loop created by the first call to asyncio.run().
If you create your own loop and reuse it to run the async code, it works.

Steps to reproduce

To run this, replace {your_own_bucket} with an S3 bucket you have access to.

import asyncio
import numpy as np
import s3fs
from zarr.api.asynchronous import create_array
from zarr.storage import FsspecStore

_loop = None

def run_async_function(async_func, *args, **kwargs):
    # Reuse one module-level event loop across calls instead of creating
    # and closing a fresh loop each time, as asyncio.run() does.
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    asyncio.set_event_loop(_loop)
    return _loop.run_until_complete(async_func(*args, **kwargs))


async def test_zarr_async():
    s3 = s3fs.S3FileSystem(asynchronous=True)
    s3_path = "s3://{your_own_bucket}/test.zarr"
    # Pass the bucket/key path without the "s3://" protocol prefix.
    store = FsspecStore(fs=s3, path=s3_path.removeprefix("s3://"))
    await create_array(store, shape=(10, 10), dtype=np.float64, chunks=(5, 5), overwrite=True)
    store.close()

# The first call works
asyncio.run(test_zarr_async())
# The second call fails
try:
    asyncio.run(test_zarr_async())
except Exception as e:
    print(f"Error: {e}")

# Running this instead of asyncio.run() works, because both calls use the same loop:
# run_async_function(test_zarr_async)
# run_async_function(test_zarr_async)

Additional output

No response

@rpitre-wetdog rpitre-wetdog added the bug Potential issues with the zarr-python library label Mar 1, 2025
@jhamman
Member

jhamman commented Mar 1, 2025

Hi @rpitre-wetdog - thanks for the report. I was able to reproduce this with a few minor tweaks. In general, for best results (performance, reliability, etc.), I would use just one event loop per program. Your workaround is headed in the right direction.

I'll note that I can't reproduce the error with any other store (MemoryStore or LocalStore), so this seems to be confined to FsspecStore. Presumably this has to do with a client cache managed in fsspec or s3fs. @martindurant will probably know.

The full traceback is here:

Traceback (most recent call last):
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiohttp/client.py", line 606, in _request
    resp = await req.send(conn)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 725, in send
    self._writer = self.loop.create_task(self.write_bytes(writer, conn))
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/base_events.py", line 456, in create_task
    self._check_closed()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/httpsession.py", line 222, in send
    response = await session.request(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiohttp/client.py", line 613, in _request
    conn.close()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiohttp/connector.py", line 166, in close
    self._connector._release(self._key, self._protocol, should_close=True)
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiohttp/connector.py", line 667, in _release
    protocol.close()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiohttp/client_proto.py", line 71, in close
    transport.close()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/sslproto.py", line 116, in close
    self._ssl_protocol._start_shutdown()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/sslproto.py", line 624, in _start_shutdown
    self._shutdown_timeout_handle = self._loop.call_later(
                                    ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/base_events.py", line 761, in call_later
    timer = self.call_at(self.time() + delay, callback, *args,
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/base_events.py", line 774, in call_at
    self._check_closed()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jhamman/workdir/test.py", line 27, in <module>
    asyncio.run(test_zarr_async())
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/jhamman/workdir/test.py", line 22, in test_zarr_async
    await create_array(store, shape=(10, 10), dtype=np.float64, chunks=(5, 5), overwrite=True)
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/zarr/core/array.py", line 4148, in create_array
    result = await init_array(
             ^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/zarr/core/array.py", line 3918, in init_array
    await store_path.delete_dir()
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/zarr/storage/_common.py", line 161, in delete_dir
    await self.store.delete_dir(self.path)
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/zarr/storage/_fsspec.py", line 301, in delete_dir
    await self.fs._rm(path_to_delete, recursive=True)
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/s3fs/core.py", line 2048, in _rm
    paths = await self._expand_path(path, recursive=recursive)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/fsspec/asyn.py", line 871, in _expand_path
    out = await self._expand_path([path], recursive, maxdepth)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/fsspec/asyn.py", line 894, in _expand_path
    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/s3fs/core.py", line 859, in _find
    out = await self._lsdir(path, delimiter="", prefix=prefix, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/s3fs/core.py", line 734, in _lsdir
    async for c in self._iterdir(
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/s3fs/core.py", line 784, in _iterdir
    async for i in it:
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/paginate.py", line 30, in __anext__
    response = await self._make_request(current_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/client.py", line 391, in _make_api_call
    http, parsed_response = await self._make_request(
                            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/client.py", line 419, in _make_request
    return await self._endpoint.make_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/endpoint.py", line 102, in _send_request
    while await self._needs_retry(
          ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/endpoint.py", line 264, in _needs_retry
    responses = await self._event_emitter.emit(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/hooks.py", line 66, in _emit
    response = await resolve_awaitable(handler(**kwargs))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/_helpers.py", line 15, in resolve_awaitable
    return await obj
           ^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/retryhandler.py", line 107, in _call
    if await resolve_awaitable(self._checker(**checker_kwargs)):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/_helpers.py", line 15, in resolve_awaitable
    return await obj
           ^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/retryhandler.py", line 126, in _call
    should_retry = await self._should_retry(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/retryhandler.py", line 152, in _should_retry
    return await resolve_awaitable(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/_helpers.py", line 15, in resolve_awaitable
    return await obj
           ^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/retryhandler.py", line 174, in _call
    checker(attempt_number, response, caught_exception)
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/botocore/retryhandler.py", line 247, in __call__
    return self._check_caught_exception(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/botocore/retryhandler.py", line 416, in _check_caught_exception
    raise caught_exception
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/endpoint.py", line 183, in _do_get_response
    http_response = await self._send(request)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/endpoint.py", line 287, in _send
    return await self.http_session.send(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jhamman/miniforge3/envs/earthmover-dev-312/lib/python3.12/site-packages/aiobotocore/httpsession.py", line 273, in send
    raise HTTPClientError(error=e)
botocore.exceptions.HTTPClientError: An HTTP Client raised an unhandled exception: Event loop is closed
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x107199280>
sys:1: RuntimeWarning: coroutine 'ClientRequest.write_bytes' was never awaited


@martindurant
Member

Sorry, I forgot to answer here. Details of the fsspec instance caching strategy:
Instances are keyed by the set of kwargs passed in at instantiation, plus thread ID, PID and optional extra arguments (the latter depending on implementation class).

For async implementations, the loop is an important attribute of the instance. It will be the "current" loop when the instance is created in an async context, and fsspec's own IO loop (running in its dedicated thread) otherwise. This is really why the asynchronous= flag exists: to make sure that these instances do not end up clashing in the cache.
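To make the failure mode concrete without S3, here is a self-contained sketch. ToyAsyncFS is a hypothetical stand-in, not the fsspec or s3fs API: it caches instances on constructor kwargs plus PID and thread ID, as described above, and binds each new instance to the loop that is running when it is created. A second asyncio.run() gets the cached instance back, still bound to the first, now closed, loop.

```python
from __future__ import annotations

import asyncio
import os
import threading


class ToyAsyncFS:
    """Hypothetical cached async client; not the fsspec/s3fs API."""

    _cache: dict = {}

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop

    @classmethod
    def cached(cls, **kwargs) -> ToyAsyncFS:
        # Key on kwargs, PID and thread ID only; the loop is NOT part of the key.
        key = (os.getpid(), threading.get_ident(), tuple(sorted(kwargs.items())))
        if key not in cls._cache:
            # Bind the new instance to whichever loop is running right now.
            cls._cache[key] = cls(asyncio.get_running_loop())
        return cls._cache[key]

    async def request(self) -> str:
        # Real HTTP clients schedule work on the loop they captured;
        # call_soon() raises RuntimeError once that loop is closed.
        fut = self.loop.create_future()
        self.loop.call_soon(fut.set_result, "ok")
        return await fut


async def use_fs() -> str:
    fs = ToyAsyncFS.cached(anon=True)
    return await fs.request()


def demo() -> tuple[str, str | None]:
    ToyAsyncFS._cache.clear()
    first = asyncio.run(use_fs())  # creates and caches an instance on loop #1
    try:
        asyncio.run(use_fs())  # runs on loop #2 but gets the instance bound to loop #1
    except RuntimeError as exc:
        return first, str(exc)
    return first, None


if __name__ == "__main__":
    print(demo())  # ('ok', 'Event loop is closed')
```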

In the case described here, I think the instance is being created twice with identical kwargs in an async context, but with different loops. This is not an anticipated circumstance! So, one could

  • explicitly avoid the cache altogether,
  • include the loop= explicitly in creating the filesystem
  • try to find an automatic way that id(loop) is yet another part of the cache key for async instances created in async contexts
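A sketch of the first option. ToyAsyncFS here is a hypothetical stand-in, not the fsspec or s3fs API: a skip_instance_cache flag bypasses the cache, so every call binds a fresh instance to the currently running loop and two consecutive asyncio.run() calls both succeed. The flag name mirrors the skip_instance_cache keyword that the fsspec diff further down pops from kwargs; in the reporter's code the analogous change would presumably be s3fs.S3FileSystem(asynchronous=True, skip_instance_cache=True), at the cost of losing instance reuse.

```python
import asyncio


class ToyAsyncFS:
    """Hypothetical cached async client; not the fsspec/s3fs API."""

    _cache: dict = {}

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        # Bind to the loop that was running when the instance was created.
        self.loop = loop

    @classmethod
    def create(cls, skip_instance_cache: bool = False, **kwargs) -> "ToyAsyncFS":
        if skip_instance_cache:
            # Option 1: bypass the cache, so the instance always binds
            # to the currently running loop.
            return cls(asyncio.get_running_loop())
        key = tuple(sorted(kwargs.items()))
        if key not in cls._cache:
            cls._cache[key] = cls(asyncio.get_running_loop())
        return cls._cache[key]

    async def request(self) -> str:
        # call_soon() raises RuntimeError if self.loop has been closed.
        fut = self.loop.create_future()
        self.loop.call_soon(fut.set_result, "ok")
        return await fut


async def use_fs() -> str:
    fs = ToyAsyncFS.create(skip_instance_cache=True, anon=True)
    return await fs.request()


def demo() -> list[str]:
    # Two asyncio.run() calls, each with its own short-lived loop.
    return [asyncio.run(use_fs()), asyncio.run(use_fs())]


if __name__ == "__main__":
    print(demo())  # ['ok', 'ok']
```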

@jhamman
Member

jhamman commented Mar 15, 2025

try to find an automatic way that id(loop) is yet another part of the cache key for async instances created in async contexts

👍 This seems like the most ideal outcome from Zarr's perspective.

@martindurant
Member

Repeating my proposed diff from the other issue:

--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import io
 import json
 import logging
@@ -67,8 +68,15 @@ class _Cached(type):
         extra_tokens = tuple(
             getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
         )
+        if "loop" not in kwargs and cls.async_impl:
+            try:
+                loop = asyncio.get_running_loop()
+            except RuntimeError:
+                loop = None
+        else:
+            loop = None
         token = tokenize(
-            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
+            cls, cls._pid, threading.get_ident(), loop, *args, *extra_tokens, **kwargs
         )
         skip = kwargs.pop("skip_instance_cache", False)
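The proposed change can be sketched outside fsspec with a hypothetical _ToyCached metaclass that follows the diff's logic: when the class is async (async_impl) and no explicit loop= is given, the running loop, if any, becomes part of the cache key. This is a sketch of the logic under stated assumptions, not a tested patch to fsspec. One deliberate deviation: it keys on id(loop), as suggested in the earlier comment, because repr() of a CPython event loop shows only its class name and its running/closed/debug flags, so a key built from str() of the arguments would render two distinct loops identically. The cached instance holds a reference to its loop, which keeps that id from being reused.

```python
import asyncio
import os
import threading


class _ToyCached(type):
    """Hypothetical metaclass mirroring the proposed diff; not fsspec's code."""

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        cls._cache = {}

    def __call__(cls, *args, **kwargs):
        # As in the diff: only auto-detect the loop for async classes
        # when the caller did not pass loop= explicitly.
        if "loop" not in kwargs and cls.async_impl:
            try:
                # Use id(loop): repr() of two event loops can be identical.
                loop_id = id(asyncio.get_running_loop())
            except RuntimeError:  # no running loop (sync context)
                loop_id = None
        else:
            loop_id = None
        key = (cls, os.getpid(), threading.get_ident(), loop_id,
               args, tuple(sorted(kwargs.items())))
        if key not in cls._cache:
            cls._cache[key] = super().__call__(*args, **kwargs)
        return cls._cache[key]


class ToyAsyncFS(metaclass=_ToyCached):
    async_impl = True  # mirrors the class attribute the diff checks

    def __init__(self, loop=None, **kwargs):
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
        # Holding the loop keeps it alive, so its id() cannot be reused
        # while this instance sits in the cache.
        self.loop = loop
        self.kwargs = kwargs


async def make_fs() -> ToyAsyncFS:
    return ToyAsyncFS(anon=True)


def demo():
    sync_a = ToyAsyncFS(anon=True)
    sync_b = ToyAsyncFS(anon=True)
    async_1 = asyncio.run(make_fs())
    async_2 = asyncio.run(make_fs())
    return sync_a, sync_b, async_1, async_2


if __name__ == "__main__":
    a, b, c, d = demo()
    print(a is b)  # True: same key outside any loop
    print(c is d)  # False: each asyncio.run() loop gets its own instance
```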

Projects
None yet
Development

No branches or pull requests

3 participants