Skip to content

WIP: Drop ChannelManager's ChannelMonitor Arc for reference #424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
[workspace]

members = [
"lightning",
"lightning-net-tokio",
"lightning"
]

# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it
Expand Down
112 changes: 63 additions & 49 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Writer for VecWriter {

static mut IN_RESTORE: bool = false;
pub struct TestChannelMonitor {
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
pub simple_monitor: channelmonitor::SimpleManyChannelMonitor<OutPoint>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
Expand Down Expand Up @@ -172,34 +172,41 @@ impl KeysInterface for KeyProvider {
}
}

pub struct NodeCfg {
pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
pub chan_monitor: TestChannelMonitor,
keys_manager: Arc<KeyProvider>,
pub logger: Arc<Logger>,
}

#[inline]
pub fn do_test(data: &[u8]) {
let fee_est = Arc::new(FuzzEstimator{});
let broadcast = Arc::new(TestBroadcaster{});

macro_rules! make_node {
macro_rules! make_node_cfg {
($node_id: expr) => { {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string()));
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));

let chain_monitor = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
let chan_monitor = TestChannelMonitor::new(chain_monitor.clone(), broadcast.clone(), logger.clone(), fee_est.clone());
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, session_id: atomic::AtomicU8::new(0), channel_id: atomic::AtomicU8::new(0) });
Box::pin(NodeCfg{ logger: Arc::clone(&logger), chain_monitor, chan_monitor, keys_manager })
} }
}

macro_rules! make_node {
($node_cfg: expr) => { {
let mut config = UserConfig::default();
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
monitor)
let chan_monitor_ref = unsafe { &(*(&(*$node_cfg) as *const NodeCfg)).chan_monitor };
ChannelManager::new(Network::Bitcoin, fee_est.clone(), chan_monitor_ref, broadcast.clone(), Arc::clone(&$node_cfg.logger), $node_cfg.keys_manager.clone(), config, 0).unwrap()
} }
}

macro_rules! reload_node {
($ser: expr, $node_id: expr, $old_monitors: expr) => { {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string()));
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));

