Skip to content

Commit 4fa6d96

Browse files
authored
Merge pull request #443 from valentinewallace/channelmgr-arcs-to-derefs
Drop ChannelManager's ChannelMonitor Arc for Deref
2 parents 9a02115 + 4833d1a commit 4fa6d96

File tree

11 files changed

+603
-274
lines changed

11 files changed

+603
-274
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub struct TestChannelMonitor {
8585
impl TestChannelMonitor {
8686
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
8787
Self {
88-
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
88+
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
8989
update_ret: Mutex::new(Ok(())),
9090
latest_good_update: Mutex::new(HashMap::new()),
9191
latest_update_good: Mutex::new(HashMap::new()),
@@ -190,7 +190,7 @@ pub fn do_test(data: &[u8]) {
190190
config.channel_options.fee_proportional_millionths = 0;
191191
config.channel_options.announced_channel = true;
192192
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
193-
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
193+
(Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap()),
194194
monitor)
195195
} }
196196
}
@@ -221,14 +221,14 @@ pub fn do_test(data: &[u8]) {
221221
let read_args = ChannelManagerReadArgs {
222222
keys_manager,
223223
fee_estimator: fee_est.clone(),
224-
monitor: monitor.clone(),
224+
monitor: monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>,
225225
tx_broadcaster: broadcast.clone(),
226226
logger,
227227
default_config: config,
228228
channel_monitors: &mut monitor_refs,
229229
};
230230

231-
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
231+
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
232232
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
233233
if !was_good {
234234
// If the last time we updated a monitor we didn't successfully update (and we

fuzz/src/full_stack.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
136136
}
137137

138138
struct MoneyLossDetector<'a> {
139-
manager: Arc<ChannelManager<EnforcingChannelKeys>>,
139+
manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>,
140140
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
141-
handler: PeerManager<Peer<'a>>,
141+
handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>,
142142

143143
peers: &'a RefCell<[bool; 256]>,
144144
funding_txn: Vec<Transaction>,
@@ -149,7 +149,7 @@ struct MoneyLossDetector<'a> {
149149
blocks_connected: u32,
150150
}
151151
impl<'a> MoneyLossDetector<'a> {
152-
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
152+
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>) -> Self {
153153
MoneyLossDetector {
154154
manager,
155155
monitor,
@@ -320,14 +320,14 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
320320

321321
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
322322
let broadcast = Arc::new(TestBroadcaster{});
323-
let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
323+
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
324324

325325
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
326326
let mut config = UserConfig::default();
327327
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
328328
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
329329
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
330-
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
330+
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap());
331331
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));
332332

333333
let peers = RefCell::new([false; 256]);

lightning-net-tokio/src/lib.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use tokio::net::TcpStream;
1919

2020
use lightning::ln::peer_handler;
2121
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
22+
use lightning::ln::msgs::ChannelMessageHandler;
2223

2324
use std::mem;
2425
use std::net::SocketAddr;
@@ -42,7 +43,7 @@ pub struct Connection {
4243
id: u64,
4344
}
4445
impl Connection {
45-
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
46+
fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
4647
let us_ref = us.clone();
4748
let us_close_ref = us.clone();
4849
let peer_manager_ref = peer_manager.clone();
@@ -110,7 +111,7 @@ impl Connection {
110111
///
111112
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
112113
/// ChannelManager and ChannelMonitor objects.
113-
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114+
pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114115
let (reader, us) = Self::new(event_notify, stream);
115116

116117
if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -124,7 +125,7 @@ impl Connection {
124125
///
125126
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
126127
/// ChannelManager and ChannelMonitor objects.
127-
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128+
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128129
let (reader, us) = Self::new(event_notify, stream);
129130

130131
if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -142,7 +143,7 @@ impl Connection {
142143
///
143144
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
144145
/// ChannelManager and ChannelMonitor objects.
145-
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146+
pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146147
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
147148
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
148149
});
@@ -157,19 +158,18 @@ impl Connection {
157158
}
158159
}
159160

160-
#[derive(Clone)]
161-
pub struct SocketDescriptor {
161+
pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
162162
conn: Arc<Mutex<Connection>>,
163163
id: u64,
164-
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
164+
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
165165
}
166-
impl SocketDescriptor {
167-
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
166+
impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
167+
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
168168
let id = conn.lock().unwrap().id;
169169
Self { conn, id, peer_manager }
170170
}
171171
}
172-
impl peer_handler::SocketDescriptor for SocketDescriptor {
172+
impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
173173
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
174174
macro_rules! schedule_read {
175175
($us_ref: expr) => {
@@ -256,13 +256,22 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
256256
us.read_paused = true;
257257
}
258258
}
259-
impl Eq for SocketDescriptor {}
260-
impl PartialEq for SocketDescriptor {
259+
impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
260+
fn clone(&self) -> Self {
261+
Self {
262+
conn: Arc::clone(&self.conn),
263+
id: self.id,
264+
peer_manager: Arc::clone(&self.peer_manager),
265+
}
266+
}
267+
}
268+
impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
269+
impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
261270
fn eq(&self, o: &Self) -> bool {
262271
self.id == o.id
263272
}
264273
}
265-
impl Hash for SocketDescriptor {
274+
impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
266275
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
267276
self.id.hash(state);
268277
}

