feat(mesh): mesh-AI assistant — Phase 1.1-1.4 (issue #50)
Rust-native lift of Meshroller's LLM bridge. Adds typed AssistQuery/ AssistResponse mesh messages, a trust-gated inbound handler that answers with the node's local Ollama model, and airtime discipline (reply cap, chunking, one in-flight query per asker). Works over both meshcore and Meshtastic radios via the existing MeshRadioDevice abstraction. - message_types: AssistQuery=24 / AssistResponse=25 + payloads - listener/assist.rs: run_assist (gate -> Ollama -> chunked reply) - listener/dispatch.rs: AssistQuery/AssistResponse arms - MeshConfig: assistant_enabled / assistant_model / assistant_trusted_only - MeshState: AssistantConfig + data_dir + in-flight guard Compiles clean (cargo check). Off by default. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c10f2ac22e
commit
d8d014bfd9
234
core/archipelago/src/mesh/listener/assist.rs
Normal file
234
core/archipelago/src/mesh/listener/assist.rs
Normal file
@ -0,0 +1,234 @@
|
||||
//! Mesh-AI assistant (issue #50) — answers `AssistQuery` messages with this
|
||||
//! node's local LLM and sends the reply back over the mesh.
|
||||
//!
|
||||
//! This is the Rust-native lift of Meshroller's "LLM bridge": a trusted peer
|
||||
//! asks a question over meshcore, an internet/compute-bearing node runs it
|
||||
//! through a local model (Ollama) and streams the answer back in capped,
|
||||
//! ordered chunks. Airtime is scarce, so the reply is length-capped and each
|
||||
//! asker is limited to one in-flight query.
|
||||
|
||||
use super::super::message_types::{self, AssistQueryPayload, AssistResponsePayload, MeshMessageType};
|
||||
use super::bitcoin::send_to_peer;
|
||||
use super::MeshState;
|
||||
use crate::federation::TrustLevel;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{info, warn};
|
||||
|
||||
/// Local Ollama generate endpoint (same host the Ollama app binds).
|
||||
const OLLAMA_URL: &str = "http://localhost:11434/api/generate";
|
||||
/// Default model when the node hasn't configured one (matches Meshroller).
|
||||
const DEFAULT_MODEL: &str = "qwen2.5-coder";
|
||||
/// Max time to wait on the model before giving up.
|
||||
const OLLAMA_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// Hard cap on answer length sent over the radio — keeps airtime sane.
|
||||
const MAX_REPLY_CHARS: usize = 480;
|
||||
/// Characters of answer text per `AssistResponse` chunk.
|
||||
const CHUNK_CHARS: usize = 160;
|
||||
|
||||
/// Entry point: gate the query, run the model, send the answer back.
|
||||
/// Spawned from the dispatch `AssistQuery` arm so the radio loop never blocks.
|
||||
pub(super) async fn run_assist(
|
||||
payload: AssistQueryPayload,
|
||||
sender_contact_id: u32,
|
||||
sender_name: String,
|
||||
state: Arc<MeshState>,
|
||||
) {
|
||||
// Trust + block gate.
|
||||
if !is_sender_allowed(&state, sender_contact_id).await {
|
||||
warn!(
|
||||
from = sender_contact_id,
|
||||
name = %sender_name,
|
||||
"AssistQuery denied — sender not permitted by assistant policy"
|
||||
);
|
||||
// Silent on the wire (no airtime spent on denials); surface to the UI.
|
||||
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady {
|
||||
req_id: payload.req_id,
|
||||
to_contact_id: sender_contact_id,
|
||||
error: Some("denied".to_string()),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// One in-flight query per asker.
|
||||
{
|
||||
let mut inflight = state.assist_inflight.write().await;
|
||||
if !inflight.insert(sender_contact_id) {
|
||||
warn!(from = sender_contact_id, "AssistQuery dropped — asker already has one in flight");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistQueryReceived {
|
||||
from_contact_id: sender_contact_id,
|
||||
prompt: payload.prompt.clone(),
|
||||
});
|
||||
|
||||
let model = payload
|
||||
.model
|
||||
.clone()
|
||||
.or_else(|| state.assistant.model.clone())
|
||||
.unwrap_or_else(|| DEFAULT_MODEL.to_string());
|
||||
|
||||
info!(
|
||||
from = sender_contact_id,
|
||||
req_id = payload.req_id,
|
||||
model = %model,
|
||||
"Answering AI query over mesh"
|
||||
);
|
||||
|
||||
let result = call_ollama(&model, &payload.prompt).await;
|
||||
|
||||
match result {
|
||||
Ok(answer) => {
|
||||
let (text, _truncated) = cap_reply(&answer);
|
||||
send_answer_chunks(&state, sender_contact_id, payload.req_id, &text).await;
|
||||
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady {
|
||||
req_id: payload.req_id,
|
||||
to_contact_id: sender_contact_id,
|
||||
error: None,
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(req_id = payload.req_id, "AI query failed: {}", e);
|
||||
send_error(&state, sender_contact_id, payload.req_id, "AI unavailable").await;
|
||||
let _ = state.event_tx.send(super::super::types::MeshEvent::AssistResponseReady {
|
||||
req_id: payload.req_id,
|
||||
to_contact_id: sender_contact_id,
|
||||
error: Some(e.to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
state.assist_inflight.write().await.remove(&sender_contact_id);
|
||||
}
|
||||
|
||||
/// Whether `sender_contact_id` may invoke the assistant under the node's policy.
|
||||
/// Always denies user-blocked contacts. With `trusted_only`, requires a
|
||||
/// federation-Trusted match on the peer's pubkey or DID.
|
||||
async fn is_sender_allowed(state: &Arc<MeshState>, sender_contact_id: u32) -> bool {
|
||||
let (pubkey_hex, did) = {
|
||||
let peers = state.peers.read().await;
|
||||
match peers.get(&sender_contact_id) {
|
||||
Some(p) => (p.pubkey_hex.clone(), p.did.clone()),
|
||||
None => (None, None),
|
||||
}
|
||||
};
|
||||
|
||||
// Never answer a user-blocked contact, regardless of policy.
|
||||
if let Some(ref pk) = pubkey_hex {
|
||||
if state
|
||||
.contacts
|
||||
.read()
|
||||
.await
|
||||
.get(pk)
|
||||
.map(|c| c.blocked)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if !state.assistant.trusted_only {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Trusted-only: match against the federation trust list.
|
||||
let nodes = crate::federation::load_nodes(&state.data_dir)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
nodes.iter().any(|n| {
|
||||
n.trust_level == TrustLevel::Trusted
|
||||
&& (Some(&n.pubkey) == pubkey_hex.as_ref() || Some(&n.did) == did.as_ref())
|
||||
})
|
||||
}
|
||||
|
||||
/// Cap the answer to `MAX_REPLY_CHARS`, appending a marker when truncated.
|
||||
/// Returns (text_to_send, was_truncated).
|
||||
fn cap_reply(answer: &str) -> (String, bool) {
|
||||
let trimmed = answer.trim();
|
||||
if trimmed.chars().count() <= MAX_REPLY_CHARS {
|
||||
return (trimmed.to_string(), false);
|
||||
}
|
||||
let capped: String = trimmed.chars().take(MAX_REPLY_CHARS).collect();
|
||||
(format!("{capped}…(truncated)"), true)
|
||||
}
|
||||
|
||||
/// Split the answer into ordered `AssistResponse` chunks and send each back to
|
||||
/// the asker on the encrypted, peer-addressed path.
|
||||
async fn send_answer_chunks(
|
||||
state: &Arc<MeshState>,
|
||||
dest_contact_id: u32,
|
||||
req_id: u64,
|
||||
text: &str,
|
||||
) {
|
||||
let chars: Vec<char> = text.chars().collect();
|
||||
let chunks: Vec<String> = if chars.is_empty() {
|
||||
vec![String::new()]
|
||||
} else {
|
||||
chars.chunks(CHUNK_CHARS).map(|c| c.iter().collect()).collect()
|
||||
};
|
||||
let last = chunks.len().saturating_sub(1);
|
||||
for (i, chunk) in chunks.into_iter().enumerate() {
|
||||
let payload = AssistResponsePayload {
|
||||
req_id,
|
||||
text: chunk,
|
||||
seq: i as u16,
|
||||
done: i == last,
|
||||
error: None,
|
||||
};
|
||||
send_response(state, dest_contact_id, &payload).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a single failure `AssistResponse` (one chunk, done, error set).
|
||||
async fn send_error(state: &Arc<MeshState>, dest_contact_id: u32, req_id: u64, msg: &str) {
|
||||
let payload = AssistResponsePayload {
|
||||
req_id,
|
||||
text: String::new(),
|
||||
seq: 0,
|
||||
done: true,
|
||||
error: Some(msg.to_string()),
|
||||
};
|
||||
send_response(state, dest_contact_id, &payload).await;
|
||||
}
|
||||
|
||||
/// Encode an `AssistResponse` payload and send it to a peer.
|
||||
async fn send_response(state: &Arc<MeshState>, dest_contact_id: u32, payload: &AssistResponsePayload) {
|
||||
let bytes = match message_types::encode_payload(payload) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
warn!("Failed to encode AssistResponse: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let envelope = message_types::TypedEnvelope::new(MeshMessageType::AssistResponse, bytes);
|
||||
match envelope.to_wire() {
|
||||
Ok(wire) => send_to_peer(state, dest_contact_id, wire).await,
|
||||
Err(e) => warn!("Failed to encode AssistResponse envelope: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Call the local Ollama model and return the generated text.
|
||||
async fn call_ollama(model: &str, prompt: &str) -> anyhow::Result<String> {
|
||||
let client = reqwest::Client::builder().timeout(OLLAMA_TIMEOUT).build()?;
|
||||
let body = serde_json::json!({
|
||||
"model": model,
|
||||
"prompt": prompt,
|
||||
"stream": false,
|
||||
});
|
||||
let resp = client.post(OLLAMA_URL).json(&body).send().await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("Ollama returned HTTP {}", resp.status());
|
||||
}
|
||||
let json: serde_json::Value = resp.json().await?;
|
||||
let text = json
|
||||
.get("response")
|
||||
.and_then(|r| r.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
if text.trim().is_empty() {
|
||||
anyhow::bail!("Ollama returned an empty response");
|
||||
}
|
||||
Ok(text)
|
||||
}
|
||||
@ -445,7 +445,9 @@ async fn encrypt_for_peer(state: &Arc<MeshState>, contact_id: u32, typed_wire: &
|
||||
/// Send raw wire bytes to a specific peer by contact_id.
|
||||
/// Encrypts directed messages via ratchet or shared secret when available.
|
||||
/// Falls back to channel 0 broadcast (plaintext) if peer's pubkey is unknown.
|
||||
async fn send_to_peer(state: &Arc<MeshState>, contact_id: u32, typed_wire: Vec<u8>) {
|
||||
/// `pub(super)` so sibling handlers (e.g. the AI assistant) can reply on the
|
||||
/// same encrypted, peer-addressed path the relay handlers use.
|
||||
pub(super) async fn send_to_peer(state: &Arc<MeshState>, contact_id: u32, typed_wire: Vec<u8>) {
|
||||
let peers = state.peers.read().await;
|
||||
if let Some(peer) = peers.get(&contact_id) {
|
||||
if let Some(ref pk) = peer.pubkey_hex {
|
||||
|
||||
@ -681,6 +681,69 @@ pub(crate) async fn handle_typed_envelope_direct(
|
||||
.await;
|
||||
}
|
||||
|
||||
Some(MeshMessageType::AssistQuery) => {
|
||||
match message_types::decode_payload::<message_types::AssistQueryPayload>(&envelope.v) {
|
||||
Ok(query) => {
|
||||
if !state.assistant.enabled {
|
||||
debug!(
|
||||
from = sender_contact_id,
|
||||
"AssistQuery ignored — assistant disabled on this node"
|
||||
);
|
||||
return;
|
||||
}
|
||||
info!(
|
||||
from = sender_contact_id,
|
||||
req_id = query.req_id,
|
||||
"AI query received over mesh"
|
||||
);
|
||||
let json = payload_to_json(&query);
|
||||
store_typed_message(
|
||||
state,
|
||||
sender_contact_id,
|
||||
sender_name,
|
||||
&query.prompt,
|
||||
"assist_query",
|
||||
json,
|
||||
Some(envelope.seq),
|
||||
)
|
||||
.await;
|
||||
// Run the model + reply off the radio loop.
|
||||
let assist_state = Arc::clone(state);
|
||||
let name = sender_name.to_string();
|
||||
tokio::spawn(async move {
|
||||
super::assist::run_assist(query, sender_contact_id, name, assist_state)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
Err(e) => warn!("Failed to decode AssistQuery payload: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
Some(MeshMessageType::AssistResponse) => {
|
||||
match message_types::decode_payload::<message_types::AssistResponsePayload>(&envelope.v)
|
||||
{
|
||||
Ok(resp) => {
|
||||
let display = resp
|
||||
.error
|
||||
.clone()
|
||||
.map(|e| format!("AI error: {e}"))
|
||||
.unwrap_or_else(|| resp.text.clone());
|
||||
let json = payload_to_json(&resp);
|
||||
store_typed_message(
|
||||
state,
|
||||
sender_contact_id,
|
||||
sender_name,
|
||||
&display,
|
||||
"assist_response",
|
||||
json,
|
||||
Some(envelope.seq),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(e) => warn!("Failed to decode AssistResponse payload: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
_ => {
|
||||
debug!(
|
||||
msg_type = ?msg_type,
|
||||
|
||||
@ -7,6 +7,7 @@
|
||||
//! - Reconnects on device disconnect
|
||||
//! - Manages peer cache and message store
|
||||
|
||||
mod assist;
|
||||
mod bitcoin;
|
||||
mod decode;
|
||||
pub(crate) mod dispatch;
|
||||
@ -121,6 +122,27 @@ pub struct MeshState {
|
||||
/// persistent contact table from regenerating rows the user just
|
||||
/// wiped. Persisted to `mesh-ignored-radio-contacts.json`.
|
||||
pub radio_contact_blocklist: RwLock<HashSet<String>>,
|
||||
/// Mesh-AI assistant settings (issue #50): whether this node answers
|
||||
/// AssistQuery messages with its local LLM, and who may ask.
|
||||
pub assistant: AssistantConfig,
|
||||
/// Data dir — lets dispatch handlers reach disk-backed stores (e.g. the
|
||||
/// federation trust list used to gate AI queries) without threading a path
|
||||
/// through every call.
|
||||
pub data_dir: std::path::PathBuf,
|
||||
/// Contact-ids with an AI query currently being answered. Caps each asker to
|
||||
/// one in-flight query so a peer can't flood the node's compute / airtime.
|
||||
pub assist_inflight: RwLock<HashSet<u32>>,
|
||||
}
|
||||
|
||||
/// Mesh-AI assistant configuration, snapshotted from `MeshConfig` at startup.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AssistantConfig {
|
||||
/// Answer AssistQuery messages with the local LLM.
|
||||
pub enabled: bool,
|
||||
/// Ollama model to use; None → the built-in default.
|
||||
pub model: Option<String>,
|
||||
/// Restrict asking to federation-Trusted peers (vs. anyone on the mesh).
|
||||
pub trusted_only: bool,
|
||||
}
|
||||
|
||||
/// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to
|
||||
@ -153,6 +175,8 @@ impl MeshState {
|
||||
encrypt_relay: bool,
|
||||
session_manager: Arc<super::session::SessionManager>,
|
||||
our_ed_pubkey_hex: String,
|
||||
assistant: AssistantConfig,
|
||||
data_dir: std::path::PathBuf,
|
||||
) -> (
|
||||
Arc<Self>,
|
||||
broadcast::Receiver<MeshEvent>,
|
||||
@ -192,6 +216,9 @@ impl MeshState {
|
||||
our_ed_pubkey_hex,
|
||||
blob_store: RwLock::new(None),
|
||||
radio_contact_blocklist: RwLock::new(HashSet::new()),
|
||||
assistant,
|
||||
data_dir,
|
||||
assist_inflight: RwLock::new(HashSet::new()),
|
||||
});
|
||||
(state, rx, cmd_rx)
|
||||
}
|
||||
|
||||
@ -70,6 +70,12 @@ pub enum MeshMessageType {
|
||||
/// MCIIXXTT framing) and the peer has no Tor path. Recipient writes the
|
||||
/// bytes to its local BlobStore on reassembly.
|
||||
ContentInline = 23,
|
||||
/// "Ask the node's AI" — a prompt to be answered by the receiving node's
|
||||
/// local LLM (issue #50). Gated by the assistant config + trust policy.
|
||||
AssistQuery = 24,
|
||||
/// Reply to an AssistQuery — a chunk of the LLM's answer, addressed back to
|
||||
/// the asker by `req_id`. Long answers span multiple chunks (`seq`/`done`).
|
||||
AssistResponse = 25,
|
||||
}
|
||||
|
||||
impl MeshMessageType {
|
||||
@ -99,6 +105,8 @@ impl MeshMessageType {
|
||||
21 => Some(Self::ChannelInvite),
|
||||
22 => Some(Self::ContactCard),
|
||||
23 => Some(Self::ContentInline),
|
||||
24 => Some(Self::AssistQuery),
|
||||
25 => Some(Self::AssistResponse),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@ -132,6 +140,8 @@ impl MeshMessageType {
|
||||
"channel_invite" => Some(Self::ChannelInvite),
|
||||
"contact_card" => Some(Self::ContactCard),
|
||||
"content_inline" => Some(Self::ContentInline),
|
||||
"assist_query" => Some(Self::AssistQuery),
|
||||
"assist_response" => Some(Self::AssistResponse),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@ -162,6 +172,8 @@ impl MeshMessageType {
|
||||
Self::ChannelInvite => "channel_invite",
|
||||
Self::ContactCard => "contact_card",
|
||||
Self::ContentInline => "content_inline",
|
||||
Self::AssistQuery => "assist_query",
|
||||
Self::AssistResponse => "assist_response",
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -407,6 +419,37 @@ pub struct LightningRelayPayload {
|
||||
pub request_id: u64,
|
||||
}
|
||||
|
||||
/// "Ask the node's AI" request (issue #50). Sent to a peer running a local
|
||||
/// LLM; answered with one or more `AssistResponsePayload` chunks.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AssistQueryPayload {
|
||||
/// Asker-chosen id correlating the query with its response chunks.
|
||||
pub req_id: u64,
|
||||
/// The natural-language prompt.
|
||||
pub prompt: String,
|
||||
/// Optional model override; falls back to the responder's configured model.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub model: Option<String>,
|
||||
}
|
||||
|
||||
/// One chunk of an AI answer, addressed back to the asker by `req_id`.
|
||||
/// Airtime is scarce, so long answers are capped and split into ordered chunks.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AssistResponsePayload {
|
||||
pub req_id: u64,
|
||||
/// This chunk's text.
|
||||
pub text: String,
|
||||
/// 0-based chunk index.
|
||||
#[serde(default)]
|
||||
pub seq: u16,
|
||||
/// True on the final chunk.
|
||||
#[serde(default)]
|
||||
pub done: bool,
|
||||
/// Set instead of `text` when the query failed (model unreachable, denied…).
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
/// Lightning relay response (proof of payment).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LightningRelayResponsePayload {
|
||||
|
||||
@ -176,6 +176,17 @@ pub struct MeshConfig {
|
||||
/// Set to false to disable encryption for debugging or rollback.
|
||||
#[serde(default = "default_true")]
|
||||
pub encrypt_relay_messages: bool,
|
||||
/// Answer AI queries (AssistQuery) from peers using this node's local LLM
|
||||
/// (issue #50). Off by default — the node only becomes a mesh AI on opt-in.
|
||||
#[serde(default)]
|
||||
pub assistant_enabled: bool,
|
||||
/// Ollama model used to answer AI queries. None → the built-in default.
|
||||
#[serde(default)]
|
||||
pub assistant_model: Option<String>,
|
||||
/// When true (default), only federation-Trusted peers may ask; when false,
|
||||
/// any peer on the mesh may ask (spends this node's compute + airtime).
|
||||
#[serde(default = "default_true")]
|
||||
pub assistant_trusted_only: bool,
|
||||
}
|
||||
|
||||
fn default_true() -> bool {
|
||||
@ -194,6 +205,9 @@ impl Default for MeshConfig {
|
||||
announce_block_headers: false,
|
||||
steganography_mode: steganography::SteganographyMode::Normal,
|
||||
encrypt_relay_messages: true,
|
||||
assistant_enabled: false,
|
||||
assistant_model: None,
|
||||
assistant_trusted_only: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -362,6 +376,12 @@ impl MeshService {
|
||||
config.encrypt_relay_messages,
|
||||
Arc::clone(&session_manager),
|
||||
ed_pubkey_hex.to_string(),
|
||||
listener::AssistantConfig {
|
||||
enabled: config.assistant_enabled,
|
||||
model: config.assistant_model.clone(),
|
||||
trusted_only: config.assistant_trusted_only,
|
||||
},
|
||||
data_dir.to_path_buf(),
|
||||
);
|
||||
|
||||
// Derive X25519 keys from Ed25519 identity
|
||||
|
||||
@ -161,4 +161,15 @@ pub enum MeshEvent {
|
||||
payment_hash: Option<String>,
|
||||
error: Option<String>,
|
||||
},
|
||||
/// An AI query arrived from a peer and was accepted for answering (#50).
|
||||
AssistQueryReceived {
|
||||
from_contact_id: u32,
|
||||
prompt: String,
|
||||
},
|
||||
/// A local-AI answer finished sending back to the asker (or failed) (#50).
|
||||
AssistResponseReady {
|
||||
req_id: u64,
|
||||
to_contact_id: u32,
|
||||
error: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user