//! Mesh networking: Meshcore LoRa radio integration for offline peer discovery
//! and encrypted messaging between Archipelago nodes.
//!
//! Supports Meshcore firmware on Heltec V3, T-Beam, RAK WisBlock, Station G2,
//! and other ESP32/nRF52-based LoRa boards via USB serial (Companion USB mode).

pub mod alerts;
pub mod bitcoin_relay;
pub mod crypto;
pub mod listener;
pub mod protocol;
pub mod serial;
pub mod types;
pub mod message_types;
pub mod outbox;
pub mod ratchet;
pub mod session;
pub mod steganography;
pub mod x3dh;

pub use types::*;

use alerts::DeadManSwitch;
use anyhow::{Context, Result};
use bitcoin_relay::{BlockHeaderCache, RelayTracker};
use ed25519_dalek::SigningKey;
use listener::MeshState;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::watch;
use tracing::{error, info, warn};

const MESH_CONFIG_FILE: &str = "mesh-config.json";

/// Map a federation peer's archipelago ed25519 pubkey (hex) to a stable
/// synthetic mesh `contact_id`.
///
/// Mesh LoRa contacts use the meshcore firmware's own small-integer id space,
/// so federation peers are placed in the upper half of the `u32` range: the
/// id is the first four key bytes read little-endian with the top bit forced
/// on. Input that is not valid hex, or that decodes to fewer than four bytes,
/// maps to the fixed id `0x8000_0001`.
///
/// The receive path (`inject_typed_from_federation`) and the startup pre-seed
/// both go through this function, so a given peer always gets the same id.
pub(crate) fn federation_peer_contact_id(archipelago_pubkey_hex: &str) -> u32 {
    let key_bytes = hex::decode(archipelago_pubkey_hex).unwrap_or_default();
    match key_bytes.get(..4) {
        Some(&[b0, b1, b2, b3]) => {
            0x8000_0000 | (u32::from_le_bytes([b0, b1, b2, b3]) & 0x7FFF_FFFF)
        }
        _ => 0x8000_0001,
    }
}

