Skip to content

Move pending-HTLC-updated ChannelMonitor from ManyChannelMonitor #474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
ret
}

fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
return self.simple_monitor.fetch_pending_htlc_updated();
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
}
}

Expand Down
4 changes: 2 additions & 2 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2549,7 +2549,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::MessageSendEventsProvider for Ch
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
{
//TODO: This behavior should be documented.
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
Expand All @@ -2574,7 +2574,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::EventsProvider for ChannelManage
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
{
//TODO: This behavior should be documented.
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
Expand Down
117 changes: 57 additions & 60 deletions lightning/src/ln/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ pub struct MonitorUpdateError(pub &'static str);

/// Simple structure send back by ManyChannelMonitor in case of HTLC detected onchain from a
/// forward channel and from which info are needed to update HTLC in a backward channel.
#[derive(Clone, PartialEq)]
pub struct HTLCUpdate {
pub(super) payment_hash: PaymentHash,
pub(super) payment_preimage: Option<PaymentPreimage>,
pub(super) source: HTLCSource
}
impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source });

/// Simple trait indicating ability to track a set of ChannelMonitors and multiplex events between
/// them. Generally should be implemented by keeping a local SimpleManyChannelMonitor and passing
Expand Down Expand Up @@ -130,8 +132,12 @@ pub trait ManyChannelMonitor<ChanSigner: ChannelKeys>: Send + Sync {
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;

/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
/// with success or failure backward
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate>;
/// with success or failure.
///
/// You should probably just call through to
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
/// the full list.
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
}

/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
Expand All @@ -153,7 +159,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys> {
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: Arc<BroadcasterInterface>,
pending_events: Mutex<Vec<events::Event>>,
pending_htlc_updated: Mutex<HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>>,
logger: Arc<Logger>,
fee_estimator: Arc<FeeEstimator>
}
Expand All @@ -162,11 +167,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
let mut htlc_updated_infos = Vec::new();
{
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
if spendable_outputs.len() > 0 {
new_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
Expand All @@ -178,35 +182,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
}
}
htlc_updated_infos.append(&mut htlc_updated);
}
}
{
// ChannelManager will just need to fetch pending_htlc_updated and pass state backward
let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
for htlc in htlc_updated_infos.drain(..) {
match pending_htlc_updated.entry(htlc.2) {
hash_map::Entry::Occupied(mut e) => {
// In case of reorg we may have htlc outputs solved in a different way so
// we prefer to keep claims but don't store duplicate updates for a given
// (payment_hash, HTLCSource) pair.
let mut existing_claim = false;
e.get_mut().retain(|htlc_data| {
if htlc.0 == htlc_data.0 {
if htlc_data.1.is_some() {
existing_claim = true;
true
} else { false }
} else { true }
});
if !existing_claim {
e.get_mut().push((htlc.0, htlc.1));
}
}
hash_map::Entry::Vacant(e) => {
e.insert(vec![(htlc.0, htlc.1)]);
}
}
}
}
let mut pending_events = self.pending_events.lock().unwrap();
Expand All @@ -231,7 +206,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys> Simpl
chain_monitor,
broadcaster,
pending_events: Mutex::new(Vec::new()),
pending_htlc_updated: Mutex::new(HashMap::new()),
logger,
fee_estimator: feeest,
};
Expand Down Expand Up @@ -284,17 +258,10 @@ impl<ChanSigner: ChannelKeys> ManyChannelMonitor<ChanSigner> for SimpleManyChann
}
}

fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
let mut updated = self.pending_htlc_updated.lock().unwrap();
let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
for (k, v) in updated.drain() {
for htlc_data in v {
pending_htlcs_updated.push(HTLCUpdate {
payment_hash: k,
payment_preimage: htlc_data.1,
source: htlc_data.0,
});
}
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
let mut pending_htlcs_updated = Vec::new();
for chan in self.monitors.lock().unwrap().values_mut() {
pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
}
pending_htlcs_updated
}
Expand Down Expand Up @@ -640,6 +607,8 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {

payment_preimages: HashMap<PaymentHash, PaymentPreimage>,

pending_htlcs_updated: Vec<HTLCUpdate>,

destination_script: Script,
// Thanks to data loss protection, we may be able to claim our non-htlc funds
// back, this is the script we have to spend from but we need to
Expand Down Expand Up @@ -750,6 +719,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
self.current_remote_commitment_number != other.current_remote_commitment_number ||
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
self.pending_htlcs_updated != other.pending_htlcs_updated ||
self.destination_script != other.destination_script ||
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
Expand Down Expand Up @@ -938,6 +908,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
writer.write_all(&payment_preimage.0[..])?;
}

writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
for data in self.pending_htlcs_updated.iter() {
data.write(writer)?;
}

self.last_block_hash.write(writer)?;
self.destination_script.write(writer)?;
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
Expand Down Expand Up @@ -1056,6 +1031,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
current_remote_commitment_number: 1 << 48,

