use crate::api::ApiHandler; use crate::config::{Config, ContainerRuntime}; use crate::container::{docker_packages, DockerPackageScanner}; use crate::identity::{self, NodeIdentity}; use crate::node_message; use crate::nostr_discovery; use crate::peers; use crate::state::StateManager; use anyhow::Result; use hyper::server::conn::Http; use hyper::service::service_fn; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; use tracing::{debug, error, info}; pub struct Server { _config: Config, _identity: Arc, api_handler: Arc, _state_manager: Arc, } impl Server { pub async fn new(config: Config) -> Result { let state_manager = Arc::new(StateManager::new()); // Load node identity and set stable server_info let identity_dir = config.data_dir.join("identity"); let identity = NodeIdentity::load_or_create(&identity_dir).await?; let (mut data, _) = state_manager.get_snapshot().await; data.server_info.id = identity.node_id(); data.server_info.pubkey = identity.pubkey_hex(); data.server_info.tor_address = docker_packages::read_tor_address("archipelago"); if let Some(ref tor) = data.server_info.tor_address { data.server_info.node_address = Some(identity.node_address(tor)); } state_manager.update_data(data.clone()).await; // Revoke any previously published Nostr data (runs before publish so revocation is not overwritten) let identity_dir = config.data_dir.join("identity"); let tor_proxy_revoke = config.nostr_tor_proxy.clone(); if let Err(e) = nostr_discovery::revoke_if_needed(&identity_dir, tor_proxy_revoke.as_deref()).await { tracing::debug!("Nostr revoke (non-fatal): {}", e); } // Publish node identity to Nostr only when opt-in (nostr_discovery_enabled + relays) if config.nostr_discovery_enabled && !config.nostr_relays.is_empty() && data.server_info.node_address.is_some() { let identity_dir = config.data_dir.join("identity"); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default(); let node_addr = 
data.server_info.node_address.clone().unwrap_or_default(); let version = data.server_info.version.clone(); let relays = config.nostr_relays.clone(); let tor_proxy = config.nostr_tor_proxy.clone(); tokio::spawn(async move { if let Err(e) = nostr_discovery::publish_node_identity( &identity_dir, &did, &node_addr, &version, &relays, tor_proxy.as_deref(), ) .await { tracing::debug!("Nostr publish (non-fatal): {}", e); } }); } info!("🔑 Node identity: {} (pubkey: {}...)", identity.node_id(), &identity.pubkey_hex()[..16.min(identity.pubkey_hex().len())]); let identity = Arc::new(identity); let api_handler = Arc::new(ApiHandler::new(config.clone(), state_manager.clone()).await?); // Periodic Tor address refresh (runs regardless of dev_mode) // Picks up hostname when Tor creates it after startup/rotation (30-60s delay) { let state = state_manager.clone(); let identity_clone = identity.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; if let Err(e) = refresh_tor_address(&state, identity_clone.as_ref()).await { debug!("Tor address refresh (non-fatal): {}", e); } } }); } // Initialize Docker scanner if in dev mode if config.dev_mode { let scanner = create_docker_scanner(&config).await?; let state = state_manager.clone(); let identity_clone = identity.clone(); // Initial scan tokio::spawn(async move { info!("🐳 Scanning Docker containers..."); if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await { error!("Failed to scan Docker containers: {}", e); } // Periodic scan every 10 seconds (only broadcasts if state changed) let mut interval = tokio::time::interval(Duration::from_secs(10)); loop { interval.tick().await; if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await { error!("Failed to update Docker containers: {}", e); } } }); } // Peer health monitoring — check every 5 minutes { let state = state_manager.clone(); let data_dir 
= config.data_dir.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(300)); loop { interval.tick().await; if let Err(e) = check_peer_health(&state, &data_dir).await { debug!("Peer health check (non-fatal): {}", e); } } }); } Ok(Self { _config: config, _identity: identity, api_handler, _state_manager: state_manager, }) } pub async fn serve(&self, addr: SocketAddr) -> Result<()> { let listener = TcpListener::bind(addr).await?; loop { let (stream, peer_addr) = match listener.accept().await { Ok(conn) => conn, Err(e) => { error!("Failed to accept connection: {}", e); continue; } }; let handler = self.api_handler.clone(); tokio::spawn(async move { let service = service_fn(move |req| { let handler = handler.clone(); async move { handler.handle_request(req).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e))) } }); if let Err(e) = Http::new() .http1_keep_alive(false) .serve_connection(stream, service) .with_upgrades() .await { error!("Error serving connection from {}: {}", peer_addr, e); } }); } } } async fn create_docker_scanner(config: &Config) -> Result { let user = std::env::var("USER").unwrap_or_else(|_| "archipelago".to_string()); let runtime: Arc = match &config.container_runtime { ContainerRuntime::Podman => { Arc::new(archipelago_container::PodmanRuntime::new(user.clone())) } ContainerRuntime::Docker => { Arc::new(archipelago_container::DockerRuntime::new(user.clone())) } ContainerRuntime::Auto => { Arc::new( archipelago_container::AutoRuntime::new(user.clone()) .await? 
) } }; Ok(DockerPackageScanner::new(runtime)) } async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> { let tor_addr = docker_packages::read_tor_address("archipelago"); let (current_data, _) = state.get_snapshot().await; if tor_addr != current_data.server_info.tor_address { let mut data = current_data; data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); state.update_data(data).await; if let Some(ref addr) = tor_addr { info!("🔒 Tor address updated: {}", addr); } } Ok(()) } async fn scan_and_update_packages( scanner: &DockerPackageScanner, state: &StateManager, identity: &NodeIdentity, ) -> Result<()> { let packages = scanner.scan_containers().await?; let (current_data, _) = state.get_snapshot().await; let packages_changed = !packages.is_empty() && current_data.package_data != packages; let tor_addr = docker_packages::read_tor_address("archipelago"); let tor_changed = tor_addr != current_data.server_info.tor_address; if packages_changed || tor_changed { let mut data = current_data; if !packages.is_empty() { data.package_data = packages; } data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); state.update_data(data).await; debug!("📦 State changed (packages={}, tor={}), broadcasting update", packages_changed, tor_changed); } Ok(()) } /// Periodically check peer reachability and broadcast status changes. 
///
/// Loads the known peers via `peers::load_peers(data_dir)`, probes each peer's
/// onion address with `node_message::check_peer_reachable` one at a time, and
/// writes the resulting address-to-reachable map into shared state only when it
/// differs from the map in the current snapshot.
///
/// A peer list that fails to load falls back to the type's default value, and
/// an empty list leaves the stored health map untouched. A probe that fails is
/// recorded as unreachable. As written, this function always returns `Ok(())`.
async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> Result<()> {
    let peer_list = peers::load_peers(data_dir).await.unwrap_or_default();

    if !peer_list.is_empty() {
        // Probe sequentially; each result is keyed by the peer's onion address.
        let mut observed = std::collections::HashMap::new();
        for entry in &peer_list {
            let probe = node_message::check_peer_reachable(&entry.onion).await;
            observed.insert(entry.onion.clone(), probe.unwrap_or(false));
        }

        // Take the snapshot only after probing, and write back only on change.
        let (mut snapshot, _) = state.get_snapshot().await;
        if snapshot.peer_health != observed {
            snapshot.peer_health = observed;
            state.update_data(snapshot).await;
            debug!("🔗 Peer health updated, broadcasting changes");
        }
    }

    Ok(())
}