use super::RpcHandler;
use crate::mesh;
use crate::mesh::message_types::{
    self, AlertPayload, AlertType, Coordinate, InvoicePayload, MeshMessageType, TypedEnvelope,
};
use anyhow::Result;
use tracing::info;

impl RpcHandler {
    /// mesh.status — Get mesh radio status, device info, and peer count.
    ///
    /// With a running mesh service, the service's own status is serialized and
    /// returned. Without one, the reply is assembled from the saved config and
    /// a device scan, with `device_connected` false and all counters zero.
    ///
    /// # Errors
    /// Fails if the status cannot be serialized or the saved config cannot be
    /// loaded.
    pub(super) async fn handle_mesh_status(&self) -> Result<serde_json::Value> {
        let service = self.mesh_service.read().await;
        if let Some(svc) = service.as_ref() {
            let status = svc.status().await;
            return Ok(serde_json::to_value(status)?);
        }

        // No service running — return basic config + device detection
        let config = mesh::load_config(&self.config.data_dir).await?;
        let devices = mesh::detect_devices().await;
        Ok(serde_json::json!({
            "enabled": config.enabled,
            "device_connected": false,
            "device_type": "unknown",
            "device_path": config.device_path,
            "channel_name": config.channel_name.unwrap_or_else(|| "archipelago".to_string()),
            "detected_devices": devices,
            "peer_count": 0,
            "messages_sent": 0,
            "messages_received": 0,
        }))
    }

    /// mesh.peers — List discovered mesh peers.
    ///
    /// Returns `{ "peers": [...], "count": n }`; the list is empty when the
    /// mesh service is not running.
    pub(super) async fn handle_mesh_peers(&self) -> Result<serde_json::Value> {
        let service = self.mesh_service.read().await;
        match service.as_ref() {
            Some(svc) => {
                let peers = svc.peers().await;
                Ok(serde_json::json!({
                    "peers": peers,
                    "count": peers.len(),
                }))
            }
            None => Ok(serde_json::json!({
                "peers": [],
                "count": 0,
            })),
        }
    }

    /// mesh.messages — Get recent mesh message history.
    ///
    /// `params.limit` (optional, unsigned integer) is forwarded to the service
    /// as the maximum number of messages. Returns an empty list when the mesh
    /// service is not running.
    pub(super) async fn handle_mesh_messages(
        &self,
        params: Option<serde_json::Value>,
    ) -> Result<serde_json::Value> {
        let limit = params
            .as_ref()
            .and_then(|p| p.get("limit"))
            .and_then(|v| v.as_u64())
            // Saturate rather than truncate on targets where usize is narrower than u64.
            .map(|n| usize::try_from(n).unwrap_or(usize::MAX));

        let service = self.mesh_service.read().await;
        match service.as_ref() {
            Some(svc) => {
                let messages = svc.messages(limit).await;
                Ok(serde_json::json!({
                    "messages": messages,
                    "count": messages.len(),
                }))
            }
            None => Ok(serde_json::json!({
                "messages": [],
                "count": 0,
            })),
        }
    }

