[Ready for Review] Track in-flight solving tx to delay HTLC failure update until enough confirmations #305

Closed
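
At a high level, this change stops force-closing a channel the moment a channel-closing tx is seen on-chain. Instead, the channel_id is queued under the height at which HTLC_FAIL_ANTI_REORG_DELAY confirmations will have elapsed; block_connected drains whatever has matured at the current height, and block_disconnected discards entries whose trigger block was reorged out. A minimal standalone sketch of that bookkeeping, assuming a delay of 6 (the PendingCloses type here is illustrative, not from the PR):

```rust
use std::collections::HashMap;

// Illustrative stand-in for the crate's constant in src/ln/channelmanager.rs.
const HTLC_FAIL_ANTI_REORG_DELAY: u32 = 6;

/// Height-indexed queue of channels whose closing tx has been seen on-chain
/// but which we only force-close once enough confirmations have passed.
#[derive(Default)]
struct PendingCloses {
    by_height: HashMap<u32, Vec<[u8; 32]>>,
}

impl PendingCloses {
    /// A channel-closing tx confirmed at `seen_height`: queue the channel
    /// under the height at which the anti-reorg delay will have elapsed.
    fn schedule(&mut self, seen_height: u32, channel_id: [u8; 32]) {
        let target = seen_height + HTLC_FAIL_ANTI_REORG_DELAY - 1;
        let ids = self.by_height.entry(target).or_insert_with(Vec::new);
        if !ids.contains(&channel_id) {
            ids.push(channel_id);
        }
    }

    /// On block_connected: pull out the channels that are now safe to close.
    fn mature(&mut self, height: u32) -> Vec<[u8; 32]> {
        self.by_height.remove(&height).unwrap_or_default()
    }

    /// On block_disconnected: the trigger block was reorged out, so the
    /// closing tx may no longer exist; drop the entries it scheduled.
    fn unschedule(&mut self, disconnected_height: u32) {
        self.by_height.remove(&(disconnected_height + HTLC_FAIL_ANTI_REORG_DELAY - 1));
    }
}
```

Compared to the diff's Occupied/Vacant match below, the sketch leans on entry().or_insert_with and Vec::contains; the behavior is the same.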
13 changes: 9 additions & 4 deletions fuzz/fuzz_targets/full_stack_target.rs

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions src/chain/chaininterface.rs
@@ -78,7 +78,11 @@ pub trait ChainListener: Sync + Send {
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]);
/// Notifies a listener that a block was disconnected.
/// Unlike block_connected, this *must* never be called twice for the same disconnect event.
fn block_disconnected(&self, header: &BlockHeader);
///
/// The height is passed along because channels are driven by on-chain events (tx
/// broadcast, confirmation height), and the cancellation of one of those events may
/// require rolling back state (in ChannelMonitor or Channel) keyed to that height.
fn block_disconnected(&self, header: &BlockHeader, height: u32);
}

/// An enum that represents the speed at which we want a transaction to confirm used for feerate
@@ -279,11 +283,11 @@ impl ChainWatchInterfaceUtil {
}

/// Notify listeners that a block was disconnected.
pub fn block_disconnected(&self, header: &BlockHeader) {
pub fn block_disconnected(&self, block: &Block, height: u32) {
let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_disconnected(header),
Some(arc) => arc.block_disconnected(&block.header, height),
None => ()
}
}
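Reduced to a self-contained sketch, the interface-side change looks like this (the bitcoin types are stubbed out so the snippet compiles on its own; the real trait and util live in src/chain/chaininterface.rs):

```rust
use std::sync::{Mutex, Weak};

// Stubs for the bitcoin crate's types, just to keep the sketch self-contained.
pub struct BlockHeader;
pub struct Block { pub header: BlockHeader }

pub trait ChainListener: Sync + Send {
    /// Notifies a listener that a block was disconnected, now including the
    /// height so height-keyed state (e.g. pending channel closes) can be
    /// rolled back.
    fn block_disconnected(&self, header: &BlockHeader, height: u32);
}

pub struct ChainWatchInterfaceUtil {
    listeners: Mutex<Vec<Weak<dyn ChainListener>>>,
}

impl ChainWatchInterfaceUtil {
    /// Fan a disconnect out to every still-alive listener, mirroring the
    /// diff: the util takes the whole Block plus its height and passes
    /// (&block.header, height) along.
    pub fn block_disconnected(&self, block: &Block, height: u32) {
        let listeners = self.listeners.lock().unwrap().clone();
        for listener in listeners.iter() {
            if let Some(arc) = listener.upgrade() {
                arc.block_disconnected(&block.header, height);
            }
        }
    }
}
```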
90 changes: 75 additions & 15 deletions src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
@@ -332,6 +332,8 @@ pub struct ChannelManager {
channel_state: Mutex<ChannelHolder>,
our_network_key: SecretKey,

channel_closing_waiting_threshold_conf: Mutex<HashMap<u32, Vec<[u8; 32]>>>,

pending_events: Mutex<Vec<events::Event>>,
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
/// Essentially just when we're serializing ourselves out.
@@ -556,6 +558,8 @@ impl ChannelManager {
}),
our_network_key: keys_manager.get_node_secret(),

channel_closing_waiting_threshold_conf: Mutex::new(HashMap::new()),

pending_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),

@@ -2400,11 +2404,12 @@ impl ChainListener for ChannelManager {
let _ = self.total_consistency_lock.read().unwrap();
let mut failed_channels = Vec::new();
{
let mut channel_closing_lock = self.channel_closing_waiting_threshold_conf.lock().unwrap();
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = channel_lock.borrow_parts();
let short_to_id = channel_state.short_to_id;
let pending_msg_events = channel_state.pending_msg_events;
channel_state.by_id.retain(|_, channel| {
channel_state.by_id.retain(|channel_id, channel| {
let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched);
if let Ok(Some(funding_locked)) = chan_res {
pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
@@ -2429,20 +2434,24 @@
for tx in txn_matched {
for inp in tx.input.iter() {
if inp.previous_output == funding_txo.into_bitcoin_outpoint() {
log_trace!(self, "Detected channel-closing tx {} spending {}:{}, closing channel {}", tx.txid(), inp.previous_output.txid, inp.previous_output.vout, log_bytes!(channel.channel_id()));
if let Some(short_id) = channel.get_short_channel_id() {
short_to_id.remove(&short_id);
}
// It looks like our counterparty went on-chain. We go ahead and
// broadcast our latest local state as well here, just in case it's
// some kind of SPV attack, though we expect these to be dropped.
failed_channels.push(channel.force_shutdown());
if let Ok(update) = self.get_channel_update(&channel) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
log_trace!(self, "Detected channel-closing tx {} spending {}:{}, waiting until {} to close channel {}", tx.txid(), inp.previous_output.txid, inp.previous_output.vout, height + HTLC_FAIL_ANTI_REORG_DELAY - 1, log_bytes!(channel_id[..]));
match channel_closing_lock.entry(height + HTLC_FAIL_ANTI_REORG_DELAY - 1) {
hash_map::Entry::Occupied(mut entry) => {
if !entry.get().contains(channel_id) {
entry.get_mut().push(*channel_id);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert(vec![*channel_id]);
}
}
return false;
}
}
}
@@ -2465,6 +2474,25 @@
}
true
});
if let Some(channel_closings) = channel_closing_lock.remove(&height) {
for channel_id in channel_closings {
log_trace!(self, "Enough confirmations for a broacast commitment tx, channel {} can be closed", log_bytes!(&channel_id[..]));
if let Some(mut channel) = channel_state.by_id.remove(&channel_id) {
if let Some(short_id) = channel.get_short_channel_id() {
short_to_id.remove(&short_id);
}
// It looks like our counterparty went on-chain. We go ahead and
// broadcast our latest local state as well here, just in case it's
// some kind of SPV attack, though we expect these to be dropped.
failed_channels.push(channel.force_shutdown());
if let Ok(update) = self.get_channel_update(&channel) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
}
}
}
}
for failure in failed_channels.drain(..) {
self.finish_force_close_channel(failure);
@@ -2474,7 +2502,7 @@
}

/// We force-close the channel without letting our counterparty participate in the shutdown
fn block_disconnected(&self, header: &BlockHeader) {
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
let _ = self.total_consistency_lock.read().unwrap();
let mut failed_channels = Vec::new();
{
@@ -2499,6 +2527,12 @@
}
});
}
{
let mut channel_closing_lock = self.channel_closing_waiting_threshold_conf.lock().unwrap();
// Discard any pending close scheduled from this height: the broadcast commitment
// tx has been disconnected (and may yet be replaced by a legit closing_signed).
channel_closing_lock.remove(&(height + HTLC_FAIL_ANTI_REORG_DELAY - 1));
}
for failure in failed_channels.drain(..) {
self.finish_force_close_channel(failure);
}
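
The connect/disconnect pair above can be exercised against the standalone PendingCloses sketch from the top of this page; heights assume the illustrative delay of 6:

```rust
fn main() {
    let mut pending = PendingCloses::default();

    // A channel-closing tx confirms at height 100; with a delay of 6 the
    // channel becomes closable at height 105 (100 + 6 - 1).
    pending.schedule(100, [0x42; 32]);
    assert!(pending.mature(104).is_empty());

    // Reorg before maturity: block_disconnected drops the scheduled entry,
    // since the commitment tx may yet be replaced by a legit closing_signed.
    pending.unschedule(100);
    assert!(pending.mature(105).is_empty());

    // Without the reorg, the entry matures at 105 and the channel is closed.
    pending.schedule(100, [0x42; 32]);
    assert_eq!(pending.mature(105), vec![[0x42u8; 32]]);
}
```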
@@ -2936,6 +2970,15 @@ impl Writeable for ChannelManager {
}
}

let channel_closing_lock = self.channel_closing_waiting_threshold_conf.lock().unwrap();
(channel_closing_lock.len() as u64).write(writer)?;
for (confirmation_height, channel_ids) in channel_closing_lock.iter() {
confirmation_height.write(writer)?;
// Write the per-height id count so the reader knows how many ids follow.
(channel_ids.len() as u64).write(writer)?;
for id in channel_ids.iter() {
id.write(writer)?;
}
}
}

Ok(())
}
}
Expand Down Expand Up @@ -3073,6 +3116,21 @@ impl<'a, R : ::std::io::Read> ReadableArgs<R, ChannelManagerReadArgs<'a>> for (S
claimable_htlcs.insert(payment_hash, previous_hops);
}

let channel_closing_count: u64 = Readable::read(reader)?;
let mut channel_closing: HashMap<u32, Vec<[u8; 32]>> = HashMap::with_capacity(cmp::min(channel_closing_count as usize, 32));
for _ in 0..channel_closing_count {
let confirmation_height: u32 = Readable::read(reader)?;
// Each height appears once, with an explicit count of ids (matching the writer).
let channel_id_count: u64 = Readable::read(reader)?;
let mut channel_ids: Vec<[u8; 32]> = Vec::with_capacity(cmp::min(channel_id_count as usize, 32));
for _ in 0..channel_id_count {
channel_ids.push(Readable::read(reader)?);
}
channel_closing.insert(confirmation_height, channel_ids);
}

let channel_manager = ChannelManager {
genesis_hash,
fee_estimator: args.fee_estimator,
@@ -3094,6 +3152,8 @@
}),
our_network_key: args.keys_manager.get_node_secret(),

channel_closing_waiting_threshold_conf: Mutex::new(channel_closing),

pending_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
keys_manager: args.keys_manager,
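With the per-height id count in place, the map's on-disk layout round-trips as in this standalone sketch (big-endian integer encoding is an assumption of the sketch; the crate's Writeable/Readable impls are normative):

```rust
use std::collections::HashMap;
use std::io::{self, Read, Write};

// Layout: outer u64 count, then per entry a u32 height, a u64 id count,
// and that many 32-byte channel ids.
fn write_map<W: Write>(w: &mut W, map: &HashMap<u32, Vec<[u8; 32]>>) -> io::Result<()> {
    w.write_all(&(map.len() as u64).to_be_bytes())?;
    for (height, ids) in map.iter() {
        w.write_all(&height.to_be_bytes())?;
        w.write_all(&(ids.len() as u64).to_be_bytes())?;
        for id in ids.iter() {
            w.write_all(id)?;
        }
    }
    Ok(())
}

fn read_map<R: Read>(r: &mut R) -> io::Result<HashMap<u32, Vec<[u8; 32]>>> {
    let (mut u64_buf, mut u32_buf) = ([0u8; 8], [0u8; 4]);
    r.read_exact(&mut u64_buf)?;
    let n_heights = u64::from_be_bytes(u64_buf);
    let mut map = HashMap::new();
    for _ in 0..n_heights {
        r.read_exact(&mut u32_buf)?;
        let height = u32::from_be_bytes(u32_buf);
        r.read_exact(&mut u64_buf)?;
        let n_ids = u64::from_be_bytes(u64_buf);
        let mut ids = Vec::new();
        for _ in 0..n_ids {
            let mut id = [0u8; 32];
            r.read_exact(&mut id)?;
            ids.push(id);
        }
        map.insert(height, ids);
    }
    Ok(map)
}

fn main() -> io::Result<()> {
    let mut map = HashMap::new();
    map.insert(105u32, vec![[0x42u8; 32], [0x43u8; 32]]);
    let mut buf = Vec::new();
    write_map(&mut buf, &map)?;
    assert_eq!(read_map(&mut buf.as_slice())?, map);
    Ok(())
}
```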