Skip to content

Commit 35b34be

Browse files
Add a sample module FilesystemPersister.
This adds a new lightning-data-persister crate, that uses the newly exposed lightning crate's test utilities. Notably, this crate is pretty small right now. However, due to future plans to add more data persistence (e.g. persisting the ChannelManager, etc) and a desire to avoid pulling in filesystem usage into the core lightning package, it is best for it to be separated out.
1 parent 8ace280 commit 35b34be

File tree

3 files changed

+234
-0
lines changed

3 files changed

+234
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
members = [
44
"lightning",
55
"lightning-net-tokio",
6+
"lightning-data-persister",
67
]
78

89
# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it

lightning-data-persister/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "lightning-data-persister"
3+
version = "0.0.1"
4+
authors = ["Matt Corallo"]
5+
license = "Apache-2.0"
6+
description = """
7+
Utilities to manage channel data persistence and retrieval.
8+
"""
9+
10+
[dependencies]
11+
bitcoin = "0.23"
12+
lightning = { version = "0.0.11", path = "../lightning" }
13+
14+
[dev-dependencies.bitcoin]
15+
version = "0.23"
16+
features = ["bitcoinconsensus"]
17+
18+
[dev-dependencies]
19+
lightning = { version = "0.0.11", path = "../lightning", features = ["_test-utils"] }

