
Commit 030c49c

Merge pull request #489 from TheBlueMatt/2020-02-chan-updates
Move to a Monitor-Update return from copying around ChannelMonitors
2 parents 3670dd0 + 08db88c commit 030c49c

13 files changed: +1535, -1033 lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 73 additions & 69 deletions
@@ -73,52 +73,55 @@ impl Writer for VecWriter {
     }
 }
 
-static mut IN_RESTORE: bool = false;
 pub struct TestChannelMonitor {
+    pub logger: Arc<dyn Logger>,
     pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<chaininterface::BroadcasterInterface>>>,
     pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
-    pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
-    pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
-    pub latest_updates_good_at_last_ser: Mutex<HashMap<OutPoint, bool>>,
+    // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
+    // logic will automatically force-close our channels for us (as we don't have an up-to-date
+    // monitor implying we are not able to punish misbehaving counterparties). Because this test
+    // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
+    // fully-serialized monitor state here, as well as the corresponding update_id.
+    pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
     pub should_update_manager: atomic::AtomicBool,
 }
 impl TestChannelMonitor {
     pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
         Self {
-            simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
+            simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)),
+            logger,
             update_ret: Mutex::new(Ok(())),
-            latest_good_update: Mutex::new(HashMap::new()),
-            latest_update_good: Mutex::new(HashMap::new()),
-            latest_updates_good_at_last_ser: Mutex::new(HashMap::new()),
+            latest_monitors: Mutex::new(HashMap::new()),
             should_update_manager: atomic::AtomicBool::new(false),
         }
     }
 }
 impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMonitor {
-    fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
-        let ret = self.update_ret.lock().unwrap().clone();
-        if let Ok(()) = ret {
-            let mut ser = VecWriter(Vec::new());
-            monitor.write_for_disk(&mut ser).unwrap();
-            self.latest_good_update.lock().unwrap().insert(funding_txo, ser.0);
-            match self.latest_update_good.lock().unwrap().entry(funding_txo) {
-                hash_map::Entry::Vacant(e) => { e.insert(true); },
-                hash_map::Entry::Occupied(mut e) => {
-                    if !e.get() && unsafe { IN_RESTORE } {
-                        // Technically we can't consider an update to be "good" unless we're doing
-                        // it in response to a test_restore_channel_monitor as the channel may
-                        // still be waiting on such a call, so only set us to good if we're in the
-                        // middle of a restore call.
-                        e.insert(true);
-                    }
-                },
-            }
-            self.should_update_manager.store(true, atomic::Ordering::Relaxed);
-        } else {
-            self.latest_update_good.lock().unwrap().insert(funding_txo, false);
+    fn add_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
+        let mut ser = VecWriter(Vec::new());
+        monitor.write_for_disk(&mut ser).unwrap();
+        if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+            panic!("Already had monitor pre-add_monitor");
         }
-        assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok());
-        ret
+        self.should_update_manager.store(true, atomic::Ordering::Relaxed);
+        assert!(self.simple_monitor.add_monitor(funding_txo, monitor).is_ok());
+        self.update_ret.lock().unwrap().clone()
+    }
+
+    fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
+        let mut map_lock = self.latest_monitors.lock().unwrap();
+        let mut map_entry = match map_lock.entry(funding_txo) {
+            hash_map::Entry::Occupied(entry) => entry,
+            hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"),
+        };
+        let mut deserialized_monitor = <(Sha256d, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::
+            read(&mut Cursor::new(&map_entry.get().1), Arc::clone(&self.logger)).unwrap().1;
+        deserialized_monitor.update_monitor(update.clone()).unwrap();
+        let mut ser = VecWriter(Vec::new());
+        deserialized_monitor.write_for_disk(&mut ser).unwrap();
+        map_entry.insert((update.update_id, ser.0));
+        self.should_update_manager.store(true, atomic::Ordering::Relaxed);
+        self.update_ret.lock().unwrap().clone()
     }
 
     fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
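The reworked TestChannelMonitor above captures the core of the new persistence model: keep only the latest fully-serialized monitor plus its update_id per funding outpoint, and apply each ChannelMonitorUpdate against a freshly deserialized copy of that state. Below is a minimal, self-contained sketch of the same bookkeeping; the FundingOutpoint and MonitorUpdate types and the apply callback are hypothetical stand-ins for rust-lightning's OutPoint, ChannelMonitorUpdate, and the ChannelMonitor read/update_monitor/write_for_disk sequence.

use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-ins: the fuzzer keys on lightning's OutPoint and applies
// channelmonitor::ChannelMonitorUpdate; an opaque 36-byte outpoint and byte
// payload are enough to show the bookkeeping.
type FundingOutpoint = [u8; 36];

struct MonitorUpdate {
    update_id: u64,
    payload: Vec<u8>,
}

// Only the latest fully-serialized monitor per channel is kept, together with
// the update_id that serialization corresponds to.
#[derive(Default)]
struct LatestMonitorStore {
    latest: Mutex<HashMap<FundingOutpoint, (u64, Vec<u8>)>>,
}

impl LatestMonitorStore {
    // Mirrors add_monitor: a funding outpoint may only be registered once.
    fn add_monitor(&self, funding: FundingOutpoint, update_id: u64, serialized: Vec<u8>) {
        let prev = self.latest.lock().unwrap().insert(funding, (update_id, serialized));
        assert!(prev.is_none(), "Already had monitor pre-add_monitor");
    }

    // Mirrors update_monitor: rebuild the stored serialization by applying the
    // update to the previous one. `apply` stands in for the fuzzer's
    // deserialize + ChannelMonitor::update_monitor + write_for_disk steps.
    fn update_monitor<F>(&self, funding: FundingOutpoint, update: MonitorUpdate, apply: F)
    where
        F: FnOnce(&[u8], &MonitorUpdate) -> Vec<u8>,
    {
        let mut map = self.latest.lock().unwrap();
        let entry = map.get_mut(&funding).expect("Didn't have monitor on update call");
        let new_ser = apply(&entry.1[..], &update);
        *entry = (update.update_id, new_ser);
    }
}

fn main() {
    let store = LatestMonitorStore::default();
    let funding = [0u8; 36];
    store.add_monitor(funding, 0, vec![0x01]);
    // A trivial "apply" that just appends the update payload to the old bytes.
    store.update_monitor(funding, MonitorUpdate { update_id: 1, payload: vec![0x02] }, |old, upd| {
        let mut out = old.to_vec();
        out.extend_from_slice(&upd.payload);
        out
    });
    assert_eq!(store.latest.lock().unwrap()[&funding].0, 1);
}

The callback only marks where deserialize-apply-reserialize happens; the fuzzer does that step with the real ChannelMonitor type, as shown in the diff above.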
@@ -210,10 +213,10 @@ pub fn do_test(data: &[u8]) {
             config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
 
             let mut monitors = HashMap::new();
-            let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
-            for (outpoint, monitor_ser) in old_monitors.drain() {
+            let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
+            for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
                 monitors.insert(outpoint, <(Sha256d, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
-                monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
+                monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
             }
             let mut monitor_refs = HashMap::new();
             for (outpoint, monitor) in monitors.iter_mut() {
@@ -230,17 +233,7 @@ pub fn do_test(data: &[u8]) {
                 channel_monitors: &mut monitor_refs,
             };
 
-            let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>, Arc<TestBroadcaster>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
-            for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
-                if !was_good {
-                    // If the last time we updated a monitor we didn't successfully update (and we
-                    // have sense updated our serialized copy of the ChannelManager) we may
-                    // force-close the channel on our counterparty cause we know we're missing
-                    // something. Thus, we just return here since we can't continue to test.
-                    return;
-                }
-            }
-            res
+            (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>, Arc<TestBroadcaster>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor)
         } }
     }
 
@@ -266,14 +259,15 @@ pub fn do_test(data: &[u8]) {
         };
 
         $source.handle_accept_channel(&$dest.get_our_node_id(), InitFeatures::supported(), &accept_channel);
+        let funding_output;
         {
             let events = $source.get_and_clear_pending_events();
             assert_eq!(events.len(), 1);
             if let events::Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, .. } = events[0] {
                 let tx = Transaction { version: $chan_id, lock_time: 0, input: Vec::new(), output: vec![TxOut {
                     value: *channel_value_satoshis, script_pubkey: output_script.clone(),
                 }]};
-                let funding_output = OutPoint::new(tx.txid(), 0);
+                funding_output = OutPoint::new(tx.txid(), 0);
                 $source.funding_transaction_generated(&temporary_channel_id, funding_output);
                 channel_txn.push(tx);
             } else { panic!("Wrong event type"); }
@@ -303,6 +297,7 @@ pub fn do_test(data: &[u8]) {
             if let events::Event::FundingBroadcastSafe { .. } = events[0] {
             } else { panic!("Wrong event type"); }
         }
+        funding_output
     } }
 }
 
@@ -359,8 +354,8 @@ pub fn do_test(data: &[u8]) {
 
     let mut nodes = [node_a, node_b, node_c];
 
-    make_channel!(nodes[0], nodes[1], 0);
-    make_channel!(nodes[1], nodes[2], 1);
+    let chan_1_funding = make_channel!(nodes[0], nodes[1], 0);
+    let chan_2_funding = make_channel!(nodes[1], nodes[2], 1);
 
     for node in nodes.iter() {
         confirm_txn!(node);
@@ -631,9 +626,26 @@ pub fn do_test(data: &[u8]) {
             0x03 => *monitor_a.update_ret.lock().unwrap() = Ok(()),
             0x04 => *monitor_b.update_ret.lock().unwrap() = Ok(()),
             0x05 => *monitor_c.update_ret.lock().unwrap() = Ok(()),
-            0x06 => { unsafe { IN_RESTORE = true }; nodes[0].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
-            0x07 => { unsafe { IN_RESTORE = true }; nodes[1].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
-            0x08 => { unsafe { IN_RESTORE = true }; nodes[2].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
+            0x06 => {
+                if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
+                    nodes[0].channel_monitor_updated(&chan_1_funding, *id);
+                }
+            },
+            0x07 => {
+                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
+                    nodes[1].channel_monitor_updated(&chan_1_funding, *id);
+                }
+            },
+            0x24 => {
+                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
+                    nodes[1].channel_monitor_updated(&chan_2_funding, *id);
+                }
+            },
+            0x08 => {
+                if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
+                    nodes[2].channel_monitor_updated(&chan_2_funding, *id);
+                }
+            },
             0x09 => send_payment!(nodes[0], (&nodes[1], chan_a)),
             0x0a => send_payment!(nodes[1], (&nodes[0], chan_a)),
             0x0b => send_payment!(nodes[1], (&nodes[2], chan_b)),
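The new 0x06/0x07/0x24/0x08 commands simulate a pending monitor update finally completing: once the out-of-band persistence has caught up, the node is told the highest durable update_id via channel_monitor_updated. The following is a rough sketch of that handshake with hypothetical types, not rust-lightning's actual channel state machine; the state names and the >= comparison are illustrative assumptions.

// A channel pauses when a monitor update hits a temporary failure and resumes
// only once the caller reports that persistence reached the blocking update.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ChannelState {
    // Normal operation.
    Usable,
    // A monitor update hit a temporary failure; paused until persistence reaches `blocked_at`.
    MonitorUpdatePending { blocked_at: u64 },
}

struct Channel {
    latest_update_id: u64,
    state: ChannelState,
}

impl Channel {
    // Called when the persister returns a temporary failure for update `update_id`.
    fn monitor_update_failed(&mut self, update_id: u64) {
        self.latest_update_id = update_id;
        self.state = ChannelState::MonitorUpdatePending { blocked_at: update_id };
    }

    // Plays the role of channel_monitor_updated(funding_txo, highest_applied_update_id):
    // the caller reports the highest update_id now durably persisted.
    fn monitor_updated(&mut self, highest_applied_update_id: u64) {
        if let ChannelState::MonitorUpdatePending { blocked_at } = self.state {
            if highest_applied_update_id >= blocked_at {
                self.state = ChannelState::Usable;
            }
        }
    }
}

fn main() {
    let mut chan = Channel { latest_update_id: 0, state: ChannelState::Usable };
    chan.monitor_update_failed(7);
    chan.monitor_updated(6); // persistence not caught up yet; still blocked
    assert_eq!(chan.state, ChannelState::MonitorUpdatePending { blocked_at: 7 });
    chan.monitor_updated(7); // caught up; channel usable again
    assert_eq!(chan.state, ChannelState::Usable);
    assert_eq!(chan.latest_update_id, 7);
}

Driving nodes into and back out of this paused state, with the update_id read from latest_monitors, is exactly what these fuzz commands exercise.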
@@ -722,27 +734,19 @@ pub fn do_test(data: &[u8]) {
                 nodes[2] = node_c.clone();
                 monitor_c = new_monitor_c;
             },
+            // 0x24 defined above
             _ => test_return!(),
         }
 
-        if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
-            node_a_ser.0.clear();
-            nodes[0].write(&mut node_a_ser).unwrap();
-            monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
-            *monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone();
-        }
-        if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) {
-            node_b_ser.0.clear();
-            nodes[1].write(&mut node_b_ser).unwrap();
-            monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
-            *monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone();
-        }
-        if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
-            node_c_ser.0.clear();
-            nodes[2].write(&mut node_c_ser).unwrap();
-            monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
-            *monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone();
-        }
+        node_a_ser.0.clear();
+        nodes[0].write(&mut node_a_ser).unwrap();
+        monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
+        node_b_ser.0.clear();
+        nodes[1].write(&mut node_b_ser).unwrap();
+        monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
+        node_c_ser.0.clear();
+        nodes[2].write(&mut node_c_ser).unwrap();
+        monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
     }
 }
 
lightning/src/chain/keysinterface.rs

Lines changed: 2 additions & 1 deletion
@@ -135,7 +135,8 @@ pub trait KeysInterface: Send + Sync {
 /// (TODO: We shouldn't require that, and should have an API to get them at deser time, due mostly
 /// to the possibility of reentrancy issues by calling the user's code during our deserialization
 /// routine).
-/// TODO: remove Clone once we start returning ChannelUpdate objects instead of copying ChannelMonitor
+/// TODO: We should remove Clone by instead requesting a new ChannelKeys copy when we create
+/// ChannelMonitors instead of expecting to clone the one out of the Channel into the monitors.
 pub trait ChannelKeys : Send+Clone {
     /// Gets the private key for the anchor tx
     fn funding_key<'a>(&'a self) -> &'a SecretKey;

lightning/src/chain/transaction.rs

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ impl OutPoint {
     }
 }
 
+impl_writeable!(OutPoint, 0, { txid, index });
+
 #[cfg(test)]
 mod tests {
     use chain::transaction::OutPoint;
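The impl_writeable! addition is what lets OutPoint be written and read alongside the monitor bytes (e.g., as the key for the latest_monitors map). As a rough, hand-rolled approximation only — not the macro's literal expansion, and using std::io plus a raw [u8; 32] in place of rust-lightning's Writeable/Readable traits and Sha256dHash — serializing the two listed fields in declaration order might look like:

use std::io::{self, Read, Write};

// Approximation of field-by-field serialization of an OutPoint-like struct.
struct OutPoint {
    txid: [u8; 32],
    index: u16,
}

impl OutPoint {
    fn write_to<W: Write>(&self, w: &mut W) -> io::Result<()> {
        w.write_all(&self.txid)?;
        w.write_all(&self.index.to_be_bytes())
    }

    fn read_from<R: Read>(r: &mut R) -> io::Result<Self> {
        let mut txid = [0u8; 32];
        r.read_exact(&mut txid)?;
        let mut idx = [0u8; 2];
        r.read_exact(&mut idx)?;
        Ok(OutPoint { txid, index: u16::from_be_bytes(idx) })
    }
}

fn main() -> io::Result<()> {
    let op = OutPoint { txid: [0x11; 32], index: 1 };
    let mut buf = Vec::new();
    op.write_to(&mut buf)?;
    let round_tripped = OutPoint::read_from(&mut &buf[..])?;
    assert_eq!(round_tripped.index, op.index);
    assert_eq!(round_tripped.txid, op.txid);
    Ok(())
}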
