Skip to content

Commit 4113f1e

Browse files
committed
feat: unify bitcoin core API clients
Here we unify the REST and RPC API clients primarily to reduce code duplication.
1 parent a6e3cf3 commit 4113f1e

File tree

6 files changed

+825
-1365
lines changed

6 files changed

+825
-1365
lines changed

src/chain/bitcoind/common.rs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet};
2+
3+
use bitcoin::{transaction::Txid, Transaction};
4+
use bitcoin::{BlockHash, FeeRate};
5+
use lightning::chain::Listen;
6+
use lightning_block_sync::http::{HttpEndpoint, JsonResponse};
7+
use lightning_block_sync::poll::ValidatedBlockHeader;
8+
use lightning_block_sync::Cache;
9+
10+
use base64::{prelude::BASE64_STANDARD, Engine};
11+
use serde::Serialize;
12+
13+
use std::collections::{HashMap, VecDeque};
14+
use std::sync::Arc;
15+
16+
#[derive(Debug, Clone)]
17+
pub(crate) struct MempoolEntry {
18+
/// The transaction id
19+
pub txid: Txid,
20+
/// Local time transaction entered pool in seconds since 1 Jan 1970 GMT
21+
pub time: u64,
22+
/// Block height when transaction entered pool
23+
pub height: u32,
24+
}
25+
26+
#[derive(Debug, Clone, Serialize)]
27+
#[serde(rename_all = "UPPERCASE")]
28+
pub(crate) enum FeeRateEstimationMode {
29+
Economical,
30+
Conservative,
31+
}
32+
33+
pub(crate) struct FeeResponse(pub FeeRate);
34+
35+
impl TryInto<FeeResponse> for JsonResponse {
36+
type Error = std::io::Error;
37+
fn try_into(self) -> std::io::Result<FeeResponse> {
38+
if !self.0["errors"].is_null() {
39+
return Err(std::io::Error::new(
40+
std::io::ErrorKind::Other,
41+
self.0["errors"].to_string(),
42+
));
43+
}
44+
let fee_rate_btc_per_kvbyte = self.0["feerate"]
45+
.as_f64()
46+
.ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Failed to parse fee rate"))?;
47+
// Bitcoin Core gives us a feerate in BTC/KvB.
48+
// Thus, we multiply by 25_000_000 (10^8 / 4) to get satoshis/kwu.
49+
let fee_rate = {
50+
let fee_rate_sat_per_kwu = (fee_rate_btc_per_kvbyte * 25_000_000.0).round() as u64;
51+
FeeRate::from_sat_per_kwu(fee_rate_sat_per_kwu)
52+
};
53+
Ok(FeeResponse(fee_rate))
54+
}
55+
}
56+
57+
pub(crate) struct MempoolMinFeeResponse(pub FeeRate);
58+
59+
impl TryInto<MempoolMinFeeResponse> for JsonResponse {
60+
type Error = std::io::Error;
61+
fn try_into(self) -> std::io::Result<MempoolMinFeeResponse> {
62+
let fee_rate_btc_per_kvbyte = self.0["mempoolminfee"]
63+
.as_f64()
64+
.ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Failed to parse fee rate"))?;
65+
// Bitcoin Core gives us a feerate in BTC/KvB.
66+
// Thus, we multiply by 25_000_000 (10^8 / 4) to get satoshis/kwu.
67+
let fee_rate = {
68+
let fee_rate_sat_per_kwu = (fee_rate_btc_per_kvbyte * 25_000_000.0).round() as u64;
69+
FeeRate::from_sat_per_kwu(fee_rate_sat_per_kwu)
70+
};
71+
Ok(MempoolMinFeeResponse(fee_rate))
72+
}
73+
}
74+
75+
pub(crate) struct GetRawTransactionResponse(pub Transaction);
76+
77+
impl TryInto<GetRawTransactionResponse> for JsonResponse {
78+
type Error = std::io::Error;
79+
fn try_into(self) -> std::io::Result<GetRawTransactionResponse> {
80+
let tx = self
81+
.0
82+
.as_str()
83+
.ok_or(std::io::Error::new(
84+
std::io::ErrorKind::Other,
85+
"Failed to parse getrawtransaction response",
86+
))
87+
.and_then(|s| {
88+
bitcoin::consensus::encode::deserialize_hex(s).map_err(|_| {
89+
std::io::Error::new(
90+
std::io::ErrorKind::Other,
91+
"Failed to parse getrawtransaction response",
92+
)
93+
})
94+
})?;
95+
96+
Ok(GetRawTransactionResponse(tx))
97+
}
98+
}
99+
100+
pub(crate) struct GetRawMempoolResponse(pub Vec<Txid>);
101+
102+
impl TryInto<GetRawMempoolResponse> for JsonResponse {
103+
type Error = std::io::Error;
104+
fn try_into(self) -> std::io::Result<GetRawMempoolResponse> {
105+
let res = self.0.as_array().ok_or(std::io::Error::new(
106+
std::io::ErrorKind::Other,
107+
"Failed to parse getrawmempool response",
108+
))?;
109+
110+
let mut mempool_transactions = Vec::with_capacity(res.len());
111+
112+
for hex in res {
113+
let txid = if let Some(hex_str) = hex.as_str() {
114+
match bitcoin::consensus::encode::deserialize_hex(hex_str) {
115+
Ok(txid) => txid,
116+
Err(_) => {
117+
return Err(std::io::Error::new(
118+
std::io::ErrorKind::Other,
119+
"Failed to parse getrawmempool response",
120+
));
121+
},
122+
}
123+
} else {
124+
return Err(std::io::Error::new(
125+
std::io::ErrorKind::Other,
126+
"Failed to parse getrawmempool response",
127+
));
128+
};
129+
130+
mempool_transactions.push(txid);
131+
}
132+
133+
Ok(GetRawMempoolResponse(mempool_transactions))
134+
}
135+
}
136+
137+
pub(crate) struct GetMempoolEntryResponse {
138+
pub time: u64,
139+
pub height: u32,
140+
}
141+
142+
impl TryInto<GetMempoolEntryResponse> for JsonResponse {
143+
type Error = std::io::Error;
144+
fn try_into(self) -> std::io::Result<GetMempoolEntryResponse> {
145+
let res = self.0.as_object().ok_or(std::io::Error::new(
146+
std::io::ErrorKind::Other,
147+
"Failed to parse getmempoolentry response",
148+
))?;
149+
150+
let time = match res["time"].as_u64() {
151+
Some(time) => time,
152+
None => {
153+
return Err(std::io::Error::new(
154+
std::io::ErrorKind::Other,
155+
"Failed to parse getmempoolentry response",
156+
));
157+
},
158+
};
159+
160+
let height = match res["height"].as_u64().and_then(|h| h.try_into().ok()) {
161+
Some(height) => height,
162+
None => {
163+
return Err(std::io::Error::new(
164+
std::io::ErrorKind::Other,
165+
"Failed to parse getmempoolentry response",
166+
));
167+
},
168+
};
169+
170+
Ok(GetMempoolEntryResponse { time, height })
171+
}
172+
}
173+
174+
const MAX_HEADER_CACHE_ENTRIES: usize = 100;
175+
176+
pub(crate) struct BoundedHeaderCache {
177+
header_map: HashMap<BlockHash, ValidatedBlockHeader>,
178+
recently_seen: VecDeque<BlockHash>,
179+
}
180+
181+
impl BoundedHeaderCache {
182+
pub(crate) fn new() -> Self {
183+
let header_map = HashMap::new();
184+
let recently_seen = VecDeque::new();
185+
Self { header_map, recently_seen }
186+
}
187+
}
188+
189+
impl Cache for BoundedHeaderCache {
190+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
191+
self.header_map.get(block_hash)
192+
}
193+
194+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
195+
self.recently_seen.push_back(block_hash);
196+
self.header_map.insert(block_hash, block_header);
197+
198+
if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES {
199+
// Keep dropping old entries until we've actually removed a header entry.
200+
while let Some(oldest_entry) = self.recently_seen.pop_front() {
201+
if self.header_map.remove(&oldest_entry).is_some() {
202+
break;
203+
}
204+
}
205+
}
206+
}
207+
208+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
209+
self.recently_seen.retain(|e| e != block_hash);
210+
self.header_map.remove(block_hash)
211+
}
212+
}
213+
214+
pub(crate) struct ChainListener {
215+
pub(crate) onchain_wallet: Arc<Wallet>,
216+
pub(crate) channel_manager: Arc<ChannelManager>,
217+
pub(crate) chain_monitor: Arc<ChainMonitor>,
218+
pub(crate) output_sweeper: Arc<Sweeper>,
219+
}
220+
221+
impl Listen for ChainListener {
222+
fn filtered_block_connected(
223+
&self, header: &bitcoin::block::Header,
224+
txdata: &lightning::chain::transaction::TransactionData, height: u32,
225+
) {
226+
self.onchain_wallet.filtered_block_connected(header, txdata, height);
227+
self.channel_manager.filtered_block_connected(header, txdata, height);
228+
self.chain_monitor.filtered_block_connected(header, txdata, height);
229+
self.output_sweeper.filtered_block_connected(header, txdata, height);
230+
}
231+
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
232+
self.onchain_wallet.block_connected(block, height);
233+
self.channel_manager.block_connected(block, height);
234+
self.chain_monitor.block_connected(block, height);
235+
self.output_sweeper.block_connected(block, height);
236+
}
237+
238+
fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) {
239+
self.onchain_wallet.block_disconnected(header, height);
240+
self.channel_manager.block_disconnected(header, height);
241+
self.chain_monitor.block_disconnected(header, height);
242+
self.output_sweeper.block_disconnected(header, height);
243+
}
244+
}
245+
246+
pub(crate) fn rpc_credentials(rpc_user: String, rpc_password: String) -> String {
247+
BASE64_STANDARD.encode(format!("{}:{}", rpc_user, rpc_password))
248+
}
249+
250+
pub(crate) fn endpoint(host: String, port: u16) -> HttpEndpoint {
251+
HttpEndpoint::for_host(host.clone()).with_port(port)
252+
}
253+
254+
#[derive(Debug)]
255+
pub struct HttpError {
256+
pub(crate) status_code: String,
257+
pub(crate) contents: Vec<u8>,
258+
}
259+
260+
impl std::error::Error for HttpError {}
261+
262+
impl std::fmt::Display for HttpError {
263+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
264+
let contents = String::from_utf8_lossy(&self.contents);
265+
write!(f, "status_code: {}, contents: {}", self.status_code, contents)
266+
}
267+
}

0 commit comments

Comments
 (0)