Skip to content

Update to LDK 0.1-beta1 #137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lightning = { version = "0.0.125", features = ["max_level_trace"] }
lightning-block-sync = { version = "0.0.125", features = [ "rpc-client", "tokio" ] }
lightning-invoice = { version = "0.32.0" }
lightning-net-tokio = { version = "0.0.125" }
lightning-persister = { version = "0.0.125" }
lightning-background-processor = { version = "0.0.125", features = [ "futures" ] }
lightning-rapid-gossip-sync = { version = "0.0.125" }
lightning = { version = "0.1.0", features = ["dnssec"] }
lightning-block-sync = { version = "0.1.0", features = [ "rpc-client", "tokio" ] }
lightning-dns-resolver = { version = "0.2.0" }
lightning-invoice = { version = "0.33.0" }
lightning-net-tokio = { version = "0.1.0" }
lightning-persister = { version = "0.1.0" }
lightning-background-processor = { version = "0.1.0", features = [ "futures" ] }
lightning-rapid-gossip-sync = { version = "0.1.0" }

base64 = "0.13.0"
bitcoin = "0.32"
Expand Down
122 changes: 85 additions & 37 deletions src/bitcoind_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient;
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
use serde_json;
use std::collections::HashMap;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::runtime::{self, Runtime};

pub struct BitcoindClient {
pub(crate) bitcoind_rpc_client: Arc<RpcClient>,
network: Network,
Expand All @@ -38,7 +41,8 @@ pub struct BitcoindClient {
rpc_user: String,
rpc_password: String,
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>,
handle: tokio::runtime::Handle,
main_runtime_handle: runtime::Handle,
inner_runtime: Arc<Runtime>,
logger: Arc<FilesystemLogger>,
}

Expand Down Expand Up @@ -66,12 +70,12 @@ const MIN_FEERATE: u32 = 253;
impl BitcoindClient {
pub(crate) async fn new(
host: String, port: u16, rpc_user: String, rpc_password: String, network: Network,
handle: tokio::runtime::Handle, logger: Arc<FilesystemLogger>,
handle: runtime::Handle, logger: Arc<FilesystemLogger>,
) -> std::io::Result<Self> {
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
let rpc_credentials =
base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone()));
let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?;
let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint);
let _dummy = bitcoind_rpc_client
.call_method::<BlockchainInfo>("getblockchaininfo", &vec![])
.await
Expand All @@ -95,6 +99,15 @@ impl BitcoindClient {
fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE));
fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE));

let mut builder = runtime::Builder::new_multi_thread();
let runtime =
builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap();
let inner_runtime = Arc::new(runtime);
// Tokio will panic if we drop a runtime while in another runtime. Because the entire
// application runs inside a tokio runtime, we have to ensure this runtime is never
// `drop`'d, which we do by leaking an Arc reference.
std::mem::forget(Arc::clone(&inner_runtime));

let client = Self {
bitcoind_rpc_client: Arc::new(bitcoind_rpc_client),
host,
Expand All @@ -103,7 +116,8 @@ impl BitcoindClient {
rpc_password,
network,
fees: Arc::new(fees),
handle: handle.clone(),
main_runtime_handle: handle.clone(),
inner_runtime,
logger,
};
BitcoindClient::poll_for_fee_estimates(
Expand Down Expand Up @@ -226,10 +240,42 @@ impl BitcoindClient {
});
}

pub fn get_new_rpc_client(&self) -> std::io::Result<RpcClient> {
fn run_future_in_blocking_context<F: Future + Send + 'static>(&self, future: F) -> F::Output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, given this is the sample I don't think we'd want to advise users to employ such hacky manners? Instead, can we just switch to a pattern that avoids block_on altogether?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what alternative we have? I'm not aware of another option.

Copy link
Contributor

@tnull tnull Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what alternative we have? I'm not aware of another option.

I think there are a bunch of options, but one would to create an actor that is driven by a task spawned upon intialization. Then any of these blocking trait calls could communicate with that actor via MPSC channels, for example. This should work and would keep the blocking and async contexts more or less separate, no?
(although we might still run into issues when, depending on the actual callstack, we'd block the current thread, as other tasks might still try to run on the same runtime thread, IIUC. But that seems like an orthogonal issue we can't really resolve until we have proper async support/traits throughout the codebase, AFAICT).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are a bunch of options, but one would to create an actor that is driven by a task spawned upon intialization. Then any of these blocking trait calls could communicate with that actor via MPSC channels, for example. This should work and would keep the blocking and async contexts more or less separate, no?

I don't believe the net-impact of that is any different than what we're already doing by spawning the task on a background task at the time we need it.

But that seems like an orthogonal issue we can't really resolve until we have proper async support/traits throughout the codebase, AFAICT

It looks like the only use of the sync->async inversion in the current ldk-sample codebase is in the anchor spend paths (ChangeDestinationSource+WalletSource) so it may be practical to make that whole thing dual-sync-async upstream pretty easily (with some proc macro, I assume)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the only use of the sync->async inversion in the current ldk-sample codebase is in the anchor spend paths (ChangeDestinationSource+WalletSource) so it may be practical to make that whole thing dual-sync-async upstream pretty easily (with some proc macro, I assume)?

I'm not quite sure this is true, due to ChangeDestinationSource being used in OutputSweeper's spending method which in turn is triggered by Confirm/Listen. So if we make the traits async-optional, we'd also need to propagate it up the chain syncing logic, including all the traits and connected methods, no? But let's probably continue this discussion on lightningdevkit/rust-lightning#3540.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grrrrr

where
F::Output: Send + 'static,
{
// Tokio deliberately makes it nigh impossible to block on a future in a sync context that
// is running in an async task (which makes it really hard to interact with sync code that
// has callbacks in an async project).
//
// Reading the docs, it *seems* like
// `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
// trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
// the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
// into a `block_in_place` call, and the inner future requires I/O (which of course it
// does, its a future!), the whole thing will come to a grinding halt as no other thread is
// allowed to poll I/O until the blocked one finishes.
//
// This is, of course, nuts, and an almost trivial performance penalty of occasional
// additional wakeups would solve this, but tokio refuses to do so because any performance
// penalty at all would be too much (tokio issue #4730).
//
// Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
// run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
// blocking too many threads on the main runtime). We want to block on that `future` being
// run on the other runtime's threads, but tokio only provides `block_on` to do so, which
// runs the `future` itself on the current thread, panicing if this thread is already a
// part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
// have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
// `JoinHandle` on the main runtime.
tokio::task::block_in_place(move || {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, note that this block_in_place will act on whatever runtime context is present here. This might or might not be the main runtime, it also could result being a no-op.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think all of that is fine? Its generally expected to be the main runtime, but the point is just to tell the runtime "hey, this is gonna take some time, go ahead and make sure there's another thread to run actually-async things". Its just opportunistic, but we don't rely on it - we'll eventually return and the runtime will be happy.

self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap()
})
}

pub fn get_new_rpc_client(&self) -> RpcClient {
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
let rpc_credentials =
base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone()));
let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password));
RpcClient::new(&rpc_credentials, http_endpoint)
}

