From 3ed9243c50da8a1f5b985d32005e2fdd7b6bfefd Mon Sep 17 00:00:00 2001 From: Dorian Date: Mon, 13 Apr 2026 08:01:10 -0400 Subject: [PATCH] feat(mesh): rich typed Sent records and echo dedup Adds message_type + typed_payload (JSON) to MeshMessage so the UI can render invoice/alert/coordinate/tx/lightning messages as structured cards in both directions instead of showing raw wire bytes on the Sent side. RPC handlers now route through send_typed_wire / send_channel_typed_wire which transmit the binary envelope directly (no utf8_lossy corruption) and record a rich Sent MeshMessage. Also: store_message deduplicates echo-back doubles (20-msg lookback, 30s window), from_name is plumbed through the federation Incoming path, and peer_dest_prefix / send_raw_payload are factored out of send_message. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/api/handler/node_message.rs | 1 + .../src/api/rpc/mesh/typed_messages.rs | 41 ++++- core/archipelago/src/mesh/listener/decode.rs | 4 + .../archipelago/src/mesh/listener/dispatch.rs | 40 ++++- core/archipelago/src/mesh/listener/mod.rs | 21 +++ core/archipelago/src/mesh/mod.rs | 155 ++++++++++++++++-- core/archipelago/src/mesh/types.rs | 13 +- core/archipelago/src/node_message.rs | 1 + 8 files changed, 244 insertions(+), 32 deletions(-) diff --git a/core/archipelago/src/api/handler/node_message.rs b/core/archipelago/src/api/handler/node_message.rs index d9c1eaf0..b405c026 100644 --- a/core/archipelago/src/api/handler/node_message.rs +++ b/core/archipelago/src/api/handler/node_message.rs @@ -17,6 +17,7 @@ impl ApiHandler { } let incoming: Incoming = serde_json::from_slice(&body).unwrap_or(Incoming { from_pubkey: None, + from_name: None, message: None, signature: None, encrypted: false, diff --git a/core/archipelago/src/api/rpc/mesh/typed_messages.rs b/core/archipelago/src/api/rpc/mesh/typed_messages.rs index f51cd97f..ae924ed7 100644 --- a/core/archipelago/src/api/rpc/mesh/typed_messages.rs +++ b/core/archipelago/src/api/rpc/mesh/typed_messages.rs @@ -38,8 +38,15 @@ impl RpcHandler { .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; - let wire_str = String::from_utf8_lossy(&wire).to_string(); - let msg = svc.send_message(contact_id, &wire_str).await?; + let display = format!( + "Invoice: {} sats{}", + amount_sats, + memo.as_ref().map(|m| format!(" — {}", m)).unwrap_or_default() + ); + let typed_json = serde_json::to_value(&invoice).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "invoice", &display, typed_json) + .await?; info!(contact_id, amount_sats, "Sent invoice over mesh"); Ok(serde_json::json!({ @@ -77,8 +84,16 @@ impl RpcHandler { .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; - let wire_str = String::from_utf8_lossy(&wire).to_string(); - let msg = svc.send_message(contact_id, &wire_str).await?; + let display = format!( + "Location: {:.6}, {:.6}{}", + coord.lat_degrees(), + coord.lng_degrees(), + coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default() + ); + let typed_json = serde_json::to_value(&coord).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "coordinate", &display, typed_json) + .await?; info!(contact_id, "Sent coordinate over mesh"); Ok(serde_json::json!({ @@ -153,13 +168,23 @@ impl RpcHandler { .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; - let wire_str = String::from_utf8_lossy(&wire).to_string(); + let display = alert.message.clone(); + let typed_json = serde_json::to_value(&alert).ok(); if broadcast { - // Send on channel (all peers) - svc.send_message(0, &wire_str).await?; + // Send on public channel (all peers) as raw bytes so the binary + // envelope is not corrupted by utf8 conversion. + svc.send_channel_typed_wire(0, wire, "alert", &display, typed_json.clone()) + .await?; info!(alert_type = alert_type_str, "Broadcast alert over mesh"); } else if let Some(contact_id) = params["contact_id"].as_u64() { - svc.send_message(contact_id as u32, &wire_str).await?; + svc.send_typed_wire( + contact_id as u32, + wire, + "alert", + &display, + typed_json, + ) + .await?; info!(contact_id, alert_type = alert_type_str, "Sent alert to peer"); } else { anyhow::bail!("Must specify contact_id or broadcast: true"); diff --git a/core/archipelago/src/mesh/listener/decode.rs b/core/archipelago/src/mesh/listener/decode.rs index d9bccfc6..bbd88c0d 100644 --- a/core/archipelago/src/mesh/listener/decode.rs +++ b/core/archipelago/src/mesh/listener/decode.rs @@ -276,6 +276,8 @@ pub(super) async fn store_plain_message( timestamp: chrono::Utc::now().to_rfc3339(), delivered: true, encrypted: false, + message_type: "text".to_string(), + typed_payload: None, }; state.store_message(msg.clone()).await; state.status.write().await.messages_received += 1; @@ -438,6 +440,8 @@ pub(super) async fn handle_received_message( timestamp: chrono::Utc::now().to_rfc3339(), delivered: true, encrypted, + message_type: "text".to_string(), + typed_payload: None, }; state.store_message(msg.clone()).await; diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 9e6d33a2..c5bb3516 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -8,13 +8,14 @@ use super::super::types::*; use std::sync::Arc; use tracing::{debug, info, warn}; -/// Store a typed message with a type label for UI rendering. +/// Store a typed message with a type label and structured JSON payload for UI rendering. async fn store_typed_message( state: &Arc, contact_id: u32, peer_name: &str, - text: &str, + display_text: &str, type_label: &str, + typed_payload: Option, ) { let msg_id = state.next_id().await; let msg = MeshMessage { @@ -22,16 +23,24 @@ async fn store_typed_message( direction: MessageDirection::Received, peer_contact_id: contact_id, peer_name: Some(peer_name.to_string()), - plaintext: format!("[{}] {}", type_label, text), + plaintext: display_text.to_string(), timestamp: chrono::Utc::now().to_rfc3339(), delivered: true, encrypted: false, + message_type: type_label.to_string(), + typed_payload, }; state.store_message(msg.clone()).await; state.status.write().await.messages_received += 1; let _ = state.event_tx.send(MeshEvent::MessageReceived(msg)); } +/// Serialize a decoded payload to JSON for the UI layer. +/// Falls back to `None` on serialization failure (shouldn't happen for our serde types). +fn payload_to_json(v: &T) -> Option { + serde_json::to_value(v).ok() +} + /// Handle a typed message envelope (0x02 prefix). /// Dispatches to type-specific handlers: BlockHeader, Alert, TxRelay, etc. pub(super) async fn handle_typed_message( @@ -105,12 +114,14 @@ pub(super) async fn handle_typed_message( "Alert received via mesh: {}", alert.message ); + let json = payload_to_json(&alert); store_typed_message( state, sender_contact_id, sender_name, &alert.message, "alert", + json, ) .await; let _ = state.event_tx.send(MeshEvent::AlertReceived { @@ -136,12 +147,14 @@ pub(super) async fn handle_typed_message( tx_len = relay.tx_hex.len(), "TX relay request received — broadcasting to Bitcoin network" ); + let json = payload_to_json(&relay); store_typed_message( state, sender_contact_id, sender_name, &format!("TX relay request #{} ({} hex chars)", relay.request_id, relay.tx_hex.len()), "tx_relay", + json, ) .await; @@ -170,12 +183,14 @@ pub(super) async fn handle_typed_message( amount_sats = relay.amount_sats, "Lightning relay request received" ); + let json = payload_to_json(&relay); store_typed_message( state, sender_contact_id, sender_name, &format!("Lightning relay: {} sats", relay.amount_sats), "lightning_relay", + json, ) .await; // Will be wired to LND in Week 9 @@ -201,7 +216,8 @@ pub(super) async fn handle_typed_message( } else { format!("Lightning failed: {}", resp.error.as_deref().unwrap_or("unknown")) }; - store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response").await; + let json = payload_to_json(&resp); + store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response", json).await; let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted { request_id: resp.request_id, payment_hash: resp.payment_hash, @@ -220,7 +236,8 @@ pub(super) async fn handle_typed_message( invoice.amount_sats, invoice.memo.as_ref().map(|m| format!(" — {}", m)).unwrap_or_default() ); - store_typed_message(state, sender_contact_id, sender_name, &text, "invoice").await; + let json = payload_to_json(&invoice); + store_typed_message(state, sender_contact_id, sender_name, &text, "invoice", json).await; } Err(e) => warn!("Failed to decode invoice payload: {}", e), } @@ -235,7 +252,8 @@ pub(super) async fn handle_typed_message( coord.lng_degrees(), coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default() ); - store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate").await; + let json = payload_to_json(&coord); + store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate", json).await; } Err(e) => warn!("Failed to decode coordinate payload: {}", e), } @@ -291,7 +309,7 @@ async fn dispatch_block_header( timestamp, announced_by: sender_name.to_string(), }; - let _ = state.block_header_cache.store_header(header_payload).await; + let _ = state.block_header_cache.store_header(header_payload.clone()).await; let text = format!( "Block #{} — {}...{}", @@ -299,12 +317,14 @@ async fn dispatch_block_header( &hash_hex[..8.min(hash_hex.len())], &hash_hex[hash_hex.len().saturating_sub(8)..] ); + let json = payload_to_json(&header_payload); store_typed_message( state, sender_contact_id, sender_name, &text, "block_header", + json, ) .await; let _ = state.event_tx.send(MeshEvent::BlockHeaderReceived { @@ -339,7 +359,8 @@ async fn dispatch_tx_relay_response( } else { format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown")) }; - store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response").await; + let json = payload_to_json(&resp); + store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response", json).await; // Store result for frontend polling if let Some(ref tracker) = state.relay_tracker { tracker.store_result(super::super::bitcoin_relay::RelayResult { @@ -380,7 +401,8 @@ async fn dispatch_tx_confirmation( block_height = conf.block_height, "TX confirmation update received" ); - store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation").await; + let json = payload_to_json(&conf); + store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation", json).await; // Store confirmation for frontend polling if let Some(ref tracker) = state.relay_tracker { tracker.store_result(super::super::bitcoin_relay::RelayResult { diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index 626bbe1e..389afa7c 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -29,6 +29,17 @@ const SYNC_INTERVAL: Duration = Duration::from_secs(10); /// Maximum stored messages (circular buffer). const MAX_MESSAGES: usize = 100; +/// Check if two ISO8601 timestamps are within N seconds of each other. +fn within_seconds_iso(ts1: &str, ts2: &str, secs: i64) -> bool { + use chrono::DateTime; + let a = DateTime::parse_from_rfc3339(ts1).ok(); + let b = DateTime::parse_from_rfc3339(ts2).ok(); + match (a, b) { + (Some(a), Some(b)) => (a - b).num_seconds().unsigned_abs() < secs as u64, + _ => false, + } +} + /// Initial delay before reconnection attempt after device disconnect. const RECONNECT_DELAY_INIT: Duration = Duration::from_secs(5); @@ -129,6 +140,16 @@ impl MeshState { pub async fn store_message(&self, msg: MeshMessage) { let mut messages = self.messages.write().await; + // Deduplicate: skip if we already have a message with the same text, + // peer, and timestamp within 30 seconds (prevents echo-back doubles) + let dominated = messages.iter().rev().take(20).any(|m| { + m.peer_contact_id == msg.peer_contact_id + && m.plaintext == msg.plaintext + && within_seconds_iso(&m.timestamp, &msg.timestamp, 30) + }); + if dominated { + return; + } messages.push_back(msg); if messages.len() > MAX_MESSAGES { messages.pop_front(); diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 3bccc490..f62afeca 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -432,16 +432,8 @@ impl MeshService { messages.iter().skip(skip).cloned().collect() } - /// Send a message to a peer by contact_id. - /// Routes through the background listener which owns the serial port. - pub async fn send_message(&self, contact_id: u32, text: &str) -> Result { - let status = self.state.status.read().await; - if !status.device_connected { - anyhow::bail!("No mesh device connected"); - } - drop(status); - - // Look up the peer's public key to get the 6-byte prefix for addressing + /// Resolve a peer's 6-byte public-key prefix for mesh addressing. + async fn peer_dest_prefix(&self, contact_id: u32) -> Result<[u8; 6]> { let peers = self.state.peers.read().await; let peer = peers .get(&contact_id) @@ -457,10 +449,17 @@ impl MeshService { } let mut dest_prefix = [0u8; 6]; dest_prefix.copy_from_slice(&pubkey_bytes[..6]); - drop(peers); + Ok(dest_prefix) + } - let payload = text.as_bytes().to_vec(); - let encrypted = false; + /// Send raw wire payload bytes to a peer (no Sent-record bookkeeping). + /// Callers are responsible for storing the MeshMessage record afterwards. + async fn send_raw_payload(&self, contact_id: u32, payload: Vec) -> Result<()> { + let status = self.state.status.read().await; + if !status.device_connected { + anyhow::bail!("No mesh device connected"); + } + drop(status); if payload.len() > protocol::MAX_MESSAGE_LEN { anyhow::bail!( @@ -470,7 +469,8 @@ impl MeshService { ); } - // Send through the listener's command channel + let dest_prefix = self.peer_dest_prefix(contact_id).await?; + self.state .cmd_tx .send(listener::MeshCommand::SendText { @@ -479,6 +479,90 @@ impl MeshService { }) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; + Ok(()) + } + + /// Send a typed envelope wire payload and record a rich Sent MeshMessage. + /// Used by RPC handlers that transmit invoice/coordinate/alert/etc. so the + /// UI sees a proper rich Sent card instead of garbage wire-byte plaintext. + pub async fn send_typed_wire( + &self, + contact_id: u32, + wire: Vec, + type_label: &str, + display_text: &str, + typed_payload: Option, + ) -> Result { + self.send_raw_payload(contact_id, wire).await?; + Ok(self + .record_sent_typed(contact_id, type_label, display_text, typed_payload) + .await) + } + + /// Broadcast a typed envelope wire payload on a mesh channel and record a + /// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode + /// binary envelope bytes before handing them here. + pub async fn send_channel_typed_wire( + &self, + channel: u8, + wire: Vec, + type_label: &str, + display_text: &str, + typed_payload: Option, + ) -> Result { + let status = self.state.status.read().await; + if !status.device_connected { + anyhow::bail!("No mesh device connected"); + } + drop(status); + + if wire.len() > protocol::MAX_MESSAGE_LEN { + anyhow::bail!( + "Message too large for LoRa: {} bytes (max {})", + wire.len(), + protocol::MAX_MESSAGE_LEN + ); + } + + self.state + .cmd_tx + .send(listener::MeshCommand::BroadcastChannel { + channel, + payload: wire, + }) + .await + .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; + + let chan_contact_id = u32::MAX - (channel as u32); + let chan_name = format!("Channel {}", channel); + let msg_id = self.state.next_id().await; + let msg = MeshMessage { + id: msg_id, + direction: MessageDirection::Sent, + peer_contact_id: chan_contact_id, + peer_name: Some(chan_name), + plaintext: display_text.to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + delivered: false, + encrypted: false, + message_type: type_label.to_string(), + typed_payload, + }; + self.state.store_message(msg.clone()).await; + { + let mut status = self.state.status.write().await; + status.messages_sent += 1; + } + Ok(msg) + } + + /// Send a message to a peer by contact_id. + /// Routes through the background listener which owns the serial port. + pub async fn send_message(&self, contact_id: u32, text: &str) -> Result { + let payload = text.as_bytes().to_vec(); + let encrypted = false; + + self.send_raw_payload(contact_id, payload).await?; let msg_id = self.state.next_id().await; let peer_name = self @@ -498,6 +582,8 @@ impl MeshService { timestamp: chrono::Utc::now().to_rfc3339(), delivered: false, encrypted, + message_type: "text".to_string(), + typed_payload: None, }; self.state.store_message(msg.clone()).await; @@ -509,6 +595,45 @@ impl MeshService { Ok(msg) } + /// Record a Sent MeshMessage for a typed envelope that has already been + /// transmitted by the caller. Used by the RPC layer after sending + /// invoice/coordinate/alert/etc. so the UI gets a proper rich Sent card + /// instead of a Sent record containing the raw wire bytes as plaintext. + pub async fn record_sent_typed( + &self, + contact_id: u32, + type_label: &str, + display_text: &str, + typed_payload: Option, + ) -> MeshMessage { + let msg_id = self.state.next_id().await; + let peer_name = self + .state + .peers + .read() + .await + .get(&contact_id) + .map(|p| p.advert_name.clone()); + let msg = MeshMessage { + id: msg_id, + direction: MessageDirection::Sent, + peer_contact_id: contact_id, + peer_name, + plaintext: display_text.to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + delivered: false, + encrypted: false, + message_type: type_label.to_string(), + typed_payload, + }; + self.state.store_message(msg.clone()).await; + { + let mut status = self.state.status.write().await; + status.messages_sent += 1; + } + msg + } + /// Send a message to a mesh channel (broadcast). /// Routes through the background listener which owns the serial port. pub async fn send_channel_message(&self, channel: u8, text: &str) -> Result { @@ -551,6 +676,8 @@ impl MeshService { timestamp: chrono::Utc::now().to_rfc3339(), delivered: false, encrypted: false, + message_type: "text".to_string(), + typed_payload: None, }; self.state.store_message(msg.clone()).await; diff --git a/core/archipelago/src/mesh/types.rs b/core/archipelago/src/mesh/types.rs index ae522e88..42868914 100644 --- a/core/archipelago/src/mesh/types.rs +++ b/core/archipelago/src/mesh/types.rs @@ -64,13 +64,24 @@ pub struct MeshMessage { pub peer_contact_id: u32, /// Peer name (for display). pub peer_name: Option, - /// Decrypted plaintext content. + /// Human-readable rendering — for text messages this is the raw text, + /// for typed messages a short summary used as a fallback in lists. pub plaintext: String, pub timestamp: String, /// Whether delivery was confirmed via ACK. pub delivered: bool, /// Whether the message was end-to-end encrypted. pub encrypted: bool, + /// Typed-envelope label ("text", "invoice", "alert", "coordinate", ...). + #[serde(default = "default_message_type")] + pub message_type: String, + /// Structured payload as JSON — populated for non-text typed messages. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub typed_payload: Option, +} + +fn default_message_type() -> String { + "text".to_string() } /// Overall mesh subsystem status. diff --git a/core/archipelago/src/node_message.rs b/core/archipelago/src/node_message.rs index 69bad59d..0c946262 100644 --- a/core/archipelago/src/node_message.rs +++ b/core/archipelago/src/node_message.rs @@ -112,6 +112,7 @@ pub fn store_sent(message: &str) { guard.messages.push(IncomingMessage { from_pubkey: "me".to_string(), from_onion: None, + from_name: None, message: message.to_string(), timestamp: chrono::Utc::now().to_rfc3339(), direction: "sent".to_string(),