Skip to content

Commit c2ce5ea

Browse files
authored
Close files when CachingFileManager is garbage collected (#2595)
* Close files when CachingFileManager is garbage collected Fixes GH2560 This frees users from needing to worry about this. * Minor tweak * Test raising an error in __del__ * restore change * Remove the need for a lock in __del__ * Handle locking ourselves with rasterio * Remove race condition with netCDF4 * refactor optional lock * Fix more possible race conditions * Warn if we can't close in FileManager.__del__ * Fix lock acquisition in CachingFileManager.__del__ * Cleaner fall-back for no dask-distributed * Test tweaks * Test for FileManager.__repr__ * Add reference counting to CachingFileManager * remove unused import * Spelling / reorg
1 parent 9352b3c commit c2ce5ea

File tree

13 files changed

+361
-121
lines changed

13 files changed

+361
-121
lines changed

doc/whats-new.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ Enhancements
5757
- Like :py:class:`pandas.DatetimeIndex`, :py:class:`CFTimeIndex` now supports
5858
"dayofyear" and "dayofweek" accessors (:issue:`2597`). By `Spencer Clark
5959
<https://github.com/spencerkclark>`_.
60+
- The option ``'warn_for_unclosed_files'`` (False by default) has been added to
61+
allow users to enable a warning when files opened by xarray are deallocated
62+
but were not explicitly closed. This is mostly useful for debugging; we
63+
recommend enabling it in your test suites if you use xarray for IO.
64+
By `Stephan Hoyer <https://github.com/shoyer>`_
6065
- Support Dask ``HighLevelGraphs`` by `Matthew Rocklin <https://matthewrocklin.com>`_.
6166
- :py:meth:`DataArray.resample` and :py:meth:`Dataset.resample` now supports the
6267
``loffset`` kwarg just like Pandas.
@@ -68,6 +73,12 @@ Enhancements
6873
Bug fixes
6974
~~~~~~~~~
7075

76+
- Ensure files are automatically closed, if possible, when no longer referenced
77+
by a Python variable (:issue:`2560`).
78+
By `Stephan Hoyer <https://github.com/shoyer>`_
79+
- Fixed possible race conditions when reading/writing to disk in parallel
80+
(:issue:`2595`).
81+
By `Stephan Hoyer <https://github.com/shoyer>`_
7182
- Fix h5netcdf saving scalars with filters or chunks (:issue:`2563`).
7283
By `Martin Raspaud <https://github.com/mraspaud>`_.
7384
- Fix parsing of ``_Unsigned`` attribute set by OPENDAP servers. (:issue:`2583`).

xarray/backends/file_manager.py

Lines changed: 104 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import contextlib
12
import threading
3+
import warnings
24

35
from ..core import utils
46
from ..core.options import OPTIONS
7+
from .locks import acquire
58
from .lru_cache import LRUCache
69

710

@@ -11,6 +14,8 @@
1114
assert FILE_CACHE.maxsize, 'file cache must be at least size one'
1215

1316

17+
REF_COUNTS = {}
18+
1419
_DEFAULT_MODE = utils.ReprObject('<unused>')
1520

1621

@@ -22,7 +27,7 @@ class FileManager(object):
2227
many open files and transferring them between multiple processes.
2328
"""
2429

25-
def acquire(self):
30+
def acquire(self, needs_lock=True):
2631
"""Acquire the file object from this manager."""
2732
raise NotImplementedError
2833

@@ -62,6 +67,9 @@ class CachingFileManager(FileManager):
6267
def __init__(self, opener, *args, **keywords):
6368
"""Initialize a FileManager.
6469
70+
The cache and ref_counts arguments exist solely to facilitate
71+
dependency injection, and should only be set for tests.
72+
6573
Parameters
6674
----------
6775
opener : callable
@@ -90,13 +98,17 @@ def __init__(self, opener, *args, **keywords):
9098
global variable and contains non-picklable file objects,
9199
unpickled FileManager objects will be restored with the default
92100
cache.
101+
ref_counts : dict, optional
102+
Optional dict to use for keeping track of the number of references to
103+
the same file.
93104
"""
94105
# TODO: replace with real keyword arguments when we drop Python 2
95106
# support
96107
mode = keywords.pop('mode', _DEFAULT_MODE)
97108
kwargs = keywords.pop('kwargs', None)
98109
lock = keywords.pop('lock', None)
99110
cache = keywords.pop('cache', FILE_CACHE)
111+
ref_counts = keywords.pop('ref_counts', REF_COUNTS)
100112
if keywords:
101113
raise TypeError('FileManager() got unexpected keyword arguments: '
102114
'%s' % list(keywords))
@@ -105,34 +117,52 @@ def __init__(self, opener, *args, **keywords):
105117
self._args = args
106118
self._mode = mode
107119
self._kwargs = {} if kwargs is None else dict(kwargs)
120+
108121
self._default_lock = lock is None or lock is False
109122
self._lock = threading.Lock() if self._default_lock else lock
123+
124+
# cache[self._key] stores the file associated with this object.
110125
self._cache = cache
111126
self._key = self._make_key()
112127

128+
# ref_counts[self._key] stores the number of CachingFileManager objects
129+
# in memory referencing this same file. We use this to know if we can
130+
# close a file when the manager is deallocated.
131+
self._ref_counter = _RefCounter(ref_counts)
132+
self._ref_counter.increment(self._key)
133+
113134
def _make_key(self):
114135
"""Make a key for caching files in the LRU cache."""
115136
value = (self._opener,
116137
self._args,
117-
self._mode,
138+
'a' if self._mode == 'w' else self._mode,
118139
tuple(sorted(self._kwargs.items())))
119140
return _HashedSequence(value)
120141

121-
def acquire(self):
142+
@contextlib.contextmanager
143+
def _optional_lock(self, needs_lock):
144+
"""Context manager for optionally acquiring a lock."""
145+
if needs_lock:
146+
with self._lock:
147+
yield
148+
else:
149+
yield
150+
151+
def acquire(self, needs_lock=True):
122152
"""Acquire a file object from the manager.
123153
124154
A new file is only opened if it has expired from the
125155
least-recently-used cache.
126156
127-
This method uses a reentrant lock, which ensures that it is
128-
thread-safe. You can safely acquire a file in multiple threads at the
129-
same time, as long as the underlying file object is thread-safe.
157+
This method uses a lock, which ensures that it is thread-safe. You can
158+
safely acquire a file in multiple threads at the same time, as long as
159+
the underlying file object is thread-safe.
130160
131161
Returns
132162
-------
133163
An open file object, as returned by ``opener(*args, **kwargs)``.
134164
"""
135-
with self._lock:
165+
with self._optional_lock(needs_lock):
136166
try:
137167
file = self._cache[self._key]
138168
except KeyError:
@@ -144,28 +174,53 @@ def acquire(self):
144174
if self._mode == 'w':
145175
# ensure file doesn't get overridden when opened again
146176
self._mode = 'a'
147-
self._key = self._make_key()
148177
self._cache[self._key] = file
149178
return file
150179

151-
def _close(self):
152-
default = None
153-
file = self._cache.pop(self._key, default)
154-
if file is not None:
155-
file.close()
156-
157180
def close(self, needs_lock=True):
158181
"""Explicitly close any associated file object (if necessary)."""
159182
# TODO: remove needs_lock if/when we have a reentrant lock in
160183
# dask.distributed: https://github.com/dask/dask/issues/3832
161-
if needs_lock:
162-
with self._lock:
163-
self._close()
164-
else:
165-
self._close()
184+
with self._optional_lock(needs_lock):
185+
default = None
186+
file = self._cache.pop(self._key, default)
187+
if file is not None:
188+
file.close()
189+
190+
def __del__(self):
191+
# If we're the only CachingFileManager referencing an unclosed file, we
192+
# should remove it from the cache upon garbage collection.
193+
#
194+
# Keeping our own count of file references might seem like overkill,
195+
# but it's actually pretty common to reopen files with the same
196+
# variable name in a notebook or command line environment, e.g., to
197+
# fix the parameters used when opening a file:
198+
# >>> ds = xarray.open_dataset('myfile.nc')
199+
# >>> ds = xarray.open_dataset('myfile.nc', decode_times=False)
200+
# This second assignment to "ds" drops CPython's ref-count on the first
201+
# "ds" argument to zero, which can trigger garbage collections. So if
202+
# we didn't check whether another object is referencing 'myfile.nc',
203+
# the newly opened file would actually be immediately closed!
204+
ref_count = self._ref_counter.decrement(self._key)
205+
206+
if not ref_count and self._key in self._cache:
207+
if acquire(self._lock, blocking=False):
208+
# Only close files if we can do so immediately.
209+
try:
210+
self.close(needs_lock=False)
211+
finally:
212+
self._lock.release()
213+
214+
if OPTIONS['warn_for_unclosed_files']:
215+
warnings.warn(
216+
'deallocating {}, but file is not already closed. '
217+
'This may indicate a bug.'
218+
.format(self), RuntimeWarning, stacklevel=2)
166219

167220
def __getstate__(self):
168221
"""State for pickling."""
222+
# cache and ref_counts are intentionally omitted: we don't want to try
223+
# to serialize these global objects.
169224
lock = None if self._default_lock else self._lock
170225
return (self._opener, self._args, self._mode, self._kwargs, lock)
171226

@@ -174,6 +229,34 @@ def __setstate__(self, state):
174229
opener, args, mode, kwargs, lock = state
175230
self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)
176231

232+
def __repr__(self):
233+
args_string = ', '.join(map(repr, self._args))
234+
if self._mode is not _DEFAULT_MODE:
235+
args_string += ', mode={!r}'.format(self._mode)
236+
return '{}({!r}, {}, kwargs={})'.format(
237+
type(self).__name__, self._opener, args_string, self._kwargs)
238+
239+
240+
class _RefCounter(object):
241+
"""Class for keeping track of reference counts."""
242+
def __init__(self, counts):
243+
self._counts = counts
244+
self._lock = threading.Lock()
245+
246+
def increment(self, name):
247+
with self._lock:
248+
count = self._counts[name] = self._counts.get(name, 0) + 1
249+
return count
250+
251+
def decrement(self, name):
252+
with self._lock:
253+
count = self._counts[name] - 1
254+
if count:
255+
self._counts[name] = count
256+
else:
257+
del self._counts[name]
258+
return count
259+
177260

178261
class _HashedSequence(list):
179262
"""Speedup repeated look-ups by caching hash values.
@@ -198,7 +281,8 @@ class DummyFileManager(FileManager):
198281
def __init__(self, value):
199282
self._value = value
200283

201-
def acquire(self):
284+
def acquire(self, needs_lock=True):
285+
del needs_lock # ignored
202286
return self._value
203287

204288
def close(self, needs_lock=True):

xarray/backends/h5netcdf_.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ def _getitem(self, key):
2626
# h5py requires using lists for fancy indexing:
2727
# https://github.com/h5py/h5py/issues/992
2828
key = tuple(list(k) if isinstance(k, np.ndarray) else k for k in key)
29-
array = self.get_array()
3029
with self.datastore.lock:
30+
array = self.get_array(needs_lock=False)
3131
return array[key]
3232

3333

@@ -230,9 +230,6 @@ def prepare_variable(self, name, variable, check_encoding=False,
230230

231231
def sync(self):
232232
self.ds.sync()
233-
# if self.autoclose:
234-
# self.close()
235-
# super(H5NetCDFStore, self).sync(compute=compute)
236233

237234
def close(self, **kwargs):
238235
self._manager.close(**kwargs)

xarray/backends/locks.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88
# no need to worry about serializing the lock
99
SerializableLock = threading.Lock
1010

11+
try:
12+
from dask.distributed import Lock as DistributedLock
13+
except ImportError:
14+
DistributedLock = None
15+
1116

1217
# Locks used by multiple backends.
1318
# Neither HDF5 nor the netCDF-C library are thread-safe.
@@ -33,16 +38,11 @@ def _get_multiprocessing_lock(key):
3338
return multiprocessing.Lock()
3439

3540

36-
def _get_distributed_lock(key):
37-
from dask.distributed import Lock
38-
return Lock(key)
39-
40-
4141
_LOCK_MAKERS = {
4242
None: _get_threaded_lock,
4343
'threaded': _get_threaded_lock,
4444
'multiprocessing': _get_multiprocessing_lock,
45-
'distributed': _get_distributed_lock,
45+
'distributed': DistributedLock,
4646
}
4747

4848

@@ -113,6 +113,27 @@ def get_write_lock(key):
113113
return lock_maker(key)
114114

115115

116+
def acquire(lock, blocking=True):
117+
"""Acquire a lock, possibly in a non-blocking fashion.
118+
119+
Includes backwards compatibility hacks for old versions of Python, dask
120+
and dask-distributed.
121+
"""
122+
if blocking:
123+
# no arguments needed
124+
return lock.acquire()
125+
elif DistributedLock is not None and isinstance(lock, DistributedLock):
126+
# distributed.Lock doesn't support the blocking argument yet:
127+
# https://github.com/dask/distributed/pull/2412
128+
return lock.acquire(timeout=0)
129+
else:
130+
# "blocking" keyword argument not supported for:
131+
# - threading.Lock on Python 2.
132+
# - dask.SerializableLock with dask v1.0.0 or earlier.
133+
# - multiprocessing.Lock calls the argument "block" instead.
134+
return lock.acquire(blocking)
135+
136+
116137
class CombinedLock(object):
117138
"""A combination of multiple locks.
118139
@@ -123,12 +144,12 @@ class CombinedLock(object):
123144
def __init__(self, locks):
124145
self.locks = tuple(set(locks)) # remove duplicates
125146

126-
def acquire(self, *args):
127-
return all(lock.acquire(*args) for lock in self.locks)
147+
def acquire(self, blocking=True):
148+
return all(acquire(lock, blocking=blocking) for lock in self.locks)
128149

129-
def release(self, *args):
150+
def release(self):
130151
for lock in self.locks:
131-
lock.release(*args)
152+
lock.release()
132153

133154
def __enter__(self):
134155
for lock in self.locks:
@@ -138,7 +159,6 @@ def __exit__(self, *args):
138159
for lock in self.locks:
139160
lock.__exit__(*args)
140161

141-
@property
142162
def locked(self):
143163
return any(lock.locked for lock in self.locks)
144164

@@ -149,10 +169,10 @@ def __repr__(self):
149169
class DummyLock(object):
150170
"""DummyLock provides the lock API without any actual locking."""
151171

152-
def acquire(self, *args):
172+
def acquire(self, blocking=True):
153173
pass
154174

155-
def release(self, *args):
175+
def release(self):
156176
pass
157177

158178
def __enter__(self):
@@ -161,7 +181,6 @@ def __enter__(self):
161181
def __exit__(self, *args):
162182
pass
163183

164-
@property
165184
def locked(self):
166185
return False
167186

0 commit comments

Comments
 (0)