Expand Down Expand Up @@ -273,22 +319,28 @@ impl BitcoindClient {
.unwrap();
}

pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx {
pub fn sign_raw_transaction_with_wallet(
&self, tx_hex: String,
) -> impl Future<Output = SignedTx> {
let tx_hex_json = serde_json::json!(tx_hex);
self.bitcoind_rpc_client
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
.await
.unwrap()
let rpc_client = self.get_new_rpc_client();
async move {
rpc_client
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
.await
.unwrap()
}
}

pub async fn get_new_address(&self) -> Address {
pub fn get_new_address(&self) -> impl Future<Output = Address> {
let addr_args = vec![serde_json::json!("LDK output address")];
let addr = self
.bitcoind_rpc_client
.call_method::<NewAddress>("getnewaddress", &addr_args)
.await
.unwrap();
Address::from_str(addr.0.as_str()).unwrap().require_network(self.network).unwrap()
let network = self.network;
let rpc_client = self.get_new_rpc_client();
async move {
let addr =
rpc_client.call_method::<NewAddress>("getnewaddress", &addr_args).await.unwrap();
Address::from_str(addr.0.as_str()).unwrap().require_network(network).unwrap()
}
}

pub async fn get_blockchain_info(&self) -> BlockchainInfo {
Expand All @@ -298,11 +350,11 @@ impl BitcoindClient {
.unwrap()
}

pub async fn list_unspent(&self) -> ListUnspentResponse {
self.bitcoind_rpc_client
.call_method::<ListUnspentResponse>("listunspent", &vec![])
.await
.unwrap()
pub fn list_unspent(&self) -> impl Future<Output = ListUnspentResponse> {
let rpc_client = self.get_new_rpc_client();
async move {
rpc_client.call_method::<ListUnspentResponse>("listunspent", &vec![]).await.unwrap()
}
}
}

Expand All @@ -324,7 +376,7 @@ impl BroadcasterInterface for BitcoindClient {
let txn = txs.iter().map(|tx| encode::serialize_hex(tx)).collect::<Vec<_>>();
let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client);
let logger = Arc::clone(&self.logger);
self.handle.spawn(async move {
self.main_runtime_handle.spawn(async move {
let res = if txn.len() == 1 {
let tx_json = serde_json::json!(txn[0]);
bitcoind_rpc_client
Expand Down Expand Up @@ -355,17 +407,15 @@ impl BroadcasterInterface for BitcoindClient {

impl ChangeDestinationSource for BitcoindClient {
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
tokio::task::block_in_place(move || {
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
})
let future = self.get_new_address();
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
}
}

impl WalletSource for BitcoindClient {
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
let utxos = tokio::task::block_in_place(move || {
self.handle.block_on(async move { self.list_unspent().await }).0
});
let future = self.list_unspent();
let utxos = self.run_future_in_blocking_context(async move { future.await.0 });
Ok(utxos
.into_iter()
.filter_map(|utxo| {
Expand Down Expand Up @@ -398,18 +448,16 @@ impl WalletSource for BitcoindClient {
}

fn get_change_script(&self) -> Result<ScriptBuf, ()> {
tokio::task::block_in_place(move || {
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
})
let future = self.get_new_address();
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
}

fn sign_psbt(&self, tx: Psbt) -> Result<Transaction, ()> {
let mut tx_bytes = Vec::new();
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
let tx_hex = hex_utils::hex_str(&tx_bytes);
let signed_tx = tokio::task::block_in_place(move || {
self.handle.block_on(async move { self.sign_raw_transaction_with_wallet(tx_hex).await })
});
let future = self.sign_raw_transaction_with_wallet(tx_hex);
let signed_tx = self.run_future_in_blocking_context(async move { future.await });
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
}
Expand Down
Loading
Loading