//! Mesh networking: Meshcore LoRa radio integration for offline peer discovery //! and encrypted messaging between Archipelago nodes. //! //! Supports Meshcore firmware on Heltec V3, T-Beam, RAK WisBlock, Station G2, //! and other ESP32/nRF52-based LoRa boards via USB serial (Companion USB mode). pub mod alerts; pub mod bitcoin_relay; pub mod crypto; pub mod listener; pub mod protocol; pub mod serial; pub mod types; pub mod message_types; pub mod outbox; pub mod ratchet; pub mod session; pub mod steganography; pub mod x3dh; pub use types::*; use alerts::DeadManSwitch; use anyhow::{Context, Result}; use bitcoin_relay::{BlockHeaderCache, RelayTracker}; use ed25519_dalek::SigningKey; use listener::MeshState; use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tokio::fs; use tokio::sync::{broadcast, watch}; use tracing::{error, info, warn}; const MESH_CONFIG_FILE: &str = "mesh-config.json"; /// Mesh configuration (persisted to disk). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MeshConfig { pub enabled: bool, /// Specific device path, or None for auto-detection. #[serde(default)] pub device_path: Option, /// Channel name for broadcasts. #[serde(default)] pub channel_name: Option, /// Whether to periodically broadcast our identity. #[serde(default)] pub broadcast_identity: bool, /// Custom advertised name on the mesh network. #[serde(default)] pub advert_name: Option, /// Off-grid mode: disable Tor/internet, route everything via mesh only. #[serde(default)] pub mesh_only_mode: Option, /// Announce new Bitcoin block headers over mesh (internet-connected nodes only). #[serde(default)] pub announce_block_headers: bool, /// Steganographic encoding mode for mesh messages (Normal = disabled). #[serde(default)] pub steganography_mode: steganography::SteganographyMode, /// Encrypt directed relay messages (TX, Lightning, block headers) via ratchet or shared secret. /// Set to false to disable encryption for debugging or rollback. #[serde(default = "default_true")] pub encrypt_relay_messages: bool, } fn default_true() -> bool { true } impl Default for MeshConfig { fn default() -> Self { Self { enabled: false, device_path: None, channel_name: Some("archipelago".to_string()), broadcast_identity: true, advert_name: None, mesh_only_mode: None, announce_block_headers: false, steganography_mode: steganography::SteganographyMode::Normal, encrypt_relay_messages: true, } } } pub async fn load_config(data_dir: &Path) -> Result { let path = data_dir.join(MESH_CONFIG_FILE); if !path.exists() { return Ok(MeshConfig::default()); } let content = fs::read_to_string(&path) .await .context("Failed to read mesh config")?; let config: MeshConfig = serde_json::from_str(&content).unwrap_or_default(); Ok(config) } pub async fn save_config(data_dir: &Path, config: &MeshConfig) -> Result<()> { fs::create_dir_all(data_dir) .await .context("Failed to create data dir")?; let content = serde_json::to_string_pretty(config).context("Failed to serialize mesh config")?; fs::write(data_dir.join(MESH_CONFIG_FILE), content) .await .context("Failed to write mesh config")?; Ok(()) } /// Detect serial devices that could be mesh radios. /// Checks both Meshcore (via probe) and legacy Meshtastic paths. pub async fn detect_devices() -> Vec { serial::detect_serial_devices().await } // ─── MeshService ──────────────────────────────────────────────────────── /// Top-level mesh networking service. /// Manages the background listener, exposes APIs for RPC handlers. pub struct MeshService { state: Arc, config: MeshConfig, data_dir: PathBuf, shutdown_tx: Option>, listener_handle: Option>, deadman_handle: Option>, block_announcer_handle: Option>, cmd_rx: Option>, // Crypto identity for this node our_did: String, our_ed_pubkey_hex: String, our_x25519_secret: [u8; 32], our_x25519_pubkey_hex: String, signing_key: SigningKey, // Phase 4: off-grid Bitcoin operations pub block_header_cache: Arc, pub relay_tracker: Arc, pub dead_man_switch: Arc, } impl MeshService { /// Create a new MeshService. Does not start the listener yet. pub async fn new( data_dir: &Path, signing_key: &SigningKey, did: &str, ed_pubkey_hex: &str, ) -> Result { let config = load_config(data_dir).await?; let channel_name = config .channel_name .clone() .unwrap_or_else(|| "archipelago".to_string()); let block_header_cache = Arc::new(BlockHeaderCache::new()); let relay_tracker = Arc::new(RelayTracker::new()); let session_manager = Arc::new(session::SessionManager::new(data_dir)); let (state, _rx, cmd_rx) = MeshState::new( &channel_name, Arc::clone(&block_header_cache), Some(Arc::clone(&relay_tracker)), config.steganography_mode, config.encrypt_relay_messages, Arc::clone(&session_manager), ); // Derive X25519 keys from Ed25519 identity let x25519_secret = crypto::ed25519_secret_to_x25519(signing_key); let x25519_pubkey = crypto::ed25519_pubkey_to_x25519( &signing_key.verifying_key().to_bytes(), )?; let x25519_pubkey_hex = hex::encode(x25519_pubkey); let dead_man_switch = Arc::new( DeadManSwitch::new(data_dir) .await .unwrap_or_else(|e| { warn!("Failed to load dead man config (using defaults): {}", e); // Fallback: create with defaults (won't persist until configured) tokio::runtime::Handle::current() .block_on(DeadManSwitch::new(data_dir)) .expect("DeadManSwitch fallback should succeed") }), ); Ok(Self { state, config, data_dir: data_dir.to_path_buf(), shutdown_tx: None, listener_handle: None, deadman_handle: None, block_announcer_handle: None, cmd_rx: Some(cmd_rx), our_did: did.to_string(), our_ed_pubkey_hex: ed_pubkey_hex.to_string(), our_x25519_secret: x25519_secret, our_x25519_pubkey_hex: x25519_pubkey_hex, signing_key: signing_key.clone(), block_header_cache, relay_tracker, dead_man_switch, }) } /// Start the background mesh listener. pub fn start(&mut self) -> Result<()> { if self.listener_handle.is_some() { anyhow::bail!("Mesh listener already running"); } let (shutdown_tx, shutdown_rx) = watch::channel(false); self.shutdown_tx = Some(shutdown_tx); let cmd_rx = self.cmd_rx.take() .ok_or_else(|| anyhow::anyhow!("Command channel already consumed"))?; let handle = listener::spawn_mesh_listener( Arc::clone(&self.state), self.config.device_path.clone(), self.our_did.clone(), self.our_ed_pubkey_hex.clone(), self.our_x25519_secret, self.our_x25519_pubkey_hex.clone(), shutdown_rx, cmd_rx, ); self.listener_handle = Some(handle); // Spawn dead man's switch background checker let dms = Arc::clone(&self.dead_man_switch); let dms_state = Arc::clone(&self.state); let dms_key = self.signing_key.clone(); let dms_shutdown = self.shutdown_tx.as_ref() .ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))?.subscribe(); let dms_handle = tokio::spawn(async move { let mut shutdown = dms_shutdown; let mut interval = tokio::time::interval(Duration::from_secs(60)); interval.tick().await; // skip first immediate tick loop { tokio::select! { _ = interval.tick() => { if dms.is_triggered().await { let was_triggered = *dms.triggered_flag().await; if !was_triggered { error!("Dead man's switch TRIGGERED — broadcasting alert"); if let Ok(wire) = dms.build_signed_alert(&dms_key).await { for ch in [0u8, 1] { let _ = dms_state.cmd_tx.send( listener::MeshCommand::BroadcastChannel { channel: ch, payload: wire.clone(), }, ).await; } } dms.mark_triggered().await; } } } _ = shutdown.changed() => { if *shutdown.borrow() { return; } } } } }); self.deadman_handle = Some(dms_handle); // Spawn block header announcer (internet-connected nodes only) if self.config.announce_block_headers { let bha_state = Arc::clone(&self.state); let bha_cache = Arc::clone(&self.block_header_cache); let bha_key = self.signing_key.clone(); let bha_did = self.our_did.clone(); let bha_shutdown = self.shutdown_tx.as_ref() .ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))?.subscribe(); let bha_handle = tokio::spawn(async move { let mut shutdown = bha_shutdown; let mut interval = tokio::time::interval(Duration::from_secs(30)); interval.tick().await; // skip first let mut last_announced_height: u64 = 0; let client = match reqwest::Client::builder() .timeout(Duration::from_secs(10)) .build() { Ok(c) => c, Err(e) => { error!("Failed to create HTTP client for block announcer: {}", e); return; } }; loop { tokio::select! { _ = interval.tick() => { // Poll Bitcoin Core for latest block match bitcoin_rpc_getblockcount(&client).await { Ok(height) if height > last_announced_height => { if let Ok(header) = bitcoin_rpc_getblockheader_by_height(&client, height).await { // Store in cache let payload = message_types::BlockHeaderPayload { height, hash: header.hash.clone(), prev_hash: header.prev_hash.clone(), timestamp: header.timestamp, announced_by: bha_did.clone(), }; let _ = bha_cache.store_header(payload).await; // Build signed announcement and broadcast match bitcoin_relay::build_block_header_announcement( height, &header.hash, &header.prev_hash, header.timestamp, &bha_did, &bha_key, ) { Ok(wire) => { // Send to peers — prefer Archy nodes, fall back to all (max 5) let peers = bha_state.peers.read().await; let mut sent = 0u32; let max_peers = 5u32; // First pass: Archy nodes for peer in peers.values() { if sent >= max_peers { break; } if !peer.advert_name.starts_with("Archy-") { continue; } if let Some(ref pk) = peer.pubkey_hex { if let Ok(pk_bytes) = hex::decode(pk) { if pk_bytes.len() >= 6 { let mut prefix = [0u8; 6]; prefix.copy_from_slice(&pk_bytes[..6]); let _ = bha_state.cmd_tx.send( listener::MeshCommand::SendRaw { dest_pubkey_prefix: prefix, payload: wire.clone(), }, ).await; sent += 1; } } } } // Second pass: any peer if no Archy nodes found if sent == 0 { for peer in peers.values() { if sent >= max_peers { break; } if let Some(ref pk) = peer.pubkey_hex { if let Ok(pk_bytes) = hex::decode(pk) { if pk_bytes.len() >= 6 { let mut prefix = [0u8; 6]; prefix.copy_from_slice(&pk_bytes[..6]); let _ = bha_state.cmd_tx.send( listener::MeshCommand::SendRaw { dest_pubkey_prefix: prefix, payload: wire.clone(), }, ).await; sent += 1; } } } } } drop(peers); last_announced_height = height; info!(height, hash = %header.hash, peers = sent, "Announced block header to Archy peers"); } Err(e) => warn!("Failed to build block announcement: {}", e), } } } Ok(_) => {} // No new block Err(e) => { // Bitcoin not running or not reachable — that's fine, skip tracing::debug!("Block poll: {}", e); } } } _ = shutdown.changed() => { if *shutdown.borrow() { return; } } } } }); self.block_announcer_handle = Some(bha_handle); info!("Block header announcer started"); } info!("Mesh service started"); Ok(()) } /// Stop the background listener and dead man's switch. pub async fn stop(&mut self) { if let Some(tx) = self.shutdown_tx.take() { let _ = tx.send(true); } if let Some(handle) = self.listener_handle.take() { let _ = handle.await; } if let Some(handle) = self.deadman_handle.take() { handle.abort(); let _ = handle.await; } if let Some(handle) = self.block_announcer_handle.take() { handle.abort(); let _ = handle.await; } info!("Mesh service stopped"); } /// Get current mesh status. pub async fn status(&self) -> MeshStatus { self.state.status.read().await.clone() } /// Get list of discovered peers. pub async fn peers(&self) -> Vec { self.state.peers.read().await.values().cloned().collect() } /// Get message history. pub async fn messages(&self, limit: Option) -> Vec { let messages = self.state.messages.read().await; let limit = limit.unwrap_or(MAX_MESSAGES_DEFAULT); // Return in chronological order (oldest first) — take last N items let len = messages.len(); let skip = if len > limit { len - limit } else { 0 }; messages.iter().skip(skip).cloned().collect() } /// Send a message to a peer by contact_id. /// Routes through the background listener which owns the serial port. pub async fn send_message(&self, contact_id: u32, text: &str) -> Result { let status = self.state.status.read().await; if !status.device_connected { anyhow::bail!("No mesh device connected"); } drop(status); // Look up the peer's public key to get the 6-byte prefix for addressing let peers = self.state.peers.read().await; let peer = peers .get(&contact_id) .ok_or_else(|| anyhow::anyhow!("Peer not found"))?; let pubkey_hex = peer .pubkey_hex .as_ref() .ok_or_else(|| anyhow::anyhow!("Peer has no public key"))?; let pubkey_bytes = hex::decode(pubkey_hex) .map_err(|_| anyhow::anyhow!("Invalid peer public key"))?; if pubkey_bytes.len() < 6 { anyhow::bail!("Peer public key too short"); } let mut dest_prefix = [0u8; 6]; dest_prefix.copy_from_slice(&pubkey_bytes[..6]); drop(peers); let payload = text.as_bytes().to_vec(); let encrypted = false; if payload.len() > protocol::MAX_MESSAGE_LEN { anyhow::bail!( "Message too large for LoRa: {} bytes (max {})", payload.len(), protocol::MAX_MESSAGE_LEN ); } // Send through the listener's command channel self.state .cmd_tx .send(listener::MeshCommand::SendText { dest_pubkey_prefix: dest_prefix, payload, }) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; let msg_id = self.state.next_id().await; let peer_name = self .state .peers .read() .await .get(&contact_id) .map(|p| p.advert_name.clone()); let msg = MeshMessage { id: msg_id, direction: MessageDirection::Sent, peer_contact_id: contact_id, peer_name, plaintext: text.to_string(), timestamp: chrono::Utc::now().to_rfc3339(), delivered: false, encrypted, }; self.state.store_message(msg.clone()).await; { let mut status = self.state.status.write().await; status.messages_sent += 1; } Ok(msg) } /// Broadcast our advertisement over mesh so other nodes can discover us. /// Sends an immediate advert via the listener's command channel. pub async fn broadcast_identity(&self) -> Result<()> { let status = self.state.status.read().await; if !status.device_connected { anyhow::bail!("No mesh device connected. Check USB connection."); } drop(status); self.state .cmd_tx .send(listener::MeshCommand::SendAdvert) .await .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; info!("Mesh self-advert broadcast triggered"); Ok(()) } /// Update mesh configuration. pub async fn configure(&mut self, config: MeshConfig) -> Result<()> { save_config(&self.data_dir, &config).await?; let was_enabled = self.config.enabled; self.config = config.clone(); // Update the status to reflect new config { let mut status = self.state.status.write().await; status.enabled = config.enabled; status.channel_name = config.channel_name.clone().unwrap_or_else(|| "archipelago".to_string()); } // If enabled state changed, start/stop the listener if config.enabled && !was_enabled { self.start()?; } else if !config.enabled && was_enabled { self.stop().await; // Clear connected state let mut status = self.state.status.write().await; status.device_connected = false; status.device_path = None; status.firmware_version = None; status.self_node_id = None; status.peer_count = 0; } Ok(()) } /// Subscribe to mesh events. pub fn subscribe(&self) -> broadcast::Receiver { self.state.event_tx.subscribe() } /// Get a reference to shared state (for RPC handlers). pub fn shared_state(&self) -> Arc { Arc::clone(&self.state) } /// Record user activity (resets dead man's switch timer). pub async fn dead_man_check_in(&self) { self.dead_man_switch.check_in().await; } /// Get the node's signing key (for signed messages). pub fn signing_key(&self) -> &SigningKey { &self.signing_key } /// Get our DID. pub fn our_did(&self) -> &str { &self.our_did } } const MAX_MESSAGES_DEFAULT: usize = 100; // ─── Bitcoin RPC helpers for block header announcer ──────────────────── #[derive(serde::Deserialize)] struct BitcoinRpcResponse { result: Option, error: Option, } struct BlockHeaderInfo { hash: String, prev_hash: String, timestamp: u32, } async fn bitcoin_rpc_getblockcount(client: &reqwest::Client) -> Result { let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await; let body = serde_json::json!({ "jsonrpc": "1.0", "id": "mesh", "method": "getblockcount", "params": [] }); let resp: BitcoinRpcResponse = tokio::time::timeout( Duration::from_secs(10), client .post(crate::constants::BITCOIN_RPC_URL) .basic_auth(&rpc_user, Some(&rpc_pass)) .json(&body) .send() ) .await .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockcount timed out after 10s"))? .map_err(|e| anyhow::anyhow!("Bitcoin RPC send failed: {}", e))? .json() .await .map_err(|e| anyhow::anyhow!("Bitcoin RPC parse failed: {}", e))?; if let Some(err) = resp.error { anyhow::bail!("Bitcoin RPC: {}", err); } resp.result.ok_or_else(|| anyhow::anyhow!("Bitcoin RPC null result")) } async fn bitcoin_rpc_getblockheader_by_height( client: &reqwest::Client, height: u64, ) -> Result { let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await; // First get block hash for this height let body = serde_json::json!({ "jsonrpc": "1.0", "id": "mesh", "method": "getblockhash", "params": [height] }); let resp: BitcoinRpcResponse = tokio::time::timeout( Duration::from_secs(10), client .post(crate::constants::BITCOIN_RPC_URL) .basic_auth(&rpc_user, Some(&rpc_pass)) .json(&body) .send() ) .await .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockhash timed out after 10s"))? .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockhash send failed: {}", e))? .json() .await .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockhash parse failed: {}", e))?; let hash = resp.result.ok_or_else(|| anyhow::anyhow!("No block hash"))?; // Then get full header let body = serde_json::json!({ "jsonrpc": "1.0", "id": "mesh", "method": "getblockheader", "params": [hash, true] }); let resp: BitcoinRpcResponse = tokio::time::timeout( Duration::from_secs(10), client .post(crate::constants::BITCOIN_RPC_URL) .basic_auth(&rpc_user, Some(&rpc_pass)) .json(&body) .send() ) .await .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockheader timed out after 10s"))? .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockheader send failed: {}", e))? .json() .await .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockheader parse failed: {}", e))?; let header = resp.result.ok_or_else(|| anyhow::anyhow!("No block header"))?; Ok(BlockHeaderInfo { hash: header["hash"].as_str().unwrap_or_default().to_string(), prev_hash: header["previousblockhash"].as_str().unwrap_or_default().to_string(), timestamp: header["time"].as_u64().unwrap_or(0) as u32, }) } #[cfg(test)] mod tests { use super::*; #[test] fn test_mesh_config_default() { let config = MeshConfig::default(); assert!(!config.enabled); assert_eq!(config.channel_name, Some("archipelago".to_string())); assert!(config.broadcast_identity); } #[test] fn test_mesh_config_serialization() { let config = MeshConfig { enabled: true, device_path: Some("/dev/ttyUSB0".to_string()), channel_name: Some("test".to_string()), broadcast_identity: false, advert_name: Some("MyNode".to_string()), ..Default::default() }; let json = serde_json::to_string(&config).unwrap(); let parsed: MeshConfig = serde_json::from_str(&json).unwrap(); assert!(parsed.enabled); assert_eq!(parsed.device_path, Some("/dev/ttyUSB0".to_string())); assert_eq!(parsed.advert_name, Some("MyNode".to_string())); } #[tokio::test] async fn test_load_config_default_when_no_file() { let dir = tempfile::tempdir().unwrap(); let config = load_config(dir.path()).await.unwrap(); assert!(!config.enabled); } #[tokio::test] async fn test_save_and_load_config_roundtrip() { let dir = tempfile::tempdir().unwrap(); let config = MeshConfig { enabled: true, device_path: Some("/dev/ttyUSB0".to_string()), channel_name: Some("archy".to_string()), broadcast_identity: true, advert_name: None, ..Default::default() }; save_config(dir.path(), &config).await.unwrap(); let loaded = load_config(dir.path()).await.unwrap(); assert!(loaded.enabled); assert_eq!(loaded.device_path, Some("/dev/ttyUSB0".to_string())); } }