2026-05-17 18:07:40 -04:00

1400 lines
56 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Mesh networking: LoRa radio integration for offline peer discovery
//! and encrypted messaging between Archipelago nodes.
//!
//! Supports Meshcore firmware via Companion USB and Meshtastic firmware via
//! the Meshtastic serial API on Heltec, T-Beam, RAK WisBlock, Station G2,
//! and other ESP32/nRF52-based LoRa boards.
pub mod alerts;
pub mod bitcoin_relay;
pub mod crypto;
pub mod listener;
pub mod meshtastic;
pub mod message_types;
pub mod outbox;
pub mod protocol;
pub mod ratchet;
pub mod serial;
pub mod session;
pub mod steganography;
pub mod types;
pub mod x3dh;
pub use types::*;
use alerts::DeadManSwitch;
use anyhow::{Context, Result};
use bitcoin_relay::{BlockHeaderCache, RelayTracker};
use ed25519_dalek::SigningKey;
use listener::MeshState;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::watch;
use tracing::{error, info, warn};
/// File name (joined onto the data directory) of the persisted mesh config,
/// read by `load_config` and written by `save_config`.
const MESH_CONFIG_FILE: &str = "mesh-config.json";
/// File name (joined onto the data directory) of the persisted list of ignored
/// radio contacts, read by `load_ignored_radio_contacts` and written by
/// `save_ignored_radio_contacts`.
const MESH_IGNORED_RADIO_FILE: &str = "mesh-ignored-radio-contacts.json";
/// Map a federation peer's archipelago ed25519 pubkey (hex) to a stable
/// synthetic mesh `contact_id`.
///
/// Mesh LoRa contacts use meshcore firmware's own small-integer id space, so
/// federation peers are placed in the upper half of the `u32` range (high bit
/// always set) to keep the two apart. The receive path
/// (`inject_typed_from_federation`) and the startup pre-seed both go through
/// this function, so the same peer always gets the same id.
///
/// The id is the first four decoded key bytes read little-endian with the top
/// bit forced on. Input that is not valid hex, or that decodes to fewer than
/// four bytes, yields the fixed fallback id `0x8000_0001`.
pub(crate) fn federation_peer_contact_id(archipelago_pubkey_hex: &str) -> u32 {
    const FEDERATION_BIT: u32 = 0x8000_0000;
    const FALLBACK_ID: u32 = 0x8000_0001;

    let decoded = hex::decode(archipelago_pubkey_hex).unwrap_or_default();
    match decoded.as_slice() {
        [b0, b1, b2, b3, ..] => {
            let low = u32::from_le_bytes([*b0, *b1, *b2, *b3]);
            FEDERATION_BIT | (low & 0x7FFF_FFFF)
        }
        _ => FALLBACK_ID,
    }
}
/// Upsert a mesh peer record representing a federation node so the UI can
/// address it as a chat and `mesh.send-content` can route ContentRef to it.
///
/// The record is keyed by the synthetic id from
/// [`federation_peer_contact_id`]. If an entry with that id already exists its
/// observed radio/crypto state (`x25519_pubkey`, `rssi`, `snr`, `hops`) is
/// carried over; name, DID, pubkey and `last_heard` are always overwritten.
///
/// * `name` — display name; when `None` a placeholder
///   `"Archipelago <first 8 chars of pubkey>"` is used.
///
/// Returns the synthetic `contact_id` the peer is stored under.
pub(crate) async fn upsert_federation_peer(
    state: &Arc<listener::MeshState>,
    archipelago_pubkey_hex: &str,
    did: &str,
    name: Option<&str>,
) -> u32 {
    let contact_id = federation_peer_contact_id(archipelago_pubkey_hex);
    let advert_name = match name {
        Some(given) => given.to_string(),
        None => {
            // `get(..8)` instead of `[..8]`: never panics on short input or on
            // a non-char-boundary index if the string is not plain ASCII hex.
            let short = archipelago_pubkey_hex
                .get(..8)
                .unwrap_or(archipelago_pubkey_hex);
            format!("Archipelago {}", short)
        }
    };
    {
        let mut peers = state.peers.write().await;
        // Copy out only the fields we preserve rather than cloning the whole
        // previous record (which would also clone its strings).
        let (x25519_pubkey, rssi, snr, hops) = match peers.get(&contact_id) {
            Some(prev) => (prev.x25519_pubkey, prev.rssi, prev.snr, prev.hops),
            None => (None, None, None, 0),
        };
        peers.insert(
            contact_id,
            MeshPeer {
                contact_id,
                advert_name,
                did: Some(did.to_string()),
                pubkey_hex: Some(archipelago_pubkey_hex.to_string()),
                x25519_pubkey,
                rssi,
                snr,
                last_heard: chrono::Utc::now().to_rfc3339(),
                hops,
            },
        );
        // Write guard released here, before the peer count is refreshed.
    }
    state.update_peer_count().await;
    contact_id
}
/// Pre-populate mesh state with one synthetic peer per federation node stored
/// on disk.
///
/// Called at MeshService startup so the chat list already contains every
/// known federation node — users can share files to them without first
/// receiving a message. Best-effort: if the node list cannot be loaded the
/// function returns without changing anything.
pub(crate) async fn seed_federation_peers_into_mesh(
    state: &Arc<listener::MeshState>,
    data_dir: &Path,
) {
    if let Ok(known_nodes) = crate::federation::load_nodes(data_dir).await {
        for federation_node in known_nodes {
            let display = federation_node.name.as_deref();
            upsert_federation_peer(
                state,
                &federation_node.pubkey,
                &federation_node.did,
                display,
            )
            .await;
        }
    }
}
/// Mesh configuration (persisted to disk).
///
/// Serialized as JSON by `save_config` and read back by `load_config`. Every
/// field except `enabled` carries a serde default, so a config file that omits
/// those keys still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshConfig {
/// Whether mesh networking is enabled. Required in the JSON (no serde default).
/// NOTE(review): not read in the visible part of this file — confirm where it is enforced.
pub enabled: bool,
/// Specific device path, or None for auto-detection.
#[serde(default)]
pub device_path: Option<String>,
/// Channel name for broadcasts.
#[serde(default)]
pub channel_name: Option<String>,
/// Whether to periodically broadcast our identity.
/// NOTE: when this key is absent from the JSON it deserializes to `false`,
/// whereas `MeshConfig::default()` sets it to `true`.
#[serde(default)]
pub broadcast_identity: bool,
/// Custom advertised name on the mesh network.
#[serde(default)]
pub advert_name: Option<String>,
/// Off-grid mode: disable Tor/internet, route everything via mesh only.
#[serde(default)]
pub mesh_only_mode: Option<bool>,
/// Announce new Bitcoin block headers over mesh (internet-connected nodes only).
#[serde(default)]
pub announce_block_headers: bool,
/// Steganographic encoding mode for mesh messages (Normal = disabled).
#[serde(default)]
pub steganography_mode: steganography::SteganographyMode,
/// Encrypt directed relay messages (TX, Lightning, block headers) via ratchet or shared secret.
/// Set to false to disable encryption for debugging or rollback.
/// Defaults to `true` when the key is absent (see `default_true`).
#[serde(default = "default_true")]
pub encrypt_relay_messages: bool,
}
/// Serde default provider returning `true`; referenced by
/// `#[serde(default = "default_true")]` on `MeshConfig::encrypt_relay_messages`.
fn default_true() -> bool {
true
}
impl Default for MeshConfig {
fn default() -> Self {
Self {
enabled: false,
device_path: None,
channel_name: Some("archipelago".to_string()),
broadcast_identity: true,
advert_name: None,
mesh_only_mode: None,
announce_block_headers: false,
steganography_mode: steganography::SteganographyMode::Normal,
encrypt_relay_messages: true,
}
}
}
pub async fn load_config(data_dir: &Path) -> Result<MeshConfig> {
let path = data_dir.join(MESH_CONFIG_FILE);
if !path.exists() {
return Ok(MeshConfig::default());
}
let content = fs::read_to_string(&path)
.await
.context("Failed to read mesh config")?;
let config: MeshConfig = serde_json::from_str(&content).unwrap_or_default();
Ok(config)
}
/// Persist `config` as pretty-printed JSON to `<data_dir>/mesh-config.json`,
/// creating `data_dir` first if needed.
///
/// # Errors
/// Returns an error (with context) if the directory cannot be created, the
/// config cannot be serialized, or the file cannot be written.
pub async fn save_config(data_dir: &Path, config: &MeshConfig) -> Result<()> {
    fs::create_dir_all(data_dir)
        .await
        .context("Failed to create data dir")?;
    let json = serde_json::to_string_pretty(config).context("Failed to serialize mesh config")?;
    let target = data_dir.join(MESH_CONFIG_FILE);
    fs::write(target, json)
        .await
        .context("Failed to write mesh config")
}
/// Read the persisted ignored-radio-contact list from
/// `<data_dir>/mesh-ignored-radio-contacts.json`.
///
/// Best-effort: a missing, unreadable or unparseable file yields an empty
/// list rather than an error.
pub async fn load_ignored_radio_contacts(data_dir: &Path) -> Vec<String> {
    let list_path = data_dir.join(MESH_IGNORED_RADIO_FILE);
    if !list_path.exists() {
        return Vec::new();
    }
    fs::read_to_string(&list_path)
        .await
        .ok()
        .and_then(|raw| serde_json::from_str::<Vec<String>>(&raw).ok())
        .unwrap_or_default()
}
/// Persist the ignored-radio-contact list as pretty-printed JSON to
/// `<data_dir>/mesh-ignored-radio-contacts.json`, creating `data_dir` first if
/// needed.
///
/// # Errors
/// Returns an error (with context) if the directory cannot be created, the
/// list cannot be serialized, or the file cannot be written.
pub async fn save_ignored_radio_contacts(data_dir: &Path, pubkeys: &[String]) -> Result<()> {
    // Propagate directory-creation failures like `save_config` does instead of
    // discarding them and surfacing a less specific write error afterwards.
    fs::create_dir_all(data_dir)
        .await
        .context("Failed to create data dir")?;
    let content =
        serde_json::to_string_pretty(pubkeys).context("Failed to serialize ignored-radio list")?;
    fs::write(data_dir.join(MESH_IGNORED_RADIO_FILE), content)
        .await
        .context("Failed to write ignored-radio list")?;
    Ok(())
}
/// Detect serial devices that could be mesh radios.
/// Checks both Meshcore (via probe) and legacy Meshtastic paths.
///
/// Thin wrapper: the detection itself lives in
/// `serial::detect_serial_devices`; this returns its result unchanged.
pub async fn detect_devices() -> Vec<String> {
serial::detect_serial_devices().await
}
// ─── MeshService ────────────────────────────────────────────────────────
/// Top-level mesh networking service.
/// Manages the background listener, exposes APIs for RPC handlers.
///
/// Built by `MeshService::new` (which does not start anything), driven by
/// `start` / `stop`.
pub struct MeshService {
/// Shared in-memory mesh state (peers, messages, status, command sender);
/// also handed to the listener and background tasks.
state: Arc<MeshState>,
/// Configuration loaded from disk at construction time.
config: MeshConfig,
/// Directory holding the mesh config and federation node storage.
data_dir: PathBuf,
/// Shutdown signal sender; `Some` between `start` and `stop`.
shutdown_tx: Option<watch::Sender<bool>>,
/// Handle of the listener task spawned by `start`.
listener_handle: Option<tokio::task::JoinHandle<()>>,
/// Handle of the dead man's switch checker task spawned by `start`.
deadman_handle: Option<tokio::task::JoinHandle<()>>,
/// Handle of the block header announcer task (only spawned when
/// `config.announce_block_headers` is set).
block_announcer_handle: Option<tokio::task::JoinHandle<()>>,
/// Handle of the presence broadcaster; `start` currently always sets this to `None`.
presence_handle: Option<tokio::task::JoinHandle<()>>,
/// Receiving end of the command channel; taken by `start` and replaced
/// with a fresh receiver by `stop`.
cmd_rx: Option<tokio::sync::mpsc::Receiver<listener::MeshCommand>>,
// Crypto identity for this node
/// Our DID, as passed to `new`.
our_did: String,
/// Our ed25519 public key, hex-encoded, as passed to `new`.
our_ed_pubkey_hex: String,
/// X25519 secret derived from the ed25519 signing key in `new`.
our_x25519_secret: [u8; 32],
/// Hex encoding of the X25519 public key derived in `new`.
our_x25519_pubkey_hex: String,
/// Ed25519 signing key used to sign alerts, announcements and federation envelopes.
signing_key: SigningKey,
/// Human-readable server name (e.g. "Arch Dev", "ThinkPad") for mesh adverts.
server_name: Option<String>,
// Phase 4: off-grid Bitcoin operations
/// Cache of block headers, shared with `MeshState` and the announcer task.
pub block_header_cache: Arc<BlockHeaderCache>,
/// Relay tracker shared with `MeshState`.
pub relay_tracker: Arc<RelayTracker>,
/// Dead man's switch polled by the background checker started in `start`.
pub dead_man_switch: Arc<DeadManSwitch>,
}
impl MeshService {
/// Create a new MeshService. Does not start the listener yet.
pub async fn new(
data_dir: &Path,
signing_key: &SigningKey,
did: &str,
ed_pubkey_hex: &str,
) -> Result<Self> {
let config = load_config(data_dir).await?;
let channel_name = config
.channel_name
.clone()
.unwrap_or_else(|| "archipelago".to_string());
let block_header_cache = Arc::new(BlockHeaderCache::new());
let relay_tracker = Arc::new(RelayTracker::new());
let session_manager = Arc::new(session::SessionManager::new(data_dir));
let (state, _rx, cmd_rx) = MeshState::new(
&channel_name,
Arc::clone(&block_header_cache),
Some(Arc::clone(&relay_tracker)),
config.steganography_mode,
config.encrypt_relay_messages,
Arc::clone(&session_manager),
ed_pubkey_hex.to_string(),
);
// Derive X25519 keys from Ed25519 identity
let x25519_secret = crypto::ed25519_secret_to_x25519(signing_key);
let x25519_pubkey =
crypto::ed25519_pubkey_to_x25519(&signing_key.verifying_key().to_bytes())?;
let x25519_pubkey_hex = hex::encode(x25519_pubkey);
let dead_man_switch = Arc::new(DeadManSwitch::new(data_dir).await.unwrap_or_else(|e| {
warn!("Failed to load dead man config (using defaults): {}", e);
// Fallback: create with defaults (won't persist until configured)
tokio::runtime::Handle::current()
.block_on(DeadManSwitch::new(data_dir))
.expect("DeadManSwitch fallback should succeed")
}));
// Pre-seed mesh state with a synthetic peer record for every known
// federation node. Mesh LoRa discovery and federation have disjoint
// identity namespaces, so without this step a user can't address a
// federated peer from the mesh UI until one side receives over the
// radio — which never happens for nodes that only share Tor.
seed_federation_peers_into_mesh(&state, data_dir).await;
// Load the radio-contact blocklist so previously-wiped firmware
// contacts stay hidden after restart. Without this, meshcore's
// persistent on-device contact table regenerates the rows on the
// next refresh_contacts cycle and the user sees stale entries
// they already cleared.
{
let ignored = load_ignored_radio_contacts(data_dir).await;
let mut set = state.radio_contact_blocklist.write().await;
for pk in ignored {
set.insert(pk);
}
}
Ok(Self {
state,
config,
data_dir: data_dir.to_path_buf(),
shutdown_tx: None,
listener_handle: None,
deadman_handle: None,
block_announcer_handle: None,
presence_handle: None,
cmd_rx: Some(cmd_rx),
our_did: did.to_string(),
our_ed_pubkey_hex: ed_pubkey_hex.to_string(),
our_x25519_secret: x25519_secret,
our_x25519_pubkey_hex: x25519_pubkey_hex,
signing_key: signing_key.clone(),
server_name: None,
block_header_cache,
relay_tracker,
dead_man_switch,
})
}
/// Set the human-readable server name used in mesh adverts.
///
/// The value is cloned into the listener when `start` spawns it, so a change
/// made after `start` is not seen by an already-running listener.
pub fn set_server_name(&mut self, name: Option<String>) {
self.server_name = name;
}
/// Start the background mesh listener.
pub fn start(&mut self) -> Result<()> {
if self.listener_handle.is_some() {
anyhow::bail!("Mesh listener already running");
}
let (shutdown_tx, shutdown_rx) = watch::channel(false);
self.shutdown_tx = Some(shutdown_tx);
let cmd_rx = self
.cmd_rx
.take()
.ok_or_else(|| anyhow::anyhow!("Command channel already consumed"))?;
let handle = listener::spawn_mesh_listener(
Arc::clone(&self.state),
self.config.device_path.clone(),
self.our_did.clone(),
self.our_ed_pubkey_hex.clone(),
self.our_x25519_secret,
self.our_x25519_pubkey_hex.clone(),
self.server_name.clone(),
shutdown_rx,
cmd_rx,
);
self.listener_handle = Some(handle);
// Spawn dead man's switch background checker
let dms = Arc::clone(&self.dead_man_switch);
let dms_state = Arc::clone(&self.state);
let dms_key = self.signing_key.clone();
let dms_shutdown = self
.shutdown_tx
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))?
.subscribe();
let dms_handle = tokio::spawn(async move {
let mut shutdown = dms_shutdown;
let mut interval = tokio::time::interval(Duration::from_secs(60));
interval.tick().await; // skip first immediate tick
loop {
tokio::select! {
_ = interval.tick() => {
if dms.is_triggered().await {
let was_triggered = *dms.triggered_flag().await;
if !was_triggered {
error!("Dead man's switch TRIGGERED — broadcasting alert");
if let Ok(wire) = dms.build_signed_alert(&dms_key).await {
for ch in [0u8, 1] {
let _ = dms_state.send_cmd(
listener::MeshCommand::BroadcastChannel {
channel: ch,
payload: wire.clone(),
},
).await;
}
}
dms.mark_triggered().await;
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() { return; }
}
}
}
});
self.deadman_handle = Some(dms_handle);
// Spawn block header announcer (internet-connected nodes only)
if self.config.announce_block_headers {
let bha_state = Arc::clone(&self.state);
let bha_cache = Arc::clone(&self.block_header_cache);
let bha_key = self.signing_key.clone();
let bha_did = self.our_did.clone();
let bha_shutdown = self
.shutdown_tx
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))?
.subscribe();
let bha_handle = tokio::spawn(async move {
let mut shutdown = bha_shutdown;
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await; // skip first
let mut last_announced_height: u64 = 0;
let client = match reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
{
Ok(c) => c,
Err(e) => {
error!("Failed to create HTTP client for block announcer: {}", e);
return;
}
};
loop {
tokio::select! {
_ = interval.tick() => {
// Poll Bitcoin Core for latest block
match bitcoin_rpc_getblockcount(&client).await {
Ok(height) if height > last_announced_height => {
if let Ok(header) = bitcoin_rpc_getblockheader_by_height(&client, height).await {
// Store in cache
let payload = message_types::BlockHeaderPayload {
height,
hash: header.hash.clone(),
prev_hash: header.prev_hash.clone(),
timestamp: header.timestamp,
announced_by: bha_did.clone(),
};
let _ = bha_cache.store_header(payload).await;
// Build signed announcement and broadcast
match bitcoin_relay::build_block_header_announcement(
height,
&header.hash,
&header.prev_hash,
header.timestamp,
&bha_did,
&bha_key,
) {
Ok(wire) => {
// Send to peers — prefer Archy nodes, fall back to all (max 5)
let peers = bha_state.peers.read().await;
let mut sent = 0u32;
let max_peers = 5u32;
// First pass: Archy nodes
for peer in peers.values() {
if sent >= max_peers { break; }
if !peer.advert_name.starts_with("Archy-") { continue; }
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
let _ = bha_state.send_cmd(
listener::MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload: wire.clone(),
},
).await;
sent += 1;
}
}
}
}
// Second pass: any peer if no Archy nodes found
if sent == 0 {
for peer in peers.values() {
if sent >= max_peers { break; }
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
let _ = bha_state.send_cmd(
listener::MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload: wire.clone(),
},
).await;
sent += 1;
}
}
}
}
}
drop(peers);
last_announced_height = height;
info!(height, hash = %header.hash, peers = sent, "Announced block header to Archy peers");
}
Err(e) => warn!("Failed to build block announcement: {}", e),
}
}
}
Ok(_) => {} // No new block
Err(e) => {
// Bitcoin not running or not reachable — that's fine, skip
tracing::debug!("Block poll: {}", e);
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() { return; }
}
}
}
});
self.block_announcer_handle = Some(bha_handle);
info!("Block header announcer started");
}
// Presence heartbeat broadcaster is DISABLED. The CBOR-encoded
// PresencePayload was rendering as garbled bytes on peers that
// didn't understand the typed envelope (e.g. the FreeMadeira
// repeater echoed it back as plaintext on channel 0), spamming
// every visible node with "Archy-…: av<61>…fstatusfonline…" every
// 120s. Re-enable only after either (a) presence moves to a
// non-broadcast path or (b) we can guarantee no plain-text-only
// receivers on the shared channel.
self.presence_handle = None;
info!("Mesh service started");
Ok(())
}
/// Stop the listener and every auxiliary background task, then re-arm the
/// command channel so the service can be started again.
///
/// The listener is signalled through the shutdown channel and awaited; the
/// dead man's switch, block announcer and presence tasks are aborted and
/// awaited.
pub async fn stop(&mut self) {
    if let Some(shutdown) = self.shutdown_tx.take() {
        let _ = shutdown.send(true);
    }
    if let Some(listener_task) = self.listener_handle.take() {
        let _ = listener_task.await;
    }
    let aux_tasks = [
        self.deadman_handle.take(),
        self.block_announcer_handle.take(),
        self.presence_handle.take(),
    ];
    for task in IntoIterator::into_iter(aux_tasks).flatten() {
        task.abort();
        let _ = task.await;
    }
    // The listener consumed the previous receiver and dropped it when it
    // exited, so a later start() would otherwise fail with "Command channel
    // already consumed". Installing the new sender inside MeshState lets
    // every holder of the shared state pick up the fresh channel
    // transparently.
    let (fresh_tx, fresh_rx) = tokio::sync::mpsc::channel(32);
    *self.state.cmd_tx.write().await = fresh_tx;
    self.cmd_rx = Some(fresh_rx);
    info!("Mesh service stopped");
}
/// Get current mesh status.
///
/// Returns a clone of the status held in shared state, taken under a read lock.
pub async fn status(&self) -> MeshStatus {
self.state.status.read().await.clone()
}
/// Get a reference to the shared mesh state.
///
/// Callers that need ownership can `Arc::clone` the returned reference.
pub fn state(&self) -> &Arc<listener::MeshState> {
&self.state
}
/// Get list of discovered peers.
///
/// Returns cloned records of every entry in the shared peer map, which also
/// contains the synthetic federation peers added by `upsert_federation_peer`.
pub async fn peers(&self) -> Vec<MeshPeer> {
self.state.peers.read().await.values().cloned().collect()
}
/// Return the most recent messages from the in-memory history.
///
/// At most `limit` messages are returned (`MAX_MESSAGES_DEFAULT` when
/// `limit` is `None`), in stored order — oldest first — so the result is the
/// tail of the history.
pub async fn messages(&self, limit: Option<usize>) -> Vec<MeshMessage> {
    let cap = limit.unwrap_or(MAX_MESSAGES_DEFAULT);
    let history = self.state.messages.read().await;
    // Index of the first message that still fits within the cap.
    let first_kept = history.len().saturating_sub(cap);
    history.iter().skip(first_kept).cloned().collect()
}
/// Full in-memory state dump for debugging. Returns peers, all messages,
/// status, shared-secret peer ids (not the secrets), encrypt_relay flag,
/// and stego mode. Intended for development/smoke-test use only — don't
/// call this on a hot path.
pub async fn debug_dump(&self) -> serde_json::Value {
let status = self.state.status.read().await.clone();
let peers: Vec<_> = self.state.peers.read().await.values().cloned().collect();
let messages: Vec<_> = self.state.messages.read().await.iter().cloned().collect();
let secret_peer_ids: Vec<u32> = self
.state
.shared_secrets
.read()
.await
.keys()
.copied()
.collect();
serde_json::json!({
"status": status,
"peers": peers,
"peer_count": peers.len(),
"messages": messages,
"message_count": messages.len(),
"secret_peer_ids": secret_peer_ids,
"encrypt_relay": self.state.encrypt_relay,
"stego_mode": format!("{:?}", self.state.stego_mode),
})
}
/// Resolve a peer's 6-byte public-key prefix for mesh addressing.
async fn peer_dest_prefix(&self, contact_id: u32) -> Result<[u8; 6]> {
let peers = self.state.peers.read().await;
let peer = peers
.get(&contact_id)
.ok_or_else(|| anyhow::anyhow!("Peer not found"))?;
let pubkey_hex = peer
.pubkey_hex
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Peer has no public key"))?;
let pubkey_bytes =
hex::decode(pubkey_hex).map_err(|_| anyhow::anyhow!("Invalid peer public key"))?;
if pubkey_bytes.len() < 6 {
anyhow::bail!("Peer public key too short");
}
let mut dest_prefix = [0u8; 6];
dest_prefix.copy_from_slice(&pubkey_bytes[..6]);
Ok(dest_prefix)
}
/// Split an oversized wire payload into MC-framed base64 chunks and send
/// each via the mesh device. Matches the receive-side reassembly in
/// `mesh/listener/decode.rs::handle_chunked_frame` (header `MCIIXXTT`,
/// 20-chunk cap). Each frame carries at most
/// `protocol::MAX_MESSAGE_LEN - 8` base64 characters after the 8-character
/// header. The caller must ensure the device is connected.
///
/// # Errors
/// Fails when the payload is empty or needs more than 20 chunks, when the
/// peer's destination prefix cannot be resolved, or when the listener's
/// command channel is closed.
// Currently unused: `send_typed_wire` leaves chunking to the lower
// DM-via-channel layer. Kept for callers that need explicit MC framing.
#[allow(dead_code)]
async fn send_chunked_payload(&self, contact_id: u32, payload: Vec<u8>) -> Result<()> {
    use base64::Engine;
    const HEADER_LEN: usize = 8; // MC + msg_id(2) + chunk_idx(2) + total(2)
    const MAX_CHUNK_B64: usize = protocol::MAX_MESSAGE_LEN - HEADER_LEN;
    const MAX_CHUNKS: usize = 20;
    let b64 = base64::engine::general_purpose::STANDARD.encode(&payload);
    // Validate the chunk count at full width BEFORE narrowing to u8. The old
    // `... as u8` cast happened first, so e.g. 257 chunks wrapped to 1,
    // passed the limit check, and a single truncated chunk was sent.
    let chunk_count = b64.len().div_ceil(MAX_CHUNK_B64);
    if chunk_count == 0 || chunk_count > MAX_CHUNKS {
        anyhow::bail!(
            "Payload too large to chunk: {} bytes → {} chunks (max {})",
            payload.len(),
            chunk_count,
            MAX_CHUNKS
        );
    }
    // Lossless: 1 <= chunk_count <= MAX_CHUNKS (20).
    let total_chunks = chunk_count as u8;
    // Pick a 1-byte msg_id. Use the low 8 bits of the unix nanos; not
    // cryptographically unique but collisions within a 120s reassembly
    // window are astronomically unlikely for normal send rates.
    let msg_id: u8 =
        (chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64 & 0xFF) as u8;
    let dest_prefix = self.peer_dest_prefix(contact_id).await?;
    for chunk_idx in 0..total_chunks {
        let start = chunk_idx as usize * MAX_CHUNK_B64;
        let end = (start + MAX_CHUNK_B64).min(b64.len());
        // Base64 output is ASCII, so byte offsets are always char boundaries.
        let chunk = &b64[start..end];
        let frame = format!(
            "MC{:02X}{:02X}{:02X}{}",
            msg_id, chunk_idx, total_chunks, chunk
        );
        self.state
            .send_cmd(listener::MeshCommand::SendText {
                dest_pubkey_prefix: dest_prefix,
                payload: frame.into_bytes(),
            })
            .await
            .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?;
    }
    tracing::info!(
        contact_id,
        msg_id,
        chunks = total_chunks,
        bytes = payload.len(),
        "Sent chunked payload over mesh"
    );
    Ok(())
}
/// Hand raw wire bytes for `contact_id` to the listener as a `SendText`
/// command. No Sent-record bookkeeping happens here — callers store the
/// MeshMessage record themselves.
///
/// Oversized payloads (>LoRa per-frame budget) are handled by the lower
/// `send_dm_via_channel` layer, which base64-encodes + MC-frame-chunks
/// the bytes into 80-char pieces and reassembles on the receiver. We
/// must NOT chunk here as well — doing so double-chunks and produces
/// bytes the receiver can't decode.
///
/// # Errors
/// Fails when no device is connected, the peer's destination prefix cannot be
/// resolved, or the listener's command channel is closed.
async fn send_raw_payload(&self, contact_id: u32, payload: Vec<u8>) -> Result<()> {
    // The read guard is a temporary of the condition and is released before
    // anything else is awaited.
    if !self.state.status.read().await.device_connected {
        anyhow::bail!("No mesh device connected");
    }
    let dest_pubkey_prefix = self.peer_dest_prefix(contact_id).await?;
    let command = listener::MeshCommand::SendText {
        dest_pubkey_prefix,
        payload,
    };
    match self.state.send_cmd(command).await {
        Ok(_) => Ok(()),
        Err(_) => Err(anyhow::anyhow!("Mesh listener not running")),
    }
}
/// Send a typed envelope wire payload and record a rich Sent MeshMessage.
/// Used by RPC handlers that transmit invoice/coordinate/alert/etc. so the
/// UI sees a proper rich Sent card instead of garbage wire-byte plaintext.
///
/// `sender_seq` should match the seq encoded into the TypedEnvelope by the
/// caller — the RPC handler allocates it via `next_send_seq` before
/// building the envelope and threads it through here so the local Sent
/// record carries the same MessageKey the receiver will see.
pub async fn send_typed_wire(
&self,
contact_id: u32,
wire: Vec<u8>,
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
sender_seq: u64,
) -> Result<MeshMessage> {
// Federation-synthetic contacts (high bit set) don't exist in the
// meshcore firmware contact table, so LoRa send would fail at
// `peer_dest_prefix`. Any envelope larger than the LoRa frame budget
// also needs the federation path. In both cases we look up the
// peer's onion (by archipelago pubkey first, then by DID) and POST
// over Tor; otherwise the send falls through to LoRa.
let is_federation_synthetic = contact_id & 0x8000_0000 != 0;
let exceeds_lora = wire.len() > protocol::MAX_MESSAGE_LEN;
if is_federation_synthetic || exceeds_lora {
let (peer_pubkey, peer_did) = {
let peers = self.state.peers.read().await;
match peers.get(&contact_id) {
Some(p) => (p.pubkey_hex.clone(), p.did.clone()),
None if is_federation_synthetic => {
anyhow::bail!("Unknown federation peer {}", contact_id);
}
None => (None, None),
}
};
let nodes = crate::federation::load_nodes(&self.data_dir)
.await
.unwrap_or_default();
let onion = peer_pubkey
.as_ref()
.and_then(|pk| {
nodes
.iter()
.find(|n| &n.pubkey == pk)
.map(|n| n.onion.clone())
})
.or_else(|| {
peer_did
.as_ref()
.and_then(|d| nodes.iter().find(|n| &n.did == d).map(|n| n.onion.clone()))
});
if let Some(onion) = onion {
return self
.send_typed_wire_via_federation(
contact_id,
&onion,
wire,
type_label,
display_text,
typed_payload,
sender_seq,
)
.await;
}
// No federation path — fall through to send_raw_payload, which
// hands the wire to the lower DM-via-channel layer. That layer
// (`send_dm_via_channel` in listener/session.rs) handles both
// single-frame and chunked transmission internally; we must NOT
// pre-chunk here as well or the receiver sees garbage.
}
self.send_raw_payload(contact_id, wire).await?;
Ok(self
.record_sent_typed(
contact_id,
type_label,
display_text,
typed_payload,
sender_seq,
)
.await)
}
/// Send a typed envelope to a peer via the federation (Tor) path rather
/// than LoRa. Used when the envelope exceeds LoRa's per-frame budget —
/// ContentRef is the canonical example, at 400+ bytes with thumb + cap —
/// and, via `send_typed_wire`, for federation-synthetic contacts.
/// The peer's onion must already be in federation storage. Records a
/// Sent MeshMessage locally so the UI gets the rich card.
///
/// This does NOT use chunking and does NOT go through the mesh radio —
/// it is an HTTP POST to the peer's `/archipelago/mesh-typed` endpoint.
///
/// The POST itself is fire-and-forget: we record the Sent MeshMessage
/// synchronously (so the UI sees the bubble immediately) and spawn the
/// HTTP request in the background. Tor circuit setup is 15s per envelope
/// and blocking the RPC on it made `mesh.send` feel laggy — especially
/// over a held Enter key. Delivery failures still surface via the
/// absent read-receipt path: `delivered` stays `false` on the Sent
/// record if the peer never echoes back a receipt.
///
/// Because the send is detached, this function returns `Ok` with the Sent
/// record even if the POST later fails; failures are only logged.
pub async fn send_typed_wire_via_federation(
&self,
contact_id: u32,
peer_onion: &str,
wire: Vec<u8>,
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
sender_seq: u64,
) -> Result<MeshMessage> {
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use ed25519_dalek::Signer;
// Sign the raw wire bytes so the receiver can attribute the envelope
// to our pubkey even when it arrives over federation/FIPS/Tor rather
// than the radio. Signature covers the wire only — the receiver
// re-hashes.
let signature = hex::encode(self.signing_key.sign(&wire).to_bytes());
let wire_b64 = BASE64.encode(&wire);
// NOTE: `from_name` carries our DID, not a human-readable name.
let body = serde_json::json!({
"from_pubkey": self.our_ed_pubkey_hex,
"from_name": self.our_did,
"typed_envelope_b64": wire_b64,
"signature": signature,
});
// Record Sent now so the UI bubble appears immediately.
let msg = self
.record_sent_typed(
contact_id,
type_label,
display_text,
typed_payload,
sender_seq,
)
.await;
// Fire the send in the background. FIPS is preferred when the peer
// is federated and running fips; Tor is the fallback. Failures are
// logged but do not propagate — caller already has the Sent
// MeshMessage and the UI's delivery indicator tracks the receipt.
// Owned copies are needed because the spawned task must be 'static.
let peer_onion_owned = peer_onion.to_string();
let data_dir_owned = self.data_dir.clone();
tokio::spawn(async move {
let fips_npub =
crate::federation::fips_npub_for_onion(&data_dir_owned, &peer_onion_owned).await;
let req = crate::fips::dial::PeerRequest::new(
fips_npub.as_deref(),
&peer_onion_owned,
"/archipelago/mesh-typed",
)
.service(crate::settings::transport::PeerService::Messaging)
.timeout(std::time::Duration::from_secs(120));
match req.send_json(&body).await {
// 2xx response: delivered.
Ok((resp, transport)) if resp.status().is_success() => {
tracing::debug!(contact_id, transport = %transport, "Federation envelope delivered");
}
// Request completed but the peer answered with a non-success status.
Ok((resp, transport)) => warn!(
contact_id,
status = %resp.status(),
transport = %transport,
"Peer rejected federation-routed envelope"
),
// Request could not be completed at all.
Err(e) => warn!(contact_id, "Federation POST failed: {}", e),
}
});
Ok(msg)
}
/// Inject a typed envelope received over federation (Tor) into MeshState
/// as if it had arrived over the mesh radio.
///
/// The sender is always mapped to the synthetic contact id derived from its
/// archipelago pubkey (see `federation_peer_contact_id`) and upserted as a
/// mesh peer before the envelope is dispatched; no attempt is made to match
/// it against LoRa contacts.
///
/// * `from_pubkey_hex` — sender's archipelago ed25519 pubkey (hex).
/// * `from_name` — sender's self-reported DID from the request body; used as
///   DID and display name when the federation node list does not know the
///   sender.
/// * `wire` — encoded `TypedEnvelope`.
///
/// # Errors
/// Fails when `wire` cannot be decoded as a `TypedEnvelope`.
pub async fn inject_typed_from_federation(
    &self,
    from_pubkey_hex: &str,
    from_name: Option<&str>,
    wire: Vec<u8>,
) -> Result<()> {
    let envelope = crate::mesh::message_types::TypedEnvelope::from_wire(&wire)?;
    // Federation and mesh have disjoint identity namespaces: a LoRa
    // mesh contact carries meshcore's firmware-issued pubkey, not the
    // archipelago ed25519 key. So we cannot rely on matching pubkeys
    // across transports. Instead, every federation peer has a stable
    // synthetic mesh contact_id derived from its archipelago pubkey,
    // and is upserted into mesh state the first time we hear from them.
    let (federation_did, federation_name) = {
        let nodes = crate::federation::load_nodes(&self.data_dir)
            .await
            .unwrap_or_default();
        nodes
            .into_iter()
            .find(|n| n.pubkey == from_pubkey_hex)
            .map(|n| (Some(n.did), n.name))
            .unwrap_or((None, None))
    };
    // If the federation list knows this sender, use its DID; otherwise
    // fall back to `from_name` (the sender's self-reported DID from the
    // envelope body, signature-verified upstream in handle_mesh_typed_relay).
    let effective_did = federation_did
        .or_else(|| from_name.map(|s| s.to_string()))
        .unwrap_or_else(|| {
            // `get(..16)` instead of `[..16]`: no panic on short input or on
            // a non-char-boundary index for unexpected non-ASCII input.
            let short = from_pubkey_hex.get(..16).unwrap_or(from_pubkey_hex);
            format!("did:unknown:{}", short)
        });
    let display_name = federation_name
        .or_else(|| from_name.map(|s| s.to_string()))
        .unwrap_or_else(|| "federation peer".to_string());
    let contact_id = upsert_federation_peer(
        &self.state,
        from_pubkey_hex,
        &effective_did,
        Some(&display_name),
    )
    .await;
    listener::dispatch::handle_typed_envelope_direct(
        &self.state,
        contact_id,
        &display_name,
        envelope,
    )
    .await;
    Ok(())
}
/// Allocate the next outbound seq for a target. Convenience passthrough
/// to MeshState::next_send_seq; used by RPC handlers before encoding a
/// TypedEnvelope so the seq on the wire matches the Sent record.
///
/// `target` is the contact id the envelope will be addressed to.
pub async fn next_send_seq(&self, target: u32) -> u64 {
self.state.next_send_seq(target).await
}
/// Look up a stored MeshMessage by its local `id`. Used by the Forward
/// RPC to pull an existing record's typed payload for re-encoding.
///
/// Returns a clone of the first message whose `id` matches, or `None` when
/// no stored message has that id.
pub async fn find_message_by_id(&self, id: u64) -> Option<MeshMessage> {
let messages = self.state.messages.read().await;
messages.iter().find(|m| m.id == id).cloned()
}
/// Drop a stored MeshMessage by local id. Used after sending control
/// envelopes (read receipts) so they don't surface as their own
/// bubbles in the chat history. The wire frame is already on its way;
/// this just prunes the local Sent record.
///
/// Removes every stored message with that id; an unknown id is a no-op.
pub async fn drop_message_by_id(&self, id: u64) {
let mut messages = self.state.messages.write().await;
messages.retain(|m| m.id != id);
}
/// Apply an Edit to our own Sent message whose `sender_seq` equals
/// `target_seq` (the sender is implicitly this node).
///
/// Only the first matching message is changed: `plaintext` is replaced with
/// `new_text`, and `typed_payload` gains `edited_at` and `text` keys so the
/// UI can show "(edited)". A payload that was absent or not a JSON object is
/// replaced by a fresh object. Best-effort: no match means no change.
pub async fn apply_local_edit(&self, target_seq: u64, new_text: &str, edited_at: u32) {
    let mut history = self.state.messages.write().await;
    let own_sent = history.iter_mut().find(|msg| {
        msg.sender_seq == Some(target_seq)
            && matches!(msg.direction, crate::mesh::types::MessageDirection::Sent)
    });
    let Some(msg) = own_sent else {
        return;
    };
    msg.plaintext = new_text.to_string();
    let mut fields = match msg.typed_payload.take() {
        Some(serde_json::Value::Object(existing)) => existing,
        _ => serde_json::Map::new(),
    };
    fields.insert("edited_at".to_string(), serde_json::json!(edited_at));
    fields.insert("text".to_string(), serde_json::json!(new_text));
    msg.typed_payload = Some(serde_json::Value::Object(fields));
}
/// Turn our own Sent message with `sender_seq == target_seq` into a Delete
/// tombstone.
///
/// Only the first matching message is changed: its text becomes the
/// "message deleted" marker, its typed payload becomes `{ "deleted": true }`
/// and its message type becomes `"delete"`. No match means no change.
pub async fn apply_local_delete(&self, target_seq: u64) {
    let mut history = self.state.messages.write().await;
    let own_sent = history.iter_mut().find(|msg| {
        msg.sender_seq == Some(target_seq)
            && matches!(msg.direction, crate::mesh::types::MessageDirection::Sent)
    });
    if let Some(msg) = own_sent {
        msg.plaintext = "🗑 message deleted".to_string();
        msg.typed_payload = Some(serde_json::json!({ "deleted": true }));
        msg.message_type = "delete".to_string();
    }
}
/// Broadcast pre-encoded typed-envelope bytes on a mesh channel and record
/// a rich Sent `MeshMessage` for it.
///
/// `wire` is forwarded to the listener untouched — do NOT utf8_lossy-encode
/// binary envelope bytes before handing them here. `type_label`,
/// `display_text`, `typed_payload` and `sender_seq` only shape the stored
/// record; they are not transmitted by this method.
///
/// The stored record uses the synthetic contact id `u32::MAX - channel` and
/// the peer name `"Channel {channel}"`. On success the `messages_sent`
/// status counter is incremented and the stored message is returned.
///
/// # Errors
/// Fails when no mesh device is connected, when `wire` is longer than
/// `protocol::MAX_MESSAGE_LEN`, or when the listener command channel
/// rejects the command.
pub async fn send_channel_typed_wire(
    &self,
    channel: u8,
    wire: Vec<u8>,
    type_label: &str,
    display_text: &str,
    typed_payload: Option<serde_json::Value>,
    sender_seq: u64,
) -> Result<MeshMessage> {
    // Copy the flag out so the read guard is gone before any later await.
    let connected = self.state.status.read().await.device_connected;
    if !connected {
        anyhow::bail!("No mesh device connected");
    }

    let wire_len = wire.len();
    if wire_len > protocol::MAX_MESSAGE_LEN {
        anyhow::bail!(
            "Message too large for LoRa: {} bytes (max {})",
            wire_len,
            protocol::MAX_MESSAGE_LEN
        );
    }

    let command = listener::MeshCommand::BroadcastChannel {
        channel,
        payload: wire,
    };
    if self.state.send_cmd(command).await.is_err() {
        anyhow::bail!("Mesh listener not running");
    }

    // Channels are filed under synthetic contact ids counted down from the
    // top of the u32 range.
    let peer_contact_id = u32::MAX - u32::from(channel);
    let peer_name = Some(format!("Channel {}", channel));
    let id = self.state.next_id().await;
    let timestamp = chrono::Utc::now().to_rfc3339();
    let record = MeshMessage {
        id,
        direction: MessageDirection::Sent,
        peer_contact_id,
        peer_name,
        plaintext: display_text.to_string(),
        timestamp,
        delivered: false,
        encrypted: false,
        message_type: type_label.to_string(),
        typed_payload,
        sender_pubkey: Some(self.our_ed_pubkey_hex.clone()),
        sender_seq: Some(sender_seq),
    };
    self.state.store_message(record.clone()).await;
    self.state.status.write().await.messages_sent += 1;
    Ok(record)
}
/// Send a text message to the peer identified by `contact_id`.
///
/// The text is wrapped in a typed `Text` envelope stamped with a freshly
/// allocated `sender_seq`, so the resulting `MeshMessage` carries a stable
/// MessageKey — this is what makes replies and reactions addressable
/// against plain text bubbles. Transmission and record keeping are
/// delegated to `send_typed_wire` with the label `"text"`.
///
/// # Errors
/// Propagates failures from envelope encoding (`to_wire`) and from
/// `send_typed_wire`.
pub async fn send_message(&self, contact_id: u32, text: &str) -> Result<MeshMessage> {
    use crate::mesh::message_types::{MeshMessageType, TypedEnvelope};

    // The seq is allocated up front so the wire frame and the Sent record
    // agree on it.
    let seq = self.state.next_send_seq(contact_id).await;
    let body = text.as_bytes().to_vec();
    let wire = TypedEnvelope::new(MeshMessageType::Text, body)
        .with_seq(seq)
        .to_wire()?;
    self.send_typed_wire(contact_id, wire, "text", text, None, seq)
        .await
}
/// Store a Sent `MeshMessage` for a typed envelope the caller has already
/// transmitted.
///
/// The RPC layer calls this after sending invoice/coordinate/alert/etc. so
/// the UI gets a proper rich Sent card instead of a Sent record containing
/// the raw wire bytes as plaintext. Nothing is sent over the radio here.
///
/// `peer_name` is taken from the known peer's `advert_name` when
/// `contact_id` is present in the peer table, otherwise it is `None`.
/// The `messages_sent` status counter is incremented and the stored
/// message is returned.
pub async fn record_sent_typed(
    &self,
    contact_id: u32,
    type_label: &str,
    display_text: &str,
    typed_payload: Option<serde_json::Value>,
    sender_seq: u64,
) -> MeshMessage {
    let id = self.state.next_id().await;

    // Hold the peers read lock only for the name lookup.
    let peers = self.state.peers.read().await;
    let peer_name = peers
        .get(&contact_id)
        .map(|peer| peer.advert_name.clone());
    drop(peers);

    let timestamp = chrono::Utc::now().to_rfc3339();
    let record = MeshMessage {
        id,
        direction: MessageDirection::Sent,
        peer_contact_id: contact_id,
        peer_name,
        plaintext: display_text.to_string(),
        timestamp,
        delivered: false,
        encrypted: false,
        message_type: type_label.to_string(),
        typed_payload,
        sender_pubkey: Some(self.our_ed_pubkey_hex.clone()),
        sender_seq: Some(sender_seq),
    };
    self.state.store_message(record.clone()).await;
    self.state.status.write().await.messages_sent += 1;
    record
}
/// Broadcast a plain text message on a mesh channel.
///
/// The UTF-8 bytes of `text` are routed through the background listener,
/// which owns the serial port. The stored Sent record uses the synthetic
/// contact id `u32::MAX - channel`, the peer name `"Channel {channel}"`,
/// message type `"text"`, and carries no typed payload, sender pubkey or
/// sender seq. On success the `messages_sent` status counter is incremented
/// and the stored message is returned.
///
/// # Errors
/// Fails when no mesh device is connected, when the encoded text is longer
/// than `protocol::MAX_MESSAGE_LEN`, or when the listener command channel
/// rejects the command.
pub async fn send_channel_message(&self, channel: u8, text: &str) -> Result<MeshMessage> {
    // Copy the flag out so the read guard is gone before any later await.
    let connected = self.state.status.read().await.device_connected;
    if !connected {
        anyhow::bail!("No mesh device connected");
    }

    let payload = text.as_bytes().to_vec();
    let payload_len = payload.len();
    if payload_len > protocol::MAX_MESSAGE_LEN {
        anyhow::bail!(
            "Message too large for LoRa: {} bytes (max {})",
            payload_len,
            protocol::MAX_MESSAGE_LEN
        );
    }

    // Hand the frame to the listener's command channel.
    let command = listener::MeshCommand::BroadcastChannel { channel, payload };
    if self.state.send_cmd(command).await.is_err() {
        anyhow::bail!("Mesh listener not running");
    }

    let peer_contact_id = u32::MAX - u32::from(channel);
    let peer_name = Some(format!("Channel {}", channel));
    let id = self.state.next_id().await;
    let timestamp = chrono::Utc::now().to_rfc3339();
    let record = MeshMessage {
        id,
        direction: MessageDirection::Sent,
        peer_contact_id,
        peer_name,
        plaintext: text.to_string(),
        timestamp,
        delivered: false,
        encrypted: false,
        message_type: "text".to_string(),
        typed_payload: None,
        sender_pubkey: None,
        sender_seq: None,
    };
    self.state.store_message(record.clone()).await;
    self.state.status.write().await.messages_sent += 1;
    Ok(record)
}
/// Ask the listener to broadcast our advertisement immediately so other
/// nodes can discover us.
///
/// The request is queued as `MeshCommand::SendAdvert` on the listener's
/// command channel; an info-level log line is written once it is accepted.
///
/// # Errors
/// Fails when no mesh device is connected or when the listener command
/// channel rejects the command.
pub async fn broadcast_identity(&self) -> Result<()> {
    let connected = self.state.status.read().await.device_connected;
    if !connected {
        anyhow::bail!("No mesh device connected. Check USB connection.");
    }

    let queued = self
        .state
        .send_cmd(listener::MeshCommand::SendAdvert)
        .await;
    if queued.is_err() {
        anyhow::bail!("Mesh listener not running");
    }

    info!("Mesh self-advert broadcast triggered");
    Ok(())
}
/// Replace the mesh configuration, persist it, and react to an
/// enabled/disabled transition.
///
/// Steps, in order:
/// 1. Write `config` to disk via `save_config`; on failure nothing else
///    is touched.
/// 2. Swap the in-memory config and mirror `enabled` and the channel name
///    (falling back to `"archipelago"`) into the shared status.
/// 3. If the mesh just became enabled, start the listener; if it just
///    became disabled, stop it and clear the connection-related status
///    fields.
///
/// # Errors
/// Propagates failures from `save_config` and from `start`.
pub async fn configure(&mut self, config: MeshConfig) -> Result<()> {
    save_config(&self.data_dir, &config).await?;

    let was_enabled = self.config.enabled;
    let now_enabled = config.enabled;
    let channel_label = config
        .channel_name
        .clone()
        .unwrap_or_else(|| "archipelago".to_string());
    self.config = config;

    // Reflect the new settings in the status snapshot.
    {
        let mut status = self.state.status.write().await;
        status.enabled = now_enabled;
        status.channel_name = channel_label;
    }

    // Only an actual off->on or on->off flip touches the listener.
    match (was_enabled, now_enabled) {
        (false, true) => {
            self.start()?;
        }
        (true, false) => {
            self.stop().await;
            // Clear everything that described the now-closed connection.
            let mut status = self.state.status.write().await;
            status.device_connected = false;
            status.device_path = None;
            status.firmware_version = None;
            status.self_node_id = None;
            status.peer_count = 0;
        }
        _ => {}
    }
    Ok(())
}
/// Get a handle to the shared mesh state (for RPC handlers).
///
/// Returns a new `Arc` pointing at the same `MeshState`; the state itself
/// is not copied.
pub fn shared_state(&self) -> Arc<MeshState> {
Arc::clone(&self.state)
}
/// Record user activity (resets dead man's switch timer).
///
/// Forwards to `check_in` on the owned dead man's switch.
pub async fn dead_man_check_in(&self) {
self.dead_man_switch.check_in().await;
}
/// Get our DID.
///
/// Returns a borrow of the DID string stored on this service.
pub fn our_did(&self) -> &str {
&self.our_did
}
}
/// Default message cap (100).
// NOTE(review): the consumer of this constant is not visible in this part of
// the file — presumably it bounds how many messages are retained; confirm.
const MAX_MESSAGES_DEFAULT: usize = 100;
// ─── Bitcoin RPC helpers for block header announcer ────────────────────
/// Envelope of a Bitcoin JSON-RPC reply, generic over the type of the
/// `result` payload.
///
/// Both fields are optional; the helpers below treat a present `error` as
/// failure and a missing `result` as failure.
#[derive(serde::Deserialize)]
struct BitcoinRpcResponse<T> {
// Payload of the call, deserialized as `T`.
result: Option<T>,
// Error value reported by the node, kept as raw JSON.
error: Option<serde_json::Value>,
}
/// Subset of a verbose `getblockheader` reply, as extracted by
/// `bitcoin_rpc_getblockheader_by_height`.
struct BlockHeaderInfo {
// Value of the reply's "hash" field (empty when the field is absent).
hash: String,
// Value of the reply's "previousblockhash" field (empty when absent).
prev_hash: String,
// Value of the reply's "time" field, narrowed to u32 (0 when absent).
timestamp: u32,
}
/// Query the Bitcoin node for its current block count via the
/// `getblockcount` JSON-RPC method.
///
/// Credentials come from `crate::bitcoin_rpc::bitcoin_rpc_credentials` and
/// the request goes to `crate::constants::BITCOIN_RPC_URL`. Sending the
/// request is capped at 10 seconds.
///
/// # Errors
/// Fails on timeout, on transport failure, when the reply cannot be parsed,
/// when the node reports an `error`, or when `result` is missing.
async fn bitcoin_rpc_getblockcount(client: &reqwest::Client) -> Result<u64> {
    let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await;
    let body = serde_json::json!({
        "jsonrpc": "1.0", "id": "mesh", "method": "getblockcount", "params": []
    });

    // Build the request future first; it does no work until awaited below.
    let request = client
        .post(crate::constants::BITCOIN_RPC_URL)
        .basic_auth(&rpc_user, Some(&rpc_pass))
        .json(&body)
        .send();
    let http_reply = tokio::time::timeout(Duration::from_secs(10), request)
        .await
        .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockcount timed out after 10s"))?
        .map_err(|e| anyhow::anyhow!("Bitcoin RPC send failed: {}", e))?;
    let reply: BitcoinRpcResponse<u64> = http_reply
        .json()
        .await
        .map_err(|e| anyhow::anyhow!("Bitcoin RPC parse failed: {}", e))?;

    // A reported error takes precedence over whatever `result` holds.
    match (reply.error, reply.result) {
        (Some(err), _) => anyhow::bail!("Bitcoin RPC: {}", err),
        (None, Some(count)) => Ok(count),
        (None, None) => Err(anyhow::anyhow!("Bitcoin RPC null result")),
    }
}
/// Fetch the header of the block at `height` from the Bitcoin node.
///
/// Two JSON-RPC calls are made against `crate::constants::BITCOIN_RPC_URL`,
/// each with its send capped at 10 seconds:
/// 1. `getblockhash` resolves `height` to a block hash.
/// 2. `getblockheader` (verbose) fetches the header for that hash.
///
/// The returned `BlockHeaderInfo` is filled from the reply's `"hash"`,
/// `"previousblockhash"` and `"time"` fields. A missing field falls back to
/// an empty string / `0` rather than failing.
///
/// # Errors
/// Fails on timeout, transport failure, or unparseable reply for either
/// call; when the node reports an `error` for either call (for example an
/// out-of-range height); or when either `result` is missing.
async fn bitcoin_rpc_getblockheader_by_height(
    client: &reqwest::Client,
    height: u64,
) -> Result<BlockHeaderInfo> {
    let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await;

    // First get block hash for this height
    let body = serde_json::json!({
        "jsonrpc": "1.0", "id": "mesh", "method": "getblockhash", "params": [height]
    });
    let resp: BitcoinRpcResponse<String> = tokio::time::timeout(
        Duration::from_secs(10),
        client
            .post(crate::constants::BITCOIN_RPC_URL)
            .basic_auth(&rpc_user, Some(&rpc_pass))
            .json(&body)
            .send(),
    )
    .await
    .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockhash timed out after 10s"))?
    .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockhash send failed: {}", e))?
    .json()
    .await
    .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockhash parse failed: {}", e))?;
    // Surface the node's own error instead of the generic "No block hash",
    // the same way bitcoin_rpc_getblockcount handles its reply.
    if let Some(err) = resp.error {
        anyhow::bail!("Bitcoin RPC getblockhash: {}", err);
    }
    let hash = resp
        .result
        .ok_or_else(|| anyhow::anyhow!("No block hash"))?;

    // Then get full header
    let body = serde_json::json!({
        "jsonrpc": "1.0", "id": "mesh", "method": "getblockheader", "params": [hash, true]
    });
    let resp: BitcoinRpcResponse<serde_json::Value> = tokio::time::timeout(
        Duration::from_secs(10),
        client
            .post(crate::constants::BITCOIN_RPC_URL)
            .basic_auth(&rpc_user, Some(&rpc_pass))
            .json(&body)
            .send(),
    )
    .await
    .map_err(|_| anyhow::anyhow!("Bitcoin RPC getblockheader timed out after 10s"))?
    .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockheader send failed: {}", e))?
    .json()
    .await
    .map_err(|e| anyhow::anyhow!("Bitcoin RPC getblockheader parse failed: {}", e))?;
    // Same treatment for the second call: report what the node said.
    if let Some(err) = resp.error {
        anyhow::bail!("Bitcoin RPC getblockheader: {}", err);
    }
    let header = resp
        .result
        .ok_or_else(|| anyhow::anyhow!("No block header"))?;

    Ok(BlockHeaderInfo {
        hash: header["hash"].as_str().unwrap_or_default().to_string(),
        // Left empty when the node omits the field.
        prev_hash: header["previousblockhash"]
            .as_str()
            .unwrap_or_default()
            .to_string(),
        // NOTE(review): narrowing cast; assumes the node's "time" value
        // fits in 32 bits — confirm.
        timestamp: header["time"].as_u64().unwrap_or(0) as u32,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default config is disabled, targets the "archipelago" channel and
    /// broadcasts identity.
    #[test]
    fn test_mesh_config_default() {
        let defaults = MeshConfig::default();
        assert!(!defaults.enabled);
        assert_eq!(defaults.channel_name.as_deref(), Some("archipelago"));
        assert!(defaults.broadcast_identity);
    }

    /// A config survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_mesh_config_serialization() {
        let original = MeshConfig {
            enabled: true,
            device_path: Some("/dev/ttyUSB0".to_string()),
            channel_name: Some("test".to_string()),
            broadcast_identity: false,
            advert_name: Some("MyNode".to_string()),
            ..Default::default()
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<MeshConfig>(&encoded).unwrap();
        assert!(decoded.enabled);
        assert_eq!(decoded.device_path.as_deref(), Some("/dev/ttyUSB0"));
        assert_eq!(decoded.advert_name.as_deref(), Some("MyNode"));
    }

    /// Loading from a directory without a config file yields a disabled
    /// config instead of an error.
    #[tokio::test]
    async fn test_load_config_default_when_no_file() {
        let scratch = tempfile::tempdir().unwrap();
        let loaded = load_config(scratch.path()).await.unwrap();
        assert!(!loaded.enabled);
    }

    /// A saved config can be read back from the same directory.
    #[tokio::test]
    async fn test_save_and_load_config_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let written = MeshConfig {
            enabled: true,
            device_path: Some("/dev/ttyUSB0".to_string()),
            channel_name: Some("archy".to_string()),
            broadcast_identity: true,
            advert_name: None,
            ..Default::default()
        };
        save_config(scratch.path(), &written).await.unwrap();

        let read_back = load_config(scratch.path()).await.unwrap();
        assert!(read_back.enabled);
        assert_eq!(read_back.device_path.as_deref(), Some("/dev/ttyUSB0"));
    }
}