From 390ceaa75d6f51379e50c291ad9139c89bfb3a93 Mon Sep 17 00:00:00 2001 From: Dorian Date: Mon, 13 Apr 2026 08:18:01 -0400 Subject: [PATCH] feat(mesh): MessageKey foundation and debug-dump RPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds sender_pubkey + sender_seq fields to MeshMessage so received messages carry a stable cross-transport identity: (sender_pubkey, sender_seq) pair. This is the foundation for the upcoming reply, reaction, edit, and read-receipt variants — they need to target a message by an ID that is meaningful on every node, not just locally. Receive-side population lives in dispatch.rs::store_typed_message, which now looks up the peer's pubkey_hex and copies envelope.seq from the decoded TypedEnvelope. Sent-side population will land when we plumb a per-node monotonic seq counter through the RPC layer. Also adds mesh.debug-dump: a full in-memory state snapshot returning peers, messages, status, shared-secret peer ids, encrypt_relay flag, and stego mode — intended for smoke tests and bug investigation. Co-Authored-By: Claude Opus 4.6 (1M context) --- core/archipelago/src/api/rpc/dispatcher.rs | 1 + core/archipelago/src/api/rpc/mesh/status.rs | 12 ++++++++ core/archipelago/src/mesh/listener/decode.rs | 4 +++ .../archipelago/src/mesh/listener/dispatch.rs | 25 ++++++++++++---- core/archipelago/src/mesh/mod.rs | 30 +++++++++++++++++++ core/archipelago/src/mesh/types.rs | 12 ++++++++ 6 files changed, 79 insertions(+), 5 deletions(-) diff --git a/core/archipelago/src/api/rpc/dispatcher.rs b/core/archipelago/src/api/rpc/dispatcher.rs index ff53d22b..e14c9aac 100644 --- a/core/archipelago/src/api/rpc/dispatcher.rs +++ b/core/archipelago/src/api/rpc/dispatcher.rs @@ -287,6 +287,7 @@ impl RpcHandler { "mesh.status" => self.handle_mesh_status().await, "mesh.peers" => self.handle_mesh_peers().await, "mesh.messages" => self.handle_mesh_messages(params).await, + "mesh.debug-dump" => self.handle_mesh_debug_dump().await, "mesh.send" => self.handle_mesh_send(params).await, "mesh.send-channel" => self.handle_mesh_send_channel(params).await, "mesh.broadcast" => self.handle_mesh_broadcast().await, diff --git a/core/archipelago/src/api/rpc/mesh/status.rs b/core/archipelago/src/api/rpc/mesh/status.rs index 06d2764e..57925479 100644 --- a/core/archipelago/src/api/rpc/mesh/status.rs +++ b/core/archipelago/src/api/rpc/mesh/status.rs @@ -70,6 +70,18 @@ impl RpcHandler { } } + /// mesh.debug-dump — Full in-memory state snapshot for debugging. + /// Returns peers, all messages, status, shared-secret peer ids, encrypt_relay + /// flag, and stego mode. Intended for smoke tests and bug investigation. + pub(in crate::api::rpc) async fn handle_mesh_debug_dump(&self) -> Result { + let service = self.mesh_service.read().await; + if let Some(svc) = service.as_ref() { + Ok(svc.debug_dump().await) + } else { + Ok(serde_json::json!({ "running": false })) + } + } + /// mesh.session-status — Get ratchet session info for a peer. pub(in crate::api::rpc) async fn handle_mesh_session_status( &self, diff --git a/core/archipelago/src/mesh/listener/decode.rs b/core/archipelago/src/mesh/listener/decode.rs index bbd88c0d..e74041bd 100644 --- a/core/archipelago/src/mesh/listener/decode.rs +++ b/core/archipelago/src/mesh/listener/decode.rs @@ -278,6 +278,8 @@ pub(super) async fn store_plain_message( encrypted: false, message_type: "text".to_string(), typed_payload: None, + sender_pubkey: None, + sender_seq: None, }; state.store_message(msg.clone()).await; state.status.write().await.messages_received += 1; @@ -442,6 +444,8 @@ pub(super) async fn handle_received_message( encrypted, message_type: "text".to_string(), typed_payload: None, + sender_pubkey: None, + sender_seq: None, }; state.store_message(msg.clone()).await; diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index c5bb3516..cccd3fa8 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -16,8 +16,17 @@ async fn store_typed_message( display_text: &str, type_label: &str, typed_payload: Option, + sender_seq: Option, ) { let msg_id = state.next_id().await; + // Populate stable MessageKey components from the peer record so the UI + // can dedupe and target replies/reactions across transports. + let sender_pubkey = state + .peers + .read() + .await + .get(&contact_id) + .and_then(|p| p.pubkey_hex.clone()); let msg = MeshMessage { id: msg_id, direction: MessageDirection::Received, @@ -29,6 +38,8 @@ async fn store_typed_message( encrypted: false, message_type: type_label.to_string(), typed_payload, + sender_pubkey, + sender_seq, }; state.store_message(msg.clone()).await; state.status.write().await.messages_received += 1; @@ -122,6 +133,7 @@ pub(super) async fn handle_typed_message( &alert.message, "alert", json, + Some(envelope.seq), ) .await; let _ = state.event_tx.send(MeshEvent::AlertReceived { @@ -155,6 +167,7 @@ pub(super) async fn handle_typed_message( &format!("TX relay request #{} ({} hex chars)", relay.request_id, relay.tx_hex.len()), "tx_relay", json, + Some(envelope.seq), ) .await; @@ -191,6 +204,7 @@ pub(super) async fn handle_typed_message( &format!("Lightning relay: {} sats", relay.amount_sats), "lightning_relay", json, + Some(envelope.seq), ) .await; // Will be wired to LND in Week 9 @@ -217,7 +231,7 @@ pub(super) async fn handle_typed_message( format!("Lightning failed: {}", resp.error.as_deref().unwrap_or("unknown")) }; let json = payload_to_json(&resp); - store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response", json).await; + store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response", json, Some(envelope.seq)).await; let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted { request_id: resp.request_id, payment_hash: resp.payment_hash, @@ -237,7 +251,7 @@ pub(super) async fn handle_typed_message( invoice.memo.as_ref().map(|m| format!(" — {}", m)).unwrap_or_default() ); let json = payload_to_json(&invoice); - store_typed_message(state, sender_contact_id, sender_name, &text, "invoice", json).await; + store_typed_message(state, sender_contact_id, sender_name, &text, "invoice", json, Some(envelope.seq)).await; } Err(e) => warn!("Failed to decode invoice payload: {}", e), } @@ -253,7 +267,7 @@ pub(super) async fn handle_typed_message( coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default() ); let json = payload_to_json(&coord); - store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate", json).await; + store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate", json, Some(envelope.seq)).await; } Err(e) => warn!("Failed to decode coordinate payload: {}", e), } @@ -325,6 +339,7 @@ async fn dispatch_block_header( &text, "block_header", json, + Some(envelope.seq), ) .await; let _ = state.event_tx.send(MeshEvent::BlockHeaderReceived { @@ -360,7 +375,7 @@ async fn dispatch_tx_relay_response( format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown")) }; let json = payload_to_json(&resp); - store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response", json).await; + store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response", json, Some(envelope.seq)).await; // Store result for frontend polling if let Some(ref tracker) = state.relay_tracker { tracker.store_result(super::super::bitcoin_relay::RelayResult { @@ -402,7 +417,7 @@ async fn dispatch_tx_confirmation( "TX confirmation update received" ); let json = payload_to_json(&conf); - store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation", json).await; + store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation", json, Some(envelope.seq)).await; // Store confirmation for frontend polling if let Some(ref tracker) = state.relay_tracker { tracker.store_result(super::super::bitcoin_relay::RelayResult { diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index f62afeca..e7c12849 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -432,6 +432,28 @@ impl MeshService { messages.iter().skip(skip).cloned().collect() } + /// Full in-memory state dump for debugging. Returns peers, all messages, + /// status, shared-secret peer ids (not the secrets), encrypt_relay flag, + /// and stego mode. Intended for development/smoke-test use only — don't + /// call this on a hot path. + pub async fn debug_dump(&self) -> serde_json::Value { + let status = self.state.status.read().await.clone(); + let peers: Vec<_> = self.state.peers.read().await.values().cloned().collect(); + let messages: Vec<_> = self.state.messages.read().await.iter().cloned().collect(); + let secret_peer_ids: Vec = + self.state.shared_secrets.read().await.keys().copied().collect(); + serde_json::json!({ + "status": status, + "peers": peers, + "peer_count": peers.len(), + "messages": messages, + "message_count": messages.len(), + "secret_peer_ids": secret_peer_ids, + "encrypt_relay": self.state.encrypt_relay, + "stego_mode": format!("{:?}", self.state.stego_mode), + }) + } + /// Resolve a peer's 6-byte public-key prefix for mesh addressing. async fn peer_dest_prefix(&self, contact_id: u32) -> Result<[u8; 6]> { let peers = self.state.peers.read().await; @@ -547,6 +569,8 @@ impl MeshService { encrypted: false, message_type: type_label.to_string(), typed_payload, + sender_pubkey: None, + sender_seq: None, }; self.state.store_message(msg.clone()).await; { @@ -584,6 +608,8 @@ impl MeshService { encrypted, message_type: "text".to_string(), typed_payload: None, + sender_pubkey: None, + sender_seq: None, }; self.state.store_message(msg.clone()).await; @@ -625,6 +651,8 @@ impl MeshService { encrypted: false, message_type: type_label.to_string(), typed_payload, + sender_pubkey: None, + sender_seq: None, }; self.state.store_message(msg.clone()).await; { @@ -678,6 +706,8 @@ impl MeshService { encrypted: false, message_type: "text".to_string(), typed_payload: None, + sender_pubkey: None, + sender_seq: None, }; self.state.store_message(msg.clone()).await; diff --git a/core/archipelago/src/mesh/types.rs b/core/archipelago/src/mesh/types.rs index 42868914..b45363f3 100644 --- a/core/archipelago/src/mesh/types.rs +++ b/core/archipelago/src/mesh/types.rs @@ -78,6 +78,18 @@ pub struct MeshMessage { /// Structured payload as JSON — populated for non-text typed messages. #[serde(default, skip_serializing_if = "Option::is_none")] pub typed_payload: Option, + /// Hex-encoded sender pubkey. On Received: the peer's mesh public key + /// (or first 6 bytes / full key as available). On Sent: None today, + /// populated later when we own the key. Combined with `sender_seq` + /// this forms a stable cross-transport MessageKey that reactions, + /// replies, edits, and read-receipts can reference without relying + /// on the local `id` field (which is only meaningful to one node). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sender_pubkey: Option, + /// Per-sender monotonic sequence from the typed envelope. Paired with + /// `sender_pubkey` to form the stable MessageKey. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sender_seq: Option, } fn default_message_type() -> String {