Skip to content

lightning-liquidity: Introduce MessageQueueNotifierGuard type #3981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lightning-liquidity/src/lsps0/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ where
/// specifcation](https://github.com/lightning/blips/blob/master/blip-0050.md#lsps-specification-support-query)
/// for more information.
pub fn list_protocols(&self, counterparty_node_id: &PublicKey) {
let mut message_queue_notifier = self.pending_messages.notifier();

let msg = LSPS0Message::Request(
utils::generate_request_id(&self.entropy_source),
LSPS0Request::ListProtocols(LSPS0ListProtocolsRequest {}),
);

self.pending_messages.enqueue(counterparty_node_id, msg.into());
message_queue_notifier.enqueue(counterparty_node_id, msg.into());
}

fn handle_response(
Expand Down
4 changes: 3 additions & 1 deletion lightning-liquidity/src/lsps0/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ impl LSPS0ServiceHandler {
fn handle_request(
&self, request_id: LSPSRequestId, request: LSPS0Request, counterparty_node_id: &PublicKey,
) -> Result<(), lightning::ln::msgs::LightningError> {
let mut message_queue_notifier = self.pending_messages.notifier();

match request {
LSPS0Request::ListProtocols(_) => {
let msg = LSPS0Message::Response(
Expand All @@ -48,7 +50,7 @@ impl LSPS0ServiceHandler {
protocols: self.protocols.clone(),
}),
);
self.pending_messages.enqueue(counterparty_node_id, msg.into());
message_queue_notifier.enqueue(counterparty_node_id, msg.into());
Ok(())
},
}
Expand Down
61 changes: 26 additions & 35 deletions lightning-liquidity/src/lsps1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ where
///
/// [`SupportedOptionsReady`]: crate::lsps1::event::LSPS1ClientEvent::SupportedOptionsReady
pub fn request_supported_options(&self, counterparty_node_id: PublicKey) -> LSPSRequestId {
let mut message_queue_notifier = self.pending_messages.notifier();

let request_id = crate::utils::generate_request_id(&self.entropy_source);
{
let mut outer_state_lock = self.per_peer_state.write().unwrap();
Expand All @@ -102,7 +104,7 @@ where

let request = LSPS1Request::GetInfo(LSPS1GetInfoRequest {});
let msg = LSPS1Message::Request(request_id.clone(), request).into();
self.pending_messages.enqueue(&counterparty_node_id, msg);
message_queue_notifier.enqueue(&counterparty_node_id, msg);
request_id
}

Expand Down Expand Up @@ -198,27 +200,21 @@ where
&self, counterparty_node_id: &PublicKey, order: LSPS1OrderParams,
refund_onchain_address: Option<Address>,
) -> LSPSRequestId {
let (request_id, request_msg) = {
let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock = outer_state_lock
.entry(*counterparty_node_id)
.or_insert(Mutex::new(PeerState::default()));
let mut peer_state_lock = inner_state_lock.lock().unwrap();
let mut message_queue_notifier = self.pending_messages.notifier();

let request_id = crate::utils::generate_request_id(&self.entropy_source);
let request = LSPS1Request::CreateOrder(LSPS1CreateOrderRequest {
order,
refund_onchain_address,
});
let msg = LSPS1Message::Request(request_id.clone(), request).into();
peer_state_lock.pending_create_order_requests.insert(request_id.clone());
let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock = outer_state_lock
.entry(*counterparty_node_id)
.or_insert(Mutex::new(PeerState::default()));
let mut peer_state_lock = inner_state_lock.lock().unwrap();

(request_id, Some(msg))
};
let request_id = crate::utils::generate_request_id(&self.entropy_source);
let request =
LSPS1Request::CreateOrder(LSPS1CreateOrderRequest { order, refund_onchain_address });
let msg = LSPS1Message::Request(request_id.clone(), request).into();
peer_state_lock.pending_create_order_requests.insert(request_id.clone());

if let Some(msg) = request_msg {
self.pending_messages.enqueue(&counterparty_node_id, msg);
}
message_queue_notifier.enqueue(&counterparty_node_id, msg);

request_id
}
Expand Down Expand Up @@ -322,26 +318,21 @@ where
pub fn check_order_status(
&self, counterparty_node_id: &PublicKey, order_id: LSPS1OrderId,
) -> LSPSRequestId {
let (request_id, request_msg) = {
let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock = outer_state_lock
.entry(*counterparty_node_id)
.or_insert(Mutex::new(PeerState::default()));
let mut peer_state_lock = inner_state_lock.lock().unwrap();
let mut message_queue_notifier = self.pending_messages.notifier();

let request_id = crate::utils::generate_request_id(&self.entropy_source);
peer_state_lock.pending_get_order_requests.insert(request_id.clone());
let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock = outer_state_lock
.entry(*counterparty_node_id)
.or_insert(Mutex::new(PeerState::default()));
let mut peer_state_lock = inner_state_lock.lock().unwrap();

let request =
LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() });
let msg = LSPS1Message::Request(request_id.clone(), request).into();
let request_id = crate::utils::generate_request_id(&self.entropy_source);
peer_state_lock.pending_get_order_requests.insert(request_id.clone());

(request_id, Some(msg))
};
let request = LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() });
let msg = LSPS1Message::Request(request_id.clone(), request).into();

