From 1fdb5e5cf2afa3c18220df3b125b5de9375e9902 Mon Sep 17 00:00:00 2001 From: Dorian Date: Sun, 19 Apr 2026 01:20:44 -0400 Subject: [PATCH] feat(federation): route state-sync / invites / notifications via FIPS first MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every federation peer-to-peer call now prefers FIPS (direct ULA dial over `fips0`, ~LAN latency) and falls back to Tor only on network failure. Per-method ed25519 signatures are preserved on both transports so authenticity doesn't change. - fips::dial::PeerRequest — fluent builder that owns transport selection. Returns the Response plus the TransportKind that carried it, so handlers can log or expose which path was used. - fips::dial::is_service_active — free-standing async probe used by migration sites (the transport::fips::is_available cache is keyed to a `&self`, not usable from static contexts). - federation/sync.rs: sync_with_peer + deploy_to_peer drop the hand-rolled reqwest::Proxy dance, call PeerRequest instead. - federation/invites.rs: notify_join takes the remote's fips_npub (already parsed out of the invite code since v1.4) and dials over FIPS when available. The "peer-joined" signature domain is unchanged. - api/rpc/federation/handlers.rs: DID rotation broadcast loops over federated peers through PeerRequest; the per-peer result payload gains a `transport` field so the UI can surface mesh vs. onion. - api/rpc/tor/mod.rs: onion-address-change propagation is now the most useful FIPS-first call — fips_npub is stable across onion rotation, so peers get the new address even when the old onion is already dead. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/api/rpc/federation/handlers.rs | 37 ++-- core/archipelago/src/api/rpc/tor/mod.rs | 37 ++-- core/archipelago/src/federation/invites.rs | 26 +-- core/archipelago/src/federation/sync.rs | 65 ++---- core/archipelago/src/fips/dial.rs | 188 ++++++++++++++++++ 5 files changed, 248 insertions(+), 105 deletions(-) diff --git a/core/archipelago/src/api/rpc/federation/handlers.rs b/core/archipelago/src/api/rpc/federation/handlers.rs index 9e728498..e01f10f4 100644 --- a/core/archipelago/src/api/rpc/federation/handlers.rs +++ b/core/archipelago/src/api/rpc/federation/handlers.rs @@ -6,7 +6,7 @@ use crate::identity; use crate::mesh; use crate::network::dwn_store::DwnStore; use crate::nostr_handshake; -use anyhow::{Context, Result}; +use anyhow::Result; use tracing::{debug, info, warn}; const FEDERATION_PROTOCOL: &str = "https://archipelago.dev/protocols/federation/v1"; @@ -630,14 +630,6 @@ impl RpcHandler { let nodes = federation::load_nodes(&self.config.data_dir).await?; - let proxy = - reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; - let client = reqwest::Client::builder() - .proxy(proxy) - .timeout(std::time::Duration::from_secs(30)) - .build() - .context("Failed to build HTTP client")?; - let mut notified = 0u32; let mut failed = 0u32; let mut results = Vec::new(); @@ -648,13 +640,6 @@ impl RpcHandler { continue; } - let host = if node.onion.ends_with(".onion") { - node.onion.clone() - } else { - format!("{}.onion", node.onion) - }; - let url = format!("http://{}/rpc/v1", host); - let body = serde_json::json!({ "method": "federation.peer-did-changed", "params": { @@ -666,23 +651,31 @@ impl RpcHandler { } }); - match client.post(&url).json(&body).send().await { - Ok(resp) if resp.status().is_success() => { + let req = crate::fips::dial::PeerRequest::new( + node.fips_npub.as_deref(), + &node.onion, + "/rpc/v1", + ) + .timeout(std::time::Duration::from_secs(30)); + + match req.send_json(&body).await { + Ok((resp, transport)) if resp.status().is_success() => { notified += 1; results.push(serde_json::json!({ "did": node.did, "status": "ok", + "transport": transport.to_string(), })); - info!(peer_did = %node.did, "Notified peer of DID rotation"); + info!(peer_did = %node.did, transport = %transport, "Notified peer of DID rotation"); } - Ok(resp) => { + Ok((resp, transport)) => { failed += 1; results.push(serde_json::json!({ "did": node.did, "status": "error", - "error": format!("Peer returned {}", resp.status()), + "error": format!("Peer returned {} (via {})", resp.status(), transport), })); - warn!(peer_did = %node.did, status = %resp.status(), "Peer rejected DID rotation notification"); + warn!(peer_did = %node.did, status = %resp.status(), transport = %transport, "Peer rejected DID rotation notification"); } Err(e) => { failed += 1; diff --git a/core/archipelago/src/api/rpc/tor/mod.rs b/core/archipelago/src/api/rpc/tor/mod.rs index 1ff1e8ad..873a3646 100644 --- a/core/archipelago/src/api/rpc/tor/mod.rs +++ b/core/archipelago/src/api/rpc/tor/mod.rs @@ -417,11 +417,13 @@ pub(super) async fn notify_federation_peers_address_change( return; } }; - let proxy = tor_proxy.unwrap_or("127.0.0.1:9050"); + // `tor_proxy` is retained for API compat but unused — the FIPS + // fallback dial uses constants::TOR_SOCKS_PROXY internally. + let _ = tor_proxy; match federation::load_nodes(data_dir).await { Ok(peers) => { for peer in peers { - if peer.onion.is_empty() { + if peer.onion.is_empty() && peer.fips_npub.is_none() { continue; } let payload = serde_json::json!({ @@ -432,24 +434,19 @@ pub(super) async fn notify_federation_peers_address_change( "old_onion": old_onion, } }); - let url = format!("http://{}/rpc/v1", &peer.onion); - let client = match reqwest::Client::builder() - .proxy( - match reqwest::Proxy::all(format!("socks5h://{}", proxy)).or_else( - |_| reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY), - ) { - Ok(p) => p, - Err(_) => continue, - }, - ) - .timeout(std::time::Duration::from_secs(30)) - .build() - { - Ok(c) => c, - Err(_) => continue, - }; - match client.post(&url).json(&payload).send().await { - Ok(_) => info!(peer_did = %peer.did, "Notified peer of address change"), + // FIPS-preferred: peer's fips_npub is stable across + // onion rotation, so this notification reaches them + // even when their (or our) old onion is now stale. + let req = crate::fips::dial::PeerRequest::new( + peer.fips_npub.as_deref(), + &peer.onion, + "/rpc/v1", + ) + .timeout(std::time::Duration::from_secs(30)); + match req.send_json(&payload).await { + Ok((_, transport)) => { + info!(peer_did = %peer.did, transport = %transport, "Notified peer of address change") + } Err(e) => warn!(peer_did = %peer.did, "Failed to notify peer: {}", e), } } diff --git a/core/archipelago/src/federation/invites.rs b/core/archipelago/src/federation/invites.rs index 57148a66..4291fec2 100644 --- a/core/archipelago/src/federation/invites.rs +++ b/core/archipelago/src/federation/invites.rs @@ -180,9 +180,10 @@ pub async fn accept_invite( }); save_invites(data_dir, &invites).await?; - // Notify remote node (best-effort over Tor) + // Notify remote node (best-effort, FIPS-first → Tor fallback) let _ = notify_join( &node.onion, + node.fips_npub.as_deref(), local_did, local_onion, local_pubkey, @@ -195,22 +196,18 @@ pub async fn accept_invite( } /// Best-effort notification to the remote node that we joined their federation. -/// Signs the message with our ed25519 key so the remote peer can verify authenticity. +/// Prefers FIPS (if the remote advertised an npub in their invite) and +/// falls back to Tor. Signs the message with our ed25519 key so the +/// remote peer can verify authenticity regardless of transport. async fn notify_join( remote_onion: &str, + remote_fips_npub: Option<&str>, local_did: &str, local_onion: &str, local_pubkey: &str, local_fips_npub: Option<&str>, sign_fn: impl FnOnce(&[u8]) -> String, ) -> Result<()> { - let host = if remote_onion.ends_with(".onion") { - remote_onion.to_string() - } else { - format!("{}.onion", remote_onion) - }; - let url = format!("http://{}/rpc/v1", host); - // Sign the canonical message: "peer-joined:{did}:{onion}:{pubkey}" // Signature domain intentionally unchanged — fips_npub is carried // as an unsigned informational field. The FIPS daemon's own Noise @@ -234,15 +231,10 @@ async fn notify_join( "params": params, }); - let proxy = - reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; - let client = reqwest::Client::builder() - .proxy(proxy) + let _ = crate::fips::dial::PeerRequest::new(remote_fips_npub, remote_onion, "/rpc/v1") .timeout(std::time::Duration::from_secs(30)) - .build() - .context("Failed to build HTTP client")?; - - let _ = client.post(&url).json(&body).send().await; + .send_json(&body) + .await; Ok(()) } diff --git a/core/archipelago/src/federation/sync.rs b/core/archipelago/src/federation/sync.rs index c369122f..8a00fe8f 100644 --- a/core/archipelago/src/federation/sync.rs +++ b/core/archipelago/src/federation/sync.rs @@ -1,26 +1,24 @@ //! Federation state sync and remote deployment. +//! +//! Requests prefer FIPS (direct ULA dial, ~LAN latency) and fall back to +//! Tor on any network failure. See `crate::fips::dial::PeerRequest` for +//! the fallback mechanics. use anyhow::{Context, Result}; use std::path::Path; use super::storage::update_node_state; use super::types::{AppStatus, FederatedNode, NodeStateSnapshot, TrustLevel}; +use crate::fips::dial::PeerRequest; -/// Sync state with a single federated peer over Tor. +/// Sync state with a single federated peer. Tries FIPS first; falls back +/// to Tor on any transport-level failure. pub async fn sync_with_peer( data_dir: &Path, peer: &FederatedNode, local_did: &str, sign_fn: impl FnOnce(&[u8]) -> String, ) -> Result { - let host = if peer.onion.ends_with(".onion") { - peer.onion.clone() - } else { - format!("{}.onion", peer.onion) - }; - let url = format!("http://{}/rpc/v1", host); - - // Sign current timestamp for authentication let timestamp = chrono::Utc::now().to_rfc3339(); let signature = sign_fn(timestamp.as_bytes()); @@ -29,26 +27,17 @@ pub async fn sync_with_peer( "params": {} }); - let proxy = - reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; - let client = reqwest::Client::builder() - .proxy(proxy) - .timeout(std::time::Duration::from_secs(30)) - .build() - .context("Failed to build HTTP client")?; - - let resp = client - .post(&url) + let (resp, transport) = PeerRequest::new(peer.fips_npub.as_deref(), &peer.onion, "/rpc/v1") .header("X-Federation-DID", local_did) - .header("X-Federation-Sig", &signature) - .header("X-Federation-Timestamp", ×tamp) - .json(&body) - .send() + .header("X-Federation-Sig", signature) + .header("X-Federation-Timestamp", timestamp) + .timeout(std::time::Duration::from_secs(30)) + .send_json(&body) .await .context("Failed to reach federated peer")?; if !resp.status().is_success() { - anyhow::bail!("Peer returned {}", resp.status()); + anyhow::bail!("Peer returned {} (via {})", resp.status(), transport); } let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?; @@ -109,13 +98,6 @@ pub async fn deploy_to_peer( ); } - let host = if peer.onion.ends_with(".onion") { - peer.onion.clone() - } else { - format!("{}.onion", peer.onion) - }; - let url = format!("http://{}/rpc/v1", host); - let timestamp = chrono::Utc::now().to_rfc3339(); let signature = sign_fn(timestamp.as_bytes()); @@ -128,26 +110,17 @@ pub async fn deploy_to_peer( } }); - let proxy = - reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; - let client = reqwest::Client::builder() - .proxy(proxy) - .timeout(std::time::Duration::from_secs(120)) - .build() - .context("Failed to build HTTP client")?; - - let resp = client - .post(&url) + let (resp, transport) = PeerRequest::new(peer.fips_npub.as_deref(), &peer.onion, "/rpc/v1") .header("X-Federation-DID", local_did) - .header("X-Federation-Sig", &signature) - .header("X-Federation-Timestamp", ×tamp) - .json(&body) - .send() + .header("X-Federation-Sig", signature) + .header("X-Federation-Timestamp", timestamp) + .timeout(std::time::Duration::from_secs(120)) + .send_json(&body) .await .context("Failed to reach federated peer for deploy")?; if !resp.status().is_success() { - anyhow::bail!("Remote node returned HTTP {}", resp.status()); + anyhow::bail!("Remote node returned HTTP {} (via {})", resp.status(), transport); } let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?; diff --git a/core/archipelago/src/fips/dial.rs b/core/archipelago/src/fips/dial.rs index d99465ab..d8edbfb1 100644 --- a/core/archipelago/src/fips/dial.rs +++ b/core/archipelago/src/fips/dial.rs @@ -209,6 +209,194 @@ pub fn as_ip_addr(v6: Ipv6Addr) -> IpAddr { IpAddr::V6(v6) } +// ── High-level peer request helpers ──────────────────────────────────── + +/// Quick poll: is the FIPS daemon (archipelago-supervised OR upstream) +/// currently `systemctl is-active`? Async wrapper intended for the +/// migration call sites; unlike `FipsTransport::is_available` this does +/// not maintain a cache, so callers that poll frequently should cache +/// themselves. +pub async fn is_service_active() -> bool { + for unit in [ + crate::fips::SERVICE_UNIT, + crate::fips::UPSTREAM_SERVICE_UNIT, + ] { + if crate::fips::service::unit_state(unit).await == "active" { + return true; + } + } + false +} + +/// Builder for a peer request that may be sent over FIPS (preferred) or +/// Tor (fallback). The call sites migrating off direct-Tor dialing build +/// one of these and call [`send_json`] / [`send_get`]; the helper handles +/// dial, timeout, fallback, and cross-transport auth headers. +pub struct PeerRequest<'a> { + pub fips_npub: Option<&'a str>, + pub onion_host: &'a str, + pub path: &'a str, + pub headers: Vec<(&'a str, String)>, + pub timeout: std::time::Duration, +} + +impl<'a> PeerRequest<'a> { + pub fn new( + fips_npub: Option<&'a str>, + onion_host: &'a str, + path: &'a str, + ) -> Self { + Self { + fips_npub, + onion_host, + path, + headers: Vec::new(), + timeout: std::time::Duration::from_secs(30), + } + } + + pub fn header(mut self, name: &'a str, value: impl Into) -> Self { + self.headers.push((name, value.into())); + self + } + + pub fn timeout(mut self, t: std::time::Duration) -> Self { + self.timeout = t; + self + } + + /// POST a JSON body. Returns the `reqwest::Response` — caller decides + /// how to interpret the status code. + pub async fn send_json( + &self, + body: &B, + ) -> Result<(reqwest::Response, crate::transport::TransportKind)> { + if let Some(resp) = self.try_fips_post_json(body).await? { + return Ok((resp, crate::transport::TransportKind::Fips)); + } + let resp = self.send_tor_post_json(body).await?; + Ok((resp, crate::transport::TransportKind::Tor)) + } + + /// GET with optional header-based auth. + pub async fn send_get( + &self, + ) -> Result<(reqwest::Response, crate::transport::TransportKind)> { + if let Some(resp) = self.try_fips_get().await? { + return Ok((resp, crate::transport::TransportKind::Fips)); + } + let resp = self.send_tor_get().await?; + Ok((resp, crate::transport::TransportKind::Tor)) + } + + async fn try_fips_post_json( + &self, + body: &B, + ) -> Result> { + let Some(npub) = self.fips_npub else { + return Ok(None); + }; + if !is_service_active().await { + return Ok(None); + } + let base = match peer_base_url(npub).await { + Ok(b) => b, + Err(e) => { + tracing::debug!("FIPS resolve for {} failed: {}", npub, e); + return Ok(None); + } + }; + let url = format!("{}{}", base, self.path); + let c = client(); + let mut rb = c.post(&url).json(body); + for (k, v) in &self.headers { + rb = rb.header(*k, v); + } + match rb.send().await { + Ok(r) => Ok(Some(r)), + Err(e) => { + tracing::debug!("FIPS POST {} failed: {}, falling back to Tor", url, e); + Ok(None) + } + } + } + + async fn try_fips_get(&self) -> Result> { + let Some(npub) = self.fips_npub else { + return Ok(None); + }; + if !is_service_active().await { + return Ok(None); + } + let base = match peer_base_url(npub).await { + Ok(b) => b, + Err(e) => { + tracing::debug!("FIPS resolve for {} failed: {}", npub, e); + return Ok(None); + } + }; + let url = format!("{}{}", base, self.path); + let c = client(); + let mut rb = c.get(&url); + for (k, v) in &self.headers { + rb = rb.header(*k, v); + } + match rb.send().await { + Ok(r) => Ok(Some(r)), + Err(e) => { + tracing::debug!("FIPS GET {} failed: {}, falling back to Tor", url, e); + Ok(None) + } + } + } + + async fn send_tor_post_json( + &self, + body: &B, + ) -> Result { + let url = self.tor_url(); + let client = self.tor_client()?; + let mut rb = client.post(&url).json(body); + for (k, v) in &self.headers { + rb = rb.header(*k, v); + } + rb.send() + .await + .with_context(|| format!("Tor POST {}", url)) + } + + async fn send_tor_get(&self) -> Result { + let url = self.tor_url(); + let client = self.tor_client()?; + let mut rb = client.get(&url); + for (k, v) in &self.headers { + rb = rb.header(*k, v); + } + rb.send() + .await + .with_context(|| format!("Tor GET {}", url)) + } + + fn tor_url(&self) -> String { + let host = if self.onion_host.ends_with(".onion") { + self.onion_host.to_string() + } else { + format!("{}.onion", self.onion_host) + }; + format!("http://{}{}", host, self.path) + } + + fn tor_client(&self) -> Result { + let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) + .context("Invalid Tor SOCKS proxy URL")?; + reqwest::Client::builder() + .proxy(proxy) + .timeout(self.timeout) + .build() + .context("Build Tor HTTP client") + } +} + #[cfg(test)] mod tests { use super::*;