/// Upsert a mesh peer record representing a federation node so the UI can
/// address it as a chat and `mesh.send-content` can route ContentRef to it.
/// Existing entries (same contact_id) are updated in place, preserving any
/// previously observed radio state (rssi/snr/hops).
pub(crate) async fn upsert_federation_peer( state: &Arc, archipelago_pubkey_hex: &str, did: &str, name: Option<&str>, ) -> u32 { let contact_id = federation_peer_contact_id(archipelago_pubkey_hex); let display_name = name .map(|s| s.to_string()) .unwrap_or_else(|| { let short = &archipelago_pubkey_hex[..archipelago_pubkey_hex.len().min(8)]; format!("Archipelago {}", short) }); let mut peers = state.peers.write().await; let existing = peers.get(&contact_id).cloned(); let peer = MeshPeer { contact_id, advert_name: display_name, did: Some(did.to_string()), pubkey_hex: Some(archipelago_pubkey_hex.to_string()), x25519_pubkey: existing.as_ref().and_then(|p| p.x25519_pubkey), rssi: existing.as_ref().and_then(|p| p.rssi), snr: existing.as_ref().and_then(|p| p.snr), last_heard: chrono::Utc::now().to_rfc3339(), hops: existing.as_ref().map(|p| p.hops).unwrap_or(0), }; peers.insert(contact_id, peer); drop(peers); state.update_peer_count().await; contact_id } /// Load federation nodes from disk and upsert each as a synthetic mesh peer. /// Called at MeshService startup so the chat list already contains every /// known federation node — users can share files to them without first /// receiving a message. pub(crate) async fn seed_federation_peers_into_mesh( state: &Arc, data_dir: &Path, ) { let nodes = match crate::federation::load_nodes(data_dir).await { Ok(n) => n, Err(_) => return, }; for node in nodes { upsert_federation_peer(state, &node.pubkey, &node.did, node.name.as_deref()).await; } } /// Mesh configuration (persisted to disk). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MeshConfig { pub enabled: bool, /// Specific device path, or None for auto-detection. #[serde(default)] pub device_path: Option, /// Channel name for broadcasts. #[serde(default)] pub channel_name: Option, /// Whether to periodically broadcast our identity. #[serde(default)] pub broadcast_identity: bool, /// Custom advertised name on the mesh network. 
#[serde(default)] pub advert_name: Option, /// Off-grid mode: disable Tor/internet, route everything via mesh only. #[serde(default)] pub mesh_only_mode: Option, /// Announce new Bitcoin block headers over mesh (internet-connected nodes only). #[serde(default)] pub announce_block_headers: bool, /// Steganographic encoding mode for mesh messages (Normal = disabled). #[serde(default)] pub steganography_mode: steganography::SteganographyMode, /// Encrypt directed relay messages (TX, Lightning, block headers) via ratchet or shared secret. /// Set to false to disable encryption for debugging or rollback. #[serde(default = "default_true")] pub encrypt_relay_messages: bool, } fn default_true() -> bool { true } impl Default for MeshConfig { fn default() -> Self { Self { enabled: false, device_path: None, channel_name: Some("archipelago".to_string()), broadcast_identity: true, advert_name: None, mesh_only_mode: None, announce_block_headers: false, steganography_mode: steganography::SteganographyMode::Normal, encrypt_relay_messages: true, } } } pub async fn load_config(data_dir: &Path) -> Result { let path = data_dir.join(MESH_CONFIG_FILE); if !path.exists() { return Ok(MeshConfig::default()); } let content = fs::read_to_string(&path) .await .context("Failed to read mesh config")?; let config: MeshConfig = serde_json::from_str(&content).unwrap_or_default(); Ok(config) } pub async fn save_config(data_dir: &Path, config: &MeshConfig) -> Result<()> { fs::create_dir_all(data_dir) .await .context("Failed to create data dir")?; let content = serde_json::to_string_pretty(config).context("Failed to serialize mesh config")?; fs::write(data_dir.join(MESH_CONFIG_FILE), content) .await .context("Failed to write mesh config")?; Ok(()) } /// Detect serial devices that could be mesh radios. /// Checks both Meshcore (via probe) and legacy Meshtastic paths. 
pub async fn detect_devices() -> Vec { serial::detect_serial_devices().await } // ─── MeshService ──────────────────────────────────────────────────────── /// Top-level mesh networking service. /// Manages the background listener, exposes APIs for RPC handlers. pub struct MeshService { state: Arc, config: MeshConfig, data_dir: PathBuf, shutdown_tx: Option>, listener_handle: Option>, deadman_handle: Option>, block_announcer_handle: Option>, cmd_rx: Option>, // Crypto identity for this node our_did: String, our_ed_pubkey_hex: String, our_x25519_secret: [u8; 32], our_x25519_pubkey_hex: String, signing_key: SigningKey, // Phase 4: off-grid Bitcoin operations pub block_header_cache: Arc, pub relay_tracker: Arc, pub dead_man_switch: Arc, } impl MeshService { /// Create a new MeshService. Does not start the listener yet. pub async fn new( data_dir: &Path, signing_key: &SigningKey, did: &str, ed_pubkey_hex: &str, ) -> Result { let config = load_config(data_dir).await?; let channel_name = config .channel_name .clone() .unwrap_or_else(|| "archipelago".to_string()); let block_header_cache = Arc::new(BlockHeaderCache::new()); let relay_tracker = Arc::new(RelayTracker::new()); let session_manager = Arc::new(session::SessionManager::new(data_dir)); let (state, _rx, cmd_rx) = MeshState::new( &channel_name, Arc::clone(&block_header_cache), Some(Arc::clone(&relay_tracker)), config.steganography_mode, config.encrypt_relay_messages, Arc::clone(&session_manager), ); // Derive X25519 keys from Ed25519 identity let x25519_secret = crypto::ed25519_secret_to_x25519(signing_key); let x25519_pubkey = crypto::ed25519_pubkey_to_x25519( &signing_key.verifying_key().to_bytes(), )?; let x25519_pubkey_hex = hex::encode(x25519_pubkey); let dead_man_switch = Arc::new( DeadManSwitch::new(data_dir) .await .unwrap_or_else(|e| { warn!("Failed to load dead man config (using defaults): {}", e); // Fallback: create with defaults (won't persist until configured) tokio::runtime::Handle::current() 
.block_on(DeadManSwitch::new(data_dir)) .expect("DeadManSwitch fallback should succeed") }), ); // Pre-seed mesh state with a synthetic peer record for every known // federation node. Mesh LoRa discovery and federation have disjoint // identity namespaces, so without this step a user can't address a // federated peer from the mesh UI until one side receives over the // radio — which never happens for nodes that only share Tor. seed_federation_peers_into_mesh(&state, data_dir).await; Ok(Self { state, config, data_dir: data_dir.to_path_buf(), shutdown_tx: None, listener_handle: None, deadman_handle: None, block_announcer_handle: None, cmd_rx: Some(cmd_rx), our_did: did.to_string(), our_ed_pubkey_hex: ed_pubkey_hex.to_string(), our_x25519_secret: x25519_secret, our_x25519_pubkey_hex: x25519_pubkey_hex, signing_key: signing_key.clone(), block_header_cache, relay_tracker, dead_man_switch, }) } /// Start the background mesh listener. pub fn start(&mut self) -> Result<()> { if self.listener_handle.is_some() { anyhow::bail!("Mesh listener already running"); } let (shutdown_tx, shutdown_rx) = watch::channel(false); self.shutdown_tx = Some(shutdown_tx); let cmd_rx = self.cmd_rx.take() .ok_or_else(|| anyhow::anyhow!("Command channel already consumed"))?; let handle = listener::spawn_mesh_listener( Arc::clone(&self.state), self.config.device_path.clone(), self.our_did.clone(), self.our_ed_pubkey_hex.clone(), self.our_x25519_secret, self.our_x25519_pubkey_hex.clone(), shutdown_rx, cmd_rx, ); self.listener_handle = Some(handle); // Spawn dead man's switch background checker let dms = Arc::clone(&self.dead_man_switch); let dms_state = Arc::clone(&self.state); let dms_key = self.signing_key.clone(); let dms_shutdown = self.shutdown_tx.as_ref() .ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))?.subscribe(); let dms_handle = tokio::spawn(async move { let mut shutdown = dms_shutdown; let mut interval = tokio::time::interval(Duration::from_secs(60)); 
interval.tick().await; // skip first immediate tick loop { tokio::select! { _ = interval.tick() => { if dms.is_triggered().await { let was_triggered = *dms.triggered_flag().await; if !was_triggered { error!("Dead man's switch TRIGGERED — broadcasting alert"); if let Ok(wire) = dms.build_signed_alert(&dms_key).await { for ch in [0u8, 1] { let _ = dms_state.cmd_tx.send( listener::MeshCommand::BroadcastChannel { channel: ch, payload: wire.clone(), }, ).await; } } dms.mark_triggered().await; } } } _ = shutdown.changed() => { if *shutdown.borrow() { return; } } } } }); self.deadman_handle = Some(dms_handle); // Spawn block header announcer (internet-connected nodes only) if self.config.announce_block_headers { let bha_state = Arc::clone(&self.state); let bha_cache = Arc::clone(&self.block_header_cache); let bha_key = self.signing_key.clone(); let bha_did = self.our_did.clone(); let bha_shutdown = self.shutdown_tx.as_ref() .ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))?.subscribe(); let bha_handle = tokio::spawn(async move { let mut shutdown = bha_shutdown; let mut interval = tokio::time::interval(Duration::from_secs(30)); interval.tick().await; // skip first let mut last_announced_height: u64 = 0; let client = match reqwest::Client::builder() .timeout(Duration::from_secs(10)) .build() { Ok(c) => c, Err(e) => { error!("Failed to create HTTP client for block announcer: {}", e); return; } }; loop { tokio::select! 
{ _ = interval.tick() => { // Poll Bitcoin Core for latest block match bitcoin_rpc_getblockcount(&client).await { Ok(height) if height > last_announced_height => { if let Ok(header) = bitcoin_rpc_getblockheader_by_height(&client, height).await { // Store in cache let payload = message_types::BlockHeaderPayload { height, hash: header.hash.clone(), prev_hash: header.prev_hash.clone(), timestamp: header.timestamp, announced_by: bha_did.clone(), }; let _ = bha_cache.store_header(payload).await; // Build signed announcement and broadcast match bitcoin_relay::build_block_header_announcement( height, &header.hash, &header.prev_hash, header.timestamp, &bha_did, &bha_key, ) { Ok(wire) => { // Send to peers — prefer Archy nodes, fall back to all (max 5) let peers = bha_state.peers.read().await; let mut sent = 0u32; let max_peers = 5u32; // First pass: Archy nodes for peer in peers.values() { if sent >= max_peers { break; } if !peer.advert_name.starts_with("Archy-") { continue; } if let Some(ref pk) = peer.pubkey_hex { if let Ok(pk_bytes) = hex::decode(pk) { if pk_bytes.len() >= 6 { let mut prefix = [0u8; 6]; prefix.copy_from_slice(&pk_bytes[..6]); let _ = bha_state.cmd_tx.send( listener::MeshCommand::SendRaw { dest_pubkey_prefix: prefix, payload: wire.clone(), }, ).await; sent += 1; } } } } // Second pass: any peer if no Archy nodes found if sent == 0 { for peer in peers.values() { if sent >= max_peers { break; } if let Some(ref pk) = peer.pubkey_hex { if let Ok(pk_bytes) = hex::decode(pk) { if pk_bytes.len() >= 6 { let mut prefix = [0u8; 6]; prefix.copy_from_slice(&pk_bytes[..6]); let _ = bha_state.cmd_tx.send( listener::MeshCommand::SendRaw { dest_pubkey_prefix: prefix, payload: wire.clone(), }, ).await; sent += 1; } } } } } drop(peers); last_announced_height = height; info!(height, hash = %header.hash, peers = sent, "Announced block header to Archy peers"); } Err(e) => warn!("Failed to build block announcement: {}", e), } } } Ok(_) => {} // No new block Err(e) => { // 
Bitcoin not running or not reachable — that's fine, skip tracing::debug!("Block poll: {}", e); } } } _ = shutdown.changed() => { if *shutdown.borrow() { return; } } } } }); self.block_announcer_handle = Some(bha_handle); info!("Block header announcer started"); } info!("Mesh service started"); Ok(()) } /// Stop the background listener and dead man's switch. pub async fn stop(&mut self) { if let Some(tx) = self.shutdown_tx.take() { let _ = tx.send(true); } if let Some(handle) = self.listener_handle.take() { let _ = handle.await; } if let Some(handle) = self.deadman_handle.take() { handle.abort(); let _ = handle.await; } if let Some(handle) = self.block_announcer_handle.take() { handle.abort(); let _ = handle.await; } info!("Mesh service stopped"); } /// Get current mesh status. pub async fn status(&self) -> MeshStatus { self.state.status.read().await.clone() } /// Get list of discovered peers. pub async fn peers(&self) -> Vec { self.state.peers.read().await.values().cloned().collect() } /// Get message history. pub async fn messages(&self, limit: Option) -> Vec { let messages = self.state.messages.read().await; let limit = limit.unwrap_or(MAX_MESSAGES_DEFAULT); // Return in chronological order (oldest first) — take last N items let len = messages.len(); let skip = if len > limit { len - limit } else { 0 }; messages.iter().skip(skip).cloned().collect() } /// Full in-memory state dump for debugging. Returns peers, all messages, /// status, shared-secret peer ids (not the secrets), encrypt_relay flag, /// and stego mode. Intended for development/smoke-test use only — don't /// call this on a hot path. 
pub async fn debug_dump(&self) -> serde_json::Value {
    // Each lock is taken and released in turn, so the sections are not a
    // single atomic snapshot of the state.
    let status = self.state.status.read().await.clone();
    let peers: Vec<_> = self.state.peers.read().await.values().cloned().collect();
    let messages: Vec<_> = self.state.messages.read().await.iter().cloned().collect();
    // Element type left to inference: the annotation in the text under
    // review had lost its type argument.
    let secret_peer_ids: Vec<_> = self
        .state
        .shared_secrets
        .read()
        .await
        .keys()
        .copied()
        .collect();

    serde_json::json!({
        "status": status,
        "peers": peers,
        "peer_count": peers.len(),
        "messages": messages,
        "message_count": messages.len(),
        "secret_peer_ids": secret_peer_ids,
        "encrypt_relay": self.state.encrypt_relay,
        "stego_mode": format!("{:?}", self.state.stego_mode),
    })
}

/// Resolve a peer's 6-byte public-key prefix for mesh addressing.
///
/// # Errors
/// Fails if the peer is unknown, has no public key, or the key is not valid
/// hex or decodes to fewer than six bytes.
async fn peer_dest_prefix(&self, contact_id: u32) -> Result<[u8; 6]> {
    // Decode under the lock, then release it before the length check.
    let pubkey_bytes = {
        let peers = self.state.peers.read().await;
        let peer = peers
            .get(&contact_id)
            .ok_or_else(|| anyhow::anyhow!("Peer not found"))?;
        let pubkey_hex = peer
            .pubkey_hex
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Peer has no public key"))?;
        hex::decode(pubkey_hex).map_err(|_| anyhow::anyhow!("Invalid peer public key"))?
    };
    let head = pubkey_bytes
        .get(..6)
        .ok_or_else(|| anyhow::anyhow!("Peer public key too short"))?;
    let mut dest_prefix = [0u8; 6];
    dest_prefix.copy_from_slice(head);
    Ok(dest_prefix)
}

/// Split an oversized wire payload into MC-framed base64 chunks and send
/// each via the mesh device. Matches the receive-side reassembly in
/// `mesh/listener/decode.rs::handle_chunked_frame` (header `MCIIXXTT`,
/// 20-chunk cap, 152 base64 chars per chunk). The caller must ensure
/// the peer exists and the device is connected.
async fn send_chunked_payload(&self, contact_id: u32, payload: Vec) -> Result<()> { use base64::Engine; const HEADER_LEN: usize = 8; // MC + msg_id(2) + chunk_idx(2) + total(2) const MAX_CHUNK_B64: usize = protocol::MAX_MESSAGE_LEN - HEADER_LEN; const MAX_CHUNKS: u8 = 20; let b64 = base64::engine::general_purpose::STANDARD.encode(&payload); let total_chunks = ((b64.len() + MAX_CHUNK_B64 - 1) / MAX_CHUNK_B64) as u8; if total_chunks == 0 || total_chunks > MAX_CHUNKS { anyhow::bail!( "Payload too large to chunk: {} bytes → {} chunks (max {})", payload.len(), total_chunks, MAX_CHUNKS ); } // Pick a 1-byte msg_id. Use the low 8 bits of the unix nanos; not // cryptographically unique but collisions within a 120s reassembly // window are astronomically unlikely for normal send rates. let msg_id: u8 = (chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64 & 0xFF) as u8; let dest_prefix = self.peer_dest_prefix(contact_id).await?; for chunk_idx in 0..total_chunks { let start = chunk_idx as usize * MAX_CHUNK_B64; let end = (start + MAX_CHUNK_B64).min(b64.len()); let chunk = &b64[start..end]; let frame = format!("MC{:02X}{:02X}{:02X}{}", msg_id, chunk_idx, total_chunks, chunk); self.state .cmd_tx .send(listener::MeshCommand::SendText { dest_pubkey_prefix: dest_prefix, payload: frame.into_bytes(), }) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; } tracing::info!( contact_id, msg_id, chunks = total_chunks, bytes = payload.len(), "Sent chunked payload over mesh" ); Ok(()) } /// Send raw wire payload bytes to a peer (no Sent-record bookkeeping). /// Callers are responsible for storing the MeshMessage record afterwards. 
async fn send_raw_payload(&self, contact_id: u32, payload: Vec) -> Result<()> { let status = self.state.status.read().await; if !status.device_connected { anyhow::bail!("No mesh device connected"); } drop(status); if payload.len() > protocol::MAX_MESSAGE_LEN { anyhow::bail!( "Message too large for LoRa: {} bytes (max {})", payload.len(), protocol::MAX_MESSAGE_LEN ); } let dest_prefix = self.peer_dest_prefix(contact_id).await?; self.state .cmd_tx .send(listener::MeshCommand::SendText { dest_pubkey_prefix: dest_prefix, payload, }) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; Ok(()) } /// Send a typed envelope wire payload and record a rich Sent MeshMessage. /// Used by RPC handlers that transmit invoice/coordinate/alert/etc. so the /// UI sees a proper rich Sent card instead of garbage wire-byte plaintext. /// /// `sender_seq` should match the seq encoded into the TypedEnvelope by the /// caller — the RPC handler allocates it via `next_send_seq` before /// building the envelope and threads it through here so the local Sent /// record carries the same MessageKey the receiver will see. pub async fn send_typed_wire( &self, contact_id: u32, wire: Vec, type_label: &str, display_text: &str, typed_payload: Option, sender_seq: u64, ) -> Result { // Federation-synthetic contacts (high bit set) don't exist in the // meshcore firmware contact table, so LoRa send would fail at // `peer_dest_prefix`. Any envelope larger than the LoRa frame budget // also needs the federation path. In both cases we look up the // peer's onion (by archipelago pubkey first, then by DID) and POST // over Tor; otherwise the send falls through to LoRa. 
let is_federation_synthetic = contact_id & 0x8000_0000 != 0; let exceeds_lora = wire.len() > protocol::MAX_MESSAGE_LEN; if is_federation_synthetic || exceeds_lora { let (peer_pubkey, peer_did) = { let peers = self.state.peers.read().await; match peers.get(&contact_id) { Some(p) => (p.pubkey_hex.clone(), p.did.clone()), None if is_federation_synthetic => { anyhow::bail!("Unknown federation peer {}", contact_id); } None => (None, None), } }; let nodes = crate::federation::load_nodes(&self.data_dir) .await .unwrap_or_default(); let onion = peer_pubkey .as_ref() .and_then(|pk| nodes.iter().find(|n| &n.pubkey == pk).map(|n| n.onion.clone())) .or_else(|| { peer_did.as_ref().and_then(|d| { nodes.iter().find(|n| &n.did == d).map(|n| n.onion.clone()) }) }); if let Some(onion) = onion { return self .send_typed_wire_via_federation( contact_id, &onion, wire, type_label, display_text, typed_payload, sender_seq, ) .await; } if exceeds_lora { // No federation path — fall back to send-side chunking. Receive // side already handles MC-framed base64 reassembly for up to 20 // chunks (~3KB) per message, which is plenty for ContentRef or // long replies when the peer is LoRa-only. self.send_chunked_payload(contact_id, wire).await?; return Ok(self .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq) .await); } // Fall through: federation-synthetic case handled above, shouldn't reach here. } self.send_raw_payload(contact_id, wire).await?; Ok(self .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq) .await) } /// Send a typed envelope to a peer via the federation (Tor) path rather /// than LoRa. Used when the envelope exceeds LoRa's per-frame budget — /// ContentRef is the canonical example, at 400+ bytes with thumb + cap. /// The peer's onion must already be in federation storage. Records a /// Sent MeshMessage locally on success so the UI gets the rich card. 
///
/// This does NOT use chunking and does NOT go through the mesh radio —
/// it is a straight HTTP POST over Tor to the peer's
/// `/archipelago/mesh-typed` endpoint.
///
/// `peer_onion` may be given with or without the `.onion` suffix and with
/// trailing slashes; both are normalised before the URL is built.
///
/// # Errors
/// Fails if the Tor proxy or HTTP client cannot be set up, the POST fails,
/// or the peer answers with a non-success status.
pub async fn send_typed_wire_via_federation(
    &self,
    contact_id: u32,
    peer_onion: &str,
    wire: Vec<u8>,
    type_label: &str,
    display_text: &str,
    typed_payload: Option<serde_json::Value>,
    sender_seq: u64,
) -> Result<MeshMessage> {
    use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
    use ed25519_dalek::Signer;

    // Trim trailing slashes BEFORE looking at the suffix; testing first
    // turned "abc.onion/" into "abc.onion.onion".
    let bare = peer_onion.trim_end_matches('/');
    let host = if bare.ends_with(".onion") {
        bare.to_string()
    } else {
        format!("{}.onion", bare)
    };
    let url = format!("http://{}/archipelago/mesh-typed", host);

    // Sign the raw wire bytes so the receiver can attribute the envelope
    // to our pubkey even when it arrives over federation/Tor rather than
    // the radio. Signature covers the wire only — the receiver re-hashes.
    let signature = hex::encode(self.signing_key.sign(&wire).to_bytes());
    let wire_b64 = BASE64.encode(&wire);
    let body = serde_json::json!({
        "from_pubkey": self.our_ed_pubkey_hex,
        "from_name": self.our_did,
        "typed_envelope_b64": wire_b64,
        "signature": signature,
    });

    let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY)
        .map_err(|e| anyhow::anyhow!("Invalid Tor proxy: {}", e))?;
    let client = reqwest::Client::builder()
        .proxy(proxy)
        .timeout(std::time::Duration::from_secs(120))
        .build()
        .map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?;

    let resp = client
        .post(&url)
        .json(&body)
        .send()
        .await
        .map_err(|e| anyhow::anyhow!("Federation POST failed: {}", e))?;
    if !resp.status().is_success() {
        anyhow::bail!("Peer rejected typed envelope: HTTP {}", resp.status());
    }

    Ok(self
        .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq)
        .await)
}

/// Inject a typed envelope received over federation (Tor) into MeshState
/// as if it had arrived over the mesh radio.
Looks up contact_id by /// matching pubkey_hex against known mesh peers; falls back to a /// synthetic id derived from the pubkey bytes so the UI can still /// address the chat (will render as a new "peer" if not in the list). pub async fn inject_typed_from_federation( &self, from_pubkey_hex: &str, from_name: Option<&str>, wire: Vec, ) -> Result<()> { let envelope = crate::mesh::message_types::TypedEnvelope::from_wire(&wire)?; // Federation and mesh have disjoint identity namespaces: a LoRa // mesh contact carries meshcore's firmware-issued pubkey, not the // archipelago ed25519 key. So we cannot rely on matching pubkeys // across transports. Instead, every federation peer has a stable // synthetic mesh contact_id derived from its archipelago pubkey, // and is upserted into mesh state the first time we hear from them. let (federation_did, federation_name) = { let nodes = crate::federation::load_nodes(&self.data_dir) .await .unwrap_or_default(); nodes .into_iter() .find(|n| n.pubkey == from_pubkey_hex) .map(|n| (Some(n.did), n.name)) .unwrap_or((None, None)) }; // If the federation list knows this sender, use its DID; otherwise // fall back to `from_name` (the sender's self-reported DID from the // envelope body, signature-verified upstream in handle_mesh_typed_relay). let effective_did = federation_did .or_else(|| from_name.map(|s| s.to_string())) .unwrap_or_else(|| format!("did:unknown:{}", &from_pubkey_hex[..from_pubkey_hex.len().min(16)])); let display_name = federation_name .clone() .or_else(|| from_name.map(|s| s.to_string())) .unwrap_or_else(|| "federation peer".to_string()); let contact_id = upsert_federation_peer( &self.state, from_pubkey_hex, &effective_did, Some(&display_name), ) .await; listener::dispatch::handle_typed_envelope_direct( &self.state, contact_id, &display_name, envelope, ) .await; Ok(()) } /// Allocate the next outbound seq for a target. 
Convenience passthrough /// to MeshState::next_send_seq; used by RPC handlers before encoding a /// TypedEnvelope so the seq on the wire matches the Sent record. pub async fn next_send_seq(&self, target: u32) -> u64 { self.state.next_send_seq(target).await } /// Look up a stored MeshMessage by its local `id`. Used by the Forward /// RPC to pull an existing record's typed payload for re-encoding. pub async fn find_message_by_id(&self, id: u64) -> Option { let messages = self.state.messages.read().await; messages.iter().find(|m| m.id == id).cloned() } /// Apply an Edit locally to any own-Sent message matching `sender_seq` /// (sender_pubkey is implicit = self). Rewrites `plaintext` and appends /// an `edited_at` marker on `typed_payload` so the UI can show "(edited)". /// Best-effort: missing target is silently ignored. pub async fn apply_local_edit(&self, target_seq: u64, new_text: &str, edited_at: u32) { let mut messages = self.state.messages.write().await; for m in messages.iter_mut() { if m.sender_seq == Some(target_seq) && matches!(m.direction, crate::mesh::types::MessageDirection::Sent) { m.plaintext = new_text.to_string(); let mut obj = match m.typed_payload.take() { Some(serde_json::Value::Object(o)) => o, _ => serde_json::Map::new(), }; obj.insert("edited_at".to_string(), serde_json::json!(edited_at)); obj.insert("text".to_string(), serde_json::json!(new_text)); m.typed_payload = Some(serde_json::Value::Object(obj)); break; } } } /// Apply a Delete tombstone locally to an own-Sent message. 
pub async fn apply_local_delete(&self, target_seq: u64) {
    let mut messages = self.state.messages.write().await;
    // Only the first Sent record with this seq is tombstoned; a missing
    // target is ignored.
    let own_sent = messages.iter_mut().find(|m| {
        m.sender_seq == Some(target_seq)
            && matches!(m.direction, crate::mesh::types::MessageDirection::Sent)
    });
    if let Some(m) = own_sent {
        m.plaintext = "🗑 message deleted".to_string();
        m.typed_payload = Some(serde_json::json!({ "deleted": true }));
        m.message_type = "delete".to_string();
    }
}

/// Broadcast a typed envelope wire payload on a mesh channel and record a
/// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode
/// binary envelope bytes before handing them here.
///
/// The Sent record is filed under the pseudo contact id
/// `u32::MAX - channel` with the peer name `Channel <n>`.
///
/// # Errors
/// Fails if no device is connected, `wire` is larger than
/// `protocol::MAX_MESSAGE_LEN`, or the listener is not running.
pub async fn send_channel_typed_wire(
    &self,
    channel: u8,
    wire: Vec<u8>,
    type_label: &str,
    display_text: &str,
    typed_payload: Option<serde_json::Value>,
    sender_seq: u64,
) -> Result<MeshMessage> {
    if !self.state.status.read().await.device_connected {
        anyhow::bail!("No mesh device connected");
    }
    if wire.len() > protocol::MAX_MESSAGE_LEN {
        anyhow::bail!(
            "Message too large for LoRa: {} bytes (max {})",
            wire.len(),
            protocol::MAX_MESSAGE_LEN
        );
    }

    self.state
        .cmd_tx
        .send(listener::MeshCommand::BroadcastChannel {
            channel,
            payload: wire,
        })
        .await
        .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?;

    let msg = MeshMessage {
        id: self.state.next_id().await,
        direction: MessageDirection::Sent,
        peer_contact_id: u32::MAX - (channel as u32),
        peer_name: Some(format!("Channel {}", channel)),
        plaintext: display_text.to_string(),
        timestamp: chrono::Utc::now().to_rfc3339(),
        delivered: false,
        encrypted: false,
        message_type: type_label.to_string(),
        typed_payload,
        sender_pubkey: Some(self.our_ed_pubkey_hex.clone()),
        sender_seq: Some(sender_seq),
    };
    self.state.store_message(msg.clone()).await;
    self.state.status.write().await.messages_sent += 1;
    Ok(msg)
}

/// Send a text message to a peer.
Wraps the text in a typed `Text` /// envelope (variant 0) with an allocated `sender_seq`, so the resulting /// MeshMessage carries a stable MessageKey — this is what makes replies /// and reactions addressable against plain text bubbles. pub async fn send_message(&self, contact_id: u32, text: &str) -> Result { use crate::mesh::message_types::{MeshMessageType, TypedEnvelope}; let seq = self.state.next_send_seq(contact_id).await; let envelope = TypedEnvelope::new(MeshMessageType::Text, text.as_bytes().to_vec()) .with_seq(seq); let wire = envelope.to_wire()?; self.send_typed_wire(contact_id, wire, "text", text, None, seq).await } /// Record a Sent MeshMessage for a typed envelope that has already been /// transmitted by the caller. Used by the RPC layer after sending /// invoice/coordinate/alert/etc. so the UI gets a proper rich Sent card /// instead of a Sent record containing the raw wire bytes as plaintext. pub async fn record_sent_typed( &self, contact_id: u32, type_label: &str, display_text: &str, typed_payload: Option, sender_seq: u64, ) -> MeshMessage { let msg_id = self.state.next_id().await; let peer_name = self .state .peers .read() .await .get(&contact_id) .map(|p| p.advert_name.clone()); let msg = MeshMessage { id: msg_id, direction: MessageDirection::Sent, peer_contact_id: contact_id, peer_name, plaintext: display_text.to_string(), timestamp: chrono::Utc::now().to_rfc3339(), delivered: false, encrypted: false, message_type: type_label.to_string(), typed_payload, sender_pubkey: Some(self.our_ed_pubkey_hex.clone()), sender_seq: Some(sender_seq), }; self.state.store_message(msg.clone()).await; { let mut status = self.state.status.write().await; status.messages_sent += 1; } msg } /// Send a message to a mesh channel (broadcast). /// Routes through the background listener which owns the serial port. 
pub async fn send_channel_message(&self, channel: u8, text: &str) -> Result { let status = self.state.status.read().await; if !status.device_connected { anyhow::bail!("No mesh device connected"); } drop(status); let payload = text.as_bytes().to_vec(); if payload.len() > protocol::MAX_MESSAGE_LEN { anyhow::bail!( "Message too large for LoRa: {} bytes (max {})", payload.len(), protocol::MAX_MESSAGE_LEN ); } // Send through the listener's command channel self.state .cmd_tx .send(listener::MeshCommand::BroadcastChannel { channel, payload, }) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; let chan_contact_id = u32::MAX - (channel as u32); let chan_name = format!("Channel {}", channel); let msg_id = self.state.next_id().await; let msg = MeshMessage { id: msg_id, direction: MessageDirection::Sent, peer_contact_id: chan_contact_id, peer_name: Some(chan_name), plaintext: text.to_string(), timestamp: chrono::Utc::now().to_rfc3339(), delivered: false, encrypted: false, message_type: "text".to_string(), typed_payload: None, sender_pubkey: None, sender_seq: None, }; self.state.store_message(msg.clone()).await; { let mut status = self.state.status.write().await; status.messages_sent += 1; } Ok(msg) } /// Broadcast our advertisement over mesh so other nodes can discover us. /// Sends an immediate advert via the listener's command channel. pub async fn broadcast_identity(&self) -> Result<()> { let status = self.state.status.read().await; if !status.device_connected { anyhow::bail!("No mesh device connected. Check USB connection."); } drop(status); self.state .cmd_tx .send(listener::MeshCommand::SendAdvert) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; info!("Mesh self-advert broadcast triggered"); Ok(()) } /// Update mesh configuration. 
pub async fn configure(&mut self, config: MeshConfig) -> Result<()> { save_config(&self.data_dir, &config).await?; let was_enabled = self.config.enabled; self.config = config.clone(); // Update the status to reflect new config { let mut status = self.state.status.write().await; status.enabled = config.enabled; status.channel_name = config.channel_name.clone().unwrap_or_else(|| "archipelago".to_string()); } // If enabled state changed, start/stop the listener if config.enabled && !was_enabled { self.start()?; } else if !config.enabled && was_enabled { self.stop().await; // Clear connected state let mut status = self.state.status.write().await; status.device_connected = false; status.device_path = None; status.firmware_version = None; status.self_node_id = None; status.peer_count = 0; } Ok(()) } /// Get a reference to shared state (for RPC handlers). pub fn shared_state(&self) -> Arc { Arc::clone(&self.state) } /// Record user activity (resets dead man's switch timer). pub async fn dead_man_check_in(&self) { self.dead_man_switch.check_in().await; } /// Get our DID. pub fn our_did(&self) -> &str { &self.our_did } } const MAX_MESSAGES_DEFAULT: usize = 100; // ─── Bitcoin RPC helpers for block header announcer ──────────────────── #[derive(serde::Deserialize)] struct BitcoinRpcResponse { result: Option, error: Option, } struct BlockHeaderInfo { hash: String, prev_hash: String, timestamp: u32, } async fn bitcoin_rpc_getblockcount(client: &reqwest::Client) -> Result { let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await; let body = serde_json::json!({ "jsonrpc": "1.0", "id": "mesh", "method": "getblockcount", "params": [] }); let resp: BitcoinRpcResponse = tokio::time::timeout( Duration::from_secs(10), client .post(crate::constants::BITCOIN_RPC_URL) .basic_auth(&rpc_user, Some(&rpc_pass)) .json(&body) .send() ) .await .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockcount timed out after 10s"))? 
.map_err(|e| anyhow::anyhow!("Bitcoin RPC send failed: {}", e))? .json() .await .map_err(|e| anyhow::anyhow!("Bitcoin RPC parse failed: {}", e))?; if let Some(err) = resp.error { anyhow::bail!("Bitcoin RPC: {}", err); } resp.result.ok_or_else(|| anyhow::anyhow!("Bitcoin RPC null result")) } async fn bitcoin_rpc_getblockheader_by_height( client: &reqwest::Client, height: u64, ) -> Result { let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await; // First get block hash for this height let body = serde_json::json!({ "jsonrpc": "1.0", "id": "mesh", "method": "getblockhash", "params": [height] }); let resp: BitcoinRpcResponse = tokio::time::timeout( Duration::from_secs(10), client .post(crate::constants::BITCOIN_RPC_URL) .basic_auth(&rpc_user, Some(&rpc_pass)) .json(&body) .send() ) .await .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockhash timed out after 10s"))? .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockhash send failed: {}", e))? .json() .await .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockhash parse failed: {}", e))?; let hash = resp.result.ok_or_else(|| anyhow::anyhow!("No block hash"))?; // Then get full header let body = serde_json::json!({ "jsonrpc": "1.0", "id": "mesh", "method": "getblockheader", "params": [hash, true] }); let resp: BitcoinRpcResponse = tokio::time::timeout( Duration::from_secs(10), client .post(crate::constants::BITCOIN_RPC_URL) .basic_auth(&rpc_user, Some(&rpc_pass)) .json(&body) .send() ) .await .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockheader timed out after 10s"))? .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockheader send failed: {}", e))? 
.json() .await .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockheader parse failed: {}", e))?; let header = resp.result.ok_or_else(|| anyhow::anyhow!("No block header"))?; Ok(BlockHeaderInfo { hash: header["hash"].as_str().unwrap_or_default().to_string(), prev_hash: header["previousblockhash"].as_str().unwrap_or_default().to_string(), timestamp: header["time"].as_u64().unwrap_or(0) as u32, }) } #[cfg(test)] mod tests { use super::*; #[test] fn test_mesh_config_default() { let config = MeshConfig::default(); assert!(!config.enabled); assert_eq!(config.channel_name, Some("archipelago".to_string())); assert!(config.broadcast_identity); } #[test] fn test_mesh_config_serialization() { let config = MeshConfig { enabled: true, device_path: Some("/dev/ttyUSB0".to_string()), channel_name: Some("test".to_string()), broadcast_identity: false, advert_name: Some("MyNode".to_string()), ..Default::default() }; let json = serde_json::to_string(&config).unwrap(); let parsed: MeshConfig = serde_json::from_str(&json).unwrap(); assert!(parsed.enabled); assert_eq!(parsed.device_path, Some("/dev/ttyUSB0".to_string())); assert_eq!(parsed.advert_name, Some("MyNode".to_string())); } #[tokio::test] async fn test_load_config_default_when_no_file() { let dir = tempfile::tempdir().unwrap(); let config = load_config(dir.path()).await.unwrap(); assert!(!config.enabled); } #[tokio::test] async fn test_save_and_load_config_roundtrip() { let dir = tempfile::tempdir().unwrap(); let config = MeshConfig { enabled: true, device_path: Some("/dev/ttyUSB0".to_string()), channel_name: Some("archy".to_string()), broadcast_identity: true, advert_name: None, ..Default::default() }; save_config(dir.path(), &config).await.unwrap(); let loaded = load_config(dir.path()).await.unwrap(); assert!(loaded.enabled); assert_eq!(loaded.device_path, Some("/dev/ttyUSB0".to_string())); } }