Skip to content

Commit 5e43070

Browse files
committed
Move pending-HTLC-updated ChannelMonitor from ManyChannelMonitor
This is important for a number of reasons: * Firstly, I hit this trying to implement rescan in the demo bitcoinrpc client - if individual ChannelMonitors are out of sync with each other, we cannot add them all into a ManyChannelMonitor together and then rescan, but need to rescan them individually without having to do a bunch of manual work. Of the three return values in ChannelMonitor::block_connected, only the HTLCsource stuff that is moved here makes no sense to be exposed to the user. * Secondly, the logic currently in ManyChannelMonitor cannot be reproduced by the user! HTLCSource is deliberately an opaque type but we use its data to decide which things to keep when inserting into the HashMap. This would prevent a user from properly implementing a replacement ManyChannelMonitor, which is unacceptable. * Finally, by moving the tracking into ChannelMonitor, we can serialize them out, which prevents us from forgetting them when loading from disk, though there are still other races which need to be handled to make this fully safe (see TODOs in ChannelManager). This is safe as no two entries can have the same HTLCSource across different channels (or, if they did, it would be a rather serious bug), though note that, IIRC, when this code was added, the HTLCSource field in the values was not present. We also take this opportunity to rename the fetch function to match our other event interfaces, makaing it clear that by calling the function the set of HTLCUpdates will also be cleared.
1 parent 9666fcc commit 5e43070

File tree

4 files changed

+63
-66
lines changed

4 files changed

+63
-66
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
121121
ret
122122
}
123123

124-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
125-
return self.simple_monitor.fetch_pending_htlc_updated();
124+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
125+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
126126
}
127127
}
128128

lightning/src/ln/channelmanager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2549,7 +2549,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::MessageSendEventsProvider for Ch
25492549
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
25502550
{
25512551
//TODO: This behavior should be documented.
2552-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2552+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
25532553
if let Some(preimage) = htlc_update.payment_preimage {
25542554
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
25552555
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
@@ -2574,7 +2574,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::EventsProvider for ChannelManage
25742574
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
25752575
{
25762576
//TODO: This behavior should be documented.
2577-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2577+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
25782578
if let Some(preimage) = htlc_update.payment_preimage {
25792579
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
25802580
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);

lightning/src/ln/channelmonitor.rs

Lines changed: 57 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,13 @@ pub struct MonitorUpdateError(pub &'static str);
9393

9494
/// Simple structure send back by ManyChannelMonitor in case of HTLC detected onchain from a
9595
/// forward channel and from which info are needed to update HTLC in a backward channel.
96+
#[derive(Clone, PartialEq)]
9697
pub struct HTLCUpdate {
9798
pub(super) payment_hash: PaymentHash,
9899
pub(super) payment_preimage: Option<PaymentPreimage>,
99100
pub(super) source: HTLCSource
100101
}
102+
impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source });
101103

102104
/// Simple trait indicating ability to track a set of ChannelMonitors and multiplex events between
103105
/// them. Generally should be implemented by keeping a local SimpleManyChannelMonitor and passing
@@ -130,8 +132,12 @@ pub trait ManyChannelMonitor<ChanSigner: ChannelKeys>: Send + Sync {
130132
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;
131133

132134
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
133-
/// with success or failure backward
134-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate>;
135+
/// with success or failure.
136+
///
137+
/// You should probably just call through to
138+
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
139+
/// the full list.
140+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
135141
}
136142

137143
/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
@@ -153,7 +159,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys> {
153159
chain_monitor: Arc<ChainWatchInterface>,
154160
broadcaster: Arc<BroadcasterInterface>,
155161
pending_events: Mutex<Vec<events::Event>>,
156-
pending_htlc_updated: Mutex<HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>>,
157162
logger: Arc<Logger>,
158163
fee_estimator: Arc<FeeEstimator>
159164
}
@@ -162,11 +167,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
162167
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
163168
let block_hash = header.bitcoin_hash();
164169
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
165-
let mut htlc_updated_infos = Vec::new();
166170
{
167171
let mut monitors = self.monitors.lock().unwrap();
168172
for monitor in monitors.values_mut() {
169-
let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
173+
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
170174
if spendable_outputs.len() > 0 {
171175
new_events.push(events::Event::SpendableOutputs {
172176
outputs: spendable_outputs,
@@ -178,35 +182,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
178182
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
179183
}
180184
}
181-
htlc_updated_infos.append(&mut htlc_updated);
182-
}
183-
}
184-
{
185-
// ChannelManager will just need to fetch pending_htlc_updated and pass state backward
186-
let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
187-
for htlc in htlc_updated_infos.drain(..) {
188-
match pending_htlc_updated.entry(htlc.2) {
189-
hash_map::Entry::Occupied(mut e) => {
190-
// In case of reorg we may have htlc outputs solved in a different way so
191-
// we prefer to keep claims but don't store duplicate updates for a given
192-
// (payment_hash, HTLCSource) pair.
193-
let mut existing_claim = false;
194-
e.get_mut().retain(|htlc_data| {
195-
if htlc.0 == htlc_data.0 {
196-
if htlc_data.1.is_some() {
197-
existing_claim = true;
198-
true
199-
} else { false }
200-
} else { true }
201-
});
202-
if !existing_claim {
203-
e.get_mut().push((htlc.0, htlc.1));
204-
}
205-
}
206-
hash_map::Entry::Vacant(e) => {
207-
e.insert(vec![(htlc.0, htlc.1)]);
208-
}
209-
}
210185
}
211186
}
212187
let mut pending_events = self.pending_events.lock().unwrap();
@@ -231,7 +206,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys> Simpl
231206
chain_monitor,
232207
broadcaster,
233208
pending_events: Mutex::new(Vec::new()),
234-
pending_htlc_updated: Mutex::new(HashMap::new()),
235209
logger,
236210
fee_estimator: feeest,
237211
};
@@ -284,17 +258,10 @@ impl<ChanSigner: ChannelKeys> ManyChannelMonitor<ChanSigner> for SimpleManyChann
284258
}
285259
}
286260

