Skip to content

Commit baa8db4

Browse files
committed
Introduce non-blocking event queue access
So far we only offererd the blocking `Node::next_event` variant which would block the current thread until an event becomes available. However, non-blocking polling of the event queue is useful, it for example allows to handle events in single-threaded control flow environments. Here we add a simple non-blocking variant under the same name as previously and rename the previously existing method `wait_next_event` to highlight its blockiness.
1 parent 80d62d4 commit baa8db4

File tree

6 files changed

+50
-19
lines changed

6 files changed

+50
-19
lines changed

bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ class LibraryTest {
123123

124124
node1.connectOpenChannel(nodeId2, listenAddress2, 50000u, null, true)
125125

126-
val channelPendingEvent1 = node1.nextEvent()
126+
val channelPendingEvent1 = node1.waitNextEvent()
127127
println("Got event: $channelPendingEvent1")
128128
assert(channelPendingEvent1 is Event.ChannelPending)
129129
node1.eventHandled()
130130

131-
val channelPendingEvent2 = node2.nextEvent()
131+
val channelPendingEvent2 = node2.waitNextEvent()
132132
println("Got event: $channelPendingEvent2")
133133
assert(channelPendingEvent2 is Event.ChannelPending)
134134
node2.eventHandled()
@@ -153,12 +153,12 @@ class LibraryTest {
153153
assert(spendableBalance1AfterOpen < 50000u)
154154
assertEquals(100000u, spendableBalance2AfterOpen)
155155

156-
val channelReadyEvent1 = node1.nextEvent()
156+
val channelReadyEvent1 = node1.waitNextEvent()
157157
println("Got event: $channelReadyEvent1")
158158
assert(channelReadyEvent1 is Event.ChannelReady)
159159
node1.eventHandled()
160160

161-
val channelReadyEvent2 = node2.nextEvent()
161+
val channelReadyEvent2 = node2.waitNextEvent()
162162
println("Got event: $channelReadyEvent2")
163163
assert(channelReadyEvent2 is Event.ChannelReady)
164164
node2.eventHandled()
@@ -172,24 +172,24 @@ class LibraryTest {
172172

173173
node1.sendPayment(invoice)
174174

175-
val paymentSuccessfulEvent = node1.nextEvent()
175+
val paymentSuccessfulEvent = node1.waitNextEvent()
176176
println("Got event: $paymentSuccessfulEvent")
177177
assert(paymentSuccessfulEvent is Event.PaymentSuccessful)
178178
node1.eventHandled()
179179

180-
val paymentReceivedEvent = node2.nextEvent()
180+
val paymentReceivedEvent = node2.waitNextEvent()
181181
println("Got event: $paymentReceivedEvent")
182182
assert(paymentReceivedEvent is Event.PaymentReceived)
183183
node2.eventHandled()
184184

185185
node2.closeChannel(channelId, nodeId1)
186186

187-
val channelClosedEvent1 = node1.nextEvent()
187+
val channelClosedEvent1 = node1.waitNextEvent()
188188
println("Got event: $channelClosedEvent1")
189189
assert(channelClosedEvent1 is Event.ChannelClosed)
190190
node1.eventHandled()
191191

192-
val channelClosedEvent2 = node2.nextEvent()
192+
val channelClosedEvent2 = node2.waitNextEvent()
193193
println("Got event: $channelClosedEvent2")
194194
assert(channelClosedEvent2 is Event.ChannelClosed)
195195
node2.eventHandled()

bindings/ldk_node.udl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ interface Node {
2121
void start();
2222
[Throws=NodeError]
2323
void stop();
24-
Event next_event();
24+
Event? next_event();
25+
Event wait_next_event();
2526
void event_handled();
2627
PublicKey node_id();
2728
SocketAddr? listening_address();

src/event.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,15 @@ where
141141
Ok(())
142142
}
143143

144-
pub(crate) fn next_event(&self) -> Event {
144+
pub(crate) fn next_event(&self) -> Option<Event> {
145+
if let Ok(locked_queue) = self.queue.try_lock() {
146+
locked_queue.front().map(|e| e.clone())
147+
} else {
148+
None
149+
}
150+
}
151+
152+
pub(crate) fn wait_next_event(&self) -> Event {
145153
let locked_queue =
146154
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
147155
locked_queue.front().unwrap().clone()
@@ -700,6 +708,7 @@ mod tests {
700708
let store = Arc::new(TestStore::new());
701709
let logger = Arc::new(TestLogger::new());
702710
let event_queue = EventQueue::new(Arc::clone(&store), Arc::clone(&logger));
711+
assert_eq!(event_queue.next_event(), None);
703712

704713
let expected_event = Event::ChannelReady {
705714
channel_id: ChannelId([23u8; 32]),
@@ -710,7 +719,8 @@ mod tests {
710719

711720
// Check we get the expected event and that it is returned until we mark it handled.
712721
for _ in 0..5 {
713-
assert_eq!(event_queue.next_event(), expected_event);
722+
assert_eq!(event_queue.wait_next_event(), expected_event);
723+
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
714724
assert_eq!(false, store.get_and_clear_did_persist());
715725
}
716726

@@ -720,7 +730,7 @@ mod tests {
720730
.unwrap();
721731
let deser_event_queue =
722732
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
723-
assert_eq!(deser_event_queue.next_event(), expected_event);
733+
assert_eq!(deser_event_queue.wait_next_event(), expected_event);
724734
assert!(!store.get_and_clear_did_persist());
725735

726736
// Check we persisted on `event_handled()`

src/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -828,13 +828,24 @@ impl Node {
828828
Ok(())
829829
}
830830

831-
/// Blocks until the next event is available.
831+
/// Returns the next event in the event queue, if currently available.
832+
///
833+
/// Will return `Some(..)` if an event is available and `None` otherwise.
832834
///
833835
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
834-
pub fn next_event(&self) -> Event {
836+
pub fn next_event(&self) -> Option<Event> {
835837
self.event_queue.next_event()
836838
}
837839

840+
/// Returns the next event in the event queue.
841+
///
842+
/// Will block the current thread until the next event is available.
843+
///
844+
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
845+
pub fn wait_next_event(&self) -> Event {
846+
self.event_queue.wait_next_event()
847+
}
848+
838849
/// Confirm the last retrieved event handled.
839850
///
840851
/// **Note:** This **MUST** be called after each event has been handled.

src/test/functional_tests.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ fn channel_full_cycle() {
3333
assert_eq!(node_a.onchain_balance().unwrap().get_spendable(), premine_amount_sat);
3434
assert_eq!(node_b.onchain_balance().unwrap().get_spendable(), premine_amount_sat);
3535

36+
// Check we haven't got any events yet
37+
assert_eq!(node_a.next_event(), None);
38+
assert_eq!(node_b.next_event(), None);
39+
3640
println!("\nA -- connect_open_channel -> B");
3741
let funding_amount_sat = 80_000;
3842
let push_msat = (funding_amount_sat / 2) * 1000; // balance the channel
@@ -48,9 +52,10 @@ fn channel_full_cycle() {
4852

4953
expect_event!(node_a, ChannelPending);
5054

51-
let funding_txo = match node_b.next_event() {
55+
let funding_txo = match node_b.wait_next_event() {
5256
ref e @ Event::ChannelPending { funding_txo, .. } => {
5357
println!("{} got event {:?}", std::stringify!(node_b), e);
58+
assert_eq!(node_b.next_event().as_ref(), Some(e));
5459
node_b.event_handled();
5560
funding_txo
5661
}
@@ -76,7 +81,7 @@ fn channel_full_cycle() {
7681

7782
expect_event!(node_a, ChannelReady);
7883

79-
let ev = node_b.next_event();
84+
let ev = node_b.wait_next_event();
8085
let channel_id = match ev {
8186
ref e @ Event::ChannelReady { ref channel_id, .. } => {
8287
println!("{} got event {:?}", std::stringify!(node_b), e);
@@ -137,7 +142,7 @@ fn channel_full_cycle() {
137142
let overpaid_amount_msat = invoice_amount_2_msat + 100;
138143
let payment_hash = node_a.send_payment_using_amount(&invoice, overpaid_amount_msat).unwrap();
139144
expect_event!(node_a, PaymentSuccessful);
140-
let received_amount = match node_b.next_event() {
145+
let received_amount = match node_b.wait_next_event() {
141146
ref e @ Event::PaymentReceived { amount_msat, .. } => {
142147
println!("{} got event {:?}", std::stringify!(node_b), e);
143148
node_b.event_handled();
@@ -163,7 +168,7 @@ fn channel_full_cycle() {
163168
node_a.send_payment_using_amount(&variable_amount_invoice, determined_amount_msat).unwrap();
164169

165170
expect_event!(node_a, PaymentSuccessful);
166-
let received_amount = match node_b.next_event() {
171+
let received_amount = match node_b.wait_next_event() {
167172
ref e @ Event::PaymentReceived { amount_msat, .. } => {
168173
println!("{} got event {:?}", std::stringify!(node_b), e);
169174
node_b.event_handled();
@@ -201,6 +206,10 @@ fn channel_full_cycle() {
201206
let expected_final_amount_node_b_sat = premine_amount_sat + sum_of_all_payments_sat;
202207
assert_eq!(node_b.onchain_balance().unwrap().get_spendable(), expected_final_amount_node_b_sat);
203208

209+
// Check we handled all events
210+
assert_eq!(node_a.next_event(), None);
211+
assert_eq!(node_b.next_event(), None);
212+
204213
node_a.stop().unwrap();
205214
println!("\nA stopped");
206215
node_b.stop().unwrap();

src/test/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::time::Duration;
2727

2828
macro_rules! expect_event {
2929
($node: expr, $event_type: ident) => {{
30-
match $node.next_event() {
30+
match $node.wait_next_event() {
3131
ref e @ Event::$event_type { .. } => {
3232
println!("{} got event {:?}", std::stringify!($node), e);
3333
$node.event_handled();

0 commit comments

Comments
 (0)