Skip to content

Commit 775a964

Browse files
Update ChannelManager's ChannelMonitor Arc to be a Deref
Additional changes: * Update fuzz crate to match ChannelManager's new API * Update lightning-net-tokio library to match ChannelManager's new ChannelMonitor Deref API * Update tests to match ChannelManager's new ChannelMonitor Deref API
1 parent a252f81 commit 775a964

File tree

11 files changed

+543
-270
lines changed

11 files changed

+543
-270
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub struct TestChannelMonitor {
8585
impl TestChannelMonitor {
8686
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
8787
Self {
88-
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
88+
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
8989
update_ret: Mutex::new(Ok(())),
9090
latest_good_update: Mutex::new(HashMap::new()),
9191
latest_update_good: Mutex::new(HashMap::new()),
@@ -189,7 +189,7 @@ pub fn do_test(data: &[u8]) {
189189
config.channel_options.fee_proportional_millionths = 0;
190190
config.channel_options.announced_channel = true;
191191
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
192-
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
192+
(Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap()),
193193
monitor)
194194
} }
195195
}
@@ -220,14 +220,14 @@ pub fn do_test(data: &[u8]) {
220220
let read_args = ChannelManagerReadArgs {
221221
keys_manager,
222222
fee_estimator: fee_est.clone(),
223-
monitor: monitor.clone(),
223+
monitor: monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>,
224224
tx_broadcaster: broadcast.clone(),
225225
logger,
226226
default_config: config,
227227
channel_monitors: &mut monitor_refs,
228228
};
229229

230-
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
230+
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
231231
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
232232
if !was_good {
233233
// If the last time we updated a monitor we didn't successfully update (and we

fuzz/src/full_stack.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
136136
}
137137

138138
struct MoneyLossDetector<'a> {
139-
manager: Arc<ChannelManager<EnforcingChannelKeys>>,
139+
manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>,
140140
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
141-
handler: PeerManager<Peer<'a>>,
141+
handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>,
142142

143143
peers: &'a RefCell<[bool; 256]>,
144144
funding_txn: Vec<Transaction>,
@@ -149,7 +149,7 @@ struct MoneyLossDetector<'a> {
149149
blocks_connected: u32,
150150
}
151151
impl<'a> MoneyLossDetector<'a> {
152-
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
152+
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>) -> Self {
153153
MoneyLossDetector {
154154
manager,
155155
monitor,
@@ -318,14 +318,14 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
318318

319319
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
320320
let broadcast = Arc::new(TestBroadcaster{});
321-
let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
321+
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
322322

323323
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
324324
let mut config = UserConfig::default();
325325
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
326326
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
327327
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
328-
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
328+
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap());
329329
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));
330330

331331
let peers = RefCell::new([false; 256]);

lightning-net-tokio/src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use tokio::net::TcpStream;
1919

2020
use lightning::ln::peer_handler;
2121
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
22+
use lightning::ln::msgs::ChannelMessageHandler;
2223

