//! Async serial driver for Meshtastic devices. //! //! Meshtastic uses protobuf payloads over a SLIP-like serial stream. This //! module implements only the small subset Archipelago needs: connect, //! discover the local node, send/receive text packets, and provide synthetic //! contacts to the existing mesh listener. use super::protocol::{InboundFrame, ParsedContact}; use super::types::{DeviceInfo, DeviceType}; use anyhow::{Context, Result}; use std::collections::HashMap; use std::time::Duration; use tracing::{debug, info, warn}; const BAUD_RATE: u32 = 115200; const READ_TIMEOUT: Duration = Duration::from_secs(5); const WRITE_TIMEOUT: Duration = Duration::from_secs(2); const READ_BUF_SIZE: usize = 512; const START1: u8 = 0x94; const START2: u8 = 0xc3; const TO_RADIO_MAX: usize = 512; const BROADCAST_NUM: u32 = 0xffff_ffff; const TEXT_MESSAGE_APP: u32 = 1; const TO_RADIO_PACKET: u64 = 1; const TO_RADIO_WANT_CONFIG_ID: u64 = 3; const TO_RADIO_HEARTBEAT: u64 = 7; const FROM_RADIO_PACKET: u64 = 2; const FROM_RADIO_MY_INFO: u64 = 3; const FROM_RADIO_NODE_INFO: u64 = 4; const FROM_RADIO_CONFIG_COMPLETE_ID: u64 = 7; const FROM_RADIO_REBOOTED: u64 = 8; /// Async Meshtastic device handle. pub struct MeshtasticDevice { port: serial2_tokio::SerialPort, read_buf: Vec, node_num: Option, user_id: Option, long_name: Option, short_name: Option, contacts: HashMap, device_path: String, } impl MeshtasticDevice { pub async fn open(path: &str) -> Result { match tokio::fs::metadata(path).await { Ok(meta) => debug!(path = %path, permissions = ?meta.permissions(), "Device node exists"), Err(e) => anyhow::bail!("Serial device {} not accessible: {}", path, e), } let port = serial2_tokio::SerialPort::open(path, BAUD_RATE).context(format!( "Failed to open serial port {} (permission denied? device busy?)", path ))?; info!(path = %path, baud = BAUD_RATE, "Opened Meshtastic serial port"); Ok(Self { port, read_buf: Vec::with_capacity(READ_BUF_SIZE), node_num: None, user_id: None, long_name: None, short_name: None, contacts: HashMap::new(), device_path: path.to_string(), }) } pub async fn initialize(&mut self) -> Result { info!(path = %self.device_path, "Starting Meshtastic handshake"); self.send_to_radio(&encode_want_config()).await?; let deadline = tokio::time::Instant::now() + READ_TIMEOUT; let mut saw_meshtastic_frame = false; loop { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { break; } match tokio::time::timeout(remaining.min(Duration::from_millis(250)), self.read_from_radio()).await { Ok(Ok(Some(frame))) => { saw_meshtastic_frame = true; self.handle_from_radio(&frame); if self.node_num.is_some() && self.user_id.is_some() { break; } } Ok(Ok(None)) | Err(_) => {} Ok(Err(e)) => return Err(e), } } if !saw_meshtastic_frame { anyhow::bail!("No Meshtastic serial API response"); } let node_id = self.node_num.unwrap_or(0); let firmware_version = self .long_name .clone() .or_else(|| self.user_id.clone()) .unwrap_or_else(|| "Meshtastic".to_string()); info!(node_id, name = %firmware_version, "Meshtastic identity"); Ok(DeviceInfo { firmware_version, node_id, max_contacts: 200, device_type: DeviceType::Meshtastic, }) } pub async fn set_advert_name(&mut self, name: &str) -> Result<()> { self.long_name = Some(name.to_string()); Ok(()) } pub async fn send_self_advert(&mut self) -> Result<()> { self.send_to_radio(&encode_heartbeat()).await } pub async fn send_channel_text(&mut self, _channel: u8, msg: &[u8]) -> Result<()> { let text = String::from_utf8_lossy(msg); let packet = encode_mesh_packet(BROADCAST_NUM, TEXT_MESSAGE_APP, text.as_bytes()); self.send_to_radio(&encode_to_radio_variant(TO_RADIO_PACKET, &packet)) .await } pub async fn get_contacts(&mut self) -> Result> { Ok(self.contacts.values().cloned().collect()) } pub async fn reset_contact_path(&mut self, _pubkey: &[u8; 32]) -> Result<()> { Ok(()) } pub async fn sync_messages(&mut self) -> Result> { Ok(Vec::new()) } pub async fn try_recv_frame(&mut self) -> Result> { let Some(frame) = self.read_from_radio().await? else { return Ok(None); }; Ok(self.handle_from_radio(&frame)) } pub fn advert_name(&self) -> Option { self.long_name .clone() .or_else(|| self.short_name.clone()) .or_else(|| self.user_id.clone()) } async fn send_to_radio(&mut self, payload: &[u8]) -> Result<()> { if payload.len() > TO_RADIO_MAX { anyhow::bail!("Meshtastic payload too large: {} bytes", payload.len()); } let mut frame = Vec::with_capacity(4 + payload.len()); frame.push(START1); frame.push(START2); frame.extend_from_slice(&(payload.len() as u16).to_be_bytes()); frame.extend_from_slice(payload); tokio::time::timeout(WRITE_TIMEOUT, self.port.write_all(&frame)) .await .context("Meshtastic serial write timed out")? .context("Meshtastic serial write failed")?; Ok(()) } async fn read_from_radio(&mut self) -> Result>> { if let Some(frame) = decode_serial_frame(&mut self.read_buf) { return Ok(Some(frame)); } let mut tmp = [0u8; READ_BUF_SIZE]; match tokio::time::timeout(Duration::from_millis(50), self.port.read(&mut tmp)).await { Ok(Ok(0)) => anyhow::bail!("Meshtastic serial port closed"), Ok(Ok(n)) => self.read_buf.extend_from_slice(&tmp[..n]), Ok(Err(e)) => return Err(e).context("Meshtastic serial read error"), Err(_) => return Ok(None), } Ok(decode_serial_frame(&mut self.read_buf)) } fn handle_from_radio(&mut self, frame: &[u8]) -> Option { let Some((field, value)) = decode_top_level_variant(frame) else { return None; }; match field { FROM_RADIO_MY_INFO => { if let Some((node_num, user_id)) = parse_my_info(value) { self.node_num = Some(node_num); if let Some(user_id) = user_id { self.user_id = Some(user_id); } } None } FROM_RADIO_NODE_INFO => { self.update_node_info(value); None } FROM_RADIO_PACKET => self.packet_to_inbound_frame(value), FROM_RADIO_CONFIG_COMPLETE_ID | FROM_RADIO_REBOOTED => None, other => { debug!(field = other, len = value.len(), "Unhandled Meshtastic FromRadio field"); None } } } fn update_node_info(&mut self, data: &[u8]) { if let Some(node) = parse_node_info(data) { let key = synthetic_pubkey(node.num); let name = node .long_name .or(node.short_name) .or(node.id) .unwrap_or_else(|| format!("Meshtastic !{:08x}", node.num)); if Some(node.num) == self.node_num { self.long_name = Some(name.clone()); } self.contacts.insert( node.num, ParsedContact { public_key_hex: hex::encode(key), advert_name: name, last_advert: node.last_heard.unwrap_or_default(), contact_type: 1, path_len: 0xff, flags: 0, }, ); } } fn packet_to_inbound_frame(&mut self, data: &[u8]) -> Option { let packet = parse_mesh_packet(data)?; if packet.portnum != TEXT_MESSAGE_APP || packet.payload.is_empty() { return None; } let from = packet.from.unwrap_or(0); if Some(from) == self.node_num { return None; } let from_key = synthetic_pubkey(from); self.contacts.entry(from).or_insert_with(|| ParsedContact { public_key_hex: hex::encode(synthetic_pubkey(from)), advert_name: format!("Meshtastic !{:08x}", from), last_advert: 0, contact_type: 1, path_len: 0xff, flags: 0, }); let mut payload = Vec::with_capacity(15 + packet.payload.len()); payload.push(0); // SNR unknown payload.extend_from_slice(&[0, 0]); // reserved payload.extend_from_slice(&from_key[..6]); payload.push(0xff); // unknown/flood path payload.push(0); // text type payload.extend_from_slice(&0u32.to_le_bytes()); payload.extend_from_slice(&packet.payload); Some(InboundFrame { code: super::protocol::RESP_CONTACT_MSG_V3, data: payload, bytes_consumed: 0, }) } } fn decode_serial_frame(buf: &mut Vec) -> Option> { let start = buf.windows(2).position(|w| w == [START1, START2])?; if start > 0 { buf.drain(..start); } if buf.len() < 4 { return None; } let len = u16::from_be_bytes([buf[2], buf[3]]) as usize; if buf.len() < 4 + len { return None; } let payload = buf[4..4 + len].to_vec(); buf.drain(..4 + len); Some(payload) } fn encode_want_config() -> Vec { encode_to_radio_variant(TO_RADIO_WANT_CONFIG_ID, &encode_varint_field(1, 1)) } fn encode_heartbeat() -> Vec { encode_to_radio_variant(TO_RADIO_HEARTBEAT, &[]) } fn encode_to_radio_variant(field: u64, bytes: &[u8]) -> Vec { let mut out = Vec::new(); encode_len_field(field, bytes, &mut out); out } fn encode_mesh_packet(to: u32, portnum: u32, payload: &[u8]) -> Vec { let mut decoded = Vec::new(); encode_varint_field_into(1, portnum as u64, &mut decoded); encode_len_field(2, payload, &mut decoded); let mut packet = Vec::new(); encode_fixed32_field(2, to, &mut packet); encode_len_field(4, &decoded, &mut packet); packet } fn decode_top_level_variant(buf: &[u8]) -> Option<(u64, &[u8])> { let mut idx = 0; while idx < buf.len() { let (key, n) = read_varint(&buf[idx..])?; idx += n; let field = key >> 3; match key & 0x07 { 0 => { let (_, n) = read_varint(&buf[idx..])?; idx += n; } 2 => { let (len, n) = read_varint(&buf[idx..])?; idx += n; let end = idx.checked_add(len as usize)?; if end > buf.len() { return None; } if matches!(field, FROM_RADIO_PACKET | FROM_RADIO_MY_INFO | FROM_RADIO_NODE_INFO) { return Some((field, &buf[idx..end])); } idx = end; } _ => return None, } } None } fn parse_my_info(data: &[u8]) -> Option<(u32, Option)> { let mut idx = 0; let mut node_num = None; let mut user_id = None; while idx < data.len() { let (field, value, next) = next_field(data, idx)?; idx = next; match (field, value) { (1, FieldValue::Varint(v)) => node_num = Some(v as u32), (1, FieldValue::Fixed32(v)) => node_num = Some(v), (3, FieldValue::Bytes(b)) => user_id = parse_user(b).and_then(|u| u.id), _ => {} } } node_num.map(|n| (n, user_id)) } struct ParsedNode { num: u32, id: Option, long_name: Option, short_name: Option, last_heard: Option, } fn parse_node_info(data: &[u8]) -> Option { let mut idx = 0; let mut node = ParsedNode { num: 0, id: None, long_name: None, short_name: None, last_heard: None, }; while idx < data.len() { let (field, value, next) = next_field(data, idx)?; idx = next; match (field, value) { (1, FieldValue::Varint(v)) => node.num = v as u32, (1, FieldValue::Fixed32(v)) => node.num = v, (2, FieldValue::Bytes(b)) => { if let Some(user) = parse_user(b) { node.id = user.id; node.long_name = user.long_name; node.short_name = user.short_name; } } (5, FieldValue::Fixed32(v)) => node.last_heard = Some(v), _ => {} } } if node.num == 0 { None } else { Some(node) } } struct ParsedUser { id: Option, long_name: Option, short_name: Option, } fn parse_user(data: &[u8]) -> Option { let mut idx = 0; let mut user = ParsedUser { id: None, long_name: None, short_name: None, }; while idx < data.len() { let (field, value, next) = next_field(data, idx)?; idx = next; match (field, value) { (1, FieldValue::Bytes(b)) => user.id = string_field(b), (2, FieldValue::Bytes(b)) => user.long_name = string_field(b), (3, FieldValue::Bytes(b)) => user.short_name = string_field(b), _ => {} } } Some(user) } struct ParsedPacket { from: Option, portnum: u32, payload: Vec, } fn parse_mesh_packet(data: &[u8]) -> Option { let mut idx = 0; let mut from = None; let mut decoded = None; while idx < data.len() { let (field, value, next) = next_field(data, idx)?; idx = next; match (field, value) { (1, FieldValue::Fixed32(v)) => from = Some(v), (4, FieldValue::Bytes(b)) => decoded = Some(b), _ => {} } } let decoded = decoded?; let mut didx = 0; let mut portnum = 0; let mut payload = Vec::new(); while didx < decoded.len() { let (field, value, next) = next_field(decoded, didx)?; didx = next; match (field, value) { (1, FieldValue::Varint(v)) => portnum = v as u32, (2, FieldValue::Bytes(b)) => payload = b.to_vec(), _ => {} } } Some(ParsedPacket { from, portnum, payload, }) } enum FieldValue<'a> { Varint(u64), Fixed32(u32), Bytes(&'a [u8]), } fn next_field(buf: &[u8], idx: usize) -> Option<(u64, FieldValue<'_>, usize)> { let (key, n) = read_varint(&buf[idx..])?; let field = key >> 3; let mut pos = idx + n; match key & 0x07 { 0 => { let (v, n) = read_varint(&buf[pos..])?; pos += n; Some((field, FieldValue::Varint(v), pos)) } 2 => { let (len, n) = read_varint(&buf[pos..])?; pos += n; let end = pos.checked_add(len as usize)?; if end > buf.len() { return None; } Some((field, FieldValue::Bytes(&buf[pos..end]), end)) } 5 => { let end = pos.checked_add(4)?; if end > buf.len() { None } else { let value = u32::from_le_bytes([ buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3], ]); Some((field, FieldValue::Fixed32(value), end)) } } 1 => { let end = pos.checked_add(8)?; if end > buf.len() { None } else { Some((field, FieldValue::Bytes(&buf[pos..end]), end)) } } wire => { warn!(wire, "Unsupported Meshtastic protobuf wire type"); None } } } fn read_varint(buf: &[u8]) -> Option<(u64, usize)> { let mut out = 0u64; for (i, b) in buf.iter().copied().enumerate().take(10) { out |= ((b & 0x7f) as u64) << (7 * i); if b & 0x80 == 0 { return Some((out, i + 1)); } } None } fn encode_varint_field(field: u64, value: u64) -> Vec { let mut out = Vec::new(); encode_varint_field_into(field, value, &mut out); out } fn encode_varint_field_into(field: u64, value: u64, out: &mut Vec) { write_varint((field << 3) | 0, out); write_varint(value, out); } fn encode_len_field(field: u64, bytes: &[u8], out: &mut Vec) { write_varint((field << 3) | 2, out); write_varint(bytes.len() as u64, out); out.extend_from_slice(bytes); } fn encode_fixed32_field(field: u64, value: u32, out: &mut Vec) { write_varint((field << 3) | 5, out); out.extend_from_slice(&value.to_le_bytes()); } fn write_varint(mut value: u64, out: &mut Vec) { while value >= 0x80 { out.push((value as u8 & 0x7f) | 0x80); value >>= 7; } out.push(value as u8); } fn string_field(bytes: &[u8]) -> Option { std::str::from_utf8(bytes).ok().map(|s| s.to_string()) } fn synthetic_pubkey(node_num: u32) -> [u8; 32] { let mut out = [0u8; 32]; out[..4].copy_from_slice(&node_num.to_le_bytes()); out[4..15].copy_from_slice(b"meshtastic:"); out }