From d8d014bfd9d609e74a9f48df7cb95be74d7662e6 Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 17:41:15 -0400 Subject: [PATCH 1/5] =?UTF-8?q?feat(mesh):=20mesh-AI=20assistant=20?= =?UTF-8?q?=E2=80=94=20Phase=201.1-1.4=20(issue=20#50)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rust-native lift of Meshroller's LLM bridge. Adds typed AssistQuery/ AssistResponse mesh messages, a trust-gated inbound handler that answers with the node's local Ollama model, and airtime discipline (reply cap, chunking, one in-flight query per asker). Works over both meshcore and Meshtastic radios via the existing MeshRadioDevice abstraction. - message_types: AssistQuery=24 / AssistResponse=25 + payloads - listener/assist.rs: run_assist (gate -> Ollama -> chunked reply) - listener/dispatch.rs: AssistQuery/AssistResponse arms - MeshConfig: assistant_enabled / assistant_model / assistant_trusted_only - MeshState: AssistantConfig + data_dir + in-flight guard Compiles clean (cargo check). Off by default. Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/mesh/listener/assist.rs | 234 ++++++++++++++++++ core/archipelago/src/mesh/listener/bitcoin.rs | 4 +- .../archipelago/src/mesh/listener/dispatch.rs | 63 +++++ core/archipelago/src/mesh/listener/mod.rs | 27 ++ core/archipelago/src/mesh/message_types.rs | 43 ++++ core/archipelago/src/mesh/mod.rs | 20 ++ core/archipelago/src/mesh/types.rs | 11 + 7 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 core/archipelago/src/mesh/listener/assist.rs diff --git a/core/archipelago/src/mesh/listener/assist.rs b/core/archipelago/src/mesh/listener/assist.rs new file mode 100644 index 00000000..cd5b5529 --- /dev/null +++ b/core/archipelago/src/mesh/listener/assist.rs @@ -0,0 +1,234 @@ +//! Mesh-AI assistant (issue #50) — answers `AssistQuery` messages with this +//! node's local LLM and sends the reply back over the mesh. +//! +//! This is the Rust-native lift of Meshroller's "LLM bridge": a trusted peer +//! asks a question over meshcore, an internet/compute-bearing node runs it +//! through a local model (Ollama) and streams the answer back in capped, +//! ordered chunks. Airtime is scarce, so the reply is length-capped and each +//! asker is limited to one in-flight query. + +use super::super::message_types::{self, AssistQueryPayload, AssistResponsePayload, MeshMessageType}; +use super::bitcoin::send_to_peer; +use super::MeshState; +use crate::federation::TrustLevel; +use std::sync::Arc; +use std::time::Duration; +use tracing::{info, warn}; + +/// Local Ollama generate endpoint (same host the Ollama app binds). +const OLLAMA_URL: &str = "http://localhost:11434/api/generate"; +/// Default model when the node hasn't configured one (matches Meshroller). +const DEFAULT_MODEL: &str = "qwen2.5-coder"; +/// Max time to wait on the model before giving up. +const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60); +/// Hard cap on answer length sent over the radio — keeps airtime sane. +const MAX_REPLY_CHARS: usize = 480; +/// Characters of answer text per `AssistResponse` chunk. +const CHUNK_CHARS: usize = 160; + +/// Entry point: gate the query, run the model, send the answer back. +/// Spawned from the dispatch `AssistQuery` arm so the radio loop never blocks. +pub(super) async fn run_assist( + payload: AssistQueryPayload, + sender_contact_id: u32, + sender_name: String, + state: Arc, +) { + // Trust + block gate. + if !is_sender_allowed(&state, sender_contact_id).await { + warn!( + from = sender_contact_id, + name = %sender_name, + "AssistQuery denied — sender not permitted by assistant policy" + ); + // Silent on the wire (no airtime spent on denials); surface to the UI. + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { + req_id: payload.req_id, + to_contact_id: sender_contact_id, + error: Some("denied".to_string()), + }); + return; + } + + // One in-flight query per asker. + { + let mut inflight = state.assist_inflight.write().await; + if !inflight.insert(sender_contact_id) { + warn!(from = sender_contact_id, "AssistQuery dropped — asker already has one in flight"); + return; + } + } + + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistQueryReceived { + from_contact_id: sender_contact_id, + prompt: payload.prompt.clone(), + }); + + let model = payload + .model + .clone() + .or_else(|| state.assistant.model.clone()) + .unwrap_or_else(|| DEFAULT_MODEL.to_string()); + + info!( + from = sender_contact_id, + req_id = payload.req_id, + model = %model, + "Answering AI query over mesh" + ); + + let result = call_ollama(&model, &payload.prompt).await; + + match result { + Ok(answer) => { + let (text, _truncated) = cap_reply(&answer); + send_answer_chunks(&state, sender_contact_id, payload.req_id, &text).await; + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { + req_id: payload.req_id, + to_contact_id: sender_contact_id, + error: None, + }); + } + Err(e) => { + warn!(req_id = payload.req_id, "AI query failed: {}", e); + send_error(&state, sender_contact_id, payload.req_id, "AI unavailable").await; + let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { + req_id: payload.req_id, + to_contact_id: sender_contact_id, + error: Some(e.to_string()), + }); + } + } + + state.assist_inflight.write().await.remove(&sender_contact_id); +} + +/// Whether `sender_contact_id` may invoke the assistant under the node's policy. +/// Always denies user-blocked contacts. With `trusted_only`, requires a +/// federation-Trusted match on the peer's pubkey or DID. +async fn is_sender_allowed(state: &Arc, sender_contact_id: u32) -> bool { + let (pubkey_hex, did) = { + let peers = state.peers.read().await; + match peers.get(&sender_contact_id) { + Some(p) => (p.pubkey_hex.clone(), p.did.clone()), + None => (None, None), + } + }; + + // Never answer a user-blocked contact, regardless of policy. + if let Some(ref pk) = pubkey_hex { + if state + .contacts + .read() + .await + .get(pk) + .map(|c| c.blocked) + .unwrap_or(false) + { + return false; + } + } + + if !state.assistant.trusted_only { + return true; + } + + // Trusted-only: match against the federation trust list. + let nodes = crate::federation::load_nodes(&state.data_dir) + .await + .unwrap_or_default(); + nodes.iter().any(|n| { + n.trust_level == TrustLevel::Trusted + && (Some(&n.pubkey) == pubkey_hex.as_ref() || Some(&n.did) == did.as_ref()) + }) +} + +/// Cap the answer to `MAX_REPLY_CHARS`, appending a marker when truncated. +/// Returns (text_to_send, was_truncated). +fn cap_reply(answer: &str) -> (String, bool) { + let trimmed = answer.trim(); + if trimmed.chars().count() <= MAX_REPLY_CHARS { + return (trimmed.to_string(), false); + } + let capped: String = trimmed.chars().take(MAX_REPLY_CHARS).collect(); + (format!("{capped}…(truncated)"), true) +} + +/// Split the answer into ordered `AssistResponse` chunks and send each back to +/// the asker on the encrypted, peer-addressed path. +async fn send_answer_chunks( + state: &Arc, + dest_contact_id: u32, + req_id: u64, + text: &str, +) { + let chars: Vec = text.chars().collect(); + let chunks: Vec = if chars.is_empty() { + vec![String::new()] + } else { + chars.chunks(CHUNK_CHARS).map(|c| c.iter().collect()).collect() + }; + let last = chunks.len().saturating_sub(1); + for (i, chunk) in chunks.into_iter().enumerate() { + let payload = AssistResponsePayload { + req_id, + text: chunk, + seq: i as u16, + done: i == last, + error: None, + }; + send_response(state, dest_contact_id, &payload).await; + } +} + +/// Send a single failure `AssistResponse` (one chunk, done, error set). +async fn send_error(state: &Arc, dest_contact_id: u32, req_id: u64, msg: &str) { + let payload = AssistResponsePayload { + req_id, + text: String::new(), + seq: 0, + done: true, + error: Some(msg.to_string()), + }; + send_response(state, dest_contact_id, &payload).await; +} + +/// Encode an `AssistResponse` payload and send it to a peer. +async fn send_response(state: &Arc, dest_contact_id: u32, payload: &AssistResponsePayload) { + let bytes = match message_types::encode_payload(payload) { + Ok(b) => b, + Err(e) => { + warn!("Failed to encode AssistResponse: {}", e); + return; + } + }; + let envelope = message_types::TypedEnvelope::new(MeshMessageType::AssistResponse, bytes); + match envelope.to_wire() { + Ok(wire) => send_to_peer(state, dest_contact_id, wire).await, + Err(e) => warn!("Failed to encode AssistResponse envelope: {}", e), + } +} + +/// Call the local Ollama model and return the generated text. +async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result { + let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?; + let body = serde_json::json!({ + "model": model, + "prompt": prompt, + "stream": false, + }); + let resp = client.post(OLLAMA_URL).json(&body).send().await?; + if !resp.status().is_success() { + anyhow::bail!("Ollama returned HTTP {}", resp.status()); + } + let json: serde_json::Value = resp.json().await?; + let text = json + .get("response") + .and_then(|r| r.as_str()) + .unwrap_or("") + .to_string(); + if text.trim().is_empty() { + anyhow::bail!("Ollama returned an empty response"); + } + Ok(text) +} diff --git a/core/archipelago/src/mesh/listener/bitcoin.rs b/core/archipelago/src/mesh/listener/bitcoin.rs index 1ee4118b..7e605920 100644 --- a/core/archipelago/src/mesh/listener/bitcoin.rs +++ b/core/archipelago/src/mesh/listener/bitcoin.rs @@ -445,7 +445,9 @@ async fn encrypt_for_peer(state: &Arc, contact_id: u32, typed_wire: & /// Send raw wire bytes to a specific peer by contact_id. /// Encrypts directed messages via ratchet or shared secret when available. /// Falls back to channel 0 broadcast (plaintext) if peer's pubkey is unknown. -async fn send_to_peer(state: &Arc, contact_id: u32, typed_wire: Vec) { +/// `pub(super)` so sibling handlers (e.g. the AI assistant) can reply on the +/// same encrypted, peer-addressed path the relay handlers use. +pub(super) async fn send_to_peer(state: &Arc, contact_id: u32, typed_wire: Vec) { let peers = state.peers.read().await; if let Some(peer) = peers.get(&contact_id) { if let Some(ref pk) = peer.pubkey_hex { diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 927fac4a..ea20a626 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -681,6 +681,69 @@ pub(crate) async fn handle_typed_envelope_direct( .await; } + Some(MeshMessageType::AssistQuery) => { + match message_types::decode_payload::(&envelope.v) { + Ok(query) => { + if !state.assistant.enabled { + debug!( + from = sender_contact_id, + "AssistQuery ignored — assistant disabled on this node" + ); + return; + } + info!( + from = sender_contact_id, + req_id = query.req_id, + "AI query received over mesh" + ); + let json = payload_to_json(&query); + store_typed_message( + state, + sender_contact_id, + sender_name, + &query.prompt, + "assist_query", + json, + Some(envelope.seq), + ) + .await; + // Run the model + reply off the radio loop. + let assist_state = Arc::clone(state); + let name = sender_name.to_string(); + tokio::spawn(async move { + super::assist::run_assist(query, sender_contact_id, name, assist_state) + .await; + }); + } + Err(e) => warn!("Failed to decode AssistQuery payload: {}", e), + } + } + + Some(MeshMessageType::AssistResponse) => { + match message_types::decode_payload::(&envelope.v) + { + Ok(resp) => { + let display = resp + .error + .clone() + .map(|e| format!("AI error: {e}")) + .unwrap_or_else(|| resp.text.clone()); + let json = payload_to_json(&resp); + store_typed_message( + state, + sender_contact_id, + sender_name, + &display, + "assist_response", + json, + Some(envelope.seq), + ) + .await; + } + Err(e) => warn!("Failed to decode AssistResponse payload: {}", e), + } + } + _ => { debug!( msg_type = ?msg_type, diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index d52584e1..384f8abb 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -7,6 +7,7 @@ //! - Reconnects on device disconnect //! - Manages peer cache and message store +mod assist; mod bitcoin; mod decode; pub(crate) mod dispatch; @@ -121,6 +122,27 @@ pub struct MeshState { /// persistent contact table from regenerating rows the user just /// wiped. Persisted to `mesh-ignored-radio-contacts.json`. pub radio_contact_blocklist: RwLock>, + /// Mesh-AI assistant settings (issue #50): whether this node answers + /// AssistQuery messages with its local LLM, and who may ask. + pub assistant: AssistantConfig, + /// Data dir — lets dispatch handlers reach disk-backed stores (e.g. the + /// federation trust list used to gate AI queries) without threading a path + /// through every call. + pub data_dir: std::path::PathBuf, + /// Contact-ids with an AI query currently being answered. Caps each asker to + /// one in-flight query so a peer can't flood the node's compute / airtime. + pub assist_inflight: RwLock>, +} + +/// Mesh-AI assistant configuration, snapshotted from `MeshConfig` at startup. +#[derive(Debug, Clone)] +pub struct AssistantConfig { + /// Answer AssistQuery messages with the local LLM. + pub enabled: bool, + /// Ollama model to use; None → the built-in default. + pub model: Option, + /// Restrict asking to federation-Trusted peers (vs. anyone on the mesh). + pub trusted_only: bool, } /// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to @@ -153,6 +175,8 @@ impl MeshState { encrypt_relay: bool, session_manager: Arc, our_ed_pubkey_hex: String, + assistant: AssistantConfig, + data_dir: std::path::PathBuf, ) -> ( Arc, broadcast::Receiver, @@ -192,6 +216,9 @@ impl MeshState { our_ed_pubkey_hex, blob_store: RwLock::new(None), radio_contact_blocklist: RwLock::new(HashSet::new()), + assistant, + data_dir, + assist_inflight: RwLock::new(HashSet::new()), }); (state, rx, cmd_rx) } diff --git a/core/archipelago/src/mesh/message_types.rs b/core/archipelago/src/mesh/message_types.rs index c4c768ee..49778858 100644 --- a/core/archipelago/src/mesh/message_types.rs +++ b/core/archipelago/src/mesh/message_types.rs @@ -70,6 +70,12 @@ pub enum MeshMessageType { /// MCIIXXTT framing) and the peer has no Tor path. Recipient writes the /// bytes to its local BlobStore on reassembly. ContentInline = 23, + /// "Ask the node's AI" — a prompt to be answered by the receiving node's + /// local LLM (issue #50). Gated by the assistant config + trust policy. + AssistQuery = 24, + /// Reply to an AssistQuery — a chunk of the LLM's answer, addressed back to + /// the asker by `req_id`. Long answers span multiple chunks (`seq`/`done`). + AssistResponse = 25, } impl MeshMessageType { @@ -99,6 +105,8 @@ impl MeshMessageType { 21 => Some(Self::ChannelInvite), 22 => Some(Self::ContactCard), 23 => Some(Self::ContentInline), + 24 => Some(Self::AssistQuery), + 25 => Some(Self::AssistResponse), _ => None, } } @@ -132,6 +140,8 @@ impl MeshMessageType { "channel_invite" => Some(Self::ChannelInvite), "contact_card" => Some(Self::ContactCard), "content_inline" => Some(Self::ContentInline), + "assist_query" => Some(Self::AssistQuery), + "assist_response" => Some(Self::AssistResponse), _ => None, } } @@ -162,6 +172,8 @@ impl MeshMessageType { Self::ChannelInvite => "channel_invite", Self::ContactCard => "contact_card", Self::ContentInline => "content_inline", + Self::AssistQuery => "assist_query", + Self::AssistResponse => "assist_response", } } } @@ -407,6 +419,37 @@ pub struct LightningRelayPayload { pub request_id: u64, } +/// "Ask the node's AI" request (issue #50). Sent to a peer running a local +/// LLM; answered with one or more `AssistResponsePayload` chunks. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AssistQueryPayload { + /// Asker-chosen id correlating the query with its response chunks. + pub req_id: u64, + /// The natural-language prompt. + pub prompt: String, + /// Optional model override; falls back to the responder's configured model. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub model: Option, +} + +/// One chunk of an AI answer, addressed back to the asker by `req_id`. +/// Airtime is scarce, so long answers are capped and split into ordered chunks. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AssistResponsePayload { + pub req_id: u64, + /// This chunk's text. + pub text: String, + /// 0-based chunk index. + #[serde(default)] + pub seq: u16, + /// True on the final chunk. + #[serde(default)] + pub done: bool, + /// Set instead of `text` when the query failed (model unreachable, denied…). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + /// Lightning relay response (proof of payment). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LightningRelayResponsePayload { diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 46984dd3..5452da06 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -176,6 +176,17 @@ pub struct MeshConfig { /// Set to false to disable encryption for debugging or rollback. #[serde(default = "default_true")] pub encrypt_relay_messages: bool, + /// Answer AI queries (AssistQuery) from peers using this node's local LLM + /// (issue #50). Off by default — the node only becomes a mesh AI on opt-in. + #[serde(default)] + pub assistant_enabled: bool, + /// Ollama model used to answer AI queries. None → the built-in default. + #[serde(default)] + pub assistant_model: Option, + /// When true (default), only federation-Trusted peers may ask; when false, + /// any peer on the mesh may ask (spends this node's compute + airtime). + #[serde(default = "default_true")] + pub assistant_trusted_only: bool, } fn default_true() -> bool { @@ -194,6 +205,9 @@ impl Default for MeshConfig { announce_block_headers: false, steganography_mode: steganography::SteganographyMode::Normal, encrypt_relay_messages: true, + assistant_enabled: false, + assistant_model: None, + assistant_trusted_only: true, } } } @@ -362,6 +376,12 @@ impl MeshService { config.encrypt_relay_messages, Arc::clone(&session_manager), ed_pubkey_hex.to_string(), + listener::AssistantConfig { + enabled: config.assistant_enabled, + model: config.assistant_model.clone(), + trusted_only: config.assistant_trusted_only, + }, + data_dir.to_path_buf(), ); // Derive X25519 keys from Ed25519 identity diff --git a/core/archipelago/src/mesh/types.rs b/core/archipelago/src/mesh/types.rs index 349a6ca2..f5eabdb2 100644 --- a/core/archipelago/src/mesh/types.rs +++ b/core/archipelago/src/mesh/types.rs @@ -161,4 +161,15 @@ pub enum MeshEvent { payment_hash: Option, error: Option, }, + /// An AI query arrived from a peer and was accepted for answering (#50). + AssistQueryReceived { + from_contact_id: u32, + prompt: String, + }, + /// A local-AI answer finished sending back to the asker (or failed) (#50). + AssistResponseReady { + req_id: u64, + to_contact_id: u32, + error: Option, + }, } From 87d0d532057e400722c188ded0c69419de5abd74 Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 17:59:03 -0400 Subject: [PATCH 2/5] =?UTF-8?q?feat(mesh):=20assistant=20Phase=201.5=20?= =?UTF-8?q?=E2=80=94=20!ai=20channel=20trigger=20(issue=20#50)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A plain '!ai ' / '!ask ' on the channel is now answered by the node's local model and broadcast back as plain text, so ANY client (bare meshcore or Meshtastic) can ask. Generalised run_assist with an AssistReply target: Typed chunks to a peer (archipelago UI path) vs plain channel-text (bare clients). Trust/rate gate unchanged; asker identity is separate from reply mode. Works over both radios. Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/mesh/listener/assist.rs | 158 ++++++++++++------ core/archipelago/src/mesh/listener/decode.rs | 39 +++++ .../archipelago/src/mesh/listener/dispatch.rs | 17 +- 3 files changed, 157 insertions(+), 57 deletions(-) diff --git a/core/archipelago/src/mesh/listener/assist.rs b/core/archipelago/src/mesh/listener/assist.rs index cd5b5529..f1b25f64 100644 --- a/core/archipelago/src/mesh/listener/assist.rs +++ b/core/archipelago/src/mesh/listener/assist.rs @@ -7,9 +7,9 @@ //! ordered chunks. Airtime is scarce, so the reply is length-capped and each //! asker is limited to one in-flight query. -use super::super::message_types::{self, AssistQueryPayload, AssistResponsePayload, MeshMessageType}; +use super::super::message_types::{self, AssistResponsePayload, MeshMessageType}; use super::bitcoin::send_to_peer; -use super::MeshState; +use super::{MeshCommand, MeshState}; use crate::federation::TrustLevel; use std::sync::Arc; use std::time::Duration; @@ -25,26 +25,44 @@ const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60); const MAX_REPLY_CHARS: usize = 480; /// Characters of answer text per `AssistResponse` chunk. const CHUNK_CHARS: usize = 160; +/// Tighter cap for plain-text channel replies (bare `!ai` clients) — these +/// aren't reassembled by an archipelago UI, so keep them to a couple frames. +const CHANNEL_REPLY_CHARS: usize = 200; -/// Entry point: gate the query, run the model, send the answer back. -/// Spawned from the dispatch `AssistQuery` arm so the radio loop never blocks. +/// Where an answer should go. +pub(super) enum AssistReply { + /// Typed `AssistResponse` chunks addressed to one peer — the archipelago + /// UI path (rich, reassembled, correlated by `req_id`). + Typed { contact_id: u32 }, + /// Plain-text broadcast on a mesh channel — the bare `!ai` path, so any + /// client (including non-archipelago meshcore/Meshtastic nodes) sees it. + ChannelText { channel: u8 }, +} + +/// Entry point: gate the query, run the model, send the answer back via the +/// requested reply path. Spawned off the radio loop so it never blocks. pub(super) async fn run_assist( - payload: AssistQueryPayload, - sender_contact_id: u32, + prompt: String, + model_override: Option, + req_id: u64, + asker_contact_id: u32, sender_name: String, + reply: AssistReply, state: Arc, ) { + let asker = asker_contact_id; + // Trust + block gate. - if !is_sender_allowed(&state, sender_contact_id).await { + if !is_sender_allowed(&state, asker).await { warn!( - from = sender_contact_id, + from = asker, name = %sender_name, "AssistQuery denied — sender not permitted by assistant policy" ); // Silent on the wire (no airtime spent on denials); surface to the UI. let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { - req_id: payload.req_id, - to_contact_id: sender_contact_id, + req_id, + to_contact_id: asker, error: Some("denied".to_string()), }); return; @@ -53,54 +71,46 @@ pub(super) async fn run_assist( // One in-flight query per asker. { let mut inflight = state.assist_inflight.write().await; - if !inflight.insert(sender_contact_id) { - warn!(from = sender_contact_id, "AssistQuery dropped — asker already has one in flight"); + if !inflight.insert(asker) { + warn!(from = asker, "AssistQuery dropped — asker already has one in flight"); return; } } let _ = state.event_tx.send(super::super::types::MeshEvent::AssistQueryReceived { - from_contact_id: sender_contact_id, - prompt: payload.prompt.clone(), + from_contact_id: asker, + prompt: prompt.clone(), }); - let model = payload - .model - .clone() + let model = model_override .or_else(|| state.assistant.model.clone()) .unwrap_or_else(|| DEFAULT_MODEL.to_string()); - info!( - from = sender_contact_id, - req_id = payload.req_id, - model = %model, - "Answering AI query over mesh" - ); + info!(from = asker, req_id, model = %model, "Answering AI query over mesh"); - let result = call_ollama(&model, &payload.prompt).await; + let result = call_ollama(&model, &prompt).await; match result { Ok(answer) => { - let (text, _truncated) = cap_reply(&answer); - send_answer_chunks(&state, sender_contact_id, payload.req_id, &text).await; + send_reply(&state, &reply, req_id, &answer).await; let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { - req_id: payload.req_id, - to_contact_id: sender_contact_id, + req_id, + to_contact_id: asker, error: None, }); } Err(e) => { - warn!(req_id = payload.req_id, "AI query failed: {}", e); - send_error(&state, sender_contact_id, payload.req_id, "AI unavailable").await; + warn!(req_id, "AI query failed: {}", e); + send_failure(&state, &reply, req_id, "AI unavailable").await; let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { - req_id: payload.req_id, - to_contact_id: sender_contact_id, + req_id, + to_contact_id: asker, error: Some(e.to_string()), }); } } - state.assist_inflight.write().await.remove(&sender_contact_id); + state.assist_inflight.write().await.remove(&asker); } /// Whether `sender_contact_id` may invoke the assistant under the node's policy. @@ -154,14 +164,42 @@ fn cap_reply(answer: &str) -> (String, bool) { (format!("{capped}…(truncated)"), true) } +/// Send a successful answer via the requested reply path. +async fn send_reply(state: &Arc, reply: &AssistReply, req_id: u64, answer: &str) { + match reply { + AssistReply::Typed { contact_id } => { + let (text, _) = cap_reply(answer); + send_typed_chunks(state, *contact_id, req_id, &text).await; + } + AssistReply::ChannelText { channel } => { + let text = cap_channel(answer); + send_channel_text(state, *channel, &text).await; + } + } +} + +/// Send a failure notice via the requested reply path. +async fn send_failure(state: &Arc, reply: &AssistReply, req_id: u64, msg: &str) { + match reply { + AssistReply::Typed { contact_id } => { + let payload = AssistResponsePayload { + req_id, + text: String::new(), + seq: 0, + done: true, + error: Some(msg.to_string()), + }; + send_typed_response(state, *contact_id, &payload).await; + } + AssistReply::ChannelText { channel } => { + send_channel_text(state, *channel, &format!("AI: {msg}")).await; + } + } +} + /// Split the answer into ordered `AssistResponse` chunks and send each back to -/// the asker on the encrypted, peer-addressed path. -async fn send_answer_chunks( - state: &Arc, - dest_contact_id: u32, - req_id: u64, - text: &str, -) { +/// the asker on the encrypted, peer-addressed path (archipelago UI path). +async fn send_typed_chunks(state: &Arc, dest_contact_id: u32, req_id: u64, text: &str) { let chars: Vec = text.chars().collect(); let chunks: Vec = if chars.is_empty() { vec![String::new()] @@ -177,24 +215,16 @@ async fn send_answer_chunks( done: i == last, error: None, }; - send_response(state, dest_contact_id, &payload).await; + send_typed_response(state, dest_contact_id, &payload).await; } } -/// Send a single failure `AssistResponse` (one chunk, done, error set). -async fn send_error(state: &Arc, dest_contact_id: u32, req_id: u64, msg: &str) { - let payload = AssistResponsePayload { - req_id, - text: String::new(), - seq: 0, - done: true, - error: Some(msg.to_string()), - }; - send_response(state, dest_contact_id, &payload).await; -} - /// Encode an `AssistResponse` payload and send it to a peer. -async fn send_response(state: &Arc, dest_contact_id: u32, payload: &AssistResponsePayload) { +async fn send_typed_response( + state: &Arc, + dest_contact_id: u32, + payload: &AssistResponsePayload, +) { let bytes = match message_types::encode_payload(payload) { Ok(b) => b, Err(e) => { @@ -209,6 +239,26 @@ async fn send_response(state: &Arc, dest_contact_id: u32, payload: &A } } +/// Broadcast a plain-text answer on a channel for bare `!ai` clients. +async fn send_channel_text(state: &Arc, channel: u8, text: &str) { + let _ = state + .send_cmd(MeshCommand::BroadcastChannel { + channel, + payload: text.as_bytes().to_vec(), + }) + .await; +} + +/// Cap a plain-text channel reply to a couple of frames. +fn cap_channel(answer: &str) -> String { + let trimmed = answer.trim(); + if trimmed.chars().count() <= CHANNEL_REPLY_CHARS { + return format!("AI: {trimmed}"); + } + let capped: String = trimmed.chars().take(CHANNEL_REPLY_CHARS).collect(); + format!("AI: {capped}…") +} + /// Call the local Ollama model and return the generated text. async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result { let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?; diff --git a/core/archipelago/src/mesh/listener/decode.rs b/core/archipelago/src/mesh/listener/decode.rs index 645e4927..4bdf6d9f 100644 --- a/core/archipelago/src/mesh/listener/decode.rs +++ b/core/archipelago/src/mesh/listener/decode.rs @@ -352,6 +352,45 @@ pub(super) async fn store_plain_message( state.store_message(msg.clone()).await; state.status.write().await.messages_received += 1; let _ = state.event_tx.send(MeshEvent::MessageReceived(msg)); + + // Mesh-AI assistant (issue #50): a plain `!ai`/`!ask ` on the + // channel is answered by this node's local model when the assistant is on. + // Reply goes back as plain channel text so bare (non-archipelago) clients + // see it. The trust/rate gate lives in run_assist. + if state.assistant.enabled { + if let Some(prompt) = strip_ai_trigger(text) { + if !prompt.is_empty() { + let req_id = state.next_id().await; + let prompt = prompt.to_string(); + let name = peer_name.to_string(); + let st = Arc::clone(state); + tokio::spawn(async move { + super::assist::run_assist( + prompt, + None, + req_id, + contact_id, + name, + super::assist::AssistReply::ChannelText { channel: 0 }, + st, + ) + .await; + }); + } + } + } +} + +/// Recognise a `!ai`/`!ask ` command prefix (case-insensitive) and return the +/// trimmed question after it, or `None` if the text isn't an AI command. +fn strip_ai_trigger(text: &str) -> Option<&str> { + let t = text.trim_start(); + for p in ["!ai ", "!ask "] { + if t.len() >= p.len() && t[..p.len()].eq_ignore_ascii_case(p) { + return Some(t[p.len()..].trim()); + } + } + None } /// Handle a received identity broadcast from a peer. diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index ea20a626..5a558ae0 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -707,12 +707,23 @@ pub(crate) async fn handle_typed_envelope_direct( Some(envelope.seq), ) .await; - // Run the model + reply off the radio loop. + // Run the model + reply off the radio loop. Typed query → + // typed chunked reply back to the asking peer. let assist_state = Arc::clone(state); let name = sender_name.to_string(); tokio::spawn(async move { - super::assist::run_assist(query, sender_contact_id, name, assist_state) - .await; + super::assist::run_assist( + query.prompt, + query.model, + query.req_id, + sender_contact_id, + name, + super::assist::AssistReply::Typed { + contact_id: sender_contact_id, + }, + assist_state, + ) + .await; }); } Err(e) => warn!("Failed to decode AssistQuery payload: {}", e), From ef601c6d2638a330b3d90e24a50af8a63141d265 Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 18:20:12 -0400 Subject: [PATCH 3/5] feat(mesh): wire ARCHY identity broadcast for trust over both radios (#50) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ARCHY:2 identity broadcast (DID + ed25519 + x25519) was unwired dead code on both send and receive. Wiring it lets a radio peer prove its archipelago identity, so the assistant's trusted-only gate (and encrypted DMs) work over meshcore AND Meshtastic — the latter otherwise only exposes synthetic node keys. - session.rs: broadcast ARCHY:2 as channel text at startup + each advert tick - frames.rs: parse inbound ARCHY:2 on the channel path, dedupe-keyed by archipelago pubkey (federation_peer_contact_id) so it MERGES with the federation-seeded peer instead of duplicating; self-echo guarded - threads our_x25519_secret into handle_channel_payload (was reserved) Reuses the existing handle_identity_received verifier (ed/x25519 consistency check + shared-secret derivation). Compiles clean. Needs a live 2-radio test before trusting trusted-only over radio. Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/mesh/listener/frames.rs | 48 ++++++++++++++++--- core/archipelago/src/mesh/listener/session.rs | 22 ++++++++- 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/core/archipelago/src/mesh/listener/frames.rs b/core/archipelago/src/mesh/listener/frames.rs index 82658208..78fbba36 100644 --- a/core/archipelago/src/mesh/listener/frames.rs +++ b/core/archipelago/src/mesh/listener/frames.rs @@ -3,8 +3,8 @@ use super::super::message_types::TypedEnvelope; use super::super::protocol; use super::decode::{ - is_mc_chunk_frame, resolve_peer, store_plain_message, try_base64_typed, try_chunk_reassemble, - try_decrypt_base64, try_decrypt_ratchet_base64, + handle_identity_received, is_mc_chunk_frame, resolve_peer, store_plain_message, + try_base64_typed, try_chunk_reassemble, try_decrypt_base64, try_decrypt_ratchet_base64, }; use super::dispatch::handle_typed_message; use super::MeshState; @@ -18,7 +18,6 @@ pub(super) async fn handle_frame( state: &Arc, our_x25519_secret: &[u8; 32], ) -> bool { - let _ = our_x25519_secret; // reserved for future per-frame decryption match frame.code { protocol::PUSH_NEW_CONTACT | protocol::PUSH_CONTACT_ADVERT => { info!( @@ -109,7 +108,8 @@ pub(super) async fn handle_frame( match protocol::parse_channel_msg_v3_raw(&frame.data) { Ok((channel_idx, payload)) => { if !payload.is_empty() { - handle_channel_payload(state, channel_idx, &payload).await; + handle_channel_payload(state, channel_idx, &payload, our_x25519_secret) + .await; } } Err(e) => warn!("Failed to parse v3 channel message: {}", e), @@ -121,7 +121,8 @@ pub(super) async fn handle_frame( match protocol::parse_channel_msg_v1_raw(&frame.data) { Ok((channel_idx, payload)) => { if !payload.is_empty() { - handle_channel_payload(state, channel_idx, &payload).await; + handle_channel_payload(state, channel_idx, &payload, our_x25519_secret) + .await; } } Err(e) => warn!("Failed to parse channel message: {}", e), @@ -146,7 +147,12 @@ pub(super) async fn handle_frame( /// local mesh peer pubkeys (or we can't tell), the inner payload is /// dispatched through the direct-message path so it lands in the right /// chat. Otherwise it's handled as a normal channel text/typed message. -async fn handle_channel_payload(state: &Arc, channel_idx: u8, payload: &[u8]) { +async fn handle_channel_payload( + state: &Arc, + channel_idx: u8, + payload: &[u8], + our_x25519_secret: &[u8; 32], +) { // DM-via-channel wrapper (text form): the channel text carries an // ASCII "@DM:" token somewhere in the body. We locate the // marker anywhere in the payload (the firmware auto-prepends the @@ -326,6 +332,36 @@ async fn handle_channel_payload(state: &Arc, channel_idx: u8, payload return; } + // Archipelago identity broadcast (`ARCHY:`): upsert the sender's real + // archipelago identity (DID + ed25519 + x25519) so trust-gating and + // encrypted DMs work over BOTH meshcore and Meshtastic — the latter + // otherwise only exposes synthetic node keys. Keyed by the archipelago + // pubkey (federation_peer_contact_id) so it MERGES with the federation- + // seeded peer instead of creating a duplicate chat thread. Not stored as + // a chat message. + if let Ok(text) = std::str::from_utf8(payload) { + if let Some((did, ed_hex, x_hex)) = + super::super::protocol::parse_identity_broadcast(text) + { + // Ignore our own identity echoed back by the radio/channel. + if ed_hex.eq_ignore_ascii_case(&state.our_ed_pubkey_hex) { + return; + } + let contact_id = super::super::federation_peer_contact_id(&ed_hex); + handle_identity_received( + contact_id, + 0, + &did, + &ed_hex, + &x_hex, + state, + our_x25519_secret, + ) + .await; + return; + } + } + // Regular channel broadcast (not DM-wrapped) let chan_contact_id = u32::MAX - (channel_idx as u32); let chan_name = format!("Channel {}", channel_idx); diff --git a/core/archipelago/src/mesh/listener/session.rs b/core/archipelago/src/mesh/listener/session.rs index 27786857..6e5448d4 100644 --- a/core/archipelago/src/mesh/listener/session.rs +++ b/core/archipelago/src/mesh/listener/session.rs @@ -363,9 +363,9 @@ pub(super) async fn run_mesh_session( state: &Arc, preferred_path: Option<&str>, our_did: &str, - _our_ed_pubkey_hex: &str, + our_ed_pubkey_hex: &str, our_x25519_secret: &[u8; 32], - _our_x25519_pubkey_hex: &str, + our_x25519_pubkey_hex: &str, server_name: Option<&str>, shutdown: &mut tokio::sync::watch::Receiver, cmd_rx: &mut mpsc::Receiver, @@ -424,6 +424,19 @@ pub(super) async fn run_mesh_session( warn!("Failed to send initial advert: {}", e); } + // Archipelago identity advert (`ARCHY:2:{ed}:{x25519}`): broadcast as channel + // text so peers can bind our radio presence to our DID + keys. The firmware + // advert alone carries the meshcore key (and nothing on Meshtastic), so this + // is what makes trust-gating + encrypted DMs work across BOTH transports. + let identity_advert = super::super::protocol::encode_identity_broadcast( + our_did, + our_ed_pubkey_hex, + our_x25519_pubkey_hex, + ); + if let Err(e) = device.send_channel_text(0, identity_advert.as_bytes()).await { + warn!("Failed to broadcast archipelago identity: {}", e); + } + // Fetch existing contacts from the device refresh_contacts(&mut device, state).await; @@ -491,6 +504,11 @@ pub(super) async fn run_mesh_session( } else { consecutive_write_failures = 0; } + // Re-broadcast archipelago identity so peers that joined since + // startup (or missed it) can bind our DID/keys. + if let Err(e) = device.send_channel_text(0, identity_advert.as_bytes()).await { + warn!("Failed to re-broadcast archipelago identity: {}", e); + } refresh_contacts(&mut device, state).await; } From 0947ecee11801553f67d895626a6d2e20bc54fcf Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 18:29:36 -0400 Subject: [PATCH 4/5] feat(mesh): assistant config RPCs + live toggle + Ollama detect (#50) Phase 2 backend. AssistantConfig is now live-updatable (RwLock) so the UI toggle applies without a listener restart. New RPCs: - mesh.assistant-status -> {enabled, model, trusted_only, default_model, ollama_detected, models[]} (probes local Ollama :11434/api/tags) - mesh.assistant-configure -> set enabled/model/trusted_only live + persist MeshService::assistant_config / configure_assistant. Compiles clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/api/rpc/dispatcher.rs | 2 + .../archipelago/src/api/rpc/mesh/assistant.rs | 95 +++++++++++++++++++ core/archipelago/src/api/rpc/mesh/mod.rs | 1 + core/archipelago/src/mesh/listener/assist.rs | 5 +- core/archipelago/src/mesh/listener/decode.rs | 2 +- .../archipelago/src/mesh/listener/dispatch.rs | 2 +- core/archipelago/src/mesh/listener/mod.rs | 7 +- core/archipelago/src/mesh/mod.rs | 40 ++++++++ 8 files changed, 147 insertions(+), 7 deletions(-) create mode 100644 core/archipelago/src/api/rpc/mesh/assistant.rs diff --git a/core/archipelago/src/api/rpc/dispatcher.rs b/core/archipelago/src/api/rpc/dispatcher.rs index cdc65627..18eea939 100644 --- a/core/archipelago/src/api/rpc/dispatcher.rs +++ b/core/archipelago/src/api/rpc/dispatcher.rs @@ -381,6 +381,8 @@ impl RpcHandler { "mesh.deadman-status" => self.handle_mesh_deadman_status().await, "mesh.deadman-configure" => self.handle_mesh_deadman_configure(params).await, "mesh.deadman-checkin" => self.handle_mesh_deadman_checkin().await, + "mesh.assistant-status" => self.handle_mesh_assistant_status().await, + "mesh.assistant-configure" => self.handle_mesh_assistant_configure(params).await, "mesh.test-send" => self.handle_mesh_test_send(params).await, // Transport layer (unified routing) diff --git a/core/archipelago/src/api/rpc/mesh/assistant.rs b/core/archipelago/src/api/rpc/mesh/assistant.rs new file mode 100644 index 00000000..5b266434 --- /dev/null +++ b/core/archipelago/src/api/rpc/mesh/assistant.rs @@ -0,0 +1,95 @@ +//! Mesh-AI assistant RPCs (issue #50): read/update the local assistant config +//! and report whether a local Ollama is available (for the install deep-link). + +use super::super::RpcHandler; +use anyhow::Result; +use std::time::Duration; + +/// Default model when the node hasn't picked one (kept in sync with the mesh +/// assistant handler's `DEFAULT_MODEL`). +const DEFAULT_MODEL: &str = "qwen2.5-coder"; + +impl RpcHandler { + /// mesh.assistant-status — current settings + local Ollama availability. + pub(in crate::api::rpc) async fn handle_mesh_assistant_status( + &self, + ) -> Result { + let cfg = { + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + svc.assistant_config().await + }; + + let (ollama_detected, models) = detect_ollama().await; + Ok(serde_json::json!({ + "enabled": cfg.enabled, + "model": cfg.model, + "trusted_only": cfg.trusted_only, + "default_model": DEFAULT_MODEL, + "ollama_detected": ollama_detected, + "models": models, + })) + } + + /// mesh.assistant-configure — update assistant settings live. + /// Params: `enabled?: bool`, `trusted_only?: bool`, + /// `model?: string|null` (string sets, null clears to default, absent leaves). + pub(in crate::api::rpc) async fn handle_mesh_assistant_configure( + &self, + params: Option, + ) -> Result { + let params = params.unwrap_or_default(); + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + + let enabled = params.get("enabled").and_then(|v| v.as_bool()); + let trusted_only = params.get("trusted_only").and_then(|v| v.as_bool()); + // model: key present + string => set; present + null => clear; absent => leave + let model = if let Some(v) = params.get("model") { + Some(v.as_str().map(|s| s.to_string())) + } else { + None + }; + + svc.configure_assistant(enabled, model, trusted_only).await?; + let cfg = svc.assistant_config().await; + Ok(serde_json::json!({ + "enabled": cfg.enabled, + "model": cfg.model, + "trusted_only": cfg.trusted_only, + })) + } +} + +/// Probe the local Ollama HTTP API; return (detected, model_names). +async fn detect_ollama() -> (bool, Vec) { + let client = match reqwest::Client::builder() + .timeout(Duration::from_secs(2)) + .build() + { + Ok(c) => c, + Err(_) => return (false, Vec::new()), + }; + match client.get("http://localhost:11434/api/tags").send().await { + Ok(resp) if resp.status().is_success() => { + let json: serde_json::Value = resp.json().await.unwrap_or_default(); + let models = json + .get("models") + .and_then(|m| m.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|m| { + m.get("name").and_then(|n| n.as_str()).map(|s| s.to_string()) + }) + .collect() + }) + .unwrap_or_default(); + (true, models) + } + _ => (false, Vec::new()), + } +} diff --git a/core/archipelago/src/api/rpc/mesh/mod.rs b/core/archipelago/src/api/rpc/mesh/mod.rs index bd5b1405..fe497876 100644 --- a/core/archipelago/src/api/rpc/mesh/mod.rs +++ b/core/archipelago/src/api/rpc/mesh/mod.rs @@ -1,3 +1,4 @@ +mod assistant; mod bitcoin_ops; mod messaging; mod safety; diff --git a/core/archipelago/src/mesh/listener/assist.rs b/core/archipelago/src/mesh/listener/assist.rs index f1b25f64..e5f817b2 100644 --- a/core/archipelago/src/mesh/listener/assist.rs +++ b/core/archipelago/src/mesh/listener/assist.rs @@ -82,8 +82,9 @@ pub(super) async fn run_assist( prompt: prompt.clone(), }); + let configured_model = state.assistant.read().await.model.clone(); let model = model_override - .or_else(|| state.assistant.model.clone()) + .or(configured_model) .unwrap_or_else(|| DEFAULT_MODEL.to_string()); info!(from = asker, req_id, model = %model, "Answering AI query over mesh"); @@ -139,7 +140,7 @@ async fn is_sender_allowed(state: &Arc, sender_contact_id: u32) -> bo } } - if !state.assistant.trusted_only { + if !state.assistant.read().await.trusted_only { return true; } diff --git a/core/archipelago/src/mesh/listener/decode.rs b/core/archipelago/src/mesh/listener/decode.rs index 4bdf6d9f..cafa6ddf 100644 --- a/core/archipelago/src/mesh/listener/decode.rs +++ b/core/archipelago/src/mesh/listener/decode.rs @@ -357,7 +357,7 @@ pub(super) async fn store_plain_message( // channel is answered by this node's local model when the assistant is on. // Reply goes back as plain channel text so bare (non-archipelago) clients // see it. The trust/rate gate lives in run_assist. - if state.assistant.enabled { + if state.assistant.read().await.enabled { if let Some(prompt) = strip_ai_trigger(text) { if !prompt.is_empty() { let req_id = state.next_id().await; diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 5a558ae0..21cf4be0 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -684,7 +684,7 @@ pub(crate) async fn handle_typed_envelope_direct( Some(MeshMessageType::AssistQuery) => { match message_types::decode_payload::(&envelope.v) { Ok(query) => { - if !state.assistant.enabled { + if !state.assistant.read().await.enabled { debug!( from = sender_contact_id, "AssistQuery ignored — assistant disabled on this node" diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index 384f8abb..3e02cbe2 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -123,8 +123,9 @@ pub struct MeshState { /// wiped. Persisted to `mesh-ignored-radio-contacts.json`. pub radio_contact_blocklist: RwLock>, /// Mesh-AI assistant settings (issue #50): whether this node answers - /// AssistQuery messages with its local LLM, and who may ask. - pub assistant: AssistantConfig, + /// AssistQuery messages with its local LLM, and who may ask. Live-updatable + /// so the UI toggle applies without restarting the listener. + pub assistant: RwLock, /// Data dir — lets dispatch handlers reach disk-backed stores (e.g. the /// federation trust list used to gate AI queries) without threading a path /// through every call. @@ -216,7 +217,7 @@ impl MeshState { our_ed_pubkey_hex, blob_store: RwLock::new(None), radio_contact_blocklist: RwLock::new(HashSet::new()), - assistant, + assistant: RwLock::new(assistant), data_dir, assist_inflight: RwLock::new(HashSet::new()), }); diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 5452da06..48efca4f 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -1346,6 +1346,46 @@ impl MeshService { Ok(()) } + /// Current mesh-AI assistant settings (issue #50). + pub async fn assistant_config(&self) -> listener::AssistantConfig { + self.state.assistant.read().await.clone() + } + + /// Update the mesh-AI assistant settings live (no listener restart) and + /// persist them to the mesh config. `model: Some(None)` clears the override + /// (falls back to the built-in default); `None` leaves a field unchanged. + pub async fn configure_assistant( + &self, + enabled: Option, + model: Option>, + trusted_only: Option, + ) -> Result<()> { + { + let mut a = self.state.assistant.write().await; + if let Some(e) = enabled { + a.enabled = e; + } + if let Some(m) = model { + a.model = m; + } + if let Some(t) = trusted_only { + a.trusted_only = t; + } + } + // Persist by updating the on-disk config (the in-memory `self.config` + // snapshot stays as-is; the live `state.assistant` is the runtime + // source of truth and is re-seeded from disk on the next start). + let mut cfg = load_config(&self.data_dir).await.unwrap_or_default(); + { + let a = self.state.assistant.read().await; + cfg.assistant_enabled = a.enabled; + cfg.assistant_model = a.model.clone(); + cfg.assistant_trusted_only = a.trusted_only; + } + save_config(&self.data_dir, &cfg).await?; + Ok(()) + } + /// Update mesh configuration. pub async fn configure(&mut self, config: MeshConfig) -> Result<()> { save_config(&self.data_dir, &config).await?; From 7a76d32e4bd20f1e3edd1262a1b2fdb7aa04576b Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 19:19:32 -0400 Subject: [PATCH 5/5] feat(mesh): mesh-AI assistant scheduler + config panel (#50) Adds the assistant scheduler, MeshAssistantPanel UI, and the remaining config-RPC / live-toggle / Ollama-detect wiring on top of Phase 1.x. Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/api/rpc/dispatcher.rs | 3 + .../archipelago/src/api/rpc/mesh/assistant.rs | 76 ++++++- core/archipelago/src/mesh/listener/assist.rs | 70 +++++- core/archipelago/src/mesh/listener/mod.rs | 4 +- core/archipelago/src/mesh/mod.rs | 40 ++++ core/archipelago/src/mesh/scheduler.rs | 213 ++++++++++++++++++ neode-ui/src/stores/mesh.ts | 80 +++++++ neode-ui/src/views/Mesh.vue | 44 ++-- .../src/views/mesh/MeshAssistantPanel.vue | 155 +++++++++++++ neode-ui/src/views/mesh/mesh-styles.css | 10 +- 10 files changed, 667 insertions(+), 28 deletions(-) create mode 100644 core/archipelago/src/mesh/scheduler.rs create mode 100644 neode-ui/src/views/mesh/MeshAssistantPanel.vue diff --git a/core/archipelago/src/api/rpc/dispatcher.rs b/core/archipelago/src/api/rpc/dispatcher.rs index 18eea939..6f519d38 100644 --- a/core/archipelago/src/api/rpc/dispatcher.rs +++ b/core/archipelago/src/api/rpc/dispatcher.rs @@ -383,6 +383,9 @@ impl RpcHandler { "mesh.deadman-checkin" => self.handle_mesh_deadman_checkin().await, "mesh.assistant-status" => self.handle_mesh_assistant_status().await, "mesh.assistant-configure" => self.handle_mesh_assistant_configure(params).await, + "mesh.schedule-message" => self.handle_mesh_schedule_message(params).await, + "mesh.list-scheduled" => self.handle_mesh_list_scheduled().await, + "mesh.cancel-scheduled" => self.handle_mesh_cancel_scheduled(params).await, "mesh.test-send" => self.handle_mesh_test_send(params).await, // Transport layer (unified routing) diff --git a/core/archipelago/src/api/rpc/mesh/assistant.rs b/core/archipelago/src/api/rpc/mesh/assistant.rs index 5b266434..f9f3e72d 100644 --- a/core/archipelago/src/api/rpc/mesh/assistant.rs +++ b/core/archipelago/src/api/rpc/mesh/assistant.rs @@ -23,12 +23,19 @@ impl RpcHandler { }; let (ollama_detected, models) = detect_ollama().await; + let claude_available = tokio::fs::metadata( + self.config.data_dir.join("secrets/claude-api-key"), + ) + .await + .is_ok(); Ok(serde_json::json!({ "enabled": cfg.enabled, "model": cfg.model, "trusted_only": cfg.trusted_only, + "backend": cfg.backend, "default_model": DEFAULT_MODEL, "ollama_detected": ollama_detected, + "claude_available": claude_available, "models": models, })) } @@ -48,6 +55,10 @@ impl RpcHandler { let enabled = params.get("enabled").and_then(|v| v.as_bool()); let trusted_only = params.get("trusted_only").and_then(|v| v.as_bool()); + let backend = params + .get("backend") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); // model: key present + string => set; present + null => clear; absent => leave let model = if let Some(v) = params.get("model") { Some(v.as_str().map(|s| s.to_string())) @@ -55,14 +66,77 @@ impl RpcHandler { None }; - svc.configure_assistant(enabled, model, trusted_only).await?; + svc.configure_assistant(enabled, model, trusted_only, backend) + .await?; let cfg = svc.assistant_config().await; Ok(serde_json::json!({ "enabled": cfg.enabled, "model": cfg.model, "trusted_only": cfg.trusted_only, + "backend": cfg.backend, })) } + + /// mesh.schedule-message — queue a message to send at a future time. + /// Params: `body: string`, `fire_at: i64` (unix secs), and one of + /// `contact_id: u32` (DM) or `channel: u8` (broadcast). + pub(in crate::api::rpc) async fn handle_mesh_schedule_message( + &self, + params: Option, + ) -> Result { + let p = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let body = p + .get("body") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("body is required"))? + .to_string(); + let fire_at = p + .get("fire_at") + .and_then(|v| v.as_i64()) + .ok_or_else(|| anyhow::anyhow!("fire_at (unix seconds) is required"))?; + let contact_id = p.get("contact_id").and_then(|v| v.as_u64()).map(|v| v as u32); + let channel = p.get("channel").and_then(|v| v.as_u64()).map(|v| v as u8); + if contact_id.is_none() && channel.is_none() { + anyhow::bail!("either contact_id or channel is required"); + } + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let msg = svc.scheduler.add(contact_id, channel, body, fire_at).await?; + Ok(serde_json::to_value(msg)?) + } + + /// mesh.list-scheduled — list queued messages (sorted by fire time). + pub(in crate::api::rpc) async fn handle_mesh_list_scheduled( + &self, + ) -> Result { + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let messages = svc.scheduler.list().await; + Ok(serde_json::json!({ "messages": messages })) + } + + /// mesh.cancel-scheduled — remove a queued message by id. + pub(in crate::api::rpc) async fn handle_mesh_cancel_scheduled( + &self, + params: Option, + ) -> Result { + let id = params + .as_ref() + .and_then(|p| p.get("id")) + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow::anyhow!("id is required"))?; + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let cancelled = svc.scheduler.cancel(id).await?; + Ok(serde_json::json!({ "cancelled": cancelled })) + } } /// Probe the local Ollama HTTP API; return (detected, model_names). diff --git a/core/archipelago/src/mesh/listener/assist.rs b/core/archipelago/src/mesh/listener/assist.rs index e5f817b2..f53125ff 100644 --- a/core/archipelago/src/mesh/listener/assist.rs +++ b/core/archipelago/src/mesh/listener/assist.rs @@ -11,6 +11,7 @@ use super::super::message_types::{self, AssistResponsePayload, MeshMessageType}; use super::bitcoin::send_to_peer; use super::{MeshCommand, MeshState}; use crate::federation::TrustLevel; +use std::path::Path; use std::sync::Arc; use std::time::Duration; use tracing::{info, warn}; @@ -19,6 +20,10 @@ use tracing::{info, warn}; const OLLAMA_URL: &str = "http://localhost:11434/api/generate"; /// Default model when the node hasn't configured one (matches Meshroller). const DEFAULT_MODEL: &str = "qwen2.5-coder"; +/// Anthropic Messages API (called with the shared proxy token). +const CLAUDE_URL: &str = "https://api.anthropic.com/v1/messages"; +/// Default Claude model — Haiku 4.5: fast + cheap, ideal for short mesh answers. +const CLAUDE_DEFAULT_MODEL: &str = "claude-haiku-4-5-20251001"; /// Max time to wait on the model before giving up. const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60); /// Hard cap on answer length sent over the radio — keeps airtime sane. @@ -82,14 +87,23 @@ pub(super) async fn run_assist( prompt: prompt.clone(), }); - let configured_model = state.assistant.read().await.model.clone(); + let (backend, configured_model) = { + let a = state.assistant.read().await; + (a.backend.clone(), a.model.clone()) + }; + let is_claude = backend == "claude"; + let default_model = if is_claude { CLAUDE_DEFAULT_MODEL } else { DEFAULT_MODEL }; let model = model_override .or(configured_model) - .unwrap_or_else(|| DEFAULT_MODEL.to_string()); + .unwrap_or_else(|| default_model.to_string()); - info!(from = asker, req_id, model = %model, "Answering AI query over mesh"); + info!(from = asker, req_id, backend = %backend, model = %model, "Answering AI query over mesh"); - let result = call_ollama(&model, &prompt).await; + let result = if is_claude { + call_claude(&state.data_dir, &model, &prompt).await + } else { + call_ollama(&model, &prompt).await + }; match result { Ok(answer) => { @@ -283,3 +297,51 @@ async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result { } Ok(text) } + +/// Call Claude via the Anthropic Messages API using the node's shared proxy +/// token at `secrets/claude-api-key`. Keeps answers short for radio airtime. +async fn call_claude(data_dir: &Path, model: &str, prompt: &str) -> anyhow::Result { + let key = tokio::fs::read_to_string(data_dir.join("secrets/claude-api-key")) + .await + .map_err(|_| anyhow::anyhow!("Claude API key not configured on this node"))?; + let key = key.trim(); + if key.is_empty() { + anyhow::bail!("Claude API key is empty"); + } + let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?; + let body = serde_json::json!({ + "model": model, + "max_tokens": 512, + "system": "You answer questions over a low-bandwidth radio mesh. Reply in at most two short sentences. No markdown, no preamble.", + "messages": [{ "role": "user", "content": prompt }], + }); + let resp = client + .post(CLAUDE_URL) + .header("x-api-key", key) + .header("anthropic-version", "2023-06-01") + .header("content-type", "application/json") + .json(&body) + .send() + .await?; + if !resp.status().is_success() { + let status = resp.status(); + let txt = resp.text().await.unwrap_or_default(); + anyhow::bail!( + "Claude API HTTP {}: {}", + status, + txt.chars().take(180).collect::() + ); + } + let json: serde_json::Value = resp.json().await?; + // `content` is an array of blocks; take the first text block. + let text = json + .get("content") + .and_then(|c| c.as_array()) + .and_then(|arr| arr.iter().find_map(|b| b.get("text").and_then(|t| t.as_str()))) + .unwrap_or("") + .to_string(); + if text.trim().is_empty() { + anyhow::bail!("Claude returned an empty response"); + } + Ok(text) +} diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index 3e02cbe2..4c67296f 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -140,10 +140,12 @@ pub struct MeshState { pub struct AssistantConfig { /// Answer AssistQuery messages with the local LLM. pub enabled: bool, - /// Ollama model to use; None → the built-in default. + /// Model to use; None → the backend's built-in default. pub model: Option, /// Restrict asking to federation-Trusted peers (vs. anyone on the mesh). pub trusted_only: bool, + /// AI backend: "claude" (shared proxy token) or "ollama" (local model). + pub backend: String, } /// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 48efca4f..2cedf228 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -14,6 +14,7 @@ pub mod message_types; pub mod outbox; pub mod protocol; pub mod ratchet; +pub mod scheduler; pub mod serial; pub mod session; pub mod steganography; @@ -187,6 +188,15 @@ pub struct MeshConfig { /// any peer on the mesh may ask (spends this node's compute + airtime). #[serde(default = "default_true")] pub assistant_trusted_only: bool, + /// Which AI backend answers queries: "claude" (the shared Claude proxy + /// token at secrets/claude-api-key — default for now, works without a + /// local GPU) or "ollama" (a local model on this node). + #[serde(default = "default_assistant_backend")] + pub assistant_backend: String, +} + +fn default_assistant_backend() -> String { + "claude".to_string() } fn default_true() -> bool { @@ -208,6 +218,7 @@ impl Default for MeshConfig { assistant_enabled: false, assistant_model: None, assistant_trusted_only: true, + assistant_backend: default_assistant_backend(), } } } @@ -336,6 +347,7 @@ pub struct MeshService { deadman_handle: Option>, block_announcer_handle: Option>, presence_handle: Option>, + scheduler_handle: Option>, cmd_rx: Option>, // Crypto identity for this node our_did: String, @@ -349,6 +361,8 @@ pub struct MeshService { pub block_header_cache: Arc, pub relay_tracker: Arc, pub dead_man_switch: Arc, + /// Scheduled / queued outbound mesh messages (issue #50, phase 1.7). + pub scheduler: Arc, } impl MeshService { @@ -380,6 +394,7 @@ impl MeshService { enabled: config.assistant_enabled, model: config.assistant_model.clone(), trusted_only: config.assistant_trusted_only, + backend: config.assistant_backend.clone(), }, data_dir.to_path_buf(), ); @@ -438,6 +453,7 @@ impl MeshService { deadman_handle: None, block_announcer_handle: None, presence_handle: None, + scheduler_handle: None, cmd_rx: Some(cmd_rx), our_did: did.to_string(), our_ed_pubkey_hex: ed_pubkey_hex.to_string(), @@ -448,6 +464,7 @@ impl MeshService { block_header_cache, relay_tracker, dead_man_switch, + scheduler: Arc::new(scheduler::MeshScheduler::load(data_dir).await), }) } @@ -525,6 +542,20 @@ impl MeshService { }); self.deadman_handle = Some(dms_handle); + // Scheduled-message task (issue #50, phase 1.7): fires queued messages + // when due, retrying peer DMs until the peer is back in range. + let sched = Arc::clone(&self.scheduler); + let sched_state = Arc::clone(&self.state); + let sched_shutdown = self + .shutdown_tx + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Shutdown channel not initialized"))? + .subscribe(); + let sched_handle = tokio::spawn(async move { + scheduler::run_scheduler(sched, sched_state, sched_shutdown).await; + }); + self.scheduler_handle = Some(sched_handle); + // Spawn block header announcer (internet-connected nodes only) if self.config.announce_block_headers { let bha_state = Arc::clone(&self.state); @@ -675,6 +706,10 @@ impl MeshService { handle.abort(); let _ = handle.await; } + if let Some(handle) = self.scheduler_handle.take() { + handle.abort(); + let _ = handle.await; + } if let Some(handle) = self.block_announcer_handle.take() { handle.abort(); let _ = handle.await; @@ -1359,6 +1394,7 @@ impl MeshService { enabled: Option, model: Option>, trusted_only: Option, + backend: Option, ) -> Result<()> { { let mut a = self.state.assistant.write().await; @@ -1371,6 +1407,9 @@ impl MeshService { if let Some(t) = trusted_only { a.trusted_only = t; } + if let Some(b) = backend { + a.backend = b; + } } // Persist by updating the on-disk config (the in-memory `self.config` // snapshot stays as-is; the live `state.assistant` is the runtime @@ -1381,6 +1420,7 @@ impl MeshService { cfg.assistant_enabled = a.enabled; cfg.assistant_model = a.model.clone(); cfg.assistant_trusted_only = a.trusted_only; + cfg.assistant_backend = a.backend.clone(); } save_config(&self.data_dir, &cfg).await?; Ok(()) diff --git a/core/archipelago/src/mesh/scheduler.rs b/core/archipelago/src/mesh/scheduler.rs new file mode 100644 index 00000000..0778749b --- /dev/null +++ b/core/archipelago/src/mesh/scheduler.rs @@ -0,0 +1,213 @@ +//! Scheduled / queued mesh messages (issue #50, phase 1.7). +//! +//! A small persisted queue of messages to send at a future time. A background +//! task fires due messages via the listener. A message addressed to a peer that +//! isn't currently in the contact table stays queued and retries on later ticks +//! — i.e. it sends itself when the peer comes back in range. + +use super::listener::{MeshCommand, MeshState}; +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::fs; +use tokio::sync::{watch, RwLock}; +use tracing::warn; + +const SCHEDULER_FILE: &str = "mesh-scheduled.json"; +/// Wake interval for firing due messages. +const TICK_SECS: u64 = 10; +/// Drop a still-undeliverable message after this many attempts (~1h at 10s). +const MAX_ATTEMPTS: u32 = 360; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduledMessage { + pub id: u64, + /// Direct-message target (peer contact_id), or None for a channel broadcast. + #[serde(default)] + pub contact_id: Option, + /// Channel to broadcast on, or None for a direct message. + #[serde(default)] + pub channel: Option, + pub body: String, + /// Unix seconds when the message becomes due. + pub fire_at: i64, + #[serde(default)] + pub attempts: u32, +} + +pub struct MeshScheduler { + path: PathBuf, + queue: RwLock>, + next_id: RwLock, +} + +impl MeshScheduler { + pub async fn load(data_dir: &Path) -> Self { + let path = data_dir.join(SCHEDULER_FILE); + let queue: Vec = match fs::read_to_string(&path).await { + Ok(s) => serde_json::from_str(&s).unwrap_or_default(), + Err(_) => Vec::new(), + }; + let next = queue.iter().map(|m| m.id).max().unwrap_or(0) + 1; + Self { + path, + queue: RwLock::new(queue), + next_id: RwLock::new(next), + } + } + + async fn save(&self) -> Result<()> { + let json = { + let q = self.queue.read().await; + serde_json::to_string_pretty(&*q).context("serialize scheduled queue")? + }; + if let Some(parent) = self.path.parent() { + fs::create_dir_all(parent).await.ok(); + } + fs::write(&self.path, json) + .await + .context("write scheduled queue")?; + Ok(()) + } + + pub async fn add( + &self, + contact_id: Option, + channel: Option, + body: String, + fire_at: i64, + ) -> Result { + let id = { + let mut n = self.next_id.write().await; + let id = *n; + *n += 1; + id + }; + let msg = ScheduledMessage { + id, + contact_id, + channel, + body, + fire_at, + attempts: 0, + }; + self.queue.write().await.push(msg.clone()); + self.save().await?; + Ok(msg) + } + + pub async fn list(&self) -> Vec { + let mut v = self.queue.read().await.clone(); + v.sort_by_key(|m| m.fire_at); + v + } + + pub async fn cancel(&self, id: u64) -> Result { + let removed = { + let mut q = self.queue.write().await; + let before = q.len(); + q.retain(|m| m.id != id); + q.len() != before + }; + if removed { + self.save().await?; + } + Ok(removed) + } +} + +/// Background loop: every `TICK_SECS`, fire any due messages. +pub async fn run_scheduler( + scheduler: Arc, + state: Arc, + mut shutdown: watch::Receiver, +) { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(TICK_SECS)); + loop { + tokio::select! { + _ = interval.tick() => fire_due(&scheduler, &state).await, + _ = shutdown.changed() => { + if *shutdown.borrow() { return; } + } + } + } +} + +async fn fire_due(scheduler: &Arc, state: &Arc) { + let now = chrono::Utc::now().timestamp(); + let due: Vec = scheduler + .queue + .read() + .await + .iter() + .filter(|m| m.fire_at <= now) + .cloned() + .collect(); + if due.is_empty() { + return; + } + + let mut delivered: Vec = Vec::new(); + let mut failed: Vec = Vec::new(); + for msg in &due { + if try_send(state, msg).await { + delivered.push(msg.id); + } else { + failed.push(msg.id); + } + } + + let mut to_remove = delivered; + { + let mut q = scheduler.queue.write().await; + for m in q.iter_mut() { + if failed.contains(&m.id) { + m.attempts += 1; + if m.attempts >= MAX_ATTEMPTS { + warn!(id = m.id, attempts = m.attempts, "Dropping undeliverable scheduled message"); + to_remove.push(m.id); + } + } + } + q.retain(|m| !to_remove.contains(&m.id)); + } + let _ = scheduler.save().await; +} + +/// Hand a due message to the radio. Returns true if it was sent (or should be +/// dropped); false to keep it queued for a later retry (peer not in range yet). +async fn try_send(state: &Arc, msg: &ScheduledMessage) -> bool { + let payload = msg.body.clone().into_bytes(); + if let Some(channel) = msg.channel { + return state + .send_cmd(MeshCommand::BroadcastChannel { channel, payload }) + .await + .is_ok(); + } + if let Some(contact_id) = msg.contact_id { + let pubkey = { + let peers = state.peers.read().await; + peers.get(&contact_id).and_then(|p| p.pubkey_hex.clone()) + }; + if let Some(pk) = pubkey { + if let Ok(bytes) = hex::decode(&pk) { + if bytes.len() >= 6 { + let mut dest = [0u8; 6]; + dest.copy_from_slice(&bytes[..6]); + return state + .send_cmd(MeshCommand::SendText { + dest_pubkey_prefix: dest, + payload, + }) + .await + .is_ok(); + } + } + } + // Peer unknown / not in range yet — keep queued, retry next tick. + return false; + } + warn!("Scheduled message has neither channel nor contact_id — dropping"); + true +} diff --git a/neode-ui/src/stores/mesh.ts b/neode-ui/src/stores/mesh.ts index 8e4517a1..07697e6d 100644 --- a/neode-ui/src/stores/mesh.ts +++ b/neode-ui/src/stores/mesh.ts @@ -122,6 +122,26 @@ export interface BlockHeader { announced_by: string } +export interface AssistantStatus { + enabled: boolean + model: string | null + trusted_only: boolean + backend: string + default_model: string + ollama_detected: boolean + claude_available: boolean + models: string[] +} + +export interface ScheduledMessage { + id: number + contact_id: number | null + channel: number | null + body: string + fire_at: number + attempts: number +} + export interface NodePosition { lat: number lng: number @@ -574,6 +594,59 @@ export const useMeshStore = defineStore('mesh', () => { const blockHeaders = ref([]) const latestBlockHeight = ref(0) + // Mesh-AI assistant (issue #50) + const assistantStatus = ref(null) + + async function fetchAssistantStatus() { + try { + assistantStatus.value = await rpcClient.call({ method: 'mesh.assistant-status' }) + } catch { + // Assistant not available (mesh service down) + } + } + + async function configureAssistant(config: { + enabled?: boolean + model?: string | null + trusted_only?: boolean + backend?: string + }) { + const res = await rpcClient.call>({ + method: 'mesh.assistant-configure', + params: config, + }) + await fetchAssistantStatus() + return res + } + + // Scheduled / queued mesh messages (issue #50, phase 1.7) + const scheduledMessages = ref([]) + + async function fetchScheduledMessages() { + try { + const res = await rpcClient.call<{ messages: ScheduledMessage[] }>({ method: 'mesh.list-scheduled' }) + scheduledMessages.value = res.messages || [] + } catch { + scheduledMessages.value = [] + } + } + + async function scheduleMessage(params: { + contact_id?: number + channel?: number + body: string + fire_at: number + }) { + const res = await rpcClient.call({ method: 'mesh.schedule-message', params }) + await fetchScheduledMessages() + return res + } + + async function cancelScheduledMessage(id: number) { + await rpcClient.call({ method: 'mesh.cancel-scheduled', params: { id } }) + await fetchScheduledMessages() + } + async function fetchDeadmanStatus() { try { deadmanStatus.value = await rpcClient.call({ method: 'mesh.deadman-status' }) @@ -693,6 +766,13 @@ export const useMeshStore = defineStore('mesh', () => { fetchDeadmanStatus, configureDeadman, deadmanCheckin, + assistantStatus, + fetchAssistantStatus, + configureAssistant, + scheduledMessages, + fetchScheduledMessages, + scheduleMessage, + cancelScheduledMessage, fetchBlockHeaders, relayTransaction, relayLightning, diff --git a/neode-ui/src/views/Mesh.vue b/neode-ui/src/views/Mesh.vue index 3d896a8b..61d8ab14 100644 --- a/neode-ui/src/views/Mesh.vue +++ b/neode-ui/src/views/Mesh.vue @@ -8,6 +8,7 @@ import AnimatedLogo from '@/components/AnimatedLogo.vue' import MeshMap from '@/components/MeshMap.vue' import MeshBitcoinPanel from '@/views/mesh/MeshBitcoinPanel.vue' import MeshDeadmanPanel from '@/views/mesh/MeshDeadmanPanel.vue' +import MeshAssistantPanel from '@/views/mesh/MeshAssistantPanel.vue' import { rpcClient } from '@/api/rpc-client' import '@/views/mesh/mesh-styles.css' @@ -253,10 +254,10 @@ async function clearAllMesh() { } // Phase 4: Off-grid Bitcoin + Dead Man's Switch -const activeTab = ref<'chat' | 'bitcoin' | 'deadman' | 'map'>('chat') +const activeTab = ref<'chat' | 'bitcoin' | 'deadman' | 'assistant' | 'map'>('chat') // Tools tab for 3rd column on wide desktop and mobile below-chat -const toolsTab = ref<'bitcoin' | 'deadman' | 'map'>('bitcoin') +const toolsTab = ref<'bitcoin' | 'deadman' | 'assistant' | 'map'>('bitcoin') // Panel visibility computeds const showChatPanel = computed(() => @@ -274,6 +275,12 @@ const showDeadmanPanel = computed(() => { if (isMobile.value && !mobileShowChat.value) return toolsTab.value === 'deadman' return activeTab.value === 'deadman' }) +const showAssistantPanel = computed(() => { + if (isVeryWideDesktop.value) return true + if (isWideDesktop.value) return toolsTab.value === 'assistant' + if (isMobile.value && !mobileShowChat.value) return toolsTab.value === 'assistant' + return activeTab.value === 'assistant' +}) const showMapPanel = computed(() => { if (isVeryWideDesktop.value) return true if (isWideDesktop.value) return toolsTab.value === 'map' @@ -1478,17 +1485,16 @@ function isImageMime(mime?: string): boolean {
- + +
@@ -1764,21 +1770,19 @@ function isImageMime(mime?: string): boolean {
- + - +
+
@@ -1786,22 +1790,20 @@ function isImageMime(mime?: string): boolean {
- + - +
+
diff --git a/neode-ui/src/views/mesh/MeshAssistantPanel.vue b/neode-ui/src/views/mesh/MeshAssistantPanel.vue new file mode 100644 index 00000000..c87ae8be --- /dev/null +++ b/neode-ui/src/views/mesh/MeshAssistantPanel.vue @@ -0,0 +1,155 @@ + + + diff --git a/neode-ui/src/views/mesh/mesh-styles.css b/neode-ui/src/views/mesh/mesh-styles.css index 50e8ec90..101fd465 100644 --- a/neode-ui/src/views/mesh/mesh-styles.css +++ b/neode-ui/src/views/mesh/mesh-styles.css @@ -47,9 +47,11 @@ .mesh-columns-very-wide .mesh-tools-wrapper { display: grid; grid-template-rows: minmax(0, 1fr) minmax(0, 0.85fr) minmax(0, 1fr); gap: 12px; overflow: hidden; } .mesh-columns-very-wide .mesh-tools-wrapper .mesh-bitcoin-panel, .mesh-columns-very-wide .mesh-tools-wrapper .mesh-deadman-panel, +.mesh-columns-very-wide .mesh-tools-wrapper .mesh-assistant-panel, .mesh-columns-very-wide .mesh-tools-wrapper .mesh-map-panel { min-height: 0; height: 100%; overflow: hidden; } .mesh-columns-wide:not(.mesh-columns-very-wide) .mesh-tools-wrapper .mesh-bitcoin-panel, .mesh-columns-wide:not(.mesh-columns-very-wide) .mesh-tools-wrapper .mesh-deadman-panel, +.mesh-columns-wide:not(.mesh-columns-very-wide) .mesh-tools-wrapper .mesh-assistant-panel, .mesh-columns-wide:not(.mesh-columns-very-wide) .mesh-tools-wrapper .mesh-map-panel { flex: 1 1 auto; min-height: 0; height: auto; } .mesh-columns-very-wide .mesh-tools-tab-bar { display: none; } .mesh-columns-wide .mesh-mobile-back-btn, @@ -154,12 +156,14 @@ .mesh-mobile-tools { margin-top: 12px; display: flex; flex-direction: column; gap: 12px; } .mesh-mobile-tools .mesh-tools-tab-bar { display: flex; gap: 2px; background: rgba(0,0,0,0.3); border-radius: 10px; padding: 3px; } .mesh-mobile-tools :deep(.mesh-bitcoin-panel), + .mesh-mobile-tools :deep(.mesh-assistant-panel), .mesh-mobile-tools :deep(.mesh-deadman-panel) { min-height: 320px; max-height: min(68dvh, 620px); overflow-y: auto; } .mesh-mobile-tools .mesh-map-panel { min-height: 360px; max-height: min(68dvh, 620px); overflow: hidden; } .mesh-status-grid { grid-template-columns: repeat(2, 1fr); } .mesh-chat-back { display: block; } .mobile-hidden { display: none !important; } :deep(.mesh-bitcoin-panel), + :deep(.mesh-assistant-panel), :deep(.mesh-deadman-panel) { flex: none; cursor: pointer; flex-shrink: 0; } .mesh-mobile-back-btn:hover { color: rgba(255, 255, 255, 0.9); } } @@ -223,7 +227,11 @@ /* Bitcoin & Deadman panels (child components) */ .mesh-bitcoin-panel, -.mesh-deadman-panel { padding: 16px; display: flex; flex-direction: column; gap: 12px; flex: 1; min-height: 0; overflow-y: auto; } +.mesh-deadman-panel, +.mesh-assistant-panel { padding: 16px; display: flex; flex-direction: column; gap: 12px; flex: 1; min-height: 0; overflow-y: auto; } +.mesh-assistant-field { display: flex; flex-direction: column; gap: 4px; } +.mesh-assistant-install { padding: 12px; background: rgba(251,146,60,0.08); border: 1px solid rgba(251,146,60,0.25); border-radius: 10px; } +.mesh-assistant-install-btn { display: inline-block; text-align: center; padding: 8px 14px; font-size: 0.8rem; } .mesh-panel-title { font-size: 1rem; font-weight: 700; color: rgba(255,255,255,0.95); margin: 0; } .mesh-panel-sub { font-size: 0.8rem; color: rgba(255,255,255,0.45); margin: -4px 0 0; } .mesh-bitcoin-section { display: flex; flex-direction: column; gap: 8px; }