    /// mesh.send — Send an encrypted message to a mesh peer.
///
    /// `params` must contain `contact_id` (an integer that fits in `u32`) and a
    /// non-empty `message` string. Replies with the id of the sent message and
    /// whether the service reported it as encrypted.
    ///
    /// # Errors
    /// Fails on missing or invalid params, when the mesh service is not
    /// running, or when the service fails to send.
    pub(super) async fn handle_mesh_send(
        &self,
        params: Option<serde_json::Value>,
    ) -> Result<serde_json::Value> {
        let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;

        let raw_contact_id = params
            .get("contact_id")
            .and_then(|v| v.as_u64())
            .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))?;
        // A plain `as u32` would wrap ids above u32::MAX and address a different contact.
        let contact_id = u32::try_from(raw_contact_id)
            .map_err(|_| anyhow::anyhow!("contact_id out of range"))?;

        let message = params
            .get("message")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing message"))?;
        if message.is_empty() {
            anyhow::bail!("Message cannot be empty");
        }

        let service = self.mesh_service.read().await;
        let svc = service
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Mesh service not running. Enable mesh first."))?;

        let msg = svc.send_message(contact_id, message).await?;
        info!(contact_id, encrypted = msg.encrypted, "Sent mesh message");

        Ok(serde_json::json!({
            "sent": true,
            "message_id": msg.id,
            "encrypted": msg.encrypted,
        }))
    }

    /// mesh.broadcast — Broadcast our node identity over mesh.
    ///
    /// # Errors
    /// Fails when the mesh service is not running or the broadcast itself
    /// fails.
    pub(super) async fn handle_mesh_broadcast(&self) -> Result<serde_json::Value> {
        let service = self.mesh_service.read().await;
        let svc = service
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Mesh service not running. Enable mesh first."))?;

        svc.broadcast_identity().await?;
        info!("Broadcast identity over mesh");

        Ok(serde_json::json!({ "broadcast": true }))
    }

    /// mesh.configure — Enable/disable mesh and set device path.
