From de92bb2cd44d1345f5de7b2892bfa99790f4a1e3 Mon Sep 17 00:00:00 2001 From: Dorian Date: Tue, 17 Mar 2026 02:08:58 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=203=20Week=203=20=E2=80=94=20type?= =?UTF-8?q?d=20messages=20+=20store-and-forward=20outbox?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create mesh/message_types.rs: typed message envelope system - MeshMessageType enum: Text, Alert, Invoice, PsbtHash, Coordinate, PrekeyBundle, SessionInit, BlockHeader, TxRelay, LightningRelay - TypedEnvelope: CBOR wire format with 0x02 prefix, optional Ed25519 sig - Payload types: AlertPayload (with AlertType enum), InvoicePayload (sats as u64), Coordinate (integer microdegrees, no float), PsbtHashPayload, BlockHeaderPayload, TxRelayPayload, LightningRelayPayload - Signed envelope creation + verification for alerts/block headers - 8 unit tests - Create mesh/outbox.rs: store-and-forward message queue - PendingMessage with TTL (24h default), retry count, relay hops (max 3) - MeshOutbox: persistent VecDeque, max 200 messages, expiry, relay support - Disk persistence to mesh-outbox.json - 6 unit tests: enqueue, deliver, expire, persistence, max size, relay hops Co-Authored-By: Claude Opus 4.6 (1M context) --- core/archipelago/src/mesh/message_types.rs | 423 +++++++++++++++++++++ core/archipelago/src/mesh/mod.rs | 4 + core/archipelago/src/mesh/outbox.rs | 333 ++++++++++++++++ 3 files changed, 760 insertions(+) create mode 100644 core/archipelago/src/mesh/message_types.rs create mode 100644 core/archipelago/src/mesh/outbox.rs diff --git a/core/archipelago/src/mesh/message_types.rs b/core/archipelago/src/mesh/message_types.rs new file mode 100644 index 00000000..f49978c4 --- /dev/null +++ b/core/archipelago/src/mesh/message_types.rs @@ -0,0 +1,423 @@ +//! Typed message envelope for mesh communication. +//! +//! Wraps all mesh messages in a CBOR envelope with type discrimination, +//! enabling different message kinds (TEXT, ALERT, INVOICE, COORDINATE, etc.) +//! over the same encrypted channel. +//! +//! Wire format: `[0x02: typed_marker] [CBOR envelope]` +//! The 0x02 prefix distinguishes typed messages from plain text (0x00) +//! and identity broadcasts (0x01 / ARCHY:2/3). + +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; + +/// Wire prefix for typed messages. +pub const TYPED_MESSAGE_MARKER: u8 = 0x02; + +/// Message type discriminator. +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum MeshMessageType { + Text = 0, + Alert = 1, + Invoice = 2, + PsbtHash = 3, + Coordinate = 4, + PrekeyBundle = 5, + SessionInit = 6, + BlockHeader = 7, + TxRelay = 8, + TxRelayResponse = 9, + LightningRelay = 10, + LightningRelayResponse = 11, +} + +impl MeshMessageType { + pub fn from_u8(v: u8) -> Option { + match v { + 0 => Some(Self::Text), + 1 => Some(Self::Alert), + 2 => Some(Self::Invoice), + 3 => Some(Self::PsbtHash), + 4 => Some(Self::Coordinate), + 5 => Some(Self::PrekeyBundle), + 6 => Some(Self::SessionInit), + 7 => Some(Self::BlockHeader), + 8 => Some(Self::TxRelay), + 9 => Some(Self::TxRelayResponse), + 10 => Some(Self::LightningRelay), + 11 => Some(Self::LightningRelayResponse), + _ => None, + } + } + + pub fn label(&self) -> &'static str { + match self { + Self::Text => "text", + Self::Alert => "alert", + Self::Invoice => "invoice", + Self::PsbtHash => "psbt_hash", + Self::Coordinate => "coordinate", + Self::PrekeyBundle => "prekey_bundle", + Self::SessionInit => "session_init", + Self::BlockHeader => "block_header", + Self::TxRelay => "tx_relay", + Self::TxRelayResponse => "tx_relay_response", + Self::LightningRelay => "lightning_relay", + Self::LightningRelayResponse => "lightning_relay_response", + } + } +} + +// ─── Wire Envelope ────────────────────────────────────────────────────── + +/// CBOR wire envelope wrapping any typed message. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TypedEnvelope { + /// Message type. + pub t: u8, + /// Payload bytes (type-specific CBOR or raw data). + pub v: Vec, + /// Unix timestamp (seconds since epoch). + pub ts: u32, + /// Optional Ed25519 signature of (t || v || ts_bytes) — for signed messages. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sig: Option>, +} + +impl TypedEnvelope { + /// Create an unsigned envelope. + pub fn new(msg_type: MeshMessageType, payload: Vec) -> Self { + let ts = chrono::Utc::now().timestamp() as u32; + Self { + t: msg_type as u8, + v: payload, + ts, + sig: None, + } + } + + /// Create a signed envelope (for ALERT, BlockHeader). + pub fn new_signed( + msg_type: MeshMessageType, + payload: Vec, + signing_key: &ed25519_dalek::SigningKey, + ) -> Self { + use ed25519_dalek::Signer; + let ts = chrono::Utc::now().timestamp() as u32; + + // Sign: type byte || payload || timestamp (4 bytes LE) + let mut sign_data = Vec::with_capacity(1 + payload.len() + 4); + sign_data.push(msg_type as u8); + sign_data.extend_from_slice(&payload); + sign_data.extend_from_slice(&ts.to_le_bytes()); + let signature = signing_key.sign(&sign_data); + + Self { + t: msg_type as u8, + v: payload, + ts, + sig: Some(signature.to_bytes().to_vec()), + } + } + + /// Verify signature if present. + pub fn verify_signature(&self, verifying_key: &ed25519_dalek::VerifyingKey) -> Result { + let Some(sig_bytes) = &self.sig else { return Ok(false) }; + let signature = ed25519_dalek::Signature::from_slice(sig_bytes) + .context("Invalid signature bytes")?; + + let mut sign_data = Vec::with_capacity(1 + self.v.len() + 4); + sign_data.push(self.t); + sign_data.extend_from_slice(&self.v); + sign_data.extend_from_slice(&self.ts.to_le_bytes()); + + verifying_key + .verify_strict(&sign_data, &signature) + .context("Signature verification failed")?; + Ok(true) + } + + /// Get the message type. + pub fn message_type(&self) -> Option { + MeshMessageType::from_u8(self.t) + } + + /// Encode to wire format: [0x02] [CBOR envelope]. + pub fn to_wire(&self) -> Result> { + let mut buf = Vec::new(); + buf.push(TYPED_MESSAGE_MARKER); + ciborium::into_writer(self, &mut buf).context("CBOR encode failed")?; + Ok(buf) + } + + /// Decode from wire format. + pub fn from_wire(data: &[u8]) -> Result { + if data.is_empty() || data[0] != TYPED_MESSAGE_MARKER { + anyhow::bail!("Not a typed message (expected 0x02 prefix)"); + } + ciborium::from_reader(&data[1..]).context("CBOR decode failed") + } + + /// Check if raw bytes are a typed message. + pub fn is_typed(data: &[u8]) -> bool { + !data.is_empty() && data[0] == TYPED_MESSAGE_MARKER + } +} + +// ─── Payload Types ────────────────────────────────────────────────────── + +/// Alert severity / type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AlertType { + Emergency, + Status, + DeadMan, + BlockHeader, +} + +/// GPS coordinate stored as integer microdegrees (no floating point). +/// 1 microdegree ≈ 0.11 meters at the equator. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Coordinate { + /// Latitude in microdegrees (degrees × 1_000_000). + pub lat: i32, + /// Longitude in microdegrees (degrees × 1_000_000). + pub lng: i32, + /// Optional human-readable label. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub label: Option, +} + +impl Coordinate { + /// Create from floating-point degrees (convenience for UI layer). + pub fn from_degrees(lat: f64, lng: f64, label: Option) -> Self { + Self { + lat: (lat * 1_000_000.0) as i32, + lng: (lng * 1_000_000.0) as i32, + label, + } + } + + /// Convert to floating-point degrees (for display only). + pub fn lat_degrees(&self) -> f64 { + self.lat as f64 / 1_000_000.0 + } + + pub fn lng_degrees(&self) -> f64 { + self.lng as f64 / 1_000_000.0 + } +} + +/// Alert payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AlertPayload { + pub alert_type: AlertType, + pub message: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub coordinate: Option, +} + +/// Lightning invoice payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InvoicePayload { + pub bolt11: String, + /// Amount in satoshis — always u64, never float. + pub amount_sats: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub memo: Option, + /// Payment hash hex (for tracking). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payment_hash: Option, +} + +/// PSBT coordination payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PsbtHashPayload { + /// SHA-256 hash of the PSBT (hex). + pub psbt_hash: String, + pub description: String, + /// Amount in satoshis. + pub amount_sats: u64, +} + +/// Block header announcement (from internet-connected to mesh-only peers). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlockHeaderPayload { + pub height: u64, + /// Block hash (64 hex chars). + pub hash: String, + /// Previous block hash for chain continuity. + pub prev_hash: String, + /// Block timestamp. + pub timestamp: u32, + /// Announced by DID. + pub announced_by: String, +} + +/// Transaction relay request (mesh-only → internet peer). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TxRelayPayload { + pub tx_hex: String, + pub request_id: u64, +} + +/// Transaction relay response (internet peer → mesh-only). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TxRelayResponsePayload { + pub request_id: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub txid: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Lightning invoice relay request. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LightningRelayPayload { + pub bolt11: String, + pub amount_sats: u64, + pub request_id: u64, +} + +/// Lightning relay response (proof of payment). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LightningRelayResponsePayload { + pub request_id: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payment_hash: Option, + /// Preimage as proof of payment. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub preimage: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +// ─── Helpers ──────────────────────────────────────────────────────────── + +/// Encode a payload type to CBOR bytes. +pub fn encode_payload(payload: &T) -> Result> { + let mut buf = Vec::new(); + ciborium::into_writer(payload, &mut buf).context("CBOR payload encode failed")?; + Ok(buf) +} + +/// Decode a payload type from CBOR bytes. +pub fn decode_payload Deserialize<'a>>(data: &[u8]) -> Result { + ciborium::from_reader(data).context("CBOR payload decode failed") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_typed_envelope_wire_roundtrip() { + let envelope = TypedEnvelope::new( + MeshMessageType::Text, + b"hello mesh".to_vec(), + ); + let wire = envelope.to_wire().unwrap(); + assert_eq!(wire[0], TYPED_MESSAGE_MARKER); + + let decoded = TypedEnvelope::from_wire(&wire).unwrap(); + assert_eq!(decoded.t, MeshMessageType::Text as u8); + assert_eq!(decoded.v, b"hello mesh"); + } + + #[test] + fn test_signed_envelope() { + use ed25519_dalek::SigningKey; + use rand::rngs::OsRng; + + let key = SigningKey::generate(&mut OsRng); + let envelope = TypedEnvelope::new_signed( + MeshMessageType::Alert, + b"emergency broadcast".to_vec(), + &key, + ); + + assert!(envelope.sig.is_some()); + assert!(envelope.verify_signature(&key.verifying_key()).unwrap()); + } + + #[test] + fn test_tampered_signature_fails() { + use ed25519_dalek::SigningKey; + use rand::rngs::OsRng; + + let key = SigningKey::generate(&mut OsRng); + let mut envelope = TypedEnvelope::new_signed( + MeshMessageType::Alert, + b"test".to_vec(), + &key, + ); + // Tamper with payload + envelope.v = b"tampered".to_vec(); + assert!(envelope.verify_signature(&key.verifying_key()).is_err()); + } + + #[test] + fn test_invoice_payload_roundtrip() { + let invoice = InvoicePayload { + bolt11: "lnbc500n1pjtest...".to_string(), + amount_sats: 50000, + memo: Some("Pizza money".to_string()), + payment_hash: None, + }; + let encoded = encode_payload(&invoice).unwrap(); + let decoded: InvoicePayload = decode_payload(&encoded).unwrap(); + assert_eq!(decoded.amount_sats, 50000); + assert_eq!(decoded.memo, Some("Pizza money".to_string())); + } + + #[test] + fn test_coordinate_microdegrees() { + let coord = Coordinate::from_degrees(51.5074, -0.1278, Some("London".to_string())); + assert_eq!(coord.lat, 51507400); + assert_eq!(coord.lng, -127800); + assert!((coord.lat_degrees() - 51.5074).abs() < 0.001); + assert!((coord.lng_degrees() - (-0.1278)).abs() < 0.001); + } + + #[test] + fn test_alert_payload_roundtrip() { + let alert = AlertPayload { + alert_type: AlertType::DeadMan, + message: "Node unresponsive for 6h".to_string(), + coordinate: Some(Coordinate::from_degrees(30.2672, -97.7431, Some("Austin".to_string()))), + }; + let encoded = encode_payload(&alert).unwrap(); + let decoded: AlertPayload = decode_payload(&encoded).unwrap(); + assert_eq!(decoded.alert_type, AlertType::DeadMan); + assert!(decoded.coordinate.is_some()); + } + + #[test] + fn test_is_typed() { + assert!(TypedEnvelope::is_typed(&[0x02, 0x01])); + assert!(!TypedEnvelope::is_typed(&[0x00, 0x01])); + assert!(!TypedEnvelope::is_typed(&[])); + } + + #[test] + fn test_message_type_label() { + assert_eq!(MeshMessageType::Invoice.label(), "invoice"); + assert_eq!(MeshMessageType::Alert.label(), "alert"); + } + + #[test] + fn test_block_header_payload() { + let header = BlockHeaderPayload { + height: 890412, + hash: "00000000000000000001abc".to_string(), + prev_hash: "00000000000000000001aab".to_string(), + timestamp: 1710633600, + announced_by: "did:key:z6MkTest".to_string(), + }; + let encoded = encode_payload(&header).unwrap(); + let decoded: BlockHeaderPayload = decode_payload(&encoded).unwrap(); + assert_eq!(decoded.height, 890412); + } +} diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index b4b56381..76c8b8c1 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -15,6 +15,10 @@ pub mod serial; #[allow(dead_code)] pub mod types; #[allow(dead_code)] +pub mod message_types; +#[allow(dead_code)] +pub mod outbox; +#[allow(dead_code)] pub mod ratchet; #[allow(dead_code)] pub mod session; diff --git a/core/archipelago/src/mesh/outbox.rs b/core/archipelago/src/mesh/outbox.rs new file mode 100644 index 00000000..b46d880f --- /dev/null +++ b/core/archipelago/src/mesh/outbox.rs @@ -0,0 +1,333 @@ +//! Store-and-forward message queue for mesh networking. +//! +//! When a destination peer is offline or unreachable, messages are queued +//! in the outbox and retried periodically. Messages expire after TTL (24h default). +//! Intermediate nodes can relay messages for peers up to 3 hops away. + +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::path::{Path, PathBuf}; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +/// Default time-to-live for queued messages (24 hours). +const DEFAULT_TTL_SECS: u64 = 86400; + +/// Maximum relay hops for store-and-forward. +const MAX_RELAY_HOPS: u8 = 3; + +/// Maximum queued messages to prevent unbounded memory use. +const MAX_QUEUE_SIZE: usize = 200; + +const OUTBOX_FILE: &str = "mesh-outbox.json"; + +/// A message waiting to be delivered. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PendingMessage { + pub id: u64, + /// Destination peer DID. + pub dest_did: String, + /// Encrypted payload bytes (already ratchet-encrypted or static-encrypted). + #[serde(with = "base64_bytes")] + pub encrypted_payload: Vec, + /// When this message was created (RFC 3339). + pub created_at: String, + /// Time-to-live in seconds. + pub ttl_secs: u64, + /// Number of times we've attempted delivery. + pub retry_count: u32, + /// How many relay hops this message has traversed. + pub relay_hops: u8, + /// Original sender DID (for relayed messages). + pub from_did: String, +} + +impl PendingMessage { + /// Check if this message has expired. + pub fn is_expired(&self) -> bool { + let Ok(created) = chrono::DateTime::parse_from_rfc3339(&self.created_at) else { + return true; // Can't parse = treat as expired + }; + let age = chrono::Utc::now().signed_duration_since(created); + age.num_seconds() as u64 > self.ttl_secs + } + + /// Check if this message can be relayed further. + pub fn can_relay(&self) -> bool { + self.relay_hops < MAX_RELAY_HOPS + } +} + +/// Persistent store-and-forward queue. +pub struct MeshOutbox { + queue: RwLock>, + data_dir: PathBuf, + next_id: RwLock, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +struct OutboxFile { + messages: Vec, + next_id: u64, +} + +impl MeshOutbox { + /// Load outbox from disk or create empty. + pub async fn load(data_dir: &Path) -> Result { + let path = data_dir.join(OUTBOX_FILE); + let (messages, next_id) = if path.exists() { + let content = tokio::fs::read_to_string(&path) + .await + .context("Failed to read mesh outbox")?; + let file: OutboxFile = serde_json::from_str(&content).unwrap_or_default(); + (VecDeque::from(file.messages), file.next_id) + } else { + (VecDeque::new(), 1) + }; + + Ok(Self { + queue: RwLock::new(messages), + data_dir: data_dir.to_path_buf(), + next_id: RwLock::new(next_id), + }) + } + + /// Persist queue to disk. + pub async fn save(&self) -> Result<()> { + let queue = self.queue.read().await; + let next_id = *self.next_id.read().await; + let file = OutboxFile { + messages: queue.iter().cloned().collect(), + next_id, + }; + let content = serde_json::to_string_pretty(&file) + .context("Failed to serialize outbox")?; + tokio::fs::write(self.data_dir.join(OUTBOX_FILE), content) + .await + .context("Failed to write outbox")?; + Ok(()) + } + + /// Enqueue a message for delivery. + pub async fn enqueue( + &self, + dest_did: &str, + from_did: &str, + encrypted_payload: Vec, + ttl_secs: Option, + ) -> Result { + let mut next_id = self.next_id.write().await; + let id = *next_id; + *next_id += 1; + + let msg = PendingMessage { + id, + dest_did: dest_did.to_string(), + encrypted_payload, + created_at: chrono::Utc::now().to_rfc3339(), + ttl_secs: ttl_secs.unwrap_or(DEFAULT_TTL_SECS), + retry_count: 0, + relay_hops: 0, + from_did: from_did.to_string(), + }; + + let mut queue = self.queue.write().await; + // Evict oldest if over limit + while queue.len() >= MAX_QUEUE_SIZE { + queue.pop_front(); + } + queue.push_back(msg); + + info!(id = id, dest = %dest_did, "Message queued for delivery"); + Ok(id) + } + + /// Enqueue a relayed message (from another peer, not originated by us). + pub async fn enqueue_relay(&self, mut msg: PendingMessage) -> Result<()> { + if !msg.can_relay() { + anyhow::bail!("Message exceeded max relay hops ({})", MAX_RELAY_HOPS); + } + msg.relay_hops += 1; + + let mut queue = self.queue.write().await; + while queue.len() >= MAX_QUEUE_SIZE { + queue.pop_front(); + } + queue.push_back(msg); + Ok(()) + } + + /// Remove expired messages from the queue. + pub async fn expire_stale(&self) -> usize { + let mut queue = self.queue.write().await; + let before = queue.len(); + queue.retain(|msg| !msg.is_expired()); + let expired = before - queue.len(); + if expired > 0 { + debug!(expired = expired, "Expired stale outbox messages"); + } + expired + } + + /// Get messages pending for a specific peer. + pub async fn messages_for_peer(&self, did: &str) -> Vec { + self.queue + .read() + .await + .iter() + .filter(|m| m.dest_did == did) + .cloned() + .collect() + } + + /// Mark a message as delivered (remove from queue). + pub async fn mark_delivered(&self, id: u64) -> bool { + let mut queue = self.queue.write().await; + let before = queue.len(); + queue.retain(|m| m.id != id); + queue.len() < before + } + + /// Increment retry count for a message. + pub async fn increment_retry(&self, id: u64) { + let mut queue = self.queue.write().await; + if let Some(msg) = queue.iter_mut().find(|m| m.id == id) { + msg.retry_count += 1; + } + } + + /// Get all pending messages (for RPC status). + pub async fn list(&self, limit: Option) -> Vec { + let queue = self.queue.read().await; + let limit = limit.unwrap_or(50); + queue.iter().take(limit).cloned().collect() + } + + /// Count of pending messages. + pub async fn count(&self) -> usize { + self.queue.read().await.len() + } +} + +// ─── base64 serde for encrypted payloads ──────────────────────────────── + +mod base64_bytes { + use base64::Engine; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(bytes: &Vec, s: S) -> Result { + let encoded = base64::engine::general_purpose::STANDARD.encode(bytes); + s.serialize_str(&encoded) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result, D::Error> { + let s = String::deserialize(d)?; + base64::engine::general_purpose::STANDARD + .decode(&s) + .map_err(serde::de::Error::custom) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_enqueue_and_list() { + let dir = tempfile::tempdir().unwrap(); + let outbox = MeshOutbox::load(dir.path()).await.unwrap(); + + let id = outbox + .enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![1, 2, 3], None) + .await + .unwrap(); + + assert_eq!(outbox.count().await, 1); + let msgs = outbox.list(None).await; + assert_eq!(msgs[0].id, id); + assert_eq!(msgs[0].dest_did, "did:key:z6MkDest"); + } + + #[tokio::test] + async fn test_mark_delivered() { + let dir = tempfile::tempdir().unwrap(); + let outbox = MeshOutbox::load(dir.path()).await.unwrap(); + + let id = outbox + .enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![1], None) + .await + .unwrap(); + + assert!(outbox.mark_delivered(id).await); + assert_eq!(outbox.count().await, 0); + } + + #[tokio::test] + async fn test_expire_stale() { + let dir = tempfile::tempdir().unwrap(); + let outbox = MeshOutbox::load(dir.path()).await.unwrap(); + + // Enqueue with 0 TTL (immediately expired) + outbox + .enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![1], Some(0)) + .await + .unwrap(); + + let expired = outbox.expire_stale().await; + assert_eq!(expired, 1); + assert_eq!(outbox.count().await, 0); + } + + #[tokio::test] + async fn test_persistence_roundtrip() { + let dir = tempfile::tempdir().unwrap(); + let outbox = MeshOutbox::load(dir.path()).await.unwrap(); + + outbox + .enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![42, 43, 44], None) + .await + .unwrap(); + outbox.save().await.unwrap(); + + // Reload + let outbox2 = MeshOutbox::load(dir.path()).await.unwrap(); + assert_eq!(outbox2.count().await, 1); + let msgs = outbox2.list(None).await; + assert_eq!(msgs[0].encrypted_payload, vec![42, 43, 44]); + } + + #[tokio::test] + async fn test_max_queue_size() { + let dir = tempfile::tempdir().unwrap(); + let outbox = MeshOutbox::load(dir.path()).await.unwrap(); + + for i in 0..210 { + outbox + .enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![i as u8], None) + .await + .unwrap(); + } + + // Should cap at MAX_QUEUE_SIZE + assert!(outbox.count().await <= 200); + } + + #[test] + fn test_relay_hops() { + let msg = PendingMessage { + id: 1, + dest_did: "did:key:test".to_string(), + encrypted_payload: vec![], + created_at: chrono::Utc::now().to_rfc3339(), + ttl_secs: 86400, + retry_count: 0, + relay_hops: 2, + from_did: "did:key:sender".to_string(), + }; + assert!(msg.can_relay()); // 2 < 3 + + let msg2 = PendingMessage { relay_hops: 3, ..msg }; + assert!(!msg2.can_relay()); // 3 >= 3 + } +}