Skip to content

feat(net): add Magicsock::network_change #1845

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,20 @@ impl MagicEndpoint {
Ok(())
}

/// Call to notify the system of potential network changes.
///
/// On many systems iroh is able to detect network changes by itself, however
/// some systems like android do not expose this functionality to native code.
/// Android does however provide this functionality to Java code. This
/// function allows for notifying iroh of any potential network changes like
/// this.
///
/// Even when the network did not change, or iroh was already able to detect
/// the network change itself, there is no harm in calling this function.
pub async fn network_change(&self) {
self.msock.network_change().await;
}

#[cfg(test)]
pub(crate) fn magic_sock(&self) -> &MagicSock {
&self.msock
Expand Down
66 changes: 45 additions & 21 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,7 @@ impl MagicSock {
});

let inner2 = inner.clone();
let network_monitor = netmon::Monitor::new().await?;
let main_actor_task = tokio::task::spawn(
async move {
let actor = Actor {
Expand All @@ -1272,6 +1273,7 @@ impl MagicSock {
no_v4_send: false,
net_checker,
udp_disco_sender_task,
network_monitor,
};

if let Err(err) = actor.run().await {
Expand Down Expand Up @@ -1424,6 +1426,15 @@ impl MagicSock {
pub fn discovery(&self) -> Option<&dyn Discovery> {
self.inner.discovery.as_ref().map(Box::as_ref)
}

/// Call to notify the system of potential network changes.
pub async fn network_change(&self) {
self.inner
.actor_sender
.send(ActorMessage::NetworkChange)
.await
.ok();
}
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -1551,6 +1562,7 @@ enum ActorMessage {
ReceiveDerp(DerpReadResult),
EndpointPingExpired(usize, stun::TransactionId),
NetcheckReport(Result<Option<Arc<netcheck::Report>>>, &'static str),
NetworkChange,
}

struct Actor {
Expand Down Expand Up @@ -1584,34 +1596,20 @@ struct Actor {

/// Task that sends disco messages over UDP.
udp_disco_sender_task: tokio::task::JoinHandle<()>,

network_monitor: netmon::Monitor,
}

impl Actor {
async fn run(mut self) -> Result<()> {
// Setup network monitoring
let monitor = netmon::Monitor::new().await?;
let inner = self.inner.clone();
let _token = monitor
let (link_change_s, mut link_change_r) = mpsc::channel(8);
let _token = self
.network_monitor
.subscribe(move |is_major| {
let inner = inner.clone();
let link_change_s = link_change_s.clone();
async move {
info!("link change detected: major? {}", is_major);

// Clear DNS cache
DNS_RESOLVER.clear_cache();

if is_major {
let (s, r) = sync::oneshot::channel();
inner.re_stun("link-change-major");
inner
.actor_sender
.send(ActorMessage::RebindAll(s))
.await
.ok();
r.await.ok();
} else {
inner.re_stun("link-change-minor");
}
link_change_s.send(is_major).await.ok();
}
.boxed()
})
Expand Down Expand Up @@ -1675,13 +1673,36 @@ impl Actor {
Err(e) => debug!(%e, "failed to persist known nodes"),
}
}
Some(is_major) = link_change_r.recv() => {
self.handle_network_change(is_major).await;
}
else => {
trace!("tick: other");
}
}
}
}

async fn handle_network_change(&mut self, is_major: bool) {
info!("link change detected: major? {}", is_major);

if is_major {
// Clear DNS cache
DNS_RESOLVER.clear_cache();

let (s, r) = sync::oneshot::channel();
self.inner.re_stun("link-change-major");
self.inner
.actor_sender
.send(ActorMessage::RebindAll(s))
.await
.ok();
r.await.ok();
} else {
self.inner.re_stun("link-change-minor");
}
}

async fn handle_ping_actions(&mut self, mut msgs: Vec<PingAction>) {
if msgs.is_empty() {
return;
Expand Down Expand Up @@ -1780,6 +1801,9 @@ impl Actor {
}
self.finalize_endpoints_update(why);
}
ActorMessage::NetworkChange => {
self.network_monitor.network_change().await.ok();
}
}

false
Expand Down
6 changes: 6 additions & 0 deletions iroh-net/src/net/netmon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ impl Monitor {
r.await?;
Ok(())
}

/// Potential change detected outside
pub async fn network_change(&self) -> Result<()> {
self.actor_tx.send(ActorMessage::NetworkChange).await?;
Ok(())
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions iroh-net/src/net/netmon/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub(super) type Callback = Box<dyn Fn(bool) -> BoxFuture<'static, ()> + Sync + S
pub(super) enum ActorMessage {
Subscribe(Callback, oneshot::Sender<CallbackToken>),
Unsubscribe(CallbackToken, oneshot::Sender<()>),
NetworkChange,
}

impl Actor {
Expand Down Expand Up @@ -143,6 +144,11 @@ impl Actor {
self.callbacks.remove(&token);
s.send(()).ok();
}
ActorMessage::NetworkChange => {
trace!("external network activitiy detected");
last_event.replace(false);
debounce_interval.reset_immediately();
}
},
else => {
break;
Expand Down