feat(federation): route state-sync / invites / notifications via FIPS first

Every federation peer-to-peer call now prefers FIPS (direct ULA dial
over `fips0`, ~LAN latency) and falls back to Tor only on network
failure. Per-method ed25519 signatures are preserved on both
transports so authenticity doesn't change.

- fips::dial::PeerRequest — fluent builder that owns transport
  selection. Returns the Response plus the TransportKind that carried
  it, so handlers can log or expose which path was used.
- fips::dial::is_service_active — free-standing async probe used by
  migration sites (the transport::fips::is_available cache is keyed
  to a `&self`, not usable from static contexts).
- federation/sync.rs: sync_with_peer + deploy_to_peer drop the
  hand-rolled reqwest::Proxy dance, call PeerRequest instead.
- federation/invites.rs: notify_join takes the remote's fips_npub
  (already parsed out of the invite code since v1.4) and dials over
  FIPS when available. The "peer-joined" signature domain is
  unchanged.
- api/rpc/federation/handlers.rs: DID rotation broadcast loops over
  federated peers through PeerRequest; the per-peer result payload
  gains a `transport` field so the UI can surface mesh vs. onion.
- api/rpc/tor/mod.rs: onion-address-change propagation is now the
  most useful FIPS-first call — fips_npub is stable across onion
  rotation, so peers get the new address even when the old onion
  is already dead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-04-19 01:20:44 -04:00
parent 274ed008fe
commit 1fdb5e5cf2
5 changed files with 248 additions and 105 deletions

View File

