//! Node federation: trusted multi-node clusters with state sync. //! //! Nodes federate by exchanging invite codes containing DID + onion address. //! Trust is bilateral — both sides must agree. Federated nodes periodically //! sync container status, health metrics, and availability. use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use std::path::Path; use tokio::fs; const FEDERATION_DIR: &str = "federation"; const NODES_FILE: &str = "nodes.json"; const INVITES_FILE: &str = "invites.json"; /// Trust level for a federated node. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum TrustLevel { Trusted, Observer, Untrusted, } impl std::fmt::Display for TrustLevel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { TrustLevel::Trusted => write!(f, "trusted"), TrustLevel::Observer => write!(f, "observer"), TrustLevel::Untrusted => write!(f, "untrusted"), } } } /// A federated node in our cluster. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FederatedNode { pub did: String, pub pubkey: String, pub onion: String, #[serde(default)] pub name: Option, pub trust_level: TrustLevel, pub added_at: String, #[serde(default)] pub last_seen: Option, #[serde(default)] pub last_state: Option, } /// State snapshot received from a federated peer during sync. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeStateSnapshot { pub timestamp: String, #[serde(default)] pub node_name: Option, #[serde(default)] pub apps: Vec, #[serde(default)] pub cpu_usage_percent: Option, #[serde(default)] pub mem_used_bytes: Option, #[serde(default)] pub mem_total_bytes: Option, #[serde(default)] pub disk_used_bytes: Option, #[serde(default)] pub disk_total_bytes: Option, #[serde(default)] pub uptime_secs: Option, #[serde(default)] pub tor_active: Option, } /// Status of a single app/container on a remote node. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppStatus { pub id: String, pub status: String, // "running", "stopped", "installed" #[serde(default)] pub version: Option, } /// A pending invite (outgoing or incoming). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FederationInvite { pub code: String, pub did: String, pub onion: String, pub pubkey: String, pub created_at: String, #[serde(default)] pub accepted: bool, } /// Top-level file structures. #[derive(Debug, Default, Serialize, Deserialize)] struct NodesFile { nodes: Vec, } #[derive(Debug, Default, Serialize, Deserialize)] struct InvitesFile { outgoing: Vec, incoming: Vec, } /// Ensure federation directory exists. async fn ensure_dir(data_dir: &Path) -> Result { let dir = data_dir.join(FEDERATION_DIR); fs::create_dir_all(&dir) .await .context("Failed to create federation directory")?; Ok(dir) } // ──────────────────────────── Node Management ──────────────────────────── pub async fn load_nodes(data_dir: &Path) -> Result> { let dir = data_dir.join(FEDERATION_DIR); let path = dir.join(NODES_FILE); if !path.exists() { return Ok(Vec::new()); } let content = fs::read_to_string(&path) .await .context("Failed to read federation nodes")?; let file: NodesFile = serde_json::from_str(&content).unwrap_or_default(); Ok(file.nodes) } pub async fn save_nodes(data_dir: &Path, nodes: &[FederatedNode]) -> Result<()> { let dir = ensure_dir(data_dir).await?; let file = NodesFile { nodes: nodes.to_vec(), }; let content = serde_json::to_string_pretty(&file).context("Failed to serialize nodes")?; fs::write(dir.join(NODES_FILE), content) .await .context("Failed to write federation nodes")?; Ok(()) } pub async fn add_node(data_dir: &Path, node: FederatedNode) -> Result> { let mut nodes = load_nodes(data_dir).await?; let exists = nodes.iter().any(|n| n.did == node.did); if exists { anyhow::bail!("Node with DID {} is already federated", node.did); } nodes.push(node); save_nodes(data_dir, &nodes).await?; Ok(nodes) } pub async fn remove_node(data_dir: &Path, did: &str) -> Result> { let mut nodes = load_nodes(data_dir).await?; let before = nodes.len(); nodes.retain(|n| n.did != did); if nodes.len() == before { anyhow::bail!("No federated node with DID {}", did); } save_nodes(data_dir, &nodes).await?; Ok(nodes) } pub async fn set_trust_level( data_dir: &Path, did: &str, trust: TrustLevel, ) -> Result> { let mut nodes = load_nodes(data_dir).await?; let node = nodes .iter_mut() .find(|n| n.did == did) .ok_or_else(|| anyhow::anyhow!("No federated node with DID {}", did))?; node.trust_level = trust; save_nodes(data_dir, &nodes).await?; Ok(nodes) } pub async fn update_node_state( data_dir: &Path, did: &str, state: NodeStateSnapshot, ) -> Result<()> { let mut nodes = load_nodes(data_dir).await?; if let Some(node) = nodes.iter_mut().find(|n| n.did == did) { node.last_seen = Some(state.timestamp.clone()); // Update node name from sync if provided (peer announced their name) if let Some(ref name) = state.node_name { if !name.is_empty() { node.name = Some(name.clone()); } } node.last_state = Some(state); save_nodes(data_dir, &nodes).await?; } Ok(()) } // ──────────────────────────── Invite Management ──────────────────────────── async fn load_invites(data_dir: &Path) -> Result { let dir = data_dir.join(FEDERATION_DIR); let path = dir.join(INVITES_FILE); if !path.exists() { return Ok(InvitesFile::default()); } let content = fs::read_to_string(&path) .await .context("Failed to read invites")?; let file: InvitesFile = serde_json::from_str(&content).unwrap_or_default(); Ok(file) } async fn save_invites(data_dir: &Path, invites: &InvitesFile) -> Result<()> { let dir = ensure_dir(data_dir).await?; let content = serde_json::to_string_pretty(invites).context("Failed to serialize invites")?; fs::write(dir.join(INVITES_FILE), content) .await .context("Failed to write invites")?; Ok(()) } /// Generate an invite code. Format: `fed1:` pub async fn create_invite( data_dir: &Path, did: &str, onion: &str, pubkey: &str, ) -> Result { use base64::Engine; use rand::Rng; let mut token_bytes = [0u8; 16]; rand::thread_rng().fill(&mut token_bytes); let token = hex::encode(token_bytes); let payload = serde_json::json!({ "did": did, "onion": onion, "pubkey": pubkey, "token": token, }); let json = serde_json::to_string(&payload).context("Failed to serialize invite")?; let code = format!( "fed1:{}", base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json.as_bytes()) ); let invite = FederationInvite { code: code.clone(), did: did.to_string(), onion: onion.to_string(), pubkey: pubkey.to_string(), created_at: chrono::Utc::now().to_rfc3339(), accepted: false, }; let mut invites = load_invites(data_dir).await?; invites.outgoing.push(invite); save_invites(data_dir, &invites).await?; Ok(code) } /// Parse an invite code into its components. pub fn parse_invite(code: &str) -> Result<(String, String, String, String)> { use base64::Engine; let encoded = code .strip_prefix("fed1:") .ok_or_else(|| anyhow::anyhow!("Invalid invite format: must start with fed1:"))?; let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD .decode(encoded) .context("Invalid base64 in invite code")?; let payload: serde_json::Value = serde_json::from_slice(&bytes).context("Invalid JSON in invite")?; let did = payload["did"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing did in invite"))? .to_string(); let onion = payload["onion"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing onion in invite"))? .to_string(); let pubkey = payload["pubkey"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing pubkey in invite"))? .to_string(); let token = payload["token"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing token in invite"))? .to_string(); Ok((did, onion, pubkey, token)) } /// Accept an invite: parse code, verify the remote node, add to federation. pub async fn accept_invite( data_dir: &Path, code: &str, local_did: &str, local_onion: &str, local_pubkey: &str, sign_fn: impl FnOnce(&[u8]) -> String, ) -> Result { let (did, onion, pubkey, _token) = parse_invite(code)?; // Check not already federated let nodes = load_nodes(data_dir).await?; if nodes.iter().any(|n| n.did == did) { anyhow::bail!("Already federated with node {}", did); } let node = FederatedNode { did: did.clone(), pubkey, onion, name: None, trust_level: TrustLevel::Trusted, added_at: chrono::Utc::now().to_rfc3339(), last_seen: None, last_state: None, }; add_node(data_dir, node.clone()).await?; // Record as incoming accepted invite let mut invites = load_invites(data_dir).await?; invites.incoming.push(FederationInvite { code: code.to_string(), did: did.clone(), onion: node.onion.clone(), pubkey: node.pubkey.clone(), created_at: chrono::Utc::now().to_rfc3339(), accepted: true, }); save_invites(data_dir, &invites).await?; // Notify remote node (best-effort over Tor) let _ = notify_join(&node.onion, local_did, local_onion, local_pubkey, sign_fn).await; Ok(node) } /// Best-effort notification to the remote node that we joined their federation. /// Signs the message with our ed25519 key so the remote peer can verify authenticity. async fn notify_join( remote_onion: &str, local_did: &str, local_onion: &str, local_pubkey: &str, sign_fn: impl FnOnce(&[u8]) -> String, ) -> Result<()> { let host = if remote_onion.ends_with(".onion") { remote_onion.to_string() } else { format!("{}.onion", remote_onion) }; let url = format!("http://{}/rpc/v1", host); // Sign the canonical message: "peer-joined:{did}:{onion}:{pubkey}" let sign_data = format!("peer-joined:{}:{}:{}", local_did, local_onion, local_pubkey); let signature = sign_fn(sign_data.as_bytes()); let body = serde_json::json!({ "method": "federation.peer-joined", "params": { "did": local_did, "onion": local_onion, "pubkey": local_pubkey, "signature": signature, } }); let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; let client = reqwest::Client::builder() .proxy(proxy) .timeout(std::time::Duration::from_secs(30)) .build() .context("Failed to build HTTP client")?; let _ = client.post(&url).json(&body).send().await; Ok(()) } /// Sync state with a single federated peer over Tor. pub async fn sync_with_peer( data_dir: &Path, peer: &FederatedNode, local_did: &str, sign_fn: impl FnOnce(&[u8]) -> String, ) -> Result { let host = if peer.onion.ends_with(".onion") { peer.onion.clone() } else { format!("{}.onion", peer.onion) }; let url = format!("http://{}/rpc/v1", host); // Sign current timestamp for authentication let timestamp = chrono::Utc::now().to_rfc3339(); let signature = sign_fn(timestamp.as_bytes()); let body = serde_json::json!({ "method": "federation.get-state", "params": {} }); let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; let client = reqwest::Client::builder() .proxy(proxy) .timeout(std::time::Duration::from_secs(30)) .build() .context("Failed to build HTTP client")?; let resp = client .post(&url) .header("X-Federation-DID", local_did) .header("X-Federation-Sig", &signature) .header("X-Federation-Timestamp", ×tamp) .json(&body) .send() .await .context("Failed to reach federated peer")?; if !resp.status().is_success() { anyhow::bail!("Peer returned {}", resp.status()); } let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?; let state_val = result .get("result") .ok_or_else(|| anyhow::anyhow!("No result in peer response"))?; let state: NodeStateSnapshot = serde_json::from_value(state_val.clone()).context("Failed to parse peer state")?; update_node_state(data_dir, &peer.did, state.clone()).await?; Ok(state) } /// Sync with a peer using the transport router (Mesh > LAN > Tor). /// Uses CBOR delta encoding for compact payloads over constrained links. /// Falls back to `sync_with_peer()` if no transport router is available. pub async fn sync_with_peer_via_transport( data_dir: &Path, peer: &FederatedNode, local_did: &str, previous_state: Option<&NodeStateSnapshot>, router: &crate::transport::TransportRouter, ) -> Result<()> { use crate::transport::{MessageType, TransportMessage}; use crate::transport::delta; // Build the sync request payload — if we have a previous state from this peer, // send a delta request (tells the peer we only need changes since timestamp). let payload = if let Some(prev) = previous_state { // Request delta since our last known state let request = serde_json::json!({ "type": "state_sync_request", "since": prev.timestamp, }); delta::encode_cbor(&delta::StateDelta { ts: prev.timestamp.clone(), v: 1, ..Default::default() })? } else { // First sync — request full state let request = serde_json::json!({ "type": "state_sync_request" }); serde_json::to_vec(&request)? }; let message = TransportMessage { from_did: local_did.to_string(), payload, message_type: MessageType::StateSync, }; let transport_used = router.send_to_peer(&peer.did, &message).await?; tracing::info!( peer = %peer.did, transport = %transport_used, "Federation sync sent via transport" ); Ok(()) } /// Build the local node's state snapshot for sharing with peers. pub fn build_local_state( apps: Vec, cpu: f64, mem_used: u64, mem_total: u64, disk_used: u64, disk_total: u64, uptime: u64, tor_active: bool, server_name: Option, ) -> NodeStateSnapshot { NodeStateSnapshot { timestamp: chrono::Utc::now().to_rfc3339(), node_name: server_name, apps, cpu_usage_percent: Some(cpu), mem_used_bytes: Some(mem_used), mem_total_bytes: Some(mem_total), disk_used_bytes: Some(disk_used), disk_total_bytes: Some(disk_total), uptime_secs: Some(uptime), tor_active: Some(tor_active), } } /// Deploy an app to a remote federated peer over Tor. /// Only works if the peer is trusted and the app exists in our marketplace. pub async fn deploy_to_peer( peer: &FederatedNode, app_id: &str, version: &str, marketplace_url: &str, local_did: &str, sign_fn: impl FnOnce(&[u8]) -> String, ) -> Result { if peer.trust_level != TrustLevel::Trusted { anyhow::bail!("Can only deploy to trusted peers (current: {})", peer.trust_level); } let host = if peer.onion.ends_with(".onion") { peer.onion.clone() } else { format!("{}.onion", peer.onion) }; let url = format!("http://{}/rpc/v1", host); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = sign_fn(timestamp.as_bytes()); let body = serde_json::json!({ "method": "package.install", "params": { "id": app_id, "version": version, "marketplace-url": marketplace_url, } }); let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; let client = reqwest::Client::builder() .proxy(proxy) .timeout(std::time::Duration::from_secs(120)) .build() .context("Failed to build HTTP client")?; let resp = client .post(&url) .header("X-Federation-DID", local_did) .header("X-Federation-Sig", &signature) .header("X-Federation-Timestamp", ×tamp) .json(&body) .send() .await .context("Failed to reach federated peer for deploy")?; if !resp.status().is_success() { anyhow::bail!("Remote node returned HTTP {}", resp.status()); } let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?; if let Some(err) = result.get("error") { if !err.is_null() { let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("Unknown remote error"); anyhow::bail!("Remote node refused deploy: {}", msg); } } Ok(serde_json::json!({ "deployed": true, "app_id": app_id, "peer_did": peer.did, "peer_onion": peer.onion, })) } #[cfg(test)] mod tests { use super::*; fn make_node(did: &str, onion: &str) -> FederatedNode { FederatedNode { did: did.to_string(), pubkey: "aabbccdd".to_string(), onion: onion.to_string(), name: None, trust_level: TrustLevel::Trusted, added_at: "2026-01-01T00:00:00Z".to_string(), last_seen: None, last_state: None, } } #[test] fn test_trust_level_serialization() { let json = serde_json::to_string(&TrustLevel::Trusted).unwrap(); assert_eq!(json, "\"trusted\""); let parsed: TrustLevel = serde_json::from_str("\"observer\"").unwrap(); assert_eq!(parsed, TrustLevel::Observer); } #[test] fn test_federated_node_serialization_roundtrip() { let node = make_node("did:key:zABC", "test.onion"); let json = serde_json::to_string(&node).unwrap(); let parsed: FederatedNode = serde_json::from_str(&json).unwrap(); assert_eq!(parsed.did, "did:key:zABC"); assert_eq!(parsed.trust_level, TrustLevel::Trusted); assert!(parsed.last_state.is_none()); } #[test] fn test_node_state_snapshot_defaults() { let json = r#"{"timestamp": "2026-01-01T00:00:00Z"}"#; let state: NodeStateSnapshot = serde_json::from_str(json).unwrap(); assert!(state.apps.is_empty()); assert!(state.cpu_usage_percent.is_none()); } #[tokio::test] async fn test_load_nodes_empty_when_no_file() { let dir = tempfile::tempdir().unwrap(); let nodes = load_nodes(dir.path()).await.unwrap(); assert!(nodes.is_empty()); } #[tokio::test] async fn test_save_and_load_nodes_roundtrip() { let dir = tempfile::tempdir().unwrap(); let nodes = vec![ make_node("did:key:z1", "a.onion"), make_node("did:key:z2", "b.onion"), ]; save_nodes(dir.path(), &nodes).await.unwrap(); let loaded = load_nodes(dir.path()).await.unwrap(); assert_eq!(loaded.len(), 2); assert_eq!(loaded[0].did, "did:key:z1"); } #[tokio::test] async fn test_add_node_deduplicates_by_did() { let dir = tempfile::tempdir().unwrap(); add_node(dir.path(), make_node("did:key:z1", "a.onion")) .await .unwrap(); let result = add_node(dir.path(), make_node("did:key:z1", "b.onion")).await; assert!(result.is_err()); } #[tokio::test] async fn test_remove_node_by_did() { let dir = tempfile::tempdir().unwrap(); add_node(dir.path(), make_node("did:key:z1", "a.onion")) .await .unwrap(); add_node(dir.path(), make_node("did:key:z2", "b.onion")) .await .unwrap(); let result = remove_node(dir.path(), "did:key:z1").await.unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].did, "did:key:z2"); } #[tokio::test] async fn test_remove_nonexistent_node_errors() { let dir = tempfile::tempdir().unwrap(); let result = remove_node(dir.path(), "did:key:nonexistent").await; assert!(result.is_err()); } #[tokio::test] async fn test_set_trust_level() { let dir = tempfile::tempdir().unwrap(); add_node(dir.path(), make_node("did:key:z1", "a.onion")) .await .unwrap(); let nodes = set_trust_level(dir.path(), "did:key:z1", TrustLevel::Observer) .await .unwrap(); assert_eq!(nodes[0].trust_level, TrustLevel::Observer); } #[tokio::test] async fn test_update_node_state() { let dir = tempfile::tempdir().unwrap(); add_node(dir.path(), make_node("did:key:z1", "a.onion")) .await .unwrap(); let state = NodeStateSnapshot { timestamp: "2026-03-10T12:00:00Z".to_string(), apps: vec![AppStatus { id: "bitcoin".to_string(), status: "running".to_string(), version: Some("27.0".to_string()), }], cpu_usage_percent: Some(45.2), mem_used_bytes: Some(4_000_000_000), mem_total_bytes: Some(8_000_000_000), disk_used_bytes: None, disk_total_bytes: None, uptime_secs: Some(86400), tor_active: Some(true), }; update_node_state(dir.path(), "did:key:z1", state) .await .unwrap(); let nodes = load_nodes(dir.path()).await.unwrap(); assert!(nodes[0].last_seen.is_some()); let ls = nodes[0].last_state.as_ref().unwrap(); assert_eq!(ls.apps.len(), 1); assert_eq!(ls.cpu_usage_percent, Some(45.2)); } #[tokio::test] async fn test_create_and_parse_invite() { let dir = tempfile::tempdir().unwrap(); let code = create_invite(dir.path(), "did:key:z1", "test.onion", "aabbcc") .await .unwrap(); assert!(code.starts_with("fed1:")); let (did, onion, pubkey, token) = parse_invite(&code).unwrap(); assert_eq!(did, "did:key:z1"); assert_eq!(onion, "test.onion"); assert_eq!(pubkey, "aabbcc"); assert_eq!(token.len(), 32); // 16 bytes = 32 hex chars } #[test] fn test_parse_invalid_invite() { assert!(parse_invite("invalid").is_err()); assert!(parse_invite("fed1:not-valid-base64!!!").is_err()); } #[tokio::test] async fn test_accept_invite_creates_node() { let dir = tempfile::tempdir().unwrap(); let code = create_invite(dir.path(), "did:key:zRemote", "remote.onion", "remotepub") .await .unwrap(); // Accept from a different "local" perspective let dir2 = tempfile::tempdir().unwrap(); let node = accept_invite( dir2.path(), &code, "did:key:zLocal", "local.onion", "localpub", ) .await .unwrap(); assert_eq!(node.did, "did:key:zRemote"); assert_eq!(node.trust_level, TrustLevel::Trusted); let nodes = load_nodes(dir2.path()).await.unwrap(); assert_eq!(nodes.len(), 1); } #[tokio::test] async fn test_accept_invite_rejects_duplicate() { let dir = tempfile::tempdir().unwrap(); let code = create_invite(dir.path(), "did:key:zRemote", "remote.onion", "remotepub") .await .unwrap(); let dir2 = tempfile::tempdir().unwrap(); accept_invite( dir2.path(), &code, "did:key:zLocal", "local.onion", "localpub", ) .await .unwrap(); // Accepting the same invite again should fail let result = accept_invite( dir2.path(), &code, "did:key:zLocal", "local.onion", "localpub", ) .await; assert!(result.is_err()); } #[test] fn test_build_local_state() { let state = build_local_state( vec![AppStatus { id: "lnd".to_string(), status: "running".to_string(), version: Some("0.18".to_string()), }], 25.5, 2_000_000_000, 8_000_000_000, 100_000_000_000, 500_000_000_000, 3600, true, Some("Test Node".to_string()), ); assert_eq!(state.apps.len(), 1); assert_eq!(state.cpu_usage_percent, Some(25.5)); assert_eq!(state.tor_active, Some(true)); assert_eq!(state.node_name, Some("Test Node".to_string())); } }