Skip to content

Commit 6d3436e

Browse files
committed
Make TestStore async writes actually async, with manual complete
`TestStore` recently got the ability to make async writes, but wasn't a very useful test as all writes actually completed immediately. Instead, here, we make the writes actually-async, forcing the test to mark writes complete as required.
1 parent dd28fa5 commit 6d3436e

File tree

1 file changed

+111
-21
lines changed

1 file changed

+111
-21
lines changed

lightning/src/util/test_utils.rs

Lines changed: 111 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ use core::future::Future;
8989
use core::mem;
9090
use core::pin::Pin;
9191
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
92+
use core::task::{Context, Poll, Waker};
9293
use core::time::Duration;
9394

9495
use bitcoin::psbt::Psbt;
@@ -856,15 +857,80 @@ impl<Signer: sign::ecdsa::EcdsaChannelSigner> Persist<Signer> for TestPersister
856857
}
857858
}
858859

860+
type SPSCKVChannelState = Arc<Mutex<(Option<Result<(), io::Error>>, Option<Waker>)>>;
861+
struct SPSCKVChannel(SPSCKVChannelState);
862+
impl Future for SPSCKVChannel {
863+
type Output = Result<(), io::Error>;
864+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
865+
let mut state = self.0.lock().unwrap();
866+
state.0.take().map(|res| Poll::Ready(res)).unwrap_or_else(|| {
867+
state.1 = Some(cx.waker().clone());
868+
Poll::Pending
869+
})
870+
}
871+
}
872+
859873
pub struct TestStore {
874+
pending_async_writes: Mutex<HashMap<String, Vec<(usize, SPSCKVChannelState, Vec<u8>)>>>,
860875
persisted_bytes: Mutex<HashMap<String, HashMap<String, Vec<u8>>>>,
861876
read_only: bool,
862877
}
863878

