diff --git a/core/archipelago/src/mesh/listener/assist.rs b/core/archipelago/src/mesh/listener/assist.rs new file mode 100644 index 00000000..cd5b5529 --- /dev/null +++ b/core/archipelago/src/mesh/listener/assist.rs @@ -0,0 +1,234 @@ +//! Mesh-AI assistant (issue #50) — answers `AssistQuery` messages with this +//! node's local LLM and sends the reply back over the mesh. +//! +//! This is the Rust-native lift of Meshroller's "LLM bridge": a trusted peer +//! asks a question over meshcore, an internet/compute-bearing node runs it +//! through a local model (Ollama) and streams the answer back in capped, +//! ordered chunks. Airtime is scarce, so the reply is length-capped and each +//! asker is limited to one in-flight query. + +use super::super::message_types::{self, AssistQueryPayload, AssistResponsePayload, MeshMessageType}; +use super::bitcoin::send_to_peer; +use super::MeshState; +use crate::federation::TrustLevel; +use std::sync::Arc; +use std::time::Duration; +use tracing::{info, warn}; + +/// Local Ollama generate endpoint (same host the Ollama app binds). +const OLLAMA_URL: &str = "http://localhost:11434/api/generate"; +/// Default model when the node hasn't configured one (matches Meshroller). +const DEFAULT_MODEL: &str = "qwen2.5-coder"; +/// Max time to wait on the model before giving up. +const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60); +/// Hard cap on answer length sent over the radio — keeps airtime sane. +const MAX_REPLY_CHARS: usize = 480; +/// Characters of answer text per `AssistResponse` chunk. +const CHUNK_CHARS: usize = 160; + +/// Entry point: gate the query, run the model, send the answer back. +/// Spawned from the dispatch `AssistQuery` arm so the radio loop never blocks. +pub(super) async fn run_assist( + payload: AssistQueryPayload, + sender_contact_id: u32, + sender_name: String, + state: Arc, +) { + // Trust + block gate. + if !is_sender_allowed(&state, sender_contact_id).await { + warn!( + from = sender_contact_id, + name = %sender_name, + "AssistQuery denied — sender not permitted by assistant policy" + ); + // Silent on the wire (no airtime spent on denials); surface to the UI. + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { + req_id: payload.req_id, + to_contact_id: sender_contact_id, + error: Some("denied".to_string()), + }); + return; + } + + // One in-flight query per asker. + { + let mut inflight = state.assist_inflight.write().await; + if !inflight.insert(sender_contact_id) { + warn!(from = sender_contact_id, "AssistQuery dropped — asker already has one in flight"); + return; + } + } + + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistQueryReceived { + from_contact_id: sender_contact_id, + prompt: payload.prompt.clone(), + }); + + let model = payload + .model + .clone() + .or_else(|| state.assistant.model.clone()) + .unwrap_or_else(|| DEFAULT_MODEL.to_string()); + + info!( + from = sender_contact_id, + req_id = payload.req_id, + model = %model, + "Answering AI query over mesh" + ); + + let result = call_ollama(&model, &payload.prompt).await; + + match result { + Ok(answer) => { + let (text, _truncated) = cap_reply(&answer); + send_answer_chunks(&state, sender_contact_id, payload.req_id, &text).await; + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { + req_id: payload.req_id, + to_contact_id: sender_contact_id, + error: None, + }); + } + Err(e) => { + warn!(req_id = payload.req_id, "AI query failed: {}", e); + send_error(&state, sender_contact_id, payload.req_id, "AI unavailable").await; + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { + req_id: payload.req_id, + to_contact_id: sender_contact_id, + error: Some(e.to_string()), + }); + } + } + + state.assist_inflight.write().await.remove(&sender_contact_id); +} + +/// Whether `sender_contact_id` may invoke the assistant under the node's policy. +/// Always denies user-blocked contacts. With `trusted_only`, requires a +/// federation-Trusted match on the peer's pubkey or DID. +async fn is_sender_allowed(state: &Arc, sender_contact_id: u32) -> bool { + let (pubkey_hex, did) = { + let peers = state.peers.read().await; + match peers.get(&sender_contact_id) { + Some(p) => (p.pubkey_hex.clone(), p.did.clone()), + None => (None, None), + } + }; + + // Never answer a user-blocked contact, regardless of policy. + if let Some(ref pk) = pubkey_hex { + if state + .contacts + .read() + .await + .get(pk) + .map(|c| c.blocked) + .unwrap_or(false) + { + return false; + } + } + + if !state.assistant.trusted_only { + return true; + } + + // Trusted-only: match against the federation trust list. + let nodes = crate::federation::load_nodes(&state.data_dir) + .await + .unwrap_or_default(); + nodes.iter().any(|n| { + n.trust_level == TrustLevel::Trusted + && (Some(&n.pubkey) == pubkey_hex.as_ref() || Some(&n.did) == did.as_ref()) + }) +} + +/// Cap the answer to `MAX_REPLY_CHARS`, appending a marker when truncated. +/// Returns (text_to_send, was_truncated). +fn cap_reply(answer: &str) -> (String, bool) { + let trimmed = answer.trim(); + if trimmed.chars().count() <= MAX_REPLY_CHARS { + return (trimmed.to_string(), false); + } + let capped: String = trimmed.chars().take(MAX_REPLY_CHARS).collect(); + (format!("{capped}…(truncated)"), true) +} + +/// Split the answer into ordered `AssistResponse` chunks and send each back to +/// the asker on the encrypted, peer-addressed path. +async fn send_answer_chunks( + state: &Arc, + dest_contact_id: u32, + req_id: u64, + text: &str, +) { + let chars: Vec = text.chars().collect(); + let chunks: Vec = if chars.is_empty() { + vec![String::new()] + } else { + chars.chunks(CHUNK_CHARS).map(|c| c.iter().collect()).collect() + }; + let last = chunks.len().saturating_sub(1); + for (i, chunk) in chunks.into_iter().enumerate() { + let payload = AssistResponsePayload { + req_id, + text: chunk, + seq: i as u16, + done: i == last, + error: None, + }; + send_response(state, dest_contact_id, &payload).await; + } +} + +/// Send a single failure `AssistResponse` (one chunk, done, error set). +async fn send_error(state: &Arc, dest_contact_id: u32, req_id: u64, msg: &str) { + let payload = AssistResponsePayload { + req_id, + text: String::new(), + seq: 0, + done: true, + error: Some(msg.to_string()), + }; + send_response(state, dest_contact_id, &payload).await; +} + +/// Encode an `AssistResponse` payload and send it to a peer. +async fn send_response(state: &Arc, dest_contact_id: u32, payload: &AssistResponsePayload) { + let bytes = match message_types::encode_payload(payload) { + Ok(b) => b, + Err(e) => { + warn!("Failed to encode AssistResponse: {}", e); + return; + } + }; + let envelope = message_types::TypedEnvelope::new(MeshMessageType::AssistResponse, bytes); + match envelope.to_wire() { + Ok(wire) => send_to_peer(state, dest_contact_id, wire).await, + Err(e) => warn!("Failed to encode AssistResponse envelope: {}", e), + } +} + +/// Call the local Ollama model and return the generated text. +async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result { + let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?; + let body = serde_json::json!({ + "model": model, + "prompt": prompt, + "stream": false, + }); + let resp = client.post(OLLAMA_URL).json(&body).send().await?; + if !resp.status().is_success() { + anyhow::bail!("Ollama returned HTTP {}", resp.status()); + } + let json: serde_json::Value = resp.json().await?; + let text = json + .get("response") + .and_then(|r| r.as_str()) + .unwrap_or("") + .to_string(); + if text.trim().is_empty() { + anyhow::bail!("Ollama returned an empty response"); + } + Ok(text) +} diff --git a/core/archipelago/src/mesh/listener/bitcoin.rs b/core/archipelago/src/mesh/listener/bitcoin.rs index 1ee4118b..7e605920 100644 --- a/core/archipelago/src/mesh/listener/bitcoin.rs +++ b/core/archipelago/src/mesh/listener/bitcoin.rs @@ -445,7 +445,9 @@ async fn encrypt_for_peer(state: &Arc, contact_id: u32, typed_wire: & /// Send raw wire bytes to a specific peer by contact_id. /// Encrypts directed messages via ratchet or shared secret when available. /// Falls back to channel 0 broadcast (plaintext) if peer's pubkey is unknown. -async fn send_to_peer(state: &Arc, contact_id: u32, typed_wire: Vec) { +/// `pub(super)` so sibling handlers (e.g. the AI assistant) can reply on the +/// same encrypted, peer-addressed path the relay handlers use. +pub(super) async fn send_to_peer(state: &Arc, contact_id: u32, typed_wire: Vec) { let peers = state.peers.read().await; if let Some(peer) = peers.get(&contact_id) { if let Some(ref pk) = peer.pubkey_hex { diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 927fac4a..ea20a626 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -681,6 +681,69 @@ pub(crate) async fn handle_typed_envelope_direct( .await; } + Some(MeshMessageType::AssistQuery) => { + match message_types::decode_payload::(&envelope.v) { + Ok(query) => { + if !state.assistant.enabled { + debug!( + from = sender_contact_id, + "AssistQuery ignored — assistant disabled on this node" + ); + return; + } + info!( + from = sender_contact_id, + req_id = query.req_id, + "AI query received over mesh" + ); + let json = payload_to_json(&query); + store_typed_message( + state, + sender_contact_id, + sender_name, + &query.prompt, + "assist_query", + json, + Some(envelope.seq), + ) + .await; + // Run the model + reply off the radio loop. + let assist_state = Arc::clone(state); + let name = sender_name.to_string(); + tokio::spawn(async move { + super::assist::run_assist(query, sender_contact_id, name, assist_state) + .await; + }); + } + Err(e) => warn!("Failed to decode AssistQuery payload: {}", e), + } + } + + Some(MeshMessageType::AssistResponse) => { + match message_types::decode_payload::(&envelope.v) + { + Ok(resp) => { + let display = resp + .error + .clone() + .map(|e| format!("AI error: {e}")) + .unwrap_or_else(|| resp.text.clone()); + let json = payload_to_json(&resp); + store_typed_message( + state, + sender_contact_id, + sender_name, + &display, + "assist_response", + json, + Some(envelope.seq), + ) + .await; + } + Err(e) => warn!("Failed to decode AssistResponse payload: {}", e), + } + } + _ => { debug!( msg_type = ?msg_type, diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index d52584e1..384f8abb 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -7,6 +7,7 @@ //! - Reconnects on device disconnect //! - Manages peer cache and message store +mod assist; mod bitcoin; mod decode; pub(crate) mod dispatch; @@ -121,6 +122,27 @@ pub struct MeshState { /// persistent contact table from regenerating rows the user just /// wiped. Persisted to `mesh-ignored-radio-contacts.json`. pub radio_contact_blocklist: RwLock>, + /// Mesh-AI assistant settings (issue #50): whether this node answers + /// AssistQuery messages with its local LLM, and who may ask. + pub assistant: AssistantConfig, + /// Data dir — lets dispatch handlers reach disk-backed stores (e.g. the + /// federation trust list used to gate AI queries) without threading a path + /// through every call. + pub data_dir: std::path::PathBuf, + /// Contact-ids with an AI query currently being answered. Caps each asker to + /// one in-flight query so a peer can't flood the node's compute / airtime. + pub assist_inflight: RwLock>, +} + +/// Mesh-AI assistant configuration, snapshotted from `MeshConfig` at startup. +#[derive(Debug, Clone)] +pub struct AssistantConfig { + /// Answer AssistQuery messages with the local LLM. + pub enabled: bool, + /// Ollama model to use; None → the built-in default. + pub model: Option, + /// Restrict asking to federation-Trusted peers (vs. anyone on the mesh). + pub trusted_only: bool, } /// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to @@ -153,6 +175,8 @@ impl MeshState { encrypt_relay: bool, session_manager: Arc, our_ed_pubkey_hex: String, + assistant: AssistantConfig, + data_dir: std::path::PathBuf, ) -> ( Arc, broadcast::Receiver, @@ -192,6 +216,9 @@ impl MeshState { our_ed_pubkey_hex, blob_store: RwLock::new(None), radio_contact_blocklist: RwLock::new(HashSet::new()), + assistant, + data_dir, + assist_inflight: RwLock::new(HashSet::new()), }); (state, rx, cmd_rx) } diff --git a/core/archipelago/src/mesh/message_types.rs b/core/archipelago/src/mesh/message_types.rs index c4c768ee..49778858 100644 --- a/core/archipelago/src/mesh/message_types.rs +++ b/core/archipelago/src/mesh/message_types.rs @@ -70,6 +70,12 @@ pub enum MeshMessageType { /// MCIIXXTT framing) and the peer has no Tor path. Recipient writes the /// bytes to its local BlobStore on reassembly. ContentInline = 23, + /// "Ask the node's AI" — a prompt to be answered by the receiving node's + /// local LLM (issue #50). Gated by the assistant config + trust policy. + AssistQuery = 24, + /// Reply to an AssistQuery — a chunk of the LLM's answer, addressed back to + /// the asker by `req_id`. Long answers span multiple chunks (`seq`/`done`). + AssistResponse = 25, } impl MeshMessageType { @@ -99,6 +105,8 @@ impl MeshMessageType { 21 => Some(Self::ChannelInvite), 22 => Some(Self::ContactCard), 23 => Some(Self::ContentInline), + 24 => Some(Self::AssistQuery), + 25 => Some(Self::AssistResponse), _ => None, } } @@ -132,6 +140,8 @@ impl MeshMessageType { "channel_invite" => Some(Self::ChannelInvite), "contact_card" => Some(Self::ContactCard), "content_inline" => Some(Self::ContentInline), + "assist_query" => Some(Self::AssistQuery), + "assist_response" => Some(Self::AssistResponse), _ => None, } } @@ -162,6 +172,8 @@ impl MeshMessageType { Self::ChannelInvite => "channel_invite", Self::ContactCard => "contact_card", Self::ContentInline => "content_inline", + Self::AssistQuery => "assist_query", + Self::AssistResponse => "assist_response", } } } @@ -407,6 +419,37 @@ pub struct LightningRelayPayload { pub request_id: u64, } +/// "Ask the node's AI" request (issue #50). Sent to a peer running a local +/// LLM; answered with one or more `AssistResponsePayload` chunks. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AssistQueryPayload { + /// Asker-chosen id correlating the query with its response chunks. + pub req_id: u64, + /// The natural-language prompt. + pub prompt: String, + /// Optional model override; falls back to the responder's configured model. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub model: Option, +} + +/// One chunk of an AI answer, addressed back to the asker by `req_id`. +/// Airtime is scarce, so long answers are capped and split into ordered chunks. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AssistResponsePayload { + pub req_id: u64, + /// This chunk's text. + pub text: String, + /// 0-based chunk index. + #[serde(default)] + pub seq: u16, + /// True on the final chunk. + #[serde(default)] + pub done: bool, + /// Set instead of `text` when the query failed (model unreachable, denied…). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + /// Lightning relay response (proof of payment). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LightningRelayResponsePayload { diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 46984dd3..5452da06 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -176,6 +176,17 @@ pub struct MeshConfig { /// Set to false to disable encryption for debugging or rollback. #[serde(default = "default_true")] pub encrypt_relay_messages: bool, + /// Answer AI queries (AssistQuery) from peers using this node's local LLM + /// (issue #50). Off by default — the node only becomes a mesh AI on opt-in. + #[serde(default)] + pub assistant_enabled: bool, + /// Ollama model used to answer AI queries. None → the built-in default. + #[serde(default)] + pub assistant_model: Option, + /// When true (default), only federation-Trusted peers may ask; when false, + /// any peer on the mesh may ask (spends this node's compute + airtime). + #[serde(default = "default_true")] + pub assistant_trusted_only: bool, } fn default_true() -> bool { @@ -194,6 +205,9 @@ impl Default for MeshConfig { announce_block_headers: false, steganography_mode: steganography::SteganographyMode::Normal, encrypt_relay_messages: true, + assistant_enabled: false, + assistant_model: None, + assistant_trusted_only: true, } } } @@ -362,6 +376,12 @@ impl MeshService { config.encrypt_relay_messages, Arc::clone(&session_manager), ed_pubkey_hex.to_string(), + listener::AssistantConfig { + enabled: config.assistant_enabled, + model: config.assistant_model.clone(), + trusted_only: config.assistant_trusted_only, + }, + data_dir.to_path_buf(), ); // Derive X25519 keys from Ed25519 identity diff --git a/core/archipelago/src/mesh/types.rs b/core/archipelago/src/mesh/types.rs index 349a6ca2..f5eabdb2 100644 --- a/core/archipelago/src/mesh/types.rs +++ b/core/archipelago/src/mesh/types.rs @@ -161,4 +161,15 @@ pub enum MeshEvent { payment_hash: Option, error: Option, }, + /// An AI query arrived from a peer and was accepted for answering (#50). + AssistQueryReceived { + from_contact_id: u32, + prompt: String, + }, + /// A local-AI answer finished sending back to the asker (or failed) (#50). + AssistResponseReady { + req_id: u64, + to_contact_id: u32, + error: Option, + }, }