diff --git a/core/archipelago/src/api/handler.rs b/core/archipelago/src/api/handler.rs index fa57d014..1fc03f7d 100644 --- a/core/archipelago/src/api/handler.rs +++ b/core/archipelago/src/api/handler.rs @@ -147,8 +147,13 @@ impl ApiHandler { // Health — unauthenticated, returns JSON with service status (Method::GET, "/health") => { + let recovery_complete = crate::crash_recovery::is_recovery_complete(); + let uptime = crate::crash_recovery::uptime_seconds(); + let health_status = if recovery_complete { "ok" } else { "degraded" }; let status = serde_json::json!({ - "status": "ok", + "status": health_status, + "crash_recovery_complete": recovery_complete, + "uptime_seconds": uptime, "version": env!("CARGO_PKG_VERSION"), "services": { "rpc": true, diff --git a/core/archipelago/src/api/rpc/mod.rs b/core/archipelago/src/api/rpc/mod.rs index 2a38a2e0..22842e52 100644 --- a/core/archipelago/src/api/rpc/mod.rs +++ b/core/archipelago/src/api/rpc/mod.rs @@ -440,6 +440,7 @@ impl RpcHandler { let result = match rpc_req.method.as_str() { "echo" => self.handle_echo(params).await, "server.echo" => self.handle_echo(params).await, + "health" => self.handle_health().await, "auth.login" => self.handle_auth_login(params).await, "auth.logout" => self.handle_auth_logout().await, "auth.changePassword" => self.handle_auth_change_password(params, &session_token).await, @@ -1040,6 +1041,18 @@ impl RpcHandler { } Ok(serde_json::json!({ "message": "Hello from Archipelago!" })) } + + async fn handle_health(&self) -> Result { + let recovery_complete = crate::crash_recovery::is_recovery_complete(); + let uptime = crate::crash_recovery::uptime_seconds(); + let status = if recovery_complete { "ok" } else { "degraded" }; + Ok(serde_json::json!({ + "status": status, + "crash_recovery_complete": recovery_complete, + "uptime_seconds": uptime, + "version": env!("CARGO_PKG_VERSION"), + })) + } } /// Derive a CSRF token from the session token via HMAC. diff --git a/core/archipelago/src/backup/full.rs b/core/archipelago/src/backup/full.rs index 1b9aef67..7cb70c93 100644 --- a/core/archipelago/src/backup/full.rs +++ b/core/archipelago/src/backup/full.rs @@ -120,6 +120,9 @@ pub async fn create_full_backup( } /// Restore a full backup from an encrypted archive. +/// +/// Uses atomic staging: extracts to a temporary directory first, validates, +/// then swaps into place with rollback on failure. pub async fn restore_full_backup( data_dir: &Path, backup_id: &str, @@ -134,20 +137,127 @@ pub async fn restore_full_backup( .await .context("Failed to read backup file")?; + // Check disk space: need at least 2x backup size free + let backup_size = encrypted.len() as u64; + if let Ok(output) = tokio::process::Command::new("df") + .args(["--output=avail", "-B1"]) + .arg(data_dir) + .output() + .await + { + if let Ok(stdout) = String::from_utf8(output.stdout) { + if let Some(avail) = stdout.lines().nth(1).and_then(|l| l.trim().parse::().ok()) { + if avail < backup_size * 2 { + anyhow::bail!( + "Insufficient disk space for restore: need {}MB, have {}MB", + backup_size * 2 / (1024 * 1024), + avail / (1024 * 1024), + ); + } + } + } + } + let tar_gz_data = decrypt_data(&encrypted, passphrase)?; - // Extract to data_dir - tokio::task::spawn_blocking({ - let data_dir = data_dir.to_path_buf(); - move || extract_tar_gz(&data_dir, &tar_gz_data) - }) - .await? - .context("Failed to extract backup")?; + let staging_dir = data_dir.join(".restore-staging"); + let rollback_dir = data_dir.join(".restore-backup"); - info!(id = %backup_id, "Backup restored"); + // Clean up any previous failed restore + let _ = fs::remove_dir_all(&staging_dir).await; + let _ = fs::remove_dir_all(&rollback_dir).await; + + // Extract to staging directory + fs::create_dir_all(&staging_dir) + .await + .context("Failed to create staging directory")?; + + let staging_clone = staging_dir.clone(); + if let Err(e) = tokio::task::spawn_blocking(move || extract_tar_gz(&staging_clone, &tar_gz_data)) + .await? + { + let _ = fs::remove_dir_all(&staging_dir).await; + return Err(e).context("Failed to extract backup to staging"); + } + + // Validate staging has required files + let has_identity = staging_dir.join("identity").exists(); + if !has_identity { + let _ = fs::remove_dir_all(&staging_dir).await; + anyhow::bail!("Invalid backup: missing identity directory"); + } + + // Move current data to rollback directory + fs::create_dir_all(&rollback_dir) + .await + .context("Failed to create rollback directory")?; + + for dir_name in BACKUP_DIRS { + let src = data_dir.join(dir_name); + if src.exists() { + let dst = rollback_dir.join(dir_name); + if let Err(e) = fs::rename(&src, &dst).await { + // Rollback: restore what we already moved + info!("Restore failed during move, rolling back: {}", e); + restore_from_rollback(data_dir, &rollback_dir).await; + let _ = fs::remove_dir_all(&staging_dir).await; + let _ = fs::remove_dir_all(&rollback_dir).await; + return Err(e).context("Failed to move current data to rollback"); + } + } + } + for file_name in BACKUP_FILES { + let src = data_dir.join(file_name); + if src.exists() { + let dst = rollback_dir.join(file_name); + let _ = fs::rename(&src, &dst).await; + } + } + + // Move staging contents to data_dir + if let Err(e) = move_staging_to_data(data_dir, &staging_dir).await { + info!("Restore failed during staging swap, rolling back: {}", e); + restore_from_rollback(data_dir, &rollback_dir).await; + let _ = fs::remove_dir_all(&staging_dir).await; + let _ = fs::remove_dir_all(&rollback_dir).await; + return Err(e).context("Failed to move staging data to data_dir"); + } + + // Clean up + let _ = fs::remove_dir_all(&staging_dir).await; + let _ = fs::remove_dir_all(&rollback_dir).await; + + info!(id = %backup_id, "Backup restored atomically"); Ok(()) } +/// Move staging directory contents into data_dir. +async fn move_staging_to_data(data_dir: &Path, staging_dir: &Path) -> Result<()> { + let mut entries = fs::read_dir(staging_dir) + .await + .context("Failed to read staging dir")?; + while let Some(entry) = entries.next_entry().await? { + let src = entry.path(); + let name = entry.file_name(); + let dst = data_dir.join(&name); + fs::rename(&src, &dst) + .await + .with_context(|| format!("Failed to move {:?} from staging", name))?; + } + Ok(()) +} + +/// Restore data from rollback directory back to data_dir. +async fn restore_from_rollback(data_dir: &Path, rollback_dir: &Path) { + if let Ok(mut entries) = fs::read_dir(rollback_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let src = entry.path(); + let dst = data_dir.join(entry.file_name()); + let _ = fs::rename(&src, &dst).await; + } + } +} + /// List available backups by reading metadata files. pub async fn list_backups(data_dir: &Path) -> Result> { let backups_dir = data_dir.join("backups"); diff --git a/core/archipelago/src/crash_recovery.rs b/core/archipelago/src/crash_recovery.rs index 4f795075..db7fc835 100644 --- a/core/archipelago/src/crash_recovery.rs +++ b/core/archipelago/src/crash_recovery.rs @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Instant; use tokio::fs; use tracing::{info, warn}; @@ -23,6 +24,22 @@ const USER_STOPPED_FILE: &str = "user-stopped.json"; /// Shared flag: true once boot recovery is complete. Health monitor should wait for this. pub static RECOVERY_COMPLETE: AtomicBool = AtomicBool::new(false); +/// Process start time for uptime calculation. +static START_TIME: std::sync::OnceLock = std::sync::OnceLock::new(); + +/// Initialize the start time. Call once at startup. +pub fn init_start_time() { + START_TIME.get_or_init(Instant::now); +} + +/// Get uptime in seconds since process start. +pub fn uptime_seconds() -> u64 { + START_TIME + .get() + .map(|t| t.elapsed().as_secs()) + .unwrap_or(0) +} + /// Mark boot recovery as complete. Call after crash recovery + start_stopped_containers finish. pub fn mark_recovery_complete() { RECOVERY_COMPLETE.store(true, Ordering::SeqCst); diff --git a/core/archipelago/src/main.rs b/core/archipelago/src/main.rs index daf27519..4c8d0dcf 100644 --- a/core/archipelago/src/main.rs +++ b/core/archipelago/src/main.rs @@ -53,6 +53,7 @@ const DEV_DEFAULT_PASSWORD: &str = "password123"; #[tokio::main] async fn main() -> Result<()> { let startup_start = std::time::Instant::now(); + crash_recovery::init_start_time(); // Initialize tracing tracing_subscriber::fmt() diff --git a/core/archipelago/src/nostr_handshake.rs b/core/archipelago/src/nostr_handshake.rs index 3a977d67..502f8251 100644 --- a/core/archipelago/src/nostr_handshake.rs +++ b/core/archipelago/src/nostr_handshake.rs @@ -16,7 +16,9 @@ use nostr_sdk::prelude::*; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::path::Path; +use std::time::Duration; use tokio::fs; +use tracing::warn; const NOSTR_SECRET_FILE: &str = "nostr_secret"; @@ -121,7 +123,9 @@ pub async fn publish_presence( for url in relays { let _ = client.add_relay(url).await; } - client.connect().await; + if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() { + warn!("Nostr relay connection timed out after 10s, continuing anyway"); + } let builder = EventBuilder::new(Kind::Custom(30078), content) .tag(Tag::identifier("archipelago-node")); @@ -158,7 +162,9 @@ pub async fn discover_nodes( for url in relays { let _ = client.add_relay(url).await; } - client.connect().await; + if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() { + warn!("Nostr relay connection timed out after 10s, continuing anyway"); + } let filter = Filter::new() .kind(Kind::Custom(30078)) @@ -259,7 +265,9 @@ pub async fn send_connect_request( for url in relays { let _ = client.add_relay(url).await; } - client.connect().await; + if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() { + warn!("Nostr relay connection timed out after 10s, continuing anyway"); + } // Kind 4 encrypted DM with p-tag for recipient let builder = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted) @@ -319,7 +327,9 @@ pub async fn send_connect_response( for url in relays { let _ = client.add_relay(url).await; } - client.connect().await; + if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() { + warn!("Nostr relay connection timed out after 10s, continuing anyway"); + } let builder = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted) .tag(Tag::public_key(recipient_pk)); @@ -355,7 +365,9 @@ pub async fn poll_handshakes( for url in relays { let _ = client.add_relay(url).await; } - client.connect().await; + if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() { + warn!("Nostr relay connection timed out after 10s, continuing anyway"); + } // Query for encrypted DMs addressed to us let mut filter = Filter::new() diff --git a/image-recipe/configs/nginx-archipelago.conf b/image-recipe/configs/nginx-archipelago.conf index 0e4ca036..144787af 100644 --- a/image-recipe/configs/nginx-archipelago.conf +++ b/image-recipe/configs/nginx-archipelago.conf @@ -1,6 +1,7 @@ # Rate limit zones limit_req_zone $binary_remote_addr zone=rpc:10m rate=20r/s; limit_req_zone $binary_remote_addr zone=auth:10m rate=3r/s; +limit_req_zone $binary_remote_addr zone=peer:10m rate=10r/s; server { listen 80; @@ -114,12 +115,17 @@ server { # Peer-to-peer node messaging (receives from other nodes over Tor) location /archipelago/ { + limit_req zone=peer burst=20 nodelay; + client_max_body_size 10m; + proxy_connect_timeout 30s; + proxy_read_timeout 60s; + proxy_send_timeout 30s; proxy_pass http://127.0.0.1:5678; proxy_http_version 1.1; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } - + # Proxy API requests to backend location /rpc/ { limit_req zone=rpc burst=40 nodelay; @@ -165,6 +171,11 @@ server { # Content sharing — peer access over Tor (no auth) location /content { + limit_req zone=peer burst=20 nodelay; + client_max_body_size 10m; + proxy_connect_timeout 30s; + proxy_read_timeout 60s; + proxy_send_timeout 30s; proxy_pass http://127.0.0.1:5678; proxy_http_version 1.1; proxy_set_header Host $host; @@ -173,6 +184,11 @@ server { # DWN endpoints — peer access over Tor (no auth) location /dwn { + limit_req zone=peer burst=20 nodelay; + client_max_body_size 10m; + proxy_connect_timeout 30s; + proxy_read_timeout 60s; + proxy_send_timeout 30s; proxy_pass http://127.0.0.1:5678; proxy_http_version 1.1; proxy_set_header Host $host; @@ -790,6 +806,11 @@ server { } location /archipelago/ { + limit_req zone=peer burst=20 nodelay; + client_max_body_size 10m; + proxy_connect_timeout 30s; + proxy_read_timeout 60s; + proxy_send_timeout 30s; proxy_pass http://127.0.0.1:5678; proxy_http_version 1.1; proxy_set_header Host $host; @@ -821,6 +842,11 @@ server { # Content sharing — peer access over Tor (no auth) location /content { + limit_req zone=peer burst=20 nodelay; + client_max_body_size 10m; + proxy_connect_timeout 30s; + proxy_read_timeout 60s; + proxy_send_timeout 30s; proxy_pass http://127.0.0.1:5678; proxy_http_version 1.1; proxy_set_header Host $host; @@ -829,6 +855,11 @@ server { # DWN endpoints — peer access over Tor (no auth) location /dwn { + limit_req zone=peer burst=20 nodelay; + client_max_body_size 10m; + proxy_connect_timeout 30s; + proxy_read_timeout 60s; + proxy_send_timeout 30s; proxy_pass http://127.0.0.1:5678; proxy_http_version 1.1; proxy_set_header Host $host;