lightning/src/chain/chaininterface.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ use bitcoin::network::constants::Network;
1414

1515
use util::logger::Logger;
1616

17-
use std::sync::{Mutex,Weak,MutexGuard,Arc};
17+
use std::sync::{Mutex, MutexGuard, Arc};
1818
use std::sync::atomic::{AtomicUsize, Ordering};
1919
use std::collections::HashSet;
20+
use std::ops::Deref;
21+
use std::marker::PhantomData;
2022

2123
/// Used to give chain error details upstream
2224
pub enum ChainError {
@@ -205,26 +207,48 @@ impl ChainWatchedUtil {
205207
}
206208
}
207209

210+
/// BlockNotifierArc is useful when you need a BlockNotifier that points to ChainListeners with
211+
/// static lifetimes, e.g. when you're using lightning-net-tokio (since tokio::spawn requires
212+
/// parameters with static lifetimes). Other times you can afford a reference, which is more
213+
/// efficient, in which case BlockNotifierRef is a more appropriate type. Defining these type
214+
/// aliases prevents issues such as overly long function definitions.
215+
pub type BlockNotifierArc = Arc<BlockNotifier<'static, Arc<ChainListener>>>;
216+
217+
/// BlockNotifierRef is useful when you want a BlockNotifier that points to ChainListeners
218+
/// with nonstatic lifetimes. This is useful for when static lifetimes are not needed. Nonstatic
219+
/// lifetimes are more efficient but less flexible, and should be used by default unless static
220+
/// lifetimes are required, e.g. when you're using lightning-net-tokio (since tokio::spawn
221+
/// requires parameters with static lifetimes), in which case BlockNotifierArc is a more
222+
/// appropriate type. Defining these type aliases for common usages prevents issues such as
223+
/// overly long function definitions.
224+
pub type BlockNotifierRef<'a> = BlockNotifier<'a, &'a ChainListener>;
225+
208226
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
209227
/// data is registered.
210-
pub struct BlockNotifier {
211-
listeners: Mutex<Vec<Weak<ChainListener>>>, //TODO(vmw): try removing Weak
228+
///
229+
/// Rather than using a plain BlockNotifier, it is preferable to use either a BlockNotifierArc
230+
/// or a BlockNotifierRef for conciseness. See their documentation for more details, but essentially
231+
/// you should default to using a BlockNotifierRef, and use a BlockNotifierArc instead when you
232+
/// require ChainListeners with static lifetimes, such as when you're using lightning-net-tokio.
233+
pub struct BlockNotifier<'a, CL: Deref<Target = ChainListener + 'a> + 'a> {
234+
listeners: Mutex<Vec<CL>>,
212235
chain_monitor: Arc<ChainWatchInterface>,
236+
phantom: PhantomData<&'a ()>,
213237
}
214238

215-
impl BlockNotifier {
239+
impl<'a, CL: Deref<Target = ChainListener + 'a> + 'a> BlockNotifier<'a, CL> {
216240
/// Constructs a new BlockNotifier without any listeners.
217-
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
241+
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a, CL> {
218242
BlockNotifier {
219243
listeners: Mutex::new(Vec::new()),
220244
chain_monitor,
245+
phantom: PhantomData,
221246
}
222247
}
223248

224-
/// Register the given listener to receive events. Only a weak pointer is provided and
225-
/// the registration should be freed once that pointer expires.
249+
/// Register the given listener to receive events.
226250
// TODO: unregister
227-
pub fn register_listener(&self, listener: Weak<ChainListener>) {
251+
pub fn register_listener(&self, listener: CL) {
228252
let mut vec = self.listeners.lock().unwrap();
229253
vec.push(listener);
230254
}
@@ -250,25 +274,19 @@ impl BlockNotifier {
250274
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
251275
let last_seen = self.chain_monitor.reentered();
252276

253-
let listeners = self.listeners.lock().unwrap().clone();
277+
let listeners = self.listeners.lock().unwrap();
254278
for listener in listeners.iter() {
255-
match listener.upgrade() {
256-
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
257-
None => ()
258-
}
279+
listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
259280
}
260281
return last_seen != self.chain_monitor.reentered();
261282
}
262283

263284

264285
/// Notify listeners that a block was disconnected.
265286
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
266-
let listeners = self.listeners.lock().unwrap().clone();
287+
let listeners = self.listeners.lock().unwrap();
267288
for listener in listeners.iter() {
268-
match listener.upgrade() {
269-
Some(arc) => arc.block_disconnected(&header, disconnected_height),
270-
None => ()
271-
}
289+
listener.block_disconnected(&header, disconnected_height);
272290
}
273291
}
274292

0 commit comments

Comments
 (0)