12
12
//! There are a bunch of these as their handling is relatively error-prone so they are split out
13
13
//! here. See also the chanmon_fail_consistency fuzz test.
14
14
15
- use crate :: chain:: channelmonitor:: { ChannelMonitor , ANTI_REORG_DELAY } ;
15
+ use crate :: chain:: chainmonitor:: ChainMonitor ;
16
+ use crate :: chain:: channelmonitor:: { ChannelMonitor , ANTI_REORG_DELAY , MonitorEvent } ;
17
+ use crate :: chain:: transaction:: OutPoint ;
16
18
use crate :: chain:: { ChannelMonitorUpdateStatus , Listen , Watch } ;
17
19
use crate :: events:: { ClosureReason , Event , HTLCHandlingFailureType , PaymentPurpose } ;
18
20
use crate :: ln:: channel:: AnnouncementSigsState ;
@@ -22,6 +24,13 @@ use crate::ln::msgs::{
22
24
BaseMessageHandler , ChannelMessageHandler , MessageSendEvent , RoutingMessageHandler ,
23
25
} ;
24
26
use crate :: ln:: types:: ChannelId ;
27
+ use crate :: sign:: NodeSigner ;
28
+ use crate :: util:: native_async:: FutureQueue ;
29
+ use crate :: util:: persist:: {
30
+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE , CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
31
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , MonitorName ,
32
+ MonitorUpdatingPersisterAsync ,
33
+ } ;
25
34
use crate :: util:: ser:: { ReadableArgs , Writeable } ;
26
35
use crate :: util:: test_channel_signer:: TestChannelSigner ;
27
36
use crate :: util:: test_utils:: TestBroadcaster ;
@@ -4847,3 +4856,201 @@ fn test_single_channel_multiple_mpp() {
4847
4856
nodes[ 7 ] . node . handle_revoke_and_ack ( node_i_id, & raa) ;
4848
4857
check_added_monitors ( & nodes[ 7 ] , 1 ) ;
4849
4858
}
4859
+
4860
+ #[ test]
4861
+ fn native_async_persist ( ) {
4862
+ // Test ChainMonitor::new_async_beta and the backing MonitorUpdatingPersisterAsync.
4863
+ //
4864
+ // Because our test utils aren't really set up for such utils, we simply test them directly,
4865
+ // first spinning up some nodes to create a `ChannelMonitor` and some `ChannelMonitorUpdate`s
4866
+ // we can apply.
4867
+ let ( monitor, updates) ;
4868
+ let mut chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
4869
+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
4870
+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
4871
+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
4872
+
4873
+ let ( _, _, chan_id, funding_tx) = create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
4874
+
4875
+ monitor = get_monitor ! ( nodes[ 0 ] , chan_id) . clone ( ) ;
4876
+ send_payment ( & nodes[ 0 ] , & [ & nodes[ 1 ] ] , 1_000_000 ) ;
4877
+ let mon_updates =
4878
+ nodes[ 0 ] . chain_monitor . monitor_updates . lock ( ) . unwrap ( ) . remove ( & chan_id) . unwrap ( ) ;
4879
+ updates = mon_updates. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
4880
+ assert ! ( updates. len( ) >= 4 , "The test below needs at least four updates" ) ;
4881
+
4882
+ core:: mem:: drop ( nodes) ;
4883
+ core:: mem:: drop ( node_chanmgrs) ;
4884
+ core:: mem:: drop ( node_cfgs) ;
4885
+
4886
+ let node_0_utils = chanmon_cfgs. remove ( 0 ) ;
4887
+ let ( logger, keys_manager, tx_broadcaster, fee_estimator) =
4888
+ (
4889
+ node_0_utils. logger ,
4890
+ node_0_utils. keys_manager ,
4891
+ node_0_utils. tx_broadcaster ,
4892
+ node_0_utils. fee_estimator ,
4893
+ ) ;
4894
+
4895
+ // Now that we have some updates, build a new ChainMonitor with a backing async KVStore.
4896
+ let logger = Arc :: new ( logger) ;
4897
+ let keys_manager = Arc :: new ( keys_manager) ;
4898
+ let tx_broadcaster = Arc :: new ( tx_broadcaster) ;
4899
+ let fee_estimator = Arc :: new ( fee_estimator) ;
4900
+
4901
+ let kv_store = Arc :: new ( test_utils:: TestStore :: new ( false ) ) ;
4902
+ let persist_futures = Arc :: new ( FutureQueue :: new ( ) ) ;
4903
+ let native_async_persister = MonitorUpdatingPersisterAsync :: new (
4904
+ Arc :: clone ( & kv_store) ,
4905
+ Arc :: clone ( & persist_futures) ,
4906
+ Arc :: clone ( & logger) ,
4907
+ 42 ,
4908
+ Arc :: clone ( & keys_manager) ,
4909
+ Arc :: clone ( & keys_manager) ,
4910
+ Arc :: clone ( & tx_broadcaster) ,
4911
+ Arc :: clone ( & fee_estimator) ,
4912
+ ) ;
4913
+ let chain_source = test_utils:: TestChainSource :: new ( Network :: Testnet ) ;
4914
+ let async_chain_monitor = ChainMonitor :: new_async_beta (
4915
+ Some ( & chain_source) ,
4916
+ tx_broadcaster,
4917
+ logger,
4918
+ fee_estimator,
4919
+ native_async_persister,
4920
+ Arc :: clone ( & keys_manager) ,
4921
+ keys_manager. get_peer_storage_key ( ) ,
4922
+ ) ;
4923
+
4924
+ // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed`
4925
+ // isn't returned until the write is complted (via `complete_all_async_writes`) and the future
4926
+ // is `poll`ed (which a background spawn should do automatically in production, but which is
4927
+ // needed to get the future completion through to the `ChainMonitor`).
4928
+ let write_status = async_chain_monitor. watch_channel ( chan_id, monitor) . unwrap ( ) ;
4929
+ assert_eq ! ( write_status, ChannelMonitorUpdateStatus :: InProgress ) ;
4930
+
4931
+ // The write will remain pending until we call `complete_all_async_writes`, below.
4932
+ assert_eq ! ( persist_futures. 0 . lock( ) . unwrap( ) . len( ) , 1 ) ;
4933
+ persist_futures. poll_futures ( ) ;
4934
+ assert_eq ! ( persist_futures. 0 . lock( ) . unwrap( ) . len( ) , 1 ) ;
4935
+
4936
+ let funding_txo = OutPoint { txid : funding_tx. compute_txid ( ) , index : 0 } ;
4937
+ let key = MonitorName :: V1Channel ( funding_txo) . to_string ( ) ;
4938
+ let pending_writes = kv_store. list_pending_async_writes (
4939
+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
4940
+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
4941
+ & key,
4942
+ ) ;
4943
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
4944
+
4945
+ // Once we complete the future, the write will still be pending until the future gets `poll`ed.
4946
+ kv_store. complete_all_async_writes ( ) ;
4947
+ assert_eq ! ( persist_futures. 0 . lock( ) . unwrap( ) . len( ) , 1 ) ;
4948
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
4949
+
4950
+ assert_eq ! ( persist_futures. 0 . lock( ) . unwrap( ) . len( ) , 1 ) ;
4951
+ persist_futures. poll_futures ( ) ;
4952
+ assert_eq ! ( persist_futures. 0 . lock( ) . unwrap( ) . len( ) , 0 ) ;
4953
+
4954
+ let completed_persist = async_chain_monitor. release_pending_monitor_events ( ) ;
4955
+ assert_eq ! ( completed_persist. len( ) , 1 ) ;
4956
+ assert_eq ! ( completed_persist[ 0 ] . 2 . len( ) , 1 ) ;
4957
+ assert ! ( matches!( completed_persist[ 0 ] . 2 [ 0 ] , MonitorEvent :: Completed { .. } ) ) ;
4958
+
4959
+ // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but
4960
+ // separately.
4961
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 0 ] ) ;
4962
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
4963
+
4964
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 1 ] ) ;
4965
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
4966
+
4967
+ persist_futures. poll_futures ( ) ;
4968
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
4969
+
4970
+ let pending_writes = kv_store. list_pending_async_writes (
4971
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4972
+ & key,
4973
+ "1" ,
4974
+ ) ;
4975
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
4976
+ let pending_writes = kv_store. list_pending_async_writes (
4977
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4978
+ & key,
4979
+ "2" ,
4980
+ ) ;
4981
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
4982
+
4983
+ kv_store. complete_async_writes_through (
4984
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4985
+ & key,
4986
+ "1" ,
4987
+ usize:: MAX ,
4988
+ ) ;
4989
+ persist_futures. poll_futures ( ) ;
4990
+ // While the `ChainMonitor` could return a `MonitorEvent::Completed` here, it currently
4991
+ // doesn't. If that ever changes we should validate that the `Completed` event has the correct
4992
+ // `monitor_update_id` (1).
4993
+ assert ! ( async_chain_monitor. release_pending_monitor_events( ) . is_empty( ) ) ;
4994
+
4995
+ kv_store. complete_async_writes_through (
4996
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
4997
+ & key,
4998
+ "2" ,
4999
+ usize:: MAX ,
5000
+ ) ;
5001
+ persist_futures. poll_futures ( ) ;
5002
+ let completed_persist = async_chain_monitor. release_pending_monitor_events ( ) ;
5003
+ assert_eq ! ( completed_persist. len( ) , 1 ) ;
5004
+ assert_eq ! ( completed_persist[ 0 ] . 2 . len( ) , 1 ) ;
5005
+ assert ! ( matches!( completed_persist[ 0 ] . 2 [ 0 ] , MonitorEvent :: Completed { .. } ) ) ;
5006
+
5007
+ // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them
5008
+ // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both
5009
+ // completed (and that it marks both as completed when it is generated).
5010
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 2 ] ) ;
5011
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
5012
+
5013
+ let update_status = async_chain_monitor. update_channel ( chan_id, & updates[ 3 ] ) ;
5014
+ assert_eq ! ( update_status, ChannelMonitorUpdateStatus :: InProgress ) ;
5015
+
5016
+ persist_futures. poll_futures ( ) ;
5017
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
5018
+
5019
+ let pending_writes = kv_store. list_pending_async_writes (
5020
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5021
+ & key,
5022
+ "3" ,
5023
+ ) ;
5024
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
5025
+ let pending_writes = kv_store. list_pending_async_writes (
5026
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5027
+ & key,
5028
+ "4" ,
5029
+ ) ;
5030
+ assert_eq ! ( pending_writes. len( ) , 1 ) ;
5031
+
5032
+ kv_store. complete_async_writes_through (
5033
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5034
+ & key,
5035
+ "4" ,
5036
+ usize:: MAX ,
5037
+ ) ;
5038
+ persist_futures. poll_futures ( ) ;
5039
+ assert_eq ! ( async_chain_monitor. release_pending_monitor_events( ) . len( ) , 0 ) ;
5040
+
5041
+ kv_store. complete_async_writes_through (
5042
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
5043
+ & key,
5044
+ "3" ,
5045
+ usize:: MAX ,
5046
+ ) ;
5047
+ persist_futures. poll_futures ( ) ;
5048
+ let completed_persist = async_chain_monitor. release_pending_monitor_events ( ) ;
5049
+ assert_eq ! ( completed_persist. len( ) , 1 ) ;
5050
+ assert_eq ! ( completed_persist[ 0 ] . 2 . len( ) , 1 ) ;
5051
+ if let MonitorEvent :: Completed { monitor_update_id, .. } = & completed_persist[ 0 ] . 2 [ 0 ] {
5052
+ assert_eq ! ( * monitor_update_id, 4 ) ;
5053
+ } else {
5054
+ panic ! ( ) ;
5055
+ }
5056
+ }
0 commit comments