diff --git a/lightning-liquidity/src/lsps0/client.rs b/lightning-liquidity/src/lsps0/client.rs
index 5ae73005e61..f7e01b323f3 100644
--- a/lightning-liquidity/src/lsps0/client.rs
+++ b/lightning-liquidity/src/lsps0/client.rs
@@ -50,12 +50,14 @@ where
/// specifcation](https://github.com/lightning/blips/blob/master/blip-0050.md#lsps-specification-support-query)
/// for more information.
pub fn list_protocols(&self, counterparty_node_id: &PublicKey) {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
let msg = LSPS0Message::Request(
utils::generate_request_id(&self.entropy_source),
LSPS0Request::ListProtocols(LSPS0ListProtocolsRequest {}),
);
- self.pending_messages.enqueue(counterparty_node_id, msg.into());
+ message_queue_notifier.enqueue(counterparty_node_id, msg.into());
}
fn handle_response(
diff --git a/lightning-liquidity/src/lsps0/service.rs b/lightning-liquidity/src/lsps0/service.rs
index 4a595ab3d2f..2b4e6782ce8 100644
--- a/lightning-liquidity/src/lsps0/service.rs
+++ b/lightning-liquidity/src/lsps0/service.rs
@@ -40,6 +40,8 @@ impl LSPS0ServiceHandler {
fn handle_request(
&self, request_id: LSPSRequestId, request: LSPS0Request, counterparty_node_id: &PublicKey,
) -> Result<(), lightning::ln::msgs::LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
match request {
LSPS0Request::ListProtocols(_) => {
let msg = LSPS0Message::Response(
@@ -48,7 +50,7 @@ impl LSPS0ServiceHandler {
protocols: self.protocols.clone(),
}),
);
- self.pending_messages.enqueue(counterparty_node_id, msg.into());
+ message_queue_notifier.enqueue(counterparty_node_id, msg.into());
Ok(())
},
}
diff --git a/lightning-liquidity/src/lsps1/client.rs b/lightning-liquidity/src/lsps1/client.rs
index b1b7b6a2493..45008baaa77 100644
--- a/lightning-liquidity/src/lsps1/client.rs
+++ b/lightning-liquidity/src/lsps1/client.rs
@@ -90,6 +90,8 @@ where
///
/// [`SupportedOptionsReady`]: crate::lsps1::event::LSPS1ClientEvent::SupportedOptionsReady
pub fn request_supported_options(&self, counterparty_node_id: PublicKey) -> LSPSRequestId {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
{
let mut outer_state_lock = self.per_peer_state.write().unwrap();
@@ -102,7 +104,7 @@ where
let request = LSPS1Request::GetInfo(LSPS1GetInfoRequest {});
let msg = LSPS1Message::Request(request_id.clone(), request).into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
request_id
}
@@ -198,27 +200,21 @@ where
&self, counterparty_node_id: &PublicKey, order: LSPS1OrderParams,
refund_onchain_address: Option
,
) -> LSPSRequestId {
- let (request_id, request_msg) = {
- let mut outer_state_lock = self.per_peer_state.write().unwrap();
- let inner_state_lock = outer_state_lock
- .entry(*counterparty_node_id)
- .or_insert(Mutex::new(PeerState::default()));
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
+ let mut message_queue_notifier = self.pending_messages.notifier();
- let request_id = crate::utils::generate_request_id(&self.entropy_source);
- let request = LSPS1Request::CreateOrder(LSPS1CreateOrderRequest {
- order,
- refund_onchain_address,
- });
- let msg = LSPS1Message::Request(request_id.clone(), request).into();
- peer_state_lock.pending_create_order_requests.insert(request_id.clone());
+ let mut outer_state_lock = self.per_peer_state.write().unwrap();
+ let inner_state_lock = outer_state_lock
+ .entry(*counterparty_node_id)
+ .or_insert(Mutex::new(PeerState::default()));
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
- (request_id, Some(msg))
- };
+ let request_id = crate::utils::generate_request_id(&self.entropy_source);
+ let request =
+ LSPS1Request::CreateOrder(LSPS1CreateOrderRequest { order, refund_onchain_address });
+ let msg = LSPS1Message::Request(request_id.clone(), request).into();
+ peer_state_lock.pending_create_order_requests.insert(request_id.clone());
- if let Some(msg) = request_msg {
- self.pending_messages.enqueue(&counterparty_node_id, msg);
- }
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
request_id
}
@@ -322,26 +318,21 @@ where
pub fn check_order_status(
&self, counterparty_node_id: &PublicKey, order_id: LSPS1OrderId,
) -> LSPSRequestId {
- let (request_id, request_msg) = {
- let mut outer_state_lock = self.per_peer_state.write().unwrap();
- let inner_state_lock = outer_state_lock
- .entry(*counterparty_node_id)
- .or_insert(Mutex::new(PeerState::default()));
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
+ let mut message_queue_notifier = self.pending_messages.notifier();
- let request_id = crate::utils::generate_request_id(&self.entropy_source);
- peer_state_lock.pending_get_order_requests.insert(request_id.clone());
+ let mut outer_state_lock = self.per_peer_state.write().unwrap();
+ let inner_state_lock = outer_state_lock
+ .entry(*counterparty_node_id)
+ .or_insert(Mutex::new(PeerState::default()));
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
- let request =
- LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() });
- let msg = LSPS1Message::Request(request_id.clone(), request).into();
+ let request_id = crate::utils::generate_request_id(&self.entropy_source);
+ peer_state_lock.pending_get_order_requests.insert(request_id.clone());
- (request_id, Some(msg))
- };
+ let request = LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() });
+ let msg = LSPS1Message::Request(request_id.clone(), request).into();
- if let Some(msg) = request_msg {
- self.pending_messages.enqueue(&counterparty_node_id, msg);
- }
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
request_id
}
diff --git a/lightning-liquidity/src/lsps1/service.rs b/lightning-liquidity/src/lsps1/service.rs
index 28fe72ca905..4dadf2e03dc 100644
--- a/lightning-liquidity/src/lsps1/service.rs
+++ b/lightning-liquidity/src/lsps1/service.rs
@@ -177,6 +177,8 @@ where
fn handle_get_info_request(
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
let response = LSPS1Response::GetInfo(LSPS1GetInfoResponse {
options: self
.config
@@ -190,7 +192,7 @@ where
});
let msg = LSPS1Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
Ok(())
}
@@ -198,7 +200,9 @@ where
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
params: LSPS1CreateOrderRequest,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
let event_queue_notifier = self.pending_events.notifier();
+
if !is_valid(¶ms.order, &self.config.supported_options.as_ref().unwrap()) {
let response = LSPS1Response::CreateOrderError(LSPSResponseError {
code: LSPS1_CREATE_ORDER_REQUEST_ORDER_MISMATCH_ERROR_CODE,
@@ -209,7 +213,7 @@ where
)),
});
let msg = LSPS1Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: format!(
"Client order does not match any supported options: {:?}",
@@ -250,66 +254,47 @@ where
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
payment: LSPS1PaymentInfo, created_at: LSPSDateTime,
) -> Result<(), APIError> {
- let (result, response) = {
- let outer_state_lock = self.per_peer_state.read().unwrap();
-
- match outer_state_lock.get(counterparty_node_id) {
- Some(inner_state_lock) => {
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
-
- match peer_state_lock.pending_requests.remove(&request_id) {
- Some(LSPS1Request::CreateOrder(params)) => {
- let order_id = self.generate_order_id();
- let channel = OutboundCRChannel::new(
- params.order.clone(),
- created_at.clone(),
- order_id.clone(),
- payment.clone(),
- );
-
- peer_state_lock.insert_outbound_channel(order_id.clone(), channel);
-
- let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse {
- order: params.order,
- order_id,
- order_state: LSPS1OrderState::Created,
- created_at,
- payment,
- channel: None,
- });
-
- (Ok(()), Some(response))
- },
-
- _ => (
- Err(APIError::APIMisuseError {
- err: format!(
- "No pending buy request for request_id: {:?}",
- request_id
- ),
- }),
- None,
- ),
- }
- },
- None => (
- Err(APIError::APIMisuseError {
- err: format!(
- "No state for the counterparty exists: {:?}",
- counterparty_node_id
- ),
- }),
- None,
- ),
- }
- };
+ let mut message_queue_notifier = self.pending_messages.notifier();
- if let Some(response) = response {
- let msg = LSPS1Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
- }
+ let outer_state_lock = self.per_peer_state.read().unwrap();
+ match outer_state_lock.get(counterparty_node_id) {
+ Some(inner_state_lock) => {
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
+
+ match peer_state_lock.pending_requests.remove(&request_id) {
+ Some(LSPS1Request::CreateOrder(params)) => {
+ let order_id = self.generate_order_id();
+ let channel = OutboundCRChannel::new(
+ params.order.clone(),
+ created_at.clone(),
+ order_id.clone(),
+ payment.clone(),
+ );
+
+ peer_state_lock.insert_outbound_channel(order_id.clone(), channel);
+
+ let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse {
+ order: params.order,
+ order_id,
+ order_state: LSPS1OrderState::Created,
+ created_at,
+ payment,
+ channel: None,
+ });
+ let msg = LSPS1Message::Response(request_id, response).into();
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
+ Ok(())
+ },
- result
+ _ => Err(APIError::APIMisuseError {
+ err: format!("No pending buy request for request_id: {:?}", request_id),
+ }),
+ }
+ },
+ None => Err(APIError::APIMisuseError {
+ err: format!("No state for the counterparty exists: {:?}", counterparty_node_id),
+ }),
+ }
}
fn handle_get_order_request(
@@ -376,54 +361,40 @@ where
&self, request_id: LSPSRequestId, counterparty_node_id: PublicKey, order_id: LSPS1OrderId,
order_state: LSPS1OrderState, channel: Option,
) -> Result<(), APIError> {
- let (result, response) = {
- let outer_state_lock = self.per_peer_state.read().unwrap();
+ let mut message_queue_notifier = self.pending_messages.notifier();
- match outer_state_lock.get(&counterparty_node_id) {
- Some(inner_state_lock) => {
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
+ let outer_state_lock = self.per_peer_state.read().unwrap();
- if let Some(outbound_channel) =
- peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id)
- {
- let config = &outbound_channel.config;
+ match outer_state_lock.get(&counterparty_node_id) {
+ Some(inner_state_lock) => {
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
- let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse {
- order_id,
- order: config.order.clone(),
- order_state,
- created_at: config.created_at.clone(),
- payment: config.payment.clone(),
- channel,
- });
- (Ok(()), Some(response))
- } else {
- (
- Err(APIError::APIMisuseError {
- err: format!("Channel with order_id {} not found", order_id.0),
- }),
- None,
- )
- }
- },
- None => (
+ if let Some(outbound_channel) =
+ peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id)
+ {
+ let config = &outbound_channel.config;
+
+ let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse {
+ order_id,
+ order: config.order.clone(),
+ order_state,
+ created_at: config.created_at.clone(),
+ payment: config.payment.clone(),
+ channel,
+ });
+ let msg = LSPS1Message::Response(request_id, response).into();
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
+ Ok(())
+ } else {
Err(APIError::APIMisuseError {
- err: format!(
- "No existing state with counterparty {}",
- counterparty_node_id
- ),
- }),
- None,
- ),
- }
- };
-
- if let Some(response) = response {
- let msg = LSPS1Message::Response(request_id, response).into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ err: format!("Channel with order_id {} not found", order_id.0),
+ })
+ }
+ },
+ None => Err(APIError::APIMisuseError {
+ err: format!("No existing state with counterparty {}", counterparty_node_id),
+ }),
}
-
- result
}
fn generate_order_id(&self) -> LSPS1OrderId {
diff --git a/lightning-liquidity/src/lsps2/client.rs b/lightning-liquidity/src/lsps2/client.rs
index bbe313d6089..fa08093108b 100644
--- a/lightning-liquidity/src/lsps2/client.rs
+++ b/lightning-liquidity/src/lsps2/client.rs
@@ -118,6 +118,8 @@ where
pub fn request_opening_params(
&self, counterparty_node_id: PublicKey, token: Option,
) -> LSPSRequestId {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
{
@@ -131,7 +133,7 @@ where
let request = LSPS2Request::GetInfo(LSPS2GetInfoRequest { token });
let msg = LSPS2Message::Request(request_id.clone(), request).into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
request_id
}
@@ -160,6 +162,8 @@ where
&self, counterparty_node_id: PublicKey, payment_size_msat: Option,
opening_fee_params: LSPS2OpeningFeeParams,
) -> Result {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
{
@@ -184,7 +188,7 @@ where
let request = LSPS2Request::Buy(LSPS2BuyRequest { opening_fee_params, payment_size_msat });
let msg = LSPS2Message::Request(request_id.clone(), request).into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
Ok(request_id)
}
diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs
index e1666b1d352..309d7ae1755 100644
--- a/lightning-liquidity/src/lsps2/service.rs
+++ b/lightning-liquidity/src/lsps2/service.rs
@@ -27,7 +27,7 @@ use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue};
use crate::lsps2::utils::{
compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
};
-use crate::message_queue::MessageQueue;
+use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
use crate::prelude::hash_map::Entry;
use crate::prelude::{new_hash_map, HashMap};
use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
@@ -499,7 +499,7 @@ impl PeerState {
}
macro_rules! get_or_insert_peer_state_entry {
- ($self: ident, $outer_state_lock: expr, $counterparty_node_id: expr) => {{
+ ($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{
// Return an internal error and abort if we hit the maximum allowed number of total peers.
let is_limited_by_max_total_peers = $outer_state_lock.len() >= MAX_TOTAL_PEERS;
match $outer_state_lock.entry(*$counterparty_node_id) {
@@ -511,8 +511,7 @@ macro_rules! get_or_insert_peer_state_entry {
};
let msg = LSPSMessage::Invalid(error_response);
- drop($outer_state_lock);
- $self.pending_messages.enqueue($counterparty_node_id, msg);
+ $message_queue_notifier.enqueue($counterparty_node_id, msg);
let err = format!(
"Dropping request from peer {} due to reaching maximally allowed number of total peers: {}",
@@ -581,51 +580,37 @@ where
pub fn invalid_token_provided(
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
) -> Result<(), APIError> {
- let (result, response) = {
- let outer_state_lock = self.per_peer_state.read().unwrap();
+ let mut message_queue_notifier = self.pending_messages.notifier();
- match outer_state_lock.get(counterparty_node_id) {
- Some(inner_state_lock) => {
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
-
- match self.remove_pending_request(&mut peer_state_lock, &request_id) {
- Some(LSPS2Request::GetInfo(_)) => {
- let response = LSPS2Response::GetInfoError(LSPSResponseError {
- code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
- message: "an unrecognized or stale token was provided".to_string(),
- data: None,
- });
- (Ok(()), Some(response))
- },
- _ => (
- Err(APIError::APIMisuseError {
- err: format!(
- "No pending get_info request for request_id: {:?}",
- request_id
- ),
- }),
- None,
- ),
- }
- },
- None => (
- Err(APIError::APIMisuseError {
+ let outer_state_lock = self.per_peer_state.read().unwrap();
+
+ match outer_state_lock.get(counterparty_node_id) {
+ Some(inner_state_lock) => {
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
+
+ match self.remove_pending_request(&mut peer_state_lock, &request_id) {
+ Some(LSPS2Request::GetInfo(_)) => {
+ let response = LSPS2Response::GetInfoError(LSPSResponseError {
+ code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
+ message: "an unrecognized or stale token was provided".to_string(),
+ data: None,
+ });
+ let msg = LSPS2Message::Response(request_id, response).into();
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
+ Ok(())
+ },
+ _ => Err(APIError::APIMisuseError {
err: format!(
- "No state for the counterparty exists: {:?}",
- counterparty_node_id
+ "No pending get_info request for request_id: {:?}",
+ request_id
),
}),
- None,
- ),
- }
- };
-
- if let Some(response) = response {
- let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ }
+ },
+ None => Err(APIError::APIMisuseError {
+ err: format!("No state for the counterparty exists: {:?}", counterparty_node_id),
+ }),
}
-
- result
}
/// Used by LSP to provide fee parameters to a client requesting a JIT Channel.
@@ -637,62 +622,48 @@ where
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
opening_fee_params_menu: Vec,
) -> Result<(), APIError> {
- let (result, response) = {
- let outer_state_lock = self.per_peer_state.read().unwrap();
+ let mut message_queue_notifier = self.pending_messages.notifier();
- match outer_state_lock.get(counterparty_node_id) {
- Some(inner_state_lock) => {
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
-
- match self.remove_pending_request(&mut peer_state_lock, &request_id) {
- Some(LSPS2Request::GetInfo(_)) => {
- let mut opening_fee_params_menu: Vec =
- opening_fee_params_menu
- .into_iter()
- .map(|param| {
- param.into_opening_fee_params(&self.config.promise_secret)
- })
- .collect();
- opening_fee_params_menu.sort_by(|a, b| {
- match a.min_fee_msat.cmp(&b.min_fee_msat) {
- CmpOrdering::Equal => a.proportional.cmp(&b.proportional),
- other => other,
- }
- });
- let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse {
- opening_fee_params_menu,
- });
- (Ok(()), Some(response))
- },
- _ => (
- Err(APIError::APIMisuseError {
- err: format!(
- "No pending get_info request for request_id: {:?}",
- request_id
- ),
- }),
- None,
- ),
- }
- },
- None => (
- Err(APIError::APIMisuseError {
+ let outer_state_lock = self.per_peer_state.read().unwrap();
+
+ match outer_state_lock.get(counterparty_node_id) {
+ Some(inner_state_lock) => {
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
+
+ match self.remove_pending_request(&mut peer_state_lock, &request_id) {
+ Some(LSPS2Request::GetInfo(_)) => {
+ let mut opening_fee_params_menu: Vec =
+ opening_fee_params_menu
+ .into_iter()
+ .map(|param| {
+ param.into_opening_fee_params(&self.config.promise_secret)
+ })
+ .collect();
+ opening_fee_params_menu.sort_by(|a, b| {
+ match a.min_fee_msat.cmp(&b.min_fee_msat) {
+ CmpOrdering::Equal => a.proportional.cmp(&b.proportional),
+ other => other,
+ }
+ });
+ let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse {
+ opening_fee_params_menu,
+ });
+ let msg = LSPS2Message::Response(request_id, response).into();
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
+ Ok(())
+ },
+ _ => Err(APIError::APIMisuseError {
err: format!(
- "No state for the counterparty exists: {:?}",
- counterparty_node_id
+ "No pending get_info request for request_id: {:?}",
+ request_id
),
}),
- None,
- ),
- }
- };
-
- if let Some(response) = response {
- let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ }
+ },
+ None => Err(APIError::APIMisuseError {
+ err: format!("No state for the counterparty exists: {:?}", counterparty_node_id),
+ }),
}
-
- result
}
/// Used by LSP to provide the client with the intercept scid and
@@ -707,70 +678,52 @@ where
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
) -> Result<(), APIError> {
- let (result, response) = {
- let outer_state_lock = self.per_peer_state.read().unwrap();
+ let mut message_queue_notifier = self.pending_messages.notifier();
- match outer_state_lock.get(counterparty_node_id) {
- Some(inner_state_lock) => {
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
+ let outer_state_lock = self.per_peer_state.read().unwrap();
- match self.remove_pending_request(&mut peer_state_lock, &request_id) {
- Some(LSPS2Request::Buy(buy_request)) => {
- {
- let mut peer_by_intercept_scid =
- self.peer_by_intercept_scid.write().unwrap();
- peer_by_intercept_scid
- .insert(intercept_scid, *counterparty_node_id);
- }
+ match outer_state_lock.get(counterparty_node_id) {
+ Some(inner_state_lock) => {
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
- let outbound_jit_channel = OutboundJITChannel::new(
- buy_request.payment_size_msat,
- buy_request.opening_fee_params,
- user_channel_id,
- );
-
- peer_state_lock
- .intercept_scid_by_user_channel_id
- .insert(user_channel_id, intercept_scid);
- peer_state_lock
- .insert_outbound_channel(intercept_scid, outbound_jit_channel);
-
- let response = LSPS2Response::Buy(LSPS2BuyResponse {
- jit_channel_scid: intercept_scid.into(),
- lsp_cltv_expiry_delta: cltv_expiry_delta,
- client_trusts_lsp,
- });
- (Ok(()), Some(response))
- },
- _ => (
- Err(APIError::APIMisuseError {
- err: format!(
- "No pending buy request for request_id: {:?}",
- request_id
- ),
- }),
- None,
- ),
- }
- },
- None => (
- Err(APIError::APIMisuseError {
- err: format!(
- "No state for the counterparty exists: {:?}",
- counterparty_node_id
- ),
- }),
- None,
- ),
- }
- };
+ match self.remove_pending_request(&mut peer_state_lock, &request_id) {
+ Some(LSPS2Request::Buy(buy_request)) => {
+ {
+ let mut peer_by_intercept_scid =
+ self.peer_by_intercept_scid.write().unwrap();
+ peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
+ }
- if let Some(response) = response {
- let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ let outbound_jit_channel = OutboundJITChannel::new(
+ buy_request.payment_size_msat,
+ buy_request.opening_fee_params,
+ user_channel_id,
+ );
+
+ peer_state_lock
+ .intercept_scid_by_user_channel_id
+ .insert(user_channel_id, intercept_scid);
+ peer_state_lock
+ .insert_outbound_channel(intercept_scid, outbound_jit_channel);
+
+ let response = LSPS2Response::Buy(LSPS2BuyResponse {
+ jit_channel_scid: intercept_scid.into(),
+ lsp_cltv_expiry_delta: cltv_expiry_delta,
+ client_trusts_lsp,
+ });
+ let msg = LSPS2Message::Response(request_id, response).into();
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
+ Ok(())
+ },
+ _ => Err(APIError::APIMisuseError {
+ err: format!("No pending buy request for request_id: {:?}", request_id),
+ }),
+ }
+ },
+ None => Err(APIError::APIMisuseError {
+ err: format!("No state for the counterparty exists: {:?}", counterparty_node_id),
+ }),
}
-
- result
}
/// Forward [`Event::HTLCIntercepted`] event parameters into this function.
@@ -1202,42 +1155,40 @@ where
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
params: LSPS2GetInfoRequest,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
let event_queue_notifier = self.pending_events.notifier();
- let (result, response) = {
- let mut outer_state_lock = self.per_peer_state.write().unwrap();
- let inner_state_lock =
- get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id);
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
- let request = LSPS2Request::GetInfo(params.clone());
- match self.insert_pending_request(
- &mut peer_state_lock,
- request_id.clone(),
- *counterparty_node_id,
- request,
- ) {
- (Ok(()), msg) => {
- let event = LSPS2ServiceEvent::GetInfo {
- request_id,
- counterparty_node_id: *counterparty_node_id,
- token: params.token,
- };
- event_queue_notifier.enqueue(event);
- (Ok(()), msg)
- },
- (e, msg) => (e, msg),
- }
- };
- if let Some(msg) = response {
- self.pending_messages.enqueue(counterparty_node_id, msg);
- }
+ let mut outer_state_lock = self.per_peer_state.write().unwrap();
+ let inner_state_lock = get_or_insert_peer_state_entry!(
+ self,
+ outer_state_lock,
+ message_queue_notifier,
+ counterparty_node_id
+ );
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
+ let request = LSPS2Request::GetInfo(params.clone());
+ self.insert_pending_request(
+ &mut peer_state_lock,
+ &mut message_queue_notifier,
+ request_id.clone(),
+ *counterparty_node_id,
+ request,
+ )?;
+
+ let event = LSPS2ServiceEvent::GetInfo {
+ request_id,
+ counterparty_node_id: *counterparty_node_id,
+ token: params.token,
+ };
+ event_queue_notifier.enqueue(event);
- result
+ Ok(())
}
fn handle_buy_request(
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
let event_queue_notifier = self.pending_events.notifier();
if let Some(payment_size_msat) = params.payment_size_msat {
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
@@ -1247,7 +1198,7 @@ where
data: None,
});
let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: "payment size is below our minimum supported payment size".to_string(),
@@ -1262,7 +1213,7 @@ where
data: None,
});
let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: "payment size is above our maximum supported payment size".to_string(),
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1283,7 +1234,7 @@ where
data: None,
});
let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: "payment size is too small to cover the opening fee".to_string(),
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1297,7 +1248,7 @@ where
data: None,
});
let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: "overflow error when calculating opening_fee".to_string(),
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1314,90 +1265,90 @@ where
data: None,
});
let msg = LSPS2Message::Response(request_id, response).into();
- self.pending_messages.enqueue(counterparty_node_id, msg);
+ message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: "invalid opening fee parameters were supplied by client".to_string(),
action: ErrorAction::IgnoreAndLog(Level::Info),
});
}
- let (result, response) = {
- let mut outer_state_lock = self.per_peer_state.write().unwrap();
- let inner_state_lock =
- get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id);
- let mut peer_state_lock = inner_state_lock.lock().unwrap();
-
- let request = LSPS2Request::Buy(params.clone());
- match self.insert_pending_request(
- &mut peer_state_lock,
- request_id.clone(),
- *counterparty_node_id,
- request,
- ) {
- (Ok(()), msg) => {
- let event = LSPS2ServiceEvent::BuyRequest {
- request_id,
- counterparty_node_id: *counterparty_node_id,
- opening_fee_params: params.opening_fee_params,
- payment_size_msat: params.payment_size_msat,
- };
- event_queue_notifier.enqueue(event);
-
- (Ok(()), msg)
- },
- (e, msg) => (e, msg),
- }
+ let mut outer_state_lock = self.per_peer_state.write().unwrap();
+ let inner_state_lock = get_or_insert_peer_state_entry!(
+ self,
+ outer_state_lock,
+ message_queue_notifier,
+ counterparty_node_id
+ );
+ let mut peer_state_lock = inner_state_lock.lock().unwrap();
+
+ let request = LSPS2Request::Buy(params.clone());
+
+ self.insert_pending_request(
+ &mut peer_state_lock,
+ &mut message_queue_notifier,
+ request_id.clone(),
+ *counterparty_node_id,
+ request,
+ )?;
+
+ let event = LSPS2ServiceEvent::BuyRequest {
+ request_id,
+ counterparty_node_id: *counterparty_node_id,
+ opening_fee_params: params.opening_fee_params,
+ payment_size_msat: params.payment_size_msat,
};
+ event_queue_notifier.enqueue(event);
- if let Some(msg) = response {
- self.pending_messages.enqueue(counterparty_node_id, msg);
- }
-
- result
+ Ok(())
}
fn insert_pending_request<'a>(
- &self, peer_state_lock: &mut MutexGuard<'a, PeerState>, request_id: LSPSRequestId,
+ &self, peer_state_lock: &mut MutexGuard<'a, PeerState>,
+ message_queue_notifier: &mut MessageQueueNotifierGuard, request_id: LSPSRequestId,
counterparty_node_id: PublicKey, request: LSPS2Request,
- ) -> (Result<(), LightningError>, Option) {
- let create_pending_request_limit_exceeded_response = |error_message: String| {
- let error_details = LSPSResponseError {
- code: LSPS0_CLIENT_REJECTED_ERROR_CODE,
- message: "Reached maximum number of pending requests. Please try again later."
- .to_string(),
- data: None,
- };
- let response = match &request {
- LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details),
- LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details),
- };
- let msg = Some(LSPS2Message::Response(request_id.clone(), response).into());
+ ) -> Result<(), LightningError> {
+ let create_pending_request_limit_exceeded_response =
+ |message_queue_notifier: &mut MessageQueueNotifierGuard, error_message: String| {
+ let error_details = LSPSResponseError {
+ code: LSPS0_CLIENT_REJECTED_ERROR_CODE,
+ message: "Reached maximum number of pending requests. Please try again later."
+ .to_string(),
+ data: None,
+ };
+ let response = match &request {
+ LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details),
+ LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details),
+ };
+ let msg = LSPS2Message::Response(request_id.clone(), response).into();
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
- let result = Err(LightningError {
- err: error_message,
- action: ErrorAction::IgnoreAndLog(Level::Debug),
- });
- (result, msg)
- };
+ Err(LightningError {
+ err: error_message,
+ action: ErrorAction::IgnoreAndLog(Level::Debug),
+ })
+ };
if self.total_pending_requests.load(Ordering::Relaxed) >= MAX_TOTAL_PENDING_REQUESTS {
let error_message = format!(
"Reached maximum number of total pending requests: {}",
MAX_TOTAL_PENDING_REQUESTS
);
- return create_pending_request_limit_exceeded_response(error_message);
+ return create_pending_request_limit_exceeded_response(
+ message_queue_notifier,
+ error_message,
+ );
}
if peer_state_lock.pending_requests_and_channels() < MAX_PENDING_REQUESTS_PER_PEER {
peer_state_lock.pending_requests.insert(request_id, request);
self.total_pending_requests.fetch_add(1, Ordering::Relaxed);
- (Ok(()), None)
+ Ok(())
} else {
let error_message = format!(
"Peer {} reached maximum number of pending requests: {}",
counterparty_node_id, MAX_PENDING_REQUESTS_PER_PEER
);
- create_pending_request_limit_exceeded_response(error_message)
+ create_pending_request_limit_exceeded_response(message_queue_notifier, error_message)
}
}
diff --git a/lightning-liquidity/src/lsps5/client.rs b/lightning-liquidity/src/lsps5/client.rs
index 50b2c85ce1d..bcad453d7ad 100644
--- a/lightning-liquidity/src/lsps5/client.rs
+++ b/lightning-liquidity/src/lsps5/client.rs
@@ -205,6 +205,7 @@ where
pub fn set_webhook(
&self, counterparty_node_id: PublicKey, app_name: String, webhook_url: String,
) -> Result {
+ let mut message_queue_notifier = self.pending_messages.notifier();
let app_name = LSPS5AppName::from_string(app_name)?;
let lsps_webhook_url = LSPS5WebhookUrl::from_string(webhook_url)?;
@@ -228,7 +229,7 @@ where
LSPS5Request::SetWebhook(SetWebhookRequest { app_name, webhook: lsps_webhook_url });
let message = LSPS5Message::Request(request_id.clone(), request);
- self.pending_messages.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
+ message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
Ok(request_id)
}
@@ -250,6 +251,7 @@ where
/// [`WebhooksListed`]: super::event::LSPS5ClientEvent::WebhooksListed
/// [`LSPS5Response::ListWebhooks`]: super::msgs::LSPS5Response::ListWebhooks
pub fn list_webhooks(&self, counterparty_node_id: PublicKey) -> LSPSRequestId {
+ let mut message_queue_notifier = self.pending_messages.notifier();
let request_id = generate_request_id(&self.entropy_source);
let now =
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
@@ -260,7 +262,7 @@ where
let request = LSPS5Request::ListWebhooks(ListWebhooksRequest {});
let message = LSPS5Message::Request(request_id.clone(), request);
- self.pending_messages.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
+ message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
request_id
}
@@ -287,6 +289,7 @@ where
pub fn remove_webhook(
&self, counterparty_node_id: PublicKey, app_name: String,
) -> Result {
+ let mut message_queue_notifier = self.pending_messages.notifier();
let app_name = LSPS5AppName::from_string(app_name)?;
let request_id = generate_request_id(&self.entropy_source);
@@ -301,7 +304,7 @@ where
let request = LSPS5Request::RemoveWebhook(RemoveWebhookRequest { app_name });
let message = LSPS5Message::Request(request_id.clone(), request);
- self.pending_messages.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
+ message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
Ok(request_id)
}
diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs
index 984dd5d0575..97fde364ba1 100644
--- a/lightning-liquidity/src/lsps5/service.rs
+++ b/lightning-liquidity/src/lsps5/service.rs
@@ -158,6 +158,8 @@ where
&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
params: SetWebhookRequest,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
self.check_prune_stale_webhooks();
let mut webhooks = self.webhooks.lock().unwrap();
@@ -192,7 +194,7 @@ where
LSPS5Response::SetWebhookError(error.clone().into()),
)
.into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
return Err(LightningError {
err: error.message().into(),
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -221,7 +223,7 @@ where
LSPS5Response::SetWebhookError(e.clone().into()),
)
.into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
LightningError {
err: e.message().into(),
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -238,7 +240,7 @@ where
}),
)
.into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
Ok(())
}
@@ -246,6 +248,8 @@ where
&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
_params: ListWebhooksRequest,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
self.check_prune_stale_webhooks();
let webhooks = self.webhooks.lock().unwrap();
@@ -259,7 +263,7 @@ where
let response = ListWebhooksResponse { app_names, max_webhooks };
let msg = LSPS5Message::Response(request_id, LSPS5Response::ListWebhooks(response)).into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
Ok(())
}
@@ -268,6 +272,8 @@ where
&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
params: RemoveWebhookRequest,
) -> Result<(), LightningError> {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
self.check_prune_stale_webhooks();
let mut webhooks = self.webhooks.lock().unwrap();
@@ -278,7 +284,7 @@ where
let msg =
LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response))
.into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
return Ok(());
}
@@ -291,7 +297,7 @@ where
)
.into();
- self.pending_messages.enqueue(&counterparty_node_id, msg);
+ message_queue_notifier.enqueue(&counterparty_node_id, msg);
return Err(LightningError {
err: error.message().into(),
action: ErrorAction::IgnoreAndLog(Level::Info),
diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs
index 6d412bd966a..bba3f6ea0d3 100644
--- a/lightning-liquidity/src/manager.rs
+++ b/lightning-liquidity/src/manager.rs
@@ -633,13 +633,15 @@ where
LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map)
}
.map_err(|_| {
+ let mut message_queue_notifier = self.pending_messages.notifier();
+
let error = LSPSResponseError {
code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
data: None,
};
- self.pending_messages.enqueue(&sender_node_id, LSPSMessage::Invalid(error));
+ message_queue_notifier.enqueue(&sender_node_id, LSPSMessage::Invalid(error));
self.ignored_peers.write().unwrap().insert(sender_node_id);
let err = format!(
"Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.",
diff --git a/lightning-liquidity/src/message_queue.rs b/lightning-liquidity/src/message_queue.rs
index 58060862f07..45b3c7f48af 100644
--- a/lightning-liquidity/src/message_queue.rs
+++ b/lightning-liquidity/src/message_queue.rs
@@ -33,11 +33,29 @@ impl MessageQueue {
self.pending_msgs_notifier.get_future()
}
- pub(crate) fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
- {
- let mut queue = self.queue.lock().unwrap();
- queue.push_back((*counterparty_node_id, msg));
+ pub(crate) fn notifier(&self) -> MessageQueueNotifierGuard {
+ MessageQueueNotifierGuard { msg_queue: self, buffer: VecDeque::new() }
+ }
+}
+
+// A guard type that will process buffered messages and wake the background processor when dropped.
+#[must_use]
+pub(crate) struct MessageQueueNotifierGuard<'a> {
+ msg_queue: &'a MessageQueue,
+ buffer: VecDeque<(PublicKey, LSPSMessage)>,
+}
+
+impl<'a> MessageQueueNotifierGuard<'a> {
+ pub fn enqueue(&mut self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
+ self.buffer.push_back((*counterparty_node_id, msg));
+ }
+}
+
+impl<'a> Drop for MessageQueueNotifierGuard<'a> {
+ fn drop(&mut self) {
+ if !self.buffer.is_empty() {
+ self.msg_queue.queue.lock().unwrap().append(&mut self.buffer);
+ self.msg_queue.pending_msgs_notifier.notify();
}
- self.pending_msgs_notifier.notify();
}
}