diff --git a/Cargo.lock b/Cargo.lock
index 17076293..5f1e6623 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,6 +2,20 @@
 # It is not intended for manual editing.
 version = 4
 
+[[package]]
+name = "acto"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a026259da4f1a13b4af60cda453c392de64c58c12d239c560923e0382f42f2b9"
+dependencies = [
+ "parking_lot",
+ "pin-project-lite",
+ "rustc_version",
+ "smol_str",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "addr2line"
 version = "0.24.2"
@@ -1678,6 +1692,7 @@ dependencies = [
  "strum",
  "stun-rs",
  "surge-ping",
+ "swarm-discovery",
  "time",
  "tokio",
  "tokio-stream",
@@ -3566,6 +3581,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "smol_str"
+version = "0.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fad6c857cbab2627dcf01ec85a623ca4e7dcb5691cbaa3d7fb7653671f0d09c9"
+
 [[package]]
 name = "snafu"
 version = "0.8.6"
@@ -3720,6 +3741,21 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "swarm-discovery"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4eae338a4551897c6a50fa2c041c4b75f578962d9fca8adb828cf81f7158740f"
+dependencies = [
+ "acto",
+ "hickory-proto",
+ "rand 0.9.1",
+ "socket2",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "syn"
 version = "1.0.109"
diff --git a/Cargo.toml b/Cargo.toml
index 4dff26e0..f7ff49b6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,6 +58,7 @@ testresult = "0.4.1"
 tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
 tracing-test = "0.2.5"
 walkdir = "2.5.0"
+iroh = { version = "0.90", features = ["discovery-local-network"] }
 
 [features]
 hide-proto-docs = []
diff --git a/examples/common/mod.rs b/examples/common/mod.rs
new file mode 100644
index 00000000..c915d7ef
--- /dev/null
+++ b/examples/common/mod.rs
@@ -0,0 +1,35 @@
+#![allow(dead_code)]
+use anyhow::Result;
+use iroh::SecretKey;
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
+
+/// Gets a secret key from the IROH_SECRET environment variable or generates a new random one.
+/// If the environment variable is set, it must be a valid string representation of a secret key.
+pub fn get_or_generate_secret_key() -> Result<SecretKey> {
+    use std::{env, str::FromStr};
+
+    use anyhow::Context;
+    use rand::thread_rng;
+    if let Ok(secret) = env::var("IROH_SECRET") {
+        // Parse the secret key from the string
+        SecretKey::from_str(&secret).context("Invalid secret key format")
+    } else {
+        // Generate a new random key
+        let secret_key = SecretKey::generate(&mut thread_rng());
+        println!(
+            "Generated new secret key: {}",
+            hex::encode(secret_key.to_bytes())
+        );
+        println!("To reuse this key, set the IROH_SECRET environment variable to this value");
+        Ok(secret_key)
+    }
+}
+
+// Set the RUST_LOG env var to one of {debug,info,warn} to see logging info
+pub fn setup_logging() {
+    tracing_subscriber::registry()
+        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
+        .with(EnvFilter::from_default_env())
+        .try_init()
+        .ok();
+}
diff --git a/examples/custom-protocol.rs b/examples/custom-protocol.rs
new file mode 100644
index 00000000..c021b7f0
--- /dev/null
+++ b/examples/custom-protocol.rs
@@ -0,0 +1,336 @@
+//! Example for adding a custom protocol to an iroh node.
+//!
+//! We are building a very simple custom protocol here, and make our iroh nodes speak this protocol
+//! in addition to iroh-blobs, a protocol provided by number0.
+//!
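+//! The listen side uses `common::get_or_generate_secret_key` (shared by the examples) to read
+//! the `IROH_SECRET` environment variable, so the node id stays stable across runs:
+//!
+//!     IROH_SECRET=<hex-encoded secret key> cargo run --example custom-protocol -- listen "hello-world"
+//!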
+//! Our custom protocol allows querying the blob store of other nodes for text matches.
+//! For this, we keep a very primitive index of the UTF-8 text of our blobs.
+//!
+//! The example is contrived - we only use memory nodes, and our database is a hashmap in a mutex,
+//! and our queries just match if the query string appears as-is in a blob.
+//! Nevertheless, this shows how powerful systems can be built with custom protocols by also using
+//! the existing iroh protocols (blobs in this case).
+//!
+//! ## Usage
+//!
+//! In one terminal, run
+//!
+//!     cargo run --example custom-protocol -- listen "hello-world" "foo-bar" "hello-moon"
+//!
+//! This spawns an iroh node with three blobs. It will print the node's node id.
+//!
+//! In another terminal, run
+//!
+//!     cargo run --example custom-protocol -- query <node-id> hello
+//!
+//! Replace <node-id> with the node id from above. This will connect to the listening node with
+//! our custom protocol and query for the string `hello`. The listening node will return a list
+//! of blob hashes that contain `hello`. We will then download all these blobs with iroh-blobs,
+//! and then print a list of the hashes with their content.
+//!
+//! For this example, this will print:
+//!
+//!     7b54d6be55: hello-moon
+//!     c92dabdf91: hello-world
+//!
+//! That's it! Follow along in the code below; we added a bunch of comments to explain things.
+
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex},
+};
+
+use anyhow::Result;
+use clap::Parser;
+use iroh::{
+    discovery::pkarr::PkarrResolver,
+    endpoint::Connection,
+    protocol::{AcceptError, ProtocolHandler, Router},
+    Endpoint, NodeId,
+};
+use iroh_blobs::{api::Store, store::mem::MemStore, BlobsProtocol, Hash};
+mod common;
+use common::{get_or_generate_secret_key, setup_logging};
+
+#[derive(Debug, Parser)]
+pub struct Cli {
+    #[clap(subcommand)]
+    command: Command,
+}
+
+#[derive(Debug, Parser)]
+pub enum Command {
+    /// Spawn a node in listening mode.
+    Listen {
+        /// Each text string will be imported as a blob and inserted into the search database.
+        text: Vec<String>,
+    },
+    /// Query a remote node for data and print the results.
+    Query {
+        /// The node id of the node we want to query.
+        node_id: NodeId,
+        /// The text we want to match.
+        query: String,
+    },
+}
+
+/// Each custom protocol is identified by its ALPN string.
+///
+/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
+/// and the connection is aborted unless both nodes pass the same bytestring.
+const ALPN: &[u8] = b"iroh-example/text-search/0";
+
+async fn listen(text: Vec<String>) -> Result<()> {
+    // Allow the user to provide a secret so we can have a stable node id.
+    // This is only needed for the listen side.
+    let secret_key = get_or_generate_secret_key()?;
+    // Use an in-memory store for this example. You would use a persistent store in production code.
+    let store = MemStore::new();
+    // Create an endpoint with the secret key and discovery publishing to the n0 dns server enabled.
+    let endpoint = Endpoint::builder()
+        .secret_key(secret_key)
+        .discovery_n0()
+        .bind()
+        .await?;
+    // Build our custom protocol handler. It needs access to the blob store to
+    // answer queries and to index the text blobs we insert below.
+    let proto = BlobSearch::new(&store);
+    // Insert the text strings as blobs and index them.
+    for text in text.into_iter() {
+        proto.insert_and_index(text).await?;
+    }
+    // Build the iroh-blobs protocol handler, which is used to download blobs.
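+    // Clients first query us over our custom ALPN, then fetch the matching blobs
+    // over the iroh-blobs ALPN from the same endpoint.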
+    let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
+
+    // Create a router that handles both our custom protocol and the iroh-blobs protocol.
+    let node = Router::builder(endpoint)
+        .accept(ALPN, proto.clone())
+        .accept(iroh_blobs::ALPN, blobs.clone())
+        .spawn();
+
+    // Print our node id, so clients know how to connect to us.
+    let node_id = node.endpoint().node_id();
+    println!("our node id: {node_id}");
+
+    // Wait for Ctrl-C to be pressed.
+    tokio::signal::ctrl_c().await?;
+    node.shutdown().await?;
+    Ok(())
+}
+
+async fn query(node_id: NodeId, query: String) -> Result<()> {
+    // Build an in-memory store. For production code, you'd usually want a persistent store instead.
+    let store = MemStore::new();
+    // Create an endpoint with a random secret key and no discovery publishing.
+    // For a client we just need discovery resolution via the n0 dns server, which
+    // the PkarrResolver provides.
+    let endpoint = Endpoint::builder()
+        .add_discovery(PkarrResolver::n0_dns())
+        .bind()
+        .await?;
+    // Query the remote node.
+    // This will send the query over our custom protocol, read hashes on the reply stream,
+    // and download each hash over iroh-blobs.
+    let hashes = query_remote(&endpoint, &store, node_id, &query).await?;
+
+    // Print out our query results.
+    for hash in hashes {
+        read_and_print(&store, hash).await?;
+    }
+
+    // Close the endpoint and shutdown the store.
+    // Shutting down the store is not needed for a memory store, but would be important for persistent stores
+    // to allow them to flush their data to disk.
+    endpoint.close().await;
+    store.shutdown().await?;
+
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    setup_logging();
+    let args = Cli::parse();
+
+    match args.command {
+        Command::Listen { text } => {
+            listen(text).await?;
+        }
+        Command::Query {
+            node_id,
+            query: query_text,
+        } => {
+            query(node_id, query_text).await?;
+        }
+    }
+
+    Ok(())
+}
+
+#[derive(Debug, Clone)]
+struct BlobSearch {
+    blobs: Store,
+    index: Arc<Mutex<HashMap<String, Hash>>>,
+}
+
+impl ProtocolHandler for BlobSearch {
+    /// The `accept` method is called for each incoming connection for our ALPN.
+    ///
+    /// The returned future runs on a newly spawned tokio task, so it can run as long as
+    /// the connection lasts.
+    async fn accept(&self, connection: Connection) -> std::result::Result<(), AcceptError> {
+        let this = self.clone();
+        // We can get the remote's node id from the connection.
+        let node_id = connection.remote_node_id()?;
+        println!("accepted connection from {node_id}");
+
+        // Our protocol is a simple request-response protocol, so we expect the
+        // connecting peer to open a single bi-directional stream.
+        let (mut send, mut recv) = connection.accept_bi().await?;
+
+        // We read the query from the receive stream, while enforcing a max query length.
+        let query_bytes = recv.read_to_end(64).await.map_err(AcceptError::from_err)?;
+
+        // Now, we can perform the actual query on our local database.
+        let query = String::from_utf8(query_bytes).map_err(AcceptError::from_err)?;
+        let hashes = this.query_local(&query);
+        println!("query: {query}, found {} results", hashes.len());
+
+        // We want to return a list of hashes. We do the simplest thing possible, and just send
+        // one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
+        // very easy to parse on the other end.
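+        // A real protocol would more likely length-prefix or serialize its responses;
+        // fixed 32-byte frames simply keep this example easy to follow.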
+        for hash in hashes {
+            send.write_all(hash.as_bytes())
+                .await
+                .map_err(AcceptError::from_err)?;
+        }
+
+        // By calling `finish` on the send stream we signal that we will not send anything
+        // further, which makes the receive stream on the other end terminate.
+        send.finish()?;
+        connection.closed().await;
+        Ok(())
+    }
+}
+
+impl BlobSearch {
+    /// Create a new protocol handler.
+    pub fn new(blobs: &Store) -> Arc<Self> {
+        Arc::new(Self {
+            blobs: blobs.clone(),
+            index: Default::default(),
+        })
+    }
+
+    /// Query the local database.
+    ///
+    /// Returns the list of hashes of blobs which contain `query` literally.
+    pub fn query_local(&self, query: &str) -> Vec<Hash> {
+        let db = self.index.lock().unwrap();
+        db.iter()
+            .filter_map(|(text, hash)| text.contains(query).then_some(*hash))
+            .collect::<Vec<_>>()
+    }
+
+    /// Insert a text string into the database.
+    ///
+    /// This first imports the text as a blob into the iroh blob store, and then inserts a
+    /// reference to that hash in our (primitive) text database.
+    pub async fn insert_and_index(&self, text: String) -> Result<Hash> {
+        let hash = self.blobs.add_bytes(text.into_bytes()).await?.hash;
+        self.add_to_index(hash).await?;
+        Ok(hash)
+    }
+
+    /// Index a blob which is already in our blob store.
+    ///
+    /// This only indexes complete blobs that are smaller than 1MiB.
+    ///
+    /// Returns `true` if the blob was indexed.
+    async fn add_to_index(&self, hash: Hash) -> Result<bool> {
+        let bitfield = self.blobs.observe(hash).await?;
+        if !bitfield.is_complete() || bitfield.size() > 1024 * 1024 {
+            // If the blob is not complete or too large, we do not index it.
+            return Ok(false);
+        }
+        let data = self.blobs.get_bytes(hash).await?;
+        match String::from_utf8(data.to_vec()) {
+            Ok(text) => {
+                let mut db = self.index.lock().unwrap();
+                db.insert(text, hash);
+                Ok(true)
+            }
+            Err(_err) => Ok(false),
+        }
+    }
+}
+
+/// Query a remote node, download all matching blobs and print the results.
+pub async fn query_remote(
+    endpoint: &Endpoint,
+    store: &Store,
+    node_id: NodeId,
+    query: &str,
+) -> Result<Vec<Hash>> {
+    // Establish a connection to the remote node.
+    // We use the default node discovery in iroh, so we can connect by node id without
+    // providing further information.
+    let conn = endpoint.connect(node_id, ALPN).await?;
+    let blobs_conn = endpoint.connect(node_id, iroh_blobs::ALPN).await?;
+
+    // Open a bi-directional stream on our connection.
+    let (mut send, mut recv) = conn.open_bi().await?;
+
+    // Send our query.
+    send.write_all(query.as_bytes()).await?;
+
+    // Finish the send stream, signalling that no further data will be sent.
+    // This makes the `read_to_end` call on the accepting side terminate.
+    send.finish()?;
+
+    // In this example, we simply collect all results into a vector.
+    // For real protocols, you'd usually want to return a stream of results instead.
+    let mut out = vec![];
+
+    // The response is sent as a list of 32-byte long hashes.
+    // We simply read one after the other into a byte buffer.
+    let mut hash_bytes = [0u8; 32];
+    loop {
+        // Read 32 bytes from the stream.
+        match recv.read_exact(&mut hash_bytes).await {
+            // FinishedEarly means that the remote side did not send further data,
+            // so in this case we break our loop.
+            Err(iroh::endpoint::ReadExactError::FinishedEarly(_)) => break,
+            // Other errors are connection errors, so we bail.
+            Err(err) => return Err(err.into()),
+            Ok(_) => {}
+        };
+        // Convert the raw bytes to the `Hash` type.
+        let hash = Hash::from_bytes(hash_bytes);
+        // Download the content via iroh-blobs.
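+        // We reuse the single iroh-blobs connection opened above for every hash,
+        // rather than dialing the remote node once per blob.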
+        store.remote().fetch(blobs_conn.clone(), hash).await?;
+        out.push(hash);
+    }
+    conn.close(0u32.into(), b"done");
+    blobs_conn.close(0u32.into(), b"done");
+    Ok(out)
+}
+
+/// Read a blob from the local blob store and print it to STDOUT.
+async fn read_and_print(store: &Store, hash: Hash) -> Result<()> {
+    let content = store.get_bytes(hash).await?;
+    let message = String::from_utf8(content.to_vec())?;
+    println!("{}: {message}", hash.fmt_short());
+    Ok(())
+}
diff --git a/examples/get-blob.rs b/examples/get-blob.rs
new file mode 100644
index 00000000..0c6ea135
--- /dev/null
+++ b/examples/get-blob.rs
@@ -0,0 +1,73 @@
+//! Example showing how to request a blob from a remote node without using a store.
+mod common;
+use bao_tree::io::BaoContentItem;
+use clap::Parser;
+use common::setup_logging;
+use iroh::discovery::pkarr::PkarrResolver;
+use iroh_blobs::{get::request::GetBlobItem, ticket::BlobTicket, BlobFormat};
+use n0_future::StreamExt;
+use tokio::io::AsyncWriteExt;
+
+#[derive(Debug, Parser)]
+#[command(version, about)]
+pub struct Cli {
+    /// Ticket describing the content to fetch and the node to fetch it from
+    ///
+    /// This example only supports raw blobs.
+    ticket: BlobTicket,
+    /// True to print data as it arrives, false to complete the download and then
+    /// print the data. Defaults to true.
+    ///
+    /// Note that setting progress to false can lead to an out-of-memory error
+    /// for very large blobs.
+    #[arg(long, default_value = "true")]
+    progress: bool,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    setup_logging();
+    let cli = Cli::parse();
+    let ticket = cli.ticket;
+    let endpoint = iroh::Endpoint::builder()
+        .discovery(PkarrResolver::n0_dns())
+        .bind()
+        .await?;
+    anyhow::ensure!(
+        ticket.format() == BlobFormat::Raw,
+        "This example only supports raw blobs."
+    );
+    let connection = endpoint
+        .connect(ticket.node_addr().node_id, iroh_blobs::ALPN)
+        .await?;
+    let mut progress = iroh_blobs::get::request::get_blob(connection, ticket.hash());
+    let stats = if cli.progress {
+        loop {
+            match progress.next().await {
+                Some(GetBlobItem::Item(item)) => match item {
+                    BaoContentItem::Leaf(leaf) => {
+                        tokio::io::stdout().write_all(&leaf.data).await?;
+                    }
+                    BaoContentItem::Parent(parent) => {
+                        tracing::info!("Parent: {parent:?}");
+                    }
+                },
+                Some(GetBlobItem::Done(stats)) => {
+                    break stats;
+                }
+                Some(GetBlobItem::Error(err)) => {
+                    anyhow::bail!("Error while streaming blob: {err}");
+                }
+                None => {
+                    anyhow::bail!("Stream ended unexpectedly.");
+                }
+            }
+        }
+    } else {
+        let (bytes, stats) = progress.bytes_and_stats().await?;
+        tokio::io::stdout().write_all(&bytes).await?;
+        stats
+    };
+    tracing::info!("Stream done with stats: {stats:?}");
+    Ok(())
+}
diff --git a/examples/mdns-discovery.rs b/examples/mdns-discovery.rs
new file mode 100644
index 00000000..b42f88f4
--- /dev/null
+++ b/examples/mdns-discovery.rs
@@ -0,0 +1,142 @@
+//! Example that runs an iroh node with local node discovery and no relay server.
+//!
+//! You can think of this as a local version of [sendme](https://www.iroh.computer/sendme)
+//! that only works for individual files.
+//!
+//! **This example uses a non-default feature of iroh, so you need to run it with the
+//! examples feature enabled.**
+//!
+//! Run the following command to run the "accept" side, which hosts the content:
+//!     $ cargo run --example mdns-discovery --features examples -- accept [FILE_PATH]
+//! Wait for output that looks like the following:
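+//!     To fetch the blob: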
+//!     $ cargo run --example mdns-discovery --features examples -- connect [NODE_ID] [HASH] -o [FILE_PATH]
+//! Run that command on another machine in the same local network, replacing [FILE_PATH] with the path where you want to save the transferred content.
+use std::path::{Path, PathBuf};
+
+use anyhow::{ensure, Result};
+use clap::{Parser, Subcommand};
+use iroh::{
+    discovery::mdns::MdnsDiscovery, protocol::Router, Endpoint, PublicKey, RelayMode, SecretKey,
+};
+use iroh_blobs::{store::mem::MemStore, BlobsProtocol, Hash};
+
+mod common;
+use common::{get_or_generate_secret_key, setup_logging};
+
+#[derive(Debug, Parser)]
+#[command(version, about)]
+pub struct Cli {
+    #[clap(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand, Clone, Debug)]
+pub enum Commands {
+    /// Launch an iroh node and provide the content at the given path
+    Accept {
+        /// Path to the file you want to provide
+        path: PathBuf,
+    },
+    /// Get the node_id and hash string from a node running accept in the local network
+    /// and download the content from that node.
+    Connect {
+        /// Node ID of a node on the local network
+        node_id: PublicKey,
+        /// Hash of content you want to download from the node
+        hash: Hash,
+        /// Save the content to a file
+        #[clap(long, short)]
+        out: Option<PathBuf>,
+    },
+}
+
+async fn accept(path: &Path) -> Result<()> {
+    if !path.is_file() {
+        println!("Content must be a file.");
+        return Ok(());
+    }
+
+    let key = get_or_generate_secret_key()?;
+
+    println!("Starting iroh node with mdns discovery...");
+    // Create a new node
+    let endpoint = Endpoint::builder()
+        .secret_key(key)
+        .add_discovery(MdnsDiscovery::builder())
+        .relay_mode(RelayMode::Disabled)
+        .bind()
+        .await?;
+    let builder = Router::builder(endpoint.clone());
+    let store = MemStore::new();
+    let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
+    let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
+    let node = builder.spawn();
+
+    let absolute = path.canonicalize()?;
+    println!("Adding {} as {}...", path.display(), absolute.display());
+    let tag = store.add_path(absolute).await?;
+    println!("To fetch the blob:\n\tcargo run --example mdns-discovery --features examples -- connect {} {} -o [FILE_PATH]", node.endpoint().node_id(), tag.hash);
+    tokio::signal::ctrl_c().await?;
+    node.shutdown().await?;
+    Ok(())
+}
+
+async fn connect(node_id: PublicKey, hash: Hash, out: Option<PathBuf>) -> Result<()> {
+    let key = SecretKey::generate(rand::rngs::OsRng);
+    // todo: disable discovery publishing once https://github.com/n0-computer/iroh/issues/3401 is implemented
+    let discovery = MdnsDiscovery::builder();
+
+    println!("Starting iroh node with mdns discovery...");
+    // Create a new node
+    let endpoint = Endpoint::builder()
+        .secret_key(key)
+        .add_discovery(discovery)
+        .relay_mode(RelayMode::Disabled)
+        .bind()
+        .await?;
+    let store = MemStore::new();
+
+    println!("NodeID: {}", endpoint.node_id());
+    let conn = endpoint.connect(node_id, iroh_blobs::ALPN).await?;
+    let stats = store.remote().fetch(conn, hash).await?;
+    println!(
+        "Fetched {} bytes for hash {}",
+        stats.payload_bytes_read, hash
+    );
+    if let Some(path) = out {
+        let absolute = std::env::current_dir()?.join(&path);
+        ensure!(!absolute.is_dir(), "output must not be a directory");
+        println!(
+            "exporting {hash} to {} -> {}",
+            path.display(),
+            absolute.display()
+        );
+        let size = store.export(hash, absolute).await?;
+        println!("Exported {size} bytes");
+    }
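+    // Without `-o`, the fetched blob only exists in the in-memory store and is
+    // dropped when the process exits.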
+
+    endpoint.close().await;
+    // Shut down the store. This is not needed for the mem store, but would be
+    // necessary for a persistent store to allow it to write any pending data to disk.
+    store.shutdown().await?;
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    setup_logging();
+    let cli = Cli::parse();
+
+    match &cli.command {
+        Commands::Accept { path } => {
+            accept(path).await?;
+        }
+        Commands::Connect { node_id, hash, out } => {
+            connect(*node_id, *hash, out.clone()).await?;
+        }
+    }
+    Ok(())
+}
diff --git a/examples/request.rs b/examples/request.rs
deleted file mode 100644
index 3239eee8..00000000
--- a/examples/request.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
-    Ok(())
-}