@@ -17,12 +17,42 @@ use crate::proto::{
17
17
MESSAGE_TYPE_DATA , MESSAGE_TYPE_RESPONSE ,
18
18
} ;
19
19
20
- pub type MessageSender = mpsc:: Sender < GenMessage > ;
21
- pub type MessageReceiver = mpsc:: Receiver < GenMessage > ;
20
+ pub type MessageSender = mpsc:: Sender < SendingMessage > ;
21
+ pub type MessageReceiver = mpsc:: Receiver < SendingMessage > ;
22
22
23
23
pub type ResultSender = mpsc:: Sender < Result < GenMessage > > ;
24
24
pub type ResultReceiver = mpsc:: Receiver < Result < GenMessage > > ;
25
25
26
+ #[ derive( Debug ) ]
27
+ pub struct SendingMessage {
28
+ pub msg : GenMessage ,
29
+ pub result_chan : Option < tokio:: sync:: oneshot:: Sender < Result < ( ) > > > ,
30
+ }
31
+
32
+ impl SendingMessage {
33
+ pub fn new ( msg : GenMessage ) -> Self {
34
+ Self {
35
+ msg,
36
+ result_chan : None ,
37
+ }
38
+ }
39
+ pub fn new_with_result (
40
+ msg : GenMessage ,
41
+ result_chan : tokio:: sync:: oneshot:: Sender < Result < ( ) > > ,
42
+ ) -> Self {
43
+ Self {
44
+ msg,
45
+ result_chan : Some ( result_chan) ,
46
+ }
47
+ }
48
+
49
+ pub fn send_result ( & mut self , result : Result < ( ) > ) {
50
+ if let Some ( result_ch) = self . result_chan . take ( ) {
51
+ result_ch. send ( result) . unwrap_or_default ( ) ;
52
+ }
53
+ }
54
+ }
55
+
26
56
#[ derive( Debug ) ]
27
57
pub struct ClientStream < Q , P > {
28
58
tx : CSSender < Q > ,
@@ -317,9 +347,13 @@ async fn _recv(rx: &mut ResultReceiver) -> Result<GenMessage> {
317
347
}
318
348
319
349
async fn _send ( tx : & MessageSender , msg : GenMessage ) -> Result < ( ) > {
320
- tx. send ( msg)
350
+ let ( res_tx, res_rx) = tokio:: sync:: oneshot:: channel ( ) ;
351
+ tx. send ( SendingMessage :: new_with_result ( msg, res_tx) )
352
+ . await
353
+ . map_err ( |e| Error :: Others ( format ! ( "Send data packet to sender error {:?}" , e) ) ) ?;
354
+ res_rx
321
355
. await
322
- . map_err ( |e| Error :: Others ( format ! ( "Send data packet to sender error {e :?}" ) ) )
356
+ . map_err ( |e| Error :: Others ( format ! ( "Failed to wait send result { :?}" , e ) ) ) ?
323
357
}
324
358
325
359
#[ derive( Clone , Copy , Debug , PartialEq , Eq ) ]
0 commit comments