Skip to content

Commit b41648b

Browse files
committed
f: revert to atomic counter again
1 parent 2dbf59c commit b41648b

File tree

1 file changed

+44
-64
lines changed

1 file changed

+44
-64
lines changed

lightning-persister/src/fs_store.rs

Lines changed: 44 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::collections::HashMap;
88
use std::fs;
99
use std::io::{Read, Write};
1010
use std::path::{Path, PathBuf};
11-
use std::sync::atomic::{AtomicUsize, Ordering};
11+
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1212
use std::sync::{Arc, Mutex, RwLock};
1313

1414
#[cfg(feature = "tokio")]
@@ -41,45 +41,55 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
4141
// a consistent view and error out.
4242
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
4343

44-
#[derive(Default)]
45-
struct AsyncState {
46-
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
47-
// operations aren't sensitive to the order of execution.
48-
latest_version: u64,
49-
50-
// The last version that was written to disk.
51-
latest_written_version: u64,
52-
}
53-
5444
struct FilesystemStoreInner {
5545
data_dir: PathBuf,
5646
tmp_file_counter: AtomicUsize,
5747

5848
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
5949
// latest written version per key.
60-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<AsyncState>>>>,
50+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
6151
}
6252

6353
/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
6454
///
6555
/// [`KVStore`]: lightning::util::persist::KVStore
6656
pub struct FilesystemStore {
6757
inner: Arc<FilesystemStoreInner>,
58+
59+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
60+
// operations aren't sensitive to the order of execution.
61+
next_version: AtomicU64,
6862
}
6963

7064
impl FilesystemStore {
7165
/// Constructs a new [`FilesystemStore`].
7266
pub fn new(data_dir: PathBuf) -> Self {
7367
let locks = Mutex::new(HashMap::new());
7468
let tmp_file_counter = AtomicUsize::new(0);
75-
Self { inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }) }
69+
Self {
70+
inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
71+
next_version: AtomicU64::new(1),
72+
}
7673
}
7774

7875
/// Returns the data directory.
7976
pub fn get_data_dir(&self) -> PathBuf {
8077
self.inner.data_dir.clone()
8178
}
8279

80+
fn get_new_version_and_lock_ref(&self, dest_file_path: PathBuf) -> (Arc<RwLock<u64>>, u64) {
81+
let version = self.next_version.fetch_add(1, Ordering::Relaxed);
82+
if version == u64::MAX {
83+
panic!("FilesystemStore version counter overflowed");
84+
}
85+
86+
// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
87+
// cleaning up unused locks.
88+
let inner_lock_ref = self.inner.get_inner_lock_ref(dest_file_path);
89+
90+
(inner_lock_ref, version)
91+
}
92+
8393
#[cfg(any(all(feature = "tokio", test), fuzzing))]
8494
/// Returns the size of the async state.
8595
pub fn state_size(&self) -> usize {
@@ -110,8 +120,8 @@ impl KVStoreSync for FilesystemStore {
110120
Some(key),
111121
"write",
112122
)?;
113-
let inner_lock_ref = self.inner.get_inner_lock_ref(path.clone());
114-
self.inner.write_version(inner_lock_ref, path, buf, None)
123+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
124+
self.inner.write_version(inner_lock_ref, path, buf, version)
115125
}
116126

117127
fn remove(
@@ -123,8 +133,8 @@ impl KVStoreSync for FilesystemStore {
123133
Some(key),
124134
"remove",
125135
)?;
126-
let inner_lock_ref = self.inner.get_inner_lock_ref(path.clone());
127-
self.inner.remove_version(inner_lock_ref, path, lazy, None)
136+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
137+
self.inner.remove_version(inner_lock_ref, path, lazy, version)
128138
}
129139

