Skip to content

Drop ChannelManager's ChannelMonitor Arc for Deref #443

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub struct TestChannelMonitor {
impl TestChannelMonitor {
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
Self {
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
update_ret: Mutex::new(Ok(())),
latest_good_update: Mutex::new(HashMap::new()),
latest_update_good: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -190,7 +190,7 @@ pub fn do_test(data: &[u8]) {
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
(Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap()),
monitor)
} }
}
Expand Down Expand Up @@ -221,14 +221,14 @@ pub fn do_test(data: &[u8]) {
let read_args = ChannelManagerReadArgs {
keys_manager,
fee_estimator: fee_est.clone(),
monitor: monitor.clone(),
monitor: monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>,
tx_broadcaster: broadcast.clone(),
logger,
default_config: config,
channel_monitors: &mut monitor_refs,
};

let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
if !was_good {
// If the last time we updated a monitor we didn't successfully update (and we
Expand Down
10 changes: 5 additions & 5 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
}

struct MoneyLossDetector<'a> {
manager: Arc<ChannelManager<EnforcingChannelKeys>>,
manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
handler: PeerManager<Peer<'a>>,
handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>,

peers: &'a RefCell<[bool; 256]>,
funding_txn: Vec<Transaction>,
Expand All @@ -149,7 +149,7 @@ struct MoneyLossDetector<'a> {
blocks_connected: u32,
}
impl<'a> MoneyLossDetector<'a> {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>) -> Self {
MoneyLossDetector {
manager,
monitor,
Expand Down Expand Up @@ -320,14 +320,14 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {

let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
let broadcast = Arc::new(TestBroadcaster{});
let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));

let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
let mut config = UserConfig::default();
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap());
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));

let peers = RefCell::new([false; 256]);
Expand Down
35 changes: 22 additions & 13 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::net::TcpStream;

use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::ln::msgs::ChannelMessageHandler;

use std::mem;
use std::net::SocketAddr;
Expand All @@ -42,7 +43,7 @@ pub struct Connection {
id: u64,
}
impl Connection {
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
let us_ref = us.clone();
let us_close_ref = us.clone();
let peer_manager_ref = peer_manager.clone();
Expand Down Expand Up @@ -110,7 +111,7 @@ impl Connection {
///
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
let (reader, us) = Self::new(event_notify, stream);

if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
Expand All @@ -124,7 +125,7 @@ impl Connection {
///
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
let (reader, us) = Self::new(event_notify, stream);

if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
Expand All @@ -142,7 +143,7 @@ impl Connection {
///
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
});
Expand All @@ -157,19 +158,18 @@ impl Connection {
}
}

#[derive(Clone)]
pub struct SocketDescriptor {
pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
conn: Arc<Mutex<Connection>>,
id: u64,
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
}
impl SocketDescriptor {
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
let id = conn.lock().unwrap().id;
Self { conn, id, peer_manager }
}
}
impl peer_handler::SocketDescriptor for SocketDescriptor {
impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
macro_rules! schedule_read {
($us_ref: expr) => {
Expand Down Expand Up @@ -256,13 +256,22 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
us.read_paused = true;
}
}
impl Eq for SocketDescriptor {}
impl PartialEq for SocketDescriptor {
impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
fn clone(&self) -> Self {
Self {
conn: Arc::clone(&self.conn),
id: self.id,
peer_manager: Arc::clone(&self.peer_manager),
}
}
}
impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
fn eq(&self, o: &Self) -> bool {
self.id == o.id
}
}
impl Hash for SocketDescriptor {
impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
Expand Down
54 changes: 36 additions & 18 deletions lightning/src/chain/chaininterface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ use bitcoin::network::constants::Network;

use util::logger::Logger;

use std::sync::{Mutex,Weak,MutexGuard,Arc};
use std::sync::{Mutex, MutexGuard, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashSet;
use std::ops::Deref;
use std::marker::PhantomData;

/// Used to give chain error details upstream
pub enum ChainError {
Expand Down Expand Up @@ -205,26 +207,48 @@ impl ChainWatchedUtil {
}
}

/// BlockNotifierArc is useful when you need a BlockNotifier that points to ChainListeners with
/// static lifetimes, e.g. when you're using lightning-net-tokio (since tokio::spawn requires
/// parameters with static lifetimes). Other times you can afford a reference, which is more
/// efficient, in which case BlockNotifierRef is a more appropriate type. Defining these type
/// aliases prevents issues such as overly long function definitions.
pub type BlockNotifierArc = Arc<BlockNotifier<'static, Arc<ChainListener>>>;

/// BlockNotifierRef is useful when you want a BlockNotifier that points to ChainListeners
/// with nonstatic lifetimes. This is useful for when static lifetimes are not needed. Nonstatic
/// lifetimes are more efficient but less flexible, and should be used by default unless static
/// lifetimes are required, e.g. when you're using lightning-net-tokio (since tokio::spawn
/// requires parameters with static lifetimes), in which case BlockNotifierArc is a more
/// appropriate type. Defining these type aliases for common usages prevents issues such as
/// overly long function definitions.
pub type BlockNotifierRef<'a> = BlockNotifier<'a, &'a ChainListener>;

/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
/// data is registered.
pub struct BlockNotifier {
listeners: Mutex<Vec<Weak<ChainListener>>>, //TODO(vmw): try removing Weak
///
/// Rather than using a plain BlockNotifier, it is preferable to use either a BlockNotifierArc
/// or a BlockNotifierRef for conciseness. See their documentation for more details, but essentially
/// you should default to using a BlockNotifierRef, and use a BlockNotifierArc instead when you
/// require ChainListeners with static lifetimes, such as when you're using lightning-net-tokio.
pub struct BlockNotifier<'a, CL: Deref<Target = ChainListener + 'a> + 'a> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add docs that describe the new parameters (here and everywhere) and discuss different ways to use them. Because its super easy to end up with massive type gook, we should probably define a type that defines types for Arc and maybe one for &'a as well if thats easy (plus point the docs to them).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, hopefully the docs I added + their location make sense

listeners: Mutex<Vec<CL>>,
chain_monitor: Arc<ChainWatchInterface>,
phantom: PhantomData<&'a ()>,
}

impl BlockNotifier {
impl<'a, CL: Deref<Target = ChainListener + 'a> + 'a> BlockNotifier<'a, CL> {
/// Constructs a new BlockNotifier without any listeners.
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a, CL> {
BlockNotifier {
listeners: Mutex::new(Vec::new()),
chain_monitor,
phantom: PhantomData,
}
}

/// Register the given listener to receive events. Only a weak pointer is provided and
/// the registration should be freed once that pointer expires.
/// Register the given listener to receive events.
// TODO: unregister
pub fn register_listener(&self, listener: Weak<ChainListener>) {
pub fn register_listener(&self, listener: CL) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}
Expand All @@ -250,25 +274,19 @@ impl BlockNotifier {
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
let last_seen = self.chain_monitor.reentered();

let listeners = self.listeners.lock().unwrap().clone();
let listeners = self.listeners.lock().unwrap();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
None => ()
}
listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
}
return last_seen != self.chain_monitor.reentered();
}


/// Notify listeners that a block was disconnected.
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
let listeners = self.listeners.lock().unwrap().clone();
let listeners = self.listeners.lock().unwrap();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_disconnected(&header, disconnected_height),
None => ()
}
listener.block_disconnected(&header, disconnected_height);
}
}

Expand Down
Loading