use crate::api::ApiHandler; use crate::config::{Config, ContainerRuntime}; use crate::container::{docker_packages, DockerPackageScanner}; use crate::identity::{self, NodeIdentity}; use crate::monitoring::MetricsStore; use crate::node_message; use crate::nostr_discovery; use crate::nostr_handshake; use crate::peers; use crate::state::StateManager; use anyhow::Result; use hyper::server::conn::Http; use hyper::service::service_fn; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; use tracing::{debug, error, info, warn}; pub struct Server { _config: Config, _identity: Arc, api_handler: Arc, _state_manager: Arc, } impl Server { pub async fn new(config: Config) -> Result { let state_manager = Arc::new(StateManager::new()); // Load node identity and set stable server_info let identity_dir = config.data_dir.join("identity"); let identity = NodeIdentity::load_or_create(&identity_dir).await?; let (mut data, _) = state_manager.get_snapshot().await; data.server_info.id = identity.node_id(); data.server_info.pubkey = identity.pubkey_hex(); data.server_info.tor_address = docker_packages::read_tor_address("archipelago"); if let Some(ref tor) = data.server_info.tor_address { data.server_info.node_address = Some(identity.node_address(tor)); } state_manager.update_data(data.clone()).await; // Revoke any previously published Nostr data (runs before publish so revocation is not overwritten) let identity_dir = config.data_dir.join("identity"); let tor_proxy_revoke = config.nostr_tor_proxy.clone(); if let Err(e) = nostr_discovery::revoke_if_needed(&identity_dir, tor_proxy_revoke.as_deref()).await { tracing::debug!("Nostr revoke (non-fatal): {}", e); } // Publish presence-only to Nostr (DID + Nostr pubkey, NO onion address). // Onion addresses are exchanged privately via NIP-44 encrypted DMs. if config.nostr_discovery_enabled && !config.nostr_relays.is_empty() { let identity_dir = config.data_dir.join("identity"); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default(); let version = data.server_info.version.clone(); let relays = config.nostr_relays.clone(); let tor_proxy = config.nostr_tor_proxy.clone(); tokio::spawn(async move { if let Err(e) = nostr_handshake::publish_presence( &identity_dir, &did, &version, &relays, tor_proxy.as_deref(), ) .await { tracing::debug!("Nostr presence publish (non-fatal): {}", e); } }); } info!("🔑 Node identity: {} (pubkey: {}...)", identity.node_id(), &identity.pubkey_hex()[..16.min(identity.pubkey_hex().len())]); let identity = Arc::new(identity); // Create metrics store and spawn background collector let metrics_store = Arc::new(MetricsStore::with_data_dir(config.data_dir.clone())); crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()), Some(config.data_dir.clone())); let api_handler = Arc::new( ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?, ); // Periodic Tor address refresh (runs regardless of dev_mode) // Picks up hostname when Tor creates it after startup/rotation (30-60s delay) { let state = state_manager.clone(); let identity_clone = identity.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; if let Err(e) = refresh_tor_address(&state, identity_clone.as_ref()).await { debug!("Tor address refresh (non-fatal): {}", e); } } }); } // Initialize container scanner — discovers installed apps from Podman/Docker { let scanner = create_docker_scanner(&config).await?; let state = state_manager.clone(); let identity_clone = identity.clone(); // Initial scan tokio::spawn(async move { info!("🐳 Scanning containers..."); if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await { error!("Failed to scan containers: {}", e); } // Periodic scan every 10 seconds (only broadcasts if state changed) let mut interval = tokio::time::interval(Duration::from_secs(10)); loop { interval.tick().await; if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await { error!("Failed to update containers: {}", e); } } }); } // Peer health monitoring — check every 5 minutes { let state = state_manager.clone(); let data_dir = config.data_dir.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(300)); loop { interval.tick().await; if let Err(e) = check_peer_health(&state, &data_dir).await { debug!("Peer health check (non-fatal): {}", e); } } }); } // Container health monitoring — auto-restart unhealthy containers // Respects webhook config: skips when disabled or ContainerCrash not subscribed crate::health_monitor::spawn_health_monitor(state_manager.clone(), config.data_dir.clone()); Ok(Self { _config: config, _identity: identity, api_handler, _state_manager: state_manager, }) } pub async fn serve(&self, addr: SocketAddr) -> Result<()> { self.serve_with_shutdown(addr, std::future::pending()).await } /// Serve with a graceful shutdown signal. /// When the shutdown future completes, stop accepting new connections and drain in-flight requests. pub async fn serve_with_shutdown( &self, addr: SocketAddr, shutdown: impl std::future::Future, ) -> Result<()> { let listener = TcpListener::bind(addr).await?; let active_connections = Arc::new(tokio::sync::Semaphore::new(1024)); tokio::pin!(shutdown); loop { tokio::select! { result = listener.accept() => { let (stream, peer_addr) = match result { Ok(conn) => conn, Err(e) => { error!("Failed to accept connection: {}", e); continue; } }; let handler = self.api_handler.clone(); let permit = active_connections.clone().acquire_owned().await; tokio::spawn(async move { let _permit = permit; let service = service_fn(move |req| { let handler = handler.clone(); async move { handler.handle_request(req).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e))) } }); if let Err(e) = Http::new() .http1_keep_alive(false) .serve_connection(stream, service) .with_upgrades() .await { error!("Error serving connection from {}: {}", peer_addr, e); } }); } _ = &mut shutdown => { info!("Shutdown signal received, draining connections..."); // Wait up to 5 seconds for in-flight requests to complete let drain_start = std::time::Instant::now(); let drain_timeout = std::time::Duration::from_secs(5); while active_connections.available_permits() < 1024 { if drain_start.elapsed() > drain_timeout { warn!("Drain timeout reached, forcing shutdown"); break; } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } info!("Shutdown complete"); return Ok(()); } } } } } async fn create_docker_scanner(config: &Config) -> Result { let user = std::env::var("USER").unwrap_or_else(|_| "archipelago".to_string()); let runtime: Arc = match &config.container_runtime { ContainerRuntime::Podman => { Arc::new(archipelago_container::PodmanRuntime::new(user.clone())) } ContainerRuntime::Docker => { Arc::new(archipelago_container::DockerRuntime::new(user.clone())) } ContainerRuntime::Auto => { Arc::new( archipelago_container::AutoRuntime::new(user.clone()) .await? ) } }; Ok(DockerPackageScanner::new(runtime)) } async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> { let tor_addr = docker_packages::read_tor_address("archipelago"); let (current_data, _) = state.get_snapshot().await; if tor_addr != current_data.server_info.tor_address { let mut data = current_data; data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); state.update_data(data).await; if let Some(ref addr) = tor_addr { info!("🔒 Tor address updated: {}", addr); } } Ok(()) } async fn scan_and_update_packages( scanner: &DockerPackageScanner, state: &StateManager, identity: &NodeIdentity, ) -> Result<()> { let packages = scanner.scan_containers().await?; let (current_data, _) = state.get_snapshot().await; let packages_changed = !packages.is_empty() && current_data.package_data != packages; let tor_addr = docker_packages::read_tor_address("archipelago"); let tor_changed = tor_addr != current_data.server_info.tor_address; if packages_changed || tor_changed { let mut data = current_data; if !packages.is_empty() { data.package_data = packages; } data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); state.update_data(data).await; debug!("📦 State changed (packages={}, tor={}), broadcasting update", packages_changed, tor_changed); } Ok(()) } /// Periodically check peer reachability and broadcast status changes. async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> Result<()> { let known_peers = peers::load_peers(data_dir).await.unwrap_or_default(); if known_peers.is_empty() { return Ok(()); } let mut new_health = std::collections::HashMap::new(); for peer in &known_peers { let reachable = node_message::check_peer_reachable(&peer.onion) .await .unwrap_or(false); new_health.insert(peer.onion.clone(), reachable); } let (current_data, _) = state.get_snapshot().await; if current_data.peer_health != new_health { let mut data = current_data; data.peer_health = new_health; state.update_data(data).await; debug!("🔗 Peer health updated, broadcasting changes"); } Ok(()) }