pub(super) async fn handle_mesh_configure( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let mut config = mesh::load_config(&self.config.data_dir).await?; if let Some(enabled) = params.get("enabled").and_then(|v| v.as_bool()) { config.enabled = enabled; } if let Some(device) = params.get("device_path").and_then(|v| v.as_str()) { config.device_path = Some(device.to_string()); } if let Some(channel) = params.get("channel_name").and_then(|v| v.as_str()) { config.channel_name = Some(channel.to_string()); } if let Some(broadcast) = params.get("broadcast_identity").and_then(|v| v.as_bool()) { config.broadcast_identity = broadcast; } if let Some(name) = params.get("advert_name").and_then(|v| v.as_str()) { config.advert_name = Some(name.to_string()); } mesh::save_config(&self.config.data_dir, &config).await?; // If we have a running service, update its config let mut service = self.mesh_service.write().await; if let Some(svc) = service.as_mut() { svc.configure(config.clone()).await?; } info!("Mesh config updated"); Ok(serde_json::json!({ "configured": true, "enabled": config.enabled, "device_path": config.device_path, })) } // ─── Phase 3: Typed Messages ──────────────────────────────────────── /// mesh.send-invoice — Create a Lightning invoice and send bolt11 to mesh peer. pub(super) async fn handle_mesh_send_invoice( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let contact_id = params["contact_id"] .as_u64() .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? 
as u32; let amount_sats = params["amount_sats"] .as_u64() .ok_or_else(|| anyhow::anyhow!("Missing amount_sats"))?; let memo = params["memo"].as_str().map(|s| s.to_string()); // Build invoice payload let invoice = InvoicePayload { bolt11: format!("lnbc{}n1pjmesh...", amount_sats), // Placeholder — real LND call in Phase 4 amount_sats, memo: memo.clone(), payment_hash: None, }; let payload = message_types::encode_payload(&invoice)?; let envelope = TypedEnvelope::new(MeshMessageType::Invoice, payload); let wire = envelope.to_wire()?; // Send via mesh let service = self.mesh_service.read().await; let svc = service .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let wire_str = String::from_utf8_lossy(&wire).to_string(); let msg = svc.send_message(contact_id, &wire_str).await?; info!(contact_id, amount_sats, "Sent invoice over mesh"); Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "amount_sats": amount_sats, "bolt11": invoice.bolt11, })) } /// mesh.send-coordinate — Send GPS coordinates to a mesh peer. pub(super) async fn handle_mesh_send_coordinate( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let contact_id = params["contact_id"] .as_u64() .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? 
as u32; let lat = params["lat"] .as_f64() .ok_or_else(|| anyhow::anyhow!("Missing lat"))?; let lng = params["lng"] .as_f64() .ok_or_else(|| anyhow::anyhow!("Missing lng"))?; let label = params["label"].as_str().map(|s| s.to_string()); let coord = Coordinate::from_degrees(lat, lng, label); let payload = message_types::encode_payload(&coord)?; let envelope = TypedEnvelope::new(MeshMessageType::Coordinate, payload); let wire = envelope.to_wire()?; let service = self.mesh_service.read().await; let svc = service .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let wire_str = String::from_utf8_lossy(&wire).to_string(); let msg = svc.send_message(contact_id, &wire_str).await?; info!(contact_id, "Sent coordinate over mesh"); Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "lat": coord.lat, "lng": coord.lng, })) } /// mesh.send-alert — Send a signed emergency alert over mesh. pub(super) async fn handle_mesh_send_alert( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let message = params["message"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing message"))?; let alert_type_str = params["alert_type"] .as_str() .unwrap_or("status"); let broadcast = params["broadcast"].as_bool().unwrap_or(false); let alert_type = match alert_type_str { "emergency" => AlertType::Emergency, "dead_man" => AlertType::DeadMan, _ => AlertType::Status, }; // Optional GPS let coordinate = if let (Some(lat), Some(lng)) = ( params["lat"].as_f64(), params["lng"].as_f64(), ) { Some(Coordinate::from_degrees(lat, lng, None)) } else { None }; let alert = AlertPayload { alert_type, message: message.to_string(), coordinate, }; let payload = message_types::encode_payload(&alert)?; // Sign the alert with node identity let (data, _) = self.state_manager.get_snapshot().await; let identity_dir = self.config.data_dir.join("identity"); let node_key_path = identity_dir.join("node_key"); let envelope = if 
node_key_path.exists() { let key_bytes = tokio::fs::read(&node_key_path).await?; if key_bytes.len() == 32 { let mut seed = [0u8; 32]; seed.copy_from_slice(&key_bytes); let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed); TypedEnvelope::new_signed(MeshMessageType::Alert, payload, &signing_key) } else { TypedEnvelope::new(MeshMessageType::Alert, payload) } } else { TypedEnvelope::new(MeshMessageType::Alert, payload) }; let wire = envelope.to_wire()?; let service = self.mesh_service.read().await; let svc = service .as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let wire_str = String::from_utf8_lossy(&wire).to_string(); if broadcast { // Send on channel (all peers) svc.send_message(0, &wire_str).await?; info!(alert_type = alert_type_str, "Broadcast alert over mesh"); } else if let Some(contact_id) = params["contact_id"].as_u64() { svc.send_message(contact_id as u32, &wire_str).await?; info!(contact_id, alert_type = alert_type_str, "Sent alert to peer"); } else { anyhow::bail!("Must specify contact_id or broadcast: true"); } Ok(serde_json::json!({ "sent": true, "alert_type": alert_type_str, "signed": envelope.sig.is_some(), })) } /// mesh.outbox — List pending store-and-forward messages. pub(super) async fn handle_mesh_outbox( &self, params: Option, ) -> Result { let limit = params .as_ref() .and_then(|p| p["limit"].as_u64()) .map(|n| n as usize); // Check if outbox file exists let outbox = mesh::outbox::MeshOutbox::load(&self.config.data_dir).await?; let messages = outbox.list(limit).await; let count = outbox.count().await; Ok(serde_json::json!({ "messages": messages.iter().map(|m| serde_json::json!({ "id": m.id, "dest_did": m.dest_did, "from_did": m.from_did, "created_at": m.created_at, "ttl_secs": m.ttl_secs, "retry_count": m.retry_count, "relay_hops": m.relay_hops, "expired": m.is_expired(), })).collect::>(), "count": count, })) } /// mesh.session-status — Get ratchet session info for a peer. 
pub(super) async fn handle_mesh_session_status( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let contact_id = params["contact_id"] .as_u64() .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; // Look up peer DID from mesh service let service = self.mesh_service.read().await; let peer_did = if let Some(svc) = service.as_ref() { let peers = svc.peers().await; peers.iter().find(|p| p.contact_id == contact_id).and_then(|p| p.did.clone()) } else { None }; if let Some(did) = peer_did { let session_mgr = mesh::session::SessionManager::new(&self.config.data_dir); if let Some(info) = session_mgr.session_info(&did).await { Ok(serde_json::json!({ "has_session": info.has_session, "forward_secrecy": info.forward_secrecy, "message_count": info.message_count, "ratchet_generation": info.ratchet_generation, "peer_did": did, })) } else { Ok(serde_json::json!({ "has_session": false, "forward_secrecy": false, "message_count": 0, "ratchet_generation": 0, "peer_did": did, })) } } else { Ok(serde_json::json!({ "has_session": false, "forward_secrecy": false, "message_count": 0, "ratchet_generation": 0, "peer_did": null, })) } } // ─── Phase 4: Off-Grid Bitcoin Operations ──────────────────────────── /// mesh.relay-tx — Send a raw transaction for relay by an internet-connected mesh peer. 
pub(super) async fn handle_mesh_relay_tx( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let tx_hex = params["tx_hex"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing tx_hex"))?; let relay_mode = params["relay_mode"] .as_str() .unwrap_or("archy"); if tx_hex.len() < 20 || tx_hex.len() > 200_000 { anyhow::bail!("Invalid tx_hex length"); } // Validate hex if hex::decode(tx_hex).is_err() { anyhow::bail!("tx_hex is not valid hexadecimal"); } let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let request_id = chrono::Utc::now().timestamp() as u64; svc.relay_tracker.track_tx_relay(request_id, svc.our_did()).await; let wire = crate::mesh::bitcoin_relay::build_tx_relay_request(tx_hex, request_id)?; let mut sent_count = 0u32; if relay_mode == "broadcast" { // Broadcast mode: send on channel 0 (all mesh nodes relay) // Still encrypted — only Archy nodes can decrypt and broadcast the TX let shared_state = svc.shared_state(); let shared_secrets = shared_state.shared_secrets.read().await; // Encrypt with first available Archy peer's shared secret // (any Archy node that receives it can try decrypting) let payload = shared_secrets.values().next() .and_then(|secret| { crate::mesh::crypto::encrypt(secret, &wire).ok().map(|ct| { let mut encrypted = Vec::with_capacity(1 + ct.len()); encrypted.push(crate::mesh::message_types::ENCRYPTED_TYPED_MARKER); encrypted.extend_from_slice(&ct); encrypted }) }) .unwrap_or_else(|| wire.clone()); drop(shared_secrets); { use base64::Engine; let b64 = base64::engine::general_purpose::STANDARD.encode(&payload); let _ = shared_state .cmd_tx .send(crate::mesh::listener::MeshCommand::BroadcastChannel { channel: 0, payload: b64.into_bytes(), }) .await; } sent_count = 1; info!(request_id, tx_len = tx_hex.len(), "TX relay broadcast on mesh channel 0 (encrypted)"); } else { // Archy mode: E2E encrypted per-peer, 
direct to known Archy nodes let peers = svc.peers().await; let shared_state = svc.shared_state(); let shared_secrets = shared_state.shared_secrets.read().await; for peer in &peers { if !peer.advert_name.starts_with("Archy-") { continue; } if let Some(ref pk) = peer.pubkey_hex { if let Ok(pk_bytes) = hex::decode(pk) { if pk_bytes.len() >= 6 { let mut prefix = [0u8; 6]; prefix.copy_from_slice(&pk_bytes[..6]); let payload = if let Some(secret) = shared_secrets.get(&peer.contact_id) { match crate::mesh::crypto::encrypt(secret, &wire) { Ok(ciphertext) => { let mut encrypted = Vec::with_capacity(1 + ciphertext.len()); encrypted.push(crate::mesh::message_types::ENCRYPTED_TYPED_MARKER); encrypted.extend_from_slice(&ciphertext); encrypted } Err(_) => wire.clone(), } } else { wire.clone() }; let _ = svc.shared_state() .cmd_tx .send(crate::mesh::listener::MeshCommand::SendRaw { dest_pubkey_prefix: prefix, payload, }) .await; sent_count += 1; } } } } drop(shared_secrets); info!(request_id, tx_len = tx_hex.len(), archy_peers = sent_count, "TX relay sent to Archy peers (E2E encrypted)"); } Ok(serde_json::json!({ "request_id": request_id, "queued": true, "tx_hex_len": tx_hex.len(), })) } /// mesh.relay-status — Check the status of a pending or completed TX relay. 
pub(super) async fn handle_mesh_relay_status( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let request_id = params["request_id"] .as_u64() .ok_or_else(|| anyhow::anyhow!("Missing request_id"))?; let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; // Check completed results first if let Some(result) = svc.relay_tracker.get_result(request_id).await { return Ok(serde_json::json!({ "status": if result.txid.is_some() { "confirmed" } else { "failed" }, "request_id": result.request_id, "txid": result.txid, "error": result.error, "error_code": result.error_code, "completed_at": result.completed_at, })); } // Check if still pending if svc.relay_tracker.is_pending(request_id).await { return Ok(serde_json::json!({ "status": "pending", "request_id": request_id, })); } // Unknown — either expired or never existed Ok(serde_json::json!({ "status": "unknown", "request_id": request_id, })) } /// mesh.block-headers — Get cached block headers received from mesh peers. pub(super) async fn handle_mesh_block_headers( &self, params: Option, ) -> Result { let count = params .as_ref() .and_then(|p| p["count"].as_u64()) .unwrap_or(10) as usize; let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let headers = svc.block_header_cache.recent_headers(count).await; let latest = svc.block_header_cache.latest_height().await; Ok(serde_json::json!({ "headers": headers.iter().map(|h| serde_json::json!({ "height": h.height, "hash": h.hash, "prev_hash": h.prev_hash, "timestamp": h.timestamp, "announced_by": h.announced_by, })).collect::>(), "latest_height": latest, "count": headers.len(), })) } /// mesh.relay-lightning — Send a Lightning invoice for payment by an internet-connected peer. 
pub(super) async fn handle_mesh_relay_lightning( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let bolt11 = params["bolt11"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing bolt11"))?; let amount_sats = params["amount_sats"] .as_u64() .ok_or_else(|| anyhow::anyhow!("Missing amount_sats"))?; if !bolt11.starts_with("lnbc") && !bolt11.starts_with("lntb") { anyhow::bail!("Invalid bolt11 invoice — must start with lnbc or lntb"); } let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let request_id = chrono::Utc::now().timestamp() as u64; svc.relay_tracker.track_lightning_relay(request_id, svc.our_did()).await; let wire = crate::mesh::bitcoin_relay::build_lightning_relay_request( bolt11, amount_sats, request_id, )?; // Send to Archipelago peers — E2E encrypted per-peer let peers = svc.peers().await; let shared_state = svc.shared_state(); let shared_secrets = shared_state.shared_secrets.read().await; let mut sent_count = 0u32; for peer in &peers { if !peer.advert_name.starts_with("Archy-") { continue; } if let Some(ref pk) = peer.pubkey_hex { if let Ok(pk_bytes) = hex::decode(pk) { if pk_bytes.len() >= 6 { let mut prefix = [0u8; 6]; prefix.copy_from_slice(&pk_bytes[..6]); let payload = if let Some(secret) = shared_secrets.get(&peer.contact_id) { match crate::mesh::crypto::encrypt(secret, &wire) { Ok(ciphertext) => { let mut encrypted = Vec::with_capacity(1 + ciphertext.len()); encrypted.push(crate::mesh::message_types::ENCRYPTED_TYPED_MARKER); encrypted.extend_from_slice(&ciphertext); encrypted } Err(_) => wire.clone(), } } else { wire.clone() }; let _ = svc.shared_state() .cmd_tx .send(crate::mesh::listener::MeshCommand::SendRaw { dest_pubkey_prefix: prefix, payload, }) .await; sent_count += 1; } } } } drop(shared_secrets); info!(request_id, amount_sats, archy_peers = sent_count, "Lightning relay sent (E2E encrypted)"); 
Ok(serde_json::json!({ "request_id": request_id, "queued": true, "amount_sats": amount_sats, })) } /// mesh.deadman-status — Get dead man's switch status. pub(super) async fn handle_mesh_deadman_status(&self) -> Result { let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let status = svc.dead_man_switch.status().await; Ok(serde_json::to_value(status)?) } /// mesh.deadman-configure — Configure the dead man's switch. pub(super) async fn handle_mesh_deadman_configure( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; let mut config = svc.dead_man_switch.get_config().await; if let Some(enabled) = params.get("enabled").and_then(|v| v.as_bool()) { config.dead_man_enabled = enabled; } if let Some(interval) = params.get("interval_secs").and_then(|v| v.as_u64()) { if interval < 60 { anyhow::bail!("Interval must be at least 60 seconds"); } config.dead_man_interval_secs = interval; } if let (Some(lat), Some(lng)) = ( params.get("lat").and_then(|v| v.as_f64()), params.get("lng").and_then(|v| v.as_f64()), ) { let label = params.get("label").and_then(|v| v.as_str()).map(|s| s.to_string()); config.last_gps = Some(Coordinate::from_degrees(lat, lng, label)); } if let Some(contacts) = params.get("contacts").and_then(|v| v.as_array()) { config.emergency_contacts = contacts .iter() .filter_map(|c| c.as_str().map(|s| s.to_string())) .collect(); } if let Some(msg) = params.get("custom_message").and_then(|v| v.as_str()) { config.custom_message = Some(msg.to_string()); } if let Some(auto_gps) = params.get("auto_gps").and_then(|v| v.as_bool()) { config.auto_include_gps = auto_gps; } svc.dead_man_switch.configure(config).await?; // Reset timer on configure svc.dead_man_switch.check_in().await; let status = 
svc.dead_man_switch.status().await; info!("Dead man's switch configured"); Ok(serde_json::to_value(status)?) } /// mesh.deadman-checkin — Heartbeat to reset the dead man's switch timer. pub(super) async fn handle_mesh_deadman_checkin(&self) -> Result { let service = self.mesh_service.read().await; let svc = service.as_ref() .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; svc.dead_man_check_in().await; let remaining = svc.dead_man_switch.time_remaining_secs().await; Ok(serde_json::json!({ "checked_in": true, "time_remaining_secs": remaining, })) } /// mesh.rotate-prekeys — Force prekey rotation for X3DH. pub(super) async fn handle_mesh_rotate_prekeys(&self) -> Result { // Load identity signing key let identity_dir = self.config.data_dir.join("identity"); let node_key_path = identity_dir.join("node_key"); let key_bytes = tokio::fs::read(&node_key_path) .await .map_err(|_| anyhow::anyhow!("Node identity not found"))?; if key_bytes.len() != 32 { anyhow::bail!("Invalid node key"); } let mut seed = [0u8; 32]; seed.copy_from_slice(&key_bytes); let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed); // Generate new prekey bundle let (bundle, _secrets) = mesh::x3dh::generate_prekey_bundle(&signing_key, 10)?; // Save bundle for distribution let bundle_bytes = mesh::x3dh::encode_bundle(&bundle)?; let prekey_dir = self.config.data_dir.join("prekeys"); tokio::fs::create_dir_all(&prekey_dir).await?; tokio::fs::write(prekey_dir.join("bundle.cbor"), &bundle_bytes).await?; info!( one_time_keys = bundle.one_time_prekeys.len(), "Prekey bundle rotated" ); Ok(serde_json::json!({ "rotated": true, "signed_prekey_id": bundle.signed_prekey.id, "one_time_prekeys": bundle.one_time_prekeys.len(), })) } // ─── Radio Diagnostics ───────────────────────────────────────────── /// mesh.test-send — Send test payloads of various sizes to diagnose radio link. /// Sends plain text markers that the receiver can count. 
///
    /// `params.contact_id` (fits in `u32`) selects the peer, `params.mode`
    /// (default `"ping"`) the payload shape and `params.count` (default 3) the
    /// number of sends. Text modes pause one second after each send. The
    /// `"chunked"` mode sends a relay request built from 400 hex characters via
    /// a raw send command and pauses three seconds per iteration. An
    /// unrecognized mode sends an `UNKNOWN` marker.
    ///
    /// The reply's `sent` counts messages actually sent (text modes) or queued
    /// (chunked mode); `count` echoes the requested number.
    pub(super) async fn handle_mesh_test_send(
        &self,
        params: Option<serde_json::Value>,
    ) -> Result<serde_json::Value> {
        let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
        let raw_contact_id = params["contact_id"]
            .as_u64()
            .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))?;
        // Reject ids that do not fit instead of silently wrapping to another contact.
        let contact_id = u32::try_from(raw_contact_id)
            .map_err(|_| anyhow::anyhow!("contact_id out of range"))?;

        // Test modes: "ping" (small), "medium" (80 bytes), "large" (150 bytes), "chunked" (400 bytes)
        let mode = params["mode"].as_str().unwrap_or("ping");
        // NOTE(review): `count` is unbounded and the service read lock is held
        // across every sleep below — consider capping it.
        let count = usize::try_from(params["count"].as_u64().unwrap_or(3)).unwrap_or(usize::MAX);

        let service = self.mesh_service.read().await;
        let svc = service
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;

        let mut sent = 0usize;
        let test_id = chrono::Utc::now().timestamp() as u32;

        for i in 0..count {
            if mode == "chunked" {
                // Send a TypedEnvelope that requires chunking (>140 base64 chars)
                let fake_tx = "0".repeat(400); // simulates TX hex
                let wire = crate::mesh::bitcoin_relay::build_tx_relay_request(
                    &fake_tx,
                    u64::from(test_id) + i as u64,
                )?;

                // Send via SendRaw which handles base64 + chunking. The peer is
                // addressed by the first 6 bytes of its public key; nothing is
                // sent when the peer or a usable key cannot be found.
                let peers = svc.peers().await;
                let pk_bytes = peers
                    .iter()
                    .find(|p| p.contact_id == contact_id)
                    .and_then(|peer| peer.pubkey_hex.as_ref())
                    .and_then(|pk| hex::decode(pk).ok())
                    .filter(|bytes| bytes.len() >= 6);
                if let Some(pk_bytes) = pk_bytes {
                    let mut prefix = [0u8; 6];
                    prefix.copy_from_slice(&pk_bytes[..6]);
                    let queued = svc
                        .shared_state()
                        .cmd_tx
                        .send(crate::mesh::listener::MeshCommand::SendRaw {
                            dest_pubkey_prefix: prefix,
                            payload: wire,
                        })
                        .await
                        .is_ok();
                    // Only count commands the listener actually accepted.
                    if queued {
                        sent += 1;
                    }
                }

                // Delay between chunked sends
                tokio::time::sleep(std::time::Duration::from_secs(3)).await;
                continue;
            }

            let payload = match mode {
                "ping" => format!("MESHTEST:{}:{}:PING", test_id, i),
                "medium" => format!("MESHTEST:{}:{}:{}", test_id, i, "X".repeat(60)),
                "large" => format!("MESHTEST:{}:{}:{}", test_id, i, "X".repeat(130)),
                _ => format!("MESHTEST:{}:{}:UNKNOWN", test_id, i),
            };

            // Send as plain text for ping/medium/large
            svc.send_message(contact_id, &payload).await?;
            sent += 1;
            info!(test_id, seq = i, mode, len = payload.len(), "Test message sent");

            // Small delay between sends
            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        }

        Ok(serde_json::json!({
            "test_id": test_id,
            "mode": mode,
            "sent": sent,
            "count": count,
        }))
    }
}