feat(mesh): MessageKey + Reply/Reaction variants and sender seq (Phase 2a)

Per-target outbound seq counter on MeshState allocates a monotonic seq
before each typed envelope is encoded; send_typed_wire +
send_channel_typed_wire record it (alongside our own pubkey_hex) on the
Sent MeshMessage so the local store carries the same MessageKey the
receiver will see. TypedEnvelope.with_seq lets the RPC layer stamp the
seq AFTER signing (signature covers t/v/ts only).

New MessageKey struct pairs sender_pubkey+sender_seq as the stable
cross-transport identity. Adds variants 13 Reply and 14 Reaction with
ReplyPayload {target, text} and ReactionPayload {target, emoji}, plus
mesh.send-reply / mesh.send-reaction RPCs and receive-side dispatch
arms that store the payload json for the UI to index.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-04-13 13:19:30 -04:00
parent a530a906b8
commit 4991c213ae
6 changed files with 235 additions and 33 deletions

View File

@ -297,6 +297,8 @@ impl RpcHandler {
"mesh.send-alert" => self.handle_mesh_send_alert(params).await,
"mesh.send-content" => self.handle_mesh_send_content(params).await,
"mesh.fetch-content" => self.handle_mesh_fetch_content(params).await,
"mesh.send-reply" => self.handle_mesh_send_reply(params).await,
"mesh.send-reaction" => self.handle_mesh_send_reaction(params).await,
"mesh.outbox" => self.handle_mesh_outbox(params).await,
"mesh.session-status" => self.handle_mesh_session_status(params).await,
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,

View File

@ -1,8 +1,8 @@
use super::super::RpcHandler;
use crate::blobs::DEFAULT_CAP_TTL_SECS;
use crate::mesh::message_types::{
self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MeshMessageType,
TypedEnvelope,
self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MessageKey,
MeshMessageType, ReactionPayload, ReplyPayload, TypedEnvelope,
};
use anyhow::Result;
use tracing::info;
@ -30,16 +30,17 @@ impl RpcHandler {
payment_hash: None,
};
let payload = message_types::encode_payload(&invoice)?;
let envelope = TypedEnvelope::new(MeshMessageType::Invoice, payload);
let wire = envelope.to_wire()?;
// Send via mesh
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&invoice)?;
let envelope = TypedEnvelope::new(MeshMessageType::Invoice, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = format!(
"Invoice: {} sats{}",
amount_sats,
@ -47,7 +48,7 @@ impl RpcHandler {
);
let typed_json = serde_json::to_value(&invoice).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "invoice", &display, typed_json)
.send_typed_wire(contact_id, wire, "invoice", &display, typed_json, seq)
.await?;
info!(contact_id, amount_sats, "Sent invoice over mesh");
@ -77,15 +78,17 @@ impl RpcHandler {
let label = params["label"].as_str().map(|s| s.to_string());
let coord = Coordinate::from_degrees(lat, lng, label);
let payload = message_types::encode_payload(&coord)?;
let envelope = TypedEnvelope::new(MeshMessageType::Coordinate, payload);
let wire = envelope.to_wire()?;
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&coord)?;
let envelope = TypedEnvelope::new(MeshMessageType::Coordinate, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = format!(
"Location: {:.6}, {:.6}{}",
coord.lat_degrees(),
@ -94,7 +97,7 @@ impl RpcHandler {
);
let typed_json = serde_json::to_value(&coord).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "coordinate", &display, typed_json)
.send_typed_wire(contact_id, wire, "coordinate", &display, typed_json, seq)
.await?;
info!(contact_id, "Sent coordinate over mesh");
@ -149,7 +152,7 @@ impl RpcHandler {
let identity_dir = self.config.data_dir.join("identity");
let node_key_path = identity_dir.join("node_key");
let envelope = if node_key_path.exists() {
let unsigned_envelope = if node_key_path.exists() {
let key_bytes = tokio::fs::read(&node_key_path).await?;
if key_bytes.len() == 32 {
let mut seed = [0u8; 32];
@ -163,8 +166,6 @@ impl RpcHandler {
TypedEnvelope::new(MeshMessageType::Alert, payload)
};
let wire = envelope.to_wire()?;
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
@ -172,21 +173,23 @@ impl RpcHandler {
let display = alert.message.clone();
let typed_json = serde_json::to_value(&alert).ok();
let signed = unsigned_envelope.sig.is_some();
if broadcast {
// Send on public channel (all peers) as raw bytes so the binary
// envelope is not corrupted by utf8 conversion.
svc.send_channel_typed_wire(0, wire, "alert", &display, typed_json.clone())
// Channel 0 uses target=0 for seq allocation. Signature covers
// (t, v, ts) — setting seq afterwards does NOT invalidate it.
let seq = svc.next_send_seq(0).await;
let envelope = unsigned_envelope.with_seq(seq);
let wire = envelope.to_wire()?;
svc.send_channel_typed_wire(0, wire, "alert", &display, typed_json.clone(), seq)
.await?;
info!(alert_type = alert_type_str, "Broadcast alert over mesh");
} else if let Some(contact_id) = params["contact_id"].as_u64() {
svc.send_typed_wire(
contact_id as u32,
wire,
"alert",
&display,
typed_json,
)
.await?;
let contact_id = contact_id as u32;
let seq = svc.next_send_seq(contact_id).await;
let envelope = unsigned_envelope.with_seq(seq);
let wire = envelope.to_wire()?;
svc.send_typed_wire(contact_id, wire, "alert", &display, typed_json, seq)
.await?;
info!(contact_id, alert_type = alert_type_str, "Sent alert to peer");
} else {
anyhow::bail!("Must specify contact_id or broadcast: true");
@ -195,7 +198,7 @@ impl RpcHandler {
Ok(serde_json::json!({
"sent": true,
"alert_type": alert_type_str,
"signed": envelope.sig.is_some(),
"signed": signed,
}))
}
@ -271,8 +274,9 @@ impl RpcHandler {
cap_exp: exp,
};
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&content)?;
let envelope = TypedEnvelope::new(MeshMessageType::ContentRef, payload);
let envelope = TypedEnvelope::new(MeshMessageType::ContentRef, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = match (&content.filename, &content.caption) {
@ -283,7 +287,7 @@ impl RpcHandler {
};
let typed_json = serde_json::to_value(&content).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "content_ref", &display, typed_json)
.send_typed_wire(contact_id, wire, "content_ref", &display, typed_json, seq)
.await?;
info!(contact_id, cid = %cid, size = meta.size, "Sent content_ref over mesh");
@ -295,6 +299,101 @@ impl RpcHandler {
}))
}
/// mesh.send-reply — Send a text reply targeted at an earlier message.
/// Params: { contact_id, target_pubkey, target_seq, text }. The target
/// MessageKey identifies the message being replied to; it does NOT need
/// to be our peer — cross-transport replies to anyone work.
pub(in crate::api::rpc) async fn handle_mesh_send_reply(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let target_pubkey = params["target_pubkey"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing target_pubkey"))?
.to_string();
let target_seq = params["target_seq"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?;
let text = params["text"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing text"))?
.to_string();
let reply = ReplyPayload {
target: MessageKey { sender_pubkey: target_pubkey, sender_seq: target_seq },
text: text.clone(),
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&reply)?;
let envelope = TypedEnvelope::new(MeshMessageType::Reply, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let typed_json = serde_json::to_value(&reply).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "reply", &text, typed_json, seq)
.await?;
info!(contact_id, seq, "Sent reply over mesh");
Ok(serde_json::json!({
"sent": true,
"message_id": msg.id,
"sender_seq": seq,
}))
}
/// mesh.send-reaction — Emoji reaction on an earlier message.
/// Params: { contact_id, target_pubkey, target_seq, emoji }. An empty
/// emoji string clears any existing reaction from us on that target.
pub(in crate::api::rpc) async fn handle_mesh_send_reaction(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let target_pubkey = params["target_pubkey"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing target_pubkey"))?
.to_string();
let target_seq = params["target_seq"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?;
let emoji = params["emoji"].as_str().unwrap_or("").to_string();
let reaction = ReactionPayload {
target: MessageKey { sender_pubkey: target_pubkey, sender_seq: target_seq },
emoji: emoji.clone(),
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&reaction)?;
let envelope = TypedEnvelope::new(MeshMessageType::Reaction, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = if emoji.is_empty() { "(cleared)".to_string() } else { emoji.clone() };
let typed_json = serde_json::to_value(&reaction).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "reaction", &display, typed_json, seq)
.await?;
info!(contact_id, seq, emoji = %emoji, "Sent reaction over mesh");
Ok(serde_json::json!({
"sent": true,
"message_id": msg.id,
"sender_seq": seq,
}))
}
/// mesh.fetch-content — Fetch a ContentRef blob from the sender's onion and
/// persist it to our local blob store. Params must include everything the
/// receiver needs to construct and authorise the URL: `{ cid, sender_onion,

View File

@ -277,6 +277,31 @@ pub(super) async fn handle_typed_message(
dispatch_tx_confirmation(&envelope, sender_contact_id, sender_name, state).await;
}
Some(MeshMessageType::Reply) => {
match message_types::decode_payload::<message_types::ReplyPayload>(&envelope.v) {
Ok(reply) => {
let json = payload_to_json(&reply);
store_typed_message(state, sender_contact_id, sender_name, &reply.text, "reply", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode reply payload: {}", e),
}
}
Some(MeshMessageType::Reaction) => {
match message_types::decode_payload::<message_types::ReactionPayload>(&envelope.v) {
Ok(reaction) => {
let display = if reaction.emoji.is_empty() {
"(reaction cleared)".to_string()
} else {
reaction.emoji.clone()
};
let json = payload_to_json(&reaction);
store_typed_message(state, sender_contact_id, sender_name, &display, "reaction", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode reaction payload: {}", e),
}
}
Some(MeshMessageType::ContentRef) => {
match message_types::decode_payload::<message_types::ContentRefPayload>(&envelope.v) {
Ok(content) => {

View File

@ -69,6 +69,11 @@ pub struct MeshState {
pub event_tx: broadcast::Sender<MeshEvent>,
pub cmd_tx: mpsc::Sender<MeshCommand>,
next_message_id: RwLock<u64>,
/// Per-contact outbound sequence counter. Increments on every typed
/// envelope we send to a given peer so the receiver (and anyone else
/// forwarding/reacting) can build a stable MessageKey = (our_pubkey, seq).
/// Channel broadcasts use contact_id = 0 as a shared counter.
next_send_seq: RwLock<HashMap<u32, u64>>,
/// Block header cache — populated when receiving headers from internet-connected peers.
pub block_header_cache: Arc<super::bitcoin_relay::BlockHeaderCache>,
/// Relay tracker — stores completed relay results for frontend polling.
@ -121,6 +126,7 @@ impl MeshState {
}),
event_tx: tx,
next_message_id: RwLock::new(1),
next_send_seq: RwLock::new(HashMap::new()),
block_header_cache,
relay_tracker,
stego_mode,
@ -138,6 +144,16 @@ impl MeshState {
current
}
/// Allocate the next outbound sequence number for a given target
/// (contact_id for direct peers, 0 for channel broadcasts). Monotonic
/// per target, starts at 1.
pub async fn next_send_seq(&self, target: u32) -> u64 {
let mut map = self.next_send_seq.write().await;
let slot = map.entry(target).or_insert(0);
*slot += 1;
*slot
}
pub async fn store_message(&self, msg: MeshMessage) {
let mut messages = self.messages.write().await;
// Deduplicate: skip if we already have a message with the same text,

View File

@ -42,6 +42,10 @@ pub enum MeshMessageType {
LightningRelayResponse = 11,
/// Confirmation update for a relayed transaction (1, 2, 3 confs).
TxConfirmation = 12,
/// Reply to an earlier message, targeted by MessageKey.
Reply = 13,
/// Emoji reaction on an earlier message, targeted by MessageKey.
Reaction = 14,
/// Attachment/file reference: CID of a blob held by the sender, fetched out-of-band.
ContentRef = 19,
}
@ -62,6 +66,8 @@ impl MeshMessageType {
10 => Some(Self::LightningRelay),
11 => Some(Self::LightningRelayResponse),
12 => Some(Self::TxConfirmation),
13 => Some(Self::Reply),
14 => Some(Self::Reaction),
19 => Some(Self::ContentRef),
_ => None,
}
@ -82,11 +88,24 @@ impl MeshMessageType {
Self::LightningRelay => "lightning_relay",
Self::LightningRelayResponse => "lightning_relay_response",
Self::TxConfirmation => "tx_confirmation",
Self::Reply => "reply",
Self::Reaction => "reaction",
Self::ContentRef => "content_ref",
}
}
}
/// Cross-transport stable identity for any mesh message. `sender_pubkey` is
/// the hex Ed25519 key of the originator; `sender_seq` is that sender's
/// per-peer (or per-channel) monotonic counter at send time. Receivers use
/// this pair — not the local u64 `id` — when addressing replies, reactions,
/// reads, edits, or deletes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MessageKey {
pub sender_pubkey: String,
pub sender_seq: u64,
}
// ─── Wire Envelope ──────────────────────────────────────────────────────
/// CBOR wire envelope wrapping any typed message.
@ -166,6 +185,14 @@ impl TypedEnvelope {
MeshMessageType::from_u8(self.t)
}
/// Set the outbound sequence number. Called by the send path after the
/// target's counter has been incremented. Safe to call AFTER `new_signed`
/// because the signature covers `(t, v, ts)` — not `seq`.
pub fn with_seq(mut self, seq: u64) -> Self {
self.seq = seq;
self
}
/// Encode to wire format: [0x02] [CBOR envelope].
pub fn to_wire(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
@ -320,6 +347,24 @@ pub struct LightningRelayResponsePayload {
pub error: Option<String>,
}
/// Reply to an earlier message. `target` is the MessageKey the reply points
/// at; `text` is the reply body. Receivers render this as a "replying to"
/// card above the text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplyPayload {
pub target: MessageKey,
pub text: String,
}
/// Emoji reaction on an earlier message. `target` identifies the reacted-to
/// message; `emoji` is a short unicode string (typically 14 code points).
/// An empty emoji is treated as "clear my reaction" on the target.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReactionPayload {
pub target: MessageKey,
pub emoji: String,
}
/// Content/attachment reference: points at a blob held by the sender that
/// recipients fetch out-of-band via `GET {sender_onion}/blob/{cid}?cap=..&exp=..&peer=..`.
/// Thumb bytes (≤60B) may be inlined for immediate display; full blob is lazy.

View File

@ -507,6 +507,11 @@ impl MeshService {
/// Send a typed envelope wire payload and record a rich Sent MeshMessage.
/// Used by RPC handlers that transmit invoice/coordinate/alert/etc. so the
/// UI sees a proper rich Sent card instead of garbage wire-byte plaintext.
///
/// `sender_seq` should match the seq encoded into the TypedEnvelope by the
/// caller — the RPC handler allocates it via `next_send_seq` before
/// building the envelope and threads it through here so the local Sent
/// record carries the same MessageKey the receiver will see.
pub async fn send_typed_wire(
&self,
contact_id: u32,
@ -514,13 +519,21 @@ impl MeshService {
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
sender_seq: u64,
) -> Result<MeshMessage> {
self.send_raw_payload(contact_id, wire).await?;
Ok(self
.record_sent_typed(contact_id, type_label, display_text, typed_payload)
.record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq)
.await)
}
/// Allocate the next outbound seq for a target. Convenience passthrough
/// to MeshState::next_send_seq; used by RPC handlers before encoding a
/// TypedEnvelope so the seq on the wire matches the Sent record.
pub async fn next_send_seq(&self, target: u32) -> u64 {
self.state.next_send_seq(target).await
}
/// Broadcast a typed envelope wire payload on a mesh channel and record a
/// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode
/// binary envelope bytes before handing them here.
@ -531,6 +544,7 @@ impl MeshService {
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
sender_seq: u64,
) -> Result<MeshMessage> {
let status = self.state.status.read().await;
if !status.device_connected {
@ -569,8 +583,8 @@ impl MeshService {
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
sender_pubkey: None,
sender_seq: None,
sender_pubkey: Some(self.our_ed_pubkey_hex.clone()),
sender_seq: Some(sender_seq),
};
self.state.store_message(msg.clone()).await;
{
@ -631,6 +645,7 @@ impl MeshService {
type_label: &str,
display_text: &str,
typed_payload: Option<serde_json::Value>,
sender_seq: u64,
) -> MeshMessage {
let msg_id = self.state.next_id().await;
let peer_name = self
@ -651,8 +666,8 @@ impl MeshService {
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
sender_pubkey: None,
sender_seq: None,
sender_pubkey: Some(self.our_ed_pubkey_hex.clone()),
sender_seq: Some(sender_seq),
};
self.state.store_message(msg.clone()).await;
{