Skip to content

Commit b362b71

Browse files
committed
Remove pruned LSPS2/LSPS5 peer state entries from the KVStore
Previously, we'd persist peer states to the `KVStore`, but, while we eventually pruned them from our in-memory state, we wouldn't remove them from the `KVStore`. Here, we change this and regularly prune and delete peer state entries from the `KVStore`. Note we still prune the state-internal data on peer disconnection, but leave removal to our (BP-driven) async `persist` calls.
1 parent a279c77 commit b362b71

File tree

3 files changed

+100
-29
lines changed

3 files changed

+100
-29
lines changed

lightning-liquidity/src/lsps2/service.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ impl PeerState {
514514
// We abort the flow, and prune any data kept.
515515
self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
516516
self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
517-
// TODO: Remove peer state entry from the KVStore
517+
self.needs_persist |= true;
518518
return false;
519519
}
520520
true
@@ -1662,28 +1662,41 @@ where
16621662
}
16631663

16641664
pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
1665-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1666-
let is_prunable =
1667-
if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1668-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
1669-
peer_state_lock.prune_expired_request_state();
1670-
peer_state_lock.is_prunable()
1671-
} else {
1672-
return;
1673-
};
1674-
if is_prunable {
1675-
outer_state_lock.remove(&counterparty_node_id);
1665+
let outer_state_lock = self.per_peer_state.write().unwrap();
1666+
if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1667+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
1668+
// We clean up the peer state, but leave removing the peer entry to `prune_peer_state`
1669+
// which also removes it from the store.
1670+
peer_state_lock.prune_expired_request_state();
16761671
}
16771672
}
16781673

16791674
#[allow(clippy::bool_comparison)]
1680-
pub(crate) fn prune_peer_state(&self) {
1675+
pub(crate) async fn prune_peer_state(&self) {
16811676
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1682-
outer_state_lock.retain(|_, inner_state_lock| {
1677+
let mut need_remove = Vec::new();
1678+
outer_state_lock.retain(|counterparty_node_id, inner_state_lock| {
16831679
let mut peer_state_lock = inner_state_lock.lock().unwrap();
16841680
peer_state_lock.prune_expired_request_state();
1685-
peer_state_lock.is_prunable() == false
1681+
let is_prunable = peer_state_lock.is_prunable();
1682+
if is_prunable {
1683+
need_remove.push(*counterparty_node_id);
1684+
}
1685+
is_prunable == false
16861686
});
1687+
1688+
for counterparty_node_id in need_remove {
1689+
let key = counterparty_node_id.to_string();
1690+
let _ = self
1691+
.kv_store
1692+
.remove(
1693+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1694+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1695+
&key,
1696+
true,
1697+
)
1698+
.await;
1699+
}
16871700
}
16881701
}
16891702

lightning-liquidity/src/lsps5/service.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -270,18 +270,48 @@ where
270270
});
271271

272272
if should_prune {
273-
outer_state_lock.retain(|client_id, peer_state| {
274-
if self.client_has_open_channel(client_id) {
275-
// Don't prune clients with open channels
276-
return true;
277-
}
278-
// TODO: Remove peer state entry from the KVStore
279-
!peer_state.prune_stale_webhooks(now)
280-
});
273+
for (_, peer_state) in outer_state_lock.iter_mut() {
274+
// Prune stale webhooks, but leave removal of the peer states to `prune_peer_state`
275+
// which will also remove it from the store.
276+
peer_state.prune_stale_webhooks(now)
277+
}
281278
*last_pruning = Some(now);
282279
}
283280
}
284281

