From 2443ae6bbacdfbecf319d1a6204bdbf69505ad37 Mon Sep 17 00:00:00 2001 From: Dorian Date: Sat, 21 Mar 2026 01:21:08 +0000 Subject: [PATCH] refactor: replace blocking std::fs and TCP I/O with async tokio equivalents - R6: Convert 6 std::fs calls in session.rs to tokio::fs async - R7: Convert std::fs::read_to_string in docker_packages.rs to async - R8: Convert 3 std::fs calls in port_allocator.rs to async, switch to tokio::sync::Mutex - R9+R10+R11: Fix blocking I/O in node_message.rs and nostr_discovery.rs - R12: Convert electrs_status.rs from sync TCP to async tokio::net with 5s timeouts - R4+R5: Spawn periodic cleanup tasks for endpoint and login rate limiters Co-Authored-By: Claude Opus 4.6 (1M context) --- core/archipelago/src/api/handler.rs | 2 +- core/archipelago/src/api/rpc/mod.rs | 57 ++++++++---- core/archipelago/src/api/rpc/network.rs | 4 +- core/archipelago/src/api/rpc/node.rs | 2 +- core/archipelago/src/api/rpc/package.rs | 19 ++-- .../src/container/docker_packages.rs | 10 ++- core/archipelago/src/electrs_status.rs | 89 ++++++++++--------- core/archipelago/src/node_message.rs | 10 ++- core/archipelago/src/nostr_discovery.rs | 9 +- core/archipelago/src/port_allocator.rs | 22 ++--- core/archipelago/src/server.rs | 6 +- core/archipelago/src/session.rs | 48 ++++++---- 12 files changed, 161 insertions(+), 117 deletions(-) diff --git a/core/archipelago/src/api/handler.rs b/core/archipelago/src/api/handler.rs index 1fc03f7d..c3bdc105 100644 --- a/core/archipelago/src/api/handler.rs +++ b/core/archipelago/src/api/handler.rs @@ -31,7 +31,7 @@ impl ApiHandler { state_manager: Arc, metrics_store: Arc, ) -> Result { - let session_store = SessionStore::new(); + let session_store = SessionStore::new().await; let rpc_handler = Arc::new( RpcHandler::new( config.clone(), diff --git a/core/archipelago/src/api/rpc/mod.rs b/core/archipelago/src/api/rpc/mod.rs index 22842e52..053ab35b 100644 --- a/core/archipelago/src/api/rpc/mod.rs +++ b/core/archipelago/src/api/rpc/mod.rs @@ -42,7 +42,7 @@ use anyhow::{Context, Result}; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use std::net::IpAddr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tracing::{debug, error}; #[derive(Debug, Deserialize)] @@ -164,7 +164,7 @@ pub struct RpcHandler { orchestrator: Option>, state_manager: Arc, pub(crate) metrics_store: Arc, - port_allocator: Arc>, + port_allocator: Arc>, pub session_store: SessionStore, login_rate_limiter: LoginRateLimiter, endpoint_rate_limiter: EndpointRateLimiter, @@ -188,7 +188,32 @@ impl RpcHandler { } else { None }; - let port_allocator = Arc::new(Mutex::new(PortAllocator::new(&config.data_dir)?)); + let port_allocator = Arc::new(tokio::sync::Mutex::new(PortAllocator::new(&config.data_dir).await?)); + + let login_rate_limiter = LoginRateLimiter::new(); + let endpoint_rate_limiter = EndpointRateLimiter::new(); + + // Spawn periodic rate limiter cleanup (every 5 minutes) + { + let limiter = endpoint_rate_limiter.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); + loop { + interval.tick().await; + limiter.cleanup().await; + } + }); + } + { + let limiter = login_rate_limiter.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); + loop { + interval.tick().await; + limiter.cleanup().await; + } + }); + } Ok(Self { config, @@ -198,8 +223,8 @@ impl RpcHandler { metrics_store, port_allocator, session_store, - login_rate_limiter: LoginRateLimiter::new(), - endpoint_rate_limiter: EndpointRateLimiter::new(), + login_rate_limiter, + endpoint_rate_limiter, response_cache: ResponseCache::new(5), mesh_service: Arc::new(tokio::sync::RwLock::new(None)), transport_router: Arc::new(tokio::sync::RwLock::new(None)), @@ -253,10 +278,10 @@ impl RpcHandler { // If session invalid, try remember-me token to auto-restore session if !authenticated { if let Some(remember) = extract_cookie(&parts.headers, "remember") { - if crate::session::SessionStore::validate_remember_token(&remember) { + if crate::session::SessionStore::validate_remember_token(&remember).await { // Auto-create a new session from the remember-me token let new_token = self.session_store.create().await; - let new_csrf = derive_csrf_token(&new_token); + let new_csrf = derive_csrf_token(&new_token).await; tracing::info!("Auto-restored session from remember-me token"); new_session_cookies = Some((new_token, new_csrf)); authenticated = true; @@ -325,7 +350,7 @@ impl RpcHandler { use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; - let secret = SessionStore::load_or_create_remember_secret(); + let secret = SessionStore::load_or_create_remember_secret().await; let mut mac = match HmacSha256::new_from_slice(&secret) { Ok(m) => m, Err(_) => { return Ok(Response::builder().status(500).body(hyper::Body::empty()).unwrap()); } @@ -858,7 +883,7 @@ impl RpcHandler { if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await { if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) { let token = self.session_store.create_pending(secret).await; - let csrf_token = derive_csrf_token(&token); + let csrf_token = derive_csrf_token(&token).await; response.headers_mut().append( "Set-Cookie", format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix()) @@ -884,8 +909,8 @@ impl RpcHandler { } else { // No 2FA: create a full session immediately let token = self.session_store.create().await; - let csrf_token = derive_csrf_token(&token); - let remember_token = self.session_store.create_remember_token(); + let csrf_token = derive_csrf_token(&token).await; + let remember_token = self.session_store.create_remember_token().await; response.headers_mut().append( "Set-Cookie", format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix()) @@ -921,8 +946,8 @@ impl RpcHandler { .map(|s| s.to_string()); if let Some(new_token) = new_token_opt { - let csrf_token = derive_csrf_token(&new_token); - let remember_token = self.session_store.create_remember_token(); + let csrf_token = derive_csrf_token(&new_token).await; + let remember_token = self.session_store.create_remember_token().await; response.headers_mut().append( "Set-Cookie", format!( @@ -969,7 +994,7 @@ impl RpcHandler { if rpc_req.method == "auth.changePassword" && rpc_resp.error.is_none() { if let Some(token) = &session_token { let new_token = self.session_store.rotate(token).await; - let csrf_token = derive_csrf_token(&new_token); + let csrf_token = derive_csrf_token(&new_token).await; response.headers_mut().append( "Set-Cookie", format!( @@ -1059,11 +1084,11 @@ impl RpcHandler { /// Deterministic: same session token always produces the same CSRF token. /// Survives backend restarts because it depends only on the session token /// and the on-disk remember secret (not ephemeral state). -fn derive_csrf_token(session_token: &str) -> String { +async fn derive_csrf_token(session_token: &str) -> String { use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; - let secret = SessionStore::load_or_create_remember_secret(); + let secret = SessionStore::load_or_create_remember_secret().await; let mut mac = HmacSha256::new_from_slice(&secret).expect("HMAC key"); mac.update(format!("csrf:{}", session_token).as_bytes()); hex::encode(mac.finalize().into_bytes()) diff --git a/core/archipelago/src/api/rpc/network.rs b/core/archipelago/src/api/rpc/network.rs index 0e7e20ff..a024eeff 100644 --- a/core/archipelago/src/api/rpc/network.rs +++ b/core/archipelago/src/api/rpc/network.rs @@ -51,7 +51,7 @@ impl RpcHandler { /// Get the current node visibility setting. pub(super) async fn handle_network_get_visibility(&self) -> Result { let vis = self.load_visibility().await; - let tor_address = docker_packages::read_tor_address("archipelago"); + let tor_address = docker_packages::read_tor_address("archipelago").await; Ok(serde_json::json!({ "visibility": vis.as_str(), "tor_address": tor_address, @@ -106,7 +106,7 @@ impl RpcHandler { let (data, _) = self.state_manager.get_snapshot().await; let my_pubkey = &data.server_info.pubkey; let my_did = identity::did_key_from_pubkey_hex(my_pubkey)?; - let my_onion = docker_packages::read_tor_address("archipelago") + let my_onion = docker_packages::read_tor_address("archipelago").await .unwrap_or_default(); let req_msg = serde_json::json!({ diff --git a/core/archipelago/src/api/rpc/node.rs b/core/archipelago/src/api/rpc/node.rs index 1fd68b23..184034fa 100644 --- a/core/archipelago/src/api/rpc/node.rs +++ b/core/archipelago/src/api/rpc/node.rs @@ -71,7 +71,7 @@ impl RpcHandler { } pub(super) async fn handle_node_tor_address(&self) -> Result { - let tor_address = docker_packages::read_tor_address("archipelago"); + let tor_address = docker_packages::read_tor_address("archipelago").await; Ok(serde_json::json!({ "tor_address": tor_address })) } diff --git a/core/archipelago/src/api/rpc/package.rs b/core/archipelago/src/api/rpc/package.rs index e189d729..045ba97e 100644 --- a/core/archipelago/src/api/rpc/package.rs +++ b/core/archipelago/src/api/rpc/package.rs @@ -198,10 +198,8 @@ impl RpcHandler { // App-specific configuration (should come from manifest) let (mut ports, mut volumes, env_vars, custom_command, mut custom_args) = { - let mut allocator = self.port_allocator.lock().map_err(|e| { - anyhow::anyhow!("Port allocator lock poisoned: {}", e) - })?; - get_app_config(package_id, &self.config.host_ip, &mut allocator, &rpc_user, &rpc_pass) + let mut allocator = self.port_allocator.lock().await; + get_app_config(package_id, &self.config.host_ip, &mut allocator, &rpc_user, &rpc_pass).await }; // Fedimint Gateway: auto-detect LND and switch to lnd mode @@ -846,8 +844,9 @@ printtoconsole=1\n", rpcauth_line); } // Release port allocation - if let Ok(mut allocator) = self.port_allocator.lock() { - let _ = allocator.release(package_id); + { + let mut allocator = self.port_allocator.lock().await; + let _ = allocator.release(package_id).await; } // Clean data directories unless preserve_data @@ -1487,7 +1486,7 @@ fn get_memory_limit(app_id: &str) -> &'static str { /// Get app-specific configuration /// Returns: (ports, volumes, env_vars, custom_command, custom_args) -fn get_app_config( +async fn get_app_config( app_id: &str, host_ip: &str, allocator: &mut PortAllocator, @@ -1625,7 +1624,7 @@ fn get_app_config( ), "nextcloud" => { let host_port = allocator - .allocate_or_get(app_id, 8085, 80) + .allocate_or_get(app_id, 8085, 80).await .unwrap_or(8085); ( vec![format!("{}:80", host_port)], @@ -1637,7 +1636,7 @@ fn get_app_config( } "vaultwarden" => { let host_port = allocator - .allocate_or_get(app_id, 8082, 80) + .allocate_or_get(app_id, 8082, 80).await .unwrap_or(8082); ( vec![format!("{}:80", host_port)], @@ -1677,7 +1676,7 @@ fn get_app_config( ), "filebrowser" => { let host_port = allocator - .allocate_or_get(app_id, 8083, 80) + .allocate_or_get(app_id, 8083, 80).await .unwrap_or(8083); ( vec![format!("{}:80", host_port)], diff --git a/core/archipelago/src/container/docker_packages.rs b/core/archipelago/src/container/docker_packages.rs index 1202a9e9..a9aaa08c 100644 --- a/core/archipelago/src/container/docker_packages.rs +++ b/core/archipelago/src/container/docker_packages.rs @@ -141,7 +141,7 @@ impl DockerPackageScanner { // Convert container state to package/service state let (package_state, service_status) = convert_state(&container.state); - let tor_address = read_tor_address(&app_id); + let tor_address = read_tor_address(&app_id).await; let package = PackageDataEntry { state: package_state.clone(), @@ -548,7 +548,7 @@ fn is_real_onion_address(s: &str) -> bool { /// Read real .onion address from Tor hidden service hostname file. /// Service name "archipelago" is for the main web UI (nginx port 80). /// Uses TOR_DATA_DIR env var if set, else /var/lib/archipelago/tor. -pub fn read_tor_address(app_id: &str) -> Option { +pub async fn read_tor_address(app_id: &str) -> Option { let service = tor_service_name(app_id)?; let base = std::env::var("TOR_DATA_DIR").unwrap_or_else(|_| "/var/lib/archipelago/tor".to_string()); @@ -558,7 +558,8 @@ pub fn read_tor_address(app_id: &str) -> Option { .unwrap_or(std::path::Path::new("/var/lib/archipelago")) .join("tor-hostnames") .join(service); - if let Some(addr) = std::fs::read_to_string(&hostnames_path) + if let Some(addr) = tokio::fs::read_to_string(&hostnames_path) + .await .ok() .map(|s| s.trim().to_string()) .filter(|s| s.ends_with(".onion") && !s.is_empty()) @@ -570,7 +571,8 @@ pub fn read_tor_address(app_id: &str) -> Option { let path = std::path::Path::new(&base) .join(format!("hidden_service_{}", service)) .join("hostname"); - std::fs::read_to_string(&path) + tokio::fs::read_to_string(&path) + .await .ok() .map(|s| s.trim().to_string()) .filter(|s| s.ends_with(".onion") && !s.is_empty()) diff --git a/core/archipelago/src/electrs_status.rs b/core/archipelago/src/electrs_status.rs index 4055bc80..69424acb 100644 --- a/core/archipelago/src/electrs_status.rs +++ b/core/archipelago/src/electrs_status.rs @@ -2,9 +2,9 @@ use anyhow::{Context, Result}; use serde::Serialize; -use std::io::{BufRead, BufReader, Write}; -use std::net::TcpStream; use std::time::Duration; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; const ELECTRUMX_HOST: &str = "127.0.0.1"; const ELECTRUMX_PORT: u16 = 50001; @@ -35,16 +35,18 @@ pub struct ElectrsSyncStatus { } /// Get the total size of a directory in bytes. -fn dir_size_bytes(path: &str) -> u64 { +async fn dir_size_bytes(path: &str) -> u64 { let mut total: u64 = 0; - if let Ok(entries) = std::fs::read_dir(path) { - for entry in entries.flatten() { - let path = entry.path(); - if path.is_dir() { - total += dir_size_bytes(&path.to_string_lossy()); - } else if let Ok(meta) = entry.metadata() { - total += meta.len(); - } + let mut entries = match tokio::fs::read_dir(path).await { + Ok(entries) => entries, + Err(_) => return 0, + }; + while let Ok(Some(entry)) = entries.next_entry().await { + let entry_path = entry.path(); + if entry_path.is_dir() { + total += Box::pin(dir_size_bytes(&entry_path.to_string_lossy())).await; + } else if let Ok(meta) = entry.metadata().await { + total += meta.len(); } } total @@ -62,25 +64,39 @@ fn format_bytes(bytes: u64) -> String { } /// Fetch ElectrumX indexed height via Electrum protocol (TCP JSON-RPC). -fn electrumx_indexed_height() -> Result { - let mut stream = TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT)) - .context("Failed to connect to ElectrumX")?; - stream - .set_read_timeout(Some(Duration::from_secs(5))) - .context("set_read_timeout")?; - stream - .set_write_timeout(Some(Duration::from_secs(5))) - .context("set_write_timeout")?; +async fn electrumx_indexed_height() -> Result { + let timeout_duration = Duration::from_secs(5); + + let stream = tokio::time::timeout( + timeout_duration, + TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT)), + ) + .await + .context("ElectrumX connection timed out")? + .context("Failed to connect to ElectrumX")?; + + let (reader_half, mut writer_half) = tokio::io::split(stream); // blockchain.headers.subscribe returns {"height": N, "hex": "..."} let req = r#"{"id":1,"method":"blockchain.headers.subscribe","params":[]} "#; - stream.write_all(req.as_bytes())?; - stream.flush()?; + tokio::time::timeout(timeout_duration, writer_half.write_all(req.as_bytes())) + .await + .context("ElectrumX write timed out")? + .context("Failed to write to ElectrumX")?; - let mut reader = BufReader::new(stream); + tokio::time::timeout(timeout_duration, writer_half.flush()) + .await + .context("ElectrumX flush timed out")? + .context("Failed to flush ElectrumX stream")?; + + let mut reader = BufReader::new(reader_half); let mut line = String::new(); - reader.read_line(&mut line)?; + tokio::time::timeout(timeout_duration, reader.read_line(&mut line)) + .await + .context("ElectrumX read timed out")? + .context("Failed to read from ElectrumX")?; + let line = line.trim(); if line.is_empty() { anyhow::bail!("Empty response from ElectrumX"); @@ -136,10 +152,10 @@ async fn bitcoin_network_height() -> Result { Ok(height) } -/// Get ElectrumX sync status. Runs blocking ElectrumX call in spawn_blocking. +/// Get ElectrumX sync status. pub async fn get_electrs_sync_status() -> ElectrsSyncStatus { - // Get index data size (non-blocking, fast filesystem stat) - let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR); + // Get index data size + let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR).await; let index_size = if data_bytes > 0 { Some(format_bytes(data_bytes)) } else { @@ -193,13 +209,13 @@ pub async fn get_electrs_sync_status() -> ElectrsSyncStatus { } }; - let indexed_height = match tokio::task::spawn_blocking(electrumx_indexed_height).await { - Ok(Ok(h)) => h, - Ok(Err(e)) => { + let indexed_height = match electrumx_indexed_height().await { + Ok(h) => h, + Err(e) => { // ElectrumX may not be ready on 50001 during initial sync let err_msg = e.to_string(); let err_lower = err_msg.to_lowercase(); - let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") { + let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") || err_lower.contains("timed out") { // Estimate progress from data directory size let _est_pct = if data_bytes > 0 { ((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0) @@ -233,17 +249,6 @@ pub async fn get_electrs_sync_status() -> ElectrsSyncStatus { tor_onion: tor_onion.clone(), }; } - Err(e) => { - return ElectrsSyncStatus { - indexed_height: 0, - network_height, - progress_pct: 0.0, - status: "error".to_string(), - error: Some(format!("Task: {}", e)), - index_size, - tor_onion: tor_onion.clone(), - }; - } }; let progress_pct = if network_height > 0 { diff --git a/core/archipelago/src/node_message.rs b/core/archipelago/src/node_message.rs index b3225c58..617f6423 100644 --- a/core/archipelago/src/node_message.rs +++ b/core/archipelago/src/node_message.rs @@ -56,13 +56,19 @@ pub async fn init(data_dir: &Path) { } /// Persist current messages to disk. +/// Serializes under the lock, then writes asynchronously via spawn_blocking +/// to avoid blocking the tokio runtime. fn persist() { let guard = store().lock().unwrap_or_else(|e| e.into_inner()); let path_guard = data_path().lock().unwrap_or_else(|e| e.into_inner()); if let Some(ref path) = *path_guard { if let Ok(content) = serde_json::to_string(&*guard) { - // Use sync write — called from store functions already holding the lock - let _ = std::fs::write(path, content); + let path = path.clone(); + drop(path_guard); + drop(guard); + tokio::task::spawn(async move { + let _ = tokio::fs::write(&path, content).await; + }); } } } diff --git a/core/archipelago/src/nostr_discovery.rs b/core/archipelago/src/nostr_discovery.rs index 1355cef7..234d6854 100644 --- a/core/archipelago/src/nostr_discovery.rs +++ b/core/archipelago/src/nostr_discovery.rs @@ -51,12 +51,9 @@ async fn load_or_create_nostr_keys(identity_dir: &Path) -> Result { #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; - tokio::task::spawn_blocking(move || { - std::fs::set_permissions(secret_path, std::fs::Permissions::from_mode(0o600)) - }) - .await - .context("spawn_blocking")? - .context("Failed to set Nostr key permissions")?; + fs::set_permissions(&secret_path, std::fs::Permissions::from_mode(0o600)) + .await + .context("Failed to set Nostr key permissions")?; } fs::write(&pub_path, keys.public_key().to_hex()) .await diff --git a/core/archipelago/src/port_allocator.rs b/core/archipelago/src/port_allocator.rs index a8243a37..abf87f13 100644 --- a/core/archipelago/src/port_allocator.rs +++ b/core/archipelago/src/port_allocator.rs @@ -52,11 +52,11 @@ pub struct PortAllocator { } impl PortAllocator { - pub fn new(data_dir: impl AsRef) -> Result { + pub async fn new(data_dir: impl AsRef) -> Result { let data_dir = data_dir.as_ref().to_path_buf(); let path = data_dir.join("port_allocations.json"); let allocations = if path.exists() { - let s = std::fs::read_to_string(&path) + let s = tokio::fs::read_to_string(&path).await .context("Failed to read port allocations")?; serde_json::from_str(&s).unwrap_or_default() } else { @@ -68,13 +68,13 @@ impl PortAllocator { }) } - fn save(&self) -> Result<()> { + async fn save(&self) -> Result<()> { let path = self.data_dir.join("port_allocations.json"); - std::fs::create_dir_all(&self.data_dir) + tokio::fs::create_dir_all(&self.data_dir).await .context("Failed to create data dir for port allocations")?; let s = serde_json::to_string_pretty(&self.allocations) .context("Failed to serialize port allocations")?; - std::fs::write(&path, s).context("Failed to write port allocations")?; + tokio::fs::write(&path, s).await.context("Failed to write port allocations")?; Ok(()) } @@ -94,7 +94,7 @@ impl PortAllocator { } /// Allocate a host port for an app. Uses preferred_port if available, else finds next free. - pub fn allocate( + pub async fn allocate( &mut self, app_id: &str, preferred_host_port: u16, @@ -115,7 +115,7 @@ impl PortAllocator { container_port, }, ); - self.save()?; + self.save().await?; Ok(host_port) } @@ -127,7 +127,7 @@ impl PortAllocator { } /// Allocate or return existing. Use when installing/starting an app. - pub fn allocate_or_get( + pub async fn allocate_or_get( &mut self, app_id: &str, preferred_host_port: u16, @@ -136,13 +136,13 @@ impl PortAllocator { if let Some((host, _)) = self.get(app_id) { return Ok(host); } - self.allocate(app_id, preferred_host_port, container_port) + self.allocate(app_id, preferred_host_port, container_port).await } /// Release port when app is uninstalled. - pub fn release(&mut self, app_id: &str) -> Result<()> { + pub async fn release(&mut self, app_id: &str) -> Result<()> { self.allocations.allocations.remove(app_id); - self.save()?; + self.save().await?; Ok(()) } } diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 2d2e8ac7..4d5fa4b1 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -42,7 +42,7 @@ impl Server { data.server_info.name = Some(name); } } - data.server_info.tor_address = docker_packages::read_tor_address("archipelago"); + data.server_info.tor_address = docker_packages::read_tor_address("archipelago").await; if let Some(ref tor) = data.server_info.tor_address { data.server_info.node_address = Some(identity.node_address(tor)); } @@ -403,7 +403,7 @@ async fn create_docker_scanner(config: &Config) -> Result } async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> { - let tor_addr = docker_packages::read_tor_address("archipelago"); + let tor_addr = docker_packages::read_tor_address("archipelago").await; let (current_data, _) = state.get_snapshot().await; if tor_addr != current_data.server_info.tor_address { let mut data = current_data; @@ -426,7 +426,7 @@ async fn scan_and_update_packages( let (current_data, _) = state.get_snapshot().await; let packages_changed = !packages.is_empty() && current_data.package_data != packages; - let tor_addr = docker_packages::read_tor_address("archipelago"); + let tor_addr = docker_packages::read_tor_address("archipelago").await; let tor_changed = tor_addr != current_data.server_info.tor_address; let first_scan = !current_data.server_info.status_info.containers_scanned; diff --git a/core/archipelago/src/session.rs b/core/archipelago/src/session.rs index c86d653c..3edd0111 100644 --- a/core/archipelago/src/session.rs +++ b/core/archipelago/src/session.rs @@ -58,9 +58,9 @@ struct PersistedSession { } impl SessionStore { - pub fn new() -> Self { + pub async fn new() -> Self { let persist_path = PathBuf::from(SESSIONS_FILE); - let sessions = Self::load_from_disk(&persist_path); + let sessions = Self::load_from_disk(&persist_path).await; let count = sessions.len(); if count > 0 { tracing::info!("Restored {} sessions from disk", count); @@ -72,9 +72,9 @@ impl SessionStore { } /// Load persisted sessions from disk (only Full sessions). - fn load_from_disk(path: &Path) -> HashMap<[u8; 32], Session> { + async fn load_from_disk(path: &Path) -> HashMap<[u8; 32], Session> { let mut map = HashMap::new(); - let data = match std::fs::read_to_string(path) { + let data = match tokio::fs::read_to_string(path).await { Ok(d) => d, Err(_) => return map, }; @@ -114,7 +114,7 @@ impl SessionStore { } /// Save all Full sessions to disk. Called after mutations. - fn save_to_disk_sync(sessions: &HashMap<[u8; 32], Session>, path: &Path) { + async fn save_to_disk(sessions: &HashMap<[u8; 32], Session>, path: &Path) { let persisted: Vec = sessions .iter() .filter(|(_, s)| matches!(s.session_type, SessionType::Full)) @@ -125,7 +125,7 @@ impl SessionStore { }) .collect(); if let Ok(json) = serde_json::to_string(&persisted) { - let _ = std::fs::write(path, json); + let _ = tokio::fs::write(path, json).await; } } @@ -166,7 +166,7 @@ impl SessionStore { sessions.insert(hash, session); // Sync save — must complete before returning the token to the client. // Async save risks losing the session if the process is killed (e.g., deploy restart). - Self::save_to_disk_sync(&sessions, &self.persist_path); + Self::save_to_disk(&sessions, &self.persist_path).await; token } @@ -256,7 +256,7 @@ impl SessionStore { session_type: SessionType::Full, }, ); - Self::save_to_disk_sync(&sessions, &self.persist_path); + Self::save_to_disk(&sessions, &self.persist_path).await; Some(new_token) } else { None @@ -267,7 +267,7 @@ impl SessionStore { let hash = hash_token(token); let mut sessions = self.sessions.write().await; sessions.remove(&hash); - Self::save_to_disk_sync(&sessions, &self.persist_path); + Self::save_to_disk(&sessions, &self.persist_path).await; } /// Invalidate all sessions except the one matching the given token. @@ -276,7 +276,7 @@ impl SessionStore { let keep_hash = hash_token(keep_token); let mut sessions = self.sessions.write().await; sessions.retain(|hash, _| *hash == keep_hash); - Self::save_to_disk_sync(&sessions, &self.persist_path); + Self::save_to_disk(&sessions, &self.persist_path).await; } /// Rotate a session: invalidate the old token and create a new one. @@ -298,7 +298,7 @@ impl SessionStore { session_type: SessionType::Full, }, ); - Self::save_to_disk_sync(&sessions, &self.persist_path); + Self::save_to_disk(&sessions, &self.persist_path).await; new_token } @@ -352,8 +352,8 @@ impl SessionStore { // Format: "timestamp_hex:hmac_hex" /// Create a remember-me token. Returns the cookie value. - pub fn create_remember_token(&self) -> String { - let secret = Self::load_or_create_remember_secret(); + pub async fn create_remember_token(&self) -> String { + let secret = Self::load_or_create_remember_secret().await; let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() @@ -366,8 +366,8 @@ impl SessionStore { } /// Validate a remember-me token. Returns true if valid and not expired. - pub fn validate_remember_token(token: &str) -> bool { - let secret = match std::fs::read(REMEMBER_SECRET_FILE) { + pub async fn validate_remember_token(token: &str) -> bool { + let secret = match tokio::fs::read(REMEMBER_SECRET_FILE).await { Ok(s) if s.len() == 32 => s, _ => return false, }; @@ -408,9 +408,9 @@ impl SessionStore { now.saturating_sub(ts_bytes) < REMEMBER_TTL } - pub fn load_or_create_remember_secret() -> Vec { + pub async fn load_or_create_remember_secret() -> Vec { // Try existing secret file first - if let Ok(secret) = std::fs::read(REMEMBER_SECRET_FILE) { + if let Ok(secret) = tokio::fs::read(REMEMBER_SECRET_FILE).await { if secret.len() == 32 { return secret; } @@ -420,9 +420,9 @@ impl SessionStore { rand::rngs::OsRng.fill_bytes(&mut secret); // Ensure parent directory exists if let Some(parent) = std::path::Path::new(REMEMBER_SECRET_FILE).parent() { - let _ = std::fs::create_dir_all(parent); + let _ = tokio::fs::create_dir_all(parent).await; } - let _ = std::fs::write(REMEMBER_SECRET_FILE, &secret); + let _ = tokio::fs::write(REMEMBER_SECRET_FILE, &secret).await; secret.to_vec() } } @@ -476,6 +476,16 @@ impl LoginRateLimiter { let entry = attempts.entry(ip).or_default(); entry.push(Instant::now()); } + + /// Periodic cleanup of expired entries for IPs that are no longer active. + pub async fn cleanup(&self) { + let mut attempts = self.attempts.write().await; + let now = Instant::now(); + attempts.retain(|_, timestamps| { + timestamps.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS); + !timestamps.is_empty() + }); + } } /// General-purpose rate limiter for sensitive endpoints.