pub mod collector;

use crate::webhooks::{self, WebhookEvent, WebhookPayload};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

/// File name, joined onto the data directory, under which alert rules are stored.
const ALERT_RULES_FILE: &str = "alert-rules.json";

/// Capacity of the 1-minute ring buffer (24 hours = 1440 minutes).
const MAX_1MIN_ENTRIES: usize = 1440;

/// Capacity of the 15-minute ring buffer (7 days = 672 quarter-hours).
const MAX_15MIN_ENTRIES: usize = 672;

/// Maximum number of fired alerts to keep in history.
const MAX_ALERT_HISTORY: usize = 100;

/// One metrics sample: system-wide figures, per-container figures and the
/// two values the store fills in itself when the sample is pushed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSnapshot {
    pub timestamp: i64,
    pub system: SystemMetrics,
    pub containers: Vec<ContainerMetrics>,
    /// Replaced by `MetricsStore::push` with the average of the latency
    /// samples recorded since the previous push, when there were any.
    pub rpc_latency_ms: f64,
    /// Overwritten by `MetricsStore::push` with the store's connection counter.
    pub ws_connections: u32,
}

/// Host-level resource figures.
///
/// The `*_used_bytes` / `*_total_bytes` pairs are what the disk and RAM alert
/// rules turn into a percentage; a total of zero disables that check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
    pub cpu_percent: f64,
    pub mem_used_bytes: u64,
    pub mem_total_bytes: u64,
    pub disk_used_bytes: u64,
    pub disk_total_bytes: u64,
    pub net_rx_bytes: u64,
    pub net_tx_bytes: u64,
    pub load_avg_1: f64,
    pub load_avg_5: f64,
    pub load_avg_15: f64,
}

/// Resource figures for a single container, identified by `name`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerMetrics {
    pub name: String,
    pub cpu_percent: f64,
    pub mem_used_bytes: u64,
    pub mem_limit_bytes: u64,
    pub net_rx_bytes: u64,
    pub net_tx_bytes: u64,
    pub block_read_bytes: u64,
    pub block_write_bytes: u64,
}

/// The kinds of alert rule the system knows about.
///
/// Serialized in `snake_case` (e.g. `disk_usage`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AlertRuleKind {
    /// Disk used/total percentage, evaluated on every snapshot.
    DiskUsage,
    /// Memory used/total percentage, evaluated on every snapshot.
    RamUsage,
    /// Raised through `MetricsStore::fire_container_alert`, not by snapshot evaluation.
    ContainerCrash,
    /// Evaluated against the snapshot's `rpc_latency_ms`.
    BackendErrorSpike,
    /// Raised through `MetricsStore::fire_ssl_alert`, not by snapshot evaluation.
    SslCertExpiry,
}

