
Commit 7e04d23

Rm ChannelMonitor merge capabilities in favor of explicit add/update
This removes the ability to merge ChannelMonitors in favor of explicit ChannelMonitorUpdates. It also removes ChannelManager::test_restore_channel_monitor in favor of the new ChannelManager::channel_monitor_updated method, which explicitly confirms a set of updates rather than handing the user the latest copy of each ChannelMonitor. As a result, Channels have almost no remaining need to hold the latest ChannelMonitor, except for broadcasting the latest local state.
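Roughly, the flow this sets up: a ManyChannelMonitor implementation receives each ChannelMonitorUpdate (carrying an update_id) via update_monitor, may return an error such as ChannelMonitorUpdateErr::TemporaryFailure while persistence is pending, and the user later calls ChannelManager::channel_monitor_updated with that update_id once the write is durable. Below is a minimal, self-contained sketch of the per-channel bookkeeping this implies, using only standard-library types; MonitorStore, apply_update, and the u64 outpoint stand-in are hypothetical illustrations, not part of the crate's API.

    use std::collections::HashMap;

    // Toy stand-in for what a persisting ManyChannelMonitor implementation keeps
    // per channel under the new API: the highest ChannelMonitorUpdate::update_id
    // applied so far plus the fully-serialized monitor bytes. A u64 stands in for
    // the funding OutPoint here.
    struct MonitorStore {
        latest: HashMap<u64, (u64, Vec<u8>)>,
    }

    impl MonitorStore {
        // Record one update: remember the new update_id and the re-serialized state.
        // The returned id is what one would later pass to
        // ChannelManager::channel_monitor_updated once the write is known durable.
        fn apply_update(&mut self, funding_outpoint: u64, update_id: u64, serialized: Vec<u8>) -> u64 {
            let entry = self.latest.entry(funding_outpoint).or_insert((0, Vec::new()));
            assert!(update_id > entry.0, "updates must apply in order");
            *entry = (update_id, serialized);
            update_id
        }
    }

    fn main() {
        let mut store = MonitorStore { latest: HashMap::new() };
        let confirmed_id = store.apply_update(1, 1, b"serialized monitor".to_vec());
        // In rust-lightning terms: if update_monitor had returned
        // Err(ChannelMonitorUpdateErr::TemporaryFailure), this is the point where the
        // user would call channel_manager.channel_monitor_updated(&funding_txo, confirmed_id).
        println!("update {} durable, safe to confirm", confirmed_id);
    }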
1 parent c4fd06f commit 7e04d23

8 files changed, +182 -333 lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 70 additions & 70 deletions
@@ -73,56 +73,55 @@ impl Writer for VecWriter {
 	}
 }
 
-static mut IN_RESTORE: bool = false;
 pub struct TestChannelMonitor {
+	pub logger: Arc<dyn Logger>,
 	pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys>>,
 	pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
-	pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
-	pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
-	pub latest_updates_good_at_last_ser: Mutex<HashMap<OutPoint, bool>>,
+	// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
+	// logic will automatically force-close our channels for us (as we don't have an up-to-date
+	// monitor implying we are not able to punish misbehaving counterparties). Because this test
+	// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
+	// fully-serialized monitor state here, as well as the corresponding update_id.
+	pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
 	pub should_update_manager: atomic::AtomicBool,
 }
 impl TestChannelMonitor {
 	pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
 		Self {
-			simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
+			simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)),
+			logger,
 			update_ret: Mutex::new(Ok(())),
-			latest_good_update: Mutex::new(HashMap::new()),
-			latest_update_good: Mutex::new(HashMap::new()),
-			latest_updates_good_at_last_ser: Mutex::new(HashMap::new()),
+			latest_monitors: Mutex::new(HashMap::new()),
 			should_update_manager: atomic::AtomicBool::new(false),
 		}
 	}
 }
 impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMonitor {
-	fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
-		let ret = self.update_ret.lock().unwrap().clone();
-		if let Ok(()) = ret {
-			let mut ser = VecWriter(Vec::new());
-			monitor.write_for_disk(&mut ser).unwrap();
-			self.latest_good_update.lock().unwrap().insert(funding_txo, ser.0);
-			match self.latest_update_good.lock().unwrap().entry(funding_txo) {
-				hash_map::Entry::Vacant(e) => { e.insert(true); },
-				hash_map::Entry::Occupied(mut e) => {
-					if !e.get() && unsafe { IN_RESTORE } {
-						// Technically we can't consider an update to be "good" unless we're doing
-						// it in response to a test_restore_channel_monitor as the channel may
-						// still be waiting on such a call, so only set us to good if we're in the
-						// middle of a restore call.
-						e.insert(true);
-					}
-				},
-			}
-			self.should_update_manager.store(true, atomic::Ordering::Relaxed);
-		} else {
-			self.latest_update_good.lock().unwrap().insert(funding_txo, false);
+	fn add_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
+		let mut ser = VecWriter(Vec::new());
+		monitor.write_for_disk(&mut ser).unwrap();
+		if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+			panic!("Already had monitor pre-add_monitor");
 		}
-		assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok());
-		ret
+		self.should_update_manager.store(true, atomic::Ordering::Relaxed);
+		assert!(self.simple_monitor.add_monitor(funding_txo, monitor).is_ok());
+		self.update_ret.lock().unwrap().clone()
 	}
 
 	fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
-		unimplemented!(); //TODO
+		let mut map_lock = self.latest_monitors.lock().unwrap();
+		let mut map_entry = match map_lock.entry(funding_txo) {
+			hash_map::Entry::Occupied(entry) => entry,
+			hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"),
+		};
+		let mut deserialized_monitor = <(Sha256d, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::
+			read(&mut Cursor::new(&map_entry.get().1), Arc::clone(&self.logger)).unwrap().1;
+		deserialized_monitor.update_monitor(update.clone()).unwrap();
+		let mut ser = VecWriter(Vec::new());
+		deserialized_monitor.write_for_disk(&mut ser).unwrap();
+		map_entry.insert((update.update_id, ser.0));
+		self.should_update_manager.store(true, atomic::Ordering::Relaxed);
+		self.update_ret.lock().unwrap().clone()
 	}
 
 	fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
@@ -214,10 +213,10 @@ pub fn do_test(data: &[u8]) {
 			config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
 
 			let mut monitors = HashMap::new();
-			let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
-			for (outpoint, monitor_ser) in old_monitors.drain() {
+			let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
+			for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
 				monitors.insert(outpoint, <(Sha256d, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
-				monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
+				monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
 			}
 			let mut monitor_refs = HashMap::new();
 			for (outpoint, monitor) in monitors.iter_mut() {
@@ -234,17 +233,7 @@ pub fn do_test(data: &[u8]) {
 				channel_monitors: &mut monitor_refs,
 			};
 
-			let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
-			for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
-				if !was_good {
-					// If the last time we updated a monitor we didn't successfully update (and we
-					// have sense updated our serialized copy of the ChannelManager) we may
-					// force-close the channel on our counterparty cause we know we're missing
-					// something. Thus, we just return here since we can't continue to test.
-					return;
-				}
-			}
-			res
+			(<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor)
 		} }
 	}
 
@@ -270,14 +259,15 @@ pub fn do_test(data: &[u8]) {
 			};
 
 			$source.handle_accept_channel(&$dest.get_our_node_id(), InitFeatures::supported(), &accept_channel);
+			let funding_output;
 			{
 				let events = $source.get_and_clear_pending_events();
 				assert_eq!(events.len(), 1);
 				if let events::Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, .. } = events[0] {
 					let tx = Transaction { version: $chan_id, lock_time: 0, input: Vec::new(), output: vec![TxOut {
 						value: *channel_value_satoshis, script_pubkey: output_script.clone(),
 					}]};
-					let funding_output = OutPoint::new(tx.txid(), 0);
+					funding_output = OutPoint::new(tx.txid(), 0);
 					$source.funding_transaction_generated(&temporary_channel_id, funding_output);
 					channel_txn.push(tx);
 				} else { panic!("Wrong event type"); }
@@ -307,6 +297,7 @@ pub fn do_test(data: &[u8]) {
 				if let events::Event::FundingBroadcastSafe { .. } = events[0] {
 				} else { panic!("Wrong event type"); }
 			}
+			funding_output
 		} }
 	}
 
@@ -363,8 +354,8 @@ pub fn do_test(data: &[u8]) {
 
 	let mut nodes = [node_a, node_b, node_c];
 
-	make_channel!(nodes[0], nodes[1], 0);
-	make_channel!(nodes[1], nodes[2], 1);
+	let chan_1_funding = make_channel!(nodes[0], nodes[1], 0);
+	let chan_2_funding = make_channel!(nodes[1], nodes[2], 1);
 
 	for node in nodes.iter() {
 		confirm_txn!(node);
@@ -635,9 +626,26 @@ pub fn do_test(data: &[u8]) {
 			0x03 => *monitor_a.update_ret.lock().unwrap() = Ok(()),
 			0x04 => *monitor_b.update_ret.lock().unwrap() = Ok(()),
 			0x05 => *monitor_c.update_ret.lock().unwrap() = Ok(()),
-			0x06 => { unsafe { IN_RESTORE = true }; nodes[0].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
-			0x07 => { unsafe { IN_RESTORE = true }; nodes[1].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
-			0x08 => { unsafe { IN_RESTORE = true }; nodes[2].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
+			0x06 => {
+				if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
+					nodes[0].channel_monitor_updated(&chan_1_funding, *id);
+				}
+			},
+			0x07 => {
+				if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
+					nodes[1].channel_monitor_updated(&chan_1_funding, *id);
+				}
+			},
+			0x24 => {
+				if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
+					nodes[1].channel_monitor_updated(&chan_2_funding, *id);
+				}
+			},
+			0x08 => {
+				if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
+					nodes[2].channel_monitor_updated(&chan_2_funding, *id);
+				}
+			},
 			0x09 => send_payment!(nodes[0], (&nodes[1], chan_a)),
 			0x0a => send_payment!(nodes[1], (&nodes[0], chan_a)),
 			0x0b => send_payment!(nodes[1], (&nodes[2], chan_b)),
@@ -726,27 +734,19 @@ pub fn do_test(data: &[u8]) {
 				nodes[2] = node_c.clone();
 				monitor_c = new_monitor_c;
 			},
+			// 0x24 defined above
 			_ => test_return!(),
 		}
 
-		if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
-			node_a_ser.0.clear();
-			nodes[0].write(&mut node_a_ser).unwrap();
-			monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
-			*monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone();
-		}
-		if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) {
-			node_b_ser.0.clear();
-			nodes[1].write(&mut node_b_ser).unwrap();
-			monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
-			*monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone();
-		}
-		if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
-			node_c_ser.0.clear();
-			nodes[2].write(&mut node_c_ser).unwrap();
-			monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
-			*monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone();
-		}
+		node_a_ser.0.clear();
+		nodes[0].write(&mut node_a_ser).unwrap();
+		monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
+		node_b_ser.0.clear();
+		nodes[1].write(&mut node_b_ser).unwrap();
+		monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
+		node_c_ser.0.clear();
+		nodes[2].write(&mut node_c_ser).unwrap();
+		monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
 	}
 }
 