fix: add health RPC handler, Nostr connect timeouts, atomic backup restore, nginx rate limits
- R1: Add health RPC endpoint with crash recovery status, uptime, and version
- R2: Wrap all 5 Nostr client.connect() calls in 10s timeout
- R3: Make backup restore atomic with staging dir and rollback on failure
- I1: Add rate limiting, body size, and proxy timeouts to unauthenticated nginx endpoints

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
9fc13f3079
commit
b57ca4f171
@ -147,8 +147,13 @@ impl ApiHandler {
|
||||
|
||||
// Health — unauthenticated, returns JSON with service status
|
||||
(Method::GET, "/health") => {
|
||||
let recovery_complete = crate::crash_recovery::is_recovery_complete();
|
||||
let uptime = crate::crash_recovery::uptime_seconds();
|
||||
let health_status = if recovery_complete { "ok" } else { "degraded" };
|
||||
let status = serde_json::json!({
|
||||
"status": "ok",
|
||||
"status": health_status,
|
||||
"crash_recovery_complete": recovery_complete,
|
||||
"uptime_seconds": uptime,
|
||||
"version": env!("CARGO_PKG_VERSION"),
|
||||
"services": {
|
||||
"rpc": true,
|
||||
|
||||
@ -440,6 +440,7 @@ impl RpcHandler {
|
||||
let result = match rpc_req.method.as_str() {
|
||||
"echo" => self.handle_echo(params).await,
|
||||
"server.echo" => self.handle_echo(params).await,
|
||||
"health" => self.handle_health().await,
|
||||
"auth.login" => self.handle_auth_login(params).await,
|
||||
"auth.logout" => self.handle_auth_logout().await,
|
||||
"auth.changePassword" => self.handle_auth_change_password(params, &session_token).await,
|
||||
@ -1040,6 +1041,18 @@ impl RpcHandler {
|
||||
}
|
||||
Ok(serde_json::json!({ "message": "Hello from Archipelago!" }))
|
||||
}
|
||||
|
||||
async fn handle_health(&self) -> Result<serde_json::Value> {
|
||||
let recovery_complete = crate::crash_recovery::is_recovery_complete();
|
||||
let uptime = crate::crash_recovery::uptime_seconds();
|
||||
let status = if recovery_complete { "ok" } else { "degraded" };
|
||||
Ok(serde_json::json!({
|
||||
"status": status,
|
||||
"crash_recovery_complete": recovery_complete,
|
||||
"uptime_seconds": uptime,
|
||||
"version": env!("CARGO_PKG_VERSION"),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Derive a CSRF token from the session token via HMAC.
|
||||
|
||||
@ -120,6 +120,9 @@ pub async fn create_full_backup(
|
||||
}
|
||||
|
||||
/// Restore a full backup from an encrypted archive.
|
||||
///
|
||||
/// Uses atomic staging: extracts to a temporary directory first, validates,
|
||||
/// then swaps into place with rollback on failure.
|
||||
pub async fn restore_full_backup(
|
||||
data_dir: &Path,
|
||||
backup_id: &str,
|
||||
@ -134,20 +137,127 @@ pub async fn restore_full_backup(
|
||||
.await
|
||||
.context("Failed to read backup file")?;
|
||||
|
||||
// Check disk space: need at least 2x backup size free
|
||||
let backup_size = encrypted.len() as u64;
|
||||
if let Ok(output) = tokio::process::Command::new("df")
|
||||
.args(["--output=avail", "-B1"])
|
||||
.arg(data_dir)
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
if let Ok(stdout) = String::from_utf8(output.stdout) {
|
||||
if let Some(avail) = stdout.lines().nth(1).and_then(|l| l.trim().parse::<u64>().ok()) {
|
||||
if avail < backup_size * 2 {
|
||||
anyhow::bail!(
|
||||
"Insufficient disk space for restore: need {}MB, have {}MB",
|
||||
backup_size * 2 / (1024 * 1024),
|
||||
avail / (1024 * 1024),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let tar_gz_data = decrypt_data(&encrypted, passphrase)?;
|
||||
|
||||
// Extract to data_dir
|
||||
tokio::task::spawn_blocking({
|
||||
let data_dir = data_dir.to_path_buf();
|
||||
move || extract_tar_gz(&data_dir, &tar_gz_data)
|
||||
})
|
||||
.await?
|
||||
.context("Failed to extract backup")?;
|
||||
let staging_dir = data_dir.join(".restore-staging");
|
||||
let rollback_dir = data_dir.join(".restore-backup");
|
||||
|
||||
info!(id = %backup_id, "Backup restored");
|
||||
// Clean up any previous failed restore
|
||||
let _ = fs::remove_dir_all(&staging_dir).await;
|
||||
let _ = fs::remove_dir_all(&rollback_dir).await;
|
||||
|
||||
// Extract to staging directory
|
||||
fs::create_dir_all(&staging_dir)
|
||||
.await
|
||||
.context("Failed to create staging directory")?;
|
||||
|
||||
let staging_clone = staging_dir.clone();
|
||||
if let Err(e) = tokio::task::spawn_blocking(move || extract_tar_gz(&staging_clone, &tar_gz_data))
|
||||
.await?
|
||||
{
|
||||
let _ = fs::remove_dir_all(&staging_dir).await;
|
||||
return Err(e).context("Failed to extract backup to staging");
|
||||
}
|
||||
|
||||
// Validate staging has required files
|
||||
let has_identity = staging_dir.join("identity").exists();
|
||||
if !has_identity {
|
||||
let _ = fs::remove_dir_all(&staging_dir).await;
|
||||
anyhow::bail!("Invalid backup: missing identity directory");
|
||||
}
|
||||
|
||||
// Move current data to rollback directory
|
||||
fs::create_dir_all(&rollback_dir)
|
||||
.await
|
||||
.context("Failed to create rollback directory")?;
|
||||
|
||||
for dir_name in BACKUP_DIRS {
|
||||
let src = data_dir.join(dir_name);
|
||||
if src.exists() {
|
||||
let dst = rollback_dir.join(dir_name);
|
||||
if let Err(e) = fs::rename(&src, &dst).await {
|
||||
// Rollback: restore what we already moved
|
||||
info!("Restore failed during move, rolling back: {}", e);
|
||||
restore_from_rollback(data_dir, &rollback_dir).await;
|
||||
let _ = fs::remove_dir_all(&staging_dir).await;
|
||||
let _ = fs::remove_dir_all(&rollback_dir).await;
|
||||
return Err(e).context("Failed to move current data to rollback");
|
||||
}
|
||||
}
|
||||
}
|
||||
for file_name in BACKUP_FILES {
|
||||
let src = data_dir.join(file_name);
|
||||
if src.exists() {
|
||||
let dst = rollback_dir.join(file_name);
|
||||
let _ = fs::rename(&src, &dst).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Move staging contents to data_dir
|
||||
if let Err(e) = move_staging_to_data(data_dir, &staging_dir).await {
|
||||
info!("Restore failed during staging swap, rolling back: {}", e);
|
||||
restore_from_rollback(data_dir, &rollback_dir).await;
|
||||
let _ = fs::remove_dir_all(&staging_dir).await;
|
||||
let _ = fs::remove_dir_all(&rollback_dir).await;
|
||||
return Err(e).context("Failed to move staging data to data_dir");
|
||||
}
|
||||
|
||||
// Clean up
|
||||
let _ = fs::remove_dir_all(&staging_dir).await;
|
||||
let _ = fs::remove_dir_all(&rollback_dir).await;
|
||||
|
||||
info!(id = %backup_id, "Backup restored atomically");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Move staging directory contents into data_dir.
|
||||
async fn move_staging_to_data(data_dir: &Path, staging_dir: &Path) -> Result<()> {
|
||||
let mut entries = fs::read_dir(staging_dir)
|
||||
.await
|
||||
.context("Failed to read staging dir")?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let src = entry.path();
|
||||
let name = entry.file_name();
|
||||
let dst = data_dir.join(&name);
|
||||
fs::rename(&src, &dst)
|
||||
.await
|
||||
.with_context(|| format!("Failed to move {:?} from staging", name))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Restore data from rollback directory back to data_dir.
|
||||
async fn restore_from_rollback(data_dir: &Path, rollback_dir: &Path) {
|
||||
if let Ok(mut entries) = fs::read_dir(rollback_dir).await {
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
let src = entry.path();
|
||||
let dst = data_dir.join(entry.file_name());
|
||||
let _ = fs::rename(&src, &dst).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// List available backups by reading metadata files.
|
||||
pub async fn list_backups(data_dir: &Path) -> Result<Vec<BackupMetadata>> {
|
||||
let backups_dir = data_dir.join("backups");
|
||||
|
||||
@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::fs;
|
||||
use tracing::{info, warn};
|
||||
|
||||
@ -23,6 +24,22 @@ const USER_STOPPED_FILE: &str = "user-stopped.json";
|
||||
/// Shared flag: true once boot recovery is complete. Health monitor should wait for this.
|
||||
pub static RECOVERY_COMPLETE: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
/// Process start time for uptime calculation.
|
||||
static START_TIME: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
|
||||
|
||||
/// Initialize the start time. Call once at startup.
|
||||
pub fn init_start_time() {
|
||||
START_TIME.get_or_init(Instant::now);
|
||||
}
|
||||
|
||||
/// Get uptime in seconds since process start.
|
||||
pub fn uptime_seconds() -> u64 {
|
||||
START_TIME
|
||||
.get()
|
||||
.map(|t| t.elapsed().as_secs())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Mark boot recovery as complete. Call after crash recovery + start_stopped_containers finish.
|
||||
pub fn mark_recovery_complete() {
|
||||
RECOVERY_COMPLETE.store(true, Ordering::SeqCst);
|
||||
|
||||
@ -53,6 +53,7 @@ const DEV_DEFAULT_PASSWORD: &str = "password123";
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let startup_start = std::time::Instant::now();
|
||||
crash_recovery::init_start_time();
|
||||
|
||||
// Initialize tracing
|
||||
tracing_subscriber::fmt()
|
||||
|
||||
@ -16,7 +16,9 @@ use nostr_sdk::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tokio::fs;
|
||||
use tracing::warn;
|
||||
|
||||
const NOSTR_SECRET_FILE: &str = "nostr_secret";
|
||||
|
||||
@ -121,7 +123,9 @@ pub async fn publish_presence(
|
||||
for url in relays {
|
||||
let _ = client.add_relay(url).await;
|
||||
}
|
||||
client.connect().await;
|
||||
if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() {
|
||||
warn!("Nostr relay connection timed out after 10s, continuing anyway");
|
||||
}
|
||||
|
||||
let builder = EventBuilder::new(Kind::Custom(30078), content)
|
||||
.tag(Tag::identifier("archipelago-node"));
|
||||
@ -158,7 +162,9 @@ pub async fn discover_nodes(
|
||||
for url in relays {
|
||||
let _ = client.add_relay(url).await;
|
||||
}
|
||||
client.connect().await;
|
||||
if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() {
|
||||
warn!("Nostr relay connection timed out after 10s, continuing anyway");
|
||||
}
|
||||
|
||||
let filter = Filter::new()
|
||||
.kind(Kind::Custom(30078))
|
||||
@ -259,7 +265,9 @@ pub async fn send_connect_request(
|
||||
for url in relays {
|
||||
let _ = client.add_relay(url).await;
|
||||
}
|
||||
client.connect().await;
|
||||
if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() {
|
||||
warn!("Nostr relay connection timed out after 10s, continuing anyway");
|
||||
}
|
||||
|
||||
// Kind 4 encrypted DM with p-tag for recipient
|
||||
let builder = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted)
|
||||
@ -319,7 +327,9 @@ pub async fn send_connect_response(
|
||||
for url in relays {
|
||||
let _ = client.add_relay(url).await;
|
||||
}
|
||||
client.connect().await;
|
||||
if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() {
|
||||
warn!("Nostr relay connection timed out after 10s, continuing anyway");
|
||||
}
|
||||
|
||||
let builder = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted)
|
||||
.tag(Tag::public_key(recipient_pk));
|
||||
@ -355,7 +365,9 @@ pub async fn poll_handshakes(
|
||||
for url in relays {
|
||||
let _ = client.add_relay(url).await;
|
||||
}
|
||||
client.connect().await;
|
||||
if tokio::time::timeout(Duration::from_secs(10), client.connect()).await.is_err() {
|
||||
warn!("Nostr relay connection timed out after 10s, continuing anyway");
|
||||
}
|
||||
|
||||
// Query for encrypted DMs addressed to us
|
||||
let mut filter = Filter::new()
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
# Rate limit zones
|
||||
limit_req_zone $binary_remote_addr zone=rpc:10m rate=20r/s;
|
||||
limit_req_zone $binary_remote_addr zone=auth:10m rate=3r/s;
|
||||
limit_req_zone $binary_remote_addr zone=peer:10m rate=10r/s;
|
||||
|
||||
server {
|
||||
listen 80;
|
||||
@ -114,12 +115,17 @@ server {
|
||||
|
||||
# Peer-to-peer node messaging (receives from other nodes over Tor).
# Requests are throttled through the `peer` rate-limit zone with a burst of 20
# served without delay, request bodies are capped at 10m, and the upstream
# connect/read/send timeouts are 30s/60s/30s.
|
||||
location /archipelago/ {
|
||||
limit_req zone=peer burst=20 nodelay;
|
||||
client_max_body_size 10m;
|
||||
proxy_connect_timeout 30s;
|
||||
proxy_read_timeout 60s;
|
||||
proxy_send_timeout 30s;
|
||||
proxy_pass http://127.0.0.1:5678;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
}
|
||||
|
||||
|
||||
# Proxy API requests to backend
|
||||
location /rpc/ {
|
||||
limit_req zone=rpc burst=40 nodelay;
|
||||
@ -165,6 +171,11 @@ server {
|
||||
|
||||
# Content sharing — peer access over Tor (no auth)
|
||||
location /content {
|
||||
limit_req zone=peer burst=20 nodelay;
|
||||
client_max_body_size 10m;
|
||||
proxy_connect_timeout 30s;
|
||||
proxy_read_timeout 60s;
|
||||
proxy_send_timeout 30s;
|
||||
proxy_pass http://127.0.0.1:5678;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
@ -173,6 +184,11 @@ server {
|
||||
|
||||
# DWN endpoints — peer access over Tor (no auth)
|
||||
location /dwn {
|
||||
limit_req zone=peer burst=20 nodelay;
|
||||
client_max_body_size 10m;
|
||||
proxy_connect_timeout 30s;
|
||||
proxy_read_timeout 60s;
|
||||
proxy_send_timeout 30s;
|
||||
proxy_pass http://127.0.0.1:5678;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
@ -790,6 +806,11 @@ server {
|
||||
}
|
||||
|
||||
location /archipelago/ {
|
||||
limit_req zone=peer burst=20 nodelay;
|
||||
client_max_body_size 10m;
|
||||
proxy_connect_timeout 30s;
|
||||
proxy_read_timeout 60s;
|
||||
proxy_send_timeout 30s;
|
||||
proxy_pass http://127.0.0.1:5678;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
@ -821,6 +842,11 @@ server {
|
||||
|
||||
# Content sharing — peer access over Tor (no auth)
|
||||
location /content {
|
||||
limit_req zone=peer burst=20 nodelay;
|
||||
client_max_body_size 10m;
|
||||
proxy_connect_timeout 30s;
|
||||
proxy_read_timeout 60s;
|
||||
proxy_send_timeout 30s;
|
||||
proxy_pass http://127.0.0.1:5678;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
@ -829,6 +855,11 @@ server {
|
||||
|
||||
# DWN endpoints — peer access over Tor (no auth)
|
||||
location /dwn {
|
||||
limit_req zone=peer burst=20 nodelay;
|
||||
client_max_body_size 10m;
|
||||
proxy_connect_timeout 30s;
|
||||
proxy_read_timeout 60s;
|
||||
proxy_send_timeout 30s;
|
||||
proxy_pass http://127.0.0.1:5678;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user