archy/core/archipelago/src/mesh/reticulum.rs
archipelago 02b6b52a8c feat(mesh): Meshtastic RSSI/SNR + peer-location map wiring (backlog #14/#15, part 1)
Backend: parse_mesh_packet now decodes MeshPacket.rx_snr (field 8, float) and
rx_rssi (field 12, int32), and a new POSITION_APP branch decodes Position.
latitude_i/longitude_i (fields 1/2, sfixed32) -- all field numbers confirmed
against the canonical meshtastic/protobufs mesh.proto, not guessed. Threaded
through ParsedContact -> refresh_contacts -> MeshPeer (mirroring how
pkc_capable was wired for #17), so mesh.peers now surfaces real rssi/snr/lat/
lon instead of always-null. Fixed a real bug found along the way:
update_node_info's unconditional contact replace would have silently wiped
any already-tracked signal/position data on the next NodeInfo packet -- now
preserves it.

Frontend: mesh.ts's updateNodePositionsFromPeers() feeds real position data
into the SAME nodePositions map MeshMap.vue already renders from (parallel to
the existing Coordinate/Alert-message path) -- MeshMap.vue itself needed zero
changes, it was already built for this.

105/105 mesh tests pass (4 new: rx_snr/rx_rssi decode, position decode +
incomplete-field handling, full packet_to_inbound_frame integration).

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
2026-06-30 22:52:42 -04:00

890 lines
37 KiB
Rust

// WIP mesh/transport protocol — suppress dead code warnings
#![allow(dead_code)]
//! Reticulum (RNS + LXMF) bridge.
//!
//! Unlike Meshcore/Meshtastic — simple framed-serial protocols driven entirely
//! in-process — Reticulum is a full network stack (identity, announce, multi-hop
//! routing, LXMF store-and-forward) that we run as a **host-supervised Python
//! daemon** (`reticulum-daemon/`, canonical `rns`+`lxmf`, chosen over the sub-1.0
//! Rust port for interop with Sideband/NomadNet/MeshChat — see the plan). This
//! module is the Rust-side half of that bridge: it owns the child process and
//! speaks the daemon's Unix-socket JSON-RPC, while presenting the same method
//! surface `MeshRadioDevice` (listener/session.rs) already calls on
//! `MeshcoreDevice`/`MeshtasticDevice`.
//!
//! Two contract details that are easy to get wrong (see the plan §2b/§2d):
//! 1. The wrapper's `send_text_msg` is handed only a 6-byte prefix, but an RNS
//! destination is 16 bytes — `prefix_to_hash` below is the mandatory
//! resolver, populated from announces/contacts.
//! 2. Inbound LXMF deliveries are translated into the exact same synthetic
//! `InboundFrame` byte layout Meshtastic already produces
//! (`RESP_CONTACT_MSG_V3[_E2E]`), so `frames::handle_frame` needs zero
//! changes to route them.
use super::message_types::{self, ContentInlinePayload, MeshMessageType, TypedEnvelope};
use super::protocol::{self, InboundFrame, ParsedContact};
use super::types::DeviceInfo;
use anyhow::{Context, Result};
use serde_json::Value;
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::process::{Child, Command};
use tracing::{debug, info, warn};
/// RNode KISS protocol bytes, verified against the canonical Reticulum source
/// (`RNS/Interfaces/RNodeInterface.py`, `detect()`/`readLoop()`) — NOT guessed.
/// See `docs/RETICULUM-TRANSPORT-PROGRESS.md` for the citation.
const KISS_FEND: u8 = 0xC0;
const KISS_CMD_DETECT: u8 = 0x08;
const KISS_DETECT_REQ: u8 = 0x73;
const KISS_DETECT_RESP: u8 = 0x46;
const KISS_CMD_FW_VERSION: u8 = 0x50;
const KISS_CMD_PLATFORM: u8 = 0x48;
const KISS_CMD_MCU: u8 = 0x49;
const PROBE_BAUD: u32 = 115200;
const PROBE_READ_TIMEOUT: Duration = Duration::from_millis(800);
/// Path to the supervised daemon binary. In production this is the bundled
/// PyInstaller artifact shipped beside `/usr/local/bin/archipelago` (Phase 1
/// packaging); during development it can point at the venv's interpreter
/// invoking `reticulum_daemon.py` directly. Overridable for testing/packaging.
///
/// `archy_ed_pubkey_hex`/`archy_x25519_pubkey_hex` (when known) are embedded
/// by the daemon in its announce app_data as `ARCHY:2:{ed}:{x25519}` — the
/// SAME wire format meshcore/Meshtastic identity adverts use — so a
/// Reticulum-carried identity binds onto the existing Archy contact via the
/// existing `parse_identity_broadcast`/`handle_identity_received` path,
/// satisfying cross-protocol DM convergence with zero new Rust dispatch code.
fn daemon_command(
socket_path: &Path,
serial_port: &str,
identity_key: &Path,
archy_ed_pubkey_hex: Option<&str>,
archy_x25519_pubkey_hex: Option<&str>,
) -> Command {
let bin = std::env::var("ARCHY_RETICULUM_DAEMON_BIN")
.unwrap_or_else(|_| "/usr/local/bin/archy-reticulum-daemon".to_string());
let mut cmd = if Path::new(&bin).exists() {
Command::new(bin)
} else {
// Dev fallback: run the script through its venv interpreter.
let py = std::env::var("ARCHY_RETICULUM_DAEMON_PY")
.unwrap_or_else(|_| "reticulum-daemon/.venv/bin/python".to_string());
let script = std::env::var("ARCHY_RETICULUM_DAEMON_SCRIPT")
.unwrap_or_else(|_| "reticulum-daemon/reticulum_daemon.py".to_string());
let mut c = Command::new(py);
c.arg(script);
c
};
cmd.arg("--identity-key")
.arg(identity_key)
.arg("--socket")
.arg(socket_path)
.arg("--serial-port")
.arg(serial_port);
if let (Some(ed), Some(x)) = (archy_ed_pubkey_hex, archy_x25519_pubkey_hex) {
cmd.arg("--archy-ed-pubkey-hex")
.arg(ed)
.arg("--archy-x25519-pubkey-hex")
.arg(x);
}
cmd.kill_on_drop(true)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
cmd
}
/// One peer learned via an RNS announce (LXMF delivery destination).
#[derive(Clone)]
struct ReticulumPeer {
dest_hash: [u8; 16],
display_name: String,
/// Archy ed25519 identity hex, once carried in a verified announce
/// app-data blob. Not yet wired (TODO, lands with the signed-announce
/// work) — present so `get_contacts` has a stable shape to extend into.
arch_pubkey_hex: Option<String>,
reachable: bool,
}
/// Bridge handle to one supervised `reticulum-daemon` instance, one per active
/// Reticulum (RNode) radio. Implements the same method shapes
/// `MeshRadioDevice` calls on `MeshcoreDevice`/`MeshtasticDevice`.
pub struct ReticulumLink {
device_path: String,
socket_path: std::path::PathBuf,
child: Child,
writer: tokio::net::unix::OwnedWriteHalf,
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
dest_hash: [u8; 16],
display_name: Option<String>,
/// Mandatory: the wrapper's `send_text_msg`/inbound frames only carry a
/// 6-byte prefix, but an RNS destination is 16 bytes. Populated from
/// announces and `get_contacts`.
prefix_to_hash: HashMap<[u8; 6], [u8; 16]>,
peers: HashMap<[u8; 16], ReticulumPeer>,
inbound: std::collections::VecDeque<InboundFrame>,
/// Monotonic correlation id for `send_resource` RPC calls — purely for
/// matching `resource_progress`/`resource_sent`/`resource_failed` events
/// back to a log line; sends are fire-and-forget (see `send_resource`).
resource_id_counter: u64,
}
impl ReticulumLink {
/// Cheap probe: send the verified RNode KISS detect sequence over the raw
/// serial port and look for `DETECT_RESP`, WITHOUT spawning the (heavy)
/// daemon. Mirrors the open()/initialize() split Meshcore/Meshtastic use,
/// so a non-RNode port is rejected fast and the daemon is only ever
/// started against a port we've confirmed is an RNode.
///
/// `data_dir` is the same archipelago data directory `NodeIdentity` was
/// loaded from (`{data_dir}/identity/node_key`) — the daemon reads that
/// key file directly to derive its RNS identity (we pass the path, not
/// the key bytes, so it never travels through more hops than necessary).
///
/// `our_ed_pubkey_hex`/`our_x25519_pubkey_hex` are this node's real Archy
/// identity pubkeys (already computed by the caller — same values passed
/// to `run_mesh_session`); forwarded to the daemon so its announces carry
/// a peer-bindable `ARCHY:2:...` identity. Pass `None` to announce with
/// just the plain display name (e.g. a non-archy/dev run).
pub async fn open(
path: &str,
data_dir: &Path,
our_ed_pubkey_hex: Option<&str>,
our_x25519_pubkey_hex: Option<&str>,
) -> Result<Self> {
probe_rnode(path).await.context("RNode KISS detect failed")?;
Self::spawn(path, data_dir, our_ed_pubkey_hex, our_x25519_pubkey_hex).await
}
async fn spawn(
path: &str,
data_dir: &Path,
our_ed_pubkey_hex: Option<&str>,
our_x25519_pubkey_hex: Option<&str>,
) -> Result<Self> {
// Keep the RPC socket under the archipelago-owned data dir (not the
// shared system temp dir) so its access is bounded by the same
// permissions as the rest of our state — consistent with the
// "archipelago-owned runtime dir, 0600" security posture.
let runtime_dir = data_dir.join("reticulum");
tokio::fs::create_dir_all(&runtime_dir)
.await
.context("Failed to create reticulum runtime dir")?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = tokio::fs::set_permissions(&runtime_dir, std::fs::Permissions::from_mode(0o700))
.await;
}
let socket_path = runtime_dir.join(format!(
"{}.sock",
path.replace(['/', ' '], "_")
));
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
let identity_key = data_dir.join("identity").join("node_key");
if !identity_key.exists() {
anyhow::bail!(
"Archy identity key not found at {} — cannot derive a Reticulum identity",
identity_key.display()
);
}
let mut cmd = daemon_command(
&socket_path,
path,
&identity_key,
our_ed_pubkey_hex,
our_x25519_pubkey_hex,
);
let mut child = cmd
.spawn()
.context("Failed to spawn reticulum-daemon — is it installed/packaged?")?;
// Wait for the socket to appear, then for the daemon's "ready" event.
let deadline = tokio::time::Instant::now() + Duration::from_secs(15);
let stream = loop {
if tokio::time::Instant::now() > deadline {
let _ = child.start_kill();
anyhow::bail!("reticulum-daemon did not create its RPC socket in time");
}
match UnixStream::connect(&socket_path).await {
Ok(s) => break s,
Err(_) => tokio::time::sleep(Duration::from_millis(150)).await,
}
};
let (read_half, write_half) = stream.into_split();
let mut reader = BufReader::new(read_half);
let mut line = String::new();
tokio::time::timeout(Duration::from_secs(10), reader.read_line(&mut line))
.await
.context("Timed out waiting for reticulum-daemon ready event")?
.context("reticulum-daemon RPC connection closed before ready")?;
let ready: Value = serde_json::from_str(line.trim())
.context("reticulum-daemon sent a non-JSON ready line")?;
if ready.get("event").and_then(Value::as_str) != Some("ready") {
anyhow::bail!("reticulum-daemon's first message was not 'ready': {ready}");
}
let dest_hash_hex = ready
.get("dest_hash")
.and_then(Value::as_str)
.context("ready event missing dest_hash")?;
let dest_hash = parse_hash16(dest_hash_hex)?;
let display_name = ready
.get("display_name")
.and_then(Value::as_str)
.map(str::to_string);
info!(
path = %path,
dest_hash = %dest_hash_hex,
"Reticulum daemon ready"
);
Ok(Self {
device_path: path.to_string(),
socket_path,
child,
writer: write_half,
reader,
dest_hash,
display_name,
prefix_to_hash: HashMap::new(),
peers: HashMap::new(),
inbound: std::collections::VecDeque::new(),
resource_id_counter: 0,
})
}
fn next_resource_id(&mut self) -> String {
self.resource_id_counter += 1;
self.resource_id_counter.to_string()
}
/// Handshake is a no-op here — `open()` already waited for the daemon's
/// `ready` event, so by the time `MeshRadioDevice` calls `initialize()`
/// the daemon (and the RNS/LXMF stack inside it) is already up.
pub async fn initialize(&mut self) -> Result<DeviceInfo> {
Ok(DeviceInfo {
firmware_version: "reticulum-daemon".to_string(),
node_id: reticulum_contact_id_from_hash(&self.dest_hash),
max_contacts: u16::MAX,
device_type: super::types::DeviceType::Reticulum,
})
}
pub fn advert_name(&self) -> Option<String> {
self.display_name.clone()
}
pub async fn set_advert_name(&mut self, name: &str) -> Result<()> {
// The daemon's display_name is fixed at spawn time (CLI arg); changing
// it live would require an RPC verb we haven't added. Track locally so
// `advert_name()` reflects the caller's intent even though the
// RNS-visible name doesn't change until the daemon restarts.
self.display_name = Some(name.to_string());
Ok(())
}
pub async fn send_self_advert(&mut self) -> Result<()> {
self.send_rpc(serde_json::json!({"cmd": "announce"})).await
}
/// Reticulum/LXMF has no shared-PSK broadcast channel like Meshcore's
/// channel 0 or Meshtastic's primary channel — it's point-to-point +
/// propagation-node store-and-forward. Treat as unsupported-but-harmless
/// (no-op success) rather than failing the caller, matching the no-op
/// pattern used for other not-applicable operations on this enum arm.
/// Per-network channel support is tracked in the plan's Phase 4.
pub async fn send_channel_text(&mut self, _channel: u8, _payload: &[u8]) -> Result<()> {
debug!("Reticulum has no broadcast-channel concept — ignoring channel send");
Ok(())
}
pub async fn send_text_msg(&mut self, dest_pubkey_prefix: &[u8; 6], payload: &[u8]) -> Result<()> {
let dest_hash = self
.prefix_to_hash
.get(dest_pubkey_prefix)
.copied()
.with_context(|| {
format!(
"Unknown Reticulum prefix {} — peer hasn't announced yet",
hex::encode(dest_pubkey_prefix)
)
})?;
self.send_rpc(serde_json::json!({
"cmd": "send",
"dest_hash": hex::encode(dest_hash),
"content": String::from_utf8_lossy(payload),
"method": "direct",
}))
.await
}
/// Send an image to a peer via LXMF's native `FIELD_IMAGE`, instead of our
/// own typed-envelope wire format — for a stock Sideband/NomadNet peer
/// (not an archy contact), which has no way to decode our CBOR envelope.
/// Caller (the RPC layer) gates this on `is_archy_peer(contact_id) ==
/// false`; archy peers keep using `send_text_msg`/`send_resource` with
/// the typed envelope so rich fields (caption, cid, thumb) survive.
pub async fn send_native_image(
&mut self,
dest_pubkey_prefix: &[u8; 6],
mime: &str,
bytes: &[u8],
caption: Option<&str>,
) -> Result<()> {
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
let dest_hash = self
.prefix_to_hash
.get(dest_pubkey_prefix)
.copied()
.with_context(|| {
format!(
"Unknown Reticulum prefix {} — peer hasn't announced yet",
hex::encode(dest_pubkey_prefix)
)
})?;
self.send_rpc(serde_json::json!({
"cmd": "send",
"dest_hash": hex::encode(dest_hash),
"content": caption.unwrap_or(""),
"method": "direct",
"image_format": mime_to_lxmf_format(mime),
"image_b64": B64.encode(bytes),
}))
.await
}
/// Send `data` (typically an already-built typed-envelope wire blob) to a
/// peer over a dedicated RNS Resource transfer instead of the small LXMF
/// "content" path `send_text_msg` uses — for payloads too large for the
/// inline-chunk cap but well within what LoRa can carry over a proper RNS
/// Resource (native chunked transfer with retries, unlike our own
/// MC-chunk scheme). Fire-and-forget, matching `send_text_msg`'s existing
/// semantics (no synchronous delivery confirmation) — `resource_sent`/
/// `resource_failed`/`resource_progress` events are drained and logged by
/// `handle_event`, not awaited here.
pub async fn send_resource(&mut self, dest_pubkey_prefix: &[u8; 6], data: &[u8]) -> Result<()> {
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
let dest_hash = self
.prefix_to_hash
.get(dest_pubkey_prefix)
.copied()
.with_context(|| {
format!(
"Unknown Reticulum prefix {} — peer hasn't announced yet",
hex::encode(dest_pubkey_prefix)
)
})?;
let req_id = self.next_resource_id();
self.send_rpc(serde_json::json!({
"cmd": "send_resource",
"id": req_id,
"dest_hash": hex::encode(dest_hash),
"data_b64": B64.encode(data),
}))
.await
}
pub async fn remove_contact(&mut self, _pubkey: &[u8; 32]) -> Result<()> {
// RNS has no firmware-side contact table to prune — peers simply stop
// being reachable when their announce/path ages out.
Ok(())
}
pub async fn add_contact(
&mut self,
_pubkey: &[u8; 32],
_contact_type: u8,
_flags: u8,
_out_path_len: u8,
_name: &str,
_last_advert: u32,
) -> Result<()> {
// No firmware contact table to seed — RNS learns peers from announces
// (handled by drain_events) and from path requests issued on send.
Ok(())
}
pub async fn get_contacts(&mut self) -> Result<Vec<ParsedContact>> {
self.drain_events().await;
Ok(self
.peers
.values()
.map(|p| ParsedContact {
public_key_hex: hex::encode(p.dest_hash),
advert_name: p.display_name.clone(),
last_advert: 0,
// Deliberately not 1 ("friend"/meshcore type), so the
// meshcore-only auto-heal `reset_contact_path` loop in
// `refresh_contacts` (session.rs) skips these — RNS does its
// own pathfinding, there is no firmware path to reset.
contact_type: 2,
path_len: if p.reachable { 1 } else { 0 },
flags: 0,
// RNS/LXMF is unconditionally E2E — see `take_rx_encrypted`.
// This field tracks Meshtastic's per-contact PKC capability,
// which has no Reticulum analogue (always true, tracked
// elsewhere via `take_rx_encrypted`), so leave it false here.
pkc_capable: false,
// RSSI/SNR/position are Meshtastic-only for now (see the
// Meshtastic 1.8.0 backlog plan) — RNS doesn't expose
// per-packet signal quality through LXMF, and there's no
// Reticulum position-sharing convention wired up.
rssi: None,
snr: None,
lat: None,
lon: None,
})
.collect())
}
pub async fn sync_messages(&mut self) -> Result<Vec<InboundFrame>> {
self.drain_events().await;
Ok(self.inbound.drain(..).collect())
}
pub async fn try_recv_frame(&mut self) -> Result<Option<InboundFrame>> {
self.drain_events().await;
Ok(self.inbound.pop_front())
}
/// RNS/LXMF links are end-to-end encrypted by default (no plaintext mode),
/// so every inbound delivery is E2E. Unlike Meshtastic, which only
/// sometimes gets PKI delivery, this is unconditionally true.
pub fn take_rx_encrypted(&mut self) -> bool {
true
}
// ── internals ──────────────────────────────────────────────────────
async fn send_rpc(&mut self, req: Value) -> Result<()> {
let mut line = serde_json::to_vec(&req)?;
line.push(b'\n');
self.writer
.write_all(&line)
.await
.context("Reticulum daemon RPC write failed")?;
Ok(())
}
/// Drain any buffered daemon events (non-blocking) and translate them into
/// peer-table updates / synthetic InboundFrames.
async fn drain_events(&mut self) {
loop {
let mut line = String::new();
let read = tokio::time::timeout(
Duration::from_millis(20),
self.reader.read_line(&mut line),
)
.await;
let n = match read {
Ok(Ok(n)) => n,
_ => break, // timeout (no data) or read error — stop draining
};
if n == 0 {
warn!("Reticulum daemon RPC connection closed");
break;
}
let Ok(ev) = serde_json::from_str::<Value>(line.trim()) else {
continue;
};
self.handle_event(ev);
}
}
fn handle_event(&mut self, ev: Value) {
match ev.get("event").and_then(Value::as_str) {
Some("announce") => {
let Some(hash) = ev
.get("dest_hash")
.and_then(Value::as_str)
.and_then(|h| parse_hash16(h).ok())
else {
return;
};
let prefix: [u8; 6] = hash[..6].try_into().unwrap();
self.prefix_to_hash.insert(prefix, hash);
let app_data_text = ev
.get("app_data")
.and_then(Value::as_str)
.and_then(|h| hex::decode(h).ok())
.map(|b| String::from_utf8_lossy(&b).to_string())
.filter(|s| !s.is_empty());
// If the announce app_data is an ARCHY:n: identity blob (see
// daemon_command's doc comment), bind it onto this peer AND
// surface it through the SAME channel-text path
// meshcore/Meshtastic identity adverts use
// (frames::handle_channel_payload -> parse_identity_broadcast
// -> handle_identity_received -> bind_federation_twins), so a
// Reticulum-carried identity merges into the same conversation
// as that node's other-transport twins — zero new bind logic.
let is_identity_blob = app_data_text
.as_deref()
.map(|t| protocol::parse_identity_broadcast(t).is_some())
.unwrap_or(false);
if is_identity_blob {
let text = app_data_text.clone().unwrap();
let mut data = Vec::with_capacity(7 + text.len());
data.push(0); // channel index — unused by the identity path
data.extend_from_slice(&prefix);
data.extend_from_slice(text.as_bytes());
self.inbound.push_back(InboundFrame {
code: protocol::RESP_MESHTASTIC_CHANNEL_TEXT,
data,
bytes_consumed: 0,
});
}
let display_name = app_data_text
.filter(|_| !is_identity_blob)
.unwrap_or_else(|| format!("Reticulum {}", hex::encode(&hash[..4])));
self.peers
.entry(hash)
.and_modify(|p| {
p.display_name = display_name.clone();
p.reachable = true;
})
.or_insert(ReticulumPeer {
dest_hash: hash,
display_name,
arch_pubkey_hex: None,
reachable: true,
});
}
Some("recv") => {
let Some(source_hex) = ev.get("source_hash").and_then(Value::as_str) else {
return;
};
let Ok(source_hash) = parse_hash16(source_hex) else {
return;
};
let prefix: [u8; 6] = source_hash[..6].try_into().unwrap();
self.prefix_to_hash.insert(prefix, source_hash);
// A stock LXMF client (Sideband/NomadNet — not an archy peer)
// carries photos/files in native LXMF fields, not our own
// typed-envelope wire format. Check those FIRST: if present,
// build the SAME ContentInline typed envelope our own
// attachment pipeline uses, so it renders identically in the
// UI (dispatch.rs's existing ContentInline handling, zero new
// frontend code) instead of the plain text bytes below.
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
let caption = ev.get("content").and_then(Value::as_str).filter(|s| !s.trim().is_empty());
if let (Some(fmt), Some(b64)) = (
ev.get("image_format").and_then(Value::as_str),
ev.get("image_b64").and_then(Value::as_str),
) {
if let Ok(bytes) = B64.decode(b64) {
match build_content_inline_frame(&prefix, image_format_to_mime(fmt), None, caption, bytes) {
Ok(frame) => {
self.inbound.push_back(frame);
return;
}
Err(e) => warn!("Failed to build native image frame: {}", e),
}
}
}
if let (Some(filename), Some(b64)) = (
ev.get("attachment_filename").and_then(Value::as_str),
ev.get("attachment_b64").and_then(Value::as_str),
) {
if let Ok(bytes) = B64.decode(b64) {
match build_content_inline_frame(
&prefix,
"application/octet-stream",
Some(filename),
caption,
bytes,
) {
Ok(frame) => {
self.inbound.push_back(frame);
return;
}
Err(e) => warn!("Failed to build native attachment frame: {}", e),
}
}
}
let content = ev
.get("content")
.and_then(Value::as_str)
.unwrap_or("")
.as_bytes()
.to_vec();
self.inbound.push_back(build_synthetic_frame(&prefix, &content));
}
Some("resource_recv") => {
let Some(source_hex) = ev.get("source_hash").and_then(Value::as_str) else {
return;
};
let Ok(source_hash) = parse_hash16(source_hex) else {
return;
};
let prefix: [u8; 6] = source_hash[..6].try_into().unwrap();
self.prefix_to_hash.insert(prefix, source_hash);
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
let Some(data) = ev
.get("data_b64")
.and_then(Value::as_str)
.and_then(|b64| B64.decode(b64).ok())
else {
warn!("resource_recv event with missing/invalid data_b64");
return;
};
// Resources carry the complete typed-envelope wire bytes
// directly (no MC-chunk/base64 textification needed — RNS
// Resources are already a binary-safe whole-blob transfer),
// so this is the same payload shape `decode.rs` already
// accepts for a single-frame (non-chunked) typed envelope.
self.inbound.push_back(build_synthetic_frame(&prefix, &data));
}
Some("resource_progress") => {
debug!(
id = ?ev.get("id"),
transferred = ?ev.get("transferred"),
total = ?ev.get("total"),
"Reticulum resource transfer progress"
);
}
Some("resource_sent") => {
debug!(id = ?ev.get("id"), "Reticulum resource transfer completed");
}
Some("resource_failed") => {
warn!(
id = ?ev.get("id"),
reason = ?ev.get("reason"),
"Reticulum resource transfer failed"
);
}
Some("delivered") | Some("status") | Some("ready") | Some("error") | None => {}
_ => {}
}
}
}
/// Build the synthetic `RESP_CONTACT_MSG_V3_E2E` InboundFrame for an inbound
/// LXMF message, byte-for-byte matching the layout Meshtastic already
/// produces (meshtastic.rs `parse_meshtastic_frame`) so `frames::handle_frame`
/// needs no Reticulum-specific branch:
/// [snr(1)=0][reserved(2)][sender_prefix(6)][path(1)=0xff][type(1)=0][rx_time(4 LE)][payload]
fn build_synthetic_frame(sender_prefix: &[u8; 6], payload: &[u8]) -> InboundFrame {
let mut data = Vec::with_capacity(15 + payload.len());
data.push(0); // SNR unknown (RNS doesn't expose per-packet SNR through LXMF)
data.extend_from_slice(&[0, 0]); // reserved
data.extend_from_slice(sender_prefix);
data.push(0xff); // path: RNS does its own multi-hop routing, not exposed here
data.push(0); // text type
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32;
data.extend_from_slice(&now.to_le_bytes());
data.extend_from_slice(payload);
InboundFrame {
// Reticulum/LXMF is always end-to-end encrypted — no plaintext mode.
code: protocol::RESP_CONTACT_MSG_V3_E2E,
data,
bytes_consumed: 0,
}
}
/// Wrap a native LXMF attachment (image field or file-attachments field, from
/// a stock Sideband/NomadNet peer — see the `Some("recv")` branch above) as
/// the SAME `ContentInline` typed envelope our own attachment pipeline
/// produces, so it renders identically in the UI via the existing
/// `dispatch.rs` `ContentInline` handling — no new frontend code needed.
fn build_content_inline_frame(
sender_prefix: &[u8; 6],
mime: &str,
filename: Option<&str>,
caption: Option<&str>,
bytes: Vec<u8>,
) -> Result<InboundFrame> {
let payload = ContentInlinePayload {
mime: mime.to_string(),
filename: filename.map(str::to_string),
caption: caption.map(str::to_string),
bytes,
};
let encoded = message_types::encode_payload(&payload)?;
let wire = TypedEnvelope::new(MeshMessageType::ContentInline, encoded).to_wire()?;
Ok(build_synthetic_frame(sender_prefix, &wire))
}
/// Map an LXMF `FIELD_IMAGE` format string (Sideband uses bare extensions
/// like "png"/"jpg"/"webp", confirmed against its own source) to a MIME type
/// the frontend's `isImageMime`/`<img>` rendering already understands.
fn image_format_to_mime(fmt: &str) -> &'static str {
match fmt.trim_start_matches('.').to_ascii_lowercase().as_str() {
"jpg" | "jpeg" => "image/jpeg",
"webp" => "image/webp",
"gif" => "image/gif",
"bmp" => "image/bmp",
_ => "image/png",
}
}
/// Inverse of `image_format_to_mime`, for `send_native_image` — our attach
/// pipeline always compresses to JPEG (`imageCompression.ts`) except the
/// 'original' preset, so this covers the mimes that can actually reach here.
fn mime_to_lxmf_format(mime: &str) -> &'static str {
match mime {
"image/jpeg" | "image/jpg" => "jpg",
"image/webp" => "webp",
"image/gif" => "gif",
"image/bmp" => "bmp",
_ => "png",
}
}
/// Derive a stable `u32` contact id from the 16-byte RNS destination hash,
/// masked to the low (non-federation-synthetic) id space. Sibling to
/// `meshtastic_contact_id` (listener/session.rs). Kept here so `initialize()`
/// can report a `node_id` consistent with what `refresh_contacts` will later
/// assign via the public helper of the same name in session.rs.
pub(crate) fn reticulum_contact_id_from_hash(hash: &[u8; 16]) -> u32 {
let raw = u32::from_le_bytes([hash[0], hash[1], hash[2], hash[3]]);
let masked = raw & 0x7FFF_FFFF;
if masked == 0 {
1
} else {
masked
}
}
fn parse_hash16(hex_str: &str) -> Result<[u8; 16]> {
let bytes = hex::decode(hex_str).context("invalid hex")?;
bytes
.try_into()
.map_err(|b: Vec<u8>| anyhow::anyhow!("expected 16 bytes, got {}", b.len()))
}
/// Send the verified RNode KISS detect sequence and look for `DETECT_RESP`.
/// Bytes confirmed against the canonical Reticulum source — see the module
/// doc comment and `docs/RETICULUM-TRANSPORT-PROGRESS.md`.
async fn probe_rnode(path: &str) -> Result<()> {
let port = serial2_tokio::SerialPort::open(path, PROBE_BAUD)
.with_context(|| format!("Failed to open {} for Reticulum probe", path))?;
let probe: [u8; 13] = [
KISS_FEND,
KISS_CMD_DETECT,
KISS_DETECT_REQ,
KISS_FEND,
KISS_CMD_FW_VERSION,
0x00,
KISS_FEND,
KISS_CMD_PLATFORM,
0x00,
KISS_FEND,
KISS_CMD_MCU,
0x00,
KISS_FEND,
];
tokio::time::timeout(Duration::from_millis(500), port.write_all(&probe))
.await
.context("RNode probe write timed out")?
.context("RNode probe write failed")?;
let mut buf = [0u8; 256];
let mut seen = Vec::new();
let deadline = tokio::time::Instant::now() + PROBE_READ_TIMEOUT;
while tokio::time::Instant::now() < deadline {
match tokio::time::timeout(Duration::from_millis(150), port.read(&mut buf)).await {
Ok(Ok(n)) if n > 0 => {
seen.extend_from_slice(&buf[..n]);
if contains_detect_resp(&seen) {
return Ok(());
}
}
_ => continue,
}
}
anyhow::bail!("No RNode DETECT_RESP within {:?}", PROBE_READ_TIMEOUT)
}
/// Look for the `[FEND, CMD_DETECT, DETECT_RESP]` sequence anywhere in the
/// buffer (KISS framing means other command responses may interleave first).
fn contains_detect_resp(buf: &[u8]) -> bool {
buf.windows(3)
.any(|w| w == [KISS_FEND, KISS_CMD_DETECT, KISS_DETECT_RESP])
}
impl Drop for ReticulumLink {
fn drop(&mut self) {
// Best-effort: ask the daemon to shut down cleanly (frees the serial
// port promptly); `kill_on_drop` on the Command is the hard backstop
// if the daemon doesn't exit in time.
let _ = self.child.start_kill();
let _ = std::fs::remove_file(&self.socket_path);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detect_resp_found_in_kiss_stream() {
let stream = [0x10, 0x20, KISS_FEND, KISS_CMD_DETECT, KISS_DETECT_RESP, 0x99];
assert!(contains_detect_resp(&stream));
}
/// Hardware gate: `probe_rnode` against a real RNode-flashed board. Not run in
/// CI (no hardware there) — run manually with
/// `ARCHY_RNODE_TEST_PORT=/dev/ttyUSB0 cargo test -p archipelago --lib
/// mesh::reticulum::tests::probe_rnode_detects_real_hardware -- --ignored --nocapture`
/// once an RNode-flashed board is attached. Confirms the byte-for-byte KISS
/// constants documented above (cited from canonical RNS source) actually work
/// against real firmware, not just the unit-tested byte matcher above.
#[tokio::test]
#[ignore = "requires a real RNode-flashed board; set ARCHY_RNODE_TEST_PORT"]
async fn probe_rnode_detects_real_hardware() {
let port = std::env::var("ARCHY_RNODE_TEST_PORT")
.expect("set ARCHY_RNODE_TEST_PORT to the RNode's serial path");
probe_rnode(&port).await.expect("KISS detect probe failed against real hardware");
}
#[test]
fn detect_resp_absent_in_unrelated_stream() {
let stream = [0x10, 0x20, 0x30, 0x40];
assert!(!contains_detect_resp(&stream));
}
#[test]
fn contact_id_masks_high_bit_and_avoids_zero() {
let hash_high_bit = {
let mut h = [0u8; 16];
h[0..4].copy_from_slice(&0xFFFF_FFFFu32.to_le_bytes());
h
};
let id = reticulum_contact_id_from_hash(&hash_high_bit);
assert!(id < 0x8000_0000, "must not collide with federation-synthetic space");
assert_ne!(id, 0);
let zero_hash = [0u8; 16];
assert_eq!(reticulum_contact_id_from_hash(&zero_hash), 1);
}
#[test]
fn synthetic_frame_matches_meshtastic_layout() {
let prefix = [1, 2, 3, 4, 5, 6];
let frame = build_synthetic_frame(&prefix, b"hello");
assert_eq!(frame.code, protocol::RESP_CONTACT_MSG_V3_E2E);
// header is 15 bytes before the payload, per the documented layout
assert_eq!(&frame.data[3..9], &prefix);
assert_eq!(frame.data[9], 0xff);
assert_eq!(frame.data[10], 0);
assert_eq!(&frame.data[15..], b"hello");
}
}