let keys_manager = Arc::new(KeyProvider { node_id: $node_id, session_id: atomic::AtomicU8::new(0), channel_id: atomic::AtomicU8::new(0) });
($ser: expr, $new_cfg: expr, $old_monitors: expr) => { {
let mut config = UserConfig::default();
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
Expand All @@ -208,25 +215,26 @@ pub fn do_test(data: &[u8]) {
let mut monitors = HashMap::new();
let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
for (outpoint, monitor_ser) in old_monitors.drain() {
monitors.insert(outpoint, <(Sha256d, ChannelMonitor)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
monitors.insert(outpoint, <(Sha256d, ChannelMonitor)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&$new_cfg.logger)).expect("Failed to read monitor").1);
$new_cfg.chan_monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
}
let mut monitor_refs = HashMap::new();
for (outpoint, monitor) in monitors.iter() {
monitor_refs.insert(*outpoint, monitor);
}

let chan_monitor_ref = unsafe { &(*(&(*$new_cfg) as *const NodeCfg)).chan_monitor };
let read_args = ChannelManagerReadArgs {
keys_manager,
keys_manager: $new_cfg.keys_manager.clone(),
fee_estimator: fee_est.clone(),
monitor: monitor.clone(),
monitor: chan_monitor_ref,
tx_broadcaster: broadcast.clone(),
logger,
logger: $new_cfg.logger.clone(),
default_config: config,
channel_monitors: &monitor_refs,
};

let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, chan_monitor_ref);
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
if !was_good {
// If the last time we updated a monitor we didn't successfully update (and we
Expand Down Expand Up @@ -349,9 +357,12 @@ pub fn do_test(data: &[u8]) {

// 3 nodes is enough to hit all the possible cases, notably unknown-source-unknown-dest
// forwarding.
let (mut node_a, mut monitor_a) = make_node!(0);
let (mut node_b, mut monitor_b) = make_node!(1);
let (mut node_c, mut monitor_c) = make_node!(2);
let mut cfg_a = make_node_cfg!(0);
let mut cfg_b = make_node_cfg!(1);
let mut cfg_c = make_node_cfg!(2);
let node_a = make_node!(cfg_a);
let node_b = make_node!(cfg_b);
let node_c = make_node!(cfg_c);

let mut nodes = [node_a, node_b, node_c];

Expand Down Expand Up @@ -618,12 +629,12 @@ pub fn do_test(data: &[u8]) {
}

match get_slice!(1)[0] {
0x00 => *monitor_a.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x01 => *monitor_b.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x02 => *monitor_c.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x03 => *monitor_a.update_ret.lock().unwrap() = Ok(()),
0x04 => *monitor_b.update_ret.lock().unwrap() = Ok(()),
0x05 => *monitor_c.update_ret.lock().unwrap() = Ok(()),
0x00 => *cfg_a.chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x01 => *cfg_b.chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x02 => *cfg_c.chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x03 => *cfg_a.chan_monitor.update_ret.lock().unwrap() = Ok(()),
0x04 => *cfg_b.chan_monitor.update_ret.lock().unwrap() = Ok(()),
0x05 => *cfg_c.chan_monitor.update_ret.lock().unwrap() = Ok(()),
0x06 => { unsafe { IN_RESTORE = true }; nodes[0].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
0x07 => { unsafe { IN_RESTORE = true }; nodes[1].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
0x08 => { unsafe { IN_RESTORE = true }; nodes[2].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; },
Expand Down Expand Up @@ -681,10 +692,10 @@ pub fn do_test(data: &[u8]) {
chan_a_disconnected = true;
drain_msg_events_on_disconnect!(0);
}
let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a);
node_a = Arc::new(new_node_a);
nodes[0] = node_a.clone();
monitor_a = new_monitor_a;
let new_cfg_a = make_node_cfg!(0);
let new_node_a = reload_node!(node_a_ser, new_cfg_a, &cfg_a.chan_monitor);
nodes[0] = new_node_a.0;
cfg_a = new_cfg_a;
},
0x20 => {
if !chan_a_disconnected {
Expand All @@ -699,42 +710,45 @@ pub fn do_test(data: &[u8]) {
nodes[2].get_and_clear_pending_msg_events();
bc_events.clear();
}
let (new_node_b, new_monitor_b) = reload_node!(node_b_ser, 1, monitor_b);
node_b = Arc::new(new_node_b);
nodes[1] = node_b.clone();
monitor_b = new_monitor_b;
let new_cfg_b = make_node_cfg!(1);
let new_node_b = reload_node!(node_b_ser, new_cfg_b, &cfg_b.chan_monitor);
nodes[0] = new_node_b.0;
cfg_b = new_cfg_b;
},
0x21 => {
if !chan_b_disconnected {
nodes[1].peer_disconnected(&nodes[2].get_our_node_id(), false);
chan_b_disconnected = true;
drain_msg_events_on_disconnect!(2);
}
let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c);
node_c = Arc::new(new_node_c);
nodes[2] = node_c.clone();
monitor_c = new_monitor_c;
let new_cfg_c = make_node_cfg!(2);
let new_node_c = reload_node!(node_c_ser, new_cfg_c, &cfg_c.chan_monitor);
nodes[0] = new_node_c.0;
cfg_c = new_cfg_c;
},
_ => test_return!(),
}

if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
if cfg_a.chan_monitor.should_update_manager.load(atomic::Ordering::Relaxed) {
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone();
cfg_a.chan_monitor.should_update_manager.store(false, atomic::Ordering::Relaxed);
let chan_monitor_ref = unsafe { &(*(&(*cfg_a) as *const NodeCfg)).chan_monitor };
*chan_monitor_ref.latest_updates_good_at_last_ser.lock().unwrap() = cfg_a.chan_monitor.latest_update_good.lock().unwrap().clone();
}
if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) {
if cfg_b.chan_monitor.should_update_manager.load(atomic::Ordering::Relaxed) {
node_b_ser.0.clear();
nodes[1].write(&mut node_b_ser).unwrap();
monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone();
cfg_b.chan_monitor.should_update_manager.store(false, atomic::Ordering::Relaxed);
let chan_monitor_ref = unsafe { &(*(&(*cfg_b) as *const NodeCfg)).chan_monitor };
*chan_monitor_ref.latest_updates_good_at_last_ser.lock().unwrap() = cfg_b.chan_monitor.latest_update_good.lock().unwrap().clone();
}
if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
if cfg_c.chan_monitor.should_update_manager.load(atomic::Ordering::Relaxed) {
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone();
cfg_c.chan_monitor.should_update_manager.store(false, atomic::Ordering::Relaxed);
let chan_monitor_ref = unsafe { &(*(&(*cfg_c) as *const NodeCfg)).chan_monitor };
*chan_monitor_ref.latest_updates_good_at_last_ser.lock().unwrap() = cfg_c.chan_monitor.latest_update_good.lock().unwrap().clone();
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
}

struct MoneyLossDetector<'a, 'b> {
manager: Arc<ChannelManager<'b, EnforcingChannelKeys>>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
handler: PeerManager<Peer<'a>>,
manager: &'a ChannelManager<'b, EnforcingChannelKeys>,
monitor: &'b channelmonitor::SimpleManyChannelMonitor<OutPoint>,
handler: PeerManager<'a, Peer<'a>>,

peers: &'a RefCell<[bool; 256]>,
funding_txn: Vec<Transaction>,
Expand All @@ -149,7 +149,7 @@ struct MoneyLossDetector<'a, 'b> {
blocks_connected: u32,
}
impl<'a, 'b> MoneyLossDetector<'a, 'b> {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<'b, EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: &'a ChannelManager<'b, EnforcingChannelKeys>, monitor: &'b channelmonitor::SimpleManyChannelMonitor<OutPoint>, handler: PeerManager<'a, Peer<'a>>) -> Self {
MoneyLossDetector {
manager,
monitor,
Expand Down Expand Up @@ -325,12 +325,12 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), &monitor, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));

let peers = RefCell::new([false; 256]);
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
chan_handler: channelmanager.clone(),
let mut loss_detector = MoneyLossDetector::new(&peers, &channelmanager, &monitor, PeerManager::new(MessageHandler {
chan_handler: &channelmanager,
route_handler: router.clone(),
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger)));

Expand Down
19 changes: 0 additions & 19 deletions lightning-net-tokio/Cargo.toml

This file was deleted.

Loading