282+
pub(crate) async fn prune_peer_state(&self) {
283+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
284+
let mut need_remove = Vec::new();
285+
286+
self.check_prune_stale_webhooks(&mut outer_state_lock);
287+
288+
outer_state_lock.retain(|client_id, peer_state| {
289+
if self.client_has_open_channel(client_id) {
290+
// Don't prune clients with open channels
291+
return true;
292+
}
293+
294+
let is_prunable = peer_state.is_prunable();
295+
if is_prunable {
296+
need_remove.push(*client_id);
297+
}
298+
!is_prunable
299+
});
300+
301+
for counterparty_node_id in need_remove {
302+
let key = counterparty_node_id.to_string();
303+
let _ = self
304+
.kv_store
305+
.remove(
306+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
307+
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
308+
&key,
309+
true,
310+
)
311+
.await;
312+
}
313+
}
314+
285315
fn handle_set_webhook(
286316
&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
287317
params: SetWebhookRequest,
@@ -733,11 +763,17 @@ impl PeerState {
733763
}
734764

735765
// Returns whether the entire state is empty and can be pruned.
736-
fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool {
766+
fn prune_stale_webhooks(&mut self, now: LSPSDateTime) {
737767
self.webhooks.retain(|(_, webhook)| {
738-
now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
768+
let should_prune = now.duration_since(&webhook.last_used) >= MIN_WEBHOOK_RETENTION_DAYS;
769+
if should_prune {
770+
self.needs_persist |= true;
771+
}
772+
!should_prune
739773
});
774+
}
740775

776+
fn is_prunable(&mut self) -> bool {
741777
self.webhooks.is_empty()
742778
}
743779
}

lightning-liquidity/src/manager.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use bitcoin::secp256k1::PublicKey;
6161
use core::future::Future as StdFuture;
6262
use core::ops::Deref;
6363
use core::task;
64+
use core::time::Duration;
6465

6566
const LSPS_FEATURE_BIT: usize = 729;
6667

@@ -311,7 +312,9 @@ pub struct LiquidityManager<
311312
service_config: Option<LiquidityServiceConfig>,
312313
_client_config: Option<LiquidityClientConfig>,
313314
best_block: RwLock<Option<BestBlock>>,
315+
last_peer_state_pruning: Mutex<Option<Duration>>,
314316
_chain_source: Option<C>,
317+
time_provider: TP,
315318
pending_msgs_or_needs_persist_notifier: Arc<Notifier>,
316319
}
317320

@@ -461,7 +464,7 @@ where
461464
kv_store.clone(),
462465
node_signer,
463466
lsps5_service_config.clone(),
464-
time_provider,
467+
time_provider.clone(),
465468
))
466469
} else {
467470
None
@@ -512,6 +515,8 @@ where
512515
None
513516
};
514517

518+
let last_peer_state_pruning = Mutex::new(None);
519+
515520
Ok(Self {
516521
pending_messages,
517522
pending_events,
@@ -529,7 +534,9 @@ where
529534
service_config,
530535
_client_config: client_config,
531536
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
537+
last_peer_state_pruning,
532538
_chain_source: chain_source,
539+
time_provider,
533540
pending_msgs_or_needs_persist_notifier,
534541
})
535542
}
@@ -650,14 +657,32 @@ where
650657
/// This will be regularly called by LDK's background processor if necessary and only needs to
651658
/// be called manually if it's not utilized.
652659
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
660+
let should_prune_state = {
661+
const PRUNE_INTERVAL: Duration = Duration::from_secs(600);
662+
let mut last_peer_state_pruning_lock = self.last_peer_state_pruning.lock().unwrap();
663+
let now = self.time_provider.duration_since_epoch();
664+
if last_peer_state_pruning_lock.map_or(true, |l| l + PRUNE_INTERVAL < now) {
665+
*last_peer_state_pruning_lock = Some(now);
666+
true
667+
} else {
668+
false
669+
}
670+
};
671+
653672
// TODO: We should eventually persist in parallel.
654673
self.pending_events.persist().await?;
655674

656675
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
676+
if should_prune_state {
677+
lsps2_service_handler.prune_peer_state().await;
678+
}
657679
lsps2_service_handler.persist().await?;
658680
}
659681

660682
if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
683+
if should_prune_state {
684+
lsps5_service_handler.prune_peer_state().await;
685+
}
661686
lsps5_service_handler.persist().await?;
662687
}
663688

@@ -1015,9 +1040,6 @@ where
10151040
*self.best_block.write().unwrap() = Some(new_best_block);
10161041

10171042
// TODO: Call best_block_updated on all sub-modules that require it, e.g., LSPS1MessageHandler.
1018-
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
1019-
lsps2_service_handler.prune_peer_state();
1020-
}
10211043
}
10221044

10231045
fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {

0 commit comments

Comments
 (0)