/// A configured alert rule with threshold and enabled state.
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AlertRule { pub kind: AlertRuleKind, pub threshold: f64, pub enabled: bool, pub description: String, } /// A fired alert instance. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FiredAlert { pub id: String, pub kind: AlertRuleKind, pub message: String, pub value: f64, pub threshold: f64, pub timestamp: i64, pub acknowledged: bool, } impl AlertRule { fn default_rules() -> Vec { vec![ AlertRule { kind: AlertRuleKind::DiskUsage, threshold: 90.0, enabled: true, description: "Disk usage exceeds threshold".to_string(), }, AlertRule { kind: AlertRuleKind::RamUsage, threshold: 90.0, enabled: true, description: "RAM usage exceeds threshold".to_string(), }, AlertRule { kind: AlertRuleKind::ContainerCrash, threshold: 1.0, enabled: true, description: "Container stopped unexpectedly".to_string(), }, AlertRule { kind: AlertRuleKind::BackendErrorSpike, threshold: 500.0, enabled: true, description: "RPC latency exceeds threshold (ms)".to_string(), }, AlertRule { kind: AlertRuleKind::SslCertExpiry, threshold: 30.0, enabled: true, description: "SSL certificate expires within N days".to_string(), }, ] } } /// Thread-safe metrics store with ring buffers at two resolutions. pub struct MetricsStore { minute_data: RwLock>, quarter_hour_data: RwLock>, minute_count: RwLock, rpc_latency: RwLock<(f64, u64)>, ws_connections: AtomicU32, alert_rules: RwLock>, fired_alerts: RwLock>, data_dir: Option, } impl MetricsStore { pub fn new() -> Self { Self { minute_data: RwLock::new(VecDeque::with_capacity(MAX_1MIN_ENTRIES)), quarter_hour_data: RwLock::new(VecDeque::with_capacity(MAX_15MIN_ENTRIES)), minute_count: RwLock::new(0), rpc_latency: RwLock::new((0.0, 0)), ws_connections: AtomicU32::new(0), alert_rules: RwLock::new(AlertRule::default_rules()), fired_alerts: RwLock::new(VecDeque::with_capacity(MAX_ALERT_HISTORY)), data_dir: None, } } /// Create a MetricsStore that persists alert rules to disk. 
pub fn with_data_dir(data_dir: PathBuf) -> Self { let rules = load_alert_rules_sync(&data_dir); Self { minute_data: RwLock::new(VecDeque::with_capacity(MAX_1MIN_ENTRIES)), quarter_hour_data: RwLock::new(VecDeque::with_capacity(MAX_15MIN_ENTRIES)), minute_count: RwLock::new(0), rpc_latency: RwLock::new((0.0, 0)), ws_connections: AtomicU32::new(0), alert_rules: RwLock::new(rules), fired_alerts: RwLock::new(VecDeque::with_capacity(MAX_ALERT_HISTORY)), data_dir: Some(data_dir), } } /// Record a new metric snapshot (called every minute by collector). pub async fn push(&self, mut snapshot: MetricSnapshot) { // Fill in RPC latency from accumulated samples { let mut latency = self.rpc_latency.write().await; if latency.1 > 0 { snapshot.rpc_latency_ms = (latency.0 / latency.1 as f64 * 10.0).round() / 10.0; *latency = (0.0, 0); } } // Fill in current WS connection count snapshot.ws_connections = self.ws_connections.load(Ordering::Relaxed); // Push to 1-minute ring buffer { let mut buf = self.minute_data.write().await; if buf.len() >= MAX_1MIN_ENTRIES { buf.pop_front(); } buf.push_back(snapshot.clone()); } // Every 15 minutes, push to quarter-hour ring buffer { let mut count = self.minute_count.write().await; *count += 1; if *count >= 15 { *count = 0; let mut buf = self.quarter_hour_data.write().await; if buf.len() >= MAX_15MIN_ENTRIES { buf.pop_front(); } buf.push_back(snapshot); } } } /// Record an RPC request latency sample (milliseconds). pub async fn record_rpc_latency(&self, latency_ms: f64) { let mut data = self.rpc_latency.write().await; data.0 += latency_ms; data.1 += 1; } /// Increment WebSocket connection count (called on connect). pub fn increment_ws(&self) { self.ws_connections.fetch_add(1, Ordering::Relaxed); } /// Decrement WebSocket connection count (called on disconnect). 
pub fn decrement_ws(&self) { // Use saturating semantics to avoid underflow let _ = self.ws_connections.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { if v > 0 { Some(v - 1) } else { Some(0) } }); } /// Get the latest snapshot. pub async fn latest(&self) -> Option { self.minute_data.read().await.back().cloned() } /// Get minute-resolution data for the last N minutes. pub async fn history_minutes(&self, last_n: usize) -> Vec { let buf = self.minute_data.read().await; let start = buf.len().saturating_sub(last_n); buf.iter().skip(start).cloned().collect() } /// Get quarter-hour-resolution data for the last N entries. pub async fn history_quarter_hours(&self, last_n: usize) -> Vec { let buf = self.quarter_hour_data.read().await; let start = buf.len().saturating_sub(last_n); buf.iter().skip(start).cloned().collect() } /// Get the current alert rules. pub async fn get_alert_rules(&self) -> Vec { self.alert_rules.read().await.clone() } /// Update an alert rule by kind and persist to disk. pub async fn update_alert_rule(&self, kind: &AlertRuleKind, enabled: Option, threshold: Option) { let mut rules = self.alert_rules.write().await; if let Some(rule) = rules.iter_mut().find(|r| &r.kind == kind) { if let Some(e) = enabled { rule.enabled = e; } if let Some(t) = threshold { rule.threshold = t; } } // Persist to disk so changes survive restarts if let Some(ref dir) = self.data_dir { if let Err(e) = save_alert_rules(dir, &rules).await { warn!("Failed to persist alert rules: {}", e); } } } /// Get fired alert history. pub async fn get_fired_alerts(&self, last_n: usize) -> Vec { let buf = self.fired_alerts.read().await; let start = buf.len().saturating_sub(last_n); buf.iter().skip(start).cloned().collect() } /// Acknowledge a fired alert by id. 
pub async fn acknowledge_alert(&self, alert_id: &str) -> bool { let mut buf = self.fired_alerts.write().await; if let Some(alert) = buf.iter_mut().find(|a| a.id == alert_id) { alert.acknowledged = true; true } else { false } } /// Evaluate alert rules against a snapshot and return any new alerts. pub async fn check_alerts(&self, snapshot: &MetricSnapshot) -> Vec { let rules = self.alert_rules.read().await; let mut new_alerts = Vec::new(); let ts = snapshot.timestamp; for rule in rules.iter() { if !rule.enabled { continue; } match rule.kind { AlertRuleKind::DiskUsage => { if snapshot.system.disk_total_bytes > 0 { let pct = (snapshot.system.disk_used_bytes as f64 / snapshot.system.disk_total_bytes as f64) * 100.0; if pct > rule.threshold { new_alerts.push(FiredAlert { id: format!("disk-{}", ts), kind: AlertRuleKind::DiskUsage, message: format!("Disk usage at {:.1}% (threshold: {:.0}%)", pct, rule.threshold), value: pct, threshold: rule.threshold, timestamp: ts, acknowledged: false, }); } } } AlertRuleKind::RamUsage => { if snapshot.system.mem_total_bytes > 0 { let pct = (snapshot.system.mem_used_bytes as f64 / snapshot.system.mem_total_bytes as f64) * 100.0; if pct > rule.threshold { new_alerts.push(FiredAlert { id: format!("ram-{}", ts), kind: AlertRuleKind::RamUsage, message: format!("RAM usage at {:.1}% (threshold: {:.0}%)", pct, rule.threshold), value: pct, threshold: rule.threshold, timestamp: ts, acknowledged: false, }); } } } AlertRuleKind::BackendErrorSpike => { if snapshot.rpc_latency_ms > rule.threshold { new_alerts.push(FiredAlert { id: format!("latency-{}", ts), kind: AlertRuleKind::BackendErrorSpike, message: format!( "RPC latency at {:.0}ms (threshold: {:.0}ms)", snapshot.rpc_latency_ms, rule.threshold ), value: snapshot.rpc_latency_ms, threshold: rule.threshold, timestamp: ts, acknowledged: false, }); } } // ContainerCrash and SslCertExpiry are checked via dedicated paths _ => {} } } // Store fired alerts if !new_alerts.is_empty() { let mut buf = 
self.fired_alerts.write().await; for alert in &new_alerts { if buf.len() >= MAX_ALERT_HISTORY { buf.pop_front(); } buf.push_back(alert.clone()); } } new_alerts } /// Fire a container crash alert (called by health monitor). pub async fn fire_container_alert(&self, container_name: &str, state: &str) { let rules = self.alert_rules.read().await; let enabled = rules .iter() .any(|r| r.kind == AlertRuleKind::ContainerCrash && r.enabled); drop(rules); if !enabled { return; } let ts = chrono::Utc::now().timestamp(); let alert = FiredAlert { id: format!("container-{}-{}", container_name, ts), kind: AlertRuleKind::ContainerCrash, message: format!("Container '{}' is {} — may need attention", container_name, state), value: 1.0, threshold: 1.0, timestamp: ts, acknowledged: false, }; let mut buf = self.fired_alerts.write().await; if buf.len() >= MAX_ALERT_HISTORY { buf.pop_front(); } buf.push_back(alert); } /// Fire an SSL cert expiry alert. pub async fn fire_ssl_alert(&self, days_remaining: f64) { let rules = self.alert_rules.read().await; let threshold = rules .iter() .find(|r| r.kind == AlertRuleKind::SslCertExpiry && r.enabled) .map(|r| r.threshold); drop(rules); let threshold = match threshold { Some(t) if days_remaining < t => t, _ => return, }; let ts = chrono::Utc::now().timestamp(); let alert = FiredAlert { id: format!("ssl-{}", ts), kind: AlertRuleKind::SslCertExpiry, message: format!( "SSL certificate expires in {:.0} days (threshold: {:.0} days)", days_remaining, threshold ), value: days_remaining, threshold, timestamp: ts, acknowledged: false, }; let mut buf = self.fired_alerts.write().await; if buf.len() >= MAX_ALERT_HISTORY { buf.pop_front(); } buf.push_back(alert); } } /// Load alert rules from disk, falling back to defaults if file missing or corrupt. 
fn load_alert_rules_sync(data_dir: &std::path::Path) -> Vec { let path = data_dir.join(ALERT_RULES_FILE); match std::fs::read_to_string(&path) { Ok(content) => match serde_json::from_str::>(&content) { Ok(saved) => { // Merge with defaults: use saved enabled/threshold, add any new rule kinds let defaults = AlertRule::default_rules(); let mut merged = Vec::new(); for default in &defaults { if let Some(saved_rule) = saved.iter().find(|r| r.kind == default.kind) { merged.push(AlertRule { kind: default.kind.clone(), threshold: saved_rule.threshold, enabled: saved_rule.enabled, description: default.description.clone(), }); } else { merged.push(default.clone()); } } info!("Loaded alert rules from {}", path.display()); merged } Err(e) => { warn!("Failed to parse alert rules ({}), using defaults", e); AlertRule::default_rules() } }, Err(_) => AlertRule::default_rules(), } } /// Save alert rules to disk. async fn save_alert_rules(data_dir: &std::path::Path, rules: &[AlertRule]) -> anyhow::Result<()> { tokio::fs::create_dir_all(data_dir).await?; let content = serde_json::to_string_pretty(rules)?; tokio::fs::write(data_dir.join(ALERT_RULES_FILE), content).await?; Ok(()) } /// Spawn the background metrics collector (runs every 60 seconds). /// Also evaluates alert rules on each snapshot and pushes notifications. 
///
/// The task is detached: the `JoinHandle` returned by `tokio::spawn` is
/// dropped and this function returns immediately.
///
/// Task behaviour, as visible in the body:
/// - sleeps 30 s, then loops on a 60 s `tokio::time::interval`;
/// - each tick calls `collector::collect_snapshot()`; a failure is logged with
///   `warn!` and the loop continues;
/// - on success the snapshot is passed to `store.check_alerts` first and then
///   moved into `store.push`;
/// - when alerts fired and `state` is `Some`, one notification per alert is
///   appended to the data returned by `get_snapshot()`, the list is trimmed
///   from the front to at most 20 entries, and the result is written back with
///   `update_data`. Level is `Error` for disk/RAM alerts whose value exceeds
///   95.0 and for container crashes, `Warning` otherwise;
/// - when alerts fired and `data_dir` is `Some`, a webhook is sent for
///   `DiskUsage` (as `DiskWarning`) and `ContainerCrash` alerts only, with an
///   empty `node_id`.
///
/// NOTE(review): `webhooks::send_webhook(..)` is awaited inline despite the
/// "fire-and-forget" comment, so unless it spawns internally a slow delivery
/// delays the next collection — confirm.
/// NOTE(review): `check_alerts` never returns `ContainerCrash` alerts, so the
/// `ContainerCrash` arms below look unreachable from this loop — confirm.
/// NOTE(review): the generic arguments of `store`, `state` and `data_dir` are
/// missing in this copy of the file; the type behind `state` cannot be
/// recovered from here.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside a Tokio runtime.
pub fn spawn_metrics_collector( store: Arc, state: Option>, data_dir: Option, ) { tokio::spawn(async move { // Wait 30s for system to stabilize after boot tokio::time::sleep(std::time::Duration::from_secs(30)).await; let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); loop { interval.tick().await; match collector::collect_snapshot().await { Ok(snapshot) => { // Check alert rules before pushing (needs snapshot data) let alerts = store.check_alerts(&snapshot).await; store.push(snapshot).await; debug!("Metrics snapshot collected"); // Push alert notifications via WebSocket if !alerts.is_empty() { if let Some(ref state_mgr) = state { let (mut data, _rev) = state_mgr.get_snapshot().await; for alert in &alerts { let level = match alert.kind { AlertRuleKind::DiskUsage | AlertRuleKind::RamUsage => { if alert.value > 95.0 { crate::data_model::NotificationLevel::Error } else { crate::data_model::NotificationLevel::Warning } } AlertRuleKind::ContainerCrash => { crate::data_model::NotificationLevel::Error } _ => crate::data_model::NotificationLevel::Warning, }; let notification = crate::data_model::Notification { id: alert.id.clone(), level, title: format!("{:?} Alert", alert.kind), message: alert.message.clone(), timestamp: chrono::Utc::now().to_rfc3339(), app_id: None, }; data.notifications.push(notification); } // Keep max 20 notifications while data.notifications.len() > 20 { data.notifications.remove(0); } state_mgr.update_data(data).await; info!("Fired {} alert(s)", alerts.len()); } // Fire-and-forget webhook delivery for mapped alert types if let Some(ref dir) = data_dir { for alert in &alerts { let event = match alert.kind { AlertRuleKind::DiskUsage => Some(WebhookEvent::DiskWarning), AlertRuleKind::ContainerCrash => Some(WebhookEvent::ContainerCrash), _ => None, }; if let Some(event) = event { let payload = WebhookPayload { event, title: format!("{:?} Alert", alert.kind), message: alert.message.clone(), timestamp: 
chrono::Utc::now().to_rfc3339(), node_id: String::new(), details: Some(serde_json::json!({ "value": alert.value, "threshold": alert.threshold, })), }; webhooks::send_webhook(dir, payload).await; } } } } } Err(e) => { warn!("Failed to collect metrics: {}", e); } } } }); } #[cfg(test)] mod tests { use super::*; #[test] fn test_metrics_store_new() { let store = MetricsStore::new(); assert_eq!(store.ws_connections.load(Ordering::Relaxed), 0); } #[test] fn test_ws_connection_tracking() { let store = MetricsStore::new(); store.increment_ws(); store.increment_ws(); assert_eq!(store.ws_connections.load(Ordering::Relaxed), 2); store.decrement_ws(); assert_eq!(store.ws_connections.load(Ordering::Relaxed), 1); store.decrement_ws(); assert_eq!(store.ws_connections.load(Ordering::Relaxed), 0); // Decrement below zero should stay at 0 store.decrement_ws(); assert_eq!(store.ws_connections.load(Ordering::Relaxed), 0); } #[tokio::test] async fn test_push_and_latest() { let store = MetricsStore::new(); assert!(store.latest().await.is_none()); let snapshot = MetricSnapshot { timestamp: 1000, system: SystemMetrics { cpu_percent: 25.0, mem_used_bytes: 1_000_000, mem_total_bytes: 4_000_000, disk_used_bytes: 500_000, disk_total_bytes: 1_000_000, net_rx_bytes: 100, net_tx_bytes: 200, load_avg_1: 1.0, load_avg_5: 0.5, load_avg_15: 0.3, }, containers: vec![], rpc_latency_ms: 0.0, ws_connections: 0, }; store.push(snapshot).await; let latest = store.latest().await.unwrap(); assert_eq!(latest.timestamp, 1000); assert_eq!(latest.system.cpu_percent, 25.0); } #[tokio::test] async fn test_rpc_latency_recording() { let store = MetricsStore::new(); store.record_rpc_latency(10.0).await; store.record_rpc_latency(20.0).await; store.record_rpc_latency(30.0).await; let snapshot = MetricSnapshot { timestamp: 2000, system: SystemMetrics { cpu_percent: 0.0, mem_used_bytes: 0, mem_total_bytes: 0, disk_used_bytes: 0, disk_total_bytes: 0, net_rx_bytes: 0, net_tx_bytes: 0, load_avg_1: 0.0, load_avg_5: 0.0, 
load_avg_15: 0.0, }, containers: vec![], rpc_latency_ms: 0.0, ws_connections: 0, }; store.push(snapshot).await; let latest = store.latest().await.unwrap(); assert_eq!(latest.rpc_latency_ms, 20.0); // average of 10+20+30 = 20 } #[tokio::test] async fn test_history_minutes() { let store = MetricsStore::new(); for i in 0..5 { let snapshot = MetricSnapshot { timestamp: i * 60, system: SystemMetrics { cpu_percent: i as f64, mem_used_bytes: 0, mem_total_bytes: 0, disk_used_bytes: 0, disk_total_bytes: 0, net_rx_bytes: 0, net_tx_bytes: 0, load_avg_1: 0.0, load_avg_5: 0.0, load_avg_15: 0.0, }, containers: vec![], rpc_latency_ms: 0.0, ws_connections: 0, }; store.push(snapshot).await; } let history = store.history_minutes(3).await; assert_eq!(history.len(), 3); assert_eq!(history[0].timestamp, 120); assert_eq!(history[2].timestamp, 240); } #[tokio::test] async fn test_ring_buffer_eviction() { let store = MetricsStore::new(); // Push more than MAX_1MIN_ENTRIES for i in 0..(MAX_1MIN_ENTRIES + 10) { let snapshot = MetricSnapshot { timestamp: i as i64, system: SystemMetrics { cpu_percent: 0.0, mem_used_bytes: 0, mem_total_bytes: 0, disk_used_bytes: 0, disk_total_bytes: 0, net_rx_bytes: 0, net_tx_bytes: 0, load_avg_1: 0.0, load_avg_5: 0.0, load_avg_15: 0.0, }, containers: vec![], rpc_latency_ms: 0.0, ws_connections: 0, }; store.push(snapshot).await; } let all = store.history_minutes(MAX_1MIN_ENTRIES + 100).await; assert_eq!(all.len(), MAX_1MIN_ENTRIES); // Oldest entry should be 10 (first 10 were evicted) assert_eq!(all[0].timestamp, 10); } #[tokio::test] async fn test_quarter_hour_downsampling() { let store = MetricsStore::new(); // Push exactly 15 entries to trigger one quarter-hour sample for i in 0..15 { let snapshot = MetricSnapshot { timestamp: i * 60, system: SystemMetrics { cpu_percent: 50.0, mem_used_bytes: 0, mem_total_bytes: 0, disk_used_bytes: 0, disk_total_bytes: 0, net_rx_bytes: 0, net_tx_bytes: 0, load_avg_1: 0.0, load_avg_5: 0.0, load_avg_15: 0.0, }, containers: 
vec![], rpc_latency_ms: 0.0, ws_connections: 0, }; store.push(snapshot).await; } let qh = store.history_quarter_hours(10).await; assert_eq!(qh.len(), 1); assert_eq!(qh[0].timestamp, 14 * 60); // The 15th entry (index 14) } #[test] fn test_constants() { assert_eq!(MAX_1MIN_ENTRIES, 1440); assert_eq!(MAX_15MIN_ENTRIES, 672); } }