fix(mesh): route ContentRef over federation when >160B
mesh.send-content was failing with "Message too large for LoRa: 624
bytes (max 160)" because a single ContentRef envelope (cid + onion +
cap_token + thumb) dwarfs a LoRa frame. Add a federation Tor fallback:
- New POST /archipelago/mesh-typed endpoint accepts
{from_pubkey, typed_envelope_b64, signature}, verifies ed25519 over
the raw wire bytes, and injects the decoded envelope into MeshState
via a new MeshService::inject_typed_from_federation helper. This
shares the same dispatch match as LoRa receives via a new pub(crate)
handle_typed_envelope_direct extracted from handle_typed_message.
- MeshService::send_typed_wire_via_federation POSTs the signed wire to
a peer's onion over TOR_SOCKS_PROXY and records a local Sent record.
- handle_mesh_send_content looks up the peer's onion in federation
storage and routes via federation when available, falling back to
LoRa only when no federation presence is known (still fails on
oversized — chunking is Phase 4).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
649433b7fd
commit
bc3729d99f
@ -267,6 +267,15 @@ impl ApiHandler {
|
||||
Self::handle_node_message(body_bytes).await
|
||||
}
|
||||
|
||||
// Mesh typed envelope relay over federation — peers POST
|
||||
// pre-encoded TypedEnvelope wire bytes here when the envelope is
|
||||
// too large for a single LoRa frame (primarily ContentRef). No
|
||||
// session auth: the body carries a pubkey + ed25519 signature
|
||||
// over the wire bytes which we verify before dispatching.
|
||||
(Method::POST, "/archipelago/mesh-typed") => {
|
||||
Self::handle_mesh_typed_relay(self.rpc_handler.clone(), body_bytes).await
|
||||
}
|
||||
|
||||
// Blob upload — local/session use only. Session-authenticated so
|
||||
// only the node owner can push attachments into the blob store.
|
||||
(Method::POST, "/api/blob") => {
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
use crate::api::rpc::RpcHandler;
|
||||
use crate::node_message as node_msg;
|
||||
use super::build_response;use anyhow::Result;
|
||||
use hyper::{Response, StatusCode};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{ApiHandler, is_valid_pubkey_hex, sanitize_html, sanitize_log_string};
|
||||
|
||||
@ -74,4 +76,85 @@ impl ApiHandler {
|
||||
}
|
||||
Ok(build_response(StatusCode::OK, "application/json", hyper::Body::from(r#"{"ok":true}"#)))
|
||||
}
|
||||
|
||||
/// Federation-routed mesh typed envelope. Body:
|
||||
/// `{from_pubkey, from_name?, typed_envelope_b64, signature}`
|
||||
/// Signature is ed25519 over the raw wire bytes, verified against
|
||||
/// from_pubkey before dispatch.
|
||||
pub(super) async fn handle_mesh_typed_relay(
|
||||
rpc_handler: Arc<RpcHandler>,
|
||||
body: hyper::body::Bytes,
|
||||
) -> Result<Response<hyper::Body>> {
|
||||
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
|
||||
#[derive(serde::Deserialize)]
|
||||
struct Incoming {
|
||||
from_pubkey: String,
|
||||
#[serde(default)]
|
||||
from_name: Option<String>,
|
||||
typed_envelope_b64: String,
|
||||
signature: String,
|
||||
}
|
||||
let incoming: Incoming = match serde_json::from_slice(&body) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
return Ok(build_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"application/json",
|
||||
hyper::Body::from(format!(r#"{{"error":"bad json: {}"}}"#, e)),
|
||||
));
|
||||
}
|
||||
};
|
||||
if !is_valid_pubkey_hex(&incoming.from_pubkey) {
|
||||
return Ok(build_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"application/json",
|
||||
hyper::Body::from(r#"{"error":"invalid pubkey"}"#),
|
||||
));
|
||||
}
|
||||
let wire = match BASE64.decode(incoming.typed_envelope_b64.as_bytes()) {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
return Ok(build_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"application/json",
|
||||
hyper::Body::from(r#"{"error":"bad base64"}"#),
|
||||
));
|
||||
}
|
||||
};
|
||||
match crate::identity::NodeIdentity::verify(&incoming.from_pubkey, &wire, &incoming.signature) {
|
||||
Ok(true) => {}
|
||||
_ => {
|
||||
return Ok(build_response(
|
||||
StatusCode::FORBIDDEN,
|
||||
"application/json",
|
||||
hyper::Body::from(r#"{"error":"signature rejected"}"#),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Inject into mesh state via the shared MeshService. Mirrors a radio
|
||||
// receive, so the message lands in the same chat stream as LoRa-
|
||||
// delivered messages from the same peer.
|
||||
let service = rpc_handler.mesh_service_arc();
|
||||
let svc_guard = service.read().await;
|
||||
let Some(svc) = svc_guard.as_ref() else {
|
||||
return Ok(build_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"application/json",
|
||||
hyper::Body::from(r#"{"error":"mesh not running"}"#),
|
||||
));
|
||||
};
|
||||
if let Err(e) = svc
|
||||
.inject_typed_from_federation(&incoming.from_pubkey, incoming.from_name.as_deref(), wire)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("mesh-typed relay inject failed: {}", e);
|
||||
return Ok(build_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"application/json",
|
||||
hyper::Body::from(format!(r#"{{"error":"{}"}}"#, e)),
|
||||
));
|
||||
}
|
||||
Ok(build_response(StatusCode::OK, "application/json", hyper::Body::from(r#"{"ok":true}"#)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -286,9 +286,35 @@ impl RpcHandler {
|
||||
(None, None) => format!("{} ({} bytes)", content.mime, content.size),
|
||||
};
|
||||
let typed_json = serde_json::to_value(&content).ok();
|
||||
let msg = svc
|
||||
.send_typed_wire(contact_id, wire, "content_ref", &display, typed_json, seq)
|
||||
.await?;
|
||||
|
||||
// ContentRef envelopes routinely exceed LoRa's ~160-byte per-frame
|
||||
// budget (cid alone is 64 hex chars, plus onion + cap). Route via
|
||||
// federation when the peer has a known onion; fall back to LoRa
|
||||
// only for tiny envelopes that could theoretically fit.
|
||||
let federation_onion = {
|
||||
let nodes = crate::federation::load_nodes(&self.config.data_dir)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
nodes
|
||||
.into_iter()
|
||||
.find(|n| n.pubkey == peer_pubkey_hex)
|
||||
.map(|n| n.onion)
|
||||
};
|
||||
let msg = if let Some(onion) = federation_onion {
|
||||
svc.send_typed_wire_via_federation(
|
||||
contact_id,
|
||||
&onion,
|
||||
wire,
|
||||
"content_ref",
|
||||
&display,
|
||||
typed_json,
|
||||
seq,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
svc.send_typed_wire(contact_id, wire, "content_ref", &display, typed_json, seq)
|
||||
.await?
|
||||
};
|
||||
|
||||
info!(contact_id, cid = %cid, size = meta.size, "Sent content_ref over mesh");
|
||||
Ok(serde_json::json!({
|
||||
|
||||
@ -71,7 +71,19 @@ pub(super) async fn handle_typed_message(
|
||||
return;
|
||||
}
|
||||
};
|
||||
handle_typed_envelope_direct(state, sender_contact_id, sender_name, envelope).await;
|
||||
}
|
||||
|
||||
/// Dispatch a pre-decoded TypedEnvelope. Shared between the radio receive
|
||||
/// path (handle_typed_message above) and the federation receive path
|
||||
/// (MeshService::inject_typed_from_federation) so both transports land the
|
||||
/// same payload in the same chat store.
|
||||
pub(crate) async fn handle_typed_envelope_direct(
|
||||
state: &Arc<MeshState>,
|
||||
sender_contact_id: u32,
|
||||
sender_name: &str,
|
||||
envelope: TypedEnvelope,
|
||||
) {
|
||||
// Verify envelope signature if present, using the sender's known Ed25519 key
|
||||
if envelope.sig.is_some() {
|
||||
let peer_pubkey = state.peers.read().await
|
||||
|
||||
@ -9,7 +9,7 @@
|
||||
|
||||
mod bitcoin;
|
||||
mod decode;
|
||||
mod dispatch;
|
||||
pub(crate) mod dispatch;
|
||||
mod frames;
|
||||
mod session;
|
||||
|
||||
|
||||
@ -527,6 +527,112 @@ impl MeshService {
|
||||
.await)
|
||||
}
|
||||
|
||||
/// Send a typed envelope to a peer via the federation (Tor) path rather
|
||||
/// than LoRa. Used when the envelope exceeds LoRa's per-frame budget —
|
||||
/// ContentRef is the canonical example, at 400+ bytes with thumb + cap.
|
||||
/// The peer's onion must already be in federation storage. Records a
|
||||
/// Sent MeshMessage locally on success so the UI gets the rich card.
|
||||
///
|
||||
/// This does NOT use chunking and does NOT go through the mesh radio —
|
||||
/// it is a straight HTTP POST over Tor to the peer's
|
||||
/// `/archipelago/mesh-typed` endpoint.
|
||||
pub async fn send_typed_wire_via_federation(
|
||||
&self,
|
||||
contact_id: u32,
|
||||
peer_onion: &str,
|
||||
wire: Vec<u8>,
|
||||
type_label: &str,
|
||||
display_text: &str,
|
||||
typed_payload: Option<serde_json::Value>,
|
||||
sender_seq: u64,
|
||||
) -> Result<MeshMessage> {
|
||||
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
|
||||
use ed25519_dalek::Signer;
|
||||
|
||||
let host = if peer_onion.ends_with(".onion") {
|
||||
peer_onion.to_string()
|
||||
} else {
|
||||
format!("{}.onion", peer_onion.trim_end_matches('/'))
|
||||
};
|
||||
let url = format!("http://{}/archipelago/mesh-typed", host);
|
||||
|
||||
// Sign the raw wire bytes so the receiver can attribute the envelope
|
||||
// to our pubkey even when it arrives over federation/Tor rather than
|
||||
// the radio. Signature covers the wire only — the receiver re-hashes.
|
||||
let signature = hex::encode(self.signing_key.sign(&wire).to_bytes());
|
||||
let wire_b64 = BASE64.encode(&wire);
|
||||
let body = serde_json::json!({
|
||||
"from_pubkey": self.our_ed_pubkey_hex,
|
||||
"from_name": self.our_did,
|
||||
"typed_envelope_b64": wire_b64,
|
||||
"signature": signature,
|
||||
});
|
||||
|
||||
let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY)
|
||||
.map_err(|e| anyhow::anyhow!("Invalid Tor proxy: {}", e))?;
|
||||
let client = reqwest::Client::builder()
|
||||
.proxy(proxy)
|
||||
.timeout(std::time::Duration::from_secs(120))
|
||||
.build()
|
||||
.map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?;
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Federation POST failed: {}", e))?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("Peer rejected typed envelope: HTTP {}", resp.status());
|
||||
}
|
||||
Ok(self
|
||||
.record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq)
|
||||
.await)
|
||||
}
|
||||
|
||||
/// Inject a typed envelope received over federation (Tor) into MeshState
|
||||
/// as if it had arrived over the mesh radio. Looks up contact_id by
|
||||
/// matching pubkey_hex against known mesh peers; falls back to a
|
||||
/// synthetic id derived from the pubkey bytes so the UI can still
|
||||
/// address the chat (will render as a new "peer" if not in the list).
|
||||
pub async fn inject_typed_from_federation(
|
||||
&self,
|
||||
from_pubkey_hex: &str,
|
||||
from_name: Option<&str>,
|
||||
wire: Vec<u8>,
|
||||
) -> Result<()> {
|
||||
let envelope = crate::mesh::message_types::TypedEnvelope::from_wire(&wire)?;
|
||||
// Find the contact_id by pubkey match; fall back to synthetic.
|
||||
let contact_id = {
|
||||
let peers = self.state.peers.read().await;
|
||||
peers
|
||||
.iter()
|
||||
.find_map(|(cid, p)| {
|
||||
if p.pubkey_hex.as_deref() == Some(from_pubkey_hex) {
|
||||
Some(*cid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
let bytes = hex::decode(from_pubkey_hex).unwrap_or_default();
|
||||
if bytes.len() >= 4 {
|
||||
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
|
||||
} else {
|
||||
0
|
||||
}
|
||||
})
|
||||
};
|
||||
let display_name = from_name.unwrap_or("federation peer").to_string();
|
||||
listener::dispatch::handle_typed_envelope_direct(
|
||||
&self.state,
|
||||
contact_id,
|
||||
&display_name,
|
||||
envelope,
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Allocate the next outbound seq for a target. Convenience passthrough
|
||||
/// to MeshState::next_send_seq; used by RPC handlers before encoding a
|
||||
/// TypedEnvelope so the seq on the wire matches the Sent record.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user