diff --git a/core/archipelago/src/api/rpc/mod.rs b/core/archipelago/src/api/rpc/mod.rs index 1001b7a6..07779577 100644 --- a/core/archipelago/src/api/rpc/mod.rs +++ b/core/archipelago/src/api/rpc/mod.rs @@ -475,6 +475,7 @@ impl RpcHandler { "node-send-message" => self.handle_node_send_message(params).await, "node-check-peer" => self.handle_node_check_peer(params).await, "node-messages-received" => self.handle_node_messages_received().await, + "node-store-sent" => self.handle_node_store_sent(params).await, "node-nostr-discover" => self.handle_node_nostr_discover().await, "node.did" => self.handle_node_did().await, "node.signChallenge" => self.handle_node_sign_challenge(params).await, diff --git a/core/archipelago/src/api/rpc/peers.rs b/core/archipelago/src/api/rpc/peers.rs index d2a7880f..adf30f87 100644 --- a/core/archipelago/src/api/rpc/peers.rs +++ b/core/archipelago/src/api/rpc/peers.rs @@ -111,6 +111,20 @@ impl RpcHandler { Ok(serde_json::json!({ "messages": messages })) } + /// Store a sent message for Archipelago channel history persistence. + pub(super) async fn handle_node_store_sent( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let message = params + .get("message") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing message"))?; + node_message::store_sent(message); + Ok(serde_json::json!({ "ok": true })) + } + pub(super) async fn handle_node_nostr_discover(&self) -> Result { let identity_dir = self.config.data_dir.join("identity"); let nodes = nostr_discovery::discover_archipelago_nodes( diff --git a/core/archipelago/src/node_message.rs b/core/archipelago/src/node_message.rs index deccad58..c8375379 100644 --- a/core/archipelago/src/node_message.rs +++ b/core/archipelago/src/node_message.rs @@ -1,51 +1,142 @@ //! Node-to-node messaging over Tor. //! Sends messages to peer .onion addresses via SOCKS5 proxy. +//! Messages are persisted to disk and survive restarts. use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; use std::sync::{Mutex, OnceLock}; const TOR_SOCKS: &str = "socks5h://127.0.0.1:9050"; +const MAX_STORED: usize = 200; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IncomingMessage { pub from_pubkey: String, + #[serde(default)] pub from_onion: Option, pub message: String, pub timestamp: String, + /// "sent" or "received" + #[serde(default = "default_received")] + pub direction: String, } -fn received_messages() -> &'static Mutex> { - static RECEIVED: OnceLock>> = OnceLock::new(); - RECEIVED.get_or_init(|| Mutex::new(Vec::new())) +fn default_received() -> String { + "received".to_string() } -const MAX_STORED: usize = 100; +#[derive(Debug, Default, Serialize, Deserialize)] +struct MessageStore { + messages: Vec, +} + +fn store() -> &'static Mutex { + static STORE: OnceLock> = OnceLock::new(); + STORE.get_or_init(|| Mutex::new(MessageStore::default())) +} + +fn data_path() -> &'static Mutex> { + static PATH: OnceLock>> = OnceLock::new(); + PATH.get_or_init(|| Mutex::new(None)) +} + +/// Initialize message store — load from disk. Call once at startup. +pub async fn init(data_dir: &Path) { + let path = data_dir.join("messages.json"); + *data_path().lock().unwrap_or_else(|e| e.into_inner()) = Some(path.clone()); + + if let Ok(content) = tokio::fs::read_to_string(&path).await { + if let Ok(loaded) = serde_json::from_str::(&content) { + let mut guard = store().lock().unwrap_or_else(|e| e.into_inner()); + *guard = loaded; + tracing::info!("Loaded {} messages from disk", guard.messages.len()); + } + } +} + +/// Persist current messages to disk. +fn persist() { + let guard = store().lock().unwrap_or_else(|e| e.into_inner()); + let path_guard = data_path().lock().unwrap_or_else(|e| e.into_inner()); + if let Some(ref path) = *path_guard { + if let Ok(content) = serde_json::to_string(&*guard) { + // Use sync write — called from store functions already holding the lock + let _ = std::fs::write(path, content); + } + } +} /// Store a received message (called from HTTP handler). pub fn store_received_sync(from_pubkey: &str, message: &str) { - let mut guard = received_messages().lock().unwrap_or_else(|e| e.into_inner()); - guard.push(IncomingMessage { + let ts = chrono::Utc::now().to_rfc3339(); + let mut guard = store().lock().unwrap_or_else(|e| e.into_inner()); + + // Deduplication: skip if same pubkey + message within last 30 seconds + let dominated = guard.messages.iter().rev().take(20).any(|m| { + m.from_pubkey == from_pubkey && m.message == message && m.direction == "received" + && within_seconds(&m.timestamp, &ts, 30) + }); + if dominated { + return; + } + + guard.messages.push(IncomingMessage { from_pubkey: from_pubkey.to_string(), from_onion: None, message: message.to_string(), - timestamp: chrono::Utc::now().to_rfc3339(), + timestamp: ts, + direction: "received".to_string(), }); - let len = guard.len(); - if len > MAX_STORED { - guard.drain(0..len - MAX_STORED); - } + trim_messages(&mut guard); + drop(guard); + persist(); } pub async fn store_received(from_pubkey: &str, message: &str) { store_received_sync(from_pubkey, message); } -/// Get received messages for UI display. -pub fn get_received() -> Vec { - received_messages().lock().unwrap_or_else(|e| e.into_inner()).clone() +/// Store a sent message (for display in Archipelago channel). +pub fn store_sent(message: &str) { + let mut guard = store().lock().unwrap_or_else(|e| e.into_inner()); + guard.messages.push(IncomingMessage { + from_pubkey: "me".to_string(), + from_onion: None, + message: message.to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + direction: "sent".to_string(), + }); + trim_messages(&mut guard); + drop(guard); + persist(); } +/// Get all messages (sent + received) for UI display. +pub fn get_received() -> Vec { + store().lock().unwrap_or_else(|e| e.into_inner()).messages.clone() +} + +fn trim_messages(store: &mut MessageStore) { + if store.messages.len() > MAX_STORED { + let drain = store.messages.len() - MAX_STORED; + store.messages.drain(0..drain); + } +} + +/// Check if two ISO8601 timestamps are within N seconds of each other. +fn within_seconds(ts1: &str, ts2: &str, secs: i64) -> bool { + use chrono::DateTime; + let a = DateTime::parse_from_rfc3339(ts1).ok(); + let b = DateTime::parse_from_rfc3339(ts2).ok(); + match (a, b) { + (Some(a), Some(b)) => (a - b).num_seconds().unsigned_abs() < secs as u64, + _ => false, + } +} + +// ─── Tor Messaging ────────────────────────────────────────────── + /// Tor v3 onion hostname is 56 base32 chars (a-z, 2-7). Reject invalid formats. fn validate_onion(onion: &str) -> Result<()> { let host = onion.trim_end_matches(".onion"); @@ -93,7 +184,7 @@ pub async fn send_to_peer(onion: &str, from_pubkey: &str, message: &str) -> Resu .map_err(|e| { let msg = e.to_string(); if msg.contains("connection refused") || msg.contains("Connection refused") { - anyhow::anyhow!("Tor not reachable at 127.0.0.1:9050. Is the Tor container running?") + anyhow::anyhow!("Tor not reachable at 127.0.0.1:9050. Is Tor running?") } else if msg.contains("timeout") || msg.contains("timed out") { anyhow::anyhow!("Connection timed out. The peer may be offline or unreachable over Tor.") } else { diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 7fcd972f..2d2e8ac7 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -48,6 +48,9 @@ impl Server { } state_manager.update_data(data.clone()).await; + // Load persisted messages (Archipelago channel) + node_message::init(&config.data_dir).await; + // Auto-create default identity if none exist (fresh boot or factory reset) { let im = crate::identity_manager::IdentityManager::new(&config.data_dir).await; diff --git a/neode-ui/src/views/Mesh.vue b/neode-ui/src/views/Mesh.vue index 68c2f1d0..78e0abfd 100644 --- a/neode-ui/src/views/Mesh.vue +++ b/neode-ui/src/views/Mesh.vue @@ -67,7 +67,6 @@ async function sendArchMessage() { sendError.value = '' sendingArch.value = true try { - // Broadcast to all federated peers over Tor const nodes = await rpcClient.federationListNodes() const msg = messageText.value.trim() let sent = 0 @@ -77,16 +76,14 @@ async function sendArchMessage() { sent++ } catch { /* some peers may be offline */ } } - // Local echo — show the sent message immediately - archMessages.value.push({ - from_pubkey: 'me', - message: msg, - timestamp: new Date().toISOString(), - }) + // Persist sent message on backend (survives restarts) + try { + await rpcClient.call({ method: 'node-store-sent', params: { message: msg } }) + } catch { /* non-fatal */ } messageText.value = '' if (sent === 0) sendError.value = 'No peers reachable — message may arrive when they come online' - // Also reload in background to pick up any replies - setTimeout(loadArchMessages, 5000) + // Reload to show the persisted sent message + await loadArchMessages() } catch (e) { sendError.value = e instanceof Error ? e.message : 'Send failed' } finally { @@ -322,18 +319,21 @@ const hasActiveChat = computed(() => !!activeChatPeer.value || !!activeChatChann const chatMessages = computed(() => { if (archChannelActive.value) { // Map Tor messages to mesh message format for rendering - return archMessages.value.map((m, i) => ({ - id: i, - peer_contact_id: -99, - peer_name: m.from_pubkey === 'me' ? 'You' : (m.from_pubkey.slice(0, 12) + '...'), - direction: (m.from_pubkey === 'me' ? 'sent' : 'received') as 'sent' | 'received', - plaintext: m.message, - timestamp: m.timestamp, - delivered: true, - encrypted: false, - message_type: undefined, - typed_payload: undefined, - })) + return archMessages.value.map((m, i) => { + const isSent = (m as Record).direction === 'sent' || m.from_pubkey === 'me' + return { + id: i, + peer_contact_id: -99, + peer_name: isSent ? 'You' : (m.from_pubkey.slice(0, 12) + '...'), + direction: (isSent ? 'sent' : 'received') as 'sent' | 'received', + plaintext: m.message, + timestamp: m.timestamp, + delivered: true, + encrypted: false, + message_type: undefined, + typed_payload: undefined, + } + }) } if (activeChatChannel.value) { // Channel messages have negative contact_id = -(channel_index + 1)