Skip to content

Introduce non-blocking event queue access #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ class LibraryTest {

node1.connectOpenChannel(nodeId2, listenAddress2, 50000u, null, true)

val channelPendingEvent1 = node1.nextEvent()
val channelPendingEvent1 = node1.waitNextEvent()
println("Got event: $channelPendingEvent1")
assert(channelPendingEvent1 is Event.ChannelPending)
node1.eventHandled()

val channelPendingEvent2 = node2.nextEvent()
val channelPendingEvent2 = node2.waitNextEvent()
println("Got event: $channelPendingEvent2")
assert(channelPendingEvent2 is Event.ChannelPending)
node2.eventHandled()
Expand All @@ -153,12 +153,12 @@ class LibraryTest {
assert(spendableBalance1AfterOpen < 50000u)
assertEquals(100000u, spendableBalance2AfterOpen)

val channelReadyEvent1 = node1.nextEvent()
val channelReadyEvent1 = node1.waitNextEvent()
println("Got event: $channelReadyEvent1")
assert(channelReadyEvent1 is Event.ChannelReady)
node1.eventHandled()

val channelReadyEvent2 = node2.nextEvent()
val channelReadyEvent2 = node2.waitNextEvent()
println("Got event: $channelReadyEvent2")
assert(channelReadyEvent2 is Event.ChannelReady)
node2.eventHandled()
Expand All @@ -172,24 +172,24 @@ class LibraryTest {

node1.sendPayment(invoice)

val paymentSuccessfulEvent = node1.nextEvent()
val paymentSuccessfulEvent = node1.waitNextEvent()
println("Got event: $paymentSuccessfulEvent")
assert(paymentSuccessfulEvent is Event.PaymentSuccessful)
node1.eventHandled()

val paymentReceivedEvent = node2.nextEvent()
val paymentReceivedEvent = node2.waitNextEvent()
println("Got event: $paymentReceivedEvent")
assert(paymentReceivedEvent is Event.PaymentReceived)
node2.eventHandled()

node2.closeChannel(channelId, nodeId1)

val channelClosedEvent1 = node1.nextEvent()
val channelClosedEvent1 = node1.waitNextEvent()
println("Got event: $channelClosedEvent1")
assert(channelClosedEvent1 is Event.ChannelClosed)
node1.eventHandled()

val channelClosedEvent2 = node2.nextEvent()
val channelClosedEvent2 = node2.waitNextEvent()
println("Got event: $channelClosedEvent2")
assert(channelClosedEvent2 is Event.ChannelClosed)
node2.eventHandled()
Expand Down
3 changes: 2 additions & 1 deletion bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ interface Node {
void start();
[Throws=NodeError]
void stop();
Event next_event();
Event? next_event();
Event wait_next_event();
void event_handled();
PublicKey node_id();
SocketAddr? listening_address();
Expand Down
13 changes: 10 additions & 3 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ where
Ok(())
}

pub(crate) fn next_event(&self) -> Event {
pub(crate) fn next_event(&self) -> Option<Event> {
let locked_queue = self.queue.lock().unwrap();
locked_queue.front().map(|e| e.clone())
}

pub(crate) fn wait_next_event(&self) -> Event {
let locked_queue =
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
locked_queue.front().unwrap().clone()
Expand Down Expand Up @@ -700,6 +705,7 @@ mod tests {
let store = Arc::new(TestStore::new());
let logger = Arc::new(TestLogger::new());
let event_queue = EventQueue::new(Arc::clone(&store), Arc::clone(&logger));
assert_eq!(event_queue.next_event(), None);

let expected_event = Event::ChannelReady {
channel_id: ChannelId([23u8; 32]),
Expand All @@ -710,7 +716,8 @@ mod tests {

// Check we get the expected event and that it is returned until we mark it handled.
for _ in 0..5 {
assert_eq!(event_queue.next_event(), expected_event);
assert_eq!(event_queue.wait_next_event(), expected_event);
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
assert_eq!(false, store.get_and_clear_did_persist());
}

Expand All @@ -720,7 +727,7 @@ mod tests {
.unwrap();
let deser_event_queue =
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
assert_eq!(deser_event_queue.next_event(), expected_event);
assert_eq!(deser_event_queue.wait_next_event(), expected_event);
assert!(!store.get_and_clear_did_persist());

// Check we persisted on `event_handled()`
Expand Down
15 changes: 13 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,13 +828,24 @@ impl Node {
Ok(())
}

/// Blocks until the next event is available.
/// Returns the next event in the event queue, if currently available.
///
/// Will return `Some(..)` if an event is available and `None` otherwise.
///
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
pub fn next_event(&self) -> Event {
pub fn next_event(&self) -> Option<Event> {
self.event_queue.next_event()
}

/// Returns the next event in the event queue.
///
/// Will block the current thread until the next event is available.
///
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
pub fn wait_next_event(&self) -> Event {
self.event_queue.wait_next_event()
}

/// Confirm the last retrieved event handled.
///
/// **Note:** This **MUST** be called after each event has been handled.
Expand Down
17 changes: 13 additions & 4 deletions src/test/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ fn channel_full_cycle() {
assert_eq!(node_a.onchain_balance().unwrap().get_spendable(), premine_amount_sat);
assert_eq!(node_b.onchain_balance().unwrap().get_spendable(), premine_amount_sat);

// Check we haven't got any events yet
assert_eq!(node_a.next_event(), None);
assert_eq!(node_b.next_event(), None);

println!("\nA -- connect_open_channel -> B");
let funding_amount_sat = 80_000;
let push_msat = (funding_amount_sat / 2) * 1000; // balance the channel
Expand All @@ -48,9 +52,10 @@ fn channel_full_cycle() {

expect_event!(node_a, ChannelPending);

let funding_txo = match node_b.next_event() {
let funding_txo = match node_b.wait_next_event() {
ref e @ Event::ChannelPending { funding_txo, .. } => {
println!("{} got event {:?}", std::stringify!(node_b), e);
assert_eq!(node_b.next_event().as_ref(), Some(e));
node_b.event_handled();
funding_txo
}
Expand All @@ -76,7 +81,7 @@ fn channel_full_cycle() {

expect_event!(node_a, ChannelReady);

let ev = node_b.next_event();
let ev = node_b.wait_next_event();
let channel_id = match ev {
ref e @ Event::ChannelReady { ref channel_id, .. } => {
println!("{} got event {:?}", std::stringify!(node_b), e);
Expand Down Expand Up @@ -137,7 +142,7 @@ fn channel_full_cycle() {
let overpaid_amount_msat = invoice_amount_2_msat + 100;
let payment_hash = node_a.send_payment_using_amount(&invoice, overpaid_amount_msat).unwrap();
expect_event!(node_a, PaymentSuccessful);
let received_amount = match node_b.next_event() {
let received_amount = match node_b.wait_next_event() {
ref e @ Event::PaymentReceived { amount_msat, .. } => {
println!("{} got event {:?}", std::stringify!(node_b), e);
node_b.event_handled();
Expand All @@ -163,7 +168,7 @@ fn channel_full_cycle() {
node_a.send_payment_using_amount(&variable_amount_invoice, determined_amount_msat).unwrap();

expect_event!(node_a, PaymentSuccessful);
let received_amount = match node_b.next_event() {
let received_amount = match node_b.wait_next_event() {
ref e @ Event::PaymentReceived { amount_msat, .. } => {
println!("{} got event {:?}", std::stringify!(node_b), e);
node_b.event_handled();
Expand Down Expand Up @@ -201,6 +206,10 @@ fn channel_full_cycle() {
let expected_final_amount_node_b_sat = premine_amount_sat + sum_of_all_payments_sat;
assert_eq!(node_b.onchain_balance().unwrap().get_spendable(), expected_final_amount_node_b_sat);

// Check we handled all events
assert_eq!(node_a.next_event(), None);
assert_eq!(node_b.next_event(), None);

node_a.stop().unwrap();
println!("\nA stopped");
node_b.stop().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/test/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;

macro_rules! expect_event {
($node: expr, $event_type: ident) => {{
match $node.next_event() {
match $node.wait_next_event() {
ref e @ Event::$event_type { .. } => {
println!("{} got event {:?}", std::stringify!($node), e);
$node.event_handled();
Expand Down