archy/core/archipelago/src/mesh/meshtastic.rs
2026-05-17 20:45:56 -04:00

622 lines
20 KiB
Rust

//! Async serial driver for Meshtastic devices.
//!
//! Meshtastic uses protobuf payloads over a SLIP-like serial stream. This
//! module implements only the small subset Archipelago needs: connect,
//! discover the local node, send/receive text packets, and provide synthetic
//! contacts to the existing mesh listener.
use super::protocol::{InboundFrame, ParsedContact};
use super::types::{DeviceInfo, DeviceType};
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::time::Duration;
use tracing::{debug, info, warn};
const BAUD_RATE: u32 = 115200;
const READ_TIMEOUT: Duration = Duration::from_secs(5);
const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
const READ_BUF_SIZE: usize = 512;
const START1: u8 = 0x94;
const START2: u8 = 0xc3;
const TO_RADIO_MAX: usize = 512;
const BROADCAST_NUM: u32 = 0xffff_ffff;
const TEXT_MESSAGE_APP: u32 = 1;
const TO_RADIO_PACKET: u64 = 1;
const TO_RADIO_WANT_CONFIG_ID: u64 = 3;
const TO_RADIO_HEARTBEAT: u64 = 7;
const FROM_RADIO_PACKET: u64 = 2;
const FROM_RADIO_MY_INFO: u64 = 3;
const FROM_RADIO_NODE_INFO: u64 = 4;
const FROM_RADIO_CONFIG_COMPLETE_ID: u64 = 7;
const FROM_RADIO_REBOOTED: u64 = 8;
/// Async Meshtastic device handle.
pub struct MeshtasticDevice {
port: serial2_tokio::SerialPort,
read_buf: Vec<u8>,
node_num: Option<u32>,
user_id: Option<String>,
long_name: Option<String>,
short_name: Option<String>,
contacts: HashMap<u32, ParsedContact>,
device_path: String,
}
impl MeshtasticDevice {
pub async fn open(path: &str) -> Result<Self> {
match tokio::fs::metadata(path).await {
Ok(meta) => {
debug!(path = %path, permissions = ?meta.permissions(), "Device node exists")
}
Err(e) => anyhow::bail!("Serial device {} not accessible: {}", path, e),
}
let port = serial2_tokio::SerialPort::open(path, BAUD_RATE).context(format!(
"Failed to open serial port {} (permission denied? device busy?)",
path
))?;
info!(path = %path, baud = BAUD_RATE, "Opened Meshtastic serial port");
Ok(Self {
port,
read_buf: Vec::with_capacity(READ_BUF_SIZE),
node_num: None,
user_id: None,
long_name: None,
short_name: None,
contacts: HashMap::new(),
device_path: path.to_string(),
})
}
pub async fn initialize(&mut self) -> Result<DeviceInfo> {
info!(path = %self.device_path, "Starting Meshtastic handshake");
self.send_to_radio(&encode_want_config()).await?;
let deadline = tokio::time::Instant::now() + READ_TIMEOUT;
let mut saw_meshtastic_frame = false;
let mut saw_config_complete = false;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(
remaining.min(Duration::from_millis(250)),
self.read_from_radio(),
)
.await
{
Ok(Ok(Some(frame))) => {
saw_meshtastic_frame = true;
if matches!(
decode_top_level_variant(&frame),
Some((FROM_RADIO_CONFIG_COMPLETE_ID, _))
) {
saw_config_complete = true;
}
self.handle_from_radio(&frame);
if saw_config_complete && self.node_num.is_some() {
break;
}
}
Ok(Ok(None)) | Err(_) => {}
Ok(Err(e)) => return Err(e),
}
}
if !saw_meshtastic_frame {
anyhow::bail!("No Meshtastic serial API response");
}
let node_id = self
.node_num
.ok_or_else(|| anyhow::anyhow!("Meshtastic serial API did not provide MyInfo"))?;
if self.user_id.is_none() && self.long_name.is_none() && self.short_name.is_none() {
anyhow::bail!("Meshtastic serial API did not provide node identity");
}
let firmware_version = self
.long_name
.clone()
.or_else(|| self.user_id.clone())
.unwrap_or_else(|| "Meshtastic".to_string());
info!(node_id, name = %firmware_version, "Meshtastic identity");
Ok(DeviceInfo {
firmware_version,
node_id,
max_contacts: 200,
device_type: DeviceType::Meshtastic,
})
}
pub async fn set_advert_name(&mut self, name: &str) -> Result<()> {
self.long_name = Some(name.to_string());
Ok(())
}
pub async fn send_self_advert(&mut self) -> Result<()> {
self.send_to_radio(&encode_heartbeat()).await
}
pub async fn send_channel_text(&mut self, _channel: u8, msg: &[u8]) -> Result<()> {
let text = String::from_utf8_lossy(msg);
let packet = encode_mesh_packet(BROADCAST_NUM, TEXT_MESSAGE_APP, text.as_bytes());
self.send_to_radio(&encode_to_radio_variant(TO_RADIO_PACKET, &packet))
.await
}
pub async fn get_contacts(&mut self) -> Result<Vec<ParsedContact>> {
if self.contacts.is_empty() {
self.send_to_radio(&encode_want_config()).await?;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
match self.read_from_radio().await? {
Some(frame) => {
let config_complete = matches!(
decode_top_level_variant(&frame),
Some((FROM_RADIO_CONFIG_COMPLETE_ID, _))
);
self.handle_from_radio(&frame);
if config_complete || !self.contacts.is_empty() {
break;
}
}
None => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
}
Ok(self.contacts.values().cloned().collect())
}
pub async fn reset_contact_path(&mut self, _pubkey: &[u8; 32]) -> Result<()> {
Ok(())
}
pub async fn sync_messages(&mut self) -> Result<Vec<InboundFrame>> {
Ok(Vec::new())
}
pub async fn try_recv_frame(&mut self) -> Result<Option<InboundFrame>> {
let Some(frame) = self.read_from_radio().await? else {
return Ok(None);
};
Ok(self.handle_from_radio(&frame))
}
pub fn advert_name(&self) -> Option<String> {
self.long_name
.clone()
.or_else(|| self.short_name.clone())
.or_else(|| self.user_id.clone())
}
async fn send_to_radio(&mut self, payload: &[u8]) -> Result<()> {
if payload.len() > TO_RADIO_MAX {
anyhow::bail!("Meshtastic payload too large: {} bytes", payload.len());
}
let mut frame = Vec::with_capacity(4 + payload.len());
frame.push(START1);
frame.push(START2);
frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
frame.extend_from_slice(payload);
tokio::time::timeout(WRITE_TIMEOUT, self.port.write_all(&frame))
.await
.context("Meshtastic serial write timed out")?
.context("Meshtastic serial write failed")?;
Ok(())
}
async fn read_from_radio(&mut self) -> Result<Option<Vec<u8>>> {
if let Some(frame) = decode_serial_frame(&mut self.read_buf) {
return Ok(Some(frame));
}
let mut tmp = [0u8; READ_BUF_SIZE];
match tokio::time::timeout(Duration::from_millis(50), self.port.read(&mut tmp)).await {
Ok(Ok(0)) => anyhow::bail!("Meshtastic serial port closed"),
Ok(Ok(n)) => self.read_buf.extend_from_slice(&tmp[..n]),
Ok(Err(e)) => return Err(e).context("Meshtastic serial read error"),
Err(_) => return Ok(None),
}
Ok(decode_serial_frame(&mut self.read_buf))
}
fn handle_from_radio(&mut self, frame: &[u8]) -> Option<InboundFrame> {
let Some((field, value)) = decode_top_level_variant(frame) else {
return None;
};
match field {
FROM_RADIO_MY_INFO => {
if let Some((node_num, user_id)) = parse_my_info(value) {
self.node_num = Some(node_num);
if let Some(user_id) = user_id {
self.user_id = Some(user_id);
}
}
None
}
FROM_RADIO_NODE_INFO => {
self.update_node_info(value);
None
}
FROM_RADIO_PACKET => self.packet_to_inbound_frame(value),
FROM_RADIO_CONFIG_COMPLETE_ID | FROM_RADIO_REBOOTED => None,
other => {
debug!(
field = other,
len = value.len(),
"Unhandled Meshtastic FromRadio field"
);
None
}
}
}
fn update_node_info(&mut self, data: &[u8]) {
if let Some(node) = parse_node_info(data) {
let key = synthetic_pubkey(node.num);
let name = node
.long_name
.or(node.short_name)
.or(node.id)
.unwrap_or_else(|| format!("Meshtastic !{:08x}", node.num));
if Some(node.num) == self.node_num {
self.long_name = Some(name.clone());
}
self.contacts.insert(
node.num,
ParsedContact {
public_key_hex: hex::encode(key),
advert_name: name,
last_advert: node.last_heard.unwrap_or_default(),
contact_type: 1,
path_len: 0xff,
flags: 0,
},
);
}
}
fn packet_to_inbound_frame(&mut self, data: &[u8]) -> Option<InboundFrame> {
let packet = parse_mesh_packet(data)?;
if packet.portnum != TEXT_MESSAGE_APP || packet.payload.is_empty() {
return None;
}
let from = packet.from.unwrap_or(0);
if Some(from) == self.node_num {
return None;
}
let from_key = synthetic_pubkey(from);
self.contacts.entry(from).or_insert_with(|| ParsedContact {
public_key_hex: hex::encode(synthetic_pubkey(from)),
advert_name: format!("Meshtastic !{:08x}", from),
last_advert: 0,
contact_type: 1,
path_len: 0xff,
flags: 0,
});
let mut payload = Vec::with_capacity(15 + packet.payload.len());
payload.push(0); // SNR unknown
payload.extend_from_slice(&[0, 0]); // reserved
payload.extend_from_slice(&from_key[..6]);
payload.push(0xff); // unknown/flood path
payload.push(0); // text type
payload.extend_from_slice(&0u32.to_le_bytes());
payload.extend_from_slice(&packet.payload);
Some(InboundFrame {
code: super::protocol::RESP_CONTACT_MSG_V3,
data: payload,
bytes_consumed: 0,
})
}
}
fn decode_serial_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
let start = buf.windows(2).position(|w| w == [START1, START2])?;
if start > 0 {
buf.drain(..start);
}
if buf.len() < 4 {
return None;
}
let len = u16::from_be_bytes([buf[2], buf[3]]) as usize;
if buf.len() < 4 + len {
return None;
}
let payload = buf[4..4 + len].to_vec();
buf.drain(..4 + len);
Some(payload)
}
fn encode_want_config() -> Vec<u8> {
encode_varint_field(TO_RADIO_WANT_CONFIG_ID, 1)
}
fn encode_heartbeat() -> Vec<u8> {
encode_to_radio_variant(TO_RADIO_HEARTBEAT, &[])
}
fn encode_to_radio_variant(field: u64, bytes: &[u8]) -> Vec<u8> {
let mut out = Vec::new();
encode_len_field(field, bytes, &mut out);
out
}
fn encode_mesh_packet(to: u32, portnum: u32, payload: &[u8]) -> Vec<u8> {
let mut decoded = Vec::new();
encode_varint_field_into(1, portnum as u64, &mut decoded);
encode_len_field(2, payload, &mut decoded);
let mut packet = Vec::new();
encode_fixed32_field(2, to, &mut packet);
encode_len_field(4, &decoded, &mut packet);
packet
}
fn decode_top_level_variant(buf: &[u8]) -> Option<(u64, &[u8])> {
let mut idx = 0;
while idx < buf.len() {
let (key, n) = read_varint(&buf[idx..])?;
idx += n;
let field = key >> 3;
match key & 0x07 {
0 => {
let (_, n) = read_varint(&buf[idx..])?;
idx += n;
if matches!(field, FROM_RADIO_CONFIG_COMPLETE_ID | FROM_RADIO_REBOOTED) {
return Some((field, &[]));
}
}
2 => {
let (len, n) = read_varint(&buf[idx..])?;
idx += n;
let end = idx.checked_add(len as usize)?;
if end > buf.len() {
return None;
}
if matches!(
field,
FROM_RADIO_PACKET | FROM_RADIO_MY_INFO | FROM_RADIO_NODE_INFO
) {
return Some((field, &buf[idx..end]));
}
idx = end;
}
_ => return None,
}
}
None
}
fn parse_my_info(data: &[u8]) -> Option<(u32, Option<String>)> {
let mut idx = 0;
let mut node_num = None;
let mut user_id = None;
while idx < data.len() {
let (field, value, next) = next_field(data, idx)?;
idx = next;
match (field, value) {
(1, FieldValue::Varint(v)) => node_num = Some(v as u32),
(1, FieldValue::Fixed32(v)) => node_num = Some(v),
(3, FieldValue::Bytes(b)) => user_id = parse_user(b).and_then(|u| u.id),
_ => {}
}
}
node_num.map(|n| (n, user_id))
}
struct ParsedNode {
num: u32,
id: Option<String>,
long_name: Option<String>,
short_name: Option<String>,
last_heard: Option<u32>,
}
fn parse_node_info(data: &[u8]) -> Option<ParsedNode> {
let mut idx = 0;
let mut node = ParsedNode {
num: 0,
id: None,
long_name: None,
short_name: None,
last_heard: None,
};
while idx < data.len() {
let (field, value, next) = next_field(data, idx)?;
idx = next;
match (field, value) {
(1, FieldValue::Varint(v)) => node.num = v as u32,
(1, FieldValue::Fixed32(v)) => node.num = v,
(2, FieldValue::Bytes(b)) => {
if let Some(user) = parse_user(b) {
node.id = user.id;
node.long_name = user.long_name;
node.short_name = user.short_name;
}
}
(5, FieldValue::Fixed32(v)) => node.last_heard = Some(v),
_ => {}
}
}
if node.num == 0 {
None
} else {
Some(node)
}
}
struct ParsedUser {
id: Option<String>,
long_name: Option<String>,
short_name: Option<String>,
}
fn parse_user(data: &[u8]) -> Option<ParsedUser> {
let mut idx = 0;
let mut user = ParsedUser {
id: None,
long_name: None,
short_name: None,
};
while idx < data.len() {
let (field, value, next) = next_field(data, idx)?;
idx = next;
match (field, value) {
(1, FieldValue::Bytes(b)) => user.id = string_field(b),
(2, FieldValue::Bytes(b)) => user.long_name = string_field(b),
(3, FieldValue::Bytes(b)) => user.short_name = string_field(b),
_ => {}
}
}
Some(user)
}
struct ParsedPacket {
from: Option<u32>,
portnum: u32,
payload: Vec<u8>,
}
fn parse_mesh_packet(data: &[u8]) -> Option<ParsedPacket> {
let mut idx = 0;
let mut from = None;
let mut decoded = None;
while idx < data.len() {
let (field, value, next) = next_field(data, idx)?;
idx = next;
match (field, value) {
(1, FieldValue::Fixed32(v)) => from = Some(v),
(4, FieldValue::Bytes(b)) => decoded = Some(b),
_ => {}
}
}
let decoded = decoded?;
let mut didx = 0;
let mut portnum = 0;
let mut payload = Vec::new();
while didx < decoded.len() {
let (field, value, next) = next_field(decoded, didx)?;
didx = next;
match (field, value) {
(1, FieldValue::Varint(v)) => portnum = v as u32,
(2, FieldValue::Bytes(b)) => payload = b.to_vec(),
_ => {}
}
}
Some(ParsedPacket {
from,
portnum,
payload,
})
}
enum FieldValue<'a> {
Varint(u64),
Fixed32(u32),
Bytes(&'a [u8]),
}
fn next_field(buf: &[u8], idx: usize) -> Option<(u64, FieldValue<'_>, usize)> {
let (key, n) = read_varint(&buf[idx..])?;
let field = key >> 3;
let mut pos = idx + n;
match key & 0x07 {
0 => {
let (v, n) = read_varint(&buf[pos..])?;
pos += n;
Some((field, FieldValue::Varint(v), pos))
}
2 => {
let (len, n) = read_varint(&buf[pos..])?;
pos += n;
let end = pos.checked_add(len as usize)?;
if end > buf.len() {
return None;
}
Some((field, FieldValue::Bytes(&buf[pos..end]), end))
}
5 => {
let end = pos.checked_add(4)?;
if end > buf.len() {
None
} else {
let value =
u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
Some((field, FieldValue::Fixed32(value), end))
}
}
1 => {
let end = pos.checked_add(8)?;
if end > buf.len() {
None
} else {
Some((field, FieldValue::Bytes(&buf[pos..end]), end))
}
}
wire => {
warn!(wire, "Unsupported Meshtastic protobuf wire type");
None
}
}
}
fn read_varint(buf: &[u8]) -> Option<(u64, usize)> {
let mut out = 0u64;
for (i, b) in buf.iter().copied().enumerate().take(10) {
out |= ((b & 0x7f) as u64) << (7 * i);
if b & 0x80 == 0 {
return Some((out, i + 1));
}
}
None
}
fn encode_varint_field(field: u64, value: u64) -> Vec<u8> {
let mut out = Vec::new();
encode_varint_field_into(field, value, &mut out);
out
}
fn encode_varint_field_into(field: u64, value: u64, out: &mut Vec<u8>) {
write_varint((field << 3) | 0, out);
write_varint(value, out);
}
fn encode_len_field(field: u64, bytes: &[u8], out: &mut Vec<u8>) {
write_varint((field << 3) | 2, out);
write_varint(bytes.len() as u64, out);
out.extend_from_slice(bytes);
}
fn encode_fixed32_field(field: u64, value: u32, out: &mut Vec<u8>) {
write_varint((field << 3) | 5, out);
out.extend_from_slice(&value.to_le_bytes());
}
fn write_varint(mut value: u64, out: &mut Vec<u8>) {
while value >= 0x80 {
out.push((value as u8 & 0x7f) | 0x80);
value >>= 7;
}
out.push(value as u8);
}
fn string_field(bytes: &[u8]) -> Option<String> {
std::str::from_utf8(bytes).ok().map(|s| s.to_string())
}
fn synthetic_pubkey(node_num: u32) -> [u8; 32] {
let mut out = [0u8; 32];
out[..4].copy_from_slice(&node_num.to_le_bytes());
out[4..15].copy_from_slice(b"meshtastic:");
out
}