feat(messaging,dwn,mesh): route peer messaging + DWN sync + blob fetch via FIPS first

Migrates the remaining Tor-direct peer call sites to PeerRequest so
FIPS is the default when the peer is federated and running the daemon:

- node_message::send_to_peer / check_peer_reachable: gain a
  fips_npub parameter. Error messages updated to reference both
  transports.
- Callers (api/rpc/network.rs, api/rpc/peers.rs, server health
  loop): look up fips_npub from federation storage by onion and
  pass it.
- mesh::send_typed_wire_via_federation: the spawned background POST
  for the /archipelago/mesh-typed endpoint now uses PeerRequest with
  federation-resolved fips_npub. Signature domain unchanged.
- api/rpc/mesh/typed_messages.rs fetch_blob_from_peer: blob URL
  rebuilt as (base_url, path_with_query) so PeerRequest can append
  the query string after swapping the host. Cap/exp/peer
  parameters are still signed over the content ref itself, so
  transport choice is invisible to the signature.
- network/dwn_sync.rs sync_with_peers: per-peer fips_npub lookup
  before sync_single_peer; health/pull/push each dial through
  PeerRequest, so any DWN peer known to federation gets FIPS.

Left Tor-only on purpose:
- api/rpc/identity/handlers.rs handle_identity_resolve_peer_onion —
  resolving TO a DID, no anchor yet.
- content.browse / preview calls to non-federated peers fall
  through to Tor naturally inside PeerRequest (no fips_npub → skip
  FIPS branch).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-04-19 01:36:04 -04:00
parent ba825c13a5
commit dbd19006f2
7 changed files with 112 additions and 137 deletions

View File