130140
fn list(
@@ -141,7 +151,7 @@ impl KVStoreSync for FilesystemStore {
141151
}
142152

143153
impl FilesystemStoreInner {
144-
fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<AsyncState>> {
154+
fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<u64>> {
145155
let mut outer_lock = self.locks.lock().unwrap();
146156
Arc::clone(&outer_lock.entry(path).or_default())
147157
}
@@ -184,34 +194,10 @@ impl FilesystemStoreInner {
184194
Ok(dest_file_path)
185195
}
186196

187-
#[cfg(feature = "tokio")]
188-
fn get_new_version_and_state(&self, dest_file_path: PathBuf) -> (u64, Arc<RwLock<AsyncState>>) {
189-
let inner_lock_ref: Arc<RwLock<AsyncState>> = self.get_inner_lock_ref(dest_file_path);
190-
191-
let new_version = {
192-
let mut async_state = inner_lock_ref.write().unwrap();
193-
Self::get_new_version(&mut async_state)
194-
};
195-
196-
return (new_version, inner_lock_ref);
197-
}
198-
199-
fn get_new_version(async_state: &mut AsyncState) -> u64 {
200-
async_state.latest_version += 1;
201-
if async_state.latest_version == 0 {
202-
panic!("FilesystemStore version counter overflowed");
203-
}
204-
205-
debug_assert!(async_state.latest_version > async_state.latest_written_version);
206-
207-
async_state.latest_version
208-
}
209-
210197
fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result<Vec<u8>> {
211198
let mut buf = Vec::new();
212199

213-
let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone());
214-
self.execute_locked_read(inner_lock_ref, dest_file_path.clone(), || {
200+
self.execute_locked_read(dest_file_path.clone(), || {
215201
let mut f = fs::File::open(dest_file_path.clone())?;
216202
f.read_to_end(&mut buf)?;
217203
Ok(())
@@ -221,24 +207,20 @@ impl FilesystemStoreInner {
221207
}
222208

223209
fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
224-
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf,
225-
version: Option<u64>, callback: F,
210+
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, version: u64, callback: F,
226211
) -> Result<(), lightning::io::Error> {
227-
let mut async_state = inner_lock_ref.write().unwrap();
228-
229-
// Sync calls haven't assigned a version yet because it would require another lock acquisition.
230-
let version = version.unwrap_or_else(|| Self::get_new_version(&mut async_state));
212+
let mut last_written_version = inner_lock_ref.write().unwrap();
231213

232214
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
233215
// consistency.
234-
let is_stale_version = version <= async_state.latest_written_version;
216+
let is_stale_version = version <= *last_written_version;
235217

236218
// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
237219
let res = if is_stale_version {
238220
Ok(())
239221
} else {
240222
callback().map(|_| {
241-
async_state.latest_written_version = version;
223+
*last_written_version = version;
242224
})
243225
};
244226

@@ -248,15 +230,16 @@ impl FilesystemStoreInner {
248230
}
249231

250232
fn execute_locked_read<F: FnOnce() -> Result<(), lightning::io::Error>>(
251-
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf, callback: F,
233+
&self, dest_file_path: PathBuf, callback: F,
252234
) -> Result<(), lightning::io::Error> {
253-
let _async_state = inner_lock_ref.read().unwrap();
235+
let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone());
236+
let _guard = inner_lock_ref.read().unwrap();
254237
let res = callback();
255238
self.clean_locks(&inner_lock_ref, dest_file_path);
256239
res
257240
}
258241

259-
fn clean_locks(&self, inner_lock_ref: &Arc<RwLock<AsyncState>>, dest_file_path: PathBuf) {
242+
fn clean_locks(&self, inner_lock_ref: &Arc<RwLock<u64>>, dest_file_path: PathBuf) {
260243
// If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
261244
// to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
262245
// inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
@@ -270,8 +253,8 @@ impl FilesystemStoreInner {
270253
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
271254
/// returns early without writing.
272255
fn write_version(
273-
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf, buf: Vec<u8>,
274-
version: Option<u64>,
256+
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, buf: Vec<u8>,
257+
version: u64,
275258
) -> lightning::io::Result<()> {
276259
let parent_directory = dest_file_path.parent().ok_or_else(|| {
277260
let msg =
@@ -343,8 +326,7 @@ impl FilesystemStoreInner {
343326
}
344327

345328
fn remove_version(
346-
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf, lazy: bool,
347-
version: Option<u64>,
329+
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, lazy: bool, version: u64,
348330
) -> lightning::io::Result<()> {
349331
self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
350332
if !dest_file_path.is_file() {
@@ -507,11 +489,10 @@ impl KVStore for FilesystemStore {
507489
Err(e) => return Box::pin(async move { Err(e) }),
508490
};
509491

510-
// Obtain a version number to retain the call sequence.
511-
let (version, inner_lock_ref) = self.inner.get_new_version_and_state(path.clone());
492+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
512493
Box::pin(async move {
513494
tokio::task::spawn_blocking(move || {
514-
this.write_version(inner_lock_ref, path, buf, Some(version))
495+
this.write_version(inner_lock_ref, path, buf, version)
515496
})
516497
.await
517498
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
@@ -532,11 +513,10 @@ impl KVStore for FilesystemStore {
532513
Err(e) => return Box::pin(async move { Err(e) }),
533514
};
534515

535-
// Obtain a version number to retain the call sequence.
536-
let (version, inner_lock_ref) = self.inner.get_new_version_and_state(path.clone());
516+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
537517
Box::pin(async move {
538518
tokio::task::spawn_blocking(move || {
539-
this.remove_version(inner_lock_ref, path, lazy, Some(version))
519+
this.remove_version(inner_lock_ref, path, lazy, version)
540520
})
541521
.await
542522
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))

0 commit comments

Comments
 (0)