Skip to content

Commit 8fd30e8

Browse files
authored
Merge pull request #91 from tnull/2023-05-add-try-next-event
Introduce non-blocking event queue access
2 parents dbc466b + aa850a3 commit 8fd30e8

File tree

6 files changed

+47
-19
lines changed

6 files changed

+47
-19
lines changed

bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ class LibraryTest {
123123

124124
node1.connectOpenChannel(nodeId2, listenAddress2, 50000u, null, true)
125125

126-
val channelPendingEvent1 = node1.nextEvent()
126+
val channelPendingEvent1 = node1.waitNextEvent()
127127
println("Got event: $channelPendingEvent1")
128128
assert(channelPendingEvent1 is Event.ChannelPending)
129129
node1.eventHandled()
130130

131-
val channelPendingEvent2 = node2.nextEvent()
131+
val channelPendingEvent2 = node2.waitNextEvent()
132132
println("Got event: $channelPendingEvent2")
133133
assert(channelPendingEvent2 is Event.ChannelPending)
134134
node2.eventHandled()
@@ -153,12 +153,12 @@ class LibraryTest {
153153
assert(spendableBalance1AfterOpen < 50000u)
154154
assertEquals(100000u, spendableBalance2AfterOpen)
155155

156-
val channelReadyEvent1 = node1.nextEvent()
156+
val channelReadyEvent1 = node1.waitNextEvent()
157157
println("Got event: $channelReadyEvent1")
158158
assert(channelReadyEvent1 is Event.ChannelReady)
159159
node1.eventHandled()
160160

161-
val channelReadyEvent2 = node2.nextEvent()
161+
val channelReadyEvent2 = node2.waitNextEvent()
162162
println("Got event: $channelReadyEvent2")
163163
assert(channelReadyEvent2 is Event.ChannelReady)
164164
node2.eventHandled()
@@ -172,24 +172,24 @@ class LibraryTest {
172172

173173
node1.sendPayment(invoice)
174174

175-
val paymentSuccessfulEvent = node1.nextEvent()
175+
val paymentSuccessfulEvent = node1.waitNextEvent()
176176
println("Got event: $paymentSuccessfulEvent")
177177
assert(paymentSuccessfulEvent is Event.PaymentSuccessful)
178178
node1.eventHandled()
179179

180-
val paymentReceivedEvent = node2.nextEvent()
180+
val paymentReceivedEvent = node2.waitNextEvent()
181181
println("Got event: $paymentReceivedEvent")
182182
assert(paymentReceivedEvent is Event.PaymentReceived)
183183
node2.eventHandled()
184184

185185
node2.closeChannel(channelId, nodeId1)
186186

187-
val channelClosedEvent1 = node1.nextEvent()
187+
val channelClosedEvent1 = node1.waitNextEvent()
188188
println("Got event: $channelClosedEvent1")
189189
assert(channelClosedEvent1 is Event.ChannelClosed)
190190
node1.eventHandled()
191191

192-
val channelClosedEvent2 = node2.nextEvent()
192+
val channelClosedEvent2 = node2.waitNextEvent()
193193
println("Got event: $channelClosedEvent2")
194194
assert(channelClosedEvent2 is Event.ChannelClosed)
195195
node2.eventHandled()

bindings/ldk_node.udl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ interface Node {
2121
void start();
2222
[Throws=NodeError]
2323
void stop();
24-
Event next_event();
24+
Event? next_event();
25+
Event wait_next_event();
2526
void event_handled();
2627
PublicKey node_id();
2728
SocketAddr? listening_address();

src/event.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,12 @@ where
141141
Ok(())
142142
}
143143

144-
pub(crate) fn next_event(&self) -> Event {
144+
pub(crate) fn next_event(&self) -> Option<Event> {
145+
let locked_queue = self.queue.lock().unwrap();
146+
locked_queue.front().map(|e| e.clone())
147+
}
148+
149+
pub(crate) fn wait_next_event(&self) -> Event {
145150
let locked_queue =
146151
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
147152
locked_queue.front().unwrap().clone()
@@ -700,6 +705,7 @@ mod tests {
700705
let store = Arc::new(TestStore::new());
701706
let logger = Arc::new(TestLogger::new());
702707
let event_queue = EventQueue::new(Arc::clone(&store), Arc::clone(&logger));
708+
assert_eq!(event_queue.next_event(), None);
703709

704710
let expected_event = Event::ChannelReady {
705711
channel_id: ChannelId([23u8; 32]),
@@ -710,7 +716,8 @@ mod tests {
710716

711717
// Check we get the expected event and that it is returned until we mark it handled.
712718
for _ in 0..5 {
713-
assert_eq!(event_queue.next_event(), expected_event);
719+
assert_eq!(event_queue.wait_next_event(), expected_event);
720+
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
714721
assert_eq!(false, store.get_and_clear_did_persist());
715722
}
716723

@@ -720,7 +727,7 @@ mod tests {
720727
.unwrap();
721728
let deser_event_queue =
722729
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
723-
assert_eq!(deser_event_queue.next_event(), expected_event);
730+
assert_eq!(deser_event_queue.wait_next_event(), expected_event);
724731
assert!(!store.get_and_clear_did_persist());
725732

726733
// Check we persisted on `event_handled()`

src/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -931,13 +931,24 @@ impl Node {
931931
Ok(())
932932
}
933933

934-
/// Blocks until the next event is available.
934+
/// Returns the next event in the event queue, if currently available.
935+
///
936+
/// Will return `Some(..)` if an event is available and `None` otherwise.
935937
///
936938
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
937-
pub fn next_event(&self) -> Event {
939+
pub fn next_event(&self) -> Option<Event> {
938940
self.event_queue.next_event()
939941
}
940942

943+
/// Returns the next event in the event queue.
944+
///
945+
/// Will block the current thread until the next event is available.
946+
///
947+
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
948+
pub fn wait_next_event(&self) -> Event {
949+
self.event_queue.wait_next_event()
950+
}
951+
941952
/// Confirm the last retrieved event handled.
942953
///
943954
/// **Note:** This **MUST** be called after each event has been handled.

src/test/functional_tests.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ fn channel_full_cycle() {
3333
assert_eq!(node_a.onchain_balance().unwrap().get_spendable(), premine_amount_sat);
3434
assert_eq!(node_b.onchain_balance().unwrap().get_spendable(), premine_amount_sat);
3535

36+
// Check we haven't got any events yet
37+
assert_eq!(node_a.next_event(), None);
38+
assert_eq!(node_b.next_event(), None);
39+
3640
println!("\nA -- connect_open_channel -> B");
3741
let funding_amount_sat = 80_000;
3842
let push_msat = (funding_amount_sat / 2) * 1000; // balance the channel
@@ -49,9 +53,10 @@ fn channel_full_cycle() {
4953
assert_eq!(node_a.list_peers().first().unwrap().node_id, node_b.node_id());
5054
expect_event!(node_a, ChannelPending);
5155

52-
let funding_txo = match node_b.next_event() {
56+
let funding_txo = match node_b.wait_next_event() {
5357
ref e @ Event::ChannelPending { funding_txo, .. } => {
5458
println!("{} got event {:?}", std::stringify!(node_b), e);
59+
assert_eq!(node_b.next_event().as_ref(), Some(e));
5560
node_b.event_handled();
5661
funding_txo
5762
}
@@ -77,7 +82,7 @@ fn channel_full_cycle() {
7782

7883
expect_event!(node_a, ChannelReady);
7984

80-
let ev = node_b.next_event();
85+
let ev = node_b.wait_next_event();
8186
let channel_id = match ev {
8287
ref e @ Event::ChannelReady { ref channel_id, .. } => {
8388
println!("{} got event {:?}", std::stringify!(node_b), e);
@@ -138,7 +143,7 @@ fn channel_full_cycle() {
138143
let overpaid_amount_msat = invoice_amount_2_msat + 100;
139144
let payment_hash = node_a.send_payment_using_amount(&invoice, overpaid_amount_msat).unwrap();
140145
expect_event!(node_a, PaymentSuccessful);
141-
let received_amount = match node_b.next_event() {
146+
let received_amount = match node_b.wait_next_event() {
142147
ref e @ Event::PaymentReceived { amount_msat, .. } => {
143148
println!("{} got event {:?}", std::stringify!(node_b), e);
144149
node_b.event_handled();
@@ -164,7 +169,7 @@ fn channel_full_cycle() {
164169
node_a.send_payment_using_amount(&variable_amount_invoice, determined_amount_msat).unwrap();
165170

166171
expect_event!(node_a, PaymentSuccessful);
167-
let received_amount = match node_b.next_event() {
172+
let received_amount = match node_b.wait_next_event() {
168173
ref e @ Event::PaymentReceived { amount_msat, .. } => {
169174
println!("{} got event {:?}", std::stringify!(node_b), e);
170175
node_b.event_handled();
@@ -202,6 +207,10 @@ fn channel_full_cycle() {
202207
let expected_final_amount_node_b_sat = premine_amount_sat + sum_of_all_payments_sat;
203208
assert_eq!(node_b.onchain_balance().unwrap().get_spendable(), expected_final_amount_node_b_sat);
204209

210+
// Check we handled all events
211+
assert_eq!(node_a.next_event(), None);
212+
assert_eq!(node_b.next_event(), None);
213+
205214
node_a.stop().unwrap();
206215
println!("\nA stopped");
207216
node_b.stop().unwrap();

src/test/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::time::Duration;
2727

2828
macro_rules! expect_event {
2929
($node: expr, $event_type: ident) => {{
30-
match $node.next_event() {
30+
match $node.wait_next_event() {
3131
ref e @ Event::$event_type { .. } => {
3232
println!("{} got event {:?}", std::stringify!($node), e);
3333
$node.event_handled();

0 commit comments

Comments
 (0)