@ -725,34 +725,29 @@ impl RpcHandler {
})); }));
} }
// Reach the sender over Tor. Onion host is used verbatim; cap/exp/peer // Reach the sender: FIPS preferred when the sender is federated
// match what the sender signed in handle_mesh_send_content. // and has advertised a FIPS npub, Tor fallback otherwise.
let url = format!( // Cap/exp/peer in the query string match what the sender signed in
"http://{}/blob/{}?cap={}&exp={}&peer={}", // handle_mesh_send_content — signature domain unchanged.
sender_onion let onion_bare = sender_onion
.trim_start_matches("http://") .trim_start_matches("http://")
.trim_start_matches("https://"), .trim_start_matches("https://")
cid, .to_string();
cap_token, let path = format!(
cap_exp, "/blob/{}?cap={}&exp={}&peer={}",
self_pubkey_hex, cid, cap_token, cap_exp, self_pubkey_hex
); );
let fips_npub =
crate::federation::fips_npub_for_onion(&self.config.data_dir, &onion_bare).await;
let socks_proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) let (resp, transport) =
.map_err(|e| anyhow::anyhow!("SOCKS proxy setup failed: {}", e))?; crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), &onion_bare, &path)
let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(120))
.proxy(socks_proxy) .send_get()
.timeout(std::time::Duration::from_secs(120)) .await
.build() .map_err(|e| anyhow::anyhow!("Fetch failed: {}", e))?;
.map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?;
let resp = client
.get(&url)
.send()
.await
.map_err(|e| anyhow::anyhow!("Fetch failed: {}", e))?;
if !resp.status().is_success() { if !resp.status().is_success() {
anyhow::bail!("Blob fetch HTTP {}", resp.status()); anyhow::bail!("Blob fetch HTTP {} (via {})", resp.status(), transport);
} }
let mime = resp let mime = resp
.headers() .headers()
@ -777,7 +772,7 @@ impl RpcHandler {
"/blob/{}?cap={}&exp={}&peer={}", "/blob/{}?cap={}&exp={}&peer={}",
meta.cid, local_cap, local_exp, self_pubkey_hex meta.cid, local_cap, local_exp, self_pubkey_hex
); );
info!(cid = %cid, size = meta.size, "Fetched content_ref blob via tor"); info!(cid = %cid, size = meta.size, transport = %transport, "Fetched content_ref blob");
Ok(serde_json::json!({ Ok(serde_json::json!({
"fetched": true, "fetched": true,
"cached": false, "cached": false,

View File

@ -127,8 +127,11 @@ impl RpcHandler {
"message": message, "message": message,
}); });
let to_fips_npub =
crate::federation::fips_npub_for_onion(&self.config.data_dir, to_onion).await;
crate::node_message::send_to_peer( crate::node_message::send_to_peer(
to_onion, to_onion,
to_fips_npub.as_deref(),
my_pubkey, my_pubkey,
&req_msg.to_string(), &req_msg.to_string(),
None, None,

View File

@ -114,20 +114,20 @@ impl RpcHandler {
let fed_nodes = federation::load_nodes(&self.config.data_dir) let fed_nodes = federation::load_nodes(&self.config.data_dir)
.await .await
.unwrap_or_default(); .unwrap_or_default();
let recipient_pubkey = fed_nodes let recipient = fed_nodes.iter().find(|n| {
.iter() n.onion == onion
.find(|n| { || n.onion == format!("{}.onion", onion)
n.onion == onion || format!("{}.onion", n.onion) == onion
|| n.onion == format!("{}.onion", onion) });
|| format!("{}.onion", n.onion) == onion let recipient_pubkey = recipient.map(|n| n.pubkey.clone());
}) let recipient_fips_npub = recipient.and_then(|n| n.fips_npub.clone());
.map(|n| n.pubkey.clone());
// Include our node name so the recipient can display it // Include our node name so the recipient can display it
let node_name = data.server_info.name.clone(); let node_name = data.server_info.name.clone();
node_message::send_to_peer( node_message::send_to_peer(
onion, onion,
recipient_fips_npub.as_deref(),
&pubkey, &pubkey,
message, message,
Some(node_id.signing_key()), Some(node_id.signing_key()),
@ -147,7 +147,9 @@ impl RpcHandler {
.get("onion") .get("onion")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing onion"))?; .ok_or_else(|| anyhow::anyhow!("Missing onion"))?;
let reachable = node_message::check_peer_reachable(onion) let fips_npub =
crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await;
let reachable = node_message::check_peer_reachable(onion, fips_npub.as_deref())
.await .await
.unwrap_or(false); .unwrap_or(false);
Ok(serde_json::json!({ "onion": onion, "reachable": reachable })) Ok(serde_json::json!({ "onion": onion, "reachable": reachable }))

View File

@ -827,16 +827,10 @@ impl MeshService {
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use ed25519_dalek::Signer; use ed25519_dalek::Signer;
let host = if peer_onion.ends_with(".onion") {
peer_onion.to_string()
} else {
format!("{}.onion", peer_onion.trim_end_matches('/'))
};
let url = format!("http://{}/archipelago/mesh-typed", host);
// Sign the raw wire bytes so the receiver can attribute the envelope // Sign the raw wire bytes so the receiver can attribute the envelope
// to our pubkey even when it arrives over federation/Tor rather than // to our pubkey even when it arrives over federation/FIPS/Tor rather
// the radio. Signature covers the wire only — the receiver re-hashes. // than the radio. Signature covers the wire only — the receiver
// re-hashes.
let signature = hex::encode(self.signing_key.sign(&wire).to_bytes()); let signature = hex::encode(self.signing_key.sign(&wire).to_bytes());
let wire_b64 = BASE64.encode(&wire); let wire_b64 = BASE64.encode(&wire);
let body = serde_json::json!({ let body = serde_json::json!({
@ -857,33 +851,29 @@ impl MeshService {
) )
.await; .await;
// Fire the Tor POST in the background. Failures are logged but do // Fire the send in the background. FIPS is preferred when the peer
// not propagate — the caller has already been handed the Sent // is federated and running fips; Tor is the fallback. Failures are
// logged but do not propagate — caller already has the Sent
// MeshMessage and the UI's delivery indicator tracks the receipt. // MeshMessage and the UI's delivery indicator tracks the receipt.
let peer_onion_owned = peer_onion.to_string();
let data_dir_owned = self.data_dir.clone();
tokio::spawn(async move { tokio::spawn(async move {
let proxy = match reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) { let fips_npub =
Ok(p) => p, crate::federation::fips_npub_for_onion(&data_dir_owned, &peer_onion_owned).await;
Err(e) => { let req = crate::fips::dial::PeerRequest::new(
warn!(contact_id, "Invalid Tor proxy: {}", e); fips_npub.as_deref(),
return; &peer_onion_owned,
"/archipelago/mesh-typed",
)
.timeout(std::time::Duration::from_secs(120));
match req.send_json(&body).await {
Ok((resp, transport)) if resp.status().is_success() => {
tracing::debug!(contact_id, transport = %transport, "Federation envelope delivered");
} }
}; Ok((resp, transport)) => warn!(
let client = match reqwest::Client::builder()
.proxy(proxy)
.timeout(std::time::Duration::from_secs(120))
.build()
{
Ok(c) => c,
Err(e) => {
warn!(contact_id, "HTTP client build failed: {}", e);
return;
}
};
match client.post(&url).json(&body).send().await {
Ok(resp) if resp.status().is_success() => {}
Ok(resp) => warn!(
contact_id, contact_id,
status = %resp.status(), status = %resp.status(),
transport = %transport,
"Peer rejected federation-routed envelope" "Peer rejected federation-routed envelope"
), ),
Err(e) => warn!(contact_id, "Federation POST failed: {}", e), Err(e) => warn!(contact_id, "Federation POST failed: {}", e),

View File

@ -104,16 +104,6 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
state.status = SyncStatus::Syncing; state.status = SyncStatus::Syncing;
save_sync_state(data_dir, &state).await?; save_sync_state(data_dir, &state).await?;
let socks_proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY)
.context("Failed to create SOCKS proxy")?;
let client = reqwest::Client::builder()
.proxy(socks_proxy)
.connect_timeout(std::time::Duration::from_secs(15))
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to build Tor HTTP client")?;
let store = DwnStore::new(data_dir).await?; let store = DwnStore::new(data_dir).await?;
let mut synced_count = 0u64; let mut synced_count = 0u64;
@ -142,7 +132,15 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
// Overall sync timeout: 90 seconds // Overall sync timeout: 90 seconds
let sync_future = async { let sync_future = async {
for onion in &unique_onions { for onion in &unique_onions {
match sync_single_peer(&client, &store, onion, &local_messages, &state.last_sync).await let fips_npub = crate::federation::fips_npub_for_onion(data_dir, onion).await;
match sync_single_peer(
fips_npub.as_deref(),
&store,
onion,
&local_messages,
&state.last_sync,
)
.await
{ {
Ok(count) => { Ok(count) => {
debug!(peer = %onion, messages = count, "Peer sync complete"); debug!(peer = %onion, messages = count, "Peer sync complete");
@ -173,29 +171,28 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
} }
/// Sync with a single peer: pull their messages and push ours. /// Sync with a single peer: pull their messages and push ours.
/// Each HTTP call picks FIPS when a npub is known, otherwise Tor.
async fn sync_single_peer( async fn sync_single_peer(
client: &reqwest::Client, fips_npub: Option<&str>,
store: &crate::network::dwn_store::DwnStore, store: &crate::network::dwn_store::DwnStore,
onion: &str, onion: &str,
local_messages: &[crate::network::dwn_store::DwnMessage], local_messages: &[crate::network::dwn_store::DwnMessage],
last_sync: &Option<String>, last_sync: &Option<String>,
) -> Result<u64> { ) -> Result<u64> {
let base_url = format!("http://{}", onion); use crate::fips::dial::PeerRequest;
let mut imported = 0u64; let mut imported = 0u64;
// Step 1: Check peer health // Step 1: Check peer health
let health_url = format!("{}/dwn/health", base_url); let (health_resp, _) = PeerRequest::new(fips_npub, onion, "/dwn/health")
let res = client .timeout(std::time::Duration::from_secs(30))
.get(&health_url) .send_get()
.send()
.await .await
.context("Peer DWN unreachable")?; .context("Peer DWN unreachable")?;
if !res.status().is_success() { if !health_resp.status().is_success() {
return Err(anyhow::anyhow!("Peer DWN not healthy")); return Err(anyhow::anyhow!("Peer DWN not healthy"));
} }
// Step 2: Pull — query peer for messages since our last sync // Step 2: Pull — query peer for messages since our last sync
let dwn_url = format!("{}/dwn", base_url);
let mut query_filter = serde_json::json!({}); let mut query_filter = serde_json::json!({});
if let Some(ref since) = last_sync { if let Some(ref since) = last_sync {
query_filter = serde_json::json!({ "dateSort": "createdAscending", "dateFrom": since }); query_filter = serde_json::json!({ "dateSort": "createdAscending", "dateFrom": since });
@ -210,10 +207,9 @@ async fn sync_single_peer(
}] }]
}); });
let pull_res = client let (pull_res, _) = PeerRequest::new(fips_npub, onion, "/dwn")
.post(&dwn_url) .timeout(std::time::Duration::from_secs(30))
.json(&pull_body) .send_json(&pull_body)
.send()
.await .await
.context("Failed to query peer DWN")?; .context("Failed to query peer DWN")?;
@ -267,10 +263,14 @@ async fn sync_single_peer(
let push_body = serde_json::json!({ "messages": messages }); let push_body = serde_json::json!({ "messages": messages });
// Best-effort push — don't fail the whole sync if a batch fails // Best-effort push — don't fail the whole sync if a batch fails.
match client.post(&dwn_url).json(&push_body).send().await { match PeerRequest::new(fips_npub, onion, "/dwn")
Ok(_) => { .timeout(std::time::Duration::from_secs(30))
debug!(count = chunk.len(), "Pushed message batch to peer"); .send_json(&push_body)
.await
{
Ok((_, t)) => {
debug!(count = chunk.len(), transport = %t, "Pushed message batch to peer");
} }
Err(e) => { Err(e) => {
debug!(error = %e, "Failed to push message batch to peer"); debug!(error = %e, "Failed to push message batch to peer");

View File

@ -244,6 +244,7 @@ fn validate_onion(onion: &str) -> Result<()> {
/// derived from both nodes' ed25519 keys. /// derived from both nodes' ed25519 keys.
pub async fn send_to_peer( pub async fn send_to_peer(
onion: &str, onion: &str,
fips_npub: Option<&str>,
from_pubkey: &str, from_pubkey: &str,
message: &str, message: &str,
signing_key: Option<&ed25519_dalek::SigningKey>, signing_key: Option<&ed25519_dalek::SigningKey>,
@ -252,13 +253,6 @@ pub async fn send_to_peer(
) -> Result<()> { ) -> Result<()> {
validate_onion(onion)?; validate_onion(onion)?;
let host = if onion.ends_with(".onion") {
onion.to_string()
} else {
format!("{}.onion", onion)
};
let url = format!("http://{}/archipelago/node-message", host);
// Encrypt message if we have both keys // Encrypt message if we have both keys
let (payload_message, encrypted) = match (signing_key, recipient_pubkey) { let (payload_message, encrypted) = match (signing_key, recipient_pubkey) {
(Some(sk), Some(rpk)) => match encrypt_for_peer(sk, rpk, message) { (Some(sk), Some(rpk)) => match encrypt_for_peer(sk, rpk, message) {
@ -281,57 +275,46 @@ pub async fn send_to_peer(
body["from_name"] = serde_json::Value::String(name.to_string()); body["from_name"] = serde_json::Value::String(name.to_string());
} }
let proxy = let (resp, transport) = crate::fips::dial::PeerRequest::new(
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; fips_npub,
let client = reqwest::Client::builder() onion,
.proxy(proxy) "/archipelago/node-message",
.timeout(std::time::Duration::from_secs(60)) )
.build() .timeout(std::time::Duration::from_secs(60))
.context("Failed to build HTTP client")?; .send_json(&body)
.await
let resp = client.post(&url).json(&body).send().await.map_err(|e| { .map_err(|e| {
let msg = e.to_string(); let msg = e.to_string();
if msg.contains("connection refused") || msg.contains("Connection refused") { if msg.contains("connection refused") || msg.contains("Connection refused") {
anyhow::anyhow!("Tor not reachable at 127.0.0.1:9050. Is Tor running?") anyhow::anyhow!("Peer unreachable. Check Tor (127.0.0.1:9050) and FIPS daemon status.")
} else if msg.contains("timeout") || msg.contains("timed out") { } else if msg.contains("timeout") || msg.contains("timed out") {
anyhow::anyhow!( anyhow::anyhow!("Connection timed out. The peer may be offline.")
"Connection timed out. The peer may be offline or unreachable over Tor."
)
} else { } else {
anyhow::anyhow!("Failed to send over Tor: {}", msg) anyhow::anyhow!("Failed to send: {}", msg)
} }
})?; })?;
if !resp.status().is_success() { if !resp.status().is_success() {
anyhow::bail!( anyhow::bail!(
"Peer returned {} {}. The peer may need /archipelago/ in its nginx config.", "Peer returned {} {} (via {}). The peer may need /archipelago/ in its nginx config.",
resp.status().as_u16(), resp.status().as_u16(),
resp.status().canonical_reason().unwrap_or("") resp.status().canonical_reason().unwrap_or(""),
transport,
); );
} }
Ok(()) Ok(())
} }
/// Check if a peer is reachable (ping over Tor). /// Check if a peer is reachable (ping). FIPS is preferred when an npub
pub async fn check_peer_reachable(onion: &str) -> Result<bool> { /// is known, Tor is the fallback.
pub async fn check_peer_reachable(onion: &str, fips_npub: Option<&str>) -> Result<bool> {
validate_onion(onion)?; validate_onion(onion)?;
match crate::fips::dial::PeerRequest::new(fips_npub, onion, "/health")
let host = if onion.ends_with(".onion") {
onion.to_string()
} else {
format!("{}.onion", onion)
};
let url = format!("http://{}/health", host);
let proxy =
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
let client = reqwest::Client::builder()
.proxy(proxy)
.timeout(std::time::Duration::from_secs(30)) .timeout(std::time::Duration::from_secs(30))
.build() .send_get()
.context("Failed to build HTTP client")?; .await
{
match client.get(&url).send().await { Ok((resp, _)) => Ok(resp.status().is_success()),
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false), Err(_) => Ok(false),
} }
} }

View File

@ -727,9 +727,11 @@ async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) ->
let mut new_health = std::collections::HashMap::new(); let mut new_health = std::collections::HashMap::new();
for peer in &known_peers { for peer in &known_peers {
let reachable = node_message::check_peer_reachable(&peer.onion) let fips_npub = crate::federation::fips_npub_for_onion(data_dir, &peer.onion).await;
.await let reachable =
.unwrap_or(false); node_message::check_peer_reachable(&peer.onion, fips_npub.as_deref())
.await
.unwrap_or(false);
new_health.insert(peer.onion.clone(), reachable); new_health.insert(peer.onion.clone(), reachable);
} }