feat(mesh): rich typed Sent records and echo dedup

Adds message_type + typed_payload (JSON) to MeshMessage so the UI can
render invoice/alert/coordinate/tx/lightning messages as structured
cards in both directions instead of showing raw wire bytes on the
Sent side. RPC handlers now route through send_typed_wire /
send_channel_typed_wire which transmit the binary envelope directly
(no utf8_lossy corruption) and record a rich Sent MeshMessage.

Also: store_message deduplicates echo-back doubles (20-msg lookback,
30s window), from_name is plumbed through the federation Incoming
path, and peer_dest_prefix / send_raw_payload are factored out of
send_message.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-04-13 08:01:10 -04:00
parent 18284e1592
commit 3ed9243c50
8 changed files with 244 additions and 32 deletions

View File

@ -17,6 +17,7 @@ impl ApiHandler {
}
let incoming: Incoming = serde_json::from_slice(&body).unwrap_or(Incoming {
from_pubkey: None,
from_name: None,
message: None,
signature: None,
encrypted: false,

View File

@ -38,8 +38,15 @@ impl RpcHandler {
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let wire_str = String::from_utf8_lossy(&wire).to_string();
let msg = svc.send_message(contact_id, &wire_str).await?;
let display = format!(
"Invoice: {} sats{}",
amount_sats,
memo.as_ref().map(|m| format!("{}", m)).unwrap_or_default()
);
let typed_json = serde_json::to_value(&invoice).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "invoice", &display, typed_json)
.await?;
info!(contact_id, amount_sats, "Sent invoice over mesh");
Ok(serde_json::json!({
@ -77,8 +84,16 @@ impl RpcHandler {
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let wire_str = String::from_utf8_lossy(&wire).to_string();
let msg = svc.send_message(contact_id, &wire_str).await?;
let display = format!(
"Location: {:.6}, {:.6}{}",
coord.lat_degrees(),
coord.lng_degrees(),
coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default()
);
let typed_json = serde_json::to_value(&coord).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "coordinate", &display, typed_json)
.await?;
info!(contact_id, "Sent coordinate over mesh");
Ok(serde_json::json!({
@ -153,13 +168,23 @@ impl RpcHandler {
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let wire_str = String::from_utf8_lossy(&wire).to_string();
let display = alert.message.clone();
let typed_json = serde_json::to_value(&alert).ok();
if broadcast {
// Send on channel (all peers)
svc.send_message(0, &wire_str).await?;
// Send on public channel (all peers) as raw bytes so the binary
// envelope is not corrupted by utf8 conversion.
svc.send_channel_typed_wire(0, wire, "alert", &display, typed_json.clone())
.await?;
info!(alert_type = alert_type_str, "Broadcast alert over mesh");
} else if let Some(contact_id) = params["contact_id"].as_u64() {
svc.send_message(contact_id as u32, &wire_str).await?;
svc.send_typed_wire(
contact_id as u32,
wire,
"alert",
&display,
typed_json,
)
.await?;
info!(contact_id, alert_type = alert_type_str, "Sent alert to peer");
} else {
anyhow::bail!("Must specify contact_id or broadcast: true");

View File

@ -276,6 +276,8 @@ pub(super) async fn store_plain_message(
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
message_type: "text".to_string(),
typed_payload: None,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
@ -438,6 +440,8 @@ pub(super) async fn handle_received_message(
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted,
message_type: "text".to_string(),
typed_payload: None,
};
state.store_message(msg.clone()).await;

View File

@ -8,13 +8,14 @@ use super::super::types::*;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Store a typed message with a type label for UI rendering.
/// Store a typed message with a type label and structured JSON payload for UI rendering.
async fn store_typed_message(
state: &Arc<MeshState>,
contact_id: u32,
peer_name: &str,
text: &str,
display_text: &str,
type_label: &str,
typed_payload: Option<serde_json::Value>,
) {
let msg_id = state.next_id().await;
let msg = MeshMessage {
@ -22,16 +23,24 @@ async fn store_typed_message(
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(peer_name.to_string()),
plaintext: format!("[{}] {}", type_label, text),
plaintext: display_text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
}
/// Serialize a decoded payload to JSON for the UI layer.
/// Falls back to `None` on serialization failure (shouldn't happen for our serde types).
fn payload_to_json<T: serde::Serialize>(v: &T) -> Option<serde_json::Value> {
serde_json::to_value(v).ok()
}
/// Handle a typed message envelope (0x02 prefix).
/// Dispatches to type-specific handlers: BlockHeader, Alert, TxRelay, etc.
pub(super) async fn handle_typed_message(
@ -105,12 +114,14 @@ pub(super) async fn handle_typed_message(
"Alert received via mesh: {}",
alert.message
);
let json = payload_to_json(&alert);
store_typed_message(
state,
sender_contact_id,
sender_name,
&alert.message,
"alert",
json,
)
.await;
let _ = state.event_tx.send(MeshEvent::AlertReceived {
@ -136,12 +147,14 @@ pub(super) async fn handle_typed_message(
tx_len = relay.tx_hex.len(),
"TX relay request received — broadcasting to Bitcoin network"
);
let json = payload_to_json(&relay);
store_typed_message(
state,
sender_contact_id,
sender_name,
&format!("TX relay request #{} ({} hex chars)", relay.request_id, relay.tx_hex.len()),
"tx_relay",
json,
)
.await;
@ -170,12 +183,14 @@ pub(super) async fn handle_typed_message(
amount_sats = relay.amount_sats,
"Lightning relay request received"
);
let json = payload_to_json(&relay);
store_typed_message(
state,
sender_contact_id,
sender_name,
&format!("Lightning relay: {} sats", relay.amount_sats),
"lightning_relay",
json,
)
.await;
// Will be wired to LND in Week 9
@ -201,7 +216,8 @@ pub(super) async fn handle_typed_message(
} else {
format!("Lightning failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response").await;
let json = payload_to_json(&resp);
store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response", json).await;
let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted {
request_id: resp.request_id,
payment_hash: resp.payment_hash,
@ -220,7 +236,8 @@ pub(super) async fn handle_typed_message(
invoice.amount_sats,
invoice.memo.as_ref().map(|m| format!("{}", m)).unwrap_or_default()
);
store_typed_message(state, sender_contact_id, sender_name, &text, "invoice").await;
let json = payload_to_json(&invoice);
store_typed_message(state, sender_contact_id, sender_name, &text, "invoice", json).await;
}
Err(e) => warn!("Failed to decode invoice payload: {}", e),
}
@ -235,7 +252,8 @@ pub(super) async fn handle_typed_message(
coord.lng_degrees(),
coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default()
);
store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate").await;
let json = payload_to_json(&coord);
store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate", json).await;
}
Err(e) => warn!("Failed to decode coordinate payload: {}", e),
}
@ -291,7 +309,7 @@ async fn dispatch_block_header(
timestamp,
announced_by: sender_name.to_string(),
};
let _ = state.block_header_cache.store_header(header_payload).await;
let _ = state.block_header_cache.store_header(header_payload.clone()).await;
let text = format!(
"Block #{} — {}...{}",
@ -299,12 +317,14 @@ async fn dispatch_block_header(
&hash_hex[..8.min(hash_hex.len())],
&hash_hex[hash_hex.len().saturating_sub(8)..]
);
let json = payload_to_json(&header_payload);
store_typed_message(
state,
sender_contact_id,
sender_name,
&text,
"block_header",
json,
)
.await;
let _ = state.event_tx.send(MeshEvent::BlockHeaderReceived {
@ -339,7 +359,8 @@ async fn dispatch_tx_relay_response(
} else {
format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response").await;
let json = payload_to_json(&resp);
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response", json).await;
// Store result for frontend polling
if let Some(ref tracker) = state.relay_tracker {
tracker.store_result(super::super::bitcoin_relay::RelayResult {
@ -380,7 +401,8 @@ async fn dispatch_tx_confirmation(
block_height = conf.block_height,
"TX confirmation update received"
);
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation").await;
let json = payload_to_json(&conf);
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation", json).await;
// Store confirmation for frontend polling
if let Some(ref tracker) = state.relay_tracker {
tracker.store_result(super::super::bitcoin_relay::RelayResult {

View File

@ -29,6 +29,17 @@ const SYNC_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum stored messages (circular buffer).
const MAX_MESSAGES: usize = 100;
/// Check if two ISO8601 timestamps are within N seconds of each other.
fn within_seconds_iso(ts1: &str, ts2: &str, secs: i64) -> bool {
use chrono::DateTime;
let a = DateTime::parse_from_rfc3339(ts1).ok();
let b = DateTime::parse_from_rfc3339(ts2).ok();
match (a, b) {
(Some(a), Some(b)) => (a - b).num_seconds().unsigned_abs() < secs as u64,
_ => false,
}
}
/// Initial delay before reconnection attempt after device disconnect.
const RECONNECT_DELAY_INIT: Duration = Duration::from_secs(5);
@ -129,6 +140,16 @@ impl MeshState {
pub async fn store_message(&self, msg: MeshMessage) {
let mut messages = self.messages.write().await;
// Deduplicate: skip if we already have a message with the same text,
// peer, and timestamp within 30 seconds (prevents echo-back doubles)
let dominated = messages.iter().rev().take(20).any(|m| {
m.peer_contact_id == msg.peer_contact_id
&& m.plaintext == msg.plaintext
&& within_seconds_iso(&m.timestamp, &msg.timestamp, 30)
});
if dominated {
return;
}
messages.push_back(msg);
if messages.len() > MAX_MESSAGES {
messages.pop_front();

View File

@ -432,16 +432,8 @@ impl MeshService {
messages.iter().skip(skip).cloned().collect()
}
/// Send a message to a peer by contact_id.
/// Routes through the background listener which owns the serial port.
pub async fn send_message(&self, contact_id: u32, text: &str) -> Result<MeshMessage> {
let status = self.state.status.read().await;
if !status.device_connected {
anyhow::bail!("No mesh device connected");
}
drop(status);
// Look up the peer's public key to get the 6-byte prefix for addressing
/// Resolve a peer's 6-byte public-key prefix for mesh addressing.
async fn peer_dest_prefix(&self, contact_id: u32) -> Result<[u8; 6]> {
let peers = self.state.peers.read().await;
let peer = peers
.get(&contact_id)
@ -457,10 +449,17 @@ impl MeshService {
}
let mut dest_prefix = [0u8; 6];
dest_prefix.copy_from_slice(&pubkey_bytes[..6]);
drop(peers);
Ok(dest_prefix)
}
let payload = text.as_bytes().to_vec();
let encrypted = false;
/// Send raw wire payload bytes to a peer (no Sent-record bookkeeping).
/// Callers are responsible for storing the MeshMessage record afterwards.
async fn send_raw_payload(&self, contact_id: u32, payload: Vec<u8>) -> Result<()> {
let status = self.state.status.read().await;
if !status.device_connected {
anyhow::bail!("No mesh device connected");
}
drop(status);
if payload.len() > protocol::MAX_MESSAGE_LEN {
anyhow::bail!(
@ -470,7 +469,8 @@ impl MeshService {
);
}
// Send through the listener's command channel
let dest_prefix = self.peer_dest_prefix(contact_id).await?;
self.state
.cmd_tx
.send(listener::MeshCommand::SendText {
@ -479,6 +479,90 @@ impl MeshService {
})
.await
.map_err(|_| anyhow::anyhow!("Mesh listener not running"))?;
Ok(())
}
/// Send a typed envelope wire payload and record a rich Sent MeshMessage.
/// Used by RPC handlers that transmit invoice/coordinate/alert/etc. so the
/// UI sees a proper rich Sent card instead of garbage wire-byte plaintext.
pub async fn send_typed_wire(
&self,
contact_id: u32,
wire: Vec<u8>,
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
) -> Result<MeshMessage> {
self.send_raw_payload(contact_id, wire).await?;
Ok(self
.record_sent_typed(contact_id, type_label, display_text, typed_payload)
.await)
}
/// Broadcast a typed envelope wire payload on a mesh channel and record a
/// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode
/// binary envelope bytes before handing them here.
pub async fn send_channel_typed_wire(
&self,
channel: u8,
wire: Vec<u8>,
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
) -> Result<MeshMessage> {
let status = self.state.status.read().await;
if !status.device_connected {
anyhow::bail!("No mesh device connected");
}
drop(status);
if wire.len() > protocol::MAX_MESSAGE_LEN {
anyhow::bail!(
"Message too large for LoRa: {} bytes (max {})",
wire.len(),
protocol::MAX_MESSAGE_LEN
);
}
self.state
.cmd_tx
.send(listener::MeshCommand::BroadcastChannel {
channel,
payload: wire,
})
.await
.map_err(|_| anyhow::anyhow!("Mesh listener not running"))?;
let chan_contact_id = u32::MAX - (channel as u32);
let chan_name = format!("Channel {}", channel);
let msg_id = self.state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Sent,
peer_contact_id: chan_contact_id,
peer_name: Some(chan_name),
plaintext: display_text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: false,
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
};
self.state.store_message(msg.clone()).await;
{
let mut status = self.state.status.write().await;
status.messages_sent += 1;
}
Ok(msg)
}
/// Send a message to a peer by contact_id.
/// Routes through the background listener which owns the serial port.
pub async fn send_message(&self, contact_id: u32, text: &str) -> Result<MeshMessage> {
let payload = text.as_bytes().to_vec();
let encrypted = false;
self.send_raw_payload(contact_id, payload).await?;
let msg_id = self.state.next_id().await;
let peer_name = self
@ -498,6 +582,8 @@ impl MeshService {
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: false,
encrypted,
message_type: "text".to_string(),
typed_payload: None,
};
self.state.store_message(msg.clone()).await;
@ -509,6 +595,45 @@ impl MeshService {
Ok(msg)
}
/// Record a Sent MeshMessage for a typed envelope that has already been
/// transmitted by the caller. Used by the RPC layer after sending
/// invoice/coordinate/alert/etc. so the UI gets a proper rich Sent card
/// instead of a Sent record containing the raw wire bytes as plaintext.
pub async fn record_sent_typed(
&self,
contact_id: u32,
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
) -> MeshMessage {
let msg_id = self.state.next_id().await;
let peer_name = self
.state
.peers
.read()
.await
.get(&contact_id)
.map(|p| p.advert_name.clone());
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Sent,
peer_contact_id: contact_id,
peer_name,
plaintext: display_text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: false,
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
};
self.state.store_message(msg.clone()).await;
{
let mut status = self.state.status.write().await;
status.messages_sent += 1;
}
msg
}
/// Send a message to a mesh channel (broadcast).
/// Routes through the background listener which owns the serial port.
pub async fn send_channel_message(&self, channel: u8, text: &str) -> Result<MeshMessage> {
@ -551,6 +676,8 @@ impl MeshService {
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: false,
encrypted: false,
message_type: "text".to_string(),
typed_payload: None,
};
self.state.store_message(msg.clone()).await;

View File

@ -64,13 +64,24 @@ pub struct MeshMessage {
pub peer_contact_id: u32,
/// Peer name (for display).
pub peer_name: Option<String>,
/// Decrypted plaintext content.
/// Human-readable rendering — for text messages this is the raw text,
/// for typed messages a short summary used as a fallback in lists.
pub plaintext: String,
pub timestamp: String,
/// Whether delivery was confirmed via ACK.
pub delivered: bool,
/// Whether the message was end-to-end encrypted.
pub encrypted: bool,
/// Typed-envelope label ("text", "invoice", "alert", "coordinate", ...).
#[serde(default = "default_message_type")]
pub message_type: String,
/// Structured payload as JSON — populated for non-text typed messages.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub typed_payload: Option<serde_json::Value>,
}
fn default_message_type() -> String {
"text".to_string()
}
/// Overall mesh subsystem status.

View File

@ -112,6 +112,7 @@ pub fn store_sent(message: &str) {
guard.messages.push(IncomingMessage {
from_pubkey: "me".to_string(),
from_onion: None,
from_name: None,
message: message.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
direction: "sent".to_string(),