Add outpoint index in watch_outputs to fix tracking #653

Merged
8 changes: 4 additions & 4 deletions lightning/src/chain/chainmonitor.rs
@@ -95,8 +95,8 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit

if let Some(ref chain_source) = self.chain_source {
for (txid, outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.iter().enumerate() {
chain_source.register_output(&OutPoint { txid, index: idx as u16 }, &output.script_pubkey);
for (idx, output) in outputs.iter() {
chain_source.register_output(&OutPoint { txid, index: *idx as u16 }, &output.script_pubkey);
}
}
}
@@ -152,8 +152,8 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
if let Some(ref chain_source) = self.chain_source {
chain_source.register_tx(&funding_txo.0.txid, &funding_txo.1);
for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
for (idx, script_pubkey) in outputs.iter().enumerate() {
chain_source.register_output(&OutPoint { txid: *txid, index: idx as u16 }, &script_pubkey);
for (idx, script_pubkey) in outputs.iter() {
chain_source.register_output(&OutPoint { txid: *txid, index: *idx as u16 }, script_pubkey);
}
}
}
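The fix is visible in both hunks above: the registration loop no longer re-derives an output's index with `enumerate()`, it uses the index stored next to the script. The stand-alone sketch below (dummy types and a dummy `register_output`, not the real `chain::Filter`/`OutPoint` API) shows why that matters once the watched list holds only a subset of a transaction's outputs.

```rust
// Minimal sketch with stand-in types; `register_output` is a dummy here,
// not rust-lightning's `chain::Filter::register_output`.
fn register_output(txid: &str, index: u16, script: &str) {
    println!("watching {}:{} ({})", txid, index, script);
}

fn main() {
    // A commitment tx with four outputs; only the HTLC outputs (vouts 1 and 2)
    // are interesting to the monitor.
    let tx_outputs = ["to_remote", "offered_htlc", "received_htlc", "to_local"];
    let watched: Vec<(u32, &str)> = tx_outputs
        .iter()
        .enumerate()
        .filter(|(_, s)| s.contains("htlc"))
        .map(|(i, s)| (i as u32, *s))
        .collect();

    // Pre-#653 shape: indices re-derived with enumerate() over the *subset*,
    // so "offered_htlc" (really vout 1) gets registered as vout 0.
    for (wrong_idx, (_, script)) in watched.iter().enumerate() {
        register_output("txid", wrong_idx as u16, script);
    }

    // Post-#653 shape: the stored index is the transaction's real vout.
    for (idx, script) in watched.iter() {
        register_output("txid", *idx as u16, script);
    }
}
```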
75 changes: 53 additions & 22 deletions lightning/src/chain/channelmonitor.rs
@@ -666,7 +666,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
// interface knows about the TXOs that we want to be notified of spends of. We could probably
// be smart and derive them from the above storage fields, but its much simpler and more
// Obviously Correct (tm) if we just keep track of them explicitly.
outputs_to_watch: HashMap<Txid, Vec<Script>>,
outputs_to_watch: HashMap<Txid, Vec<(u32, Script)>>,

#[cfg(test)]
pub onchain_tx_handler: OnchainTxHandler<ChanSigner>,
@@ -914,10 +914,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
}

(self.outputs_to_watch.len() as u64).write(writer)?;
for (txid, output_scripts) in self.outputs_to_watch.iter() {
for (txid, idx_scripts) in self.outputs_to_watch.iter() {
txid.write(writer)?;
(output_scripts.len() as u64).write(writer)?;
for script in output_scripts.iter() {
(idx_scripts.len() as u64).write(writer)?;
for (idx, script) in idx_scripts.iter() {
idx.write(writer)?;
script.write(writer)?;
}
}
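The serialization hunk above now interleaves each output's index with its script: per txid it writes the entry count followed by `(index, script)` pairs rather than bare scripts. A rough sketch of that shape, using an ad-hoc byte format instead of rust-lightning's actual `Writeable` machinery:

```rust
// Illustrative encoding only; the real ChannelMonitor uses the `Writeable`
// trait, not this ad-hoc big-endian format.
use std::collections::HashMap;

fn write_u64(buf: &mut Vec<u8>, v: u64) { buf.extend_from_slice(&v.to_be_bytes()); }
fn write_u32(buf: &mut Vec<u8>, v: u32) { buf.extend_from_slice(&v.to_be_bytes()); }
fn write_bytes(buf: &mut Vec<u8>, b: &[u8]) {
    write_u64(buf, b.len() as u64);
    buf.extend_from_slice(b);
}

/// `outputs_to_watch` keyed by a 32-byte txid, each entry a (vout, script) list.
fn serialize_outputs_to_watch(map: &HashMap<[u8; 32], Vec<(u32, Vec<u8>)>>) -> Vec<u8> {
    let mut buf = Vec::new();
    write_u64(&mut buf, map.len() as u64);
    for (txid, idx_scripts) in map.iter() {
        buf.extend_from_slice(txid);
        write_u64(&mut buf, idx_scripts.len() as u64);
        for (idx, script) in idx_scripts.iter() {
            write_u32(&mut buf, *idx);     // new in #653: the output index
            write_bytes(&mut buf, script); // the scriptPubKey bytes
        }
    }
    buf
}

fn main() {
    let mut map = HashMap::new();
    map.insert([0u8; 32], vec![(1u32, vec![0x00, 0x14]), (2u32, vec![0x00, 0x20])]);
    println!("{} bytes", serialize_outputs_to_watch(&map).len());
}
```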
@@ -963,7 +964,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
onchain_tx_handler.provide_latest_holder_tx(initial_holder_commitment_tx);

let mut outputs_to_watch = HashMap::new();
outputs_to_watch.insert(funding_info.0.txid, vec![funding_info.1.clone()]);
outputs_to_watch.insert(funding_info.0.txid, vec![(funding_info.0.index as u32, funding_info.1.clone())]);

ChannelMonitor {
latest_update_id: 0,
@@ -1209,7 +1210,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// transaction), which we must learn about spends of via block_connected().
///
/// (C-not exported) because we have no HashMap bindings
pub fn get_outputs_to_watch(&self) -> &HashMap<Txid, Vec<Script>> {
pub fn get_outputs_to_watch(&self) -> &HashMap<Txid, Vec<(u32, Script)>> {
// If we've detected a counterparty commitment tx on chain, we must include it in the set
// of outputs to watch for spends of, otherwise we're likely to lose user funds. Because
// its trivial to do, double-check that here.
@@ -1264,7 +1265,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// HTLC-Success/HTLC-Timeout transactions.
/// Return updates for HTLC pending in the channel and failed automatically by the broadcast of
/// revoked counterparty commitment tx
fn check_spend_counterparty_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, (Txid, Vec<TxOut>)) where L::Target: Logger {
fn check_spend_counterparty_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, (Txid, Vec<(u32, TxOut)>)) where L::Target: Logger {
// Most secp and related errors trying to create keys means we have no hope of constructing
// a spend transaction...so we return no transactions to broadcast
let mut claimable_outpoints = Vec::new();
@@ -1319,7 +1320,9 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
if !claimable_outpoints.is_empty() || per_commitment_option.is_some() { // ie we're confident this is actually ours
// We're definitely a counterparty commitment transaction!
log_trace!(logger, "Got broadcast of revoked counterparty commitment transaction, going to generate general spend tx with {} inputs", claimable_outpoints.len());
watch_outputs.append(&mut tx.output.clone());
for (idx, outp) in tx.output.iter().enumerate() {
watch_outputs.push((idx as u32, outp.clone()));
}
self.counterparty_commitment_txn_on_chain.insert(commitment_txid, commitment_number);

macro_rules! check_htlc_fails {
@@ -1366,7 +1369,9 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
// already processed the block, resulting in the counterparty_commitment_txn_on_chain entry
// not being generated by the above conditional. Thus, to be safe, we go ahead and
// insert it here.
watch_outputs.append(&mut tx.output.clone());
for (idx, outp) in tx.output.iter().enumerate() {
watch_outputs.push((idx as u32, outp.clone()));
}
self.counterparty_commitment_txn_on_chain.insert(commitment_txid, commitment_number);

log_trace!(logger, "Got broadcast of non-revoked counterparty commitment transaction {}", commitment_txid);
Expand Down Expand Up @@ -1456,7 +1461,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
}

/// Attempts to claim a counterparty HTLC-Success/HTLC-Timeout's outputs using the revocation key
fn check_spend_counterparty_htlc<L: Deref>(&mut self, tx: &Transaction, commitment_number: u64, height: u32, logger: &L) -> (Vec<ClaimRequest>, Option<(Txid, Vec<TxOut>)>) where L::Target: Logger {
fn check_spend_counterparty_htlc<L: Deref>(&mut self, tx: &Transaction, commitment_number: u64, height: u32, logger: &L) -> (Vec<ClaimRequest>, Option<(Txid, Vec<(u32, TxOut)>)>) where L::Target: Logger {
let htlc_txid = tx.txid();
if tx.input.len() != 1 || tx.output.len() != 1 || tx.input[0].witness.len() != 5 {
return (Vec::new(), None)
@@ -1478,10 +1483,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
log_trace!(logger, "Counterparty HTLC broadcast {}:{}", htlc_txid, 0);
let witness_data = InputMaterial::Revoked { per_commitment_point, counterparty_delayed_payment_base_key: self.counterparty_tx_cache.counterparty_delayed_payment_base_key, counterparty_htlc_base_key: self.counterparty_tx_cache.counterparty_htlc_base_key, per_commitment_key, input_descriptor: InputDescriptors::RevokedOutput, amount: tx.output[0].value, htlc: None, on_counterparty_tx_csv: self.counterparty_tx_cache.on_counterparty_tx_csv };
let claimable_outpoints = vec!(ClaimRequest { absolute_timelock: height + self.counterparty_tx_cache.on_counterparty_tx_csv as u32, aggregable: true, outpoint: BitcoinOutPoint { txid: htlc_txid, vout: 0}, witness_data });
(claimable_outpoints, Some((htlc_txid, tx.output.clone())))
let outputs = vec![(0, tx.output[0].clone())];
(claimable_outpoints, Some((htlc_txid, outputs)))
}

fn broadcast_by_holder_state(&self, commitment_tx: &Transaction, holder_tx: &HolderSignedTx) -> (Vec<ClaimRequest>, Vec<TxOut>, Option<(Script, PublicKey, PublicKey)>) {
fn broadcast_by_holder_state(&self, commitment_tx: &Transaction, holder_tx: &HolderSignedTx) -> (Vec<ClaimRequest>, Vec<(u32, TxOut)>, Option<(Script, PublicKey, PublicKey)>) {
let mut claim_requests = Vec::with_capacity(holder_tx.htlc_outputs.len());
let mut watch_outputs = Vec::with_capacity(holder_tx.htlc_outputs.len());

@@ -1502,7 +1508,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
} else { None },
amount: htlc.amount_msat,
}});
watch_outputs.push(commitment_tx.output[transaction_output_index as usize].clone());
watch_outputs.push((transaction_output_index, commitment_tx.output[transaction_output_index as usize].clone()));
}
}

@@ -1512,7 +1518,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// Attempts to claim any claimable HTLCs in a commitment transaction which was not (yet)
/// revoked using data in holder_claimable_outpoints.
/// Should not be used if check_spend_revoked_transaction succeeds.
fn check_spend_holder_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, (Txid, Vec<TxOut>)) where L::Target: Logger {
fn check_spend_holder_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, (Txid, Vec<(u32, TxOut)>)) where L::Target: Logger {
let commitment_txid = tx.txid();
let mut claim_requests = Vec::new();
let mut watch_outputs = Vec::new();
@@ -1662,7 +1668,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// [`get_outputs_to_watch`].
///
/// [`get_outputs_to_watch`]: #method.get_outputs_to_watch
pub fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, header: &BlockHeader, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: L)-> Vec<(Txid, Vec<TxOut>)>
pub fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, header: &BlockHeader, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: L)-> Vec<(Txid, Vec<(u32, TxOut)>)>
where B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
@@ -1763,9 +1769,23 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
// Determine new outputs to watch by comparing against previously known outputs to watch,
// updating the latter in the process.
watch_outputs.retain(|&(ref txid, ref txouts)| {
let output_scripts = txouts.iter().map(|o| o.script_pubkey.clone()).collect();
self.outputs_to_watch.insert(txid.clone(), output_scripts).is_none()
let idx_and_scripts = txouts.iter().map(|o| (o.0, o.1.script_pubkey.clone())).collect();
self.outputs_to_watch.insert(txid.clone(), idx_and_scripts).is_none()

Collaborator: Can we iterate the new watch txn to assert they're known types so that the panic!() two hunks down is definitely correct?

Collaborator: Actually, its all test-only, it doesnt matter.

});
#[cfg(test)]
{
// If we see a transaction for which we registered outputs previously,
// make sure the registered scriptpubkey at the expected index matches
// the actual transaction output's script. This case failed before #653.
for tx in &txn_matched {
if let Some(outputs) = self.get_outputs_to_watch().get(&tx.txid()) {
for idx_and_script in outputs.iter() {
assert!((idx_and_script.0 as usize) < tx.output.len());
assert_eq!(tx.output[idx_and_script.0 as usize].script_pubkey, idx_and_script.1);
}
}
}
}
watch_outputs
}
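The `#[cfg(test)]` block added above asserts, for every matched transaction, that each registered `(index, script)` pair really points at that transaction's output. The same invariant in isolation, with plain stand-in types instead of `Transaction`/`Script`, looks roughly like this:

```rust
use std::collections::HashMap;

// Stand-in for a transaction: its txid plus its output scriptPubKeys in order.
struct Tx { txid: [u8; 32], output_scripts: Vec<Vec<u8>> }

/// Panics if any registered (index, script) pair disagrees with the actual tx.
fn check_watched_outputs(
    outputs_to_watch: &HashMap<[u8; 32], Vec<(u32, Vec<u8>)>>,
    txn_matched: &[Tx],
) {
    for tx in txn_matched {
        if let Some(outputs) = outputs_to_watch.get(&tx.txid) {
            for (idx, script) in outputs {
                // The stored index must be in range and point at the stored script.
                assert!((*idx as usize) < tx.output_scripts.len());
                assert_eq!(&tx.output_scripts[*idx as usize], script);
            }
        }
    }
}

fn main() {
    let mut watch = HashMap::new();
    watch.insert([7u8; 32], vec![(1u32, vec![0xaa])]);
    let tx = Tx { txid: [7u8; 32], output_scripts: vec![vec![0xbb], vec![0xaa]] };
    check_watched_outputs(&watch, &[tx]); // passes: vout 1 carries script 0xaa
}
```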

@@ -1813,8 +1833,19 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
fn spends_watched_output(&self, tx: &Transaction) -> bool {
for input in tx.input.iter() {
if let Some(outputs) = self.get_outputs_to_watch().get(&input.previous_output.txid) {
for (idx, _script_pubkey) in outputs.iter().enumerate() {
if idx == input.previous_output.vout as usize {
for (idx, _script_pubkey) in outputs.iter() {
if *idx == input.previous_output.vout {
#[cfg(test)]
{
// If the expected script is a known type, check that the witness
// appears to be spending the correct type (ie that the match would
// actually succeed in BIP 158/159-style filters).
if _script_pubkey.is_v0_p2wsh() {
assert_eq!(&bitcoin::Address::p2wsh(&Script::from(input.witness.last().unwrap().clone()), bitcoin::Network::Bitcoin).script_pubkey(), _script_pubkey);
} else if _script_pubkey.is_v0_p2wpkh() {
assert_eq!(&bitcoin::Address::p2wpkh(&bitcoin::PublicKey::from_slice(&input.witness.last().unwrap()).unwrap(), bitcoin::Network::Bitcoin).unwrap().script_pubkey(), _script_pubkey);
} else { panic!(); }
}
return true;
}
}
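The test-only witness check in `spends_watched_output` leans on the fact that a v0 P2WSH scriptPubKey commits to the SHA-256 of the witness script, which is the last witness element of a P2WSH spend. A minimal sketch of that relationship, assuming the `sha2` crate rather than rust-bitcoin's `Address` helpers:

```rust
// Sketch of the P2WSH relationship the test-only assertion relies on.
// Assumes `sha2 = "0.10"`; rust-lightning itself goes through rust-bitcoin.
use sha2::{Digest, Sha256};

/// scriptPubKey of a v0 P2WSH output for `witness_script`:
/// OP_0 (0x00) followed by a 32-byte push (0x20) of SHA256(witness_script).
fn p2wsh_script_pubkey(witness_script: &[u8]) -> Vec<u8> {
    let mut spk = vec![0x00, 0x20];
    spk.extend_from_slice(Sha256::digest(witness_script).as_slice());
    spk
}

fn main() {
    // The last witness element of a P2WSH spend is the witness script itself,
    // so re-hashing it must reproduce the watched scriptPubKey.
    let witness_script = vec![0x51]; // OP_TRUE, purely illustrative
    let spk = p2wsh_script_pubkey(&witness_script);
    assert_eq!(spk.len(), 34); // 0x00, 0x20, then the 32-byte hash
    assert_eq!(spk[0], 0x00);
    assert_eq!(spk[1], 0x20);
    println!("scriptPubKey: {:x?}", spk);
}
```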
@@ -2316,13 +2347,13 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for (BlockHash, ChannelMonitor
}

let outputs_to_watch_len: u64 = Readable::read(reader)?;
let mut outputs_to_watch = HashMap::with_capacity(cmp::min(outputs_to_watch_len as usize, MAX_ALLOC_SIZE / (mem::size_of::<Txid>() + mem::size_of::<Vec<Script>>())));
let mut outputs_to_watch = HashMap::with_capacity(cmp::min(outputs_to_watch_len as usize, MAX_ALLOC_SIZE / (mem::size_of::<Txid>() + mem::size_of::<u32>() + mem::size_of::<Vec<Script>>())));
for _ in 0..outputs_to_watch_len {
let txid = Readable::read(reader)?;
let outputs_len: u64 = Readable::read(reader)?;
let mut outputs = Vec::with_capacity(cmp::min(outputs_len as usize, MAX_ALLOC_SIZE / mem::size_of::<Script>()));
let mut outputs = Vec::with_capacity(cmp::min(outputs_len as usize, MAX_ALLOC_SIZE / (mem::size_of::<u32>() + mem::size_of::<Script>())));
for _ in 0..outputs_len {
outputs.push(Readable::read(reader)?);
outputs.push((Readable::read(reader)?, Readable::read(reader)?));
}
if let Some(_) = outputs_to_watch.insert(txid, outputs) {
return Err(DecodeError::InvalidValue);
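The read side mirrors the write side and keeps the same defensive capacity bound: the pre-allocation is capped so a corrupt length prefix cannot force a huge allocation, and a duplicate txid is rejected, like `DecodeError::InvalidValue` in the hunk above. A sketch of that pattern in the same ad-hoc format as the write-side sketch earlier (again, not the real `Readable` implementation):

```rust
use std::collections::HashMap;
use std::convert::TryInto;

const MAX_ALLOC_SIZE: usize = 64 * 1024; // illustrative bound only

fn read_u64(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let v = u64::from_be_bytes(buf.get(*pos..*pos + 8)?.try_into().ok()?);
    *pos += 8;
    Some(v)
}

fn read_u32(buf: &[u8], pos: &mut usize) -> Option<u32> {
    let v = u32::from_be_bytes(buf.get(*pos..*pos + 4)?.try_into().ok()?);
    *pos += 4;
    Some(v)
}

fn read_bytes(buf: &[u8], pos: &mut usize) -> Option<Vec<u8>> {
    let len = read_u64(buf, pos)? as usize;
    let b = buf.get(*pos..*pos + len)?.to_vec();
    *pos += len;
    Some(b)
}

fn deserialize_outputs_to_watch(buf: &[u8]) -> Option<HashMap<[u8; 32], Vec<(u32, Vec<u8>)>>> {
    let mut pos = 0usize;
    let map_len = read_u64(buf, &mut pos)?;
    // Cap the pre-allocation: a corrupt length field must not trigger a huge alloc.
    let mut map = HashMap::with_capacity((map_len as usize).min(MAX_ALLOC_SIZE / 64));
    for _ in 0..map_len {
        let txid: [u8; 32] = buf.get(pos..pos + 32)?.try_into().ok()?;
        pos += 32;
        let outputs_len = read_u64(buf, &mut pos)?;
        let mut outputs = Vec::with_capacity((outputs_len as usize).min(MAX_ALLOC_SIZE / 40));
        for _ in 0..outputs_len {
            let idx = read_u32(buf, &mut pos)?;   // the per-output index added in #653
            let script = read_bytes(buf, &mut pos)?;
            outputs.push((idx, script));
        }
        if map.insert(txid, outputs).is_some() {
            return None; // duplicate txid: reject, like DecodeError::InvalidValue
        }
    }
    Some(map)
}

fn main() {
    // An encoding of an empty map is just a zero u64 length prefix.
    let map = deserialize_outputs_to_watch(&0u64.to_be_bytes()).expect("valid encoding");
    assert!(map.is_empty());
}
```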
101 changes: 47 additions & 54 deletions lightning/src/ln/functional_tests.rs
@@ -7157,60 +7157,6 @@ fn test_failure_delay_dust_htlc_local_commitment() {
do_test_failure_delay_dust_htlc_local_commitment(false);
}

#[test]
fn test_no_failure_dust_htlc_local_commitment() {
// Transaction filters for failing back dust HTLCs based on local commitment txn info have been
// prone to error; we test here that a dummy transaction doesn't fail them.

let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());

// Rebalance a bit
send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000, 8_000_000);

let as_dust_limit = nodes[0].node.channel_state.lock().unwrap().by_id.get(&chan.2).unwrap().holder_dust_limit_satoshis;
let bs_dust_limit = nodes[1].node.channel_state.lock().unwrap().by_id.get(&chan.2).unwrap().holder_dust_limit_satoshis;

// We route 2 dust-HTLCs between A and B
let (preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], bs_dust_limit*1000);
let (preimage_2, _) = route_payment(&nodes[1], &[&nodes[0]], as_dust_limit*1000);

// Build a dummy invalid transaction trying to spend a commitment tx
let input = TxIn {
previous_output: BitcoinOutPoint { txid: chan.3.txid(), vout: 0 },
script_sig: Script::new(),
sequence: 0,
witness: Vec::new(),
};

let outp = TxOut {
script_pubkey: Builder::new().push_opcode(opcodes::all::OP_RETURN).into_script(),
value: 10000,
};

let dummy_tx = Transaction {
version: 2,
lock_time: 0,
input: vec![input],
output: vec![outp]
};

let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[0].chain_monitor.chain_monitor.block_connected(&header, &[(0, &dummy_tx)], 1);
assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
assert_eq!(nodes[0].node.get_and_clear_pending_msg_events().len(), 0);
// We broadcast a few more block to check everything is all right
connect_blocks(&nodes[0], 20, 1, true, header.block_hash());
assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
assert_eq!(nodes[0].node.get_and_clear_pending_msg_events().len(), 0);

claim_payment(&nodes[0], &vec!(&nodes[1])[..], preimage_1, bs_dust_limit*1000);
claim_payment(&nodes[1], &vec!(&nodes[0])[..], preimage_2, as_dust_limit*1000);
}

fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
// Outbound HTLC-failure updates must be cancelled if we get a reorg before we reach ANTI_REORG_DELAY.
// Broadcast of revoked remote commitment tx, trigger failure-update of dust/non-dust HTLCs
@@ -8497,3 +8443,50 @@ fn test_concurrent_monitor_claim() {
check_spends!(htlc_txn[1], bob_state_y);
}
}

#[test]
fn test_htlc_no_detection() {
// This test is a mutation to underscore the detection logic bug we had
// before #653. The routed HTLC value is above the remaining balance, thus
// inverting the HTLC and `to_remote` outputs. The HTLC comes second and
// wouldn't have been seen by the pre-#653 detection, as we were enumerate()'ing
// over a watched-outputs vector (Vec<TxOut>), thus implicitly relying on
// output ordering for correctly filtering spending children.

let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Create some initial channels
let chan_1 = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 10001, InitFeatures::known(), InitFeatures::known());

send_payment(&nodes[0], &vec!(&nodes[1])[..], 1_000_000, 1_000_000);
let (_, our_payment_hash) = route_payment(&nodes[0], &vec!(&nodes[1])[..], 2_000_000);
let local_txn = get_local_commitment_txn!(nodes[0], chan_1.2);
assert_eq!(local_txn[0].input.len(), 1);
assert_eq!(local_txn[0].output.len(), 3);
check_spends!(local_txn[0], chan_1.3);

// Timeout HTLC on A's chain and so it can generate a HTLC-Timeout tx
let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
connect_block(&nodes[0], &Block { header, txdata: vec![local_txn[0].clone()] }, 200);
// We deliberately connect the local tx twice, as this would have provoked a failure in
// this test before the #653 fix.
connect_block(&nodes[0], &Block { header, txdata: vec![local_txn[0].clone()] }, 200);
check_closed_broadcast!(nodes[0], false);
check_added_monitors!(nodes[0], 1);

let htlc_timeout = {
let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
assert_eq!(node_txn[0].input.len(), 1);
assert_eq!(node_txn[0].input[0].witness.last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT);
check_spends!(node_txn[0], local_txn[0]);
node_txn[0].clone()
};

let header_201 = BlockHeader { version: 0x20000000, prev_blockhash: header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
connect_block(&nodes[0], &Block { header: header_201, txdata: vec![htlc_timeout.clone()] }, 201);
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1, 201, true, header_201.block_hash());
expect_payment_failed!(nodes[0], our_payment_hash, true);
}
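For context on why the HTLC output "comes second" here: BOLT 3 orders commitment transaction outputs BIP 69-style, ascending by value and then by scriptPubKey, with a CLTV tie-break for otherwise-identical HTLC outputs. A rough sketch of that ordering with made-up values (not this test's exact commitment transaction, and omitting the CLTV tie-break):

```rust
// Rough BOLT 3 / BIP 69-style output ordering: ascending value, then scriptPubKey.
struct Output { name: &'static str, value_sat: u64, script_pubkey: Vec<u8> }

fn sort_commitment_outputs(outputs: &mut Vec<Output>) {
    outputs.sort_by(|a, b| {
        a.value_sat
            .cmp(&b.value_sat)
            .then_with(|| a.script_pubkey.cmp(&b.script_pubkey))
    });
}

fn main() {
    // The remaining `to_remote` balance (1_000 sat) is below the routed HTLC
    // (2_000 sat), so after sorting the HTLC is no longer the first output.
    let mut outputs = vec![
        Output { name: "htlc", value_sat: 2_000, script_pubkey: vec![0x00, 0x20] },
        Output { name: "to_remote", value_sat: 1_000, script_pubkey: vec![0x00, 0x14] },
        Output { name: "to_local", value_sat: 96_000, script_pubkey: vec![0x00, 0x20] },
    ];
    sort_commitment_outputs(&mut outputs);
    let order: Vec<&str> = outputs.iter().map(|o| o.name).collect();
    assert_eq!(order, vec!["to_remote", "htlc", "to_local"]);
    println!("{:?}", order);
}
```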