Skip to content

Peer Storage (Part 3): Identifying Lost Channel States #3897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ check-cfg = [
"cfg(splicing)",
"cfg(async_payments)",
"cfg(simple_close)",
"cfg(peer_storage)",
]
2 changes: 2 additions & 0 deletions ci/ci-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,5 @@ RUSTFLAGS="--cfg=async_payments" cargo test --verbose --color always -p lightnin
RUSTFLAGS="--cfg=simple_close" cargo test --verbose --color always -p lightning
[ "$CI_MINIMIZE_DISK_USAGE" != "" ] && cargo clean
RUSTFLAGS="--cfg=lsps1_service" cargo test --verbose --color always -p lightning-liquidity
[ "$CI_MINIMIZE_DISK_USAGE" != "" ] && cargo clean
RUSTFLAGS="--cfg=peer_storage" cargo test --verbose --color always -p lightning
101 changes: 91 additions & 10 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use bitcoin::hash_types::{BlockHash, Txid};

use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
#[cfg(peer_storage)]
use crate::chain::channelmonitor::write_chanmon_internal;
use crate::chain::channelmonitor::{
Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs,
WithChannelMonitor,
Expand All @@ -36,8 +38,11 @@ use crate::chain::transaction::{OutPoint, TransactionData};
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::ln::channel_state::ChannelDetails;
use crate::ln::msgs::{self, BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
use crate::ln::our_peer_storage::DecryptedOurPeerStorage;
#[cfg(peer_storage)]
use crate::ln::msgs::PeerStorage;
use crate::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
#[cfg(peer_storage)]
use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder};
use crate::ln::types::ChannelId;
use crate::prelude::*;
use crate::sign::ecdsa::EcdsaChannelSigner;
Expand All @@ -47,8 +52,12 @@ use crate::types::features::{InitFeatures, NodeFeatures};
use crate::util::errors::APIError;
use crate::util::logger::{Logger, WithContext};
use crate::util::persist::MonitorName;
#[cfg(peer_storage)]
use crate::util::ser::{VecWriter, Writeable};
use crate::util::wakers::{Future, Notifier};
use bitcoin::secp256k1::PublicKey;
#[cfg(peer_storage)]
use core::iter::Cycle;
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};

Expand Down Expand Up @@ -264,7 +273,8 @@ pub struct ChainMonitor<
logger: L,
fee_estimator: F,
persister: P,
entropy_source: ES,

_entropy_source: ES,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
Expand All @@ -278,6 +288,7 @@ pub struct ChainMonitor<
/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,

#[cfg(peer_storage)]
our_peerstorage_encryption_key: PeerStorageKey,
}

Expand Down Expand Up @@ -477,7 +488,7 @@ where
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
pub fn new(
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
entropy_source: ES, our_peerstorage_encryption_key: PeerStorageKey,
_entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey,
) -> Self {
Self {
monitors: RwLock::new(new_hash_map()),
Expand All @@ -486,12 +497,13 @@ where
logger,
fee_estimator: feeest,
persister,
entropy_source,
_entropy_source,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Notifier::new(),
pending_send_only_events: Mutex::new(Vec::new()),
our_peerstorage_encryption_key,
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
}
}

Expand Down Expand Up @@ -804,23 +816,90 @@ where

/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
/// ensuring unique IDs are returned.
#[cfg(peer_storage)]
fn all_counterparty_node_ids(&self) -> HashSet<PublicKey> {
	// Collecting into a `HashSet` deduplicates peers with which we have
	// more than one channel, so each node id is returned exactly once.
	let monitors = self.monitors.read().unwrap();
	monitors
		.values()
		.map(|holder| holder.monitor.get_counterparty_node_id())
		.collect()
}

#[cfg(peer_storage)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll also need to introduce the cfg-gate in a few other places to get rid of the warning, e.g., above for all_counterparty_node_ids and on some imports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should I also cfg-gate entropy_source inside ChainMonitor?
If yes, I would have to cfg-gate a lot of things in other files as well...
I think it would be better to allow unused on it, since we are going to remove the peer-storage cfg gate in the next PR...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we need to add the cfg-gate where ever we'll get a warning. I'd prefer to avoid allow_unused, especially as it might easily slip through (i.e., we might not remember to remove it again going forward), while for the cfg guard we will be forced to make the cleanup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so I have cfg-gated at almost all the places in the chainmonitor monitor, but I have decided to add PhantomData in ChainMonitor, to avoid redeclaring the whole struct or the new function.

Copy link
Contributor