287-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
288-
let mut updated = self.pending_htlc_updated.lock().unwrap();
289-
let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
290-
for (k, v) in updated.drain() {
291-
for htlc_data in v {
292-
pending_htlcs_updated.push(HTLCUpdate {
293-
payment_hash: k,
294-
payment_preimage: htlc_data.1,
295-
source: htlc_data.0,
296-
});
297-
}
261+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
262+
let mut pending_htlcs_updated = Vec::new();
263+
for chan in self.monitors.lock().unwrap().values_mut() {
264+
pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
298265
}
299266
pending_htlcs_updated
300267
}
@@ -640,6 +607,8 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
640607

641608
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
642609

610+
pending_htlcs_updated: Vec<HTLCUpdate>,
611+
643612
destination_script: Script,
644613
// Thanks to data loss protection, we may be able to claim our non-htlc funds
645614
// back, this is the script we have to spend from but we need to
@@ -750,6 +719,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
750719
self.current_remote_commitment_number != other.current_remote_commitment_number ||
751720
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
752721
self.payment_preimages != other.payment_preimages ||
722+
self.pending_htlcs_updated != other.pending_htlcs_updated ||
753723
self.destination_script != other.destination_script ||
754724
self.to_remote_rescue != other.to_remote_rescue ||
755725
self.pending_claim_requests != other.pending_claim_requests ||
@@ -938,6 +908,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
938908
writer.write_all(&payment_preimage.0[..])?;
939909
}
940910

911+
writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
912+
for data in self.pending_htlcs_updated.iter() {
913+
data.write(writer)?;
914+
}
915+
941916
self.last_block_hash.write(writer)?;
942917
self.destination_script.write(writer)?;
943918
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -1056,6 +1031,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
10561031
current_remote_commitment_number: 1 << 48,
10571032

10581033
payment_preimages: HashMap::new(),
1034+
pending_htlcs_updated: Vec::new(),
1035+
10591036
destination_script: destination_script,
10601037
to_remote_rescue: None,
10611038

@@ -1419,6 +1396,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
14191396
res
14201397
}
14211398

