use crate::api::ApiHandler; use crate::config::{Config, ContainerRuntime}; use crate::container::{docker_packages, DockerPackageScanner}; use crate::identity::{self, NodeIdentity}; use crate::monitoring::MetricsStore; use crate::node_message; use crate::nostr_discovery; use crate::nostr_handshake; use crate::peers; use crate::state::StateManager; use anyhow::Result; use hyper::server::conn::Http; use hyper::service::service_fn; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; use tracing::{debug, error, info, warn}; pub struct Server { _config: Config, _identity: Arc, api_handler: Arc, _state_manager: Arc, } impl Server { pub async fn new(config: Config) -> Result { let state_manager = Arc::new(StateManager::new()); // Load node identity and set stable server_info let identity_dir = config.data_dir.join("identity"); let identity = NodeIdentity::load_or_create(&identity_dir).await?; let (mut data, _) = state_manager.get_snapshot().await; data.server_info.id = identity.node_id(); data.server_info.pubkey = identity.pubkey_hex(); // Load persisted server name let name_file = config.data_dir.join("server-name"); if let Ok(name) = tokio::fs::read_to_string(&name_file).await { let name = name.trim().to_string(); if !name.is_empty() { data.server_info.name = Some(name); } } data.server_info.tor_address = docker_packages::read_tor_address("archipelago").await; if let Some(ref tor) = data.server_info.tor_address { data.server_info.node_address = Some(identity.node_address(tor)); } state_manager.update_data(data.clone()).await; // Load persisted messages (Archipelago channel) node_message::init(&config.data_dir).await; // Auto-create default identity if none exist (fresh boot or factory reset) { let im = crate::identity_manager::IdentityManager::new(&config.data_dir).await; if let Ok(mgr) = im { if let Ok((list, _)) = mgr.list().await { if list.is_empty() { match mgr.create("Default".to_string(), crate::identity_manager::IdentityPurpose::Personal).await { Ok(record) => { let _ = mgr.create_nostr_key(&record.id).await; tracing::info!(did = %record.did, "Auto-created default identity with Nostr key"); } Err(e) => tracing::debug!("Auto-identity creation (non-fatal): {}", e), } } } } } // Revoke any previously published Nostr data (runs before publish so revocation is not overwritten) let identity_dir = config.data_dir.join("identity"); let tor_proxy_revoke = config.nostr_tor_proxy.clone(); if let Err(e) = nostr_discovery::revoke_if_needed(&identity_dir, tor_proxy_revoke.as_deref()).await { tracing::debug!("Nostr revoke (non-fatal): {}", e); } // Publish presence-only to Nostr (DID + Nostr pubkey, NO onion address). // Onion addresses are exchanged privately via NIP-44 encrypted DMs. if config.nostr_discovery_enabled && !config.nostr_relays.is_empty() { let identity_dir = config.data_dir.join("identity"); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default(); let version = data.server_info.version.clone(); let relays = config.nostr_relays.clone(); let tor_proxy = config.nostr_tor_proxy.clone(); tokio::spawn(async move { if let Err(e) = nostr_handshake::publish_presence( &identity_dir, &did, &version, &relays, tor_proxy.as_deref(), ) .await { tracing::debug!("Nostr presence publish (non-fatal): {}", e); } }); } info!("🔑 Node identity: {} (pubkey: {}...)", identity.node_id(), &identity.pubkey_hex()[..16.min(identity.pubkey_hex().len())]); let identity = Arc::new(identity); // Create metrics store and spawn background collector let metrics_store = Arc::new(MetricsStore::with_data_dir(config.data_dir.clone()).await); let metrics_for_telemetry = metrics_store.clone(); crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()), Some(config.data_dir.clone())); let api_handler = Arc::new( ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?, ); // Initialize mesh networking service (if config has enabled: true) { let data_dir = config.data_dir.clone(); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey) .unwrap_or_default(); let pubkey_hex = identity.pubkey_hex(); let signing_key = identity.signing_key(); match crate::mesh::MeshService::new(&data_dir, signing_key, &did, &pubkey_hex).await { Ok(mut mesh_service) => { let mesh_config = crate::mesh::load_config(&data_dir).await.unwrap_or_default(); if mesh_config.enabled { if let Err(e) = mesh_service.start() { warn!("Mesh service start failed (non-fatal): {}", e); } else { info!("📡 Mesh networking started"); } } api_handler.rpc_handler().set_mesh_service(mesh_service).await; info!("📡 Mesh service initialized"); } Err(e) => { warn!("Mesh service init failed (non-fatal): {}", e); } } } // Initialize transport router (unified routing: mesh > lan > tor) { let data_dir = config.data_dir.clone(); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey) .unwrap_or_default(); let pubkey_hex = identity.pubkey_hex(); let mesh_config = crate::mesh::load_config(&data_dir).await.unwrap_or_default(); let mesh_only = mesh_config.mesh_only_mode.unwrap_or(false); match crate::transport::PeerRegistry::load(&data_dir).await { Ok(registry) => { let registry = std::sync::Arc::new(registry); let mut transports: Vec> = Vec::new(); // Tor transport (always register — availability checked dynamically) transports.push(Box::new( crate::transport::tor::TorTransport::new(&pubkey_hex), )); // Mesh transport (wraps the mesh service) transports.push(Box::new( crate::transport::mesh_transport::MeshTransport::new( api_handler.rpc_handler().mesh_service_arc(), ), )); // LAN transport (mDNS discovery) let mut lan = crate::transport::lan::LanTransport::new(&did, &pubkey_hex, 5678); match lan.start(registry.clone()) { Ok(()) => info!("📡 LAN transport (mDNS) started"), Err(e) => debug!("LAN transport init (non-fatal): {}", e), } transports.push(Box::new(lan)); let router = std::sync::Arc::new(crate::transport::TransportRouter::new( transports, registry, mesh_only, )); api_handler.rpc_handler().set_transport_router(router).await; info!("📡 Transport router initialized (mesh_only={})", mesh_only); } Err(e) => { warn!("Transport router init failed (non-fatal): {}", e); } } } // Register Archipelago DWN protocols (background, non-blocking) { let data_dir = config.data_dir.clone(); tokio::spawn(async move { if let Err(e) = register_dwn_protocols(&data_dir).await { debug!("DWN protocol registration (non-fatal): {}", e); } }); } // Periodic Tor address refresh (runs regardless of dev_mode) // Picks up hostname when Tor creates it after startup/rotation (30-60s delay) { let state = state_manager.clone(); let identity_clone = identity.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; if let Err(e) = refresh_tor_address(&state, identity_clone.as_ref()).await { debug!("Tor address refresh (non-fatal): {}", e); } } }); } // Initialize container scanner — discovers installed apps from Podman/Docker { let scanner = create_docker_scanner(&config).await?; let state = state_manager.clone(); let identity_clone = identity.clone(); // Initial scan (delayed to let crash recovery finish first) tokio::spawn(async move { // Brief delay for containers to stabilize after boot tokio::time::sleep(Duration::from_secs(3)).await; info!("🐳 Scanning containers..."); if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await { error!("Failed to scan containers: {}", e); } // Periodic scan every 30 seconds (only broadcasts if state changed) // Uses an in-flight guard to skip scans when a previous one is still running let mut interval = tokio::time::interval(Duration::from_secs(30)); let scanning = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); loop { interval.tick().await; if scanning.load(std::sync::atomic::Ordering::Relaxed) { debug!("Skipping container scan — previous scan still in progress"); continue; } scanning.store(true, std::sync::atomic::Ordering::Relaxed); if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await { error!("Failed to update containers: {}", e); } scanning.store(false, std::sync::atomic::Ordering::Relaxed); } }); } // Peer health monitoring — check every 5 minutes { let state = state_manager.clone(); let data_dir = config.data_dir.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(300)); loop { interval.tick().await; if let Err(e) = check_peer_health(&state, &data_dir).await { debug!("Peer health check (non-fatal): {}", e); } } }); } // did:dht auto-refresh — re-publish DHT records every 2 hours if config.nostr_discovery_enabled { let data_dir = config.data_dir.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(7200)); loop { interval.tick().await; let identity_dir = data_dir.join("identity"); let node_key_path = identity_dir.join("node_key"); if !node_key_path.exists() { continue; } match tokio::fs::read(&node_key_path).await { Ok(key_bytes) if key_bytes.len() == 32 => { let mut seed = [0u8; 32]; seed.copy_from_slice(&key_bytes); let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed); match crate::network::did_dht::create_and_publish(&signing_key, &[]).await { Ok(did) => tracing::info!(did = %did, "did:dht record refreshed"), Err(e) => tracing::debug!("did:dht refresh (non-fatal): {}", e), } } _ => { tracing::debug!("did:dht refresh skipped: no valid node key"); } } } }); } // Container health monitoring — auto-restart unhealthy containers // Respects webhook config: skips when disabled or ContainerCrash not subscribed crate::health_monitor::spawn_health_monitor(state_manager.clone(), config.data_dir.clone()); // Periodic telemetry reporter (every 15 min when opted in) crate::monitoring::spawn_telemetry_reporter( metrics_for_telemetry, Some(state_manager.clone()), config.data_dir.clone(), ); Ok(Self { _config: config, _identity: identity, api_handler, _state_manager: state_manager, }) } /// Serve with a graceful shutdown signal. /// When the shutdown future completes, stop accepting new connections and drain in-flight requests. pub async fn serve_with_shutdown( &self, addr: SocketAddr, shutdown: impl std::future::Future, ) -> Result<()> { let listener = TcpListener::bind(addr).await?; let active_connections = Arc::new(tokio::sync::Semaphore::new(1024)); tokio::pin!(shutdown); loop { tokio::select! { result = listener.accept() => { let (stream, peer_addr) = match result { Ok(conn) => conn, Err(e) => { error!("Failed to accept connection: {}", e); continue; } }; let handler = self.api_handler.clone(); let permit = active_connections.clone().acquire_owned().await; tokio::spawn(async move { let _permit = permit; let service = service_fn(move |req| { let handler = handler.clone(); async move { handler.handle_request(req).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e))) } }); if let Err(e) = Http::new() .http1_keep_alive(false) .serve_connection(stream, service) .with_upgrades() .await { error!("Error serving connection from {}: {}", peer_addr, e); } }); } _ = &mut shutdown => { info!("Shutdown signal received, draining connections..."); // Wait up to 5 seconds for in-flight requests to complete let drain_start = std::time::Instant::now(); let drain_timeout = std::time::Duration::from_secs(5); while active_connections.available_permits() < 1024 { if drain_start.elapsed() > drain_timeout { warn!("Drain timeout reached, forcing shutdown"); break; } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } info!("Shutdown complete"); return Ok(()); } } } } } async fn create_docker_scanner(config: &Config) -> Result { let user = std::env::var("USER").unwrap_or_else(|_| "archipelago".to_string()); let runtime: Arc = match &config.container_runtime { ContainerRuntime::Podman => { Arc::new(archipelago_container::PodmanRuntime::new(user.clone())) } ContainerRuntime::Docker => { Arc::new(archipelago_container::DockerRuntime::new(user.clone())) } ContainerRuntime::Auto => { Arc::new( archipelago_container::AutoRuntime::new(user.clone()) .await? ) } }; Ok(DockerPackageScanner::new(runtime)) } async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> { let tor_addr = docker_packages::read_tor_address("archipelago").await; let (current_data, _) = state.get_snapshot().await; if tor_addr != current_data.server_info.tor_address { let mut data = current_data; data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); state.update_data(data).await; if let Some(ref addr) = tor_addr { info!("🔒 Tor address updated: {}", addr); } } Ok(()) } async fn scan_and_update_packages( scanner: &DockerPackageScanner, state: &StateManager, identity: &NodeIdentity, ) -> Result<()> { let packages = scanner.scan_containers().await?; let (current_data, _) = state.get_snapshot().await; let packages_changed = !packages.is_empty() && current_data.package_data != packages; let tor_addr = docker_packages::read_tor_address("archipelago").await; let tor_changed = tor_addr != current_data.server_info.tor_address; let first_scan = !current_data.server_info.status_info.containers_scanned; if packages_changed || tor_changed || first_scan { let mut data = current_data; if !packages.is_empty() { data.package_data = packages; } data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); data.server_info.status_info.containers_scanned = true; state.update_data(data).await; debug!("📦 State changed (packages={}, tor={}, first_scan={}), broadcasting update", packages_changed, tor_changed, first_scan); } Ok(()) } /// Register Archipelago DWN protocols on startup. async fn register_dwn_protocols(data_dir: &std::path::Path) -> Result<()> { use crate::network::dwn_store::{DwnStore, ProtocolDefinition}; let protocols = [ ("https://archipelago.dev/protocols/node-identity/v1", true), ("https://archipelago.dev/protocols/file-catalog/v1", true), ("https://archipelago.dev/protocols/federation/v1", false), ("https://archipelago.dev/protocols/app-deploy/v1", false), ]; let store = DwnStore::new(data_dir).await?; let existing = store.list_protocols().await?; let existing_uris: std::collections::HashSet = existing.iter().map(|p| p.protocol.clone()).collect(); let mut registered = 0; for (uri, published) in &protocols { if existing_uris.contains(*uri) { continue; } let def = ProtocolDefinition { protocol: uri.to_string(), published: *published, types: std::collections::HashMap::new(), structure: std::collections::HashMap::new(), date_registered: chrono::Utc::now().to_rfc3339(), }; store.register_protocol(&def).await?; registered += 1; } if registered > 0 { info!("📋 Registered {registered} DWN protocols"); } Ok(()) } /// Periodically check peer reachability and broadcast status changes. async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> Result<()> { let known_peers = peers::load_peers(data_dir).await.unwrap_or_default(); if known_peers.is_empty() { return Ok(()); } let mut new_health = std::collections::HashMap::new(); for peer in &known_peers { let reachable = node_message::check_peer_reachable(&peer.onion) .await .unwrap_or(false); new_health.insert(peer.onion.clone(), reachable); } let (current_data, _) = state.get_snapshot().await; if current_data.peer_health != new_health { let mut data = current_data; data.peer_health = new_health; state.update_data(data).await; debug!("🔗 Peer health updated, broadcasting changes"); } Ok(()) }