@tnull tnull Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned elsewhere, it should be fine to just rename the entropy source _entropy_source for now, no need for PhantomData then.

fn send_peer_storage(&self, their_node_id: PublicKey) {
// Randomly select `ChannelMonitor`s and serialize them into `monitors_list` until the peer-storage size budget is exhausted.
let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new();
let random_bytes = self._entropy_source.get_secure_random_bytes();

const MAX_PEER_STORAGE_SIZE: usize = 65531;
const USIZE_LEN: usize = core::mem::size_of::<usize>();
let mut random_bytes_cycle_iter = random_bytes.iter().cycle();

let mut current_size = 0;
let monitors_lock = self.monitors.read().unwrap();
let mut channel_ids = monitors_lock.keys().copied().collect();

/// Pops a pseudo-randomly chosen `ChannelId` out of `channel_ids`, returning
/// `None` once the list is empty.
///
/// The index is built by filling a `usize` with bytes drawn from the cycling
/// entropy iterator and reducing it modulo the remaining list length.
/// NOTE(review): the modulo reduction has a slight bias towards lower indices;
/// presumably acceptable since this only randomizes which monitors get
/// included in peer storage first — TODO confirm.
fn next_random_id(
	channel_ids: &mut Vec<ChannelId>,
	random_bytes_cycle_iter: &mut Cycle<core::slice::Iter<u8>>,
) -> Option<ChannelId> {
	if channel_ids.is_empty() {
		return None;
	}
	let random_idx = {
		let mut usize_bytes = [0u8; USIZE_LEN];
		// `cycle()` over a non-empty slice never yields `None`, hence the `expect`.
		usize_bytes.iter_mut().for_each(|b| {
			*b = *random_bytes_cycle_iter.next().expect("A cycle never ends")
		});
		// Take one more to introduce a slight misalignment.
		random_bytes_cycle_iter.next().expect("A cycle never ends");
		usize::from_le_bytes(usize_bytes) % channel_ids.len()
	};
	// `swap_remove` is O(1); the order of the remaining candidates is irrelevant.
	Some(channel_ids.swap_remove(random_idx))
}

while let Some(channel_id) = next_random_id(&mut channel_ids, &mut random_bytes_cycle_iter)
{
let monitor_holder = if let Some(monitor_holder) = monitors_lock.get(&channel_id) {
monitor_holder
} else {
debug_assert!(
false,
"Tried to access non-existing monitor, this should never happen"
);
break;
};

let mut serialized_channel = VecWriter(Vec::new());
let min_seen_secret = monitor_holder.monitor.get_min_seen_secret();
let counterparty_node_id = monitor_holder.monitor.get_counterparty_node_id();
{
let inner_lock = monitor_holder.monitor.inner.lock().unwrap();

write_chanmon_internal(&inner_lock, true, &mut serialized_channel)
.expect("can not write Channel Monitor for peer storage message");
}
let peer_storage_monitor = PeerStorageMonitorHolder {
channel_id,
min_seen_secret,
counterparty_node_id,
monitor_bytes: serialized_channel.0,
};

let serialized_length = peer_storage_monitor.serialized_length();

if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
continue;
} else {
current_size += serialized_length;
monitors_list.push(peer_storage_monitor);
}
}

let random_bytes = self.entropy_source.get_secure_random_bytes();
let serialised_channels = Vec::new();
let serialised_channels = monitors_list.encode();
let our_peer_storage = DecryptedOurPeerStorage::new(serialised_channels);
let cipher = our_peer_storage.encrypt(&self.our_peerstorage_encryption_key, &random_bytes);

log_debug!(self.logger, "Sending Peer Storage to {}", log_pubkey!(their_node_id));
let send_peer_storage_event = MessageSendEvent::SendPeerStorage {
node_id: their_node_id,
msg: msgs::PeerStorage { data: cipher.into_vec() },
msg: PeerStorage { data: cipher.into_vec() },
};

self.pending_send_only_events.lock().unwrap().push(send_peer_storage_event)
Expand Down Expand Up @@ -920,6 +999,7 @@ where
)
});

#[cfg(peer_storage)]
// Send peer storage every time a new block arrives.
for node_id in self.all_counterparty_node_ids() {
self.send_peer_storage(node_id);
Expand Down Expand Up @@ -1021,6 +1101,7 @@ where
)
});

#[cfg(peer_storage)]
// Send peer storage every time a new block arrives.
for node_id in self.all_counterparty_node_ids() {
self.send_peer_storage(node_id);
Expand Down
Loading
Loading