feat(mesh): assistant Phase 1.5 — !ai channel trigger (issue #50)

A plain '!ai <q>' / '!ask <q>' on the channel is now answered by the node's
local model and broadcast back as plain text, so ANY client (bare meshcore
or Meshtastic) can ask. Generalised run_assist with an AssistReply target:
Typed chunks to a peer (archipelago UI path) vs plain channel-text (bare
clients). Trust/rate gate unchanged; asker identity is separate from reply
mode. Works over both radios.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
archipelago 2026-06-17 17:59:03 -04:00
parent d8d014bfd9
commit 87d0d53205
3 changed files with 157 additions and 57 deletions

View File

@ -7,9 +7,9 @@
//! ordered chunks. Airtime is scarce, so the reply is length-capped and each
//! asker is limited to one in-flight query.
use super::super::message_types::{self, AssistQueryPayload, AssistResponsePayload, MeshMessageType};
use super::super::message_types::{self, AssistResponsePayload, MeshMessageType};
use super::bitcoin::send_to_peer;
use super::MeshState;
use super::{MeshCommand, MeshState};
use crate::federation::TrustLevel;
use std::sync::Arc;
use std::time::Duration;
@ -25,26 +25,44 @@ const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_REPLY_CHARS: usize = 480;
/// Characters of answer text per `AssistResponse` chunk.
const CHUNK_CHARS: usize = 160;
/// Tighter cap for plain-text channel replies (bare `!ai` clients) — these
/// aren't reassembled by an archipelago UI, so keep them to a couple frames.
const CHANNEL_REPLY_CHARS: usize = 200;
/// Entry point: gate the query, run the model, send the answer back.
/// Spawned from the dispatch `AssistQuery` arm so the radio loop never blocks.
/// Where an answer should go.
pub(super) enum AssistReply {
/// Typed `AssistResponse` chunks addressed to one peer — the archipelago
/// UI path (rich, reassembled, correlated by `req_id`).
Typed { contact_id: u32 },
/// Plain-text broadcast on a mesh channel — the bare `!ai` path, so any
/// client (including non-archipelago meshcore/Meshtastic nodes) sees it.
ChannelText { channel: u8 },
}
/// Entry point: gate the query, run the model, send the answer back via the
/// requested reply path. Spawned off the radio loop so it never blocks.
pub(super) async fn run_assist(
payload: AssistQueryPayload,
sender_contact_id: u32,
prompt: String,
model_override: Option<String>,
req_id: u64,
asker_contact_id: u32,
sender_name: String,
reply: AssistReply,
state: Arc<MeshState>,
) {
let asker = asker_contact_id;
// Trust + block gate.
if !is_sender_allowed(&state, sender_contact_id).await {
if !is_sender_allowed(&state, asker).await {
warn!(
from = sender_contact_id,
from = asker,
name = %sender_name,
"AssistQuery denied — sender not permitted by assistant policy"
);
// Silent on the wire (no airtime spent on denials); surface to the UI.
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady {
req_id: payload.req_id,
to_contact_id: sender_contact_id,
req_id,
to_contact_id: asker,
error: Some("denied".to_string()),
});
return;
@ -53,54 +71,46 @@ pub(super) async fn run_assist(
// One in-flight query per asker.
{
let mut inflight = state.assist_inflight.write().await;
if !inflight.insert(sender_contact_id) {
warn!(from = sender_contact_id, "AssistQuery dropped — asker already has one in flight");
if !inflight.insert(asker) {
warn!(from = asker, "AssistQuery dropped — asker already has one in flight");
return;
}
}
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistQueryReceived {
from_contact_id: sender_contact_id,
prompt: payload.prompt.clone(),
from_contact_id: asker,
prompt: prompt.clone(),
});
let model = payload
.model
.clone()
let model = model_override
.or_else(|| state.assistant.model.clone())
.unwrap_or_else(|| DEFAULT_MODEL.to_string());
info!(
from = sender_contact_id,
req_id = payload.req_id,
model = %model,
"Answering AI query over mesh"
);
info!(from = asker, req_id, model = %model, "Answering AI query over mesh");
let result = call_ollama(&model, &payload.prompt).await;
let result = call_ollama(&model, &prompt).await;
match result {
Ok(answer) => {
let (text, _truncated) = cap_reply(&answer);
send_answer_chunks(&state, sender_contact_id, payload.req_id, &text).await;
send_reply(&state, &reply, req_id, &answer).await;
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady {
req_id: payload.req_id,
to_contact_id: sender_contact_id,
req_id,
to_contact_id: asker,
error: None,
});
}
Err(e) => {
warn!(req_id = payload.req_id, "AI query failed: {}", e);
send_error(&state, sender_contact_id, payload.req_id, "AI unavailable").await;
warn!(req_id, "AI query failed: {}", e);
send_failure(&state, &reply, req_id, "AI unavailable").await;
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady {
req_id: payload.req_id,
to_contact_id: sender_contact_id,
req_id,
to_contact_id: asker,
error: Some(e.to_string()),
});
}
}
state.assist_inflight.write().await.remove(&sender_contact_id);
state.assist_inflight.write().await.remove(&asker);
}
/// Whether `sender_contact_id` may invoke the assistant under the node's policy.
@ -154,14 +164,42 @@ fn cap_reply(answer: &str) -> (String, bool) {
(format!("{capped}…(truncated)"), true)
}
/// Send a successful answer via the requested reply path.
async fn send_reply(state: &Arc<MeshState>, reply: &AssistReply, req_id: u64, answer: &str) {
match reply {
AssistReply::Typed { contact_id } => {
let (text, _) = cap_reply(answer);
send_typed_chunks(state, *contact_id, req_id, &text).await;
}
AssistReply::ChannelText { channel } => {
let text = cap_channel(answer);
send_channel_text(state, *channel, &text).await;
}
}
}
/// Send a failure notice via the requested reply path.
async fn send_failure(state: &Arc<MeshState>, reply: &AssistReply, req_id: u64, msg: &str) {
match reply {
AssistReply::Typed { contact_id } => {
let payload = AssistResponsePayload {
req_id,
text: String::new(),
seq: 0,
done: true,
error: Some(msg.to_string()),
};
send_typed_response(state, *contact_id, &payload).await;
}
AssistReply::ChannelText { channel } => {
send_channel_text(state, *channel, &format!("AI: {msg}")).await;
}
}
}
/// Split the answer into ordered `AssistResponse` chunks and send each back to
/// the asker on the encrypted, peer-addressed path.
async fn send_answer_chunks(
state: &Arc<MeshState>,
dest_contact_id: u32,
req_id: u64,
text: &str,
) {
/// the asker on the encrypted, peer-addressed path (archipelago UI path).
async fn send_typed_chunks(state: &Arc<MeshState>, dest_contact_id: u32, req_id: u64, text: &str) {
let chars: Vec<char> = text.chars().collect();
let chunks: Vec<String> = if chars.is_empty() {
vec![String::new()]
@ -177,24 +215,16 @@ async fn send_answer_chunks(
done: i == last,
error: None,
};
send_response(state, dest_contact_id, &payload).await;
send_typed_response(state, dest_contact_id, &payload).await;
}
}
/// Send a single failure `AssistResponse` (one chunk, done, error set).
async fn send_error(state: &Arc<MeshState>, dest_contact_id: u32, req_id: u64, msg: &str) {
let payload = AssistResponsePayload {
req_id,
text: String::new(),
seq: 0,
done: true,
error: Some(msg.to_string()),
};
send_response(state, dest_contact_id, &payload).await;
}
/// Encode an `AssistResponse` payload and send it to a peer.
async fn send_response(state: &Arc<MeshState>, dest_contact_id: u32, payload: &AssistResponsePayload) {
async fn send_typed_response(
state: &Arc<MeshState>,
dest_contact_id: u32,
payload: &AssistResponsePayload,
) {
let bytes = match message_types::encode_payload(payload) {
Ok(b) => b,
Err(e) => {
@ -209,6 +239,26 @@ async fn send_response(state: &Arc<MeshState>, dest_contact_id: u32, payload: &A
}
}
/// Broadcast a plain-text answer on a channel for bare `!ai` clients.
async fn send_channel_text(state: &Arc<MeshState>, channel: u8, text: &str) {
let _ = state
.send_cmd(MeshCommand::BroadcastChannel {
channel,
payload: text.as_bytes().to_vec(),
})
.await;
}
/// Cap a plain-text channel reply to a couple of frames.
fn cap_channel(answer: &str) -> String {
let trimmed = answer.trim();
if trimmed.chars().count() <= CHANNEL_REPLY_CHARS {
return format!("AI: {trimmed}");
}
let capped: String = trimmed.chars().take(CHANNEL_REPLY_CHARS).collect();
format!("AI: {capped}")
}
/// Call the local Ollama model and return the generated text.
async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result<String> {
let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?;

View File

@ -352,6 +352,45 @@ pub(super) async fn store_plain_message(
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
// Mesh-AI assistant (issue #50): a plain `!ai`/`!ask <question>` on the
// channel is answered by this node's local model when the assistant is on.
// Reply goes back as plain channel text so bare (non-archipelago) clients
// see it. The trust/rate gate lives in run_assist.
if state.assistant.enabled {
if let Some(prompt) = strip_ai_trigger(text) {
if !prompt.is_empty() {
let req_id = state.next_id().await;
let prompt = prompt.to_string();
let name = peer_name.to_string();
let st = Arc::clone(state);
tokio::spawn(async move {
super::assist::run_assist(
prompt,
None,
req_id,
contact_id,
name,
super::assist::AssistReply::ChannelText { channel: 0 },
st,
)
.await;
});
}
}
}
}
/// Recognise a `!ai`/`!ask ` command prefix (case-insensitive) and return the
/// trimmed question after it, or `None` if the text isn't an AI command.
fn strip_ai_trigger(text: &str) -> Option<&str> {
let t = text.trim_start();
for p in ["!ai ", "!ask "] {
if t.len() >= p.len() && t[..p.len()].eq_ignore_ascii_case(p) {
return Some(t[p.len()..].trim());
}
}
None
}
/// Handle a received identity broadcast from a peer.

View File

@ -707,11 +707,22 @@ pub(crate) async fn handle_typed_envelope_direct(
Some(envelope.seq),
)
.await;
// Run the model + reply off the radio loop.
// Run the model + reply off the radio loop. Typed query →
// typed chunked reply back to the asking peer.
let assist_state = Arc::clone(state);
let name = sender_name.to_string();
tokio::spawn(async move {
super::assist::run_assist(query, sender_contact_id, name, assist_state)
super::assist::run_assist(
query.prompt,
query.model,
query.req_id,
sender_contact_id,
name,
super::assist::AssistReply::Typed {
contact_id: sender_contact_id,
},
assist_state,
)
.await;
});
}