Skip to content

Commit a63599c

Browse files
Update ChannelManager's ChannelMonitor Arc to be a Deref
Additional changes: * Update fuzz crate to match ChannelManager's new API * Update lightning-net-tokio library to match ChannelManager's new ChannelMonitor Deref API * Update tests to match ChannelManager's new ChannelMonitor Deref API
1 parent 9a02115 commit a63599c

File tree

11 files changed

+563
-277
lines changed

11 files changed

+563
-277
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,C
2727
use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys};
2828
use lightning::ln::channelmonitor;
2929
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, HTLCUpdate};
30-
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, ChannelManagerReadArgs};
30+
use lightning::ln::channelmanager::{ChannelManager, ManyChannelMonitorArc, PaymentHash, PaymentPreimage, ChannelManagerReadArgs};
3131
use lightning::ln::router::{Route, RouteHop};
3232
use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
3333
use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, UpdateAddHTLC, Init};
@@ -85,7 +85,7 @@ pub struct TestChannelMonitor {
8585
impl TestChannelMonitor {
8686
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
8787
Self {
88-
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
88+
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
8989
update_ret: Mutex::new(Ok(())),
9090
latest_good_update: Mutex::new(HashMap::new()),
9191
latest_update_good: Mutex::new(HashMap::new()),
@@ -190,7 +190,7 @@ pub fn do_test(data: &[u8]) {
190190
config.channel_options.fee_proportional_millionths = 0;
191191
config.channel_options.announced_channel = true;
192192
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
193-
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
193+
(Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as ManyChannelMonitorArc, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap()),
194194
monitor)
195195
} }
196196
}
@@ -221,14 +221,14 @@ pub fn do_test(data: &[u8]) {
221221
let read_args = ChannelManagerReadArgs {
222222
keys_manager,
223223
fee_estimator: fee_est.clone(),
224-
monitor: monitor.clone(),
224+
monitor: monitor.clone() as ManyChannelMonitorArc,
225225
tx_broadcaster: broadcast.clone(),
226226
logger,
227227
default_config: config,
228228
channel_monitors: &mut monitor_refs,
229229
};
230230

231-
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
231+
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, ManyChannelMonitorArc>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
232232
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
233233
if !was_good {
234234
// If the last time we updated a monitor we didn't successfully update (and we

fuzz/src/full_stack.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,C
2222
use lightning::chain::transaction::OutPoint;
2323
use lightning::chain::keysinterface::{InMemoryChannelKeys, KeysInterface};
2424
use lightning::ln::channelmonitor;
25-
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage};
25+
use lightning::ln::channelmanager::{ChannelManager, ManyChannelMonitorArc, PaymentHash, PaymentPreimage};
2626
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
2727
use lightning::ln::router::Router;
2828
use lightning::util::events::{EventsProvider,Event};
@@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
136136
}
137137

138138
struct MoneyLossDetector<'a> {
139-
manager: Arc<ChannelManager<EnforcingChannelKeys>>,
139+
manager: Arc<ChannelManager<EnforcingChannelKeys, ManyChannelMonitorArc>>,
140140
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
141-
handler: PeerManager<Peer<'a>>,
141+
handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, ManyChannelMonitorArc>>>,
142142

143143
peers: &'a RefCell<[bool; 256]>,
144144
funding_txn: Vec<Transaction>,
@@ -149,7 +149,7 @@ struct MoneyLossDetector<'a> {
149149
blocks_connected: u32,
150150
}
151151
impl<'a> MoneyLossDetector<'a> {
152-
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
152+
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys, ManyChannelMonitorArc>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, ManyChannelMonitorArc>>>) -> Self {
153153
MoneyLossDetector {
154154
manager,
155155
monitor,
@@ -320,14 +320,14 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
320320

321321
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
322322
let broadcast = Arc::new(TestBroadcaster{});
323-
let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
323+
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
324324

325325
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
326326
let mut config = UserConfig::default();
327327
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
328328
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
329329
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
330-
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
330+
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap());
331331
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));
332332

333333
let peers = RefCell::new([false; 256]);

lightning-net-tokio/src/lib.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use tokio::net::TcpStream;
1919

2020
use lightning::ln::peer_handler;
2121
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
22+
use lightning::ln::msgs::ChannelMessageHandler;
2223

