feat(TASK-12): periodic telemetry reporter — 15min interval, collector POST
Background task spawned on server startup: every 15 minutes, checks opt-in status, builds anonymous health report (node ID hash, version, uptime, CPU/RAM/disk %, container states, recent alerts), saves to disk, and POSTs to TELEMETRY_COLLECTOR_URL env var if configured. Non-fatal on failure. Fixed FiredAlert field references (kind not rule_type, timestamp not fired_at) in both monitoring and analytics modules. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b28b0f335f
commit
bce70b13f2
@ -172,13 +172,12 @@ impl RpcHandler {
|
||||
.unwrap_or(0);
|
||||
|
||||
// Recent alerts from metrics store
|
||||
let recent_alerts: Vec<serde_json::Value> = self.metrics_store.get_alerts().await
|
||||
let recent_alerts: Vec<serde_json::Value> = self.metrics_store.get_fired_alerts(10).await
|
||||
.into_iter()
|
||||
.take(10)
|
||||
.map(|a| serde_json::json!({
|
||||
"rule": format!("{:?}", a.rule_type),
|
||||
"rule": format!("{:?}", a.kind),
|
||||
"message": a.message,
|
||||
"fired_at": a.fired_at.to_rfc3339(),
|
||||
"timestamp": a.timestamp,
|
||||
}))
|
||||
.collect();
|
||||
|
||||
|
||||
@ -599,6 +599,149 @@ pub fn spawn_metrics_collector(
|
||||
});
|
||||
}
|
||||
|
||||
/// Spawn the periodic telemetry reporter (runs every 15 minutes when opt-in enabled).
|
||||
/// Collects anonymous health data and saves to disk. Posts to central collector if configured.
|
||||
pub fn spawn_telemetry_reporter(
|
||||
store: Arc<MetricsStore>,
|
||||
state: Option<Arc<crate::state::StateManager>>,
|
||||
data_dir: PathBuf,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
// Wait 60s for system to fully stabilize
|
||||
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
|
||||
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(900)); // 15 min
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
// Check if telemetry is opted in
|
||||
let config_path = data_dir.join("analytics-config.json");
|
||||
let enabled = match tokio::fs::read_to_string(&config_path).await {
|
||||
Ok(data) => serde_json::from_str::<serde_json::Value>(&data)
|
||||
.ok()
|
||||
.and_then(|c| c["enabled"].as_bool())
|
||||
.unwrap_or(false),
|
||||
Err(_) => false,
|
||||
};
|
||||
if !enabled {
|
||||
debug!("Telemetry disabled — skipping report");
|
||||
continue;
|
||||
}
|
||||
|
||||
match build_telemetry_report(&store, &state, &data_dir).await {
|
||||
Ok(report) => {
|
||||
// Save latest report to disk
|
||||
let report_path = data_dir.join("telemetry-latest.json");
|
||||
if let Ok(json) = serde_json::to_string_pretty(&report) {
|
||||
let _ = tokio::fs::write(&report_path, &json).await;
|
||||
}
|
||||
|
||||
// POST to central collector if configured
|
||||
let collector_url = std::env::var("TELEMETRY_COLLECTOR_URL").ok();
|
||||
if let Some(url) = collector_url {
|
||||
match post_telemetry_report(&url, &report).await {
|
||||
Ok(_) => info!("Telemetry report sent to collector"),
|
||||
Err(e) => warn!("Failed to send telemetry report: {}", e),
|
||||
}
|
||||
} else {
|
||||
debug!("Telemetry report saved locally (no collector URL configured)");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to build telemetry report: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Build an anonymous telemetry report from current system state.
|
||||
async fn build_telemetry_report(
|
||||
store: &Arc<MetricsStore>,
|
||||
state: &Option<Arc<crate::state::StateManager>>,
|
||||
data_dir: &std::path::Path,
|
||||
) -> anyhow::Result<serde_json::Value> {
|
||||
// Anonymous node ID — truncated SHA-256 hash of pubkey
|
||||
let (node_id, version, container_count, running_count, peer_count) = if let Some(ref sm) = state {
|
||||
let (data, _) = sm.get_snapshot().await;
|
||||
let id = {
|
||||
use sha2::{Sha256, Digest};
|
||||
let mut h = Sha256::new();
|
||||
h.update(data.server_info.pubkey.as_bytes());
|
||||
hex::encode(h.finalize())[..16].to_string()
|
||||
};
|
||||
let running = data.package_data.values()
|
||||
.filter(|p| matches!(p.state, crate::data_model::PackageState::Running))
|
||||
.count();
|
||||
(id, data.server_info.version.clone(), data.package_data.len(), running, data.peer_health.len())
|
||||
} else {
|
||||
("unknown".to_string(), "unknown".to_string(), 0, 0, 0)
|
||||
};
|
||||
|
||||
// System info
|
||||
let cpu_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(0);
|
||||
let uptime_secs = tokio::fs::read_to_string("/proc/uptime").await
|
||||
.ok()
|
||||
.and_then(|s| s.split_whitespace().next()?.parse::<f64>().ok())
|
||||
.map(|f| f as u64)
|
||||
.unwrap_or(0);
|
||||
|
||||
// Latest metrics snapshot
|
||||
let latest = store.latest().await;
|
||||
let (cpu_pct, mem_pct, disk_pct) = latest.map(|s| {
|
||||
let mem_total = s.system.mem_total_bytes as f64;
|
||||
let disk_total = s.system.disk_total_bytes as f64;
|
||||
(
|
||||
s.system.cpu_usage_percent,
|
||||
if mem_total > 0.0 { (s.system.mem_used_bytes as f64 / mem_total) * 100.0 } else { 0.0 },
|
||||
if disk_total > 0.0 { (s.system.disk_used_bytes as f64 / disk_total) * 100.0 } else { 0.0 },
|
||||
)
|
||||
}).unwrap_or((0.0, 0.0, 0.0));
|
||||
|
||||
// Recent alerts
|
||||
let recent_alerts: Vec<serde_json::Value> = store.get_fired_alerts(10).await
|
||||
.into_iter()
|
||||
.map(|a| serde_json::json!({
|
||||
"rule": format!("{:?}", a.kind),
|
||||
"message": a.message,
|
||||
"timestamp": a.timestamp,
|
||||
}))
|
||||
.collect();
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"node_id": node_id,
|
||||
"version": version,
|
||||
"uptime_secs": uptime_secs,
|
||||
"cpu_cores": cpu_cores,
|
||||
"cpu_pct": (cpu_pct * 10.0).round() / 10.0,
|
||||
"mem_pct": (mem_pct * 10.0).round() / 10.0,
|
||||
"disk_pct": (disk_pct * 10.0).round() / 10.0,
|
||||
"container_count": container_count,
|
||||
"running_count": running_count,
|
||||
"federation_peers": peer_count,
|
||||
"recent_alerts": recent_alerts,
|
||||
"reported_at": chrono::Utc::now().to_rfc3339(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// POST a telemetry report to the central collector.
|
||||
async fn post_telemetry_report(url: &str, report: &serde_json::Value) -> anyhow::Result<()> {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(10))
|
||||
.build()?;
|
||||
let response = client.post(url)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("User-Agent", "Archipelago-Telemetry/1.0")
|
||||
.json(report)
|
||||
.send()
|
||||
.await?;
|
||||
if !response.status().is_success() {
|
||||
anyhow::bail!("Collector returned {}", response.status());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@ -103,6 +103,7 @@ impl Server {
|
||||
|
||||
// Create metrics store and spawn background collector
|
||||
let metrics_store = Arc::new(MetricsStore::with_data_dir(config.data_dir.clone()));
|
||||
let metrics_for_telemetry = metrics_store.clone();
|
||||
crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()), Some(config.data_dir.clone()));
|
||||
|
||||
let api_handler = Arc::new(
|
||||
@ -292,6 +293,13 @@ impl Server {
|
||||
// Respects webhook config: skips when disabled or ContainerCrash not subscribed
|
||||
crate::health_monitor::spawn_health_monitor(state_manager.clone(), config.data_dir.clone());
|
||||
|
||||
// Periodic telemetry reporter (every 15 min when opted in)
|
||||
crate::monitoring::spawn_telemetry_reporter(
|
||||
metrics_for_telemetry,
|
||||
Some(state_manager.clone()),
|
||||
config.data_dir.clone(),
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
_config: config,
|
||||
_identity: identity,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user