use crate::api::ApiHandler; use crate::config::{Config, ContainerRuntime}; use crate::container::{ docker_packages, ContainerOrchestrator, DevContainerOrchestrator, DockerPackageScanner, }; use crate::identity::{self, NodeIdentity}; use crate::monitoring::MetricsStore; use crate::node_message; use crate::nostr_discovery; use crate::nostr_handshake; use crate::peers; use crate::state::StateManager; use anyhow::Result; use hyper::server::conn::Http; use hyper::service::service_fn; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tracing::{debug, error, info, warn}; pub struct Server { _config: Config, _identity: Arc, api_handler: Arc, _state_manager: Arc, } struct ContainerScanGuard<'a> { scanning: &'a AtomicBool, } impl<'a> ContainerScanGuard<'a> { fn try_acquire(scanning: &'a AtomicBool) -> Option { scanning .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) .ok() .map(|_| Self { scanning }) } } impl Drop for ContainerScanGuard<'_> { fn drop(&mut self) { self.scanning.store(false, Ordering::Release); } } impl Server { pub async fn new( config: Config, orchestrator: Option>, dev_orchestrator: Option>, ) -> Result { let state_manager = Arc::new(StateManager::new()); // Load node identity and set stable server_info. // Detect seed-backed vs legacy vs fresh install. let identity_dir = config.data_dir.join("identity"); let has_seed = crate::seed::seed_exists(&config.data_dir); let has_node_key = NodeIdentity::key_exists(&identity_dir); let identity = if has_node_key { // Existing keys on disk (seed-derived or legacy random) — load them. NodeIdentity::load_or_create(&identity_dir).await? } else { // Fresh install — create a temporary identity. // Onboarding will overwrite this with seed-derived keys. NodeIdentity::load_or_create(&identity_dir).await? }; let (mut data, _) = state_manager.get_snapshot().await; data.server_info.id = identity.node_id(); data.server_info.pubkey = identity.pubkey_hex(); data.server_info.seed_backed = has_seed; // Load persisted server name let name_file = config.data_dir.join("server-name"); if let Ok(name) = tokio::fs::read_to_string(&name_file).await { let name = name.trim().to_string(); if !name.is_empty() { data.server_info.name = Some(name); } } data.server_info.tor_address = docker_packages::read_tor_address("archipelago").await; if let Some(ref tor) = data.server_info.tor_address { data.server_info.node_address = Some(identity.node_address(tor)); } state_manager.update_data(data.clone()).await; // Retry Tor address in background — Tor may not be ready at startup if data.server_info.tor_address.is_none() { let sm = state_manager.clone(); let pubkey = identity.pubkey_hex(); tokio::spawn(async move { for delay in [5, 10, 20, 30, 60] { tokio::time::sleep(std::time::Duration::from_secs(delay)).await; if let Some(tor) = docker_packages::read_tor_address("archipelago").await { let (mut d, _) = sm.get_snapshot().await; let addr = format!("archipelago://{}#{}", tor.trim_end_matches('/'), pubkey); d.server_info.tor_address = Some(tor.clone()); d.server_info.node_address = Some(addr); sm.update_data(d).await; tracing::info!( "Tor address discovered after startup: {}", &tor[..20.min(tor.len())] ); break; } } }); } // Load persisted messages (Archipelago channel) node_message::init(&config.data_dir).await; // Auto-create the Node identity on fresh boot, mirroring the node's // own signing key (seed-derived when onboarded, random otherwise). // This keeps the DID shown on the Identities page, the DID Status // card, and the DID used for peer-to-peer connects all aligned on // one value — the seed-derived node DID. Idempotent: if the entry // already exists from a prior boot, create_from_signing_key returns // the existing record unchanged. { let im = crate::identity_manager::IdentityManager::new(&config.data_dir).await; if let Ok(mgr) = im { if let Ok((list, _)) = mgr.list().await { if list.is_empty() { let signing_key = ed25519_dalek::SigningKey::from_bytes( &identity.signing_key().to_bytes(), ); match mgr .create_from_signing_key( "Node".to_string(), crate::identity_manager::IdentityPurpose::Personal, signing_key, ) .await { Ok(record) => { let _ = mgr.create_nostr_key(&record.id).await; tracing::info!(did = %record.did, "Auto-created Node identity mirroring node key"); } Err(e) => tracing::debug!("Auto-identity creation (non-fatal): {}", e), } } } } } // DHT swarm-assist (Phase 3): build the iroh provider once at startup so // release downloads can fetch from peers (origin always wins) and seed // what they hold. Inert unless built with `iroh-swarm` AND swarm_enabled. if let Err(e) = crate::swarm::init( &config.data_dir, &config.nostr_relays, config.nostr_tor_proxy.as_deref(), config.swarm_enabled, ) .await { tracing::warn!("Swarm init (non-fatal, falling back to origin-only): {}", e); } // Revoke any previously published Nostr data (runs before publish so revocation is not overwritten) let identity_dir = config.data_dir.join("identity"); let tor_proxy_revoke = config.nostr_tor_proxy.clone(); if let Err(e) = nostr_discovery::revoke_if_needed(&identity_dir, tor_proxy_revoke.as_deref()).await { tracing::debug!("Nostr revoke (non-fatal): {}", e); } // Publish presence-only to Nostr (DID + Nostr pubkey, NO onion address). // Onion addresses are exchanged privately via NIP-44 encrypted DMs. if config.nostr_discovery_enabled && !config.nostr_relays.is_empty() { let identity_dir = config.data_dir.join("identity"); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default(); let version = data.server_info.version.clone(); let relays = config.nostr_relays.clone(); let tor_proxy = config.nostr_tor_proxy.clone(); tokio::spawn(async move { if let Err(e) = nostr_handshake::publish_presence( &identity_dir, &did, &version, &relays, tor_proxy.as_deref(), ) .await { tracing::debug!("Nostr presence publish (non-fatal): {}", e); } }); } info!( "🔑 Node identity: {} (pubkey: {}...)", identity.node_id(), &identity.pubkey_hex()[..16.min(identity.pubkey_hex().len())] ); let identity = Arc::new(identity); // Create metrics store and spawn background collector let metrics_store = Arc::new(MetricsStore::with_data_dir(config.data_dir.clone()).await); let metrics_for_telemetry = metrics_store.clone(); crate::monitoring::spawn_metrics_collector( metrics_store.clone(), Some(state_manager.clone()), Some(config.data_dir.clone()), ); let api_handler = Arc::new( ApiHandler::new( config.clone(), state_manager.clone(), metrics_store, orchestrator, dev_orchestrator, ) .await?, ); // Initialize mesh networking service (if config has enabled: true) { let data_dir = config.data_dir.clone(); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default(); let pubkey_hex = identity.pubkey_hex(); let signing_key = identity.signing_key(); match crate::mesh::MeshService::new(&data_dir, signing_key, &did, &pubkey_hex).await { Ok(mut mesh_service) => { // Pass the human-readable server name for mesh adverts mesh_service.set_server_name(data.server_info.name.clone()); let mut mesh_config = crate::mesh::load_config(&data_dir) .await .unwrap_or_default(); // Auto-enable mesh if a radio is detected and no config exists yet if !mesh_config.enabled { let devices = crate::mesh::detect_devices().await; if !devices.is_empty() { info!("📡 Auto-detected mesh radio: {:?} — enabling mesh", devices); mesh_config.enabled = true; mesh_config.device_path = Some(devices[0].clone()); let _ = crate::mesh::save_config(&data_dir, &mesh_config).await; } } if mesh_config.enabled { if let Err(e) = mesh_service.start() { warn!("Mesh service start failed (non-fatal): {}", e); } else { info!("📡 Mesh networking started"); } } api_handler .rpc_handler() .set_mesh_service(mesh_service) .await; info!("📡 Mesh service initialized"); } Err(e) => { warn!("Mesh service init failed (non-fatal): {}", e); } } } // Initialize transport router (unified routing: mesh > lan > tor) { let data_dir = config.data_dir.clone(); let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default(); let pubkey_hex = identity.pubkey_hex(); let mesh_config = crate::mesh::load_config(&data_dir) .await .unwrap_or_default(); let mesh_only = mesh_config.mesh_only_mode.unwrap_or(false); match crate::transport::PeerRegistry::load(&data_dir).await { Ok(registry) => { let registry = std::sync::Arc::new(registry); let mut transports: Vec> = Vec::new(); // Tor transport (always register — availability checked dynamically) transports.push(Box::new(crate::transport::tor::TorTransport::new( &pubkey_hex, ))); // Mesh transport (wraps the mesh service) transports.push(Box::new( crate::transport::mesh_transport::MeshTransport::new( api_handler.rpc_handler().mesh_service_arc(), ), )); // LAN transport (mDNS discovery) let mut lan = crate::transport::lan::LanTransport::new(&did, &pubkey_hex, 5678); match lan.start(registry.clone()) { Ok(()) => info!("📡 LAN transport (mDNS) started"), Err(e) => debug!("LAN transport init (non-fatal): {}", e), } transports.push(Box::new(lan)); let router = std::sync::Arc::new(crate::transport::TransportRouter::new( transports, registry, mesh_only, )); api_handler.rpc_handler().set_transport_router(router).await; info!("📡 Transport router initialized (mesh_only={})", mesh_only); } Err(e) => { warn!("Transport router init failed (non-fatal): {}", e); } } } // Register Archipelago DWN protocols (background, non-blocking) { let data_dir = config.data_dir.clone(); tokio::spawn(async move { if let Err(e) = register_dwn_protocols(&data_dir).await { debug!("DWN protocol registration (non-fatal): {}", e); } }); } // Periodic Tor address refresh (runs regardless of dev_mode) // Picks up hostname when Tor creates it after startup/rotation (30-60s delay) { let state = state_manager.clone(); let identity_clone = identity.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; if let Err(e) = refresh_tor_address(&state, identity_clone.as_ref()).await { debug!("Tor address refresh (non-fatal): {}", e); } } }); } // Initialize container scanner — discovers installed apps from Podman/Docker { let scanner = create_docker_scanner(&config).await?; let state = state_manager.clone(); let identity_clone = identity.clone(); let data_dir = config.data_dir.clone(); let scan_kick = api_handler.rpc_handler().scan_kick(); let scan_tick = api_handler.rpc_handler().scan_tick(); // Initial scan (delayed to let crash recovery finish first) tokio::spawn(async move { // Brief delay for containers to stabilize after boot tokio::time::sleep(Duration::from_secs(3)).await; info!("🐳 Scanning containers..."); // Tracks how many consecutive scans each container has been absent from. // Prevents UI flapping when podman intermittently returns incomplete results. let mut absence_tracker: HashMap = HashMap::new(); // Tracks when each container first entered a transitional state // (Stopping / Starting / Restarting / ...). Used by the merge // loop below to ignore podman's live state during a pending // lifecycle op, and to break out if the spawned task dies // without ever writing a final state. let mut transitional_since: HashMap = HashMap::new(); let mut scan_backoff_until: Option = None; if let Err(e) = scan_and_update_packages( &scanner, &state, identity_clone.as_ref(), &data_dir, &mut absence_tracker, &mut transitional_since, ) .await { error!("Failed to scan containers: {}", e); if is_podman_scan_timeout(&e) { scan_backoff_until = Some(Instant::now() + Duration::from_secs(30)); warn!("Podman container scan timed out; backing off scans for 30s"); } } // Bump the scan-completion counter so any caller waiting on a // kicked scan (install/update success path) can proceed. scan_tick.send_modify(|n| *n = n.wrapping_add(1)); // Periodic scan every 60 seconds (only broadcasts if state changed). // Also wakes immediately when `scan_kick` fires — install/update // success paths poke it so the fresh manifest (with populated // interfaces) lands before they flip state to Running. // Uses an in-flight guard to skip scans when a previous one is still running let mut interval = tokio::time::interval(Duration::from_secs(60)); // Skip missed ticks instead of catching up — prevents burst of scans // after a slow podman response (which causes DB lock storms) interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let scanning = std::sync::Arc::new(AtomicBool::new(false)); loop { tokio::select! { _ = interval.tick() => {} _ = scan_kick.notified() => { debug!("Scan kicked by install/update success — running immediately"); } } if let Some(until) = scan_backoff_until { if Instant::now() < until { debug!("Skipping container scan — Podman scan backoff active"); scan_tick.send_modify(|n| *n = n.wrapping_add(1)); continue; } } let Some(_scan_guard) = ContainerScanGuard::try_acquire(&scanning) else { debug!("Skipping container scan — previous scan still in progress"); scan_tick.send_modify(|n| *n = n.wrapping_add(1)); continue; }; let scan_result = scan_and_update_packages( &scanner, &state, identity_clone.as_ref(), &data_dir, &mut absence_tracker, &mut transitional_since, ) .await; if let Err(e) = scan_result { error!("Failed to update containers: {}", e); if is_podman_scan_timeout(&e) { scan_backoff_until = Some(Instant::now() + Duration::from_secs(30)); warn!("Podman container scan timed out; backing off scans for 30s"); } } else { scan_backoff_until = None; } scan_tick.send_modify(|n| *n = n.wrapping_add(1)); } }); } // Peer health monitoring — check every 5 minutes { let state = state_manager.clone(); let data_dir = config.data_dir.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(300)); loop { interval.tick().await; if let Err(e) = check_peer_health(&state, &data_dir).await { debug!("Peer health check (non-fatal): {}", e); } } }); } // FIPS seed-anchor apply loop — every 5 minutes we re-push the // configured seed anchors into the running fips daemon via // `fipsctl connect`. This keeps the mesh bootstrap resilient: // operators add cluster-local anchors in the UI, and a daemon // restart or a flaky public anchor can't strand the node. // First run is delayed 30s so fips has time to come up after // onboarding before we start dialing. { let data_dir = config.data_dir.clone(); tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(30)).await; let mut interval = tokio::time::interval(Duration::from_secs(300)); loop { interval.tick().await; match crate::fips::anchors::load(&data_dir).await { Ok(list) if !list.is_empty() => { let _ = crate::fips::anchors::apply(&list).await; } Ok(_) => { /* no seed anchors configured yet */ } Err(e) => { tracing::debug!("Seed-anchor apply: load failed (non-fatal): {}", e) } } } }); } // did:dht auto-refresh — re-publish DHT records every 2 hours if config.nostr_discovery_enabled { let data_dir = config.data_dir.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(7200)); loop { interval.tick().await; let identity_dir = data_dir.join("identity"); let node_key_path = identity_dir.join("node_key"); if !node_key_path.exists() { continue; } match tokio::fs::read(&node_key_path).await { Ok(key_bytes) if key_bytes.len() == 32 => { let mut seed = [0u8; 32]; seed.copy_from_slice(&key_bytes); let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed); match crate::network::did_dht::create_and_publish(&signing_key, &[]) .await { Ok(did) => tracing::info!(did = %did, "did:dht record refreshed"), Err(e) => tracing::debug!("did:dht refresh (non-fatal): {}", e), } } _ => { tracing::debug!("did:dht refresh skipped: no valid node key"); } } } }); } // Periodic federation state sync — every 30 min we call // federation::sync_with_peer on each Trusted peer. Without this // users had to manually click Sync for `fips_npub`/transport // badge/state updates to propagate; now it happens in the // background. Staggers peers with a 5s delay so we don't thunder // the Tor SOCKS proxy. Sync itself already prefers FIPS. { let data_dir = config.data_dir.clone(); let state = state_manager.clone(); tokio::spawn(async move { // First run 60s after boot to let onboarding settle. tokio::time::sleep(Duration::from_secs(60)).await; let mut interval = tokio::time::interval(Duration::from_secs(1800)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { interval.tick().await; let Ok(nodes) = crate::federation::load_nodes(&data_dir).await else { continue; }; if nodes.is_empty() { continue; } let (data, _) = state.get_snapshot().await; let Ok(local_did) = crate::identity::did_key_from_pubkey_hex(&data.server_info.pubkey) else { continue; }; let identity_dir = data_dir.join("identity"); let Ok(node_identity) = crate::identity::NodeIdentity::load_or_create(&identity_dir).await else { continue; }; for node in &nodes { if node.trust_level == crate::federation::TrustLevel::Untrusted { continue; } match crate::federation::sync_with_peer( &data_dir, node, &local_did, |bytes| node_identity.sign(bytes), ) .await { Ok(_) => debug!( "Periodic federation sync ok: {}", node.did.chars().take(20).collect::() ), Err(e) => debug!( "Periodic federation sync with {}: {}", node.did.chars().take(20).collect::(), e ), } tokio::time::sleep(Duration::from_secs(5)).await; } } }); } // Container health monitoring — auto-restart unhealthy containers // Respects webhook config: skips when disabled or ContainerCrash not subscribed crate::health_monitor::spawn_health_monitor(state_manager.clone(), config.data_dir.clone()); // Periodic telemetry reporter (every 15 min when opted in) crate::monitoring::spawn_telemetry_reporter( metrics_for_telemetry, Some(state_manager.clone()), config.data_dir.clone(), ); // Post-onboarding auto-activation for archipelago-fips. Runs once // at startup: if fips_key is on disk, install /etc/fips/fips.yaml // (schema-refreshed) and start the service. This removes the // need for a user-facing "Activate" button — the node comes up // with FIPS running whenever the seed has been onboarded. Also // self-heals legacy raw-byte fips.key files (load_fips_keys // rewrites them as bech32 nsec the first time they're read). // Pre-onboarding nodes: ConditionPathExists on the service unit // + the `fips_key_exists` guard here keep this quiet. { let data_dir = config.data_dir.clone(); tokio::spawn(async move { let identity_dir = data_dir.join("identity"); if !crate::identity::fips_key_exists(&identity_dir) { tracing::debug!("FIPS auto-activate skipped: fips_key not on disk"); return; } // Trigger the migration path in load_fips_keys so old raw-byte // key files are rewritten as bech32 before fips.yaml install. if let Err(e) = crate::identity::load_fips_keys(&identity_dir).await { tracing::warn!("FIPS key load/migrate failed: {}", e); return; } // Check if the installed fips.yaml matches what we'd // render now. If not, we need to restart the daemon after // reinstalling so it picks up schema changes (e.g. the // v1.7.25 re-addition of the TCP transport). Without this, // OTA'd nodes would be stuck on the old UDP-only config // until someone manually clicked Reconnect. let expected = crate::fips::config::render_config_yaml(); let installed = tokio::fs::read_to_string("/etc/fips/fips.yaml").await.ok(); let config_changed = installed.as_deref() != Some(expected.as_str()); if let Err(e) = crate::fips::config::install(&identity_dir).await { tracing::warn!("FIPS config install failed on startup: {}", e); return; } if config_changed { tracing::info!( "FIPS config schema changed on disk — restarting daemon to pick up new transports" ); // Restart whichever unit is actually supervising // the daemon (archipelago-fips vs upstream fips). let unit = crate::fips::service::active_unit().await; if let Err(e) = crate::fips::service::restart(unit).await { tracing::warn!( "FIPS restart after config migration failed on {}: {} — user can retry via fips.reconnect", unit, e ); } } if let Err(e) = crate::fips::service::activate(crate::fips::SERVICE_UNIT).await { tracing::warn!( "archipelago-fips activate failed on startup: {} — user can retry via fips.install RPC", e ); return; } tracing::info!("archipelago-fips auto-activated on startup"); }); } Ok(Self { _config: config, _identity: identity, api_handler, _state_manager: state_manager, }) } /// Serve with a graceful shutdown signal. /// /// `main_addr` is the primary listener (historically `127.0.0.1:5678`). /// The main listener always comes up on `main_addr`. The FIPS peer /// listener (path-filtered, bound to `fips0`'s ULA) is managed by a /// late-binding task that polls every 30s: if fips0 isn't up at /// startup (pre-onboarding install, legacy node pre-fips.install), /// it keeps trying until the interface appears — no archipelago /// restart required after the user activates FIPS. /// /// When `shutdown` completes, both listeners stop accepting and drain /// in-flight requests (bounded by `DRAIN_TIMEOUT`). pub async fn serve_with_shutdown( &self, main_addr: SocketAddr, shutdown: impl std::future::Future, ) -> Result<()> { let active_connections = Arc::new(tokio::sync::Semaphore::new(1024)); let (tx, rx_main) = tokio::sync::watch::channel(false); let main_task = tokio::spawn(accept_loop( self.api_handler.clone(), TcpListener::bind(main_addr).await?, active_connections.clone(), false, // main listener: no path filter rx_main, main_addr, )); // Peer listener: late-binding so we don't need an archipelago // restart when fips0 comes up after onboarding. let peer_task = tokio::spawn(peer_late_bind_loop( self.api_handler.clone(), active_connections.clone(), tx.subscribe(), )); shutdown.await; info!("Shutdown signal received, draining connections..."); let _ = tx.send(true); // Wait up to 5s for in-flight requests. let drain_start = std::time::Instant::now(); let drain_timeout = std::time::Duration::from_secs(5); while active_connections.available_permits() < 1024 { if drain_start.elapsed() > drain_timeout { warn!("Drain timeout reached, forcing shutdown"); break; } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } let _ = main_task.await; let _ = peer_task.await; info!("Shutdown complete"); Ok(()) } } /// Poll every 30s for `fips0`'s ULA; when it appears, bind the peer /// listener and run the normal accept loop. If the bind fails (port /// already taken, permissions), log and keep retrying. Returns on /// shutdown. First tick fires immediately so the hot path for /// already-up fips0 is still zero-cost. async fn peer_late_bind_loop( handler: Arc, active_connections: Arc, mut shutdown_rx: tokio::sync::watch::Receiver, ) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { tokio::select! { _ = interval.tick() => { let Some(ip) = crate::fips::iface::fips0_ula() else { continue }; let addr = SocketAddr::new( std::net::IpAddr::V6(ip), crate::fips::dial::PEER_PORT, ); let listener = match TcpListener::bind(addr).await { Ok(l) => l, Err(e) => { warn!("FIPS peer listener bind {} failed: {} — retrying in 30s", addr, e); continue; } }; info!("FIPS peer listener bound {}", addr); // Once bound, serve until shutdown fires. accept_loop // returns on shutdown, which also ends this outer loop. accept_loop( handler, listener, active_connections, true, // peer listener: apply path filter shutdown_rx, addr, ) .await; return; } _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { return; } } } } } /// Whitelist of HTTP paths reachable via the peer-facing (FIPS) listener. /// Every entry is an endpoint already protected by cryptographic auth /// (ed25519 signature verification inside the handler, federation DID /// headers checked by the content server, or JSON-RPC methods whose /// handlers verify per-message signatures). /// /// Anything not on this list returns 404 on the peer listener. pub fn is_peer_allowed_path(path: &str) -> bool { // Exact matches matches!( path, "/health" | "/rpc/v1" | "/archipelago/node-message" | "/archipelago/mesh-typed" | "/dwn" | "/transport/inbox" // Content *catalog* — the peer-browse entry point. This is the // exact path `/content` (no trailing slash); the prefix match // below only covers `/content/` item fetches, so without // this the catalog 404s over the mesh and `content.browse-peer` // fails with "Peer returned error: 404 Not Found" (and never // falls back to Tor, since a 404 is a successful HTTP exchange). | "/content" ) // Prefix-matched content endpoints (peer file browse + fetch) || path.starts_with("/content/") } async fn accept_loop( handler: Arc, listener: TcpListener, active_connections: Arc, peer_only: bool, mut shutdown_rx: tokio::sync::watch::Receiver, local_addr: SocketAddr, ) { loop { tokio::select! { result = listener.accept() => { let (stream, peer_addr) = match result { Ok(c) => c, Err(e) => { error!("{} accept error: {}", local_addr, e); continue; } }; let handler = handler.clone(); let permit = active_connections.clone().acquire_owned().await; tokio::spawn(async move { let _permit = permit; let service = service_fn(move |req: hyper::Request| { let handler = handler.clone(); async move { if peer_only && !is_peer_allowed_path(req.uri().path()) { let resp = hyper::Response::builder() .status(hyper::StatusCode::NOT_FOUND) .body(hyper::Body::empty()) .expect("static response builds"); return Ok::<_, std::io::Error>(resp); } handler .handle_request(req) .await .map_err(|e| std::io::Error::other(format!("{}", e))) } }); if let Err(e) = Http::new() .http1_keep_alive(false) .serve_connection(stream, service) .with_upgrades() .await { error!("Error serving connection from {}: {}", peer_addr, e); } }); } _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { return; } } } } } async fn create_docker_scanner(config: &Config) -> Result { let user = std::env::var("USER").unwrap_or_else(|_| "archipelago".to_string()); let runtime: Arc = match &config.container_runtime { ContainerRuntime::Podman => { Arc::new(archipelago_container::PodmanRuntime::new(user.clone())) } ContainerRuntime::Docker => { Arc::new(archipelago_container::DockerRuntime::new(user.clone())) } ContainerRuntime::Auto => { Arc::new(archipelago_container::AutoRuntime::new(user.clone()).await?) } }; Ok(DockerPackageScanner::new(runtime)) } async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> { let tor_addr = docker_packages::read_tor_address("archipelago").await; let (current_data, _) = state.get_snapshot().await; if tor_addr != current_data.server_info.tor_address { let mut data = current_data; data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); state.update_data(data).await; if let Some(ref addr) = tor_addr { info!("🔒 Tor address updated: {}", addr); } } Ok(()) } /// Number of consecutive absent scans before removing a container from state. /// 3 scans × 30s = 90 seconds of absence before removal. const CONTAINER_ABSENCE_THRESHOLD: u32 = 3; /// Maximum time a package entry may remain stuck in a transitional state /// before the scan loop overrides it with podman's live state. /// /// Rationale: the longest single-container stop timeout is bitcoin-core at /// 600s. 2× that gives the spawned task ample margin before we assume it /// died (panic, OOM, process restart mid-stop) and fall back to the /// scanner's authoritative view. Applies to all transitional variants. const TRANSITIONAL_STUCK_TIMEOUT: Duration = Duration::from_secs(120); /// Multi-container installs can legitimately spend several minutes before the /// primary user-facing container exists. BTCPay, for example, pulls/starts /// Postgres and NBXplorer before `btcpay-server`; do not erase its installing /// card just because the primary container is absent during that setup window. const INSTALLING_STUCK_TIMEOUT: Duration = Duration::from_secs(20 * 60); fn transitional_stuck_timeout(state: &crate::data_model::PackageState) -> Duration { use crate::data_model::PackageState::*; match state { Installing | Starting | Restarting => INSTALLING_STUCK_TIMEOUT, _ => TRANSITIONAL_STUCK_TIMEOUT, } } /// Returns true if `state` is one of the transitional variants that a /// `spawn_transitional`-style background task owns. While such a state is /// set, the package scanner must not overwrite it with whatever podman /// reports (see `merge_preserving_transitional`). fn is_transitional(state: &crate::data_model::PackageState) -> bool { use crate::data_model::PackageState::*; matches!( state, Installing | Stopping | Starting | Restarting | Updating | Removing | CreatingBackup | RestoringBackup | BackingUp ) } fn absent_transitional_replacement( state: &crate::data_model::PackageState, ) -> Option { match state { // A stop operation is complete once the container record disappears. // Do not leave the app card wedged in "Stopping..." just because the // background task died or the backend restarted before it wrote back. crate::data_model::PackageState::Stopping => Some(crate::data_model::PackageState::Stopped), _ => None, } } /// Merge a fresh scan entry `fresh` into `existing` while preserving /// `existing.state` (which is transitional — the RPC spawn task owns it). /// Non-state observability fields are taken from `fresh` so the UI still /// sees live health / exit_code / lan_address readings during a transition. fn merge_preserving_transitional( existing: &crate::data_model::PackageDataEntry, fresh: &crate::data_model::PackageDataEntry, user_stop_requested: bool, ) -> crate::data_model::PackageDataEntry { let state = match (&existing.state, &fresh.state) { // A user-initiated stop must keep showing Stopping while podman still // reports Running. Repair/restart transitions do not have a user-stop // marker, so a fresh Running scan means the app recovered. (crate::data_model::PackageState::Stopping, crate::data_model::PackageState::Running) if !user_stop_requested => { fresh.state.clone() } // Removing with a live running container is stale: uninstall either // failed or Archipelago restarted before the spawned task could revert // state. Let the scanner recover the UI immediately instead of // keeping the app wedged in Removing for 20 minutes. (crate::data_model::PackageState::Removing, crate::data_model::PackageState::Running) => { fresh.state.clone() } _ => existing.state.clone(), }; crate::data_model::PackageDataEntry { state, // install_progress and uninstall_stage are also owned by the // initiating op (same reason as state) — keep them. install_progress: existing.install_progress.clone(), uninstall_stage: existing.uninstall_stage.clone(), // Everything else comes from the fresh scan. health: fresh.health.clone(), exit_code: fresh.exit_code, static_files: fresh.static_files.clone(), manifest: fresh.manifest.clone(), installed: fresh.installed.clone(), available_update: fresh.available_update.clone(), } } fn is_podman_scan_timeout(error: &anyhow::Error) -> bool { let msg = format!("{:#}", error); msg.contains("podman ps") && msg.contains("timed out") } async fn scan_and_update_packages( scanner: &DockerPackageScanner, state: &StateManager, identity: &NodeIdentity, data_dir: &std::path::Path, absence_tracker: &mut HashMap, transitional_since: &mut HashMap, ) -> Result<()> { let mut packages = scanner.scan_containers().await?; let user_stopped = crate::crash_recovery::load_user_stopped(data_dir).await; for (id, pkg) in packages.iter_mut() { if pkg.state == crate::data_model::PackageState::Exited && user_stopped.contains(id) { pkg.state = crate::data_model::PackageState::Stopped; pkg.exit_code = None; } } normalize_reachable_package_health(&mut packages).await; let (current_data, _) = state.get_snapshot().await; let tor_addr = docker_packages::read_tor_address("archipelago").await; let tor_changed = tor_addr != current_data.server_info.tor_address; let first_scan = !current_data.server_info.status_info.containers_scanned; // Check if update scheduler has found an available update let update_available = crate::update::load_state(std::path::Path::new("/var/lib/archipelago")) .await .map(|s| s.available_update.is_some()) .unwrap_or(false); let update_changed = update_available != current_data.server_info.status_info.updated; // Empty scan result = podman failure or timeout, preserve existing state if packages.is_empty() && !first_scan { if tor_changed || update_changed { let mut data = current_data; data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); data.server_info.status_info.updated = update_available; state.update_data(data).await; } return Ok(()); } // Merge scan results with current state instead of full replacement. // This prevents containers from vanishing when podman intermittently // returns incomplete results under heavy load. let mut merged = current_data.package_data.clone(); let mut changed = false; // Update/add containers found in this scan. // // Transitional states (Stopping, Starting, Restarting, Installing, // Updating, Removing, backup variants) are owned by the RPC spawn_task // that initiated the operation — podman's live state during the op is // meaningless ("running" during a graceful stop, "exited" during a // restart, etc.) and must not be written back. See // `merge_preserving_transitional` for the exact rule. // // Escape hatch: if a package has been in a transitional state for // longer than TRANSITIONAL_STUCK_TIMEOUT we assume the spawned task // died without cleanup and let the scan override it. let now = Instant::now(); for (id, pkg) in &packages { absence_tracker.remove(id); let existing = merged.get(id); let overwrite = match existing { Some(existing_entry) if is_transitional(&existing_entry.state) => { let entered = *transitional_since.entry(id.clone()).or_insert(now); let timeout = transitional_stuck_timeout(&existing_entry.state); let stuck = now.duration_since(entered) > timeout; if stuck { warn!( "Container {} stuck in {:?} for >{}s; overriding with scan state {:?}", id, existing_entry.state, timeout.as_secs(), pkg.state ); transitional_since.remove(id); true } else { // Keep existing transitional state, but merge non-state // observability fields (health, exit_code, lan_address // via installed) from the fresh scan so the UI still // sees live readings. let merged_entry = merge_preserving_transitional( existing_entry, pkg, user_stopped.contains(id), ); if existing.cloned() != Some(merged_entry.clone()) { merged.insert(id.clone(), merged_entry); changed = true; } false } } Some(_) => { // Not transitional: the side-table may hold a stale entry // from a previous transition on this id; drop it. transitional_since.remove(id); existing != Some(pkg) } None => { transitional_since.remove(id); true } }; if overwrite && merged.get(id) != Some(pkg) { merged.insert(id.clone(), pkg.clone()); changed = true; } } // Track containers in state but missing from this scan. // Only remove after CONTAINER_ABSENCE_THRESHOLD consecutive absent scans. let current_ids: Vec = merged.keys().cloned().collect(); for id in current_ids { if !packages.contains_key(&id) { // Don't evict packages mid-transition: Installing/Updating/Removing // legitimately have no live container yet (image still pulling) or // briefly (during recreate). The absence-eviction here was racing // installs and removing apps from the UI 14s in. The transitional // owner (spawn_task) is responsible for clearing state, not us. if let Some(entry) = merged.get(&id) { if is_transitional(&entry.state) { if let Some(replacement) = absent_transitional_replacement(&entry.state) { let mut updated = entry.clone(); updated.state = replacement; updated.health = None; updated.exit_code = None; updated.install_progress = None; updated.uninstall_stage = None; merged.insert(id.clone(), updated); transitional_since.remove(&id); absence_tracker.remove(&id); changed = true; continue; } let entered = *transitional_since.entry(id.clone()).or_insert(now); let timeout = transitional_stuck_timeout(&entry.state); if now.duration_since(entered) > timeout { warn!( "Container {} stuck in {:?} and absent for >{}s; removing stale transitional state", id, entry.state, timeout.as_secs() ); merged.remove(&id); transitional_since.remove(&id); changed = true; } absence_tracker.remove(&id); continue; } // Quadlet-generated units run containers with `--rm`, so a // clean user stop removes the Podman record. Keep the package // visible as Stopped while the user-stopped marker exists so // package.start can recreate it via systemd/Quadlet. if entry.state == crate::data_model::PackageState::Stopped && user_stopped.contains(&id) { absence_tracker.remove(&id); continue; } } let count = absence_tracker.entry(id.clone()).or_insert(0); *count += 1; if *count >= CONTAINER_ABSENCE_THRESHOLD { debug!( "Removing {} from state after {} consecutive absent scans", id, count ); merged.remove(&id); absence_tracker.remove(&id); transitional_since.remove(&id); changed = true; } } } if changed || tor_changed || first_scan || update_changed { let mut data = current_data; data.package_data = merged; data.server_info.tor_address = tor_addr.clone(); data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t)); data.server_info.status_info.containers_scanned = true; data.server_info.status_info.updated = update_available; state.update_data(data).await; debug!( "📦 State changed (packages={}, tor={}, first_scan={}, update={}), broadcasting update", changed, tor_changed, first_scan, update_changed ); } Ok(()) } async fn normalize_reachable_package_health( packages: &mut HashMap, ) { for (id, pkg) in packages.iter_mut() { if pkg.state != crate::data_model::PackageState::Running { continue; } if !matches!(pkg.health.as_deref(), Some("starting" | "unhealthy" | "1")) { continue; } let Some(port) = pkg .installed .as_ref() .and_then(|i| i.interface_addresses.get("main")) .and_then(|a| a.lan_address.as_deref()) .and_then(port_from_url) .or_else(|| fallback_package_port(id)) else { continue; }; if frontend_port_http_ready(port).await { debug!(app_id = %id, port, "normalizing reachable package health to healthy"); pkg.health = Some("healthy".to_string()); ensure_main_lan_address(pkg, port); } } } async fn frontend_port_http_ready(port: u16) -> bool { let Ok(Ok(mut stream)) = tokio::time::timeout( Duration::from_secs(2), tokio::net::TcpStream::connect(("127.0.0.1", port)), ) .await else { return false; }; let request = b"GET / HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n"; if stream.write_all(request).await.is_err() { return false; } let mut buf = [0u8; 64]; let Ok(Ok(n)) = tokio::time::timeout(Duration::from_secs(2), stream.read(&mut buf)).await else { return false; }; if n == 0 { return false; } let head = String::from_utf8_lossy(&buf[..n]); head.starts_with("HTTP/1.1 2") || head.starts_with("HTTP/1.1 3") || head.starts_with("HTTP/1.0 2") || head.starts_with("HTTP/1.0 3") } fn ensure_main_lan_address(pkg: &mut crate::data_model::PackageDataEntry, port: u16) { let Some(installed) = pkg.installed.as_mut() else { return; }; let main = installed .interface_addresses .entry("main".to_string()) .or_insert_with(|| crate::data_model::InterfaceAddress { tor_address: String::new(), lan_address: None, }); if main.lan_address.is_none() { main.lan_address = Some(format!("http://localhost:{port}")); } } fn fallback_package_port(app_id: &str) -> Option { match app_id { "fedimint" | "fedimintd" => Some(8175), "filebrowser" => Some(8083), "indeedhub" => Some(7778), "nginx-proxy-manager" => Some(8081), "nostr-rs-relay" => Some(18081), _ => None, } } fn port_from_url(url: &str) -> Option { let after_scheme = url.split_once("://").map(|(_, rest)| rest).unwrap_or(url); let host_port = after_scheme.split('/').next().unwrap_or(after_scheme); let port = host_port.rsplit_once(':')?.1; port.parse::().ok() } /// Register Archipelago DWN protocols on startup. async fn register_dwn_protocols(data_dir: &std::path::Path) -> Result<()> { use crate::network::dwn_store::{DwnStore, ProtocolDefinition}; let protocols = [ ("https://archipelago.dev/protocols/node-identity/v1", true), ("https://archipelago.dev/protocols/file-catalog/v1", true), ("https://archipelago.dev/protocols/federation/v1", false), ("https://archipelago.dev/protocols/app-deploy/v1", false), ]; let store = DwnStore::new(data_dir).await?; let existing = store.list_protocols().await?; let existing_uris: std::collections::HashSet = existing.iter().map(|p| p.protocol.clone()).collect(); let mut registered = 0; for (uri, published) in &protocols { if existing_uris.contains(*uri) { continue; } let def = ProtocolDefinition { protocol: uri.to_string(), published: *published, types: std::collections::HashMap::new(), structure: std::collections::HashMap::new(), date_registered: chrono::Utc::now().to_rfc3339(), }; store.register_protocol(&def).await?; registered += 1; } if registered > 0 { info!("📋 Registered {registered} DWN protocols"); } Ok(()) } /// Periodically check peer reachability and broadcast status changes. async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> Result<()> { let known_peers = peers::load_peers(data_dir).await.unwrap_or_default(); if known_peers.is_empty() { return Ok(()); } let mut new_health = std::collections::HashMap::new(); for peer in &known_peers { let fips_npub = crate::federation::fips_npub_for_onion(data_dir, &peer.onion).await; let reachable = node_message::check_peer_reachable(&peer.onion, fips_npub.as_deref()) .await .unwrap_or(false); new_health.insert(peer.onion.clone(), reachable); } let (current_data, _) = state.get_snapshot().await; if current_data.peer_health != new_health { let mut data = current_data; data.peer_health = new_health; state.update_data(data).await; debug!("🔗 Peer health updated, broadcasting changes"); } Ok(()) } #[cfg(test)] mod merge_tests { use super::*; use crate::data_model::{Description, Manifest, PackageDataEntry, PackageState, StaticFiles}; fn make_manifest() -> Manifest { Manifest { id: "lnd".to_string(), title: "LND".to_string(), version: "0.18.4".to_string(), description: Description { short: "".to_string(), long: "".to_string(), }, release_notes: "".to_string(), license: "".to_string(), wrapper_repo: "".to_string(), upstream_repo: "".to_string(), support_site: "".to_string(), marketing_site: "".to_string(), donation_url: None, author: None, website: None, interfaces: None, tier: None, } } fn make_static() -> StaticFiles { StaticFiles { license: "".to_string(), instructions: "".to_string(), icon: "".to_string(), } } fn make_entry(state: PackageState, health: Option<&str>) -> PackageDataEntry { PackageDataEntry { state, health: health.map(|s| s.to_string()), exit_code: None, static_files: make_static(), manifest: make_manifest(), installed: None, install_progress: None, uninstall_stage: None, available_update: None, } } #[test] fn peer_path_filter_allows_content_catalog_and_items() { // Regression: the content *catalog* is exactly "/content" (no trailing // slash). It must be reachable over the peer (FIPS) listener, else // `content.browse-peer` 404s over the mesh. Item fetches are // "/content/". assert!(is_peer_allowed_path("/content"), "catalog must be allowed"); assert!( is_peer_allowed_path("/content/abc123"), "items must be allowed" ); assert!(is_peer_allowed_path("/rpc/v1")); assert!(is_peer_allowed_path("/health")); // Not on the allow-list → rejected (no broad surface over the mesh). assert!(!is_peer_allowed_path("/contention"), "must not prefix-leak"); assert!(!is_peer_allowed_path("/")); assert!(!is_peer_allowed_path("/rpc/v2")); } #[test] fn preserves_transitional_state_on_merge() { // existing: user initiated a stop, spawn_transitional set Stopping. // fresh: podman hasn't finished the stop yet, still reports Running. // Expected: merged state stays Stopping — podman's live view must // not clobber the transitional state owned by the RPC spawn task. let existing = make_entry(PackageState::Stopping, Some("healthy")); let fresh = make_entry(PackageState::Running, Some("starting")); let merged = merge_preserving_transitional(&existing, &fresh, true); assert_eq!(merged.state, PackageState::Stopping); } #[test] fn non_user_stopping_recovers_when_container_is_running() { let existing = make_entry(PackageState::Stopping, Some("unknown")); let fresh = make_entry(PackageState::Running, Some("healthy")); let merged = merge_preserving_transitional(&existing, &fresh, false); assert_eq!(merged.state, PackageState::Running); assert_eq!(merged.health.as_deref(), Some("healthy")); } #[test] fn merges_fresh_observability_fields() { // Non-state observability fields (health, exit_code, installed) // MUST come from the fresh scan even while state is preserved — // the UI still shows live health/health during a transition. let mut existing = make_entry(PackageState::Stopping, Some("healthy")); existing.exit_code = None; let mut fresh = make_entry(PackageState::Running, Some("unhealthy")); fresh.exit_code = Some(0); let merged = merge_preserving_transitional(&existing, &fresh, true); assert_eq!(merged.state, PackageState::Stopping); assert_eq!(merged.health.as_deref(), Some("unhealthy")); assert_eq!(merged.exit_code, Some(0)); } #[test] fn stale_removing_recovers_when_container_is_running() { let existing = make_entry(PackageState::Removing, Some("unknown")); let fresh = make_entry(PackageState::Running, Some("healthy")); let merged = merge_preserving_transitional(&existing, &fresh, false); assert_eq!(merged.state, PackageState::Running); assert_eq!(merged.health.as_deref(), Some("healthy")); } #[test] fn is_transitional_covers_all_variants() { for s in [ PackageState::Installing, PackageState::Stopping, PackageState::Starting, PackageState::Restarting, PackageState::Updating, PackageState::Removing, PackageState::CreatingBackup, PackageState::RestoringBackup, PackageState::BackingUp, ] { assert!(is_transitional(&s), "{:?} should be transitional", s); } for s in [ PackageState::Installed, PackageState::Stopped, PackageState::Exited, PackageState::Running, ] { assert!(!is_transitional(&s), "{:?} should NOT be transitional", s); } } #[test] fn installing_uses_longer_stale_timeout_than_other_transitions() { assert!(transitional_stuck_timeout(&PackageState::Installing) > TRANSITIONAL_STUCK_TIMEOUT); assert_eq!( transitional_stuck_timeout(&PackageState::Stopping), TRANSITIONAL_STUCK_TIMEOUT ); } #[test] fn absent_stopping_transitions_to_stopped() { assert_eq!( absent_transitional_replacement(&PackageState::Stopping), Some(PackageState::Stopped) ); } #[test] fn absent_installing_still_waits_for_owner() { assert_eq!( absent_transitional_replacement(&PackageState::Installing), None ); } }