394 lines
15 KiB
Rust
Raw Normal View History

2026-03-17 00:03:08 +00:00
//! Async serial driver for Meshcore devices.
//!
//! Handles opening the serial port, reading/writing frames,
//! and the initialization handshake sequence.
use super::protocol::{self, InboundFrame};
use super::types::DeviceInfo;
use anyhow::{Context, Result};
use std::time::Duration;
use tracing::{debug, info, warn};
/// Serial port configuration for Meshcore Companion USB.
const BAUD_RATE: u32 = 115200;
/// Timeout for reading a response frame from the device.
const READ_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout for writing a frame to the device.
const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
/// Buffer size for serial reads.
const READ_BUF_SIZE: usize = 512;
/// Application name sent during handshake.
const APP_NAME: &str = "Archipelago";
/// Async Meshcore device handle.
pub struct MeshcoreDevice {
port: serial2_tokio::SerialPort,
read_buf: Vec<u8>,
pub node_id: Option<u32>,
pub advert_name: Option<String>,
pub device_info: Option<DeviceInfo>,
device_path: String,
}
impl MeshcoreDevice {
/// Open a serial port and verify it's a Meshcore device.
pub async fn open(path: &str) -> Result<Self> {
// Check device exists before trying to open (better error message)
match tokio::fs::metadata(path).await {
Ok(meta) => {
debug!(path = %path, permissions = ?meta.permissions(), "Device node exists");
}
Err(e) => {
anyhow::bail!(
"Serial device {} not accessible: {} (check PrivateDevices in systemd, or USB connection)",
path, e
);
}
}
2026-03-17 00:03:08 +00:00
let port = serial2_tokio::SerialPort::open(path, BAUD_RATE)
.context(format!("Failed to open serial port {} (permission denied? device busy?)", path))?;
2026-03-17 00:03:08 +00:00
info!(path = %path, baud = BAUD_RATE, "Opened serial port");
Ok(Self {
port,
read_buf: Vec::with_capacity(READ_BUF_SIZE),
node_id: None,
advert_name: None,
device_info: None,
device_path: path.to_string(),
})
}
/// Run the Meshcore initialization handshake.
/// Matches the official meshcore_py library sequence:
/// 1. CMD_APP_START -> RESP_SELF_INFO (this is the first command, not device_query)
/// 2. CMD_SET_DEVICE_TIME (sync clock)
pub async fn initialize(&mut self) -> Result<DeviceInfo> {
info!("Starting Meshcore handshake on {}", self.device_path);
// Step 1: App start (the official library sends this first)
self.send_raw(&protocol::build_app_start(APP_NAME)).await?;
let frame = self
.recv_frame_timeout(READ_TIMEOUT)
.await
.context("No response to APP_START — is this a Meshcore Companion USB device?")?;
info!(code = frame.code, data_len = frame.data.len(), "Got response to APP_START");
if frame.code == protocol::RESP_ERR {
anyhow::bail!("App start failed: {}", protocol::parse_error(&frame.data));
}
// The response could be SELF_INFO or something else depending on firmware version
let (node_id, name) = if frame.code == protocol::RESP_SELF_INFO {
protocol::parse_self_info(&frame.data)
.context("Failed to parse self info")?
} else {
// Try to parse whatever we got
info!(code = frame.code, "Unexpected response code, trying to parse as self info");
protocol::parse_self_info(&frame.data)
.unwrap_or((0, String::new()))
};
info!(node_id, name = %name, "Meshcore identity");
self.node_id = Some(node_id);
self.advert_name = Some(name.clone());
// Step 2: Sync device clock
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.send_raw(&protocol::build_set_device_time(now)).await?;
// Time set response is best-effort — don't fail if it times out
match self.recv_frame_timeout(Duration::from_secs(2)).await {
Ok(frame) if frame.code == protocol::RESP_OK => {
debug!("Device clock synced");
}
Ok(frame) => {
warn!(code = frame.code, "Unexpected response to SET_DEVICE_TIME");
}
Err(_) => {
warn!("No response to SET_DEVICE_TIME (continuing anyway)");
}
}
let info = DeviceInfo {
firmware_version: name.clone(),
node_id,
max_contacts: 100,
device_type: super::types::DeviceType::Meshcore,
};
self.device_info = Some(info.clone());
info!("Meshcore initialization complete on {}", self.device_path);
Ok(info)
}
/// Set the advertised name on the mesh network.
pub async fn set_advert_name(&mut self, name: &str) -> Result<()> {
self.send_raw(&protocol::build_set_advert_name(name)).await?;
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
if frame.code == protocol::RESP_ERR {
anyhow::bail!("Set advert name failed: {}", protocol::parse_error(&frame.data));
}
self.advert_name = Some(name.to_string());
Ok(())
}
/// Broadcast our advertisement to the mesh.
pub async fn send_self_advert(&mut self) -> Result<()> {
self.send_raw(&protocol::build_send_self_advert()).await?;
// Response is RESP_OK or RESP_SENT
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
if frame.code == protocol::RESP_ERR {
anyhow::bail!("Self advert failed: {}", protocol::parse_error(&frame.data));
}
Ok(())
}
/// Send a text message to a contact by their public key prefix (first 6 bytes).
pub async fn send_text(&mut self, dest_pubkey_prefix: &[u8; 6], msg: &[u8]) -> Result<()> {
let frame_data = protocol::build_send_text(dest_pubkey_prefix, msg)?;
self.send_raw(&frame_data).await?;
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
if frame.code == protocol::RESP_ERR {
anyhow::bail!("Send text failed: {}", protocol::parse_error(&frame.data));
}
Ok(())
}
/// Broadcast a text message on a channel.
pub async fn send_channel_text(&mut self, channel: u8, msg: &[u8]) -> Result<()> {
let frame_data = protocol::build_send_channel_text(channel, msg)?;
self.send_raw(&frame_data).await?;
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
if frame.code == protocol::RESP_ERR {
anyhow::bail!(
"Channel broadcast failed: {}",
protocol::parse_error(&frame.data)
);
}
Ok(())
}
/// Get the list of known contacts from the device.
/// Protocol: CMD_GET_CONTACTS -> CONTACT_START(count) -> N×CONTACT -> CONTACT_END
pub async fn get_contacts(&mut self) -> Result<Vec<protocol::ParsedContact>> {
self.send_raw(&protocol::build_get_contacts()).await?;
let mut contacts = Vec::new();
loop {
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
match frame.code {
protocol::RESP_CONTACT_START => {
// Contains the count of contacts to follow
let count = if frame.data.len() >= 4 {
u32::from_le_bytes([frame.data[0], frame.data[1], frame.data[2], frame.data[3]])
} else {
0
};
debug!(count, "Contact list start");
}
protocol::RESP_CONTACT => {
match protocol::parse_contact(&frame.data) {
Ok(contact) => contacts.push(contact),
Err(e) => warn!("Failed to parse contact: {}", e),
}
}
protocol::RESP_CONTACT_END => {
debug!(count = contacts.len(), "Contact list complete");
break;
}
protocol::RESP_OK => break,
protocol::RESP_ERR => {
anyhow::bail!("Get contacts failed: {}", protocol::parse_error(&frame.data));
}
_ => {
debug!(code = frame.code, "Unexpected response during contact list");
// Don't break — might be a push notification interspersed
}
}
}
Ok(contacts)
}
/// Retrieve queued messages from the device.
/// Returns raw frames (code + data) for the listener to parse.
pub async fn sync_messages(&mut self) -> Result<Vec<protocol::InboundFrame>> {
self.send_raw(&protocol::build_sync_next_message()).await?;
let mut frames = Vec::new();
loop {
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
match frame.code {
// All message types (v1 and v3)
protocol::RESP_CONTACT_MSG | protocol::RESP_CONTACT_MSG_V3
| protocol::RESP_CHANNEL_MSG | protocol::RESP_CHANNEL_MSG_V3 => {
frames.push(frame);
// Request next message
self.send_raw(&protocol::build_sync_next_message()).await?;
}
protocol::RESP_NO_MORE_MESSAGES => break,
protocol::RESP_OK => break,
protocol::RESP_ERR => {
anyhow::bail!(
"Sync messages failed: {}",
protocol::parse_error(&frame.data)
);
}
_ => {
// Push notifications can arrive during sync — skip them
if protocol::is_push_notification(frame.code) {
continue;
}
debug!(code = frame.code, "Unexpected response during message sync");
break;
}
}
}
Ok(frames)
}
/// Write raw bytes to the serial port.
pub async fn send_raw(&mut self, data: &[u8]) -> Result<()> {
tokio::time::timeout(WRITE_TIMEOUT, self.port.write_all(data))
.await
.context("Serial write timed out")?
.context("Serial write failed")?;
Ok(())
}
/// Try to read and parse one complete inbound frame.
/// Returns the frame if one is available, or reads more data from serial.
pub async fn try_recv_frame(&mut self) -> Result<Option<InboundFrame>> {
// First check if we already have a complete frame in the buffer
if let Some(frame) = protocol::decode_frame(&self.read_buf) {
let consumed = frame.bytes_consumed;
let result = frame;
self.read_buf.drain(..consumed);
return Ok(Some(result));
}
// Try to read more data (non-blocking via small timeout)
let mut tmp = [0u8; READ_BUF_SIZE];
match tokio::time::timeout(Duration::from_millis(50), self.port.read(&mut tmp)).await {
Ok(Ok(n)) if n > 0 => {
self.read_buf.extend_from_slice(&tmp[..n]);
}
_ => return Ok(None),
}
// Try parsing again with new data
if let Some(frame) = protocol::decode_frame(&self.read_buf) {
let consumed = frame.bytes_consumed;
let result = frame;
self.read_buf.drain(..consumed);
return Ok(Some(result));
}
Ok(None)
}
/// Read one complete inbound frame with timeout.
pub async fn recv_frame_timeout(&mut self, timeout: Duration) -> Result<InboundFrame> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
// Check buffer for a complete frame
if let Some(frame) = protocol::decode_frame(&self.read_buf) {
let consumed = frame.bytes_consumed;
let result = frame;
self.read_buf.drain(..consumed);
return Ok(result);
}
// Read more data from serial
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
anyhow::bail!("Timeout waiting for serial frame");
}
let mut tmp = [0u8; READ_BUF_SIZE];
match tokio::time::timeout(remaining.min(Duration::from_millis(100)), self.port.read(&mut tmp))
.await
{
Ok(Ok(0)) => anyhow::bail!("Serial port closed"),
Ok(Ok(n)) => {
self.read_buf.extend_from_slice(&tmp[..n]);
}
Ok(Err(e)) => return Err(e).context("Serial read error"),
Err(_) => continue, // timeout on this read, try again if deadline not reached
}
}
}
/// Get the device path this handle is connected to.
pub fn path(&self) -> &str {
&self.device_path
}
}
// ─── Device detection ───────────────────────────────────────────────────
/// Candidate serial device paths to check on Linux.
/// /dev/mesh-radio is a stable udev symlink (see 99-mesh-radio.rules).
2026-03-17 00:03:08 +00:00
const SERIAL_CANDIDATES: &[&str] = &[
"/dev/mesh-radio",
2026-03-17 00:03:08 +00:00
"/dev/ttyUSB0",
"/dev/ttyUSB1",
"/dev/ttyUSB2",
"/dev/ttyACM0",
"/dev/ttyACM1",
"/dev/ttyACM2",
];
/// Scan for serial devices that could be Meshcore radios.
/// Returns paths to existing serial device files.
pub async fn detect_serial_devices() -> Vec<String> {
let mut devices = Vec::new();
for path in SERIAL_CANDIDATES {
if tokio::fs::metadata(path).await.is_ok() {
devices.push(path.to_string());
}
}
devices
}
/// Try to open and handshake with each detected serial device.
/// Returns the first device that responds as Meshcore.
pub async fn probe_for_meshcore(paths: &[String]) -> Option<(String, DeviceInfo)> {
for path in paths {
debug!(path = %path, "Probing for Meshcore device");
match MeshcoreDevice::open(path).await {
Ok(mut device) => {
match device.initialize().await {
Ok(info) => {
info!(path = %path, firmware = %info.firmware_version, "Found Meshcore device");
// Drop the device so the listener can open it
drop(device);
return Some((path.clone(), info));
}
Err(e) => {
debug!(path = %path, error = %e, "Not a Meshcore device");
}
}
}
Err(e) => {
debug!(path = %path, error = %e, "Could not open serial port");
}
}
}
None
}