12
12
//! There are a bunch of these as their handling is relatively error-prone so they are split out
13
13
//! here. See also the chanmon_fail_consistency fuzz test.
14
14
15
- use crate :: chain:: channelmonitor:: { ChannelMonitor , ANTI_REORG_DELAY } ;
15
+ use crate :: chain:: chainmonitor:: ChainMonitor ;
16
+ use crate :: chain:: channelmonitor:: { ChannelMonitor , MonitorEvent , ANTI_REORG_DELAY } ;
17
+ use crate :: chain:: transaction:: OutPoint ;
16
18
use crate :: chain:: { ChannelMonitorUpdateStatus , Listen , Watch } ;
17
19
use crate :: events:: { ClosureReason , Event , HTLCHandlingFailureType , PaymentPurpose } ;
18
20
use crate :: ln:: channel:: AnnouncementSigsState ;
@@ -22,6 +24,13 @@ use crate::ln::msgs::{
22
24
BaseMessageHandler , ChannelMessageHandler , MessageSendEvent , RoutingMessageHandler ,
23
25
} ;
24
26
use crate :: ln:: types:: ChannelId ;
27
+ use crate :: sign:: NodeSigner ;
28
+ use crate :: util:: native_async:: FutureQueue ;
29
+ use crate :: util:: persist:: {
30
+ MonitorName , MonitorUpdatingPersisterAsync , CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
31
+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
32
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
33
+ } ;
25
34
use crate :: util:: ser:: { ReadableArgs , Writeable } ;
26
35
use crate :: util:: test_channel_signer:: TestChannelSigner ;
27
36
use crate :: util:: test_utils:: TestBroadcaster ;
@@ -4847,3 +4856,200 @@ fn test_single_channel_multiple_mpp() {
4847
4856
nodes[ 7 ] . node . handle_revoke_and_ack ( node_i_id, & raa) ;
4848
4857
check_added_monitors ( & nodes[ 7 ] , 1 ) ;
4849
4858
}
4859
+
4860
+ #[ test]
4861
+ fn native_async_persist ( ) {
4862
+ // Test ChainMonitor::new_async_beta and the backing MonitorUpdatingPersisterAsync.
4863
+ //
4864
+ // Because our test utils aren't really set up for such utils, we simply test them directly,
4865
+ // first spinning up some nodes to create a `ChannelMonitor` and some `ChannelMonitorUpdate`s
4866
+ // we can apply.
4867
+ let ( monitor, updates) ;
4868
+ let mut chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
4869
+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
4870
+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
4871
+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
4872
+
4873
+ let ( _, _, chan_id, funding_tx) = create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
4874
+
4875
+ monitor = get_monitor ! ( nodes[ 0 ] , chan_id) . clone ( ) ;
4876
+ send_payment ( & nodes[ 0 ] , & [ & nodes[ 1 ] ] , 1_000_000 ) ;
4877
+ let mon_updates =
4878
+ nodes[ 0 ] . chain_monitor . monitor_updates . lock ( ) . unwrap ( ) . remove ( & chan_id) . unwrap ( ) ;
4879
+ updates = mon_updates. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
4880
+ assert ! ( updates. len( ) >= 4 , "The test below needs at least four updates" ) ;
4881
+
4882
+ core:: mem:: drop ( nodes) ;
4883
+ core:: mem:: drop ( node_chanmgrs) ;
4884
+ core:: mem:: drop ( node_cfgs) ;
4885
+
4886
+ let node_0_utils = chanmon_cfgs. remove ( 0 ) ;
4887
+ let ( logger, keys_manager, tx_broadcaster, fee_estimator) = (
4888
+ node_0_utils. logger ,
4889
+ node_0_utils. keys_manager ,
4890
+ node_0_utils. tx_broadcaster ,
4891
+ node_0_utils. fee_estimator ,
4892
+ ) ;
4893
+
4894
+ // Now that we have some updates, build a new ChainMonitor with a backing async KVStore.
4895
+ let logger = Arc :: new ( logger) ;
4896
+ let keys_manager = Arc :: new ( keys_manager) ;
4897
+ let tx_broadcaster = Arc :: new ( tx_broadcaster) ;
4898
+ let fee_estimator = Arc :: new ( fee_estimator) ;
4899
+
4900
+ let kv_store = Arc :: new ( test_utils:: TestStore :: new ( false ) ) ;
4901
+ let persist_futures = Arc :: new ( FutureQueue :: new ( ) ) ;
4902
+ let native_async_persister = MonitorUpdatingPersisterAsync :: new (
4903
+ Arc :: clone ( & kv_store) ,
4904
+ Arc :: clone ( & persist_futures) ,
4905
+ Arc :: clone ( & logger) ,
4906
+ 42 ,
4907
+ Arc :: clone ( & keys_manager) ,
4908
+ Arc :: clone ( & keys_manager) ,
4909
+ Arc :: clone ( & tx_broadcaster) ,
4910
+ Arc :: clone ( & fee_estimator) ,
4911
+ ) ;
4912
+ let chain_source = test_utils:: TestChainSource :: new ( Network :: Testnet ) ;
4913
+ let async_chain_monitor = ChainMonitor :: new_async_beta (
4914
+ Some ( & chain_source) ,
4915
+ tx_broadcaster,
4916
+ logger,
4917
+ fee_estimator,
4918
+ native_async_persister,
4919
+ Arc :: clone ( & keys_manager) ,
4920
+ keys_manager. get_peer_storage_key ( ) ,
4921
+ ) ;
4922
+
4923
+ // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed`
4924
+ // isn't returned until the write is completed (via `complete_all_async_writes`) and the future
4925
+ // is `poll`ed (which a background spawn should do automatically in production, but which is
4926
+ // needed to get the future completion through to the `ChainMonitor`).
4927
+ let write_status = async_chain_monitor. watch_channel ( chan_id, monitor) . unwrap ( ) ;
4928
+ assert_eq ! ( write_status, ChannelMonitorUpdateStatus :: InProgress ) ;
4929
+
4930
+ // The write will remain pending until we call `complete_all_async_writes`, below.
4931
+ assert_eq ! ( persist_futures. pending_futures( ) , 1 ) ;
4932
+ persist_futures. poll_futures ( ) ;
4933
+ assert_eq ! ( persist_futures. pending_futures( ) , 1 ) ;
4934
+
4935
+ let funding_txo = OutPoint { txid : funding_tx. compute_txid ( ) , index : 0 } ;
4936
+ let key = MonitorName :: V1Channel ( funding_txo) . to_string ( ) ;
4937
+ let pending_writes = kv_store. list_pending_async_writes (
4938
+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
4939
+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
4940
+ & key,
4941
+ ) ;
4942
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
4943
+
4944
+ // Once we complete the future, the write will still be pending until the future gets `poll`ed.
4945
+ kv_store. complete_all_async_writes ( ) ;
4946
+ assert_eq ! ( persist_futures. pending_futures( ) , 1 ) ;
4947
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
4948
+
4949
+ assert_eq ! ( persist_futures. pending_futures( ) , 1 ) ;
4950
+ persist_futures. poll_futures ( ) ;
4951
+ assert_eq ! ( persist_futures. pending_futures( ) , 0 ) ;
4952
+
4953
+ let completed_persist = async_chain_monitor. release_pending_monitor_events ( ) ;
4954
+ assert_eq ! ( completed_persist. len( ) , 1 ) ;
4955
+ assert_eq ! ( completed_persist[ 0 ] . 2 . len( ) , 1 ) ;
4956
+ assert ! ( matches!( completed_persist[ 0 ] . 2 [ 0 ] , MonitorEvent :: Completed { .. } ) ) ;
4957
+
4958
+ // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but
4959
+ // separately.
4960
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 0 ] ) ;
4961
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
4962
+
4963
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 1 ] ) ;
4964
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
4965
+
4966
+ persist_futures. poll_futures ( ) ;
4967
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
4968
+
4969
+ let pending_writes = kv_store. list_pending_async_writes (
4970
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4971
+ & key,
4972
+ "1" ,
4973
+ ) ;
4974
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
4975
+ let pending_writes = kv_store. list_pending_async_writes (
4976
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4977
+ & key,
4978
+ "2" ,
4979
+ ) ;
4980
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
4981
+
4982
+ kv_store. complete_async_writes_through (
4983
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4984
+ & key,
4985
+ "1" ,
4986
+ usize:: MAX ,
4987
+ ) ;
4988
+ persist_futures. poll_futures ( ) ;
4989
+ // While the `ChainMonitor` could return a `MonitorEvent::Completed` here, it currently
4990
+ // doesn't. If that ever changes we should validate that the `Completed` event has the correct
4991
+ // `monitor_update_id` (1).
4992
+ assert ! ( async_chain_monitor. release_pending_monitor_events( ) . is_empty( ) ) ;
4993
+
4994
+ kv_store. complete_async_writes_through (
4995
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4996
+ & key,
4997
+ "2" ,
4998
+ usize:: MAX ,
4999
+ ) ;
5000
+ persist_futures. poll_futures ( ) ;
5001
+ let completed_persist = async_chain_monitor. release_pending_monitor_events ( ) ;
5002
+ assert_eq ! ( completed_persist. len( ) , 1 ) ;
5003
+ assert_eq ! ( completed_persist[ 0 ] . 2 . len( ) , 1 ) ;
5004
+ assert ! ( matches!( completed_persist[ 0 ] . 2 [ 0 ] , MonitorEvent :: Completed { .. } ) ) ;
5005
+
5006
+ // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them
5007
+ // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both
5008
+ // completed (and that it marks both as completed when it is generated).
5009
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 2 ] ) ;
5010
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
5011
+
5012
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 3 ] ) ;
5013
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
5014
+
5015
+ persist_futures. poll_futures ( ) ;
5016
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
5017
+
5018
+ let pending_writes = kv_store. list_pending_async_writes (
5019
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5020
+ & key,
5021
+ "3" ,
5022
+ ) ;
5023
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
5024
+ let pending_writes = kv_store. list_pending_async_writes (
5025
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5026
+ & key,
5027
+ "4" ,
5028
+ ) ;
5029
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
5030
+
5031
+ kv_store. complete_async_writes_through (
5032
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5033
+ & key,
5034
+ "4" ,
5035
+ usize:: MAX ,
5036
+ ) ;
5037
+ persist_futures. poll_futures ( ) ;
5038
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
5039
+
5040
+ kv_store. complete_async_writes_through (
5041
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5042
+ & key,
5043
+ "3" ,
5044
+ usize:: MAX ,
5045
+ ) ;
5046
+ persist_futures. poll_futures ( ) ;
5047
+ let completed_persist = async_chain_monitor. release_pending_monitor_events ( ) ;
5048
+ assert_eq ! ( completed_persist. len( ) , 1 ) ;
5049
+ assert_eq ! ( completed_persist[ 0 ] . 2 . len( ) , 1 ) ;
5050
+ if let MonitorEvent :: Completed { monitor_update_id, .. } = & completed_persist[ 0 ] . 2 [ 0 ] {
5051
+ assert_eq ! ( * monitor_update_id, 4 ) ;
5052
+ } else {
5053
+ panic ! ( ) ;
5054
+ }
5055
+ }
0 commit comments