Skip to content

Commit 290eb42

Browse files
authoredNov 15, 2024··
Merge pull request #11 from n0-computer/mem-rpc-client
feat: Add a lazily initialized in mem client for ffi etc.
2 parents 4c7b8e7 + 5ce5a03 commit 290eb42

File tree

8 files changed

+67
-12
lines changed

8 files changed

+67
-12
lines changed
 

‎Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ oneshot = "0.1.8"
3636
parking_lot = { version = "0.12.1", optional = true }
3737
portable-atomic = { version = "1", optional = true }
3838
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
39-
quic-rpc = { version = "0.15.0", optional = true }
39+
quic-rpc = { version = "0.15.1", optional = true }
4040
quic-rpc-derive = { version = "0.15.0", optional = true }
4141
quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] }
4242
rand = "0.8"
@@ -130,3 +130,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" }
130130
iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
131131
iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
132132
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
133+

‎src/downloader.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,6 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
645645
}
646646

647647
/// Handle receiving a [`Message`].
648-
///
649648
// This is called in the actor loop, and only async because subscribing to an existing transfer
650649
// sends the initial state.
651650
async fn handle_message(&mut self, msg: Message) {

‎src/net_protocol.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
// TODO: reduce API surface and add documentation
44
#![allow(missing_docs)]
55

6-
use std::{collections::BTreeMap, sync::Arc};
6+
use std::{
7+
collections::BTreeMap,
8+
sync::{Arc, OnceLock},
9+
};
710

811
use anyhow::{anyhow, Result};
912
use futures_lite::future::Boxed as BoxedFuture;
@@ -36,6 +39,8 @@ pub struct Blobs<S> {
3639
downloader: Downloader,
3740
batches: tokio::sync::Mutex<BlobBatches>,
3841
endpoint: Endpoint,
42+
#[cfg(feature = "rpc")]
43+
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
3944
}
4045

4146
/// Name used for logging when new node addresses are added from gossip.
@@ -107,6 +112,8 @@ impl<S: crate::store::Store> Blobs<S> {
107112
downloader,
108113
endpoint,
109114
batches: Default::default(),
115+
#[cfg(feature = "rpc")]
116+
rpc_handler: Arc::new(OnceLock::new()),
110117
}
111118
}
112119

‎src/protocol.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@
148148
//! # use bao_tree::{ChunkNum, ChunkRanges};
149149
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
150150
//! # let hash: iroh_blobs::Hash = [0; 32].into();
151-
//! let ranges = &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110));
151+
//! let ranges =
152+
//! &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110));
152153
//! let spec = RangeSpecSeq::from_ranges([ranges]);
153154
//! let request = GetRequest::new(hash, spec);
154155
//! ```
@@ -236,8 +237,8 @@
236237
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
237238
//! # let hash: iroh_blobs::Hash = [0; 32].into();
238239
//! let spec = RangeSpecSeq::from_ranges_infinite([
239-
//! ChunkRanges::all(), // the collection itself
240-
//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child
240+
//! ChunkRanges::all(), // the collection itself
241+
//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child
241242
//! ]);
242243
//! let request = GetRequest::new(hash, spec);
243244
//! ```
@@ -252,9 +253,9 @@
252253
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
253254
//! # let hash: iroh_blobs::Hash = [0; 32].into();
254255
//! let spec = RangeSpecSeq::from_ranges([
255-
//! ChunkRanges::empty(), // we don't need the collection itself
256-
//! ChunkRanges::empty(), // we don't need the first child either
257-
//! ChunkRanges::all(), // we need the second child completely
256+
//! ChunkRanges::empty(), // we don't need the collection itself
257+
//! ChunkRanges::empty(), // we don't need the first child either
258+
//! ChunkRanges::all(), // we need the second child completely
258259
//! ]);
259260
//! let request = GetRequest::new(hash, spec);
260261
//! ```

