// Container Health Monitor // Checks container health every 60s, auto-restarts unhealthy containers (max 3 times) // with exponential backoff (10s, 30s, 90s), dependency-aware startup ordering, // and sends WebSocket notifications to the UI on failure. use crate::data_model::{Notification, NotificationLevel}; use crate::state::StateManager; use crate::webhooks::{self, WebhookEvent}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; use tracing::{debug, info, warn}; const MAX_RESTART_ATTEMPTS: u32 = 3; const CHECK_INTERVAL_SECS: u64 = 60; /// Backoff delays per attempt: 10s, 30s, 90s const BACKOFF_DELAYS_SECS: [u64; 3] = [10, 30, 90]; /// Reset restart counter after 1 hour of stability const STABILITY_RESET_SECS: u64 = 3600; /// Container startup tier for dependency ordering. /// Lower tiers start first. Containers in the same tier start together. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] enum StartupTier { /// Databases: postgres, redis, mariadb, mysql Database = 0, /// Core infrastructure: bitcoin-knots, bitcoin-core CoreInfra = 1, /// Services depending on core: lnd, electrs, nbxplorer DependentService = 2, /// Application layer: mempool-api, btcpay-server, fedimint, nextcloud, etc. 
Application = 3, /// UI/frontend containers: mempool-web, bitcoin-ui, lnd-ui Frontend = 4, } fn container_tier(name: &str) -> StartupTier { let id = name.strip_prefix("archy-").unwrap_or(name); match id { // Tier 0: Databases "btcpay-db" | "mempool-db" | "penpot-postgres" | "immich_postgres" | "immich_redis" | "penpot-valkey" | "endurain-db" | "nextcloud-db" => StartupTier::Database, // Tier 1: Core infrastructure "bitcoin-knots" | "bitcoin-core" | "bitcoin" => StartupTier::CoreInfra, // Tier 2: Dependent services "lnd" | "mempool-electrs" | "electrs" | "nbxplorer" => StartupTier::DependentService, // Tier 4: Frontend/UI "mempool-web" | "bitcoin-ui" | "lnd-ui" | "electrs-ui" | "penpot-frontend" | "penpot-exporter" => StartupTier::Frontend, // Tier 3: Everything else _ => StartupTier::Application, } } /// Track restart attempts per container with exponential backoff and stability reset. struct RestartTracker { attempts: HashMap, last_failure: HashMap, last_healthy: HashMap, } impl RestartTracker { fn new() -> Self { Self { attempts: HashMap::new(), last_failure: HashMap::new(), last_healthy: HashMap::new(), } } /// Record a restart attempt. Returns false if max attempts exceeded. fn record_attempt(&mut self, name: &str) -> bool { let count = self.attempts.entry(name.to_string()).or_insert(0); *count += 1; self.last_failure.insert(name.to_string(), Instant::now()); *count <= MAX_RESTART_ATTEMPTS } /// Clear restart count when a container is healthy again. fn clear(&mut self, name: &str) { self.attempts.remove(name); self.last_failure.remove(name); self.last_healthy.insert(name.to_string(), Instant::now()); } fn attempt_count(&self, name: &str) -> u32 { *self.attempts.get(name).unwrap_or(&0) } /// Get the backoff delay in seconds for the current attempt number. 
fn backoff_delay_secs(&self, name: &str) -> u64 { let attempts = self.attempt_count(name); if attempts == 0 { return BACKOFF_DELAYS_SECS[0]; } let idx = (attempts as usize).saturating_sub(1).min(BACKOFF_DELAYS_SECS.len() - 1); BACKOFF_DELAYS_SECS[idx] } /// Check if enough time has passed since last failure for the backoff delay. fn backoff_elapsed(&self, name: &str) -> bool { let delay = self.backoff_delay_secs(name); match self.last_failure.get(name) { Some(last) => last.elapsed().as_secs() >= delay, None => true, } } /// Check if a failed container should have its counter reset (1h stability window). fn should_reset_failed(&self, name: &str) -> bool { if self.attempt_count(name) < MAX_RESTART_ATTEMPTS { return false; } match self.last_failure.get(name) { Some(last) => last.elapsed().as_secs() >= STABILITY_RESET_SECS, None => false, } } } #[derive(Debug, Clone)] struct ContainerHealth { name: String, app_id: String, state: String, healthy: bool, } /// Track container memory usage over time for leak detection. struct MemoryTracker { /// Per-container memory samples: (timestamp, rss_bytes) samples: HashMap>, } impl MemoryTracker { fn new() -> Self { Self { samples: HashMap::new(), } } /// Record a memory sample for a container. fn record(&mut self, name: &str, rss_bytes: u64) { let entry = self.samples.entry(name.to_string()).or_default(); entry.push((Instant::now(), rss_bytes)); // Keep only last 288 samples (24h at 5min intervals) if entry.len() > 288 { entry.remove(0); } } /// Check if a container's memory has grown by more than 50% over the tracking period. /// Returns Some(growth_percent) if a leak is detected, None otherwise. 
fn check_leak(&self, name: &str) -> Option { let samples = self.samples.get(name)?; if samples.len() < 12 { return None; // Need at least 1 hour of data } let (oldest_time, oldest_rss) = samples.first()?; let (_, latest_rss) = samples.last()?; let elapsed_hours = oldest_time.elapsed().as_secs() as f64 / 3600.0; if elapsed_hours < 1.0 || *oldest_rss == 0 { return None; } let growth = (*latest_rss as f64 - *oldest_rss as f64) / *oldest_rss as f64 * 100.0; if growth > 50.0 { Some(growth) } else { None } } fn remove(&mut self, name: &str) { self.samples.remove(name); } } /// Query container memory stats from podman. async fn check_container_memory() -> HashMap { let output = match tokio::process::Command::new("sudo") .args(["podman", "stats", "--no-stream", "--format", "{{.Name}} {{.MemUsage}}"]) .output() .await { Ok(o) if o.status.success() => o, _ => return HashMap::new(), }; let stdout = String::from_utf8_lossy(&output.stdout); let mut result = HashMap::new(); for line in stdout.lines() { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 { let name = parts[0].to_string(); // Parse memory like "123.4MiB", "1.2GiB", "45.6kB" let mem_str = parts[1]; if let Some(bytes) = parse_memory_string(mem_str) { result.insert(name, bytes); } } } result } /// Parse memory string like "123.4MiB" or "1.2GiB" to bytes. 
///
/// Both unit families are understood: binary (`KiB`, `MiB`, `GiB`, `TiB`, powers of
/// 1024) and decimal (`kB`, `MB`, `GB`, `TB`, powers of 1000), plus plain `B`.
/// Surrounding whitespace is ignored and the result is rounded to the nearest byte.
///
/// Returns `None` for an unknown suffix, an unparsable number, or a negative or
/// non-finite value.
fn parse_memory_string(s: &str) -> Option<u64> {
    // Longer suffixes come first so "MiB"/"MB" are matched before the bare "B".
    const UNITS: &[(&str, f64)] = &[
        ("TiB", 1_099_511_627_776.0),
        ("GiB", 1_073_741_824.0),
        ("MiB", 1_048_576.0),
        ("KiB", 1024.0),
        ("TB", 1_000_000_000_000.0),
        ("GB", 1_000_000_000.0),
        ("MB", 1_000_000.0),
        ("kB", 1000.0),
        ("B", 1.0),
    ];

    let s = s.trim();
    let (number, factor) = UNITS
        .iter()
        .find_map(|&(suffix, factor)| s.strip_suffix(suffix).map(|number| (number, factor)))?;

    let value: f64 = number.parse().ok()?;
    // "NaN", "inf" and negative numbers parse as f64 but are not valid sizes.
    if !value.is_finite() || value < 0.0 {
        return None;
    }
    Some((value * factor).round() as u64)
}

/// Query all containers and their health status.
///
/// Runs `sudo podman ps -a --format json` and turns every entry that has a name
/// into a [`ContainerHealth`]. Backend helper containers (see `SKIP`) and any
/// container whose id ends in `-ui` are left out. A container counts as healthy
/// only when its lower-cased `State` is "running"; a missing state is reported as
/// "unknown".
///
/// Returns an empty list (after logging a warning) when podman cannot be run,
/// exits unsuccessfully, or prints something that is not a JSON array.
async fn check_containers() -> Vec<ContainerHealth> {
    // Backend services to skip
    const SKIP: [&str; 13] = [
        "btcpay-db",
        "nbxplorer",
        "mempool-db",
        "mempool-api",
        "penpot-postgres",
        "penpot-backend",
        "penpot-exporter",
        "penpot-valkey",
        "penpot-mailcatch",
        "immich_postgres",
        "immich_redis",
        "endurain-db",
        "nextcloud-db",
    ];

    let output = match tokio::process::Command::new("sudo")
        .args(["podman", "ps", "-a", "--format", "json"])
        .output()
        .await
    {
        Ok(o) if o.status.success() => o,
        Ok(o) => {
            let stderr = String::from_utf8_lossy(&o.stderr);
            warn!("podman ps exited with {}: {}", o.status, stderr.trim());
            return Vec::new();
        }
        Err(e) => {
            warn!("Failed to execute podman ps: {}", e);
            return Vec::new();
        }
    };

    let stdout = String::from_utf8_lossy(&output.stdout);
    let containers: Vec<serde_json::Value> = match serde_json::from_str(&stdout) {
        Ok(list) => list,
        Err(e) => {
            warn!("Failed to parse podman ps output: {}", e);
            return Vec::new();
        }
    };

    containers
        .iter()
        .filter_map(|c| {
            // "Names" is either an array of names (first one wins) or a plain string.
            let name = match c.get("Names")? {
                serde_json::Value::Array(names) => names.first()?.as_str()?,
                other => other.as_str()?,
            }
            .to_string();

            let app_id = name.strip_prefix("archy-").unwrap_or(&name).to_string();
            if SKIP.contains(&app_id.as_str()) || app_id.ends_with("-ui") {
                return None;
            }

            let state = c
                .get("State")
                .and_then(|v| v.as_str())
                .unwrap_or("unknown")
                .to_lowercase();
            let healthy = state == "running";

            Some(ContainerHealth {
                name,
                app_id,
                state,
                healthy,
            })
        })
        .collect()
}

/// Try to restart a container.
async fn restart_container(name: &str) -> bool { info!("Auto-restarting unhealthy container: {}", name); let result = tokio::process::Command::new("sudo") .args(["podman", "start", name]) .output() .await; match result { Ok(output) if output.status.success() => { info!("Successfully restarted container: {}", name); true } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); warn!("Failed to restart container {}: {}", name, stderr.trim()); false } Err(e) => { warn!("Failed to execute podman start for {}: {}", name, e); false } } } /// Spawn the health monitor background task. pub fn spawn_health_monitor(state: Arc, data_dir: PathBuf) { tokio::spawn(async move { // Wait 2 minutes for containers to start up tokio::time::sleep(std::time::Duration::from_secs(120)).await; let mut tracker = RestartTracker::new(); let mut mem_tracker = MemoryTracker::new(); let mut mem_check_counter: u32 = 0; let mut interval = tokio::time::interval(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)); loop { interval.tick().await; mem_check_counter += 1; // Check container memory every 5 minutes (every 5th health check) if mem_check_counter % 5 == 0 { let mem_stats = check_container_memory().await; for (name, rss) in &mem_stats { mem_tracker.record(name, *rss); if let Some(growth) = mem_tracker.check_leak(name) { warn!("Potential memory leak in {}: {:.0}% growth over tracking period", name, growth); } } } let containers = check_containers().await; if containers.is_empty() { continue; } // Sort containers by startup tier so databases restart before dependent services let mut unhealthy: Vec<&ContainerHealth> = Vec::new(); let mut state_changed = false; let (mut data, _) = state.get_snapshot().await; for container in &containers { if container.healthy { if tracker.attempt_count(&container.name) > 0 { info!("Container {} is healthy again after restart", container.name); tracker.clear(&container.name); } continue; } if container.state == "exited" || container.state == 
"stopped" { unhealthy.push(container); } } // Sort by startup tier: databases first, then core, then dependent, then apps, then UIs unhealthy.sort_by_key(|c| container_tier(&c.name)); let mut prev_tier: Option = None; for container in &unhealthy { let tier = container_tier(&container.name); let attempts = tracker.attempt_count(&container.name); // Reset counter after 1 hour for permanently failed containers if tracker.should_reset_failed(&container.name) { info!("Resetting restart counter for {} after {}s stability window", container.name, STABILITY_RESET_SECS); tracker.clear(&container.name); } if tracker.attempt_count(&container.name) >= MAX_RESTART_ATTEMPTS { debug!("Container {} exceeded max restart attempts ({})", container.name, MAX_RESTART_ATTEMPTS); continue; } // Wait for backoff delay before retrying if !tracker.backoff_elapsed(&container.name) { let delay = tracker.backoff_delay_secs(&container.name); debug!("Container {} waiting for backoff ({}s)", container.name, delay); continue; } // When transitioning to a higher tier, wait briefly for previous tier to stabilize if let Some(prev) = prev_tier { if tier > prev { tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } prev_tier = Some(tier); if tracker.record_attempt(&container.name) { let attempt = tracker.attempt_count(&container.name); info!("Restarting {} (tier {:?}, attempt {}/{}, backoff {}s)", container.name, tier, attempt, MAX_RESTART_ATTEMPTS, BACKOFF_DELAYS_SECS.get(attempt.saturating_sub(1) as usize).unwrap_or(&90)); let restarted = restart_container(&container.name).await; if !restarted || attempt >= MAX_RESTART_ATTEMPTS { let notification = Notification { id: format!("health-{}-{}", container.app_id, chrono::Utc::now().timestamp()), level: NotificationLevel::Error, title: format!("{} is unhealthy", container.app_id), message: if restarted { format!( "Container restarted ({}/{} attempts). 
May need manual attention.", attempt, MAX_RESTART_ATTEMPTS ) } else { format!( "Auto-restart failed (attempt {}/{}). Container state: {}", attempt, MAX_RESTART_ATTEMPTS, container.state ) }, timestamp: chrono::Utc::now().to_rfc3339(), app_id: Some(container.app_id.clone()), }; data.notifications.push(notification.clone()); if data.notifications.len() > 20 { data.notifications = data.notifications.split_off(data.notifications.len() - 20); } state_changed = true; let webhook_payload = webhooks::WebhookPayload { event: WebhookEvent::ContainerCrash, title: notification.title, message: notification.message, timestamp: notification.timestamp, node_id: String::new(), details: Some(serde_json::json!({ "container": container.name, "app_id": container.app_id, "state": container.state, "attempt": attempt, "tier": format!("{:?}", tier), })), }; webhooks::send_webhook(&data_dir, webhook_payload).await; } } } if state_changed { state.update_data(data).await; debug!("Health monitor: state updated with notifications"); } } }); } #[cfg(test)] mod tests { use super::*; #[test] fn test_restart_tracker_new_is_empty() { let tracker = RestartTracker::new(); assert_eq!(tracker.attempt_count("any-container"), 0); } #[test] fn test_restart_tracker_record_attempt_increments() { let mut tracker = RestartTracker::new(); assert!(tracker.record_attempt("test-container")); assert_eq!(tracker.attempt_count("test-container"), 1); assert!(tracker.record_attempt("test-container")); assert_eq!(tracker.attempt_count("test-container"), 2); assert!(tracker.record_attempt("test-container")); assert_eq!(tracker.attempt_count("test-container"), 3); } #[test] fn test_restart_tracker_max_attempts_exceeded() { let mut tracker = RestartTracker::new(); for i in 1..=MAX_RESTART_ATTEMPTS { assert!( tracker.record_attempt("container-a"), "Attempt {} should be allowed", i ); } assert!(!tracker.record_attempt("container-a")); assert_eq!(tracker.attempt_count("container-a"), MAX_RESTART_ATTEMPTS + 1); } #[test] fn 
test_restart_tracker_independent_containers() { let mut tracker = RestartTracker::new(); tracker.record_attempt("container-a"); tracker.record_attempt("container-a"); tracker.record_attempt("container-b"); assert_eq!(tracker.attempt_count("container-a"), 2); assert_eq!(tracker.attempt_count("container-b"), 1); assert_eq!(tracker.attempt_count("container-c"), 0); } #[test] fn test_restart_tracker_clear_resets_count() { let mut tracker = RestartTracker::new(); tracker.record_attempt("container-x"); tracker.record_attempt("container-x"); assert_eq!(tracker.attempt_count("container-x"), 2); tracker.clear("container-x"); assert_eq!(tracker.attempt_count("container-x"), 0); } #[test] fn test_restart_tracker_clear_allows_new_attempts() { let mut tracker = RestartTracker::new(); for _ in 0..=MAX_RESTART_ATTEMPTS { tracker.record_attempt("container-y"); } assert!(!tracker.record_attempt("container-y")); tracker.clear("container-y"); assert!(tracker.record_attempt("container-y")); assert_eq!(tracker.attempt_count("container-y"), 1); } #[test] fn test_restart_tracker_clear_nonexistent_is_safe() { let mut tracker = RestartTracker::new(); tracker.clear("nonexistent"); assert_eq!(tracker.attempt_count("nonexistent"), 0); } #[test] fn test_container_health_struct() { let health = ContainerHealth { name: "archy-bitcoin-knots".to_string(), app_id: "bitcoin-knots".to_string(), state: "running".to_string(), healthy: true, }; assert!(health.healthy); assert_eq!(health.name, "archy-bitcoin-knots"); assert_eq!(health.app_id, "bitcoin-knots"); assert_eq!(health.state, "running"); } #[test] fn test_container_health_unhealthy() { let health = ContainerHealth { name: "archy-mempool-web".to_string(), app_id: "mempool-web".to_string(), state: "exited".to_string(), healthy: false, }; assert!(!health.healthy); assert_eq!(health.state, "exited"); } #[test] fn test_max_restart_attempts_constant() { assert!(MAX_RESTART_ATTEMPTS >= 1); assert!(MAX_RESTART_ATTEMPTS <= 10); 
assert_eq!(MAX_RESTART_ATTEMPTS, 3); } #[test] fn test_check_interval_constant() { assert_eq!(CHECK_INTERVAL_SECS, 60); } #[test] fn test_backoff_delays() { let tracker = RestartTracker::new(); // Before any attempts, delay is first backoff assert_eq!(tracker.backoff_delay_secs("test"), BACKOFF_DELAYS_SECS[0]); } #[test] fn test_backoff_delays_escalate() { let mut tracker = RestartTracker::new(); tracker.record_attempt("test"); assert_eq!(tracker.backoff_delay_secs("test"), BACKOFF_DELAYS_SECS[0]); // 10s tracker.record_attempt("test"); assert_eq!(tracker.backoff_delay_secs("test"), BACKOFF_DELAYS_SECS[1]); // 30s tracker.record_attempt("test"); assert_eq!(tracker.backoff_delay_secs("test"), BACKOFF_DELAYS_SECS[2]); // 90s } #[test] fn test_backoff_elapsed_true_for_new() { let tracker = RestartTracker::new(); assert!(tracker.backoff_elapsed("test")); } #[test] fn test_stability_reset_not_triggered_early() { let mut tracker = RestartTracker::new(); tracker.record_attempt("test"); assert!(!tracker.should_reset_failed("test")); } #[test] fn test_container_tier_database() { assert_eq!(container_tier("archy-btcpay-db"), StartupTier::Database); assert_eq!(container_tier("immich_postgres"), StartupTier::Database); assert_eq!(container_tier("penpot-valkey"), StartupTier::Database); } #[test] fn test_container_tier_core() { assert_eq!(container_tier("bitcoin-knots"), StartupTier::CoreInfra); } #[test] fn test_container_tier_dependent() { assert_eq!(container_tier("lnd"), StartupTier::DependentService); assert_eq!(container_tier("mempool-electrs"), StartupTier::DependentService); assert_eq!(container_tier("archy-nbxplorer"), StartupTier::DependentService); } #[test] fn test_container_tier_frontend() { assert_eq!(container_tier("archy-mempool-web"), StartupTier::Frontend); assert_eq!(container_tier("archy-bitcoin-ui"), StartupTier::Frontend); } #[test] fn test_container_tier_application_default() { assert_eq!(container_tier("nextcloud"), StartupTier::Application); 
assert_eq!(container_tier("grafana"), StartupTier::Application); assert_eq!(container_tier("btcpay-server"), StartupTier::Application); } #[test] fn test_tier_ordering() { assert!(StartupTier::Database < StartupTier::CoreInfra); assert!(StartupTier::CoreInfra < StartupTier::DependentService); assert!(StartupTier::DependentService < StartupTier::Application); assert!(StartupTier::Application < StartupTier::Frontend); } #[test] fn test_parse_memory_gib() { assert_eq!(parse_memory_string("1.5GiB"), Some(1_610_612_736)); } #[test] fn test_parse_memory_mib() { assert_eq!(parse_memory_string("256MiB"), Some(268_435_456)); } #[test] fn test_parse_memory_kib() { assert_eq!(parse_memory_string("512KiB"), Some(524_288)); } #[test] fn test_parse_memory_invalid() { assert_eq!(parse_memory_string("abc"), None); } #[test] fn test_memory_tracker_no_leak_few_samples() { let mut tracker = MemoryTracker::new(); tracker.record("test", 100_000_000); assert!(tracker.check_leak("test").is_none()); } }