feat(mesh): ContentRef typed variant + send/fetch RPCs (Phase 3b)

Adds attachment sharing over the mesh: a ContentRef envelope (variant 19)
carries the blob CID, size, mime, optional thumb/caption, and a per-peer
HMAC capability URL so the recipient fetches the full blob out-of-band via
`GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`. BlobStore is shared
from ApiHandler into RpcHandler so mesh.send-content and mesh.fetch-content
(reqwest via TOR_SOCKS_PROXY) hit the same store and cap_key.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-04-13 11:10:49 -04:00
parent e8a729a4c7
commit dce5084451
6 changed files with 289 additions and 1 deletions

View File

@ -78,6 +78,13 @@ impl ApiHandler {
let blob_store = Arc::new(BlobStore::open(&config.data_dir, cap_key).await?);
let self_pubkey_hex = hex::encode(identity.signing_key().verifying_key().as_bytes());
// Share blob store with the RPC layer so mesh.send-content /
// mesh.fetch-content can reach the same instance (single cap_key,
// single on-disk root) without re-opening it.
rpc_handler
.set_blob_store(blob_store.clone(), self_pubkey_hex.clone())
.await;
Ok(Self {
config,
rpc_handler,

View File

@ -295,6 +295,8 @@ impl RpcHandler {
"mesh.send-invoice" => self.handle_mesh_send_invoice(params).await,
"mesh.send-coordinate" => self.handle_mesh_send_coordinate(params).await,
"mesh.send-alert" => self.handle_mesh_send_alert(params).await,
"mesh.send-content" => self.handle_mesh_send_content(params).await,
"mesh.fetch-content" => self.handle_mesh_fetch_content(params).await,
"mesh.outbox" => self.handle_mesh_outbox(params).await,
"mesh.session-status" => self.handle_mesh_session_status(params).await,
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,

View File

@ -1,6 +1,8 @@
use super::super::RpcHandler;
use crate::blobs::DEFAULT_CAP_TTL_SECS;
use crate::mesh::message_types::{
self, AlertPayload, AlertType, Coordinate, InvoicePayload, MeshMessageType, TypedEnvelope,
self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MeshMessageType,
TypedEnvelope,
};
use anyhow::Result;
use tracing::info;
@ -196,4 +198,223 @@ impl RpcHandler {
"signed": envelope.sig.is_some(),
}))
}
/// mesh.send-content — Send a ContentRef for a locally-stored blob to a mesh peer.
/// Params: { contact_id, cid, caption? }. Looks up the peer's pubkey from
/// MeshState, loads BlobMeta, issues a peer-scoped capability, and sends a
/// typed envelope over the mesh. The recipient fetches the blob out-of-band
/// via `GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`.
pub(in crate::api::rpc) async fn handle_mesh_send_content(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let cid = params["cid"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing cid"))?
.to_string();
let caption = params["caption"].as_str().map(|s| s.to_string());
// Pull the shared blob store (set by ApiHandler at startup).
let blob_store = {
let guard = self.blob_store.read().await;
guard
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Blob store not initialised"))?
.clone()
};
let meta = blob_store.meta(&cid).await?;
// Need the live mesh service for peer lookup + send.
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
// Resolve peer Ed25519 pubkey from contact_id so we can scope the cap.
let state = svc.shared_state();
let peer_pubkey_hex = {
let peers = state.peers.read().await;
let peer = peers
.get(&contact_id)
.ok_or_else(|| anyhow::anyhow!("Unknown mesh contact_id {}", contact_id))?;
peer.pubkey_hex
.clone()
.ok_or_else(|| anyhow::anyhow!("Peer has no pubkey yet"))?
};
// Our onion (stripped of scheme/trailing slash) for the URL the receiver will hit.
let (data, _) = self.state_manager.get_snapshot().await;
let sender_onion = data
.server_info
.tor_address
.clone()
.ok_or_else(|| anyhow::anyhow!("Tor onion not yet available"))?
.trim_end_matches('/')
.to_string();
let exp = (chrono::Utc::now().timestamp() as u64) + DEFAULT_CAP_TTL_SECS;
let cap_token = blob_store.issue_capability(&cid, &peer_pubkey_hex, exp);
let content = ContentRefPayload {
cid: cid.clone(),
size: meta.size,
mime: meta.mime.clone(),
filename: meta.filename.clone(),
thumb_bytes: meta.thumb_bytes.clone(),
caption: caption.clone(),
sender_onion,
cap_token,
cap_exp: exp,
};
let payload = message_types::encode_payload(&content)?;
let envelope = TypedEnvelope::new(MeshMessageType::ContentRef, payload);
let wire = envelope.to_wire()?;
let display = match (&content.filename, &content.caption) {
(Some(fname), Some(c)) => format!("{}{}", fname, c),
(Some(fname), None) => fname.clone(),
(None, Some(c)) => c.clone(),
(None, None) => format!("{} ({} bytes)", content.mime, content.size),
};
let typed_json = serde_json::to_value(&content).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "content_ref", &display, typed_json)
.await?;
info!(contact_id, cid = %cid, size = meta.size, "Sent content_ref over mesh");
Ok(serde_json::json!({
"sent": true,
"message_id": msg.id,
"cid": cid,
"size": meta.size,
}))
}
/// mesh.fetch-content — Fetch a ContentRef blob from the sender's onion and
/// persist it to our local blob store. Params must include everything the
/// receiver needs to construct and authorise the URL: `{ cid, sender_onion,
/// cap_token, cap_exp, mime?, filename? }`. The caller is typically the UI
/// clicking a content card it just received.
pub(in crate::api::rpc) async fn handle_mesh_fetch_content(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let cid = params["cid"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing cid"))?
.to_string();
let sender_onion = params["sender_onion"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing sender_onion"))?
.trim_end_matches('/')
.to_string();
let cap_token = params["cap_token"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing cap_token"))?
.to_string();
let cap_exp = params["cap_exp"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing cap_exp"))?;
let mime_hint = params["mime"].as_str().unwrap_or("application/octet-stream").to_string();
let filename_hint = params["filename"].as_str().map(|s| s.to_string());
let blob_store = {
let guard = self.blob_store.read().await;
guard
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Blob store not initialised"))?
.clone()
};
let self_pubkey_hex = {
let guard = self.self_pubkey_hex.read().await;
guard
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Self pubkey not set"))?
.clone()
};
// Short-circuit if we already hold the blob — still issue a fresh
// self-cap so the UI gets a displayable local URL.
if blob_store.has(&cid).await {
let local_exp = (chrono::Utc::now().timestamp() as u64) + DEFAULT_CAP_TTL_SECS;
let local_cap = blob_store.issue_capability(&cid, &self_pubkey_hex, local_exp);
let local_url = format!(
"/blob/{}?cap={}&exp={}&peer={}",
cid, local_cap, local_exp, self_pubkey_hex
);
return Ok(serde_json::json!({
"fetched": false,
"cached": true,
"cid": cid,
"local_url": local_url,
}));
}
// Reach the sender over Tor. Onion host is used verbatim; cap/exp/peer
// match what the sender signed in handle_mesh_send_content.
let url = format!(
"http://{}/blob/{}?cap={}&exp={}&peer={}",
sender_onion.trim_start_matches("http://").trim_start_matches("https://"),
cid,
cap_token,
cap_exp,
self_pubkey_hex,
);
let socks_proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY)
.map_err(|e| anyhow::anyhow!("SOCKS proxy setup failed: {}", e))?;
let client = reqwest::Client::builder()
.proxy(socks_proxy)
.timeout(std::time::Duration::from_secs(120))
.build()
.map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?;
let resp = client
.get(&url)
.send()
.await
.map_err(|e| anyhow::anyhow!("Fetch failed: {}", e))?;
if !resp.status().is_success() {
anyhow::bail!("Blob fetch HTTP {}", resp.status());
}
let mime = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.unwrap_or(mime_hint);
let bytes = resp
.bytes()
.await
.map_err(|e| anyhow::anyhow!("Read body failed: {}", e))?;
let meta = blob_store.put(&bytes, &mime, filename_hint, None).await?;
if meta.cid != cid {
anyhow::bail!("CID mismatch: expected {}, got {}", cid, meta.cid);
}
// Self-signed capability so the UI can GET /blob/<cid> from our own
// node without a second round-trip.
let local_exp = (chrono::Utc::now().timestamp() as u64) + DEFAULT_CAP_TTL_SECS;
let local_cap = blob_store.issue_capability(&meta.cid, &self_pubkey_hex, local_exp);
let local_url = format!(
"/blob/{}?cap={}&exp={}&peer={}",
meta.cid, local_cap, local_exp, self_pubkey_hex
);
info!(cid = %cid, size = meta.size, "Fetched content_ref blob via tor");
Ok(serde_json::json!({
"fetched": true,
"cached": false,
"cid": meta.cid,
"size": meta.size,
"mime": meta.mime,
"local_url": local_url,
}))
}
}

View File

@ -71,6 +71,13 @@ pub struct RpcHandler {
response_cache: ResponseCache,
mesh_service: Arc<tokio::sync::RwLock<Option<crate::mesh::MeshService>>>,
transport_router: Arc<tokio::sync::RwLock<Option<Arc<crate::transport::TransportRouter>>>>,
/// Shared content-addressed blob store. Set by ApiHandler after construction
/// so mesh.send-content / mesh.fetch-content RPCs can reach it without a
/// second instance and duplicated cap_key.
pub(crate) blob_store: Arc<tokio::sync::RwLock<Option<Arc<crate::blobs::BlobStore>>>>,
/// Our own Ed25519 pubkey hex — needed by ContentRef senders for cap scoping
/// and by ContentRef receivers to request caps scoped to themselves.
pub(crate) self_pubkey_hex: Arc<tokio::sync::RwLock<Option<String>>>,
}
impl RpcHandler {
@ -128,6 +135,8 @@ impl RpcHandler {
response_cache: ResponseCache::new(5),
mesh_service: Arc::new(tokio::sync::RwLock::new(None)),
transport_router: Arc::new(tokio::sync::RwLock::new(None)),
blob_store: Arc::new(tokio::sync::RwLock::new(None)),
self_pubkey_hex: Arc::new(tokio::sync::RwLock::new(None)),
})
}
@ -141,6 +150,13 @@ impl RpcHandler {
*self.transport_router.write().await = Some(router);
}
/// Share the blob store + our pubkey so mesh.send-content / fetch-content
/// can reach them. Called once from ApiHandler::new.
pub async fn set_blob_store(&self, store: Arc<crate::blobs::BlobStore>, self_pubkey_hex: String) {
*self.blob_store.write().await = Some(store);
*self.self_pubkey_hex.write().await = Some(self_pubkey_hex);
}
/// Get reference to the mesh service Arc (for MeshTransport wrapper).
pub fn mesh_service_arc(&self) -> Arc<tokio::sync::RwLock<Option<crate::mesh::MeshService>>> {
Arc::clone(&self.mesh_service)

View File

@ -277,6 +277,22 @@ pub(super) async fn handle_typed_message(
dispatch_tx_confirmation(&envelope, sender_contact_id, sender_name, state).await;
}
Some(MeshMessageType::ContentRef) => {
match message_types::decode_payload::<message_types::ContentRefPayload>(&envelope.v) {
Ok(content) => {
let text = match (&content.filename, &content.caption) {
(Some(fname), Some(c)) => format!("📎 {}{}", fname, c),
(Some(fname), None) => format!("📎 {}", fname),
(None, Some(c)) => format!("📎 {}", c),
(None, None) => format!("📎 {} ({} bytes)", content.mime, content.size),
};
let json = payload_to_json(&content);
store_typed_message(state, sender_contact_id, sender_name, &text, "content_ref", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode content_ref payload: {}", e),
}
}
Some(MeshMessageType::Text) => {
// Typed text message — extract and store as plain text
let text = String::from_utf8_lossy(&envelope.v).to_string();

View File

@ -42,6 +42,8 @@ pub enum MeshMessageType {
LightningRelayResponse = 11,
/// Confirmation update for a relayed transaction (1, 2, 3 confs).
TxConfirmation = 12,
/// Attachment/file reference: CID of a blob held by the sender, fetched out-of-band.
ContentRef = 19,
}
impl MeshMessageType {
@ -60,6 +62,7 @@ impl MeshMessageType {
10 => Some(Self::LightningRelay),
11 => Some(Self::LightningRelayResponse),
12 => Some(Self::TxConfirmation),
19 => Some(Self::ContentRef),
_ => None,
}
}
@ -79,6 +82,7 @@ impl MeshMessageType {
Self::LightningRelay => "lightning_relay",
Self::LightningRelayResponse => "lightning_relay_response",
Self::TxConfirmation => "tx_confirmation",
Self::ContentRef => "content_ref",
}
}
}
@ -316,6 +320,28 @@ pub struct LightningRelayResponsePayload {
pub error: Option<String>,
}
/// Content/attachment reference: points at a blob held by the sender that
/// recipients fetch out-of-band via `GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`.
/// Thumb bytes (≤60B) may be inlined for immediate display; full blob is lazy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentRefPayload {
pub cid: String,
pub size: u64,
pub mime: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filename: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub thumb_bytes: Option<Vec<u8>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub caption: Option<String>,
/// Sender's onion host (no scheme, no trailing slash) — `<host>.onion`.
pub sender_onion: String,
/// HMAC capability token scoped to (cid, recipient_pubkey, exp).
pub cap_token: String,
/// Capability expiry (unix seconds).
pub cap_exp: u64,
}
/// Transaction confirmation update (relay node → originator).
/// Sent after each new confirmation (1, 2, 3) until fully confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]