Skip to content

Commit 136f36d

Browse files
committed
Utility for syncing a set of chain listeners
Add a utility for syncing a set of chain listeners to a common chain tip. Required to use before creating an SpvClient when the chain listener used with the client is actually a set of listeners each of which may have had left off at a different block. This would occur when the listeners had been persisted individually at different frequencies (e.g., a ChainMonitor's individual ChannelMonitors).
1 parent c93907b commit 136f36d

File tree

2 files changed

+359
-55
lines changed

2 files changed

+359
-55
lines changed

lightning-block-sync/src/init.rs

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
2+
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
3+
4+
use bitcoin::blockdata::block::{Block, BlockHeader};
5+
use bitcoin::hash_types::BlockHash;
6+
use bitcoin::network::constants::Network;
7+
8+
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
9+
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
10+
///
11+
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
12+
/// failure, each listener may be left at a different block hash than the one it was originally
13+
/// paired with.
14+
///
15+
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
16+
/// switching to [`SpvClient`].
17+
///
18+
/// [`SpvClient`]: ../struct.SpvClient.html
19+
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
20+
/// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html
21+
pub async fn sync_listeners<B: BlockSource, C: Cache>(
22+
block_source: &mut B,
23+
network: Network,
24+
header_cache: &mut C,
25+
mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>,
26+
) -> BlockSourceResult<ValidatedBlockHeader> {
27+
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
28+
let new_header = block_source
29+
.get_header(&best_block_hash, best_block_height).await?
30+
.validate(best_block_hash)?;
31+
32+
// Find differences and disconnect blocks for each listener individually.
33+
let mut chain_listeners_at_height = Vec::new();
34+
let mut most_common_ancestor = None;
35+
let mut most_connected_blocks = Vec::new();
36+
for (old_block, chain_listener) in chain_listeners.drain(..) {
37+
let old_header = match header_cache.look_up(&old_block) {
38+
Some(header) => *header,
39+
None => block_source
40+
.get_header(&old_block, None).await?
41+
.validate(old_block)?
42+
};
43+
44+
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
45+
let header_cache = &mut NonDiscardingCache(header_cache);
46+
let mut chain_notifier = ChainNotifier { header_cache };
47+
let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network);
48+
let difference =
49+
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
50+
chain_notifier.disconnect_blocks(
51+
difference.disconnected_blocks,
52+
&mut DynamicChainListener(chain_listener),
53+
);
54+
55+
// Keep track of the most common ancestor and all blocks connected across all listeners.
56+
chain_listeners_at_height.push((difference.common_ancestor.height, chain_listener));
57+
if difference.connected_blocks.len() > most_connected_blocks.len() {
58+
most_common_ancestor = Some(difference.common_ancestor);
59+
most_connected_blocks = difference.connected_blocks;
60+
}
61+
}
62+
63+
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
64+
if let Some(common_ancestor) = most_common_ancestor {
65+
let mut chain_notifier = ChainNotifier { header_cache };
66+
let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network);
67+
let mut chain_listener = ChainListenerSet(chain_listeners_at_height);
68+
chain_notifier.connect_blocks(
69+
common_ancestor,
70+
most_connected_blocks,
71+
&mut chain_poller,
72+
&mut chain_listener,
73+
).await.or_else(|(e, _)| Err(e))?;
74+
}
75+
76+
Ok(new_header)
77+
}
78+
79+
/// A cache that won't discard any block headers. Used to prevent losing headers that are needed to
80+
/// disconnect blocks common to more than one listener.
81+
struct NonDiscardingCache<'a, C: Cache>(&'a mut C);
82+
83+
impl<'a, C: Cache> Cache for NonDiscardingCache<'a, C> {
84+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
85+
self.0.look_up(block_hash)
86+
}
87+
88+
fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
89+
unreachable!()
90+
}
91+
92+
fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
93+
None
94+
}
95+
}
96+
97+
/// Wrapper for supporting dynamically sized chain listeners.
98+
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
99+
100+
impl<'a> ChainListener for DynamicChainListener<'a> {
101+
fn block_connected(&mut self, _block: &Block, _height: u32) {
102+
unreachable!()
103+
}
104+
105+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
106+
self.0.block_disconnected(header, height)
107+
}
108+
}
109+
110+
/// A set of dynamically sized chain listeners, each paired with a starting block height.
111+
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);
112+
113+
impl<'a> ChainListener for ChainListenerSet<'a> {
114+
fn block_connected(&mut self, block: &Block, height: u32) {
115+
for (starting_height, chain_listener) in self.0.iter_mut() {
116+
if height > *starting_height {
117+
chain_listener.block_connected(block, height);
118+
}
119+
}
120+
}
121+
122+
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
123+
unreachable!()
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use crate::test_utils::{Blockchain, MockChainListener};
130+
use super::*;
131+
132+
use bitcoin::network::constants::Network;
133+
134+
#[tokio::test]
135+
async fn sync_from_same_chain() {
136+
let mut chain = Blockchain::default().with_height(4);
137+
let new_tip = chain.tip();
138+
let old_tip_1 = chain.at_height(1);
139+
let old_tip_2 = chain.at_height(2);
140+
let old_tip_3 = chain.at_height(3);
141+
142+
let mut listener_1 = MockChainListener::new()
143+
.expect_block_connected(*old_tip_2)
144+
.expect_block_connected(*old_tip_3)
145+
.expect_block_connected(*new_tip);
146+
let mut listener_2 = MockChainListener::new()
147+
.expect_block_connected(*old_tip_3)
148+
.expect_block_connected(*new_tip);
149+
let mut listener_3 = MockChainListener::new()
150+
.expect_block_connected(*new_tip);
151+
152+
let listeners = vec![
153+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
154+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
155+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
156+
];
157+
let mut cache = chain.header_cache(0..=4);
158+
match sync_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await {
159+
Ok(header) => assert_eq!(header, new_tip),
160+
Err(e) => panic!("Unexpected error: {:?}", e),
161+
}
162+
}
163+
164+
#[tokio::test]
165+
async fn sync_from_different_chains() {
166+
let mut main_chain = Blockchain::default().with_height(4);
167+
let fork_chain_1 = main_chain.fork_at_height(1);
168+
let fork_chain_2 = main_chain.fork_at_height(2);
169+
let fork_chain_3 = main_chain.fork_at_height(3);
170+
171+
let new_tip = main_chain.tip();
172+
let old_tip_1 = fork_chain_1.tip();
173+
let old_tip_2 = fork_chain_2.tip();
174+
let old_tip_3 = fork_chain_3.tip();
175+
176+
let mut listener_1 = MockChainListener::new()
177+
.expect_block_disconnected(*fork_chain_1.at_height(4))
178+
.expect_block_disconnected(*fork_chain_1.at_height(3))
179+
.expect_block_disconnected(*fork_chain_1.at_height(2))
180+
.expect_block_connected(*main_chain.at_height(2))
181+
.expect_block_connected(*main_chain.at_height(3))
182+
.expect_block_connected(*main_chain.at_height(4));
183+
let mut listener_2 = MockChainListener::new()
184+
.expect_block_disconnected(*fork_chain_2.at_height(4))
185+
.expect_block_disconnected(*fork_chain_2.at_height(3))
186+
.expect_block_connected(*main_chain.at_height(3))
187+
.expect_block_connected(*main_chain.at_height(4));
188+
let mut listener_3 = MockChainListener::new()
189+
.expect_block_disconnected(*fork_chain_3.at_height(4))
190+
.expect_block_connected(*main_chain.at_height(4));
191+
192+
let listeners = vec![
193+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
194+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
195+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
196+
];
197+
let mut cache = fork_chain_1.header_cache(2..=4);
198+
cache.extend(fork_chain_2.header_cache(3..=4));
199+
cache.extend(fork_chain_3.header_cache(4..=4));
200+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
201+
Ok(header) => assert_eq!(header, new_tip),
202+
Err(e) => panic!("Unexpected error: {:?}", e),
203+
}
204+
}
205+
206+
#[tokio::test]
207+
async fn sync_from_overlapping_chains() {
208+
let mut main_chain = Blockchain::default().with_height(4);
209+
let fork_chain_1 = main_chain.fork_at_height(1);
210+
let fork_chain_2 = fork_chain_1.fork_at_height(2);
211+
let fork_chain_3 = fork_chain_2.fork_at_height(3);
212+
213+
let new_tip = main_chain.tip();
214+
let old_tip_1 = fork_chain_1.tip();
215+
let old_tip_2 = fork_chain_2.tip();
216+
let old_tip_3 = fork_chain_3.tip();
217+
218+
let mut listener_1 = MockChainListener::new()
219+
.expect_block_disconnected(*fork_chain_1.at_height(4))
220+
.expect_block_disconnected(*fork_chain_1.at_height(3))
221+
.expect_block_disconnected(*fork_chain_1.at_height(2))
222+
.expect_block_connected(*main_chain.at_height(2))
223+
.expect_block_connected(*main_chain.at_height(3))
224+
.expect_block_connected(*main_chain.at_height(4));
225+
let mut listener_2 = MockChainListener::new()
226+
.expect_block_disconnected(*fork_chain_2.at_height(4))
227+
.expect_block_disconnected(*fork_chain_2.at_height(3))
228+
.expect_block_disconnected(*fork_chain_2.at_height(2))
229+
.expect_block_connected(*main_chain.at_height(2))
230+
.expect_block_connected(*main_chain.at_height(3))
231+
.expect_block_connected(*main_chain.at_height(4));
232+
let mut listener_3 = MockChainListener::new()
233+
.expect_block_disconnected(*fork_chain_3.at_height(4))
234+
.expect_block_disconnected(*fork_chain_3.at_height(3))
235+
.expect_block_disconnected(*fork_chain_3.at_height(2))
236+
.expect_block_connected(*main_chain.at_height(2))
237+
.expect_block_connected(*main_chain.at_height(3))
238+
.expect_block_connected(*main_chain.at_height(4));
239+
240+
let listeners = vec![
241+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
242+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
243+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
244+
];
245+
let mut cache = fork_chain_1.header_cache(2..=4);
246+
cache.extend(fork_chain_2.header_cache(3..=4));
247+
cache.extend(fork_chain_3.header_cache(4..=4));
248+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
249+
Ok(header) => assert_eq!(header, new_tip),
250+
Err(e) => panic!("Unexpected error: {:?}", e),
251+
}
252+
}
253+
254+
#[tokio::test]
255+
async fn cache_connected_and_keep_disconnected_blocks() {
256+
let mut main_chain = Blockchain::default().with_height(2);
257+
let fork_chain = main_chain.fork_at_height(1);
258+
259+
let new_tip = main_chain.tip();
260+
let old_tip = fork_chain.tip();
261+
262+
let mut listener = MockChainListener::new()
263+
.expect_block_disconnected(*fork_chain.at_height(2))
264+
.expect_block_connected(*main_chain.at_height(2));
265+
266+
let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn ChainListener)];
267+
let mut cache = fork_chain.header_cache(2..=2);
268+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
269+
Ok(_) => {
270+
assert!(cache.contains_key(&new_tip.block_hash));
271+
assert!(cache.contains_key(&old_tip.block_hash));
272+
},
273+
Err(e) => panic!("Unexpected error: {:?}", e),
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)