From 87d0d532057e400722c188ded0c69419de5abd74 Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 17:59:03 -0400 Subject: [PATCH] =?UTF-8?q?feat(mesh):=20assistant=20Phase=201.5=20?= =?UTF-8?q?=E2=80=94=20!ai=20channel=20trigger=20(issue=20#50)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A plain '!ai ' / '!ask ' on the channel is now answered by the node's local model and broadcast back as plain text, so ANY client (bare meshcore or Meshtastic) can ask. Generalised run_assist with an AssistReply target: Typed chunks to a peer (archipelago UI path) vs plain channel-text (bare clients). Trust/rate gate unchanged; asker identity is separate from reply mode. Works over both radios. Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/mesh/listener/assist.rs | 158 ++++++++++++------ core/archipelago/src/mesh/listener/decode.rs | 39 +++++ .../archipelago/src/mesh/listener/dispatch.rs | 17 +- 3 files changed, 157 insertions(+), 57 deletions(-) diff --git a/core/archipelago/src/mesh/listener/assist.rs b/core/archipelago/src/mesh/listener/assist.rs index cd5b5529..f1b25f64 100644 --- a/core/archipelago/src/mesh/listener/assist.rs +++ b/core/archipelago/src/mesh/listener/assist.rs @@ -7,9 +7,9 @@ //! ordered chunks. Airtime is scarce, so the reply is length-capped and each //! asker is limited to one in-flight query. -use super::super::message_types::{self, AssistQueryPayload, AssistResponsePayload, MeshMessageType}; +use super::super::message_types::{self, AssistResponsePayload, MeshMessageType}; use super::bitcoin::send_to_peer; -use super::MeshState; +use super::{MeshCommand, MeshState}; use crate::federation::TrustLevel; use std::sync::Arc; use std::time::Duration; @@ -25,26 +25,44 @@ const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60); const MAX_REPLY_CHARS: usize = 480; /// Characters of answer text per `AssistResponse` chunk. const CHUNK_CHARS: usize = 160; +/// Tighter cap for plain-text channel replies (bare `!ai` clients) — these +/// aren't reassembled by an archipelago UI, so keep them to a couple frames. +const CHANNEL_REPLY_CHARS: usize = 200; -/// Entry point: gate the query, run the model, send the answer back. -/// Spawned from the dispatch `AssistQuery` arm so the radio loop never blocks. +/// Where an answer should go. +pub(super) enum AssistReply { + /// Typed `AssistResponse` chunks addressed to one peer — the archipelago + /// UI path (rich, reassembled, correlated by `req_id`). + Typed { contact_id: u32 }, + /// Plain-text broadcast on a mesh channel — the bare `!ai` path, so any + /// client (including non-archipelago meshcore/Meshtastic nodes) sees it. + ChannelText { channel: u8 }, +} + +/// Entry point: gate the query, run the model, send the answer back via the +/// requested reply path. Spawned off the radio loop so it never blocks. pub(super) async fn run_assist( - payload: AssistQueryPayload, - sender_contact_id: u32, + prompt: String, + model_override: Option, + req_id: u64, + asker_contact_id: u32, sender_name: String, + reply: AssistReply, state: Arc, ) { + let asker = asker_contact_id; + // Trust + block gate. - if !is_sender_allowed(&state, sender_contact_id).await { + if !is_sender_allowed(&state, asker).await { warn!( - from = sender_contact_id, + from = asker, name = %sender_name, "AssistQuery denied — sender not permitted by assistant policy" ); // Silent on the wire (no airtime spent on denials); surface to the UI. let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { - req_id: payload.req_id, - to_contact_id: sender_contact_id, + req_id, + to_contact_id: asker, error: Some("denied".to_string()), }); return; @@ -53,54 +71,46 @@ pub(super) async fn run_assist( // One in-flight query per asker. { let mut inflight = state.assist_inflight.write().await; - if !inflight.insert(sender_contact_id) { - warn!(from = sender_contact_id, "AssistQuery dropped — asker already has one in flight"); + if !inflight.insert(asker) { + warn!(from = asker, "AssistQuery dropped — asker already has one in flight"); return; } } let _ = state.event_tx.send(super::super::types::MeshEvent::AssistQueryReceived { - from_contact_id: sender_contact_id, - prompt: payload.prompt.clone(), + from_contact_id: asker, + prompt: prompt.clone(), }); - let model = payload - .model - .clone() + let model = model_override .or_else(|| state.assistant.model.clone()) .unwrap_or_else(|| DEFAULT_MODEL.to_string()); - info!( - from = sender_contact_id, - req_id = payload.req_id, - model = %model, - "Answering AI query over mesh" - ); + info!(from = asker, req_id, model = %model, "Answering AI query over mesh"); - let result = call_ollama(&model, &payload.prompt).await; + let result = call_ollama(&model, &prompt).await; match result { Ok(answer) => { - let (text, _truncated) = cap_reply(&answer); - send_answer_chunks(&state, sender_contact_id, payload.req_id, &text).await; + send_reply(&state, &reply, req_id, &answer).await; let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { - req_id: payload.req_id, - to_contact_id: sender_contact_id, + req_id, + to_contact_id: asker, error: None, }); } Err(e) => { - warn!(req_id = payload.req_id, "AI query failed: {}", e); - send_error(&state, sender_contact_id, payload.req_id, "AI unavailable").await; + warn!(req_id, "AI query failed: {}", e); + send_failure(&state, &reply, req_id, "AI unavailable").await; let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady { - req_id: payload.req_id, - to_contact_id: sender_contact_id, + req_id, + to_contact_id: asker, error: Some(e.to_string()), }); } } - state.assist_inflight.write().await.remove(&sender_contact_id); + state.assist_inflight.write().await.remove(&asker); } /// Whether `sender_contact_id` may invoke the assistant under the node's policy. @@ -154,14 +164,42 @@ fn cap_reply(answer: &str) -> (String, bool) { (format!("{capped}…(truncated)"), true) } +/// Send a successful answer via the requested reply path. +async fn send_reply(state: &Arc, reply: &AssistReply, req_id: u64, answer: &str) { + match reply { + AssistReply::Typed { contact_id } => { + let (text, _) = cap_reply(answer); + send_typed_chunks(state, *contact_id, req_id, &text).await; + } + AssistReply::ChannelText { channel } => { + let text = cap_channel(answer); + send_channel_text(state, *channel, &text).await; + } + } +} + +/// Send a failure notice via the requested reply path. +async fn send_failure(state: &Arc, reply: &AssistReply, req_id: u64, msg: &str) { + match reply { + AssistReply::Typed { contact_id } => { + let payload = AssistResponsePayload { + req_id, + text: String::new(), + seq: 0, + done: true, + error: Some(msg.to_string()), + }; + send_typed_response(state, *contact_id, &payload).await; + } + AssistReply::ChannelText { channel } => { + send_channel_text(state, *channel, &format!("AI: {msg}")).await; + } + } +} + /// Split the answer into ordered `AssistResponse` chunks and send each back to -/// the asker on the encrypted, peer-addressed path. -async fn send_answer_chunks( - state: &Arc, - dest_contact_id: u32, - req_id: u64, - text: &str, -) { +/// the asker on the encrypted, peer-addressed path (archipelago UI path). +async fn send_typed_chunks(state: &Arc, dest_contact_id: u32, req_id: u64, text: &str) { let chars: Vec = text.chars().collect(); let chunks: Vec = if chars.is_empty() { vec![String::new()] @@ -177,24 +215,16 @@ async fn send_answer_chunks( done: i == last, error: None, }; - send_response(state, dest_contact_id, &payload).await; + send_typed_response(state, dest_contact_id, &payload).await; } } -/// Send a single failure `AssistResponse` (one chunk, done, error set). -async fn send_error(state: &Arc, dest_contact_id: u32, req_id: u64, msg: &str) { - let payload = AssistResponsePayload { - req_id, - text: String::new(), - seq: 0, - done: true, - error: Some(msg.to_string()), - }; - send_response(state, dest_contact_id, &payload).await; -} - /// Encode an `AssistResponse` payload and send it to a peer. -async fn send_response(state: &Arc, dest_contact_id: u32, payload: &AssistResponsePayload) { +async fn send_typed_response( + state: &Arc, + dest_contact_id: u32, + payload: &AssistResponsePayload, +) { let bytes = match message_types::encode_payload(payload) { Ok(b) => b, Err(e) => { @@ -209,6 +239,26 @@ async fn send_response(state: &Arc, dest_contact_id: u32, payload: &A } } +/// Broadcast a plain-text answer on a channel for bare `!ai` clients. +async fn send_channel_text(state: &Arc, channel: u8, text: &str) { + let _ = state + .send_cmd(MeshCommand::BroadcastChannel { + channel, + payload: text.as_bytes().to_vec(), + }) + .await; +} + +/// Cap a plain-text channel reply to a couple of frames. +fn cap_channel(answer: &str) -> String { + let trimmed = answer.trim(); + if trimmed.chars().count() <= CHANNEL_REPLY_CHARS { + return format!("AI: {trimmed}"); + } + let capped: String = trimmed.chars().take(CHANNEL_REPLY_CHARS).collect(); + format!("AI: {capped}…") +} + /// Call the local Ollama model and return the generated text. async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result { let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?; diff --git a/core/archipelago/src/mesh/listener/decode.rs b/core/archipelago/src/mesh/listener/decode.rs index 645e4927..4bdf6d9f 100644 --- a/core/archipelago/src/mesh/listener/decode.rs +++ b/core/archipelago/src/mesh/listener/decode.rs @@ -352,6 +352,45 @@ pub(super) async fn store_plain_message( state.store_message(msg.clone()).await; state.status.write().await.messages_received += 1; let _ = state.event_tx.send(MeshEvent::MessageReceived(msg)); + + // Mesh-AI assistant (issue #50): a plain `!ai`/`!ask ` on the + // channel is answered by this node's local model when the assistant is on. + // Reply goes back as plain channel text so bare (non-archipelago) clients + // see it. The trust/rate gate lives in run_assist. + if state.assistant.enabled { + if let Some(prompt) = strip_ai_trigger(text) { + if !prompt.is_empty() { + let req_id = state.next_id().await; + let prompt = prompt.to_string(); + let name = peer_name.to_string(); + let st = Arc::clone(state); + tokio::spawn(async move { + super::assist::run_assist( + prompt, + None, + req_id, + contact_id, + name, + super::assist::AssistReply::ChannelText { channel: 0 }, + st, + ) + .await; + }); + } + } + } +} + +/// Recognise a `!ai`/`!ask ` command prefix (case-insensitive) and return the +/// trimmed question after it, or `None` if the text isn't an AI command. +fn strip_ai_trigger(text: &str) -> Option<&str> { + let t = text.trim_start(); + for p in ["!ai ", "!ask "] { + if t.len() >= p.len() && t[..p.len()].eq_ignore_ascii_case(p) { + return Some(t[p.len()..].trim()); + } + } + None } /// Handle a received identity broadcast from a peer. diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index ea20a626..5a558ae0 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -707,12 +707,23 @@ pub(crate) async fn handle_typed_envelope_direct( Some(envelope.seq), ) .await; - // Run the model + reply off the radio loop. + // Run the model + reply off the radio loop. Typed query → + // typed chunked reply back to the asking peer. let assist_state = Arc::clone(state); let name = sender_name.to_string(); tokio::spawn(async move { - super::assist::run_assist(query, sender_contact_id, name, assist_state) - .await; + super::assist::run_assist( + query.prompt, + query.model, + query.req_id, + sender_contact_id, + name, + super::assist::AssistReply::Typed { + contact_id: sender_contact_id, + }, + assist_state, + ) + .await; }); } Err(e) => warn!("Failed to decode AssistQuery payload: {}", e),