diff --git a/core/archipelago/src/api/rpc/dispatcher.rs b/core/archipelago/src/api/rpc/dispatcher.rs index 0ea0dd7f..e2afecd2 100644 --- a/core/archipelago/src/api/rpc/dispatcher.rs +++ b/core/archipelago/src/api/rpc/dispatcher.rs @@ -297,6 +297,8 @@ impl RpcHandler { "mesh.send-alert" => self.handle_mesh_send_alert(params).await, "mesh.send-content" => self.handle_mesh_send_content(params).await, "mesh.fetch-content" => self.handle_mesh_fetch_content(params).await, + "mesh.send-reply" => self.handle_mesh_send_reply(params).await, + "mesh.send-reaction" => self.handle_mesh_send_reaction(params).await, "mesh.outbox" => self.handle_mesh_outbox(params).await, "mesh.session-status" => self.handle_mesh_session_status(params).await, "mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await, diff --git a/core/archipelago/src/api/rpc/mesh/typed_messages.rs b/core/archipelago/src/api/rpc/mesh/typed_messages.rs index dcbb0e3b..4c7963ff 100644 --- a/core/archipelago/src/api/rpc/mesh/typed_messages.rs +++ b/core/archipelago/src/api/rpc/mesh/typed_messages.rs @@ -1,8 +1,8 @@ use super::super::RpcHandler; use crate::blobs::DEFAULT_CAP_TTL_SECS; use crate::mesh::message_types::{ - self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MeshMessageType, - TypedEnvelope, + self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MessageKey, + MeshMessageType, ReactionPayload, ReplyPayload, TypedEnvelope, }; use anyhow::Result; use tracing::info; @@ -30,16 +30,17 @@ impl RpcHandler { payment_hash: None, }; - let payload = message_types::encode_payload(&invoice)?; - let envelope = TypedEnvelope::new(MeshMessageType::Invoice, payload); - let wire = envelope.to_wire()?; - // Send via mesh let service = self.mesh_service.read().await; let svc = service .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&invoice)?; + let envelope = TypedEnvelope::new(MeshMessageType::Invoice, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let display = format!( "Invoice: {} sats{}", amount_sats, @@ -47,7 +48,7 @@ impl RpcHandler { ); let typed_json = serde_json::to_value(&invoice).ok(); let msg = svc - .send_typed_wire(contact_id, wire, "invoice", &display, typed_json) + .send_typed_wire(contact_id, wire, "invoice", &display, typed_json, seq) .await?; info!(contact_id, amount_sats, "Sent invoice over mesh"); @@ -77,15 +78,17 @@ impl RpcHandler { let label = params["label"].as_str().map(|s| s.to_string()); let coord = Coordinate::from_degrees(lat, lng, label); - let payload = message_types::encode_payload(&coord)?; - let envelope = TypedEnvelope::new(MeshMessageType::Coordinate, payload); - let wire = envelope.to_wire()?; let service = self.mesh_service.read().await; let svc = service .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&coord)?; + let envelope = TypedEnvelope::new(MeshMessageType::Coordinate, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let display = format!( "Location: {:.6}, {:.6}{}", coord.lat_degrees(), @@ -94,7 +97,7 @@ impl RpcHandler { ); let typed_json = serde_json::to_value(&coord).ok(); let msg = svc - .send_typed_wire(contact_id, wire, "coordinate", &display, typed_json) + .send_typed_wire(contact_id, wire, "coordinate", &display, typed_json, seq) .await?; info!(contact_id, "Sent coordinate over mesh"); @@ -149,7 +152,7 @@ impl RpcHandler { let identity_dir = self.config.data_dir.join("identity"); let node_key_path = identity_dir.join("node_key"); - let envelope = if node_key_path.exists() { + let unsigned_envelope = if node_key_path.exists() { let key_bytes = tokio::fs::read(&node_key_path).await?; if key_bytes.len() == 32 { let mut seed = [0u8; 32]; @@ -163,8 +166,6 @@ impl RpcHandler { TypedEnvelope::new(MeshMessageType::Alert, payload) }; - let wire = envelope.to_wire()?; - let service = self.mesh_service.read().await; let svc = service .as_ref() @@ -172,21 +173,23 @@ impl RpcHandler { let display = alert.message.clone(); let typed_json = serde_json::to_value(&alert).ok(); + let signed = unsigned_envelope.sig.is_some(); if broadcast { - // Send on public channel (all peers) as raw bytes so the binary - // envelope is not corrupted by utf8 conversion. - svc.send_channel_typed_wire(0, wire, "alert", &display, typed_json.clone()) + // Channel 0 uses target=0 for seq allocation. Signature covers + // (t, v, ts) — setting seq afterwards does NOT invalidate it. + let seq = svc.next_send_seq(0).await; + let envelope = unsigned_envelope.with_seq(seq); + let wire = envelope.to_wire()?; + svc.send_channel_typed_wire(0, wire, "alert", &display, typed_json.clone(), seq) .await?; info!(alert_type = alert_type_str, "Broadcast alert over mesh"); } else if let Some(contact_id) = params["contact_id"].as_u64() { - svc.send_typed_wire( - contact_id as u32, - wire, - "alert", - &display, - typed_json, - ) - .await?; + let contact_id = contact_id as u32; + let seq = svc.next_send_seq(contact_id).await; + let envelope = unsigned_envelope.with_seq(seq); + let wire = envelope.to_wire()?; + svc.send_typed_wire(contact_id, wire, "alert", &display, typed_json, seq) + .await?; info!(contact_id, alert_type = alert_type_str, "Sent alert to peer"); } else { anyhow::bail!("Must specify contact_id or broadcast: true"); @@ -195,7 +198,7 @@ impl RpcHandler { Ok(serde_json::json!({ "sent": true, "alert_type": alert_type_str, - "signed": envelope.sig.is_some(), + "signed": signed, })) } @@ -271,8 +274,9 @@ impl RpcHandler { cap_exp: exp, }; + let seq = svc.next_send_seq(contact_id).await; let payload = message_types::encode_payload(&content)?; - let envelope = TypedEnvelope::new(MeshMessageType::ContentRef, payload); + let envelope = TypedEnvelope::new(MeshMessageType::ContentRef, payload).with_seq(seq); let wire = envelope.to_wire()?; let display = match (&content.filename, &content.caption) { @@ -283,7 +287,7 @@ impl RpcHandler { }; let typed_json = serde_json::to_value(&content).ok(); let msg = svc - .send_typed_wire(contact_id, wire, "content_ref", &display, typed_json) + .send_typed_wire(contact_id, wire, "content_ref", &display, typed_json, seq) .await?; info!(contact_id, cid = %cid, size = meta.size, "Sent content_ref over mesh"); @@ -295,6 +299,101 @@ impl RpcHandler { })) } + /// mesh.send-reply — Send a text reply targeted at an earlier message. + /// Params: { contact_id, target_pubkey, target_seq, text }. The target + /// MessageKey identifies the message being replied to; it does NOT need + /// to be our peer — cross-transport replies to anyone work. + pub(in crate::api::rpc) async fn handle_mesh_send_reply( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let target_pubkey = params["target_pubkey"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing target_pubkey"))? + .to_string(); + let target_seq = params["target_seq"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?; + let text = params["text"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing text"))? + .to_string(); + + let reply = ReplyPayload { + target: MessageKey { sender_pubkey: target_pubkey, sender_seq: target_seq }, + text: text.clone(), + }; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&reply)?; + let envelope = TypedEnvelope::new(MeshMessageType::Reply, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let typed_json = serde_json::to_value(&reply).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "reply", &text, typed_json, seq) + .await?; + info!(contact_id, seq, "Sent reply over mesh"); + Ok(serde_json::json!({ + "sent": true, + "message_id": msg.id, + "sender_seq": seq, + })) + } + + /// mesh.send-reaction — Emoji reaction on an earlier message. + /// Params: { contact_id, target_pubkey, target_seq, emoji }. An empty + /// emoji string clears any existing reaction from us on that target. + pub(in crate::api::rpc) async fn handle_mesh_send_reaction( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let target_pubkey = params["target_pubkey"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing target_pubkey"))? + .to_string(); + let target_seq = params["target_seq"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?; + let emoji = params["emoji"].as_str().unwrap_or("").to_string(); + + let reaction = ReactionPayload { + target: MessageKey { sender_pubkey: target_pubkey, sender_seq: target_seq }, + emoji: emoji.clone(), + }; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&reaction)?; + let envelope = TypedEnvelope::new(MeshMessageType::Reaction, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let display = if emoji.is_empty() { "(cleared)".to_string() } else { emoji.clone() }; + let typed_json = serde_json::to_value(&reaction).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "reaction", &display, typed_json, seq) + .await?; + info!(contact_id, seq, emoji = %emoji, "Sent reaction over mesh"); + Ok(serde_json::json!({ + "sent": true, + "message_id": msg.id, + "sender_seq": seq, + })) + } + /// mesh.fetch-content — Fetch a ContentRef blob from the sender's onion and /// persist it to our local blob store. Params must include everything the /// receiver needs to construct and authorise the URL: `{ cid, sender_onion, diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 3519c4d0..349fde10 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -277,6 +277,31 @@ pub(super) async fn handle_typed_message( dispatch_tx_confirmation(&envelope, sender_contact_id, sender_name, state).await; } + Some(MeshMessageType::Reply) => { + match message_types::decode_payload::(&envelope.v) { + Ok(reply) => { + let json = payload_to_json(&reply); + store_typed_message(state, sender_contact_id, sender_name, &reply.text, "reply", json, Some(envelope.seq)).await; + } + Err(e) => warn!("Failed to decode reply payload: {}", e), + } + } + + Some(MeshMessageType::Reaction) => { + match message_types::decode_payload::(&envelope.v) { + Ok(reaction) => { + let display = if reaction.emoji.is_empty() { + "(reaction cleared)".to_string() + } else { + reaction.emoji.clone() + }; + let json = payload_to_json(&reaction); + store_typed_message(state, sender_contact_id, sender_name, &display, "reaction", json, Some(envelope.seq)).await; + } + Err(e) => warn!("Failed to decode reaction payload: {}", e), + } + } + Some(MeshMessageType::ContentRef) => { match message_types::decode_payload::(&envelope.v) { Ok(content) => { diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index 389afa7c..b1a4b423 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -69,6 +69,11 @@ pub struct MeshState { pub event_tx: broadcast::Sender, pub cmd_tx: mpsc::Sender, next_message_id: RwLock, + /// Per-contact outbound sequence counter. Increments on every typed + /// envelope we send to a given peer so the receiver (and anyone else + /// forwarding/reacting) can build a stable MessageKey = (our_pubkey, seq). + /// Channel broadcasts use contact_id = 0 as a shared counter. + next_send_seq: RwLock>, /// Block header cache — populated when receiving headers from internet-connected peers. pub block_header_cache: Arc, /// Relay tracker — stores completed relay results for frontend polling. @@ -121,6 +126,7 @@ impl MeshState { }), event_tx: tx, next_message_id: RwLock::new(1), + next_send_seq: RwLock::new(HashMap::new()), block_header_cache, relay_tracker, stego_mode, @@ -138,6 +144,16 @@ impl MeshState { current } + /// Allocate the next outbound sequence number for a given target + /// (contact_id for direct peers, 0 for channel broadcasts). Monotonic + /// per target, starts at 1. + pub async fn next_send_seq(&self, target: u32) -> u64 { + let mut map = self.next_send_seq.write().await; + let slot = map.entry(target).or_insert(0); + *slot += 1; + *slot + } + pub async fn store_message(&self, msg: MeshMessage) { let mut messages = self.messages.write().await; // Deduplicate: skip if we already have a message with the same text, diff --git a/core/archipelago/src/mesh/message_types.rs b/core/archipelago/src/mesh/message_types.rs index 7c61e12a..16839b37 100644 --- a/core/archipelago/src/mesh/message_types.rs +++ b/core/archipelago/src/mesh/message_types.rs @@ -42,6 +42,10 @@ pub enum MeshMessageType { LightningRelayResponse = 11, /// Confirmation update for a relayed transaction (1, 2, 3 confs). TxConfirmation = 12, + /// Reply to an earlier message, targeted by MessageKey. + Reply = 13, + /// Emoji reaction on an earlier message, targeted by MessageKey. + Reaction = 14, /// Attachment/file reference: CID of a blob held by the sender, fetched out-of-band. ContentRef = 19, } @@ -62,6 +66,8 @@ impl MeshMessageType { 10 => Some(Self::LightningRelay), 11 => Some(Self::LightningRelayResponse), 12 => Some(Self::TxConfirmation), + 13 => Some(Self::Reply), + 14 => Some(Self::Reaction), 19 => Some(Self::ContentRef), _ => None, } @@ -82,11 +88,24 @@ impl MeshMessageType { Self::LightningRelay => "lightning_relay", Self::LightningRelayResponse => "lightning_relay_response", Self::TxConfirmation => "tx_confirmation", + Self::Reply => "reply", + Self::Reaction => "reaction", Self::ContentRef => "content_ref", } } } +/// Cross-transport stable identity for any mesh message. `sender_pubkey` is +/// the hex Ed25519 key of the originator; `sender_seq` is that sender's +/// per-peer (or per-channel) monotonic counter at send time. Receivers use +/// this pair — not the local u64 `id` — when addressing replies, reactions, +/// reads, edits, or deletes. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct MessageKey { + pub sender_pubkey: String, + pub sender_seq: u64, +} + // ─── Wire Envelope ────────────────────────────────────────────────────── /// CBOR wire envelope wrapping any typed message. @@ -166,6 +185,14 @@ impl TypedEnvelope { MeshMessageType::from_u8(self.t) } + /// Set the outbound sequence number. Called by the send path after the + /// target's counter has been incremented. Safe to call AFTER `new_signed` + /// because the signature covers `(t, v, ts)` — not `seq`. + pub fn with_seq(mut self, seq: u64) -> Self { + self.seq = seq; + self + } + /// Encode to wire format: [0x02] [CBOR envelope]. pub fn to_wire(&self) -> Result> { let mut buf = Vec::new(); @@ -320,6 +347,24 @@ pub struct LightningRelayResponsePayload { pub error: Option, } +/// Reply to an earlier message. `target` is the MessageKey the reply points +/// at; `text` is the reply body. Receivers render this as a "replying to" +/// card above the text. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplyPayload { + pub target: MessageKey, + pub text: String, +} + +/// Emoji reaction on an earlier message. `target` identifies the reacted-to +/// message; `emoji` is a short unicode string (typically 1–4 code points). +/// An empty emoji is treated as "clear my reaction" on the target. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReactionPayload { + pub target: MessageKey, + pub emoji: String, +} + /// Content/attachment reference: points at a blob held by the sender that /// recipients fetch out-of-band via `GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`. /// Thumb bytes (≤60B) may be inlined for immediate display; full blob is lazy. diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index e7c12849..3d49e9c3 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -507,6 +507,11 @@ impl MeshService { /// Send a typed envelope wire payload and record a rich Sent MeshMessage. /// Used by RPC handlers that transmit invoice/coordinate/alert/etc. so the /// UI sees a proper rich Sent card instead of garbage wire-byte plaintext. + /// + /// `sender_seq` should match the seq encoded into the TypedEnvelope by the + /// caller — the RPC handler allocates it via `next_send_seq` before + /// building the envelope and threads it through here so the local Sent + /// record carries the same MessageKey the receiver will see. pub async fn send_typed_wire( &self, contact_id: u32, @@ -514,13 +519,21 @@ impl MeshService { type_label: &str, display_text: &str, typed_payload: Option, + sender_seq: u64, ) -> Result { self.send_raw_payload(contact_id, wire).await?; Ok(self - .record_sent_typed(contact_id, type_label, display_text, typed_payload) + .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq) .await) } + /// Allocate the next outbound seq for a target. Convenience passthrough + /// to MeshState::next_send_seq; used by RPC handlers before encoding a + /// TypedEnvelope so the seq on the wire matches the Sent record. + pub async fn next_send_seq(&self, target: u32) -> u64 { + self.state.next_send_seq(target).await + } + /// Broadcast a typed envelope wire payload on a mesh channel and record a /// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode /// binary envelope bytes before handing them here. @@ -531,6 +544,7 @@ impl MeshService { type_label: &str, display_text: &str, typed_payload: Option, + sender_seq: u64, ) -> Result { let status = self.state.status.read().await; if !status.device_connected { @@ -569,8 +583,8 @@ impl MeshService { encrypted: false, message_type: type_label.to_string(), typed_payload, - sender_pubkey: None, - sender_seq: None, + sender_pubkey: Some(self.our_ed_pubkey_hex.clone()), + sender_seq: Some(sender_seq), }; self.state.store_message(msg.clone()).await; { @@ -631,6 +645,7 @@ impl MeshService { type_label: &str, display_text: &str, typed_payload: Option, + sender_seq: u64, ) -> MeshMessage { let msg_id = self.state.next_id().await; let peer_name = self @@ -651,8 +666,8 @@ impl MeshService { encrypted: false, message_type: type_label.to_string(), typed_payload, - sender_pubkey: None, - sender_seq: None, + sender_pubkey: Some(self.our_ed_pubkey_hex.clone()), + sender_seq: Some(sender_seq), }; self.state.store_message(msg.clone()).await; {