From bce70b13f2d94e00c857e8fe4295ed7be4b59ada Mon Sep 17 00:00:00 2001 From: Dorian Date: Wed, 18 Mar 2026 23:36:57 +0000 Subject: [PATCH] =?UTF-8?q?feat(TASK-12):=20periodic=20telemetry=20reporte?= =?UTF-8?q?r=20=E2=80=94=2015min=20interval,=20collector=20POST?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background task spawned on server startup: every 15 minutes, checks opt-in status, builds anonymous health report (node ID hash, version, uptime, CPU/RAM/disk %, container states, recent alerts), saves to disk, and POSTs to TELEMETRY_COLLECTOR_URL env var if configured. Non-fatal on failure. Fixed FiredAlert field references (kind not rule_type, timestamp not fired_at) in both monitoring and analytics modules. Co-Authored-By: Claude Opus 4.6 (1M context) --- core/archipelago/src/api/rpc/analytics.rs | 7 +- core/archipelago/src/monitoring/mod.rs | 143 ++++++++++++++++++++++ core/archipelago/src/server.rs | 8 ++ 3 files changed, 154 insertions(+), 4 deletions(-) diff --git a/core/archipelago/src/api/rpc/analytics.rs b/core/archipelago/src/api/rpc/analytics.rs index 6be413d9..4826958f 100644 --- a/core/archipelago/src/api/rpc/analytics.rs +++ b/core/archipelago/src/api/rpc/analytics.rs @@ -172,13 +172,12 @@ impl RpcHandler { .unwrap_or(0); // Recent alerts from metrics store - let recent_alerts: Vec = self.metrics_store.get_alerts().await + let recent_alerts: Vec = self.metrics_store.get_fired_alerts(10).await .into_iter() - .take(10) .map(|a| serde_json::json!({ - "rule": format!("{:?}", a.rule_type), + "rule": format!("{:?}", a.kind), "message": a.message, - "fired_at": a.fired_at.to_rfc3339(), + "timestamp": a.timestamp, })) .collect(); diff --git a/core/archipelago/src/monitoring/mod.rs b/core/archipelago/src/monitoring/mod.rs index 5199d97f..16e4199f 100644 --- a/core/archipelago/src/monitoring/mod.rs +++ b/core/archipelago/src/monitoring/mod.rs @@ -599,6 +599,149 @@ pub fn spawn_metrics_collector( }); } +/// Spawn the periodic telemetry reporter (runs every 15 minutes when opt-in enabled). +/// Collects anonymous health data and saves to disk. Posts to central collector if configured. +pub fn spawn_telemetry_reporter( + store: Arc, + state: Option>, + data_dir: PathBuf, +) { + tokio::spawn(async move { + // Wait 60s for system to fully stabilize + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + + let mut interval = tokio::time::interval(std::time::Duration::from_secs(900)); // 15 min + + loop { + interval.tick().await; + + // Check if telemetry is opted in + let config_path = data_dir.join("analytics-config.json"); + let enabled = match tokio::fs::read_to_string(&config_path).await { + Ok(data) => serde_json::from_str::(&data) + .ok() + .and_then(|c| c["enabled"].as_bool()) + .unwrap_or(false), + Err(_) => false, + }; + if !enabled { + debug!("Telemetry disabled — skipping report"); + continue; + } + + match build_telemetry_report(&store, &state, &data_dir).await { + Ok(report) => { + // Save latest report to disk + let report_path = data_dir.join("telemetry-latest.json"); + if let Ok(json) = serde_json::to_string_pretty(&report) { + let _ = tokio::fs::write(&report_path, &json).await; + } + + // POST to central collector if configured + let collector_url = std::env::var("TELEMETRY_COLLECTOR_URL").ok(); + if let Some(url) = collector_url { + match post_telemetry_report(&url, &report).await { + Ok(_) => info!("Telemetry report sent to collector"), + Err(e) => warn!("Failed to send telemetry report: {}", e), + } + } else { + debug!("Telemetry report saved locally (no collector URL configured)"); + } + } + Err(e) => { + warn!("Failed to build telemetry report: {}", e); + } + } + } + }); +} + +/// Build an anonymous telemetry report from current system state. +async fn build_telemetry_report( + store: &Arc, + state: &Option>, + data_dir: &std::path::Path, +) -> anyhow::Result { + // Anonymous node ID — truncated SHA-256 hash of pubkey + let (node_id, version, container_count, running_count, peer_count) = if let Some(ref sm) = state { + let (data, _) = sm.get_snapshot().await; + let id = { + use sha2::{Sha256, Digest}; + let mut h = Sha256::new(); + h.update(data.server_info.pubkey.as_bytes()); + hex::encode(h.finalize())[..16].to_string() + }; + let running = data.package_data.values() + .filter(|p| matches!(p.state, crate::data_model::PackageState::Running)) + .count(); + (id, data.server_info.version.clone(), data.package_data.len(), running, data.peer_health.len()) + } else { + ("unknown".to_string(), "unknown".to_string(), 0, 0, 0) + }; + + // System info + let cpu_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(0); + let uptime_secs = tokio::fs::read_to_string("/proc/uptime").await + .ok() + .and_then(|s| s.split_whitespace().next()?.parse::().ok()) + .map(|f| f as u64) + .unwrap_or(0); + + // Latest metrics snapshot + let latest = store.latest().await; + let (cpu_pct, mem_pct, disk_pct) = latest.map(|s| { + let mem_total = s.system.mem_total_bytes as f64; + let disk_total = s.system.disk_total_bytes as f64; + ( + s.system.cpu_usage_percent, + if mem_total > 0.0 { (s.system.mem_used_bytes as f64 / mem_total) * 100.0 } else { 0.0 }, + if disk_total > 0.0 { (s.system.disk_used_bytes as f64 / disk_total) * 100.0 } else { 0.0 }, + ) + }).unwrap_or((0.0, 0.0, 0.0)); + + // Recent alerts + let recent_alerts: Vec = store.get_fired_alerts(10).await + .into_iter() + .map(|a| serde_json::json!({ + "rule": format!("{:?}", a.kind), + "message": a.message, + "timestamp": a.timestamp, + })) + .collect(); + + Ok(serde_json::json!({ + "node_id": node_id, + "version": version, + "uptime_secs": uptime_secs, + "cpu_cores": cpu_cores, + "cpu_pct": (cpu_pct * 10.0).round() / 10.0, + "mem_pct": (mem_pct * 10.0).round() / 10.0, + "disk_pct": (disk_pct * 10.0).round() / 10.0, + "container_count": container_count, + "running_count": running_count, + "federation_peers": peer_count, + "recent_alerts": recent_alerts, + "reported_at": chrono::Utc::now().to_rfc3339(), + })) +} + +/// POST a telemetry report to the central collector. +async fn post_telemetry_report(url: &str, report: &serde_json::Value) -> anyhow::Result<()> { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build()?; + let response = client.post(url) + .header("Content-Type", "application/json") + .header("User-Agent", "Archipelago-Telemetry/1.0") + .json(report) + .send() + .await?; + if !response.status().is_success() { + anyhow::bail!("Collector returned {}", response.status()); + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 00e5cb3f..7fcd972f 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -103,6 +103,7 @@ impl Server { // Create metrics store and spawn background collector let metrics_store = Arc::new(MetricsStore::with_data_dir(config.data_dir.clone())); + let metrics_for_telemetry = metrics_store.clone(); crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()), Some(config.data_dir.clone())); let api_handler = Arc::new( @@ -292,6 +293,13 @@ impl Server { // Respects webhook config: skips when disabled or ContainerCrash not subscribed crate::health_monitor::spawn_health_monitor(state_manager.clone(), config.data_dir.clone()); + // Periodic telemetry reporter (every 15 min when opted in) + crate::monitoring::spawn_telemetry_reporter( + metrics_for_telemetry, + Some(state_manager.clone()), + config.data_dir.clone(), + ); + Ok(Self { _config: config, _identity: identity,