1399+
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
1400+
/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
1401+
pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
1402+
let mut ret = Vec::new();
1403+
mem::swap(&mut ret, &mut self.pending_htlcs_updated);
1404+
ret
1405+
}
1406+
14221407
/// Can only fail if idx is < get_min_seen_secret
14231408
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
14241409
for i in 0..self.old_secrets.len() {
@@ -2402,7 +2387,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
24022387
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
24032388
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
24042389
/// on-chain.
2405-
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2390+
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
24062391
for tx in txn_matched {
24072392
let mut output_val = 0;
24082393
for out in tx.output.iter() {
@@ -2415,7 +2400,6 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
24152400
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
24162401
let mut watch_outputs = Vec::new();
24172402
let mut spendable_outputs = Vec::new();
2418-
let mut htlc_updated = Vec::new();
24192403
let mut bump_candidates = HashSet::new();
24202404
for tx in txn_matched {
24212405
if tx.input.len() == 1 {
@@ -2474,10 +2458,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
24742458
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
24752459
// can also be resolved in a few other ways which can have more than one output. Thus,
24762460
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
2477-
let mut updated = self.is_resolving_htlc_output(&tx, height);
2478-
if updated.len() > 0 {
2479-
htlc_updated.append(&mut updated);
2480-
}
2461+
self.is_resolving_htlc_output(&tx, height);
24812462

24822463
// Scan all input to verify is one of the outpoint spent is of interest for us
24832464
let mut claimed_outputs_material = Vec::new();
@@ -2600,7 +2581,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
26002581
},
26012582
OnchainEvent::HTLCUpdate { htlc_update } => {
26022583
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
2603-
htlc_updated.push((htlc_update.0, None, htlc_update.1));
2584+
self.pending_htlcs_updated.push(HTLCUpdate {
2585+
payment_hash: htlc_update.1,
2586+
payment_preimage: None,
2587+
source: htlc_update.0,
2588+
});
26042589
},
26052590
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
26062591
self.claimable_outpoints.remove(&outpoint);
@@ -2632,7 +2617,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
26322617
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
26332618
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
26342619
}
2635-
(watch_outputs, spendable_outputs, htlc_updated)
2620+
(watch_outputs, spendable_outputs)
26362621
}
26372622

26382623
fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
@@ -2752,9 +2737,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
27522737

27532738
/// Check if any transaction broadcasted is resolving HTLC output by a success or timeout on a local
27542739
/// or remote commitment tx, if so send back the source, preimage if found and payment_hash of resolved HTLC
2755-
fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) -> Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)> {
2756-
let mut htlc_updated = Vec::new();
2757-
2740+
fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) {
27582741
'outer_loop: for input in &tx.input {
27592742
let mut payment_data = None;
27602743
let revocation_sig_claim = (input.witness.len() == 3 && HTLCType::scriptlen_to_htlctype(input.witness[2].len()) == Some(HTLCType::OfferedHTLC) && input.witness[1].len() == 33)
@@ -2854,10 +2837,18 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
28542837
let mut payment_preimage = PaymentPreimage([0; 32]);
28552838
if accepted_preimage_claim {
28562839
payment_preimage.0.copy_from_slice(&input.witness[3]);
2857-
htlc_updated.push((source, Some(payment_preimage), payment_hash));
2840+
self.pending_htlcs_updated.push(HTLCUpdate {
2841+
source,
2842+
payment_preimage: Some(payment_preimage),
2843+
payment_hash
2844+
});
28582845
} else if offered_preimage_claim {
28592846
payment_preimage.0.copy_from_slice(&input.witness[1]);
2860-
htlc_updated.push((source, Some(payment_preimage), payment_hash));
2847+
self.pending_htlcs_updated.push(HTLCUpdate {
2848+
source,
2849+
payment_preimage: Some(payment_preimage),
2850+
payment_hash
2851+
});
28612852
} else {
28622853
log_info!(self, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
28632854
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
@@ -2880,7 +2871,6 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
28802871
}
28812872
}
28822873
}
2883-
htlc_updated
28842874
}
28852875

28862876
/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration
@@ -3221,6 +3211,12 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
32213211
}
32223212
}
32233213

3214+
let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
3215+
let mut pending_htlcs_updated = Vec::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
3216+
for _ in 0..pending_htlcs_updated_len {
3217+
pending_htlcs_updated.push(Readable::read(reader)?);
3218+
}
3219+
32243220
let last_block_hash: Sha256dHash = Readable::read(reader)?;
32253221
let destination_script = Readable::read(reader)?;
32263222
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@@ -3321,6 +3317,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
33213317
current_remote_commitment_number,
33223318

33233319
payment_preimages,
3320+
pending_htlcs_updated,
33243321

33253322
destination_script,
33263323
to_remote_rescue,

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
7474
self.update_ret.lock().unwrap().clone()
7575
}
7676

77-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
78-
return self.simple_monitor.fetch_pending_htlc_updated();
77+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
78+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
7979
}
8080
}
8181

0 commit comments

Comments
 (0)