From 6d321fd110853a6be3039d0b74f10c6c97e4e41c Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 09:58:56 +0200 Subject: [PATCH 01/12] Move current VSS `KVStoreSync` logic to `_internal` methods .. first step to make review easier. --- src/io/vss_store.rs | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index a03aafc44..64143dd42 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -126,10 +126,8 @@ impl VssStore { } Ok(keys) } -} -impl KVStoreSync for VssStore { - fn read( + fn read_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; @@ -160,7 +158,7 @@ impl KVStoreSync for VssStore { Ok(self.storable_builder.deconstruct(storable)?.0) } - fn write( + fn write_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; @@ -188,7 +186,7 @@ impl KVStoreSync for VssStore { Ok(()) } - fn remove( + fn remove_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; @@ -211,7 +209,9 @@ impl KVStoreSync for VssStore { Ok(()) } - fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + fn list_internal( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; let keys = self @@ -229,6 +229,30 @@ impl KVStoreSync for VssStore { } } +impl KVStoreSync for VssStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.read_internal(primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.write_internal(primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.remove_internal(primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.list_internal(primary_namespace, secondary_namespace) + } +} + fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32], [u8; 32]) { let hkdf = |initial_key_material: &[u8], salt: &[u8]| -> [u8; 32] { let mut engine = HmacEngine::::new(salt); From 8c7234cc5578ef97a8c93cd6d0daae4eeae95073 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 10:03:30 +0200 Subject: [PATCH 02/12] Make VSS internal methods `async`, move `block_on` to `impl KVStoreSync` .. as we're gonna reuse the `async` `_internal` methods shortly. --- src/io/vss_store.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 64143dd42..02cb54e78 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -127,7 +127,7 @@ impl VssStore { Ok(keys) } - fn read_internal( + async fn read_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; @@ -135,7 +135,7 @@ impl VssStore { store_id: self.store_id.clone(), key: self.build_key(primary_namespace, secondary_namespace, key)?, }; - let resp = self.runtime.block_on(self.client.get_object(&request)).map_err(|e| { + let resp = self.client.get_object(&request).await.map_err(|e| { let msg = format!( "Failed to read from key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -145,6 +145,7 @@ impl VssStore { _ => Error::new(ErrorKind::Other, msg), } })?; + // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { @@ -158,7 +159,7 @@ impl VssStore { Ok(self.storable_builder.deconstruct(storable)?.0) } - fn write_internal( + async fn write_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; @@ -175,7 +176,7 @@ impl VssStore { delete_items: vec![], }; - self.runtime.block_on(self.client.put_object(&request)).map_err(|e| { + self.client.put_object(&request).await.map_err(|e| { let msg = format!( "Failed to write to key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -186,7 +187,7 @@ impl VssStore { Ok(()) } - fn remove_internal( + async fn remove_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; @@ -199,25 +200,24 @@ impl VssStore { }), }; - self.runtime.block_on(self.client.delete_object(&request)).map_err(|e| { + self.client.delete_object(&request).await.map_err(|e| { let msg = format!( "Failed to delete key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e ); Error::new(ErrorKind::Other, msg) })?; + Ok(()) } - fn list_internal( + async fn list_internal( &self, primary_namespace: &str, secondary_namespace: &str, ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; - let keys = self - .runtime - .block_on(self.list_all_keys(primary_namespace, secondary_namespace)) - .map_err(|e| { + let keys = + self.list_all_keys(primary_namespace, secondary_namespace).await.map_err(|e| { let msg = format!( "Failed to retrieve keys in namespace: {}/{} : {}", primary_namespace, secondary_namespace, e @@ -233,23 +233,27 @@ impl KVStoreSync for VssStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { - self.read_internal(primary_namespace, secondary_namespace, key) + let fut = self.read_internal(primary_namespace, secondary_namespace, key); + self.runtime.block_on(fut) } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { - self.write_internal(primary_namespace, secondary_namespace, key, buf) + let fut = self.write_internal(primary_namespace, secondary_namespace, key, buf); + self.runtime.block_on(fut) } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { - self.remove_internal(primary_namespace, secondary_namespace, key, lazy) + let fut = self.remove_internal(primary_namespace, secondary_namespace, key, lazy); + self.runtime.block_on(fut) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { - self.list_internal(primary_namespace, secondary_namespace) + let fut = self.list_internal(primary_namespace, secondary_namespace); + self.runtime.block_on(fut) } } From af2864dd478bd01e3a46b91c08f60f20754139a4 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 10:40:22 +0200 Subject: [PATCH 03/12] Split `VssStore` into `VssStore` and `VssStoreInner` .. where the former holds the latter in an `Arc` that can be used in async/`Future` contexts more easily. --- src/io/vss_store.rs | 78 ++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 02cb54e78..b85477f5e 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -41,17 +41,59 @@ type CustomRetryPolicy = FilteredRetryPolicy< /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. pub struct VssStore { + inner: Arc, + runtime: Arc, +} + +impl VssStore { + pub(crate) fn new( + base_url: String, store_id: String, vss_seed: [u8; 32], + header_provider: Arc, runtime: Arc, + ) -> Self { + let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)); + Self { inner, runtime } + } +} + +impl KVStoreSync for VssStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + let fut = self.inner.read_internal(primary_namespace, secondary_namespace, key); + self.runtime.block_on(fut) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let fut = self.inner.write_internal(primary_namespace, secondary_namespace, key, buf); + self.runtime.block_on(fut) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let fut = self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy); + self.runtime.block_on(fut) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + let fut = self.inner.list_internal(primary_namespace, secondary_namespace); + self.runtime.block_on(fut) + } +} + +struct VssStoreInner { client: VssClient, store_id: String, - runtime: Arc, storable_builder: StorableBuilder, key_obfuscator: KeyObfuscator, } -impl VssStore { +impl VssStoreInner { pub(crate) fn new( base_url: String, store_id: String, vss_seed: [u8; 32], - header_provider: Arc, runtime: Arc, + header_provider: Arc, ) -> Self { let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); @@ -71,7 +113,7 @@ impl VssStore { }) as _); let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); - Self { client, store_id, runtime, storable_builder, key_obfuscator } + Self { client, store_id, storable_builder, key_obfuscator } } fn build_key( @@ -229,34 +271,6 @@ impl VssStore { } } -impl KVStoreSync for VssStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> io::Result> { - let fut = self.read_internal(primary_namespace, secondary_namespace, key); - self.runtime.block_on(fut) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> io::Result<()> { - let fut = self.write_internal(primary_namespace, secondary_namespace, key, buf); - self.runtime.block_on(fut) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> io::Result<()> { - let fut = self.remove_internal(primary_namespace, secondary_namespace, key, lazy); - self.runtime.block_on(fut) - } - - fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { - let fut = self.list_internal(primary_namespace, secondary_namespace); - self.runtime.block_on(fut) - } -} - fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32], [u8; 32]) { let hkdf = |initial_key_material: &[u8], salt: &[u8]| -> [u8; 32] { let mut engine = HmacEngine::::new(salt); From 9d08b27c540e634ccc35f7d26388a76301546aff Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 26 Sep 2025 09:12:52 +0200 Subject: [PATCH 04/12] Refactor infallible `build_key` to not return an error --- src/io/vss_store.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index b85477f5e..7c3ccdd86 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -116,14 +116,12 @@ impl VssStoreInner { Self { client, store_id, storable_builder, key_obfuscator } } - fn build_key( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> io::Result { + fn build_key(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> String { let obfuscated_key = self.key_obfuscator.obfuscate(key); if primary_namespace.is_empty() { - Ok(obfuscated_key) + obfuscated_key } else { - Ok(format!("{}#{}#{}", primary_namespace, secondary_namespace, obfuscated_key)) + format!("{}#{}#{}", primary_namespace, secondary_namespace, obfuscated_key) } } @@ -175,7 +173,7 @@ impl VssStoreInner { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; let request = GetObjectRequest { store_id: self.store_id.clone(), - key: self.build_key(primary_namespace, secondary_namespace, key)?, + key: self.build_key(primary_namespace, secondary_namespace, key), }; let resp = self.client.get_object(&request).await.map_err(|e| { let msg = format!( @@ -211,7 +209,7 @@ impl VssStoreInner { store_id: self.store_id.clone(), global_version: None, transaction_items: vec![KeyValue { - key: self.build_key(primary_namespace, secondary_namespace, key)?, + key: self.build_key(primary_namespace, secondary_namespace, key), version, value: storable.encode_to_vec(), }], @@ -236,7 +234,7 @@ impl VssStoreInner { let request = DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(KeyValue { - key: self.build_key(primary_namespace, secondary_namespace, key)?, + key: self.build_key(primary_namespace, secondary_namespace, key), version: -1, value: vec![], }), From ee122f21a98cc800d195bcc2871b433472356e78 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 10:47:46 +0200 Subject: [PATCH 05/12] Implement `KVStore` for `VssStore` We implement the async `KVStore` trait for `VssStore`. --- src/io/vss_store.rs | 283 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 234 insertions(+), 49 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 7c3ccdd86..b71d4144e 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -5,16 +5,22 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::boxed::Box; +use std::collections::HashMap; +use std::future::Future; #[cfg(test)] use std::panic::RefUnwindSafe; -use std::sync::Arc; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; use lightning::io::{self, Error, ErrorKind}; -use lightning::util::persist::KVStoreSync; +use lightning::util::persist::{KVStore, KVStoreSync}; use prost::Message; use rand::RngCore; +use tokio::sync::RwLock; use vss_client::client::VssClient; use vss_client::error::VssError; use vss_client::headers::VssHeaderProvider; @@ -42,6 +48,9 @@ type CustomRetryPolicy = FilteredRetryPolicy< /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. pub struct VssStore { inner: Arc, + // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list + // operations aren't sensitive to the order of execution. + next_version: AtomicU64, runtime: Arc, } @@ -51,7 +60,32 @@ impl VssStore { header_provider: Arc, runtime: Arc, ) -> Self { let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)); - Self { inner, runtime } + let next_version = AtomicU64::new(1); + Self { inner, next_version, runtime } + } + + // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc>, u64) { + let version = self.next_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("VssStore version counter overflowed"); + } + + // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for + // cleaning up unused locks. + let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key); + + (inner_lock_ref, version) } } @@ -66,14 +100,34 @@ impl KVStoreSync for VssStore { fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { - let fut = self.inner.write_internal(primary_namespace, secondary_namespace, key, buf); + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + let fut = self.inner.write_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ); self.runtime.block_on(fut) } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { - let fut = self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy); + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + let fut = self.inner.remove_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ); self.runtime.block_on(fut) } @@ -83,11 +137,82 @@ impl KVStoreSync for VssStore { } } +impl KVStore for VssStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, io::Error>> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + Box::pin(async move { + inner.read_internal(&primary_namespace, &secondary_namespace, &key).await + }) + } + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + Box::pin(async move { + inner + .write_internal( + inner_lock_ref, + locking_key, + version, + &primary_namespace, + &secondary_namespace, + &key, + buf, + ) + .await + }) + } + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send>> { + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + Box::pin(async move { + inner + .remove_internal( + inner_lock_ref, + locking_key, + version, + &primary_namespace, + &secondary_namespace, + &key, + lazy, + ) + .await + }) + } + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, io::Error>> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let inner = Arc::clone(&self.inner); + Box::pin(async move { inner.list_internal(&primary_namespace, &secondary_namespace).await }) + } +} + struct VssStoreInner { client: VssClient, store_id: String, storable_builder: StorableBuilder, key_obfuscator: KeyObfuscator, + // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. + // The lock also encapsulates the latest written version per key. + locks: Mutex>>>, } impl VssStoreInner { @@ -113,10 +238,18 @@ impl VssStoreInner { }) as _); let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); - Self { client, store_id, storable_builder, key_obfuscator } + let locks = Mutex::new(HashMap::new()); + Self { client, store_id, storable_builder, key_obfuscator, locks } } - fn build_key(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> String { + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(locking_key).or_default()) + } + + fn build_obfuscated_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { let obfuscated_key = self.key_obfuscator.obfuscate(key); if primary_namespace.is_empty() { obfuscated_key @@ -171,10 +304,9 @@ impl VssStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; - let request = GetObjectRequest { - store_id: self.store_id.clone(), - key: self.build_key(primary_namespace, secondary_namespace, key), - }; + + let obfuscated_key = self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key }; let resp = self.client.get_object(&request).await.map_err(|e| { let msg = format!( "Failed to read from key {}/{}/{}: {}", @@ -200,55 +332,65 @@ impl VssStoreInner { } async fn write_internal( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; - let version = -1; - let storable = self.storable_builder.build(buf, version); - let request = PutObjectRequest { - store_id: self.store_id.clone(), - global_version: None, - transaction_items: vec![KeyValue { - key: self.build_key(primary_namespace, secondary_namespace, key), - version, - value: storable.encode_to_vec(), - }], - delete_items: vec![], - }; - self.client.put_object(&request).await.map_err(|e| { - let msg = format!( - "Failed to write to key {}/{}/{}: {}", - primary_namespace, secondary_namespace, key, e - ); - Error::new(ErrorKind::Other, msg) - })?; + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + let obfuscated_key = + self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + let vss_version = -1; + let storable = self.storable_builder.build(buf, vss_version); + let request = PutObjectRequest { + store_id: self.store_id.clone(), + global_version: None, + transaction_items: vec![KeyValue { + key: obfuscated_key, + version: vss_version, + value: storable.encode_to_vec(), + }], + delete_items: vec![], + }; - Ok(()) + self.client.put_object(&request).await.map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + })?; + + Ok(()) + }) + .await } async fn remove_internal( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; - let request = DeleteObjectRequest { - store_id: self.store_id.clone(), - key_value: Some(KeyValue { - key: self.build_key(primary_namespace, secondary_namespace, key), - version: -1, - value: vec![], - }), - }; - self.client.delete_object(&request).await.map_err(|e| { - let msg = format!( - "Failed to delete key {}/{}/{}: {}", - primary_namespace, secondary_namespace, key, e - ); - Error::new(ErrorKind::Other, msg) - })?; + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + let obfuscated_key = + self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + let request = DeleteObjectRequest { + store_id: self.store_id.clone(), + key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), + }; - Ok(()) + self.client.delete_object(&request).await.map_err(|e| { + let msg = format!( + "Failed to delete key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + })?; + + Ok(()) + }) + .await } async fn list_internal( @@ -267,6 +409,49 @@ impl VssStoreInner { Ok(keys) } + + async fn execute_locked_write< + F: Future>, + FN: FnOnce() -> F, + >( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, callback: FN, + ) -> Result<(), lightning::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.write().await; + + // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual + // consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. + if is_stale_version { + Ok(()) + } else { + callback().await.map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, locking_key: String) { + // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.locks.lock().unwrap(); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected VssStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&locking_key); + } + } } fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32], [u8; 32]) { From 86e1badbf379ebbec899d9e9c93af994a88506a4 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 11:00:04 +0200 Subject: [PATCH 06/12] Move `SqliteStore` logic to `_internal` methods .. to be easier reusable via `KVStore` also --- src/io/sqlite_store/mod.rs | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index d18c7440d..582dd831b 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -126,10 +126,8 @@ impl SqliteStore { pub fn get_data_dir(&self) -> PathBuf { self.data_dir.clone() } -} -impl KVStoreSync for SqliteStore { - fn read( + fn read_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; @@ -177,7 +175,7 @@ impl KVStoreSync for SqliteStore { Ok(res) } - fn write( + fn write_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; @@ -213,7 +211,7 @@ impl KVStoreSync for SqliteStore { }) } - fn remove( + fn remove_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, ) -> io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; @@ -245,7 +243,9 @@ impl KVStoreSync for SqliteStore { Ok(()) } - fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + fn list_internal( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; let locked_conn = self.connection.lock().unwrap(); @@ -285,6 +285,30 @@ impl KVStoreSync for SqliteStore { } } +impl KVStoreSync for SqliteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.read_internal(primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.write_internal(primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.remove_internal(primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.list_internal(primary_namespace, secondary_namespace) + } +} + #[cfg(test)] mod tests { use super::*; From 00baaa2962cc27aef529455c6de2a2cb85628f7b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 11:18:42 +0200 Subject: [PATCH 07/12] Split `SqliteStore` into `SqliteStore` and `SqliteStoreInner` .. where the former holds the latter in an `Arc` that can be used in async/`Future` contexts more easily. --- src/io/sqlite_store/mod.rs | 79 ++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 582dd831b..2ab5c11a6 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -37,9 +37,7 @@ const SCHEMA_USER_VERSION: u16 = 2; /// /// [SQLite]: https://sqlite.org pub struct SqliteStore { - connection: Arc>, - data_dir: PathBuf, - kv_table_name: String, + inner: Arc, } impl SqliteStore { @@ -51,6 +49,50 @@ impl SqliteStore { /// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`]. pub fn new( data_dir: PathBuf, db_file_name: Option, kv_table_name: Option, + ) -> io::Result { + let inner = Arc::new(SqliteStoreInner::new(data_dir, db_file_name, kv_table_name)?); + Ok(Self { inner }) + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.inner.data_dir.clone() + } +} + +impl KVStoreSync for SqliteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.inner.read_internal(primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.inner.write_internal(primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.inner.list_internal(primary_namespace, secondary_namespace) + } +} + +struct SqliteStoreInner { + connection: Arc>, + data_dir: PathBuf, + kv_table_name: String, +} + +impl SqliteStoreInner { + fn new( + data_dir: PathBuf, db_file_name: Option, kv_table_name: Option, ) -> io::Result { let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string()); let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string()); @@ -122,11 +164,6 @@ impl SqliteStore { Ok(Self { connection, data_dir, kv_table_name }) } - /// Returns the data directory. - pub fn get_data_dir(&self) -> PathBuf { - self.data_dir.clone() - } - fn read_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { @@ -285,30 +322,6 @@ impl SqliteStore { } } -impl KVStoreSync for SqliteStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> io::Result> { - self.read_internal(primary_namespace, secondary_namespace, key) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> io::Result<()> { - self.write_internal(primary_namespace, secondary_namespace, key, buf) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> io::Result<()> { - self.remove_internal(primary_namespace, secondary_namespace, key, lazy) - } - - fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { - self.list_internal(primary_namespace, secondary_namespace) - } -} - #[cfg(test)] mod tests { use super::*; @@ -318,7 +331,7 @@ mod tests { impl Drop for SqliteStore { fn drop(&mut self) { - match fs::remove_dir_all(&self.data_dir) { + match fs::remove_dir_all(&self.inner.data_dir) { Err(e) => println!("Failed to remove test store directory: {}", e), _ => {}, } From 5f591324047b0cbbfd9d0a6b445d1a9cb71a9d35 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Sep 2025 11:42:47 +0200 Subject: [PATCH 08/12] Implement `KVStore` for `SqliteStore` --- src/io/sqlite_store/mod.rs | 75 +++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 2ab5c11a6..40d5592cf 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -6,12 +6,15 @@ // accordance with one or both of these licenses. //! Objects related to [`SqliteStore`] live here. +use std::boxed::Box; use std::fs; +use std::future::Future; use std::path::PathBuf; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use lightning::io; -use lightning::util::persist::KVStoreSync; +use lightning::util::persist::{KVStore, KVStoreSync}; use lightning_types::string::PrintableString; use rusqlite::{named_params, Connection}; @@ -60,6 +63,76 @@ impl SqliteStore { } } +impl KVStore for SqliteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, io::Error>> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.read_internal(&primary_namespace, &secondary_namespace, &key) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.write_internal(&primary_namespace, &secondary_namespace, &key, buf) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, io::Error>> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.list_internal(&primary_namespace, &secondary_namespace) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } +} + impl KVStoreSync for SqliteStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, From 33c32d40ca07c37aa07a1e411256df5acfc0f180 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 18 Sep 2025 11:13:46 +0200 Subject: [PATCH 09/12] Move `TestStoreSync` logic to `_internal` methods .. to be easier reusable via `KVStore` also --- tests/common/mod.rs | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 98c96e307..4c9b98c03 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1242,10 +1242,8 @@ impl TestSyncStore { }, } } -} -impl KVStoreSync for TestSyncStore { - fn read( + fn read_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { let _guard = self.serializer.read().unwrap(); @@ -1270,7 +1268,7 @@ impl KVStoreSync for TestSyncStore { } } - fn write( + fn write_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> lightning::io::Result<()> { let _guard = self.serializer.write().unwrap(); @@ -1299,7 +1297,7 @@ impl KVStoreSync for TestSyncStore { } } - fn remove( + fn remove_internal( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> lightning::io::Result<()> { let _guard = self.serializer.write().unwrap(); @@ -1327,10 +1325,36 @@ impl KVStoreSync for TestSyncStore { } } - fn list( + fn list_internal( &self, primary_namespace: &str, secondary_namespace: &str, ) -> lightning::io::Result> { let _guard = self.serializer.read().unwrap(); self.do_list(primary_namespace, secondary_namespace) } } + +impl KVStoreSync for TestSyncStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> lightning::io::Result> { + self.read_internal(primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> lightning::io::Result<()> { + self.write_internal(primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> lightning::io::Result<()> { + self.remove_internal(primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> lightning::io::Result> { + self.list_internal(primary_namespace, secondary_namespace) + } +} From c715538b5d977b836b81ac469e808d4a92a979d3 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 18 Sep 2025 11:16:36 +0200 Subject: [PATCH 10/12] Split `TestSyncStore` into `TestSyncStore` and `TestSyncStoreInner` .. where the former holds the latter in an `Arc` that can be used in async/`Future` contexts more easily. --- tests/common/mod.rs | 67 ++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4c9b98c03..64151cdb7 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1190,14 +1190,51 @@ pub(crate) fn do_channel_full_cycle( // A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity. pub(crate) struct TestSyncStore { + inner: Arc, +} + +impl TestSyncStore { + pub(crate) fn new(dest_dir: PathBuf) -> Self { + let inner = Arc::new(TestSyncStoreInner::new(dest_dir)); + Self { inner } + } +} + +impl KVStoreSync for TestSyncStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> lightning::io::Result> { + self.inner.read_internal(primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> lightning::io::Result<()> { + self.inner.write_internal(primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> lightning::io::Result<()> { + self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> lightning::io::Result> { + self.inner.list_internal(primary_namespace, secondary_namespace) + } +} + +struct TestSyncStoreInner { serializer: RwLock<()>, test_store: TestStore, fs_store: FilesystemStore, sqlite_store: SqliteStore, } -impl TestSyncStore { - pub(crate) fn new(dest_dir: PathBuf) -> Self { +impl TestSyncStoreInner { + fn new(dest_dir: PathBuf) -> Self { let serializer = RwLock::new(()); let mut fs_dir = dest_dir.clone(); fs_dir.push("fs_store"); @@ -1332,29 +1369,3 @@ impl TestSyncStore { self.do_list(primary_namespace, secondary_namespace) } } - -impl KVStoreSync for TestSyncStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> lightning::io::Result> { - self.read_internal(primary_namespace, secondary_namespace, key) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> lightning::io::Result<()> { - self.write_internal(primary_namespace, secondary_namespace, key, buf) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> lightning::io::Result<()> { - self.remove_internal(primary_namespace, secondary_namespace, key, lazy) - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> lightning::io::Result> { - self.list_internal(primary_namespace, secondary_namespace) - } -} From aae4ce9cb6e55c74b820b3531deb771ef97155ee Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 18 Sep 2025 11:18:54 +0200 Subject: [PATCH 11/12] Implement `KVStore` for `TestSyncStore` --- tests/common/mod.rs | 137 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 121 insertions(+), 16 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 64151cdb7..e9ea2a69e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -10,9 +10,12 @@ pub(crate) mod logging; +use std::boxed::Box; use std::collections::{HashMap, HashSet}; use std::env; +use std::future::Future; use std::path::PathBuf; +use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -31,9 +34,10 @@ use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, }; +use lightning::io; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; -use lightning::util::persist::KVStoreSync; +use lightning::util::persist::{KVStore, KVStoreSync}; use lightning::util::test_utils::TestStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_persister::fs_store::FilesystemStore; @@ -1200,6 +1204,76 @@ impl TestSyncStore { } } +impl KVStore for TestSyncStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, io::Error>> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.read_internal(&primary_namespace, &secondary_namespace, &key) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.write_internal(&primary_namespace, &secondary_namespace, &key, buf) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, io::Error>> + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.list_internal(&primary_namespace, &secondary_namespace) + }); + Box::pin(async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + }) + } +} + impl KVStoreSync for TestSyncStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, @@ -1254,9 +1328,10 @@ impl TestSyncStoreInner { fn do_list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> lightning::io::Result> { - let fs_res = self.fs_store.list(primary_namespace, secondary_namespace); - let sqlite_res = self.sqlite_store.list(primary_namespace, secondary_namespace); - let test_res = self.test_store.list(primary_namespace, secondary_namespace); + let fs_res = KVStoreSync::list(&self.fs_store, primary_namespace, secondary_namespace); + let sqlite_res = + KVStoreSync::list(&self.sqlite_store, primary_namespace, secondary_namespace); + let test_res = KVStoreSync::list(&self.test_store, primary_namespace, secondary_namespace); match fs_res { Ok(mut list) => { @@ -1285,9 +1360,11 @@ impl TestSyncStoreInner { ) -> lightning::io::Result> { let _guard = self.serializer.read().unwrap(); - let fs_res = self.fs_store.read(primary_namespace, secondary_namespace, key); - let sqlite_res = self.sqlite_store.read(primary_namespace, secondary_namespace, key); - let test_res = self.test_store.read(primary_namespace, secondary_namespace, key); + let fs_res = KVStoreSync::read(&self.fs_store, primary_namespace, secondary_namespace, key); + let sqlite_res = + KVStoreSync::read(&self.sqlite_store, primary_namespace, secondary_namespace, key); + let test_res = + KVStoreSync::read(&self.test_store, primary_namespace, secondary_namespace, key); match fs_res { Ok(read) => { @@ -1309,11 +1386,27 @@ impl TestSyncStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> lightning::io::Result<()> { let _guard = self.serializer.write().unwrap(); - let fs_res = self.fs_store.write(primary_namespace, secondary_namespace, key, buf.clone()); - let sqlite_res = - self.sqlite_store.write(primary_namespace, secondary_namespace, key, buf.clone()); - let test_res = - self.test_store.write(primary_namespace, secondary_namespace, key, buf.clone()); + let fs_res = KVStoreSync::write( + &self.fs_store, + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let sqlite_res = KVStoreSync::write( + &self.sqlite_store, + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let test_res = KVStoreSync::write( + &self.test_store, + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); assert!(self .do_list(primary_namespace, secondary_namespace) @@ -1338,10 +1431,22 @@ impl TestSyncStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> lightning::io::Result<()> { let _guard = self.serializer.write().unwrap(); - let fs_res = self.fs_store.remove(primary_namespace, secondary_namespace, key, lazy); - let sqlite_res = - self.sqlite_store.remove(primary_namespace, secondary_namespace, key, lazy); - let test_res = self.test_store.remove(primary_namespace, secondary_namespace, key, lazy); + let fs_res = + KVStoreSync::remove(&self.fs_store, primary_namespace, secondary_namespace, key, lazy); + let sqlite_res = KVStoreSync::remove( + &self.sqlite_store, + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let test_res = KVStoreSync::remove( + &self.test_store, + primary_namespace, + secondary_namespace, + key, + lazy, + ); assert!(!self .do_list(primary_namespace, secondary_namespace) From 6c3fdf379e94b0960ed29e5ddd3786621e923926 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 17 Sep 2025 12:57:25 +0200 Subject: [PATCH 12/12] Require both types of `KVStore` As an intermediary step, we require any store to implement both `KVStore` and `KVStoreSync`, allowing us to switch over step-by-step. We already switch to the fully-async background processor variant here. --- Cargo.toml | 8 +- src/builder.rs | 7 +- src/data_store.rs | 49 ++++--- src/event.rs | 45 +++--- src/io/utils.rs | 128 ++++++++++-------- src/lib.rs | 41 +++--- .../asynchronous/static_invoice_store.rs | 59 ++++---- src/peer_store.rs | 59 ++++---- src/types.rs | 18 ++- tests/integration_tests_rust.rs | 5 +- 10 files changed, 236 insertions(+), 183 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b639b7dc1..7385b5c46 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ default = [] #lightning-types = { version = "0.2.0" } #lightning-invoice = { version = "0.33.0", features = ["std"] } #lightning-net-tokio = { version = "0.1.0" } -#lightning-persister = { version = "0.1.0" } +#lightning-persister = { version = "0.1.0", features = ["tokio"] } #lightning-background-processor = { version = "0.1.0" } #lightning-rapid-gossip-sync = { version = "0.1.0" } #lightning-block-sync = { version = "0.1.0", features = ["rest-client", "rpc-client", "tokio"] } @@ -44,7 +44,7 @@ default = [] #lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } #lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] } #lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } +#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["tokio"] } #lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } #lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } #lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["rest-client", "rpc-client", "tokio"] } @@ -56,7 +56,7 @@ lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = " lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["std"] } lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["tokio"] } lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["rest-client", "rpc-client", "tokio"] } @@ -68,7 +68,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", #lightning-types = { path = "../rust-lightning/lightning-types" } #lightning-invoice = { path = "../rust-lightning/lightning-invoice", features = ["std"] } #lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } -#lightning-persister = { path = "../rust-lightning/lightning-persister" } +#lightning-persister = { path = "../rust-lightning/lightning-persister", features = ["tokio"] } #lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" } #lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } #lightning-block-sync = { path = "../rust-lightning/lightning-block-sync", features = ["rest-client", "rpc-client", "tokio"] } diff --git a/src/builder.rs b/src/builder.rs index cf414ec57..f39566bd5 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -32,7 +32,7 @@ use lightning::routing::scoring::{ }; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::persist::{ - read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY, + read_channel_monitors, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; @@ -1421,7 +1421,8 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(res) = kv_store.read( + if let Ok(res) = KVStoreSync::read( + &*kv_store, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -1659,7 +1660,7 @@ fn build_with_store_internal( Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { - Arc::new(OutputSweeper::new_with_kv_store_sync( + Arc::new(OutputSweeper::new( channel_manager.current_best_block(), Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), diff --git a/src/data_store.rs b/src/data_store.rs index f9dbaa788..83cbf4476 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -9,6 +9,7 @@ use std::collections::{hash_map, HashMap}; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use lightning::util::persist::KVStoreSync; use lightning::util::ser::{Readable, Writeable}; use crate::logger::{log_error, LdkLogger}; @@ -97,19 +98,24 @@ where let removed = self.objects.lock().unwrap().remove(id).is_some(); if removed { let store_key = id.encode_to_hex_str(); - self.kv_store - .remove(&self.primary_namespace, &self.secondary_namespace, &store_key, false) - .map_err(|e| { - log_error!( - self.logger, - "Removing object data for key {}/{}/{} failed due to: {}", - &self.primary_namespace, - &self.secondary_namespace, - store_key, - e - ); - Error::PersistenceFailed - })?; + KVStoreSync::remove( + &*self.kv_store, + &self.primary_namespace, + &self.secondary_namespace, + &store_key, + false, + ) + .map_err(|e| { + log_error!( + self.logger, + "Removing object data for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + store_key, + e + ); + Error::PersistenceFailed + })?; } Ok(()) } @@ -141,9 +147,14 @@ where fn persist(&self, object: &SO) -> Result<(), Error> { let store_key = object.id().encode_to_hex_str(); let data = object.encode(); - self.kv_store - .write(&self.primary_namespace, &self.secondary_namespace, &store_key, data) - .map_err(|e| { + KVStoreSync::write( + &*self.kv_store, + &self.primary_namespace, + &self.secondary_namespace, + &store_key, + data, + ) + .map_err(|e| { log_error!( self.logger, "Write for key {}/{}/{} failed due to: {}", @@ -241,13 +252,15 @@ mod tests { let store_key = id.encode_to_hex_str(); // Check we start empty. - assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_err()); + assert!(KVStoreSync::read(&*store, &primary_namespace, &secondary_namespace, &store_key) + .is_err()); // Check we successfully store an object and return `false` let object = TestObject { id, data: [23u8; 3] }; assert_eq!(Ok(false), data_store.insert(object.clone())); assert_eq!(Some(object), data_store.get(&id)); - assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_ok()); + assert!(KVStoreSync::read(&*store, &primary_namespace, &secondary_namespace, &store_key) + .is_ok()); // Test re-insertion returns `true` let mut override_object = object.clone(); diff --git a/src/event.rs b/src/event.rs index 1236c7cf2..824cba694 100644 --- a/src/event.rs +++ b/src/event.rs @@ -26,6 +26,7 @@ use lightning::util::config::{ ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate, }; use lightning::util::errors::APIError; +use lightning::util::persist::KVStoreSync; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; @@ -348,24 +349,24 @@ where fn persist_queue(&self, locked_queue: &VecDeque) -> Result<(), Error> { let data = EventQueueSerWrapper(locked_queue).encode(); - self.kv_store - .write( + KVStoreSync::write( + &*self.kv_store, + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + data, + ) + .map_err(|e| { + log_error!( + self.logger, + "Write for key {}/{}/{} failed due to: {}", EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, - data, - ) - .map_err(|e| { - log_error!( - self.logger, - "Write for key {}/{}/{} failed due to: {}", - EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - e - ); - Error::PersistenceFailed - })?; + e + ); + Error::PersistenceFailed + })?; Ok(()) } } @@ -1620,13 +1621,13 @@ mod tests { } // Check we can read back what we persisted. - let persisted_bytes = store - .read( - EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - ) - .unwrap(); + let persisted_bytes = KVStoreSync::read( + &*store, + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + ) + .unwrap(); let deser_event_queue = EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap(); assert_eq!(deser_event_queue.wait_next_event(), expected_event); diff --git a/src/io/utils.rs b/src/io/utils.rs index 0cc910ad7..cb3ca0847 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -24,11 +24,12 @@ use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; use lightning::util::persist::{ - KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, + NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::{Readable, ReadableArgs, Writeable}; use lightning::util::sweep::OutputSweeper; @@ -131,7 +132,8 @@ pub(crate) fn read_network_graph( where L::Target: LdkLogger, { - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, @@ -150,7 +152,8 @@ where L::Target: LdkLogger, { let params = ProbabilisticScoringDecayParameters::default(); - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, @@ -169,7 +172,8 @@ pub(crate) fn read_event_queue( where L::Target: LdkLogger, { - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, @@ -187,7 +191,8 @@ pub(crate) fn read_peer_info( where L::Target: LdkLogger, { - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, @@ -207,11 +212,13 @@ where { let mut res = Vec::new(); - for stored_key in kv_store.list( + for stored_key in KVStoreSync::list( + &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, )? { - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key, @@ -234,7 +241,8 @@ pub(crate) fn read_output_sweeper( chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, logger: Arc, ) -> Result { - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, @@ -248,7 +256,7 @@ pub(crate) fn read_output_sweeper( kv_store, logger.clone(), ); - OutputSweeper::read_with_kv_store_sync(&mut reader, args).map_err(|e| { + OutputSweeper::read(&mut reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize OutputSweeper: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize OutputSweeper") }) @@ -260,7 +268,8 @@ pub(crate) fn read_node_metrics( where L::Target: LdkLogger, { - let mut reader = Cursor::new(kv_store.read( + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, NODE_METRICS_KEY, @@ -278,24 +287,24 @@ where L::Target: LdkLogger, { let data = node_metrics.encode(); - kv_store - .write( + KVStoreSync::write( + &*kv_store, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, + data, + ) + .map_err(|e| { + log_error!( + logger, + "Writing data to key {}/{}/{} failed due to: {}", NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, NODE_METRICS_KEY, - data, - ) - .map_err(|e| { - log_error!( - logger, - "Writing data to key {}/{}/{} failed due to: {}", - NODE_METRICS_PRIMARY_NAMESPACE, - NODE_METRICS_SECONDARY_NAMESPACE, - NODE_METRICS_KEY, - e - ); - Error::PersistenceFailed - }) + e + ); + Error::PersistenceFailed + }) } pub(crate) fn is_valid_kvstore_str(key: &str) -> bool { @@ -397,24 +406,26 @@ macro_rules! impl_read_write_change_set_type { where L::Target: LdkLogger, { - let bytes = match kv_store.read($primary_namespace, $secondary_namespace, $key) { - Ok(bytes) => bytes, - Err(e) => { - if e.kind() == lightning::io::ErrorKind::NotFound { - return Ok(None); - } else { - log_error!( - logger, - "Reading data from key {}/{}/{} failed due to: {}", - $primary_namespace, - $secondary_namespace, - $key, - e - ); - return Err(e.into()); - } - }, - }; + let bytes = + match KVStoreSync::read(&*kv_store, $primary_namespace, $secondary_namespace, $key) + { + Ok(bytes) => bytes, + Err(e) => { + if e.kind() == lightning::io::ErrorKind::NotFound { + return Ok(None); + } else { + log_error!( + logger, + "Reading data from key {}/{}/{} failed due to: {}", + $primary_namespace, + $secondary_namespace, + $key, + e + ); + return Err(e.into()); + } + }, + }; let mut reader = Cursor::new(bytes); let res: Result, DecodeError> = @@ -438,17 +449,18 @@ macro_rules! impl_read_write_change_set_type { L::Target: LdkLogger, { let data = ChangeSetSerWrapper(value).encode(); - kv_store.write($primary_namespace, $secondary_namespace, $key, data).map_err(|e| { - log_error!( - logger, - "Writing data to key {}/{}/{} failed due to: {}", - $primary_namespace, - $secondary_namespace, - $key, - e - ); - e.into() - }) + KVStoreSync::write(&*kv_store, $primary_namespace, $secondary_namespace, $key, data) + .map_err(|e| { + log_error!( + logger, + "Writing data to key {}/{}/{} failed due to: {}", + $primary_namespace, + $secondary_namespace, + $key, + e + ); + e.into() + }) } }; } diff --git a/src/lib.rs b/src/lib.rs index 0f547ce1d..b7af9d7d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -135,7 +135,8 @@ use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; -use lightning_background_processor::process_events_async_with_kv_store_sync; +use lightning::util::persist::KVStoreSync; +use lightning_background_processor::process_events_async; use liquidity::{LSPS1Liquidity, LiquiditySource}; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; @@ -148,10 +149,12 @@ use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, KeysManager, + OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, +}; +pub use types::{ + ChannelDetails, CustomTlvRecord, DynStore, PeerDetails, SyncAndAsyncKVStore, UserChannelId, }; -pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; pub use { bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio, vss_client, @@ -549,7 +552,7 @@ impl Node { }; self.runtime.spawn_background_processor_task(async move { - process_events_async_with_kv_store_sync( + process_events_async( background_persister, |e| background_event_handler.handle_event(e), background_chain_mon, @@ -1467,20 +1470,20 @@ impl Node { /// Exports the current state of the scorer. The result can be shared with and merged by light nodes that only have /// a limited view of the network. pub fn export_pathfinding_scores(&self) -> Result, Error> { - self.kv_store - .read( - lightning::util::persist::SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - lightning::util::persist::SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - lightning::util::persist::SCORER_PERSISTENCE_KEY, - ) - .map_err(|e| { - log_error!( - self.logger, - "Failed to access store while exporting pathfinding scores: {}", - e - ); - Error::PersistenceFailed - }) + KVStoreSync::read( + &*self.kv_store, + lightning::util::persist::SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + lightning::util::persist::SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + lightning::util::persist::SCORER_PERSISTENCE_KEY, + ) + .map_err(|e| { + log_error!( + self.logger, + "Failed to access store while exporting pathfinding scores: {}", + e + ); + Error::PersistenceFailed + }) } } diff --git a/src/payment/asynchronous/static_invoice_store.rs b/src/payment/asynchronous/static_invoice_store.rs index e81fd8216..a7e2d2f9e 100644 --- a/src/payment/asynchronous/static_invoice_store.rs +++ b/src/payment/asynchronous/static_invoice_store.rs @@ -15,6 +15,7 @@ use bitcoin::hashes::Hash; use lightning::blinded_path::message::BlindedMessagePath; use lightning::impl_writeable_tlv_based; use lightning::offers::static_invoice::StaticInvoice; +use lightning::util::persist::KVStoreSync; use lightning::util::ser::{Readable, Writeable}; use crate::hex_utils; @@ -77,29 +78,33 @@ impl StaticInvoiceStore { let (secondary_namespace, key) = Self::get_storage_location(invoice_slot, recipient_id); - self.kv_store - .read(STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, &secondary_namespace, &key) - .and_then(|data| { - PersistedStaticInvoice::read(&mut &*data) - .map(|persisted_invoice| { - Some((persisted_invoice.invoice, persisted_invoice.request_path)) - }) - .map_err(|e| { - lightning::io::Error::new( - lightning::io::ErrorKind::InvalidData, - format!("Failed to parse static invoice: {:?}", e), - ) - }) - }) - .or_else( - |e| { - if e.kind() == lightning::io::ErrorKind::NotFound { - Ok(None) - } else { - Err(e) - } - }, - ) + KVStoreSync::read( + &*self.kv_store, + STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, + &secondary_namespace, + &key, + ) + .and_then(|data| { + PersistedStaticInvoice::read(&mut &*data) + .map(|persisted_invoice| { + Some((persisted_invoice.invoice, persisted_invoice.request_path)) + }) + .map_err(|e| { + lightning::io::Error::new( + lightning::io::ErrorKind::InvalidData, + format!("Failed to parse static invoice: {:?}", e), + ) + }) + }) + .or_else( + |e| { + if e.kind() == lightning::io::ErrorKind::NotFound { + Ok(None) + } else { + Err(e) + } + }, + ) } pub(crate) async fn handle_persist_static_invoice( @@ -119,7 +124,13 @@ impl StaticInvoiceStore { // Static invoices will be persisted at "static_invoices//". // // Example: static_invoices/039058c6f2c0cb492c533b0a4d14ef77cc0f78abccced5287d84a1a2011cfb81/00001 - self.kv_store.write(STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, &secondary_namespace, &key, buf) + KVStoreSync::write( + &*self.kv_store, + STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, + &secondary_namespace, + &key, + buf, + ) } fn get_storage_location(invoice_slot: u16, recipient_id: &[u8]) -> (String, String) { diff --git a/src/peer_store.rs b/src/peer_store.rs index 5ebdc0419..82c80c396 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -11,6 +11,7 @@ use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; use lightning::impl_writeable_tlv_based; +use lightning::util::persist::KVStoreSync; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::io::{ @@ -67,24 +68,24 @@ where fn persist_peers(&self, locked_peers: &HashMap) -> Result<(), Error> { let data = PeerStoreSerWrapper(&*locked_peers).encode(); - self.kv_store - .write( + KVStoreSync::write( + &*self.kv_store, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + data, + ) + .map_err(|e| { + log_error!( + self.logger, + "Write for key {}/{}/{} failed due to: {}", PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, - data, - ) - .map_err(|e| { - log_error!( - self.logger, - "Write for key {}/{}/{} failed due to: {}", - PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - e - ); - Error::PersistenceFailed - })?; + e + ); + Error::PersistenceFailed + })?; Ok(()) } } @@ -167,23 +168,23 @@ mod tests { .unwrap(); let address = SocketAddress::from_str("127.0.0.1:9738").unwrap(); let expected_peer_info = PeerInfo { node_id, address }; - assert!(store - .read( - PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - ) - .is_err()); + assert!(KVStoreSync::read( + &*store, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ) + .is_err()); peer_store.add_peer(expected_peer_info.clone()).unwrap(); // Check we can read back what we persisted. - let persisted_bytes = store - .read( - PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - ) - .unwrap(); + let persisted_bytes = KVStoreSync::read( + &*store, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ) + .unwrap(); let deser_peer_store = PeerStore::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap(); diff --git a/src/types.rs b/src/types.rs index f152772a1..4f5229dd2 100644 --- a/src/types.rs +++ b/src/types.rs @@ -19,7 +19,7 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{KVStoreSync, KVStoreSyncWrapper}; +use lightning::util::persist::{KVStore, KVStoreSync}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::{GossipVerifier, UtxoSource}; @@ -35,7 +35,19 @@ use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::PaymentDetails; -pub(crate) type DynStore = dyn KVStoreSync + Sync + Send; +/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the +/// same time. +pub trait SyncAndAsyncKVStore: KVStore + KVStoreSync {} + +impl SyncAndAsyncKVStore for T +where + T: KVStore, + T: KVStoreSync, +{ +} + +/// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers; +pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send; pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, @@ -133,7 +145,7 @@ pub(crate) type Sweeper = OutputSweeper< Arc, Arc, Arc, - KVStoreSyncWrapper>, + Arc, Arc, Arc, >; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 0db30ea1c..e476c08b7 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -31,11 +31,10 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, QrPaymentResult, }; -use ldk_node::{Builder, Event, NodeError}; +use ldk_node::{Builder, DynStore, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; -use lightning::util::persist::KVStoreSync; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -243,7 +242,7 @@ fn start_stop_reinit() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let test_sync_store: Arc = + let test_sync_store: Arc = Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.clone().into())); let sync_config = EsploraSyncConfig { background_sync_config: None };