2324
use std::mem;
2425
use std::net::SocketAddr;
@@ -42,7 +43,7 @@ pub struct Connection {
4243
id: u64,
4344
}
4445
impl Connection {
45-
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
46+
fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
4647
let us_ref = us.clone();
4748
let us_close_ref = us.clone();
4849
let peer_manager_ref = peer_manager.clone();
@@ -110,7 +111,7 @@ impl Connection {
110111
///
111112
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
112113
/// ChannelManager and ChannelMonitor objects.
113-
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114+
pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114115
let (reader, us) = Self::new(event_notify, stream);
115116

116117
if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -124,7 +125,7 @@ impl Connection {
124125
///
125126
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
126127
/// ChannelManager and ChannelMonitor objects.
127-
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128+
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128129
let (reader, us) = Self::new(event_notify, stream);
129130

130131
if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -142,7 +143,7 @@ impl Connection {
142143
///
143144
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
144145
/// ChannelManager and ChannelMonitor objects.
145-
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146+
pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146147
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
147148
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
148149
});
@@ -157,19 +158,18 @@ impl Connection {
157158
}
158159
}
159160

160-
#[derive(Clone)]
161-
pub struct SocketDescriptor {
161+
pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
162162
conn: Arc<Mutex<Connection>>,
163163
id: u64,
164-
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
164+
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
165165
}
166-
impl SocketDescriptor {
167-
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
166+
impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
167+
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
168168
let id = conn.lock().unwrap().id;
169169
Self { conn, id, peer_manager }
170170
}
171171
}
172-
impl peer_handler::SocketDescriptor for SocketDescriptor {
172+
impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
173173
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
174174
macro_rules! schedule_read {
175175
($us_ref: expr) => {
@@ -256,13 +256,22 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
256256
us.read_paused = true;
257257
}
258258
}
259-
impl Eq for SocketDescriptor {}
260-
impl PartialEq for SocketDescriptor {
259+
impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
260+
fn clone(&self) -> Self {
261+
Self {
262+
conn: Arc::clone(&self.conn),
263+
id: self.id,
264+
peer_manager: Arc::clone(&self.peer_manager),
265+
}
266+
}
267+
}
268+
impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
269+
impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
261270
fn eq(&self, o: &Self) -> bool {
262271
self.id == o.id
263272
}
264273
}
265-
impl Hash for SocketDescriptor {
274+
impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
266275
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
267276
self.id.hash(state);
268277
}

lightning/src/chain/chaininterface.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ use bitcoin::network::constants::Network;
1414

1515
use util::logger::Logger;
1616

17-
use std::sync::{Mutex,Weak,MutexGuard,Arc};
17+
use std::sync::{Mutex, MutexGuard, Arc};
1818
use std::sync::atomic::{AtomicUsize, Ordering};
1919
use std::collections::HashSet;
20+
use std::ops::Deref;
21+
use std::marker::PhantomData;
2022

2123
/// Used to give chain error details upstream
2224
pub enum ChainError {
@@ -205,26 +207,37 @@ impl ChainWatchedUtil {
205207
}
206208
}
207209

210+
/// Type alias for an Arc containing a ChainListener trait object.
211+
pub type ChainListenerArc = Arc<ChainListener>;
212+
213+
/// Type alias for reference to a ChainListener.
214+
pub type ChainListenerRef<'a> = &'a ChainListener;
215+
208216
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
209217
/// data is registered.
210-
pub struct BlockNotifier {
211-
listeners: Mutex<Vec<Weak<ChainListener>>>, //TODO(vmw): try removing Weak
218+
/// CL can be a ChainListenerArc or a ChainListenerRef, which is useful because sometimes you need
219+
/// a ChainListener with a static lifetime, i.e. when you're using lightning-net-tokio (since
220+
/// tokio::spawn requires parameters with static lifetimes). Other times you can afford a reference,
221+
/// which is more efficient.
222+
pub struct BlockNotifier<'a, CL: Deref<Target = ChainListener + 'a> + 'a> {
223+
listeners: Mutex<Vec<CL>>,
212224
chain_monitor: Arc<ChainWatchInterface>,
225+
phantom: PhantomData<&'a ()>,
213226
}
214227

215-
impl BlockNotifier {
228+
impl<'a, CL: Deref<Target = ChainListener + 'a> + 'a> BlockNotifier<'a, CL> {
216229
/// Constructs a new BlockNotifier without any listeners.
217-
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
230+
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a, CL> {
218231
BlockNotifier {
219232
listeners: Mutex::new(Vec::new()),
220233
chain_monitor,
234+
phantom: PhantomData,
221235
}
222236
}
223237

224-
/// Register the given listener to receive events. Only a weak pointer is provided and
225-
/// the registration should be freed once that pointer expires.
238+
/// Register the given listener to receive events.
226239
// TODO: unregister
227-
pub fn register_listener(&self, listener: Weak<ChainListener>) {
240+
pub fn register_listener(&self, listener: CL) {
228241
let mut vec = self.listeners.lock().unwrap();
229242
vec.push(listener);
230243
}
@@ -250,25 +263,19 @@ impl BlockNotifier {
250263
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
251264
let last_seen = self.chain_monitor.reentered();
252265

253-
let listeners = self.listeners.lock().unwrap().clone();
266+
let listeners = self.listeners.lock().unwrap();
254267
for listener in listeners.iter() {
255-
match listener.upgrade() {
256-
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
257-
None => ()
258-
}
268+
listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
259269
}
260270
return last_seen != self.chain_monitor.reentered();
261271
}
262272

263273

264274
/// Notify listeners that a block was disconnected.
265275
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
266-
let listeners = self.listeners.lock().unwrap().clone();
276+
let listeners = self.listeners.lock().unwrap();
267277
for listener in listeners.iter() {
268-
match listener.upgrade() {
269-
Some(arc) => arc.block_disconnected(&header, disconnected_height),
270-
None => ()
271-
}
278+
listener.block_disconnected(&header, disconnected_height);
272279
}
273280
}
274281

0 commit comments

Comments
 (0)