diff --git a/core/archipelago/src/api/handler/mod.rs b/core/archipelago/src/api/handler/mod.rs index 4ef12c22..86ed579b 100644 --- a/core/archipelago/src/api/handler/mod.rs +++ b/core/archipelago/src/api/handler/mod.rs @@ -78,6 +78,13 @@ impl ApiHandler { let blob_store = Arc::new(BlobStore::open(&config.data_dir, cap_key).await?); let self_pubkey_hex = hex::encode(identity.signing_key().verifying_key().as_bytes()); + // Share blob store with the RPC layer so mesh.send-content / + // mesh.fetch-content can reach the same instance (single cap_key, + // single on-disk root) without re-opening it. + rpc_handler + .set_blob_store(blob_store.clone(), self_pubkey_hex.clone()) + .await; + Ok(Self { config, rpc_handler, diff --git a/core/archipelago/src/api/rpc/dispatcher.rs b/core/archipelago/src/api/rpc/dispatcher.rs index e14c9aac..0ea0dd7f 100644 --- a/core/archipelago/src/api/rpc/dispatcher.rs +++ b/core/archipelago/src/api/rpc/dispatcher.rs @@ -295,6 +295,8 @@ impl RpcHandler { "mesh.send-invoice" => self.handle_mesh_send_invoice(params).await, "mesh.send-coordinate" => self.handle_mesh_send_coordinate(params).await, "mesh.send-alert" => self.handle_mesh_send_alert(params).await, + "mesh.send-content" => self.handle_mesh_send_content(params).await, + "mesh.fetch-content" => self.handle_mesh_fetch_content(params).await, "mesh.outbox" => self.handle_mesh_outbox(params).await, "mesh.session-status" => self.handle_mesh_session_status(params).await, "mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await, diff --git a/core/archipelago/src/api/rpc/mesh/typed_messages.rs b/core/archipelago/src/api/rpc/mesh/typed_messages.rs index ae924ed7..dcbb0e3b 100644 --- a/core/archipelago/src/api/rpc/mesh/typed_messages.rs +++ b/core/archipelago/src/api/rpc/mesh/typed_messages.rs @@ -1,6 +1,8 @@ use super::super::RpcHandler; +use crate::blobs::DEFAULT_CAP_TTL_SECS; use crate::mesh::message_types::{ - self, AlertPayload, AlertType, Coordinate, InvoicePayload, MeshMessageType, TypedEnvelope, + self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MeshMessageType, + TypedEnvelope, }; use anyhow::Result; use tracing::info; @@ -196,4 +198,223 @@ impl RpcHandler { "signed": envelope.sig.is_some(), })) } + + /// mesh.send-content — Send a ContentRef for a locally-stored blob to a mesh peer. + /// Params: { contact_id, cid, caption? }. Looks up the peer's pubkey from + /// MeshState, loads BlobMeta, issues a peer-scoped capability, and sends a + /// typed envelope over the mesh. The recipient fetches the blob out-of-band + /// via `GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`. + pub(in crate::api::rpc) async fn handle_mesh_send_content( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let cid = params["cid"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing cid"))? + .to_string(); + let caption = params["caption"].as_str().map(|s| s.to_string()); + + // Pull the shared blob store (set by ApiHandler at startup). + let blob_store = { + let guard = self.blob_store.read().await; + guard + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Blob store not initialised"))? + .clone() + }; + let meta = blob_store.meta(&cid).await?; + + // Need the live mesh service for peer lookup + send. + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + + // Resolve peer Ed25519 pubkey from contact_id so we can scope the cap. + let state = svc.shared_state(); + let peer_pubkey_hex = { + let peers = state.peers.read().await; + let peer = peers + .get(&contact_id) + .ok_or_else(|| anyhow::anyhow!("Unknown mesh contact_id {}", contact_id))?; + peer.pubkey_hex + .clone() + .ok_or_else(|| anyhow::anyhow!("Peer has no pubkey yet"))? + }; + + // Our onion (stripped of scheme/trailing slash) for the URL the receiver will hit. + let (data, _) = self.state_manager.get_snapshot().await; + let sender_onion = data + .server_info + .tor_address + .clone() + .ok_or_else(|| anyhow::anyhow!("Tor onion not yet available"))? + .trim_end_matches('/') + .to_string(); + + let exp = (chrono::Utc::now().timestamp() as u64) + DEFAULT_CAP_TTL_SECS; + let cap_token = blob_store.issue_capability(&cid, &peer_pubkey_hex, exp); + + let content = ContentRefPayload { + cid: cid.clone(), + size: meta.size, + mime: meta.mime.clone(), + filename: meta.filename.clone(), + thumb_bytes: meta.thumb_bytes.clone(), + caption: caption.clone(), + sender_onion, + cap_token, + cap_exp: exp, + }; + + let payload = message_types::encode_payload(&content)?; + let envelope = TypedEnvelope::new(MeshMessageType::ContentRef, payload); + let wire = envelope.to_wire()?; + + let display = match (&content.filename, &content.caption) { + (Some(fname), Some(c)) => format!("{} — {}", fname, c), + (Some(fname), None) => fname.clone(), + (None, Some(c)) => c.clone(), + (None, None) => format!("{} ({} bytes)", content.mime, content.size), + }; + let typed_json = serde_json::to_value(&content).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "content_ref", &display, typed_json) + .await?; + + info!(contact_id, cid = %cid, size = meta.size, "Sent content_ref over mesh"); + Ok(serde_json::json!({ + "sent": true, + "message_id": msg.id, + "cid": cid, + "size": meta.size, + })) + } + + /// mesh.fetch-content — Fetch a ContentRef blob from the sender's onion and + /// persist it to our local blob store. Params must include everything the + /// receiver needs to construct and authorise the URL: `{ cid, sender_onion, + /// cap_token, cap_exp, mime?, filename? }`. The caller is typically the UI + /// clicking a content card it just received. + pub(in crate::api::rpc) async fn handle_mesh_fetch_content( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let cid = params["cid"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing cid"))? + .to_string(); + let sender_onion = params["sender_onion"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing sender_onion"))? + .trim_end_matches('/') + .to_string(); + let cap_token = params["cap_token"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing cap_token"))? + .to_string(); + let cap_exp = params["cap_exp"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing cap_exp"))?; + let mime_hint = params["mime"].as_str().unwrap_or("application/octet-stream").to_string(); + let filename_hint = params["filename"].as_str().map(|s| s.to_string()); + + let blob_store = { + let guard = self.blob_store.read().await; + guard + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Blob store not initialised"))? + .clone() + }; + + let self_pubkey_hex = { + let guard = self.self_pubkey_hex.read().await; + guard + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Self pubkey not set"))? + .clone() + }; + + // Short-circuit if we already hold the blob — still issue a fresh + // self-cap so the UI gets a displayable local URL. + if blob_store.has(&cid).await { + let local_exp = (chrono::Utc::now().timestamp() as u64) + DEFAULT_CAP_TTL_SECS; + let local_cap = blob_store.issue_capability(&cid, &self_pubkey_hex, local_exp); + let local_url = format!( + "/blob/{}?cap={}&exp={}&peer={}", + cid, local_cap, local_exp, self_pubkey_hex + ); + return Ok(serde_json::json!({ + "fetched": false, + "cached": true, + "cid": cid, + "local_url": local_url, + })); + } + + // Reach the sender over Tor. Onion host is used verbatim; cap/exp/peer + // match what the sender signed in handle_mesh_send_content. + let url = format!( + "http://{}/blob/{}?cap={}&exp={}&peer={}", + sender_onion.trim_start_matches("http://").trim_start_matches("https://"), + cid, + cap_token, + cap_exp, + self_pubkey_hex, + ); + + let socks_proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) + .map_err(|e| anyhow::anyhow!("SOCKS proxy setup failed: {}", e))?; + let client = reqwest::Client::builder() + .proxy(socks_proxy) + .timeout(std::time::Duration::from_secs(120)) + .build() + .map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?; + + let resp = client + .get(&url) + .send() + .await + .map_err(|e| anyhow::anyhow!("Fetch failed: {}", e))?; + if !resp.status().is_success() { + anyhow::bail!("Blob fetch HTTP {}", resp.status()); + } + let mime = resp + .headers() + .get(reqwest::header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + .unwrap_or(mime_hint); + let bytes = resp + .bytes() + .await + .map_err(|e| anyhow::anyhow!("Read body failed: {}", e))?; + + let meta = blob_store.put(&bytes, &mime, filename_hint, None).await?; + if meta.cid != cid { + anyhow::bail!("CID mismatch: expected {}, got {}", cid, meta.cid); + } + // Self-signed capability so the UI can GET /blob/ from our own + // node without a second round-trip. + let local_exp = (chrono::Utc::now().timestamp() as u64) + DEFAULT_CAP_TTL_SECS; + let local_cap = blob_store.issue_capability(&meta.cid, &self_pubkey_hex, local_exp); + let local_url = format!( + "/blob/{}?cap={}&exp={}&peer={}", + meta.cid, local_cap, local_exp, self_pubkey_hex + ); + info!(cid = %cid, size = meta.size, "Fetched content_ref blob via tor"); + Ok(serde_json::json!({ + "fetched": true, + "cached": false, + "cid": meta.cid, + "size": meta.size, + "mime": meta.mime, + "local_url": local_url, + })) + } } diff --git a/core/archipelago/src/api/rpc/mod.rs b/core/archipelago/src/api/rpc/mod.rs index e311d3b3..80caa4ae 100644 --- a/core/archipelago/src/api/rpc/mod.rs +++ b/core/archipelago/src/api/rpc/mod.rs @@ -71,6 +71,13 @@ pub struct RpcHandler { response_cache: ResponseCache, mesh_service: Arc>>, transport_router: Arc>>>, + /// Shared content-addressed blob store. Set by ApiHandler after construction + /// so mesh.send-content / mesh.fetch-content RPCs can reach it without a + /// second instance and duplicated cap_key. + pub(crate) blob_store: Arc>>>, + /// Our own Ed25519 pubkey hex — needed by ContentRef senders for cap scoping + /// and by ContentRef receivers to request caps scoped to themselves. + pub(crate) self_pubkey_hex: Arc>>, } impl RpcHandler { @@ -128,6 +135,8 @@ impl RpcHandler { response_cache: ResponseCache::new(5), mesh_service: Arc::new(tokio::sync::RwLock::new(None)), transport_router: Arc::new(tokio::sync::RwLock::new(None)), + blob_store: Arc::new(tokio::sync::RwLock::new(None)), + self_pubkey_hex: Arc::new(tokio::sync::RwLock::new(None)), }) } @@ -141,6 +150,13 @@ impl RpcHandler { *self.transport_router.write().await = Some(router); } + /// Share the blob store + our pubkey so mesh.send-content / fetch-content + /// can reach them. Called once from ApiHandler::new. + pub async fn set_blob_store(&self, store: Arc, self_pubkey_hex: String) { + *self.blob_store.write().await = Some(store); + *self.self_pubkey_hex.write().await = Some(self_pubkey_hex); + } + /// Get reference to the mesh service Arc (for MeshTransport wrapper). pub fn mesh_service_arc(&self) -> Arc>> { Arc::clone(&self.mesh_service) diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index cccd3fa8..3519c4d0 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -277,6 +277,22 @@ pub(super) async fn handle_typed_message( dispatch_tx_confirmation(&envelope, sender_contact_id, sender_name, state).await; } + Some(MeshMessageType::ContentRef) => { + match message_types::decode_payload::(&envelope.v) { + Ok(content) => { + let text = match (&content.filename, &content.caption) { + (Some(fname), Some(c)) => format!("📎 {} — {}", fname, c), + (Some(fname), None) => format!("📎 {}", fname), + (None, Some(c)) => format!("📎 {}", c), + (None, None) => format!("📎 {} ({} bytes)", content.mime, content.size), + }; + let json = payload_to_json(&content); + store_typed_message(state, sender_contact_id, sender_name, &text, "content_ref", json, Some(envelope.seq)).await; + } + Err(e) => warn!("Failed to decode content_ref payload: {}", e), + } + } + Some(MeshMessageType::Text) => { // Typed text message — extract and store as plain text let text = String::from_utf8_lossy(&envelope.v).to_string(); diff --git a/core/archipelago/src/mesh/message_types.rs b/core/archipelago/src/mesh/message_types.rs index 881f71b1..7c61e12a 100644 --- a/core/archipelago/src/mesh/message_types.rs +++ b/core/archipelago/src/mesh/message_types.rs @@ -42,6 +42,8 @@ pub enum MeshMessageType { LightningRelayResponse = 11, /// Confirmation update for a relayed transaction (1, 2, 3 confs). TxConfirmation = 12, + /// Attachment/file reference: CID of a blob held by the sender, fetched out-of-band. + ContentRef = 19, } impl MeshMessageType { @@ -60,6 +62,7 @@ impl MeshMessageType { 10 => Some(Self::LightningRelay), 11 => Some(Self::LightningRelayResponse), 12 => Some(Self::TxConfirmation), + 19 => Some(Self::ContentRef), _ => None, } } @@ -79,6 +82,7 @@ impl MeshMessageType { Self::LightningRelay => "lightning_relay", Self::LightningRelayResponse => "lightning_relay_response", Self::TxConfirmation => "tx_confirmation", + Self::ContentRef => "content_ref", } } } @@ -316,6 +320,28 @@ pub struct LightningRelayResponsePayload { pub error: Option, } +/// Content/attachment reference: points at a blob held by the sender that +/// recipients fetch out-of-band via `GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`. +/// Thumb bytes (≤60B) may be inlined for immediate display; full blob is lazy. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContentRefPayload { + pub cid: String, + pub size: u64, + pub mime: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub filename: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub thumb_bytes: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub caption: Option, + /// Sender's onion host (no scheme, no trailing slash) — `.onion`. + pub sender_onion: String, + /// HMAC capability token scoped to (cid, recipient_pubkey, exp). + pub cap_token: String, + /// Capability expiry (unix seconds). + pub cap_exp: u64, +} + /// Transaction confirmation update (relay node → originator). /// Sent after each new confirmation (1, 2, 3) until fully confirmed. #[derive(Debug, Clone, Serialize, Deserialize)]