lightning-data-persister/src/lib.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
extern crate lightning;
2+
extern crate bitcoin;
3+
4+
use lightning::ln::data_persister::ChannelDataPersister;
5+
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr};
6+
use lightning::chain::keysinterface::ChannelKeys;
7+
use lightning::chain::transaction::OutPoint;
8+
use lightning::util::ser::{Writeable, Readable};
9+
use bitcoin::hash_types::{BlockHash, Txid};
10+
use bitcoin::hashes::hex::{ToHex, FromHex};
11+
use std::fs;
12+
use std::io::{Error, ErrorKind, Cursor};
13+
use std::collections::HashMap;
14+
use std::marker::PhantomData;
15+
16+
/// FilesystemPersister can persist channel data on disk on Linux machines, where
17+
/// each channel's data is stored in a file named after its funding outpoint.
18+
pub struct FilesystemPersister<ChanSigner: ChannelKeys + Readable + Writeable> {
19+
path_to_channel_data: String,
20+
phantom: PhantomData<ChanSigner>, // TODO: is there a way around this?
21+
}
22+
23+
impl<ChanSigner: ChannelKeys + Readable + Writeable> FilesystemPersister<ChanSigner> {
24+
/// Initialize a new FilesystemPersister and set the path to the individual channels'
25+
/// files.
26+
pub fn new(path_to_channel_data: String) -> Self {
27+
return Self {
28+
path_to_channel_data,
29+
phantom: PhantomData,
30+
}
31+
}
32+
33+
fn get_full_filepath(&self, funding_txo: OutPoint) -> String {
34+
format!("{}/{}_{}", self.path_to_channel_data, funding_txo.txid.to_hex(), funding_txo.index)
35+
}
36+
37+
fn write_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChanSigner>) -> std::io::Result<()> {
38+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
39+
// We never want to end up in a state where we've lost the old data, or end up using the
40+
// old data on power loss after we've returned
41+
// Note that this actually *isn't* enough (at least on Linux)! We need to fsync an fd with
42+
// the containing dir, but Rust doesn't let us do that directly, sadly. TODO: Fix this with
43+
// the libc crate!
44+
let filename = self.get_full_filepath(funding_txo);
45+
let tmp_filename = filename.clone() + ".tmp";
46+
47+
{
48+
let mut f = fs::File::create(&tmp_filename)?;
49+
monitor.write_for_disk(&mut f)?;
50+
f.sync_all()?;
51+
}
52+
// We don't need to create a backup if didn't already have the file, but in any other case
53+
// try to create the backup and expect failure on fs::copy() if eg there's a perms issue.
54+
let need_bk = match fs::metadata(&filename) {
55+
Ok(data) => {
56+
if !data.is_file() { return Err(Error::new(ErrorKind::InvalidInput, "Filename given was not a file")); }
57+
true
58+
},
59+
Err(e) => match e.kind() {
60+
std::io::ErrorKind::NotFound => false,
61+
_ => true,
62+
}
63+
};
64+
let bk_filename = filename.clone() + ".bk";
65+
if need_bk {
66+
fs::copy(&filename, &bk_filename)?;
67+
{
68+
let f = fs::File::open(&bk_filename)?;
69+
f.sync_all()?;
70+
}
71+
}
72+
fs::rename(&tmp_filename, &filename)?;
73+
{
74+
let f = fs::File::open(&filename)?;
75+
f.sync_all()?;
76+
}
77+
if need_bk {
78+
fs::remove_file(&bk_filename)?;
79+
}
80+
Ok(())
81+
}
82+
}
83+
84+
impl<ChanSigner: ChannelKeys + Readable + Writeable + Send + Sync> ChannelDataPersister for FilesystemPersister<ChanSigner> {
85+
type Keys = ChanSigner;
86+
87+
fn persist_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr> {
88+
match self.write_channel_data(funding_txo, monitor) {
89+
Ok(_) => Ok(()),
90+
Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure)
91+
}
92+
}
93+
94+
fn update_channel_data(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
95+
match self.write_channel_data(funding_txo, monitor) {
96+
Ok(_) => Ok(()),
97+
Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure)
98+
}
99+
}
100+
101+
fn load_channel_data(&self) -> Result<HashMap<OutPoint, ChannelMonitor<ChanSigner>>, ChannelMonitorUpdateErr> {
102+
if let Err(_) = fs::create_dir_all(&self.path_to_channel_data) {
103+
return Err(ChannelMonitorUpdateErr::TemporaryFailure);
104+
}
105+
let mut res = HashMap::new();
106+
for file_option in fs::read_dir(&self.path_to_channel_data).unwrap() {
107+
let mut loaded = false;
108+
let file = file_option.unwrap();
109+
if let Some(filename) = file.file_name().to_str() {
110+
if filename.is_ascii() && filename.len() > 65 {
111+
if let Ok(txid) = Txid::from_hex(filename.split_at(64).0) {
112+
if let Ok(index) = filename.split_at(65).1.split('.').next().unwrap().parse() {
113+
if let Ok(contents) = fs::read(&file.path()) {
114+
if let Ok((_, loaded_monitor)) = <(BlockHash, ChannelMonitor<ChanSigner>)>::read(&mut Cursor::new(&contents)) {
115+
res.insert(OutPoint { txid, index }, loaded_monitor);
116+
loaded = true;
117+
}
118+
}
119+
}
120+
}
121+
}
122+
}
123+
if !loaded {
124+
// TODO(val): this should prob error not just print something
125+
println!("WARNING: Failed to read one of the channel monitor storage files! Check perms!");
126+
}
127+
}
128+
Ok(res)
129+
}
130+
}
131+
132+
#[cfg(test)]
133+
mod tests {
134+
extern crate lightning;
135+
extern crate bitcoin;
136+
use crate::FilesystemPersister;
137+
use lightning::ln::features::InitFeatures;
138+
use lightning::ln::data_persister::ChannelDataPersister;
139+
use lightning::ln::functional_test_utils::*;
140+
use lightning::ln::msgs::ErrorAction;
141+
use lightning::{check_closed_broadcast, check_added_monitors};
142+
use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;
143+
use lightning::util::events::{MessageSendEventsProvider, MessageSendEvent};
144+
use lightning::util::test_utils;
145+
use bitcoin::blockdata::block::{Block, BlockHeader};
146+
use std::fs;
147+
148+
#[test]
149+
fn test_filesystem_data_persister() {
150+
// Create the nodes, giving them FilesystemPersisters for data persisters.
151+
let data_persister_0: FilesystemPersister<EnforcingChannelKeys> = FilesystemPersister::new("./persister0".to_string());
152+
let data_persister_1: FilesystemPersister<EnforcingChannelKeys> = FilesystemPersister::new("./persister1".to_string());
153+
let chanmon_cfgs = create_chanmon_cfgs(2);
154+
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
155+
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &data_persister_0);
156+
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &data_persister_1);
157+
node_cfgs[0].chain_monitor = chain_mon_0;
158+
node_cfgs[1].chain_monitor = chain_mon_1;
159+
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
160+
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
161+
162+
// Check that the persisted channel data is empty before any channels are
163+
// open.
164+
let mut persisted_chan_data_0 = data_persister_0.load_channel_data().unwrap();
165+
assert_eq!(persisted_chan_data_0.keys().len(), 0);
166+
let mut persisted_chan_data_1 = data_persister_1.load_channel_data().unwrap();
167+
assert_eq!(persisted_chan_data_1.keys().len(), 0);
168+
169+
// Helper to make sure the channel is on the expected update ID.
170+
macro_rules! check_persisted_data {
171+
($expected_update_id: expr) => {
172+
persisted_chan_data_0 = data_persister_0.load_channel_data().unwrap();
173+
assert_eq!(persisted_chan_data_0.keys().len(), 1);
174+
for mon in persisted_chan_data_0.values() {
175+
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
176+
}
177+
persisted_chan_data_1 = data_persister_1.load_channel_data().unwrap();
178+
assert_eq!(persisted_chan_data_1.keys().len(), 1);
179+
for mon in persisted_chan_data_1.values() {
180+
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
181+
}
182+
}
183+
}
184+
185+
// Create some initial channel and check that a channel was persisted.
186+
let _ = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
187+
check_persisted_data!(0);
188+
189+
// Send a few payments and make sure the monitors are updated to the latest.
190+
send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000, 8_000_000);
191+
check_persisted_data!(5);
192+
send_payment(&nodes[1], &vec!(&nodes[0])[..], 4000000, 4_000_000);
193+
check_persisted_data!(10);
194+
195+
// Close the channel and make sure everything is persisted as expected.
196+
// Force close because cooperative close doesn't result in any persisted
197+
// updates.
198+
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id);
199+
check_closed_broadcast!(nodes[0], false);
200+
check_added_monitors!(nodes[0], 1);
201+
202+
let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
203+
assert_eq!(node_txn.len(), 1);
204+
205+
let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
206+
connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[0].clone(), node_txn[0].clone()]}, 1);
207+
check_closed_broadcast!(nodes[1], false);
208+
check_added_monitors!(nodes[1], 1);
209+
check_persisted_data!(11);
210+
211+
fs::remove_dir_all("./persister0").unwrap();
212+
fs::remove_dir_all("./persister1").unwrap();
213+
}
214+
}

0 commit comments

Comments
 (0)