2324
use std::mem;
2425
use std::net::SocketAddr;
@@ -42,7 +43,7 @@ pub struct Connection {
4243
id: u64,
4344
}
4445
impl Connection {
45-
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
46+
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
4647
let us_ref = us.clone();
4748
let us_close_ref = us.clone();
4849
let peer_manager_ref = peer_manager.clone();
@@ -110,7 +111,7 @@ impl Connection {
110111
///
111112
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
112113
/// ChannelManager and ChannelMonitor objects.
113-
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114+
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114115
let (reader, us) = Self::new(event_notify, stream);
115116

116117
if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -124,7 +125,7 @@ impl Connection {
124125
///
125126
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
126127
/// ChannelManager and ChannelMonitor objects.
127-
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128+
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128129
let (reader, us) = Self::new(event_notify, stream);
129130

130131
if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -142,7 +143,7 @@ impl Connection {
142143
///
143144
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
144145
/// ChannelManager and ChannelMonitor objects.
145-
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146+
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146147
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
147148
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
148149
});
@@ -161,10 +162,10 @@ impl Connection {
161162
pub struct SocketDescriptor {
162163
conn: Arc<Mutex<Connection>>,
163164
id: u64,
164-
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
165+
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>,
165166
}
166167
impl SocketDescriptor {
167-
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
168+
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>) -> Self {
168169
let id = conn.lock().unwrap().id;
169170
Self { conn, id, peer_manager }
170171
}

lightning/src/chain/chaininterface.rs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use bitcoin::network::constants::Network;
1414

1515
use util::logger::Logger;
1616

17-
use std::sync::{Mutex,Weak,MutexGuard,Arc};
17+
use std::sync::{Mutex, MutexGuard, Arc};
1818
use std::sync::atomic::{AtomicUsize, Ordering};
1919
use std::collections::HashSet;
2020

@@ -207,25 +207,33 @@ impl ChainWatchedUtil {
207207

208208
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
209209
/// data is registered.
210-
pub struct BlockNotifier {
211-
listeners: Mutex<Vec<Weak<ChainListener>>>, //TODO(vmw): try removing Weak
210+
pub struct BlockNotifier<'a> {
211+
ref_listeners: Mutex<Vec<&'a ChainListener>>,
212+
arc_listeners: Mutex<Vec<Arc<ChainListener>>>,
212213
chain_monitor: Arc<ChainWatchInterface>,
213214
}
214215

215-
impl BlockNotifier {
216+
impl<'a> BlockNotifier<'a> {
216217
/// Constructs a new BlockNotifier without any listeners.
217-
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
218+
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a> {
218219
BlockNotifier {
219-
listeners: Mutex::new(Vec::new()),
220+
ref_listeners: Mutex::new(Vec::new()),
221+
arc_listeners: Mutex::new(Vec::new()),
220222
chain_monitor,
221223
}
222224
}
223225

224-
/// Register the given listener to receive events. Only a weak pointer is provided and
225-
/// the registration should be freed once that pointer expires.
226+
/// Register the given ref listener to receive events.
226227
// TODO: unregister
227-
pub fn register_listener(&self, listener: Weak<ChainListener>) {
228-
let mut vec = self.listeners.lock().unwrap();
228+
pub fn register_ref_listener(&self, listener: &'a ChainListener) {
229+
let mut vec = self.ref_listeners.lock().unwrap();
230+
vec.push(listener);
231+
}
232+
233+
/// Register the given Arc listener to receive events.
234+
// TODO: unregister
235+
pub fn register_arc_listener(&self, listener: Arc<ChainListener>) {
236+
let mut vec = self.arc_listeners.lock().unwrap();
229237
vec.push(listener);
230238
}
231239

@@ -250,25 +258,29 @@ impl BlockNotifier {
250258
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
251259
let last_seen = self.chain_monitor.reentered();
252260

253-
let listeners = self.listeners.lock().unwrap().clone();
254-
for listener in listeners.iter() {
255-
match listener.upgrade() {
256-
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
257-
None => ()
258-
}
261+
let ref_listeners = self.ref_listeners.lock().unwrap().clone();
262+
for ref_listener in ref_listeners.iter() {
263+
ref_listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
264+
}
265+
266+
let arc_listeners = self.arc_listeners.lock().unwrap().clone();
267+
for arc_listener in arc_listeners.iter() {
268+
arc_listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
259269
}
260270
return last_seen != self.chain_monitor.reentered();
261271
}
262272

263273

264274
/// Notify listeners that a block was disconnected.
265275
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
266-
let listeners = self.listeners.lock().unwrap().clone();
267-
for listener in listeners.iter() {
268-
match listener.upgrade() {
269-
Some(arc) => arc.block_disconnected(&header, disconnected_height),
270-
None => ()
271-
}
276+
let ref_listeners = self.ref_listeners.lock().unwrap().clone();
277+
for listener in ref_listeners.iter() {
278+
listener.block_disconnected(&header, disconnected_height);
279+
}
280+
281+
let arc_listeners = self.arc_listeners.lock().unwrap().clone();
282+
for listener in arc_listeners.iter() {
283+
listener.block_disconnected(&header, disconnected_height);
272284
}
273285
}
274286

0 commit comments

Comments
 (0)