use super::RpcHandler; use crate::credentials; use crate::federation::{self, FederatedNode, TrustLevel}; use crate::identity; use crate::identity_manager::IdentityManager; use crate::network::dwn_store::DwnStore; use anyhow::Result; use tracing::{debug, info}; const FEDERATION_PROTOCOL: &str = "https://archipelago.dev/protocols/federation/v1"; /// Validate a DID parameter: must start with "did:", max 256 chars, no path traversal. fn validate_did(did: &str) -> Result<()> { if did.is_empty() || did.len() > 256 { anyhow::bail!("Invalid DID: must be 1-256 characters"); } if !did.starts_with("did:") { anyhow::bail!("Invalid DID: must start with 'did:'"); } if did.contains("..") || did.contains('/') || did.contains('\\') || did.contains('\0') { anyhow::bail!("Invalid DID: contains forbidden characters"); } Ok(()) } impl RpcHandler { /// federation.invite — Generate an invite code containing our DID + onion for a peer. pub(super) async fn handle_federation_invite(&self) -> Result { let (data, _) = self.state_manager.get_snapshot().await; let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)?; let onion = data .server_info .tor_address .clone() .unwrap_or_default(); let pubkey = data.server_info.pubkey.clone(); if onion.is_empty() { anyhow::bail!("Tor address not available. Tor may not be running."); } let code = federation::create_invite(&self.config.data_dir, &did, &onion, &pubkey).await?; info!(did = %did, "Generated federation invite"); Ok(serde_json::json!({ "code": code, "did": did, "onion": onion, })) } /// federation.join — Accept an invite code and establish federation with the remote node. pub(super) async fn handle_federation_join( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let code = params .get("code") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'code' parameter"))?; let (data, _) = self.state_manager.get_snapshot().await; let local_did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)?; let local_onion = data.server_info.tor_address.clone().unwrap_or_default(); let local_pubkey = data.server_info.pubkey.clone(); let node = federation::accept_invite( &self.config.data_dir, code, &local_did, &local_onion, &local_pubkey, ) .await?; info!(peer_did = %node.did, "Joined federation with peer"); // Store federation membership as DWN message if let Ok(store) = DwnStore::new(&self.config.data_dir).await { let dwn_data = serde_json::json!({ "node_did": node.did, "trust_level": node.trust_level.to_string(), "joined_at": chrono::Utc::now().to_rfc3339(), "apps": [], }); if let Err(e) = store .write_message( &local_did, Some(FEDERATION_PROTOCOL), Some("https://archipelago.dev/schemas/federation-membership/v1"), Some("application/json"), Some(dwn_data), ) .await { debug!("DWN federation membership write (non-fatal): {}", e); } } // Issue a federation trust VC attesting the peer relationship { let data_dir = self.config.data_dir.clone(); let peer_did = node.did.clone(); let issuer_did_vc = local_did.clone(); tokio::spawn(async move { let claims = serde_json::json!({ "federationPeer": true, "establishedAt": chrono::Utc::now().to_rfc3339(), }); match credentials::issue_credential( &data_dir, &issuer_did_vc, &peer_did, "FederationTrustCredential", claims, None, |bytes| { // Sign with node identity key let identity_dir = data_dir.join("identity"); tokio::task::block_in_place(|| { let rt = tokio::runtime::Handle::current(); rt.block_on(async { let id = crate::identity::NodeIdentity::load_or_create(&identity_dir).await?; Ok(id.sign(bytes)) }) }) }, ) .await { Ok(vc) => debug!(vc_id = %vc.id, peer = %peer_did, "Issued federation trust VC"), Err(e) => debug!(error = %e, "Federation trust VC issuance failed (non-fatal)"), } }); } Ok(serde_json::json!({ "joined": true, "node": { "did": node.did, "onion": node.onion, "pubkey": node.pubkey, "trust_level": node.trust_level.to_string(), } })) } /// federation.list-nodes — List all federated nodes with their status and last state. pub(super) async fn handle_federation_list_nodes(&self) -> Result { let nodes = federation::load_nodes(&self.config.data_dir).await?; let nodes_json: Vec = nodes .iter() .map(|n| { let mut obj = serde_json::json!({ "did": n.did, "pubkey": n.pubkey, "onion": n.onion, "trust_level": n.trust_level.to_string(), "added_at": n.added_at, }); if let Some(name) = &n.name { obj["name"] = serde_json::json!(name); } if let Some(last_seen) = &n.last_seen { obj["last_seen"] = serde_json::json!(last_seen); } if let Some(state) = &n.last_state { obj["last_state"] = serde_json::to_value(state).unwrap_or_default(); } obj }) .collect(); Ok(serde_json::json!({ "nodes": nodes_json })) } /// federation.remove-node — Remove a node from the federation by DID. pub(super) async fn handle_federation_remove_node( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let did = params .get("did") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'did' parameter"))?; validate_did(did)?; let nodes = federation::remove_node(&self.config.data_dir, did).await?; info!(did = %did, "Removed node from federation"); Ok(serde_json::json!({ "removed": true, "nodes_remaining": nodes.len(), })) } /// federation.set-trust — Change trust level for a federated node. pub(super) async fn handle_federation_set_trust( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let did = params .get("did") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'did' parameter"))?; validate_did(did)?; let trust_str = params .get("trust_level") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'trust_level' parameter"))?; let trust = match trust_str { "trusted" => TrustLevel::Trusted, "observer" => TrustLevel::Observer, "untrusted" => TrustLevel::Untrusted, _ => anyhow::bail!("Invalid trust level: {} (expected trusted/observer/untrusted)", trust_str), }; federation::set_trust_level(&self.config.data_dir, did, trust).await?; Ok(serde_json::json!({ "updated": true, "did": did, "trust_level": trust.to_string(), })) } /// federation.sync-state — Manually trigger state sync with all federated peers. pub(super) async fn handle_federation_sync_state(&self) -> Result { let nodes = federation::load_nodes(&self.config.data_dir).await?; if nodes.is_empty() { return Ok(serde_json::json!({ "synced": 0, "failed": 0, "results": [], })); } let (data, _) = self.state_manager.get_snapshot().await; let local_did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)?; let identity_dir = self.config.data_dir.join("identity"); let node_identity = identity::NodeIdentity::load_or_create(&identity_dir).await?; let mut synced = 0u32; let mut failed = 0u32; let mut results = Vec::new(); for node in &nodes { if node.trust_level == TrustLevel::Untrusted { continue; } let did_clone = local_did.clone(); match federation::sync_with_peer( &self.config.data_dir, node, &did_clone, |bytes| node_identity.sign(bytes), ) .await { Ok(state) => { synced += 1; results.push(serde_json::json!({ "did": node.did, "status": "ok", "apps": state.apps.len(), })); } Err(e) => { failed += 1; results.push(serde_json::json!({ "did": node.did, "status": "error", "error": e.to_string(), })); } } } Ok(serde_json::json!({ "synced": synced, "failed": failed, "results": results, })) } /// federation.get-state — Return this node's state snapshot (called by peers during sync). pub(super) async fn handle_federation_get_state(&self) -> Result { let (data, _) = self.state_manager.get_snapshot().await; // Build app statuses from package_data let apps: Vec = data .package_data .iter() .map(|(id, pkg)| federation::AppStatus { id: id.clone(), status: format!("{:?}", pkg.state).to_lowercase(), version: Some(pkg.manifest.version.clone()), }) .collect(); let tor_active = data.server_info.tor_address.is_some(); let state = federation::build_local_state( apps, 0.0, 0, 0, 0, 0, 0, tor_active, ); Ok(serde_json::to_value(&state)?) } /// federation.peer-joined — Called by a remote peer after they accept our invite. pub(super) async fn handle_federation_peer_joined( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let did = params .get("did") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'did'"))?; let onion = params .get("onion") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'onion'"))?; let pubkey = params .get("pubkey") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'pubkey'"))?; let nodes = federation::load_nodes(&self.config.data_dir).await?; if nodes.iter().any(|n| n.did == did) { return Ok(serde_json::json!({ "accepted": true, "already_known": true })); } let node = FederatedNode { did: did.to_string(), pubkey: pubkey.to_string(), onion: onion.to_string(), name: None, trust_level: TrustLevel::Trusted, added_at: chrono::Utc::now().to_rfc3339(), last_seen: None, last_state: None, }; federation::add_node(&self.config.data_dir, node).await?; info!(peer_did = %did, "Peer joined our federation"); Ok(serde_json::json!({ "accepted": true })) } /// federation.deploy-app — Deploy an app to a remote federated node. pub(super) async fn handle_federation_deploy_app( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let peer_did = params .get("did") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'did' (target node)"))?; let app_id = params .get("app_id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing 'app_id'"))?; let version = params .get("version") .and_then(|v| v.as_str()) .unwrap_or("latest"); let marketplace_url = params .get("marketplace_url") .and_then(|v| v.as_str()) .unwrap_or(""); let nodes = federation::load_nodes(&self.config.data_dir).await?; let peer = nodes .iter() .find(|n| n.did == peer_did) .ok_or_else(|| anyhow::anyhow!("No federated node with DID {}", peer_did))?; let (data, _) = self.state_manager.get_snapshot().await; let local_did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)?; let identity_dir = self.config.data_dir.join("identity"); let node_identity = identity::NodeIdentity::load_or_create(&identity_dir).await?; let result = federation::deploy_to_peer( peer, app_id, version, marketplace_url, &local_did, |bytes| node_identity.sign(bytes), ) .await?; info!(app = %app_id, peer = %peer_did, "Deployed app to federated peer"); Ok(result) } /// federation.peer-address-changed — A peer notifies us that their .onion changed. pub(super) async fn handle_federation_peer_address_changed( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let did = params .get("did") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing did"))?; let new_onion = params .get("new_onion") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing new_onion"))?; // Load existing nodes, find the peer by DID, update their onion let mut nodes = federation::load_nodes(&self.config.data_dir).await?; let found = nodes.iter_mut().find(|n| n.did == did); match found { Some(node) => { let old = node.onion.clone(); node.onion = new_onion.to_string(); federation::save_nodes(&self.config.data_dir, &nodes).await?; info!(did = %did, old_onion = %old, new_onion = %new_onion, "Updated federated peer address"); Ok(serde_json::json!({ "updated": true, "did": did, "old_onion": old, "new_onion": new_onion, })) } None => { info!(did = %did, "Received address change from unknown peer — ignoring"); Ok(serde_json::json!({ "updated": false, "reason": "Unknown peer DID", })) } } } }