‎src/rpc.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::{
77

88
use anyhow::anyhow;
99
use client::{
10-
blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption},
10+
blobs::{self, BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption},
1111
tags::TagInfo,
12+
MemConnector,
1213
};
1314
use futures_buffered::BufferedStreamExt;
1415
use futures_lite::StreamExt;
@@ -32,7 +33,11 @@ use proto::{
3233
},
3334
Request, RpcError, RpcResult, RpcService,
3435
};
35-
use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError};
36+
use quic_rpc::{
37+
server::{ChannelTypes, RpcChannel, RpcServerError},
38+
RpcClient, RpcServer,
39+
};
40+
use tokio_util::task::AbortOnDropHandle;
3641

3742
use crate::{
3843
export::ExportProgress,
@@ -56,6 +61,16 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64;
5661
const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;
5762

5863
impl<D: crate::store::Store> Blobs<D> {
64+
/// Get a client for the blobs protocol
65+
pub fn client(self: Arc<Self>) -> blobs::MemClient {
66+
let client = self
67+
.rpc_handler
68+
.get_or_init(|| RpcHandler::new(&self))
69+
.client
70+
.clone();
71+
blobs::Client::new(client)
72+
}
73+
5974
/// Handle an RPC request
6075
pub async fn handle_rpc_request<C>(
6176
self: Arc<Self>,
@@ -871,3 +886,23 @@ impl<D: crate::store::Store> Blobs<D> {
871886
Ok(CreateCollectionResponse { hash, tag })
872887
}
873888
}
889+
890+
#[derive(Debug)]
891+
pub(crate) struct RpcHandler {
892+
/// Client to hand out
893+
client: RpcClient<RpcService, MemConnector>,
894+
/// Handler task
895+
_handler: AbortOnDropHandle<()>,
896+
}
897+
898+
impl RpcHandler {
899+
fn new<D: crate::store::Store>(blobs: &Arc<Blobs<D>>) -> Self {
900+
let blobs = blobs.clone();
901+
let (listener, connector) = quic_rpc::transport::flume::channel(1);
902+
let listener = RpcServer::new(listener);
903+
let client = RpcClient::new(connector);
904+
let _handler = listener
905+
.spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan));
906+
Self { client, _handler }
907+
}
908+
}

‎src/rpc/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
//! Iroh blobs and tags client
22
use anyhow::Result;
33
use futures_util::{Stream, StreamExt};
4+
use quic_rpc::transport::flume::FlumeConnector;
45

56
pub mod blobs;
67
pub mod tags;
78

9+
/// Type alias for a memory-backed client.
10+
pub(crate) type MemConnector =
11+
FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>;
12+
813
fn flatten<T, E1, E2>(
914
s: impl Stream<Item = Result<Result<T, E1>, E2>>,
1015
) -> impl Stream<Item = Result<T>>

‎src/rpc/client/blobs.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ pub struct Client<C = BoxedConnector<RpcService>> {
111111
pub(super) rpc: RpcClient<RpcService, C>,
112112
}
113113

114+
/// Type alias for a memory-backed client.
115+
pub type MemClient = Client<crate::rpc::MemConnector>;
116+
114117
impl<C> Client<C>
115118
where
116119
C: Connector<RpcService>,
@@ -120,6 +123,11 @@ where
120123
Self { rpc }
121124
}
122125

126+
/// Get a tags client.
127+
pub fn tags(&self) -> tags::Client<C> {
128+
tags::Client::new(self.rpc.clone())
129+
}
130+
123131
/// Check if a blob is completely stored on the node.
124132
///
125133
/// Note that this will return false for blobs that are partially stored on

‎src/util/fs.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ pub struct PathContent {
179179
}
180180

181181
/// Walks the directory to get the total size and number of files in directory or file
182-
///
183182
// TODO: possible combine with `scan_dir`
184183
pub fn path_content_info(path: impl AsRef<Path>) -> anyhow::Result<PathContent> {
185184
path_content_info0(path)

0 commit comments

Comments
 (0)
Please sign in to comment.