diff --git a/core/archipelago/src/api/handler/mod.rs b/core/archipelago/src/api/handler/mod.rs index d1193819..f3927a63 100644 --- a/core/archipelago/src/api/handler/mod.rs +++ b/core/archipelago/src/api/handler/mod.rs @@ -267,6 +267,15 @@ impl ApiHandler { Self::handle_node_message(body_bytes).await } + // Mesh typed envelope relay over federation — peers POST + // pre-encoded TypedEnvelope wire bytes here when the envelope is + // too large for a single LoRa frame (primarily ContentRef). No + // session auth: the body carries a pubkey + ed25519 signature + // over the wire bytes which we verify before dispatching. + (Method::POST, "/archipelago/mesh-typed") => { + Self::handle_mesh_typed_relay(self.rpc_handler.clone(), body_bytes).await + } + // Blob upload — local/session use only. Session-authenticated so // only the node owner can push attachments into the blob store. (Method::POST, "/api/blob") => { diff --git a/core/archipelago/src/api/handler/node_message.rs b/core/archipelago/src/api/handler/node_message.rs index b405c026..9e170864 100644 --- a/core/archipelago/src/api/handler/node_message.rs +++ b/core/archipelago/src/api/handler/node_message.rs @@ -1,6 +1,8 @@ +use crate::api::rpc::RpcHandler; use crate::node_message as node_msg; use super::build_response;use anyhow::Result; use hyper::{Response, StatusCode}; +use std::sync::Arc; use super::{ApiHandler, is_valid_pubkey_hex, sanitize_html, sanitize_log_string}; @@ -74,4 +76,85 @@ impl ApiHandler { } Ok(build_response(StatusCode::OK, "application/json", hyper::Body::from(r#"{"ok":true}"#))) } + + /// Federation-routed mesh typed envelope. Body: + /// `{from_pubkey, from_name?, typed_envelope_b64, signature}` + /// Signature is ed25519 over the raw wire bytes, verified against + /// from_pubkey before dispatch. + pub(super) async fn handle_mesh_typed_relay( + rpc_handler: Arc, + body: hyper::body::Bytes, + ) -> Result> { + use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; + #[derive(serde::Deserialize)] + struct Incoming { + from_pubkey: String, + #[serde(default)] + from_name: Option, + typed_envelope_b64: String, + signature: String, + } + let incoming: Incoming = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(e) => { + return Ok(build_response( + StatusCode::BAD_REQUEST, + "application/json", + hyper::Body::from(format!(r#"{{"error":"bad json: {}"}}"#, e)), + )); + } + }; + if !is_valid_pubkey_hex(&incoming.from_pubkey) { + return Ok(build_response( + StatusCode::BAD_REQUEST, + "application/json", + hyper::Body::from(r#"{"error":"invalid pubkey"}"#), + )); + } + let wire = match BASE64.decode(incoming.typed_envelope_b64.as_bytes()) { + Ok(v) => v, + Err(_) => { + return Ok(build_response( + StatusCode::BAD_REQUEST, + "application/json", + hyper::Body::from(r#"{"error":"bad base64"}"#), + )); + } + }; + match crate::identity::NodeIdentity::verify(&incoming.from_pubkey, &wire, &incoming.signature) { + Ok(true) => {} + _ => { + return Ok(build_response( + StatusCode::FORBIDDEN, + "application/json", + hyper::Body::from(r#"{"error":"signature rejected"}"#), + )); + } + } + + // Inject into mesh state via the shared MeshService. Mirrors a radio + // receive, so the message lands in the same chat stream as LoRa- + // delivered messages from the same peer. + let service = rpc_handler.mesh_service_arc(); + let svc_guard = service.read().await; + let Some(svc) = svc_guard.as_ref() else { + return Ok(build_response( + StatusCode::SERVICE_UNAVAILABLE, + "application/json", + hyper::Body::from(r#"{"error":"mesh not running"}"#), + )); + }; + if let Err(e) = svc + .inject_typed_from_federation(&incoming.from_pubkey, incoming.from_name.as_deref(), wire) + .await + { + tracing::warn!("mesh-typed relay inject failed: {}", e); + return Ok(build_response( + StatusCode::BAD_REQUEST, + "application/json", + hyper::Body::from(format!(r#"{{"error":"{}"}}"#, e)), + )); + } + Ok(build_response(StatusCode::OK, "application/json", hyper::Body::from(r#"{"ok":true}"#))) + } } diff --git a/core/archipelago/src/api/rpc/mesh/typed_messages.rs b/core/archipelago/src/api/rpc/mesh/typed_messages.rs index 4c7963ff..f0709726 100644 --- a/core/archipelago/src/api/rpc/mesh/typed_messages.rs +++ b/core/archipelago/src/api/rpc/mesh/typed_messages.rs @@ -286,9 +286,35 @@ impl RpcHandler { (None, None) => format!("{} ({} bytes)", content.mime, content.size), }; let typed_json = serde_json::to_value(&content).ok(); - let msg = svc - .send_typed_wire(contact_id, wire, "content_ref", &display, typed_json, seq) - .await?; + + // ContentRef envelopes routinely exceed LoRa's ~160-byte per-frame + // budget (cid alone is 64 hex chars, plus onion + cap). Route via + // federation when the peer has a known onion; fall back to LoRa + // only for tiny envelopes that could theoretically fit. + let federation_onion = { + let nodes = crate::federation::load_nodes(&self.config.data_dir) + .await + .unwrap_or_default(); + nodes + .into_iter() + .find(|n| n.pubkey == peer_pubkey_hex) + .map(|n| n.onion) + }; + let msg = if let Some(onion) = federation_onion { + svc.send_typed_wire_via_federation( + contact_id, + &onion, + wire, + "content_ref", + &display, + typed_json, + seq, + ) + .await? + } else { + svc.send_typed_wire(contact_id, wire, "content_ref", &display, typed_json, seq) + .await? + }; info!(contact_id, cid = %cid, size = meta.size, "Sent content_ref over mesh"); Ok(serde_json::json!({ diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 349fde10..3911bdac 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -71,7 +71,19 @@ pub(super) async fn handle_typed_message( return; } }; + handle_typed_envelope_direct(state, sender_contact_id, sender_name, envelope).await; +} +/// Dispatch a pre-decoded TypedEnvelope. Shared between the radio receive +/// path (handle_typed_message above) and the federation receive path +/// (MeshService::inject_typed_from_federation) so both transports land the +/// same payload in the same chat store. +pub(crate) async fn handle_typed_envelope_direct( + state: &Arc, + sender_contact_id: u32, + sender_name: &str, + envelope: TypedEnvelope, +) { // Verify envelope signature if present, using the sender's known Ed25519 key if envelope.sig.is_some() { let peer_pubkey = state.peers.read().await diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index b1a4b423..945a4d72 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -9,7 +9,7 @@ mod bitcoin; mod decode; -mod dispatch; +pub(crate) mod dispatch; mod frames; mod session; diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 3d49e9c3..5f4f0aaf 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -527,6 +527,112 @@ impl MeshService { .await) } + /// Send a typed envelope to a peer via the federation (Tor) path rather + /// than LoRa. Used when the envelope exceeds LoRa's per-frame budget — + /// ContentRef is the canonical example, at 400+ bytes with thumb + cap. + /// The peer's onion must already be in federation storage. Records a + /// Sent MeshMessage locally on success so the UI gets the rich card. + /// + /// This does NOT use chunking and does NOT go through the mesh radio — + /// it is a straight HTTP POST over Tor to the peer's + /// `/archipelago/mesh-typed` endpoint. + pub async fn send_typed_wire_via_federation( + &self, + contact_id: u32, + peer_onion: &str, + wire: Vec, + type_label: &str, + display_text: &str, + typed_payload: Option, + sender_seq: u64, + ) -> Result { + use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; + use ed25519_dalek::Signer; + + let host = if peer_onion.ends_with(".onion") { + peer_onion.to_string() + } else { + format!("{}.onion", peer_onion.trim_end_matches('/')) + }; + let url = format!("http://{}/archipelago/mesh-typed", host); + + // Sign the raw wire bytes so the receiver can attribute the envelope + // to our pubkey even when it arrives over federation/Tor rather than + // the radio. Signature covers the wire only — the receiver re-hashes. + let signature = hex::encode(self.signing_key.sign(&wire).to_bytes()); + let wire_b64 = BASE64.encode(&wire); + let body = serde_json::json!({ + "from_pubkey": self.our_ed_pubkey_hex, + "from_name": self.our_did, + "typed_envelope_b64": wire_b64, + "signature": signature, + }); + + let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) + .map_err(|e| anyhow::anyhow!("Invalid Tor proxy: {}", e))?; + let client = reqwest::Client::builder() + .proxy(proxy) + .timeout(std::time::Duration::from_secs(120)) + .build() + .map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?; + let resp = client + .post(&url) + .json(&body) + .send() + .await + .map_err(|e| anyhow::anyhow!("Federation POST failed: {}", e))?; + if !resp.status().is_success() { + anyhow::bail!("Peer rejected typed envelope: HTTP {}", resp.status()); + } + Ok(self + .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq) + .await) + } + + /// Inject a typed envelope received over federation (Tor) into MeshState + /// as if it had arrived over the mesh radio. Looks up contact_id by + /// matching pubkey_hex against known mesh peers; falls back to a + /// synthetic id derived from the pubkey bytes so the UI can still + /// address the chat (will render as a new "peer" if not in the list). + pub async fn inject_typed_from_federation( + &self, + from_pubkey_hex: &str, + from_name: Option<&str>, + wire: Vec, + ) -> Result<()> { + let envelope = crate::mesh::message_types::TypedEnvelope::from_wire(&wire)?; + // Find the contact_id by pubkey match; fall back to synthetic. + let contact_id = { + let peers = self.state.peers.read().await; + peers + .iter() + .find_map(|(cid, p)| { + if p.pubkey_hex.as_deref() == Some(from_pubkey_hex) { + Some(*cid) + } else { + None + } + }) + .unwrap_or_else(|| { + let bytes = hex::decode(from_pubkey_hex).unwrap_or_default(); + if bytes.len() >= 4 { + u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) + } else { + 0 + } + }) + }; + let display_name = from_name.unwrap_or("federation peer").to_string(); + listener::dispatch::handle_typed_envelope_direct( + &self.state, + contact_id, + &display_name, + envelope, + ) + .await; + Ok(()) + } + /// Allocate the next outbound seq for a target. Convenience passthrough /// to MeshState::next_send_seq; used by RPC handlers before encoding a /// TypedEnvelope so the seq on the wire matches the Sent record.