if let Some(msg) = request_msg {
self.pending_messages.enqueue(&counterparty_node_id, msg);
}
message_queue_notifier.enqueue(&counterparty_node_id, msg);

request_id
}
Expand Down
177 changes: 74 additions & 103 deletions lightning-liquidity/src/lsps1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ where
fn handle_get_info_request(
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
) -> Result<(), LightningError> {
let mut message_queue_notifier = self.pending_messages.notifier();

let response = LSPS1Response::GetInfo(LSPS1GetInfoResponse {
options: self
.config
Expand All @@ -190,15 +192,17 @@ where
});

let msg = LSPS1Message::Response(request_id, response).into();
self.pending_messages.enqueue(counterparty_node_id, msg);
message_queue_notifier.enqueue(counterparty_node_id, msg);
Ok(())
}

fn handle_create_order_request(
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
params: LSPS1CreateOrderRequest,
) -> Result<(), LightningError> {
let mut message_queue_notifier = self.pending_messages.notifier();
let event_queue_notifier = self.pending_events.notifier();

if !is_valid(&params.order, &self.config.supported_options.as_ref().unwrap()) {
let response = LSPS1Response::CreateOrderError(LSPSResponseError {
code: LSPS1_CREATE_ORDER_REQUEST_ORDER_MISMATCH_ERROR_CODE,
Expand All @@ -209,7 +213,7 @@ where
)),
});
let msg = LSPS1Message::Response(request_id, response).into();
self.pending_messages.enqueue(counterparty_node_id, msg);
message_queue_notifier.enqueue(counterparty_node_id, msg);
return Err(LightningError {
err: format!(
"Client order does not match any supported options: {:?}",
Expand Down Expand Up @@ -250,66 +254,47 @@ where
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
payment: LSPS1PaymentInfo, created_at: LSPSDateTime,
) -> Result<(), APIError> {
let (result, response) = {
let outer_state_lock = self.per_peer_state.read().unwrap();

match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

match peer_state_lock.pending_requests.remove(&request_id) {
Some(LSPS1Request::CreateOrder(params)) => {
let order_id = self.generate_order_id();
let channel = OutboundCRChannel::new(
params.order.clone(),
created_at.clone(),
order_id.clone(),
payment.clone(),
);

peer_state_lock.insert_outbound_channel(order_id.clone(), channel);

let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse {
order: params.order,
order_id,
order_state: LSPS1OrderState::Created,
created_at,
payment,
channel: None,
});

(Ok(()), Some(response))
},

_ => (
Err(APIError::APIMisuseError {
err: format!(
"No pending buy request for request_id: {:?}",
request_id
),
}),
None,
),
}
},
None => (
Err(APIError::APIMisuseError {
err: format!(
"No state for the counterparty exists: {:?}",
counterparty_node_id
),
}),
None,
),
}
};
let mut message_queue_notifier = self.pending_messages.notifier();

if let Some(response) = response {
let msg = LSPS1Message::Response(request_id, response).into();
self.pending_messages.enqueue(counterparty_node_id, msg);
}
let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