payment_preimages: HashMap::new(),
pending_htlcs_updated: Vec::new(),

destination_script: destination_script,
to_remote_rescue: None,

Expand Down Expand Up @@ -1419,6 +1396,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
res
}

/// Get the list of HTLCs who's status has been updated on chain. This should be called by
/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
let mut ret = Vec::new();
mem::swap(&mut ret, &mut self.pending_htlcs_updated);
ret
}

/// Can only fail if idx is < get_min_seen_secret
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
for i in 0..self.old_secrets.len() {
Expand Down Expand Up @@ -2402,7 +2387,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
/// on-chain.
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
for tx in txn_matched {
let mut output_val = 0;
for out in tx.output.iter() {
Expand All @@ -2415,7 +2400,6 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
let mut watch_outputs = Vec::new();
let mut spendable_outputs = Vec::new();
let mut htlc_updated = Vec::new();
let mut bump_candidates = HashSet::new();
for tx in txn_matched {
if tx.input.len() == 1 {
Expand Down Expand Up @@ -2474,10 +2458,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
// can also be resolved in a few other ways which can have more than one output. Thus,
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
let mut updated = self.is_resolving_htlc_output(&tx, height);
if updated.len() > 0 {
htlc_updated.append(&mut updated);
}
self.is_resolving_htlc_output(&tx, height);

// Scan all input to verify is one of the outpoint spent is of interest for us
let mut claimed_outputs_material = Vec::new();
Expand Down Expand Up @@ -2600,7 +2581,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
},
OnchainEvent::HTLCUpdate { htlc_update } => {
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
htlc_updated.push((htlc_update.0, None, htlc_update.1));
self.pending_htlcs_updated.push(HTLCUpdate {
payment_hash: htlc_update.1,
payment_preimage: None,
source: htlc_update.0,
});
},
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
self.claimable_outpoints.remove(&outpoint);
Expand Down Expand Up @@ -2632,7 +2617,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
}
(watch_outputs, spendable_outputs, htlc_updated)
(watch_outputs, spendable_outputs)
}

fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
Expand Down Expand Up @@ -2752,9 +2737,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {

/// Check if any transaction broadcasted is resolving HTLC output by a success or timeout on a local
/// or remote commitment tx, if so send back the source, preimage if found and payment_hash of resolved HTLC
fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) -> Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)> {
let mut htlc_updated = Vec::new();

fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) {
'outer_loop: for input in &tx.input {
let mut payment_data = None;
let revocation_sig_claim = (input.witness.len() == 3 && HTLCType::scriptlen_to_htlctype(input.witness[2].len()) == Some(HTLCType::OfferedHTLC) && input.witness[1].len() == 33)
Expand Down Expand Up @@ -2854,10 +2837,18 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
let mut payment_preimage = PaymentPreimage([0; 32]);
if accepted_preimage_claim {
payment_preimage.0.copy_from_slice(&input.witness[3]);
htlc_updated.push((source, Some(payment_preimage), payment_hash));
self.pending_htlcs_updated.push(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
payment_hash
});
} else if offered_preimage_claim {
payment_preimage.0.copy_from_slice(&input.witness[1]);
htlc_updated.push((source, Some(payment_preimage), payment_hash));
self.pending_htlcs_updated.push(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
payment_hash
});
} else {
log_info!(self, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
Expand All @@ -2880,7 +2871,6 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
}
}
}
htlc_updated
}

/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration
Expand Down Expand Up @@ -3221,6 +3211,12 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
}
}

let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
let mut pending_htlcs_updated = Vec::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
for _ in 0..pending_htlcs_updated_len {
pending_htlcs_updated.push(Readable::read(reader)?);
}

let last_block_hash: Sha256dHash = Readable::read(reader)?;
let destination_script = Readable::read(reader)?;
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
Expand Down Expand Up @@ -3321,6 +3317,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
current_remote_commitment_number,

payment_preimages,
pending_htlcs_updated,

destination_script,
to_remote_rescue,
Expand Down
14 changes: 14 additions & 0 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,20 @@ macro_rules! expect_payment_sent {
}
}

macro_rules! expect_payment_failed {
($node: expr, $expected_payment_hash: expr, $rejected_by_dest: expr) => {
let events = $node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentFailed { ref payment_hash, rejected_by_dest, .. } => {
assert_eq!(*payment_hash, $expected_payment_hash);
assert_eq!(rejected_by_dest, $rejected_by_dest);
},
_ => panic!("Unexpected event"),
}
}
}

pub fn send_along_route_with_hash<'a, 'b>(origin_node: &Node<'a, 'b>, route: Route, expected_route: &[&Node<'a, 'b>], recv_value: u64, our_payment_hash: PaymentHash) {
let mut payment_event = {
origin_node.node.send_payment(route, our_payment_hash).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/ln/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ pub(crate) mod functional_test_utils;
mod functional_tests;
#[cfg(test)]
mod chanmon_update_fail_tests;
#[cfg(test)]
mod reorg_tests;
Loading