864879
impl TestStore {
865880
pub fn new(read_only: bool) -> Self {
881+
let pending_async_writes = Mutex::new(new_hash_map());
866882
let persisted_bytes = Mutex::new(new_hash_map());
867-
Self { persisted_bytes, read_only }
883+
Self { pending_async_writes, persisted_bytes, read_only }
884+
}
885+
886+
pub fn list_pending_async_writes(
887+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
888+
) -> Vec<usize> {
889+
let key = format!("{primary_namespace}/{secondary_namespace}/{key}");
890+
let writes_lock = self.pending_async_writes.lock().unwrap();
891+
writes_lock
892+
.get(&key)
893+
.map(|v| v.iter().map(|(id, _, _)| *id).collect())
894+
.unwrap_or(Vec::new())
895+
}
896+
897+
pub fn complete_async_writes_through(
898+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, write_id: usize,
899+
) {
900+
let prefix = format!("{primary_namespace}/{secondary_namespace}");
901+
let key = format!("{primary_namespace}/{secondary_namespace}/{key}");
902+
903+
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
904+
let mut writes_lock = self.pending_async_writes.lock().unwrap();
905+
906+
let pending_writes = writes_lock.get_mut(&key).unwrap();
907+
pending_writes.retain(|(id, res, data)| {
908+
if *id <= write_id {
909+
let namespace = persisted_lock.entry(prefix.clone()).or_insert(new_hash_map());
910+
*namespace.entry(key.to_string()).or_default() = data.clone();
911+
let mut future_state = res.lock().unwrap();
912+
future_state.0 = Some(Ok(()));
913+
if let Some(waker) = future_state.1.take() {
914+
waker.wake();
915+
}
916+
false
917+
} else {
918+
true
919+
}
920+
});
921+
}
922+
923+
pub fn complete_all_async_writes(&self) {
924+
let pending_writes: Vec<String> =
925+
self.pending_async_writes.lock().unwrap().keys().cloned().collect();
926+
for key in pending_writes {
927+
let mut levels = key.split("/");
928+
let primary = levels.next().unwrap();
929+
let secondary = levels.next().unwrap();
930+
let key = levels.next().unwrap();
931+
assert!(levels.next().is_none());
932+
self.complete_async_writes_through(primary, secondary, key, usize::MAX);
933+
}
868934
}
869935

870936
fn read_internal(
@@ -885,23 +951,6 @@ impl TestStore {
885951
}
886952
}
887953

888-
fn write_internal(
889-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
890-
) -> io::Result<()> {
891-
if self.read_only {
892-
return Err(io::Error::new(
893-
io::ErrorKind::PermissionDenied,
894-
"Cannot modify read-only store",
895-
));
896-
}
897-
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
898-
899-
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
900-
let outer_e = persisted_lock.entry(prefixed).or_insert(new_hash_map());
901-
outer_e.insert(key.to_string(), buf);
902-
Ok(())
903-
}
904-
905954
fn remove_internal(
906955
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
907956
) -> io::Result<()> {
@@ -913,12 +962,23 @@ impl TestStore {
913962
}
914963

915964
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
965+
let mut async_writes_lock = self.pending_async_writes.lock().unwrap();
916966

917967
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
918968
if let Some(outer_ref) = persisted_lock.get_mut(&prefixed) {
919969
outer_ref.remove(&key.to_string());
920970
}
921971

972+
if let Some(pending_writes) = async_writes_lock.remove(&format!("{prefixed}/{key}")) {
973+
for (_, future, _) in pending_writes {
974+
let mut future_lock = future.lock().unwrap();
975+
future_lock.0 = Some(Ok(()));
976+
if let Some(waker) = future_lock.1.take() {
977+
waker.wake();
978+
}
979+
}
980+
}
981+
922982
Ok(())
923983
}
924984

@@ -945,8 +1005,15 @@ impl KVStore for TestStore {
9451005
fn write(
9461006
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
9471007
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
948-
let res = self.write_internal(&primary_namespace, &secondary_namespace, &key, buf);
949-
Box::pin(async move { res })
1008+
let path = format!("{primary_namespace}/{secondary_namespace}/{key}");
1009+
let future = Arc::new(Mutex::new((None, None)));
1010+
1011+
let mut async_writes_lock = self.pending_async_writes.lock().unwrap();
1012+
let pending_writes = async_writes_lock.entry(path).or_insert(Vec::new());
1013+
let new_id = pending_writes.last().map(|(id, _, _)| id + 1).unwrap_or(0);
1014+
pending_writes.push((new_id, Arc::clone(&future), buf));
1015+
1016+
Box::pin(SPSCKVChannel(future))
9501017
}
9511018
fn remove(
9521019
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
@@ -972,7 +1039,30 @@ impl KVStoreSync for TestStore {
9721039
fn write(
9731040
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
9741041
) -> io::Result<()> {
975-
self.write_internal(primary_namespace, secondary_namespace, key, buf)
1042+
if self.read_only {
1043+
return Err(io::Error::new(
1044+
io::ErrorKind::PermissionDenied,
1045+
"Cannot modify read-only store",
1046+
));
1047+
}
1048+
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
1049+
let mut async_writes_lock = self.pending_async_writes.lock().unwrap();
1050+
1051+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
1052+
let async_writes_pending = async_writes_lock.remove(&format!("{prefixed}/{key}"));
1053+
let outer_e = persisted_lock.entry(prefixed).or_insert(new_hash_map());
1054+
outer_e.insert(key.to_string(), buf);
1055+
1056+
if let Some(pending_writes) = async_writes_pending {
1057+
for (_, future, _) in pending_writes {
1058+
let mut future_lock = future.lock().unwrap();
1059+
future_lock.0 = Some(Ok(()));
1060+
if let Some(waker) = future_lock.1.take() {
1061+
waker.wake();
1062+
}
1063+
}
1064+
}
1065+
Ok(())
9761066
}
9771067

9781068
fn remove(

0 commit comments

Comments
 (0)