match peer_state_lock.pending_requests.remove(&request_id) {
Some(LSPS1Request::CreateOrder(params)) => {
let order_id = self.generate_order_id();
let channel = OutboundCRChannel::new(
params.order.clone(),
created_at.clone(),
order_id.clone(),
payment.clone(),
);

peer_state_lock.insert_outbound_channel(order_id.clone(), channel);

let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse {
order: params.order,
order_id,
order_state: LSPS1OrderState::Created,
created_at,
payment,
channel: None,
});
let msg = LSPS1Message::Response(request_id, response).into();
message_queue_notifier.enqueue(counterparty_node_id, msg);
Ok(())
},

result
_ => Err(APIError::APIMisuseError {
err: format!("No pending buy request for request_id: {:?}", request_id),
}),
}
},
None => Err(APIError::APIMisuseError {
err: format!("No state for the counterparty exists: {:?}", counterparty_node_id),
}),
}
}

fn handle_get_order_request(
Expand Down Expand Up @@ -376,54 +361,40 @@ where
&self, request_id: LSPSRequestId, counterparty_node_id: PublicKey, order_id: LSPS1OrderId,
order_state: LSPS1OrderState, channel: Option<LSPS1ChannelInfo>,
) -> Result<(), APIError> {
let (result, response) = {
let outer_state_lock = self.per_peer_state.read().unwrap();
let mut message_queue_notifier = self.pending_messages.notifier();

match outer_state_lock.get(&counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();
let outer_state_lock = self.per_peer_state.read().unwrap();

if let Some(outbound_channel) =
peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id)
{
let config = &outbound_channel.config;
match outer_state_lock.get(&counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse {
order_id,
order: config.order.clone(),
order_state,
created_at: config.created_at.clone(),
payment: config.payment.clone(),
channel,
});
(Ok(()), Some(response))
} else {
(
Err(APIError::APIMisuseError {
err: format!("Channel with order_id {} not found", order_id.0),
}),
None,
)
}
},
None => (
if let Some(outbound_channel) =
peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id)
{
let config = &outbound_channel.config;

let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse {
order_id,
order: config.order.clone(),
order_state,
created_at: config.created_at.clone(),
payment: config.payment.clone(),
channel,
});
let msg = LSPS1Message::Response(request_id, response).into();
message_queue_notifier.enqueue(&counterparty_node_id, msg);
Ok(())
} else {
Err(APIError::APIMisuseError {
err: format!(
"No existing state with counterparty {}",
counterparty_node_id
),
}),
None,
),
}
};

if let Some(response) = response {
let msg = LSPS1Message::Response(request_id, response).into();
self.pending_messages.enqueue(&counterparty_node_id, msg);
err: format!("Channel with order_id {} not found", order_id.0),
})
}
},
None => Err(APIError::APIMisuseError {
err: format!("No existing state with counterparty {}", counterparty_node_id),
}),
}

result
}

fn generate_order_id(&self) -> LSPS1OrderId {
Expand Down
8 changes: 6 additions & 2 deletions lightning-liquidity/src/lsps2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ where
pub fn request_opening_params(
&self, counterparty_node_id: PublicKey, token: Option<String>,
) -> LSPSRequestId {
let mut message_queue_notifier = self.pending_messages.notifier();

let request_id = crate::utils::generate_request_id(&self.entropy_source);

{
Expand All @@ -131,7 +133,7 @@ where

let request = LSPS2Request::GetInfo(LSPS2GetInfoRequest { token });
let msg = LSPS2Message::Request(request_id.clone(), request).into();
self.pending_messages.enqueue(&counterparty_node_id, msg);
message_queue_notifier.enqueue(&counterparty_node_id, msg);

request_id
}
Expand Down Expand Up @@ -160,6 +162,8 @@ where
&self, counterparty_node_id: PublicKey, payment_size_msat: Option<u64>,
opening_fee_params: LSPS2OpeningFeeParams,
) -> Result<LSPSRequestId, APIError> {
let mut message_queue_notifier = self.pending_messages.notifier();

let request_id = crate::utils::generate_request_id(&self.entropy_source);

{
Expand All @@ -184,7 +188,7 @@ where

let request = LSPS2Request::Buy(LSPS2BuyRequest { opening_fee_params, payment_size_msat });
let msg = LSPS2Message::Request(request_id.clone(), request).into();
self.pending_messages.enqueue(&counterparty_node_id, msg);
message_queue_notifier.enqueue(&counterparty_node_id, msg);

Ok(request_id)
}
Expand Down
Loading
Loading