@ -6,7 +6,7 @@ use crate::identity;
use crate::mesh;
use crate::network::dwn_store::DwnStore;
use crate::nostr_handshake;
use anyhow::{Context, Result};
use anyhow::Result;
use tracing::{debug, info, warn};
const FEDERATION_PROTOCOL: &str = "https://archipelago.dev/protocols/federation/v1";
@ -630,14 +630,6 @@ impl RpcHandler {
let nodes = federation::load_nodes(&self.config.data_dir).await?;
let proxy =
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
let client = reqwest::Client::builder()
.proxy(proxy)
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to build HTTP client")?;
let mut notified = 0u32;
let mut failed = 0u32;
let mut results = Vec::new();
@ -648,13 +640,6 @@ impl RpcHandler {
continue;
}
let host = if node.onion.ends_with(".onion") {
node.onion.clone()
} else {
format!("{}.onion", node.onion)
};
let url = format!("http://{}/rpc/v1", host);
let body = serde_json::json!({
"method": "federation.peer-did-changed",
"params": {
@ -666,23 +651,31 @@ impl RpcHandler {
}
});
match client.post(&url).json(&body).send().await {
Ok(resp) if resp.status().is_success() => {
let req = crate::fips::dial::PeerRequest::new(
node.fips_npub.as_deref(),
&node.onion,
"/rpc/v1",
)
.timeout(std::time::Duration::from_secs(30));
match req.send_json(&body).await {
Ok((resp, transport)) if resp.status().is_success() => {
notified += 1;
results.push(serde_json::json!({
"did": node.did,
"status": "ok",
"transport": transport.to_string(),
}));
info!(peer_did = %node.did, "Notified peer of DID rotation");
info!(peer_did = %node.did, transport = %transport, "Notified peer of DID rotation");
}
Ok(resp) => {
Ok((resp, transport)) => {
failed += 1;
results.push(serde_json::json!({
"did": node.did,
"status": "error",
"error": format!("Peer returned {}", resp.status()),
"error": format!("Peer returned {} (via {})", resp.status(), transport),
}));
warn!(peer_did = %node.did, status = %resp.status(), "Peer rejected DID rotation notification");
warn!(peer_did = %node.did, status = %resp.status(), transport = %transport, "Peer rejected DID rotation notification");
}
Err(e) => {
failed += 1;

View File

@ -417,11 +417,13 @@ pub(super) async fn notify_federation_peers_address_change(
return;
}
};
let proxy = tor_proxy.unwrap_or("127.0.0.1:9050");
// `tor_proxy` is retained for API compat but unused — the FIPS
// fallback dial uses constants::TOR_SOCKS_PROXY internally.
let _ = tor_proxy;
match federation::load_nodes(data_dir).await {
Ok(peers) => {
for peer in peers {
if peer.onion.is_empty() {
if peer.onion.is_empty() && peer.fips_npub.is_none() {
continue;
}
let payload = serde_json::json!({
@ -432,24 +434,19 @@ pub(super) async fn notify_federation_peers_address_change(
"old_onion": old_onion,
}
});
let url = format!("http://{}/rpc/v1", &peer.onion);
let client = match reqwest::Client::builder()
.proxy(
match reqwest::Proxy::all(format!("socks5h://{}", proxy)).or_else(
|_| reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY),
) {
Ok(p) => p,
Err(_) => continue,
},
)
.timeout(std::time::Duration::from_secs(30))
.build()
{
Ok(c) => c,
Err(_) => continue,
};
match client.post(&url).json(&payload).send().await {
Ok(_) => info!(peer_did = %peer.did, "Notified peer of address change"),
// FIPS-preferred: peer's fips_npub is stable across
// onion rotation, so this notification reaches them
// even when their (or our) old onion is now stale.
let req = crate::fips::dial::PeerRequest::new(
peer.fips_npub.as_deref(),
&peer.onion,
"/rpc/v1",
)
.timeout(std::time::Duration::from_secs(30));
match req.send_json(&payload).await {
Ok((_, transport)) => {
info!(peer_did = %peer.did, transport = %transport, "Notified peer of address change")
}
Err(e) => warn!(peer_did = %peer.did, "Failed to notify peer: {}", e),
}
}

View File

@ -180,9 +180,10 @@ pub async fn accept_invite(
});
save_invites(data_dir, &invites).await?;
// Notify remote node (best-effort over Tor)
// Notify remote node (best-effort, FIPS-first → Tor fallback)
let _ = notify_join(
&node.onion,
node.fips_npub.as_deref(),
local_did,
local_onion,
local_pubkey,
@ -195,22 +196,18 @@ pub async fn accept_invite(
}
/// Best-effort notification to the remote node that we joined their federation.
/// Signs the message with our ed25519 key so the remote peer can verify authenticity.
/// Prefers FIPS (if the remote advertised an npub in their invite) and
/// falls back to Tor. Signs the message with our ed25519 key so the
/// remote peer can verify authenticity regardless of transport.
async fn notify_join(
remote_onion: &str,
remote_fips_npub: Option<&str>,
local_did: &str,
local_onion: &str,
local_pubkey: &str,
local_fips_npub: Option<&str>,
sign_fn: impl FnOnce(&[u8]) -> String,
) -> Result<()> {
let host = if remote_onion.ends_with(".onion") {
remote_onion.to_string()
} else {
format!("{}.onion", remote_onion)
};
let url = format!("http://{}/rpc/v1", host);
// Sign the canonical message: "peer-joined:{did}:{onion}:{pubkey}"
// Signature domain intentionally unchanged — fips_npub is carried
// as an unsigned informational field. The FIPS daemon's own Noise
@ -234,15 +231,10 @@ async fn notify_join(
"params": params,
});
let proxy =
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
let client = reqwest::Client::builder()
.proxy(proxy)
let _ = crate::fips::dial::PeerRequest::new(remote_fips_npub, remote_onion, "/rpc/v1")
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to build HTTP client")?;
let _ = client.post(&url).json(&body).send().await;
.send_json(&body)
.await;
Ok(())
}

View File

@ -1,26 +1,24 @@
//! Federation state sync and remote deployment.
//!
//! Requests prefer FIPS (direct ULA dial, ~LAN latency) and fall back to
//! Tor on any network failure. See `crate::fips::dial::PeerRequest` for
//! the fallback mechanics.
use anyhow::{Context, Result};
use std::path::Path;
use super::storage::update_node_state;
use super::types::{AppStatus, FederatedNode, NodeStateSnapshot, TrustLevel};
use crate::fips::dial::PeerRequest;
/// Sync state with a single federated peer over Tor.
/// Sync state with a single federated peer. Tries FIPS first; falls back
/// to Tor on any transport-level failure.
pub async fn sync_with_peer(
data_dir: &Path,
peer: &FederatedNode,
local_did: &str,
sign_fn: impl FnOnce(&[u8]) -> String,
) -> Result<NodeStateSnapshot> {
let host = if peer.onion.ends_with(".onion") {
peer.onion.clone()
} else {
format!("{}.onion", peer.onion)
};
let url = format!("http://{}/rpc/v1", host);
// Sign current timestamp for authentication
let timestamp = chrono::Utc::now().to_rfc3339();
let signature = sign_fn(timestamp.as_bytes());
@ -29,26 +27,17 @@ pub async fn sync_with_peer(
"params": {}
});
let proxy =
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
let client = reqwest::Client::builder()
.proxy(proxy)
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to build HTTP client")?;
let resp = client
.post(&url)
let (resp, transport) = PeerRequest::new(peer.fips_npub.as_deref(), &peer.onion, "/rpc/v1")
.header("X-Federation-DID", local_did)
.header("X-Federation-Sig", &signature)
.header("X-Federation-Timestamp", &timestamp)
.json(&body)
.send()
.header("X-Federation-Sig", signature)
.header("X-Federation-Timestamp", timestamp)
.timeout(std::time::Duration::from_secs(30))
.send_json(&body)
.await
.context("Failed to reach federated peer")?;
if !resp.status().is_success() {
anyhow::bail!("Peer returned {}", resp.status());
anyhow::bail!("Peer returned {} (via {})", resp.status(), transport);
}
let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?;
@ -109,13 +98,6 @@ pub async fn deploy_to_peer(
);
}
let host = if peer.onion.ends_with(".onion") {
peer.onion.clone()
} else {
format!("{}.onion", peer.onion)
};
let url = format!("http://{}/rpc/v1", host);
let timestamp = chrono::Utc::now().to_rfc3339();
let signature = sign_fn(timestamp.as_bytes());
@ -128,26 +110,17 @@ pub async fn deploy_to_peer(
}
});
let proxy =
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
let client = reqwest::Client::builder()
.proxy(proxy)
.timeout(std::time::Duration::from_secs(120))
.build()
.context("Failed to build HTTP client")?;
let resp = client
.post(&url)
let (resp, transport) = PeerRequest::new(peer.fips_npub.as_deref(), &peer.onion, "/rpc/v1")
.header("X-Federation-DID", local_did)
.header("X-Federation-Sig", &signature)
.header("X-Federation-Timestamp", &timestamp)
.json(&body)
.send()
.header("X-Federation-Sig", signature)
.header("X-Federation-Timestamp", timestamp)
.timeout(std::time::Duration::from_secs(120))
.send_json(&body)
.await
.context("Failed to reach federated peer for deploy")?;
if !resp.status().is_success() {
anyhow::bail!("Remote node returned HTTP {}", resp.status());
anyhow::bail!("Remote node returned HTTP {} (via {})", resp.status(), transport);
}
let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?;

View File

@ -209,6 +209,194 @@ pub fn as_ip_addr(v6: Ipv6Addr) -> IpAddr {
IpAddr::V6(v6)
}
// ── High-level peer request helpers ────────────────────────────────────
/// Quick poll: is the FIPS daemon (archipelago-supervised OR upstream)
/// currently `systemctl is-active`? Async wrapper intended for the
/// migration call sites; unlike `FipsTransport::is_available` this does
/// not maintain a cache, so callers that poll frequently should cache
/// themselves.
pub async fn is_service_active() -> bool {
for unit in [
crate::fips::SERVICE_UNIT,
crate::fips::UPSTREAM_SERVICE_UNIT,
] {
if crate::fips::service::unit_state(unit).await == "active" {
return true;
}
}
false
}
/// Builder for a peer request that may be sent over FIPS (preferred) or
/// Tor (fallback). The call sites migrating off direct-Tor dialing build
/// one of these and call [`send_json`] / [`send_get`]; the helper handles
/// dial, timeout, fallback, and cross-transport auth headers.
pub struct PeerRequest<'a> {
pub fips_npub: Option<&'a str>,
pub onion_host: &'a str,
pub path: &'a str,
pub headers: Vec<(&'a str, String)>,
pub timeout: std::time::Duration,
}
impl<'a> PeerRequest<'a> {
pub fn new(
fips_npub: Option<&'a str>,
onion_host: &'a str,
path: &'a str,
) -> Self {
Self {
fips_npub,
onion_host,
path,
headers: Vec::new(),
timeout: std::time::Duration::from_secs(30),
}
}
pub fn header(mut self, name: &'a str, value: impl Into<String>) -> Self {
self.headers.push((name, value.into()));
self
}
pub fn timeout(mut self, t: std::time::Duration) -> Self {
self.timeout = t;
self
}
/// POST a JSON body. Returns the `reqwest::Response` — caller decides
/// how to interpret the status code.
pub async fn send_json<B: serde::Serialize>(
&self,
body: &B,
) -> Result<(reqwest::Response, crate::transport::TransportKind)> {
if let Some(resp) = self.try_fips_post_json(body).await? {
return Ok((resp, crate::transport::TransportKind::Fips));
}
let resp = self.send_tor_post_json(body).await?;
Ok((resp, crate::transport::TransportKind::Tor))
}
/// GET with optional header-based auth.
pub async fn send_get(
&self,
) -> Result<(reqwest::Response, crate::transport::TransportKind)> {
if let Some(resp) = self.try_fips_get().await? {
return Ok((resp, crate::transport::TransportKind::Fips));
}
let resp = self.send_tor_get().await?;
Ok((resp, crate::transport::TransportKind::Tor))
}
async fn try_fips_post_json<B: serde::Serialize>(
&self,
body: &B,
) -> Result<Option<reqwest::Response>> {
let Some(npub) = self.fips_npub else {
return Ok(None);
};
if !is_service_active().await {
return Ok(None);
}
let base = match peer_base_url(npub).await {
Ok(b) => b,
Err(e) => {
tracing::debug!("FIPS resolve for {} failed: {}", npub, e);
return Ok(None);
}
};
let url = format!("{}{}", base, self.path);
let c = client();
let mut rb = c.post(&url).json(body);
for (k, v) in &self.headers {
rb = rb.header(*k, v);
}
match rb.send().await {
Ok(r) => Ok(Some(r)),
Err(e) => {
tracing::debug!("FIPS POST {} failed: {}, falling back to Tor", url, e);
Ok(None)
}
}
}
async fn try_fips_get(&self) -> Result<Option<reqwest::Response>> {
let Some(npub) = self.fips_npub else {
return Ok(None);
};
if !is_service_active().await {
return Ok(None);
}
let base = match peer_base_url(npub).await {
Ok(b) => b,
Err(e) => {
tracing::debug!("FIPS resolve for {} failed: {}", npub, e);
return Ok(None);
}
};
let url = format!("{}{}", base, self.path);
let c = client();
let mut rb = c.get(&url);
for (k, v) in &self.headers {
rb = rb.header(*k, v);
}
match rb.send().await {
Ok(r) => Ok(Some(r)),
Err(e) => {
tracing::debug!("FIPS GET {} failed: {}, falling back to Tor", url, e);
Ok(None)
}
}
}
async fn send_tor_post_json<B: serde::Serialize>(
&self,
body: &B,
) -> Result<reqwest::Response> {
let url = self.tor_url();
let client = self.tor_client()?;
let mut rb = client.post(&url).json(body);
for (k, v) in &self.headers {
rb = rb.header(*k, v);
}
rb.send()
.await
.with_context(|| format!("Tor POST {}", url))
}
async fn send_tor_get(&self) -> Result<reqwest::Response> {
let url = self.tor_url();
let client = self.tor_client()?;
let mut rb = client.get(&url);
for (k, v) in &self.headers {
rb = rb.header(*k, v);
}
rb.send()
.await
.with_context(|| format!("Tor GET {}", url))
}
fn tor_url(&self) -> String {
let host = if self.onion_host.ends_with(".onion") {
self.onion_host.to_string()
} else {
format!("{}.onion", self.onion_host)
};
format!("http://{}{}", host, self.path)
}
fn tor_client(&self) -> Result<reqwest::Client> {
let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY)
.context("Invalid Tor SOCKS proxy URL")?;
reqwest::Client::builder()
.proxy(proxy)
.timeout(self.timeout)
.build()
.context("Build Tor HTTP client")
}
}
#[cfg(test)]
mod tests {
use super::*;