Move to a Monitor-Update return from copying around ChannelMonitors #489

Merged: 14 commits, Feb 27, 2020

142 changes: 73 additions & 69 deletions fuzz/src/chanmon_consistency.rs
@@ -73,52 +73,55 @@ impl Writer for VecWriter {
}
}

static mut IN_RESTORE: bool = false;
pub struct TestChannelMonitor {
pub logger: Arc<dyn Logger>,
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<chaininterface::BroadcasterInterface>>>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
pub latest_updates_good_at_last_ser: Mutex<HashMap<OutPoint, bool>>,
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
// logic will automatically force-close our channels for us (as we don't have an up-to-date
// monitor, implying we are unable to punish misbehaving counterparties). Because this test
// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
// fully-serialized monitor state here, as well as the corresponding update_id.
pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
pub should_update_manager: atomic::AtomicBool,
}
impl TestChannelMonitor {
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
Self {
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)),
logger,
update_ret: Mutex::new(Ok(())),
latest_good_update: Mutex::new(HashMap::new()),
latest_update_good: Mutex::new(HashMap::new()),
latest_updates_good_at_last_ser: Mutex::new(HashMap::new()),
latest_monitors: Mutex::new(HashMap::new()),
should_update_manager: atomic::AtomicBool::new(false),
}
}
}
impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMonitor {
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
let ret = self.update_ret.lock().unwrap().clone();
if let Ok(()) = ret {
let mut ser = VecWriter(Vec::new());
monitor.write_for_disk(&mut ser).unwrap();
self.latest_good_update.lock().unwrap().insert(funding_txo, ser.0);
match self.latest_update_good.lock().unwrap().entry(funding_txo) {
hash_map::Entry::Vacant(e) => { e.insert(true); },
hash_map::Entry::Occupied(mut e) => {
if !e.get() && unsafe { IN_RESTORE } {
// Technically we can't consider an update to be "good" unless we're doing
// it in response to a test_restore_channel_monitor as the channel may
// still be waiting on such a call, so only set us to good if we're in the
// middle of a restore call.
e.insert(true);
}
},
}
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
} else {
self.latest_update_good.lock().unwrap().insert(funding_txo, false);
fn add_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
let mut ser = VecWriter(Vec::new());
monitor.write_for_disk(&mut ser).unwrap();
if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
panic!("Already had monitor pre-add_monitor");
}
assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok());
ret
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
assert!(self.simple_monitor.add_monitor(funding_txo, monitor).is_ok());
self.update_ret.lock().unwrap().clone()
}

fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
let mut map_lock = self.latest_monitors.lock().unwrap();
let mut map_entry = match map_lock.entry(funding_txo) {
hash_map::Entry::Occupied(entry) => entry,
hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"),
};
let mut deserialized_monitor = <(Sha256d, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::
read(&mut Cursor::new(&map_entry.get().1), Arc::clone(&self.logger)).unwrap().1;
deserialized_monitor.update_monitor(update.clone()).unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write_for_disk(&mut ser).unwrap();
map_entry.insert((update.update_id, ser.0));
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
self.update_ret.lock().unwrap().clone()
}

fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
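The new add_monitor/update_monitor pair above never holds a live monitor: latest_monitors stores only (update_id, serialized bytes), and each update is handled by deserializing the stored copy, applying the delta, and re-serializing, exercising the ser/deser paths on every change. A minimal, self-contained sketch of that read-apply-rewrite pattern, with toy Monitor/MonitorUpdate types standing in for ChannelMonitor and ChannelMonitorUpdate (illustrative only, not the fuzzer's actual types):

use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Mutex;

struct Monitor { update_id: u64, balance_msat: u64 }
struct MonitorUpdate { update_id: u64, new_balance_msat: u64 }

impl Monitor {
    fn read(b: &[u8]) -> Monitor {
        Monitor {
            update_id: u64::from_le_bytes(b[0..8].try_into().unwrap()),
            balance_msat: u64::from_le_bytes(b[8..16].try_into().unwrap()),
        }
    }
    fn write(&self) -> Vec<u8> {
        let mut v = self.update_id.to_le_bytes().to_vec();
        v.extend_from_slice(&self.balance_msat.to_le_bytes());
        v
    }
    fn apply(&mut self, upd: &MonitorUpdate) {
        assert_eq!(upd.update_id, self.update_id + 1); // updates apply in order
        self.update_id = upd.update_id;
        self.balance_msat = upd.new_balance_msat;
    }
}

// Keyed like latest_monitors: channel id -> (latest update_id, serialized monitor).
struct MonitorStore { monitors: Mutex<HashMap<u32, (u64, Vec<u8>)>> }

impl MonitorStore {
    fn update_monitor(&self, chan: u32, upd: MonitorUpdate) {
        let mut map = self.monitors.lock().unwrap();
        let slot = map.get_mut(&chan).expect("monitor must be added before updates");
        let mut mon = Monitor::read(&slot.1);  // deserialize the stored copy
        mon.apply(&upd);                       // apply the delta to it
        *slot = (upd.update_id, mon.write());  // store the full new state + id
    }
}

fn main() {
    let store = MonitorStore { monitors: Mutex::new(HashMap::new()) };
    let genesis = Monitor { update_id: 0, balance_msat: 1_000 };
    store.monitors.lock().unwrap().insert(1, (0, genesis.write()));
    store.update_monitor(1, MonitorUpdate { update_id: 1, new_balance_msat: 900 });
    assert_eq!(store.monitors.lock().unwrap()[&1].0, 1);
}

The stored blob is always a complete monitor at a known update_id, which is exactly what the reload path below relies on.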
@@ -210,10 +213,10 @@ pub fn do_test(data: &[u8]) {
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;

let mut monitors = HashMap::new();
let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
for (outpoint, monitor_ser) in old_monitors.drain() {
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
monitors.insert(outpoint, <(Sha256d, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
}
let mut monitor_refs = HashMap::new();
for (outpoint, monitor) in monitors.iter_mut() {
@@ -230,17 +233,7 @@ pub fn do_test(data: &[u8]) {
channel_monitors: &mut monitor_refs,
};

let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>, Arc<TestBroadcaster>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
if !was_good {
// If the last time we updated a monitor we didn't successfully update (and we
// have since updated our serialized copy of the ChannelManager) we may
// force-close the channel on our counterparty because we know we're missing
// something. Thus, we just return here since we can't continue to test.
return;
}
}
res
(<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>, Arc<TestBroadcaster>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor)
} }
}

@@ -266,14 +259,15 @@ pub fn do_test(data: &[u8]) {
};

$source.handle_accept_channel(&$dest.get_our_node_id(), InitFeatures::supported(), &accept_channel);
let funding_output;
{
let events = $source.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
if let events::Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, .. } = events[0] {
let tx = Transaction { version: $chan_id, lock_time: 0, input: Vec::new(), output: vec![TxOut {
value: *channel_value_satoshis, script_pubkey: output_script.clone(),
}]};
let funding_output = OutPoint::new(tx.txid(), 0);
funding_output = OutPoint::new(tx.txid(), 0);
$source.funding_transaction_generated(&temporary_channel_id, funding_output);
channel_txn.push(tx);
} else { panic!("Wrong event type"); }
@@ -303,6 +297,7 @@ pub fn do_test(data: &[u8]) {
if let events::Event::FundingBroadcastSafe { .. } = events[0] {
} else { panic!("Wrong event type"); }
}
funding_output
} }
}

@@ -359,8 +354,8 @@ pub fn do_test(data: &[u8]) {

let mut nodes = [node_a, node_b, node_c];

make_channel!(nodes[0], nodes[1], 0);
make_channel!(nodes[1], nodes[2], 1);
let chan_1_funding = make_channel!(nodes[0], nodes[1], 0);
let chan_2_funding = make_channel!(nodes[1], nodes[2], 1);

for node in nodes.iter() {
confirm_txn!(node);
@@ -631,9 +626,26 @@ pub fn do_test(data: &[u8]) {
0x03 => *monitor_a.update_ret.lock().unwrap() = Ok(()),
0x04 => *monitor_b.update_ret.lock().unwrap() = Ok(()),
0x05 => *monitor_c.update_ret.lock().unwrap() = Ok(()),
0x06 => { unsafe { IN_RESTORE = true }; nodes[0].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
0x07 => { unsafe { IN_RESTORE = true }; nodes[1].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
0x08 => { unsafe { IN_RESTORE = true }; nodes[2].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
0x06 => {
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[0].channel_monitor_updated(&chan_1_funding, *id);
}
},
0x07 => {
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[1].channel_monitor_updated(&chan_1_funding, *id);
}
},
0x24 => {
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
nodes[1].channel_monitor_updated(&chan_2_funding, *id);
}
},
0x08 => {
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
nodes[2].channel_monitor_updated(&chan_2_funding, *id);
}
},
0x09 => send_payment!(nodes[0], (&nodes[1], chan_a)),
0x0a => send_payment!(nodes[1], (&nodes[0], chan_a)),
0x0b => send_payment!(nodes[1], (&nodes[2], chan_b)),
@@ -722,27 +734,19 @@ pub fn do_test(data: &[u8]) {
nodes[2] = node_c.clone();
monitor_c = new_monitor_c;
},
// 0x24 defined above
_ => test_return!(),
}
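The channel_monitor_updated arms above complete a cycle that begins when an earlier arm injects a monitor-update failure (the 0x00 through 0x02 arms, outside this hunk, set update_ret to an Err; the exact variant below is an assumption inferred from the Ok(()) resets at 0x03 through 0x05). One full cycle looks roughly like this fragment, which reuses the harness's own monitor_a, nodes, and chan_1_funding bindings rather than standing alone:

// Illustrative fragment, not standalone: reuses bindings from the harness.
// TemporaryFailure is assumed to be the injected variant.
*monitor_a.update_ret.lock().unwrap() =
    Err(channelmonitor::ChannelMonitorUpdateErr::TemporaryFailure);
// While this Err is returned, monitor updates fail and nodes[0] pauses the
// affected channel rather than force-closing it.
*monitor_a.update_ret.lock().unwrap() = Ok(());
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
    // Report the newest persisted update_id; the manager resumes the channel
    // from exactly that point.
    nodes[0].channel_monitor_updated(&chan_1_funding, *id);
}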

if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone();
}
if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) {
node_b_ser.0.clear();
nodes[1].write(&mut node_b_ser).unwrap();
monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone();
}
if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone();
}
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
node_b_ser.0.clear();
nodes[1].write(&mut node_b_ser).unwrap();
monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
}
}

3 changes: 2 additions & 1 deletion lightning/src/chain/keysinterface.rs
@@ -135,7 +135,8 @@ pub trait KeysInterface: Send + Sync {
/// (TODO: We shouldn't require that, and should have an API to get them at deser time, due mostly
/// to the possibility of reentrancy issues by calling the user's code during our deserialization
/// routine).
/// TODO: remove Clone once we start returning ChannelUpdate objects instead of copying ChannelMonitor
/// TODO: We should remove Clone by instead requesting a new ChannelKeys copy when we create
/// ChannelMonitors instead of expecting to clone the one out of the Channel into the monitors.
pub trait ChannelKeys : Send+Clone {
/// Gets the private key for the anchor tx
fn funding_key<'a>(&'a self) -> &'a SecretKey;
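The TODO above describes a refactor direction rather than current behavior. A hedged sketch of what that might look like, with hypothetical names (KeysSource, make_channel_signer; none of this is rust-lightning API): the keys source hands a fresh signer to each newly created monitor, so the signer trait no longer needs Clone.

// Hypothetical sketch only; names are illustrative, not rust-lightning API.
trait ChannelSigner {
    fn sign_commitment(&self, unsigned_tx: &[u8]) -> Vec<u8>;
}

trait KeysSource {
    type Signer: ChannelSigner; // note: no Clone bound required
    // Called once when the Channel is created and again when its
    // ChannelMonitor is created, instead of cloning the Channel's keys
    // into the monitor.
    fn make_channel_signer(&self, channel_id: [u8; 32]) -> Self::Signer;
}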
2 changes: 2 additions & 0 deletions lightning/src/chain/transaction.rs
@@ -39,6 +39,8 @@ impl OutPoint {
}
}

impl_writeable!(OutPoint, 0, { txid, index });

#[cfg(test)]
mod tests {
use chain::transaction::OutPoint;
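The impl_writeable! call above gives OutPoint the crate's Writeable/Readable implementations by serializing the listed fields in order (txid, then index); the 0 argument appears to be a serialized-size hint and is not modeled below. A simplified, self-contained sketch of that field-by-field scheme, assuming big-endian integers as used elsewhere in the crate's serialization:

use std::io::{self, Read, Write};

// Simplified model of impl_writeable!(OutPoint, 0, { txid, index }):
// each listed field is written and read back in declaration order.
struct OutPoint {
    txid: [u8; 32], // stand-in for the Sha256d txid type
    index: u16,
}

impl OutPoint {
    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
        w.write_all(&self.txid)?;               // 32 raw txid bytes
        w.write_all(&self.index.to_be_bytes())  // integers big-endian on the wire
    }
    fn read<R: Read>(r: &mut R) -> io::Result<OutPoint> {
        let mut txid = [0u8; 32];
        r.read_exact(&mut txid)?;
        let mut idx = [0u8; 2];
        r.read_exact(&mut idx)?;
        Ok(OutPoint { txid, index: u16::from_be_bytes(idx) })
    }
}

// Round-trip check: serialize then deserialize yields the same outpoint.
fn main() {
    let op = OutPoint { txid: [0x11; 32], index: 7 };
    let mut buf = Vec::new();
    op.write(&mut buf).unwrap();
    let back = OutPoint::read(&mut &buf[..]).unwrap();
    assert_eq!(back.index, 7);
    assert_eq!(back.txid, op.txid);
}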