refactor: replace blocking std::fs and TCP I/O with async tokio equivalents

- R6: Convert 6 std::fs calls in session.rs to tokio::fs async
- R7: Convert std::fs::read_to_string in docker_packages.rs to async
- R8: Convert 3 std::fs calls in port_allocator.rs to async, switch to tokio::sync::Mutex
- R9+R10+R11: Fix blocking I/O in node_message.rs and nostr_discovery.rs
- R12: Convert electrs_status.rs from sync TCP to async tokio::net with 5s timeouts
- R4+R5: Spawn periodic cleanup tasks for endpoint and login rate limiters

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-03-21 01:21:08 +00:00
parent 38dc845f57
commit 4d17c60da7
12 changed files with 161 additions and 117 deletions

View File

@ -31,7 +31,7 @@ impl ApiHandler {
state_manager: Arc<StateManager>, state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>, metrics_store: Arc<MetricsStore>,
) -> Result<Self> { ) -> Result<Self> {
let session_store = SessionStore::new(); let session_store = SessionStore::new().await;
let rpc_handler = Arc::new( let rpc_handler = Arc::new(
RpcHandler::new( RpcHandler::new(
config.clone(), config.clone(),

View File

@ -42,7 +42,7 @@ use anyhow::{Context, Result};
use hyper::{Request, Response, StatusCode}; use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::IpAddr; use std::net::IpAddr;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use tracing::{debug, error}; use tracing::{debug, error};
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -164,7 +164,7 @@ pub struct RpcHandler {
orchestrator: Option<Arc<DevContainerOrchestrator>>, orchestrator: Option<Arc<DevContainerOrchestrator>>,
state_manager: Arc<StateManager>, state_manager: Arc<StateManager>,
pub(crate) metrics_store: Arc<MetricsStore>, pub(crate) metrics_store: Arc<MetricsStore>,
port_allocator: Arc<Mutex<PortAllocator>>, port_allocator: Arc<tokio::sync::Mutex<PortAllocator>>,
pub session_store: SessionStore, pub session_store: SessionStore,
login_rate_limiter: LoginRateLimiter, login_rate_limiter: LoginRateLimiter,
endpoint_rate_limiter: EndpointRateLimiter, endpoint_rate_limiter: EndpointRateLimiter,
@ -188,7 +188,32 @@ impl RpcHandler {
} else { } else {
None None
}; };
let port_allocator = Arc::new(Mutex::new(PortAllocator::new(&config.data_dir)?)); let port_allocator = Arc::new(tokio::sync::Mutex::new(PortAllocator::new(&config.data_dir).await?));
let login_rate_limiter = LoginRateLimiter::new();
let endpoint_rate_limiter = EndpointRateLimiter::new();
// Spawn periodic rate limiter cleanup (every 5 minutes)
{
let limiter = endpoint_rate_limiter.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
interval.tick().await;
limiter.cleanup().await;
}
});
}
{
let limiter = login_rate_limiter.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
interval.tick().await;
limiter.cleanup().await;
}
});
}
Ok(Self { Ok(Self {
config, config,
@ -198,8 +223,8 @@ impl RpcHandler {
metrics_store, metrics_store,
port_allocator, port_allocator,
session_store, session_store,
login_rate_limiter: LoginRateLimiter::new(), login_rate_limiter,
endpoint_rate_limiter: EndpointRateLimiter::new(), endpoint_rate_limiter,
response_cache: ResponseCache::new(5), response_cache: ResponseCache::new(5),
mesh_service: Arc::new(tokio::sync::RwLock::new(None)), mesh_service: Arc::new(tokio::sync::RwLock::new(None)),
transport_router: Arc::new(tokio::sync::RwLock::new(None)), transport_router: Arc::new(tokio::sync::RwLock::new(None)),
@ -253,10 +278,10 @@ impl RpcHandler {
// If session invalid, try remember-me token to auto-restore session // If session invalid, try remember-me token to auto-restore session
if !authenticated { if !authenticated {
if let Some(remember) = extract_cookie(&parts.headers, "remember") { if let Some(remember) = extract_cookie(&parts.headers, "remember") {
if crate::session::SessionStore::validate_remember_token(&remember) { if crate::session::SessionStore::validate_remember_token(&remember).await {
// Auto-create a new session from the remember-me token // Auto-create a new session from the remember-me token
let new_token = self.session_store.create().await; let new_token = self.session_store.create().await;
let new_csrf = derive_csrf_token(&new_token); let new_csrf = derive_csrf_token(&new_token).await;
tracing::info!("Auto-restored session from remember-me token"); tracing::info!("Auto-restored session from remember-me token");
new_session_cookies = Some((new_token, new_csrf)); new_session_cookies = Some((new_token, new_csrf));
authenticated = true; authenticated = true;
@ -325,7 +350,7 @@ impl RpcHandler {
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use sha2::Sha256; use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>; type HmacSha256 = Hmac<Sha256>;
let secret = SessionStore::load_or_create_remember_secret(); let secret = SessionStore::load_or_create_remember_secret().await;
let mut mac = match HmacSha256::new_from_slice(&secret) { let mut mac = match HmacSha256::new_from_slice(&secret) {
Ok(m) => m, Ok(m) => m,
Err(_) => { return Ok(Response::builder().status(500).body(hyper::Body::empty()).unwrap()); } Err(_) => { return Ok(Response::builder().status(500).body(hyper::Body::empty()).unwrap()); }
@ -858,7 +883,7 @@ impl RpcHandler {
if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await { if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await {
if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) { if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) {
let token = self.session_store.create_pending(secret).await; let token = self.session_store.create_pending(secret).await;
let csrf_token = derive_csrf_token(&token); let csrf_token = derive_csrf_token(&token).await;
response.headers_mut().append( response.headers_mut().append(
"Set-Cookie", "Set-Cookie",
format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix()) format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix())
@ -884,8 +909,8 @@ impl RpcHandler {
} else { } else {
// No 2FA: create a full session immediately // No 2FA: create a full session immediately
let token = self.session_store.create().await; let token = self.session_store.create().await;
let csrf_token = derive_csrf_token(&token); let csrf_token = derive_csrf_token(&token).await;
let remember_token = self.session_store.create_remember_token(); let remember_token = self.session_store.create_remember_token().await;
response.headers_mut().append( response.headers_mut().append(
"Set-Cookie", "Set-Cookie",
format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix()) format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix())
@ -921,8 +946,8 @@ impl RpcHandler {
.map(|s| s.to_string()); .map(|s| s.to_string());
if let Some(new_token) = new_token_opt { if let Some(new_token) = new_token_opt {
let csrf_token = derive_csrf_token(&new_token); let csrf_token = derive_csrf_token(&new_token).await;
let remember_token = self.session_store.create_remember_token(); let remember_token = self.session_store.create_remember_token().await;
response.headers_mut().append( response.headers_mut().append(
"Set-Cookie", "Set-Cookie",
format!( format!(
@ -969,7 +994,7 @@ impl RpcHandler {
if rpc_req.method == "auth.changePassword" && rpc_resp.error.is_none() { if rpc_req.method == "auth.changePassword" && rpc_resp.error.is_none() {
if let Some(token) = &session_token { if let Some(token) = &session_token {
let new_token = self.session_store.rotate(token).await; let new_token = self.session_store.rotate(token).await;
let csrf_token = derive_csrf_token(&new_token); let csrf_token = derive_csrf_token(&new_token).await;
response.headers_mut().append( response.headers_mut().append(
"Set-Cookie", "Set-Cookie",
format!( format!(
@ -1059,11 +1084,11 @@ impl RpcHandler {
/// Deterministic: same session token always produces the same CSRF token. /// Deterministic: same session token always produces the same CSRF token.
/// Survives backend restarts because it depends only on the session token /// Survives backend restarts because it depends only on the session token
/// and the on-disk remember secret (not ephemeral state). /// and the on-disk remember secret (not ephemeral state).
fn derive_csrf_token(session_token: &str) -> String { async fn derive_csrf_token(session_token: &str) -> String {
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use sha2::Sha256; use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>; type HmacSha256 = Hmac<Sha256>;
let secret = SessionStore::load_or_create_remember_secret(); let secret = SessionStore::load_or_create_remember_secret().await;
let mut mac = HmacSha256::new_from_slice(&secret).expect("HMAC key"); let mut mac = HmacSha256::new_from_slice(&secret).expect("HMAC key");
mac.update(format!("csrf:{}", session_token).as_bytes()); mac.update(format!("csrf:{}", session_token).as_bytes());
hex::encode(mac.finalize().into_bytes()) hex::encode(mac.finalize().into_bytes())

View File

@ -51,7 +51,7 @@ impl RpcHandler {
/// Get the current node visibility setting. /// Get the current node visibility setting.
pub(super) async fn handle_network_get_visibility(&self) -> Result<serde_json::Value> { pub(super) async fn handle_network_get_visibility(&self) -> Result<serde_json::Value> {
let vis = self.load_visibility().await; let vis = self.load_visibility().await;
let tor_address = docker_packages::read_tor_address("archipelago"); let tor_address = docker_packages::read_tor_address("archipelago").await;
Ok(serde_json::json!({ Ok(serde_json::json!({
"visibility": vis.as_str(), "visibility": vis.as_str(),
"tor_address": tor_address, "tor_address": tor_address,
@ -106,7 +106,7 @@ impl RpcHandler {
let (data, _) = self.state_manager.get_snapshot().await; let (data, _) = self.state_manager.get_snapshot().await;
let my_pubkey = &data.server_info.pubkey; let my_pubkey = &data.server_info.pubkey;
let my_did = identity::did_key_from_pubkey_hex(my_pubkey)?; let my_did = identity::did_key_from_pubkey_hex(my_pubkey)?;
let my_onion = docker_packages::read_tor_address("archipelago") let my_onion = docker_packages::read_tor_address("archipelago").await
.unwrap_or_default(); .unwrap_or_default();
let req_msg = serde_json::json!({ let req_msg = serde_json::json!({

View File

@ -71,7 +71,7 @@ impl RpcHandler {
} }
pub(super) async fn handle_node_tor_address(&self) -> Result<serde_json::Value> { pub(super) async fn handle_node_tor_address(&self) -> Result<serde_json::Value> {
let tor_address = docker_packages::read_tor_address("archipelago"); let tor_address = docker_packages::read_tor_address("archipelago").await;
Ok(serde_json::json!({ "tor_address": tor_address })) Ok(serde_json::json!({ "tor_address": tor_address }))
} }

View File

@ -198,10 +198,8 @@ impl RpcHandler {
// App-specific configuration (should come from manifest) // App-specific configuration (should come from manifest)
let (mut ports, mut volumes, env_vars, custom_command, mut custom_args) = { let (mut ports, mut volumes, env_vars, custom_command, mut custom_args) = {
let mut allocator = self.port_allocator.lock().map_err(|e| { let mut allocator = self.port_allocator.lock().await;
anyhow::anyhow!("Port allocator lock poisoned: {}", e) get_app_config(package_id, &self.config.host_ip, &mut allocator, &rpc_user, &rpc_pass).await
})?;
get_app_config(package_id, &self.config.host_ip, &mut allocator, &rpc_user, &rpc_pass)
}; };
// Fedimint Gateway: auto-detect LND and switch to lnd mode // Fedimint Gateway: auto-detect LND and switch to lnd mode
@ -846,8 +844,9 @@ printtoconsole=1\n", rpcauth_line);
} }
// Release port allocation // Release port allocation
if let Ok(mut allocator) = self.port_allocator.lock() { {
let _ = allocator.release(package_id); let mut allocator = self.port_allocator.lock().await;
let _ = allocator.release(package_id).await;
} }
// Clean data directories unless preserve_data // Clean data directories unless preserve_data
@ -1487,7 +1486,7 @@ fn get_memory_limit(app_id: &str) -> &'static str {
/// Get app-specific configuration /// Get app-specific configuration
/// Returns: (ports, volumes, env_vars, custom_command, custom_args) /// Returns: (ports, volumes, env_vars, custom_command, custom_args)
fn get_app_config( async fn get_app_config(
app_id: &str, app_id: &str,
host_ip: &str, host_ip: &str,
allocator: &mut PortAllocator, allocator: &mut PortAllocator,
@ -1625,7 +1624,7 @@ fn get_app_config(
), ),
"nextcloud" => { "nextcloud" => {
let host_port = allocator let host_port = allocator
.allocate_or_get(app_id, 8085, 80) .allocate_or_get(app_id, 8085, 80).await
.unwrap_or(8085); .unwrap_or(8085);
( (
vec![format!("{}:80", host_port)], vec![format!("{}:80", host_port)],
@ -1637,7 +1636,7 @@ fn get_app_config(
} }
"vaultwarden" => { "vaultwarden" => {
let host_port = allocator let host_port = allocator
.allocate_or_get(app_id, 8082, 80) .allocate_or_get(app_id, 8082, 80).await
.unwrap_or(8082); .unwrap_or(8082);
( (
vec![format!("{}:80", host_port)], vec![format!("{}:80", host_port)],
@ -1677,7 +1676,7 @@ fn get_app_config(
), ),
"filebrowser" => { "filebrowser" => {
let host_port = allocator let host_port = allocator
.allocate_or_get(app_id, 8083, 80) .allocate_or_get(app_id, 8083, 80).await
.unwrap_or(8083); .unwrap_or(8083);
( (
vec![format!("{}:80", host_port)], vec![format!("{}:80", host_port)],

View File

@ -141,7 +141,7 @@ impl DockerPackageScanner {
// Convert container state to package/service state // Convert container state to package/service state
let (package_state, service_status) = convert_state(&container.state); let (package_state, service_status) = convert_state(&container.state);
let tor_address = read_tor_address(&app_id); let tor_address = read_tor_address(&app_id).await;
let package = PackageDataEntry { let package = PackageDataEntry {
state: package_state.clone(), state: package_state.clone(),
@ -548,7 +548,7 @@ fn is_real_onion_address(s: &str) -> bool {
/// Read real .onion address from Tor hidden service hostname file. /// Read real .onion address from Tor hidden service hostname file.
/// Service name "archipelago" is for the main web UI (nginx port 80). /// Service name "archipelago" is for the main web UI (nginx port 80).
/// Uses TOR_DATA_DIR env var if set, else /var/lib/archipelago/tor. /// Uses TOR_DATA_DIR env var if set, else /var/lib/archipelago/tor.
pub fn read_tor_address(app_id: &str) -> Option<String> { pub async fn read_tor_address(app_id: &str) -> Option<String> {
let service = tor_service_name(app_id)?; let service = tor_service_name(app_id)?;
let base = std::env::var("TOR_DATA_DIR").unwrap_or_else(|_| "/var/lib/archipelago/tor".to_string()); let base = std::env::var("TOR_DATA_DIR").unwrap_or_else(|_| "/var/lib/archipelago/tor".to_string());
@ -558,7 +558,8 @@ pub fn read_tor_address(app_id: &str) -> Option<String> {
.unwrap_or(std::path::Path::new("/var/lib/archipelago")) .unwrap_or(std::path::Path::new("/var/lib/archipelago"))
.join("tor-hostnames") .join("tor-hostnames")
.join(service); .join(service);
if let Some(addr) = std::fs::read_to_string(&hostnames_path) if let Some(addr) = tokio::fs::read_to_string(&hostnames_path)
.await
.ok() .ok()
.map(|s| s.trim().to_string()) .map(|s| s.trim().to_string())
.filter(|s| s.ends_with(".onion") && !s.is_empty()) .filter(|s| s.ends_with(".onion") && !s.is_empty())
@ -570,7 +571,8 @@ pub fn read_tor_address(app_id: &str) -> Option<String> {
let path = std::path::Path::new(&base) let path = std::path::Path::new(&base)
.join(format!("hidden_service_{}", service)) .join(format!("hidden_service_{}", service))
.join("hostname"); .join("hostname");
std::fs::read_to_string(&path) tokio::fs::read_to_string(&path)
.await
.ok() .ok()
.map(|s| s.trim().to_string()) .map(|s| s.trim().to_string())
.filter(|s| s.ends_with(".onion") && !s.is_empty()) .filter(|s| s.ends_with(".onion") && !s.is_empty())

View File

@ -2,9 +2,9 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use serde::Serialize; use serde::Serialize;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
const ELECTRUMX_HOST: &str = "127.0.0.1"; const ELECTRUMX_HOST: &str = "127.0.0.1";
const ELECTRUMX_PORT: u16 = 50001; const ELECTRUMX_PORT: u16 = 50001;
@ -35,18 +35,20 @@ pub struct ElectrsSyncStatus {
} }
/// Get the total size of a directory in bytes. /// Get the total size of a directory in bytes.
fn dir_size_bytes(path: &str) -> u64 { async fn dir_size_bytes(path: &str) -> u64 {
let mut total: u64 = 0; let mut total: u64 = 0;
if let Ok(entries) = std::fs::read_dir(path) { let mut entries = match tokio::fs::read_dir(path).await {
for entry in entries.flatten() { Ok(entries) => entries,
let path = entry.path(); Err(_) => return 0,
if path.is_dir() { };
total += dir_size_bytes(&path.to_string_lossy()); while let Ok(Some(entry)) = entries.next_entry().await {
} else if let Ok(meta) = entry.metadata() { let entry_path = entry.path();
if entry_path.is_dir() {
total += Box::pin(dir_size_bytes(&entry_path.to_string_lossy())).await;
} else if let Ok(meta) = entry.metadata().await {
total += meta.len(); total += meta.len();
} }
} }
}
total total
} }
@ -62,25 +64,39 @@ fn format_bytes(bytes: u64) -> String {
} }
/// Fetch ElectrumX indexed height via Electrum protocol (TCP JSON-RPC). /// Fetch ElectrumX indexed height via Electrum protocol (TCP JSON-RPC).
fn electrumx_indexed_height() -> Result<u64> { async fn electrumx_indexed_height() -> Result<u64> {
let mut stream = TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT)) let timeout_duration = Duration::from_secs(5);
let stream = tokio::time::timeout(
timeout_duration,
TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT)),
)
.await
.context("ElectrumX connection timed out")?
.context("Failed to connect to ElectrumX")?; .context("Failed to connect to ElectrumX")?;
stream
.set_read_timeout(Some(Duration::from_secs(5))) let (reader_half, mut writer_half) = tokio::io::split(stream);
.context("set_read_timeout")?;
stream
.set_write_timeout(Some(Duration::from_secs(5)))
.context("set_write_timeout")?;
// blockchain.headers.subscribe returns {"height": N, "hex": "..."} // blockchain.headers.subscribe returns {"height": N, "hex": "..."}
let req = r#"{"id":1,"method":"blockchain.headers.subscribe","params":[]} let req = r#"{"id":1,"method":"blockchain.headers.subscribe","params":[]}
"#; "#;
stream.write_all(req.as_bytes())?; tokio::time::timeout(timeout_duration, writer_half.write_all(req.as_bytes()))
stream.flush()?; .await
.context("ElectrumX write timed out")?
.context("Failed to write to ElectrumX")?;
let mut reader = BufReader::new(stream); tokio::time::timeout(timeout_duration, writer_half.flush())
.await
.context("ElectrumX flush timed out")?
.context("Failed to flush ElectrumX stream")?;
let mut reader = BufReader::new(reader_half);
let mut line = String::new(); let mut line = String::new();
reader.read_line(&mut line)?; tokio::time::timeout(timeout_duration, reader.read_line(&mut line))
.await
.context("ElectrumX read timed out")?
.context("Failed to read from ElectrumX")?;
let line = line.trim(); let line = line.trim();
if line.is_empty() { if line.is_empty() {
anyhow::bail!("Empty response from ElectrumX"); anyhow::bail!("Empty response from ElectrumX");
@ -136,10 +152,10 @@ async fn bitcoin_network_height() -> Result<u64> {
Ok(height) Ok(height)
} }
/// Get ElectrumX sync status. Runs blocking ElectrumX call in spawn_blocking. /// Get ElectrumX sync status.
pub async fn get_electrs_sync_status() -> ElectrsSyncStatus { pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
// Get index data size (non-blocking, fast filesystem stat) // Get index data size
let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR); let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR).await;
let index_size = if data_bytes > 0 { let index_size = if data_bytes > 0 {
Some(format_bytes(data_bytes)) Some(format_bytes(data_bytes))
} else { } else {
@ -193,13 +209,13 @@ pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
} }
}; };
let indexed_height = match tokio::task::spawn_blocking(electrumx_indexed_height).await { let indexed_height = match electrumx_indexed_height().await {
Ok(Ok(h)) => h, Ok(h) => h,
Ok(Err(e)) => { Err(e) => {
// ElectrumX may not be ready on 50001 during initial sync // ElectrumX may not be ready on 50001 during initial sync
let err_msg = e.to_string(); let err_msg = e.to_string();
let err_lower = err_msg.to_lowercase(); let err_lower = err_msg.to_lowercase();
let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") { let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") || err_lower.contains("timed out") {
// Estimate progress from data directory size // Estimate progress from data directory size
let _est_pct = if data_bytes > 0 { let _est_pct = if data_bytes > 0 {
((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0) ((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0)
@ -233,17 +249,6 @@ pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
tor_onion: tor_onion.clone(), tor_onion: tor_onion.clone(),
}; };
} }
Err(e) => {
return ElectrsSyncStatus {
indexed_height: 0,
network_height,
progress_pct: 0.0,
status: "error".to_string(),
error: Some(format!("Task: {}", e)),
index_size,
tor_onion: tor_onion.clone(),
};
}
}; };
let progress_pct = if network_height > 0 { let progress_pct = if network_height > 0 {

View File

@ -56,13 +56,19 @@ pub async fn init(data_dir: &Path) {
} }
/// Persist current messages to disk. /// Persist current messages to disk.
/// Serializes under the lock, then writes asynchronously via spawn_blocking
/// to avoid blocking the tokio runtime.
fn persist() { fn persist() {
let guard = store().lock().unwrap_or_else(|e| e.into_inner()); let guard = store().lock().unwrap_or_else(|e| e.into_inner());
let path_guard = data_path().lock().unwrap_or_else(|e| e.into_inner()); let path_guard = data_path().lock().unwrap_or_else(|e| e.into_inner());
if let Some(ref path) = *path_guard { if let Some(ref path) = *path_guard {
if let Ok(content) = serde_json::to_string(&*guard) { if let Ok(content) = serde_json::to_string(&*guard) {
// Use sync write — called from store functions already holding the lock let path = path.clone();
let _ = std::fs::write(path, content); drop(path_guard);
drop(guard);
tokio::task::spawn(async move {
let _ = tokio::fs::write(&path, content).await;
});
} }
} }
} }

View File

@ -51,11 +51,8 @@ async fn load_or_create_nostr_keys(identity_dir: &Path) -> Result<Keys> {
#[cfg(unix)] #[cfg(unix)]
{ {
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
tokio::task::spawn_blocking(move || { fs::set_permissions(&secret_path, std::fs::Permissions::from_mode(0o600))
std::fs::set_permissions(secret_path, std::fs::Permissions::from_mode(0o600))
})
.await .await
.context("spawn_blocking")?
.context("Failed to set Nostr key permissions")?; .context("Failed to set Nostr key permissions")?;
} }
fs::write(&pub_path, keys.public_key().to_hex()) fs::write(&pub_path, keys.public_key().to_hex())

View File

@ -52,11 +52,11 @@ pub struct PortAllocator {
} }
impl PortAllocator { impl PortAllocator {
pub fn new(data_dir: impl AsRef<Path>) -> Result<Self> { pub async fn new(data_dir: impl AsRef<Path>) -> Result<Self> {
let data_dir = data_dir.as_ref().to_path_buf(); let data_dir = data_dir.as_ref().to_path_buf();
let path = data_dir.join("port_allocations.json"); let path = data_dir.join("port_allocations.json");
let allocations = if path.exists() { let allocations = if path.exists() {
let s = std::fs::read_to_string(&path) let s = tokio::fs::read_to_string(&path).await
.context("Failed to read port allocations")?; .context("Failed to read port allocations")?;
serde_json::from_str(&s).unwrap_or_default() serde_json::from_str(&s).unwrap_or_default()
} else { } else {
@ -68,13 +68,13 @@ impl PortAllocator {
}) })
} }
fn save(&self) -> Result<()> { async fn save(&self) -> Result<()> {
let path = self.data_dir.join("port_allocations.json"); let path = self.data_dir.join("port_allocations.json");
std::fs::create_dir_all(&self.data_dir) tokio::fs::create_dir_all(&self.data_dir).await
.context("Failed to create data dir for port allocations")?; .context("Failed to create data dir for port allocations")?;
let s = serde_json::to_string_pretty(&self.allocations) let s = serde_json::to_string_pretty(&self.allocations)
.context("Failed to serialize port allocations")?; .context("Failed to serialize port allocations")?;
std::fs::write(&path, s).context("Failed to write port allocations")?; tokio::fs::write(&path, s).await.context("Failed to write port allocations")?;
Ok(()) Ok(())
} }
@ -94,7 +94,7 @@ impl PortAllocator {
} }
/// Allocate a host port for an app. Uses preferred_port if available, else finds next free. /// Allocate a host port for an app. Uses preferred_port if available, else finds next free.
pub fn allocate( pub async fn allocate(
&mut self, &mut self,
app_id: &str, app_id: &str,
preferred_host_port: u16, preferred_host_port: u16,
@ -115,7 +115,7 @@ impl PortAllocator {
container_port, container_port,
}, },
); );
self.save()?; self.save().await?;
Ok(host_port) Ok(host_port)
} }
@ -127,7 +127,7 @@ impl PortAllocator {
} }
/// Allocate or return existing. Use when installing/starting an app. /// Allocate or return existing. Use when installing/starting an app.
pub fn allocate_or_get( pub async fn allocate_or_get(
&mut self, &mut self,
app_id: &str, app_id: &str,
preferred_host_port: u16, preferred_host_port: u16,
@ -136,13 +136,13 @@ impl PortAllocator {
if let Some((host, _)) = self.get(app_id) { if let Some((host, _)) = self.get(app_id) {
return Ok(host); return Ok(host);
} }
self.allocate(app_id, preferred_host_port, container_port) self.allocate(app_id, preferred_host_port, container_port).await
} }
/// Release port when app is uninstalled. /// Release port when app is uninstalled.
pub fn release(&mut self, app_id: &str) -> Result<()> { pub async fn release(&mut self, app_id: &str) -> Result<()> {
self.allocations.allocations.remove(app_id); self.allocations.allocations.remove(app_id);
self.save()?; self.save().await?;
Ok(()) Ok(())
} }
} }

View File

@ -42,7 +42,7 @@ impl Server {
data.server_info.name = Some(name); data.server_info.name = Some(name);
} }
} }
data.server_info.tor_address = docker_packages::read_tor_address("archipelago"); data.server_info.tor_address = docker_packages::read_tor_address("archipelago").await;
if let Some(ref tor) = data.server_info.tor_address { if let Some(ref tor) = data.server_info.tor_address {
data.server_info.node_address = Some(identity.node_address(tor)); data.server_info.node_address = Some(identity.node_address(tor));
} }
@ -403,7 +403,7 @@ async fn create_docker_scanner(config: &Config) -> Result<DockerPackageScanner>
} }
async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> { async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> {
let tor_addr = docker_packages::read_tor_address("archipelago"); let tor_addr = docker_packages::read_tor_address("archipelago").await;
let (current_data, _) = state.get_snapshot().await; let (current_data, _) = state.get_snapshot().await;
if tor_addr != current_data.server_info.tor_address { if tor_addr != current_data.server_info.tor_address {
let mut data = current_data; let mut data = current_data;
@ -426,7 +426,7 @@ async fn scan_and_update_packages(
let (current_data, _) = state.get_snapshot().await; let (current_data, _) = state.get_snapshot().await;
let packages_changed = !packages.is_empty() && current_data.package_data != packages; let packages_changed = !packages.is_empty() && current_data.package_data != packages;
let tor_addr = docker_packages::read_tor_address("archipelago"); let tor_addr = docker_packages::read_tor_address("archipelago").await;
let tor_changed = tor_addr != current_data.server_info.tor_address; let tor_changed = tor_addr != current_data.server_info.tor_address;
let first_scan = !current_data.server_info.status_info.containers_scanned; let first_scan = !current_data.server_info.status_info.containers_scanned;

View File

@ -58,9 +58,9 @@ struct PersistedSession {
} }
impl SessionStore { impl SessionStore {
pub fn new() -> Self { pub async fn new() -> Self {
let persist_path = PathBuf::from(SESSIONS_FILE); let persist_path = PathBuf::from(SESSIONS_FILE);
let sessions = Self::load_from_disk(&persist_path); let sessions = Self::load_from_disk(&persist_path).await;
let count = sessions.len(); let count = sessions.len();
if count > 0 { if count > 0 {
tracing::info!("Restored {} sessions from disk", count); tracing::info!("Restored {} sessions from disk", count);
@ -72,9 +72,9 @@ impl SessionStore {
} }
/// Load persisted sessions from disk (only Full sessions). /// Load persisted sessions from disk (only Full sessions).
fn load_from_disk(path: &Path) -> HashMap<[u8; 32], Session> { async fn load_from_disk(path: &Path) -> HashMap<[u8; 32], Session> {
let mut map = HashMap::new(); let mut map = HashMap::new();
let data = match std::fs::read_to_string(path) { let data = match tokio::fs::read_to_string(path).await {
Ok(d) => d, Ok(d) => d,
Err(_) => return map, Err(_) => return map,
}; };
@ -114,7 +114,7 @@ impl SessionStore {
} }
/// Save all Full sessions to disk. Called after mutations. /// Save all Full sessions to disk. Called after mutations.
fn save_to_disk_sync(sessions: &HashMap<[u8; 32], Session>, path: &Path) { async fn save_to_disk(sessions: &HashMap<[u8; 32], Session>, path: &Path) {
let persisted: Vec<PersistedSession> = sessions let persisted: Vec<PersistedSession> = sessions
.iter() .iter()
.filter(|(_, s)| matches!(s.session_type, SessionType::Full)) .filter(|(_, s)| matches!(s.session_type, SessionType::Full))
@ -125,7 +125,7 @@ impl SessionStore {
}) })
.collect(); .collect();
if let Ok(json) = serde_json::to_string(&persisted) { if let Ok(json) = serde_json::to_string(&persisted) {
let _ = std::fs::write(path, json); let _ = tokio::fs::write(path, json).await;
} }
} }
@ -166,7 +166,7 @@ impl SessionStore {
sessions.insert(hash, session); sessions.insert(hash, session);
// Sync save — must complete before returning the token to the client. // Sync save — must complete before returning the token to the client.
// Async save risks losing the session if the process is killed (e.g., deploy restart). // Async save risks losing the session if the process is killed (e.g., deploy restart).
Self::save_to_disk_sync(&sessions, &self.persist_path); Self::save_to_disk(&sessions, &self.persist_path).await;
token token
} }
@ -256,7 +256,7 @@ impl SessionStore {
session_type: SessionType::Full, session_type: SessionType::Full,
}, },
); );
Self::save_to_disk_sync(&sessions, &self.persist_path); Self::save_to_disk(&sessions, &self.persist_path).await;
Some(new_token) Some(new_token)
} else { } else {
None None
@ -267,7 +267,7 @@ impl SessionStore {
let hash = hash_token(token); let hash = hash_token(token);
let mut sessions = self.sessions.write().await; let mut sessions = self.sessions.write().await;
sessions.remove(&hash); sessions.remove(&hash);
Self::save_to_disk_sync(&sessions, &self.persist_path); Self::save_to_disk(&sessions, &self.persist_path).await;
} }
/// Invalidate all sessions except the one matching the given token. /// Invalidate all sessions except the one matching the given token.
@ -276,7 +276,7 @@ impl SessionStore {
let keep_hash = hash_token(keep_token); let keep_hash = hash_token(keep_token);
let mut sessions = self.sessions.write().await; let mut sessions = self.sessions.write().await;
sessions.retain(|hash, _| *hash == keep_hash); sessions.retain(|hash, _| *hash == keep_hash);
Self::save_to_disk_sync(&sessions, &self.persist_path); Self::save_to_disk(&sessions, &self.persist_path).await;
} }
/// Rotate a session: invalidate the old token and create a new one. /// Rotate a session: invalidate the old token and create a new one.
@ -298,7 +298,7 @@ impl SessionStore {
session_type: SessionType::Full, session_type: SessionType::Full,
}, },
); );
Self::save_to_disk_sync(&sessions, &self.persist_path); Self::save_to_disk(&sessions, &self.persist_path).await;
new_token new_token
} }
@ -352,8 +352,8 @@ impl SessionStore {
// Format: "timestamp_hex:hmac_hex" // Format: "timestamp_hex:hmac_hex"
/// Create a remember-me token. Returns the cookie value. /// Create a remember-me token. Returns the cookie value.
pub fn create_remember_token(&self) -> String { pub async fn create_remember_token(&self) -> String {
let secret = Self::load_or_create_remember_secret(); let secret = Self::load_or_create_remember_secret().await;
let now = SystemTime::now() let now = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@ -366,8 +366,8 @@ impl SessionStore {
} }
/// Validate a remember-me token. Returns true if valid and not expired. /// Validate a remember-me token. Returns true if valid and not expired.
pub fn validate_remember_token(token: &str) -> bool { pub async fn validate_remember_token(token: &str) -> bool {
let secret = match std::fs::read(REMEMBER_SECRET_FILE) { let secret = match tokio::fs::read(REMEMBER_SECRET_FILE).await {
Ok(s) if s.len() == 32 => s, Ok(s) if s.len() == 32 => s,
_ => return false, _ => return false,
}; };
@ -408,9 +408,9 @@ impl SessionStore {
now.saturating_sub(ts_bytes) < REMEMBER_TTL now.saturating_sub(ts_bytes) < REMEMBER_TTL
} }
pub fn load_or_create_remember_secret() -> Vec<u8> { pub async fn load_or_create_remember_secret() -> Vec<u8> {
// Try existing secret file first // Try existing secret file first
if let Ok(secret) = std::fs::read(REMEMBER_SECRET_FILE) { if let Ok(secret) = tokio::fs::read(REMEMBER_SECRET_FILE).await {
if secret.len() == 32 { if secret.len() == 32 {
return secret; return secret;
} }
@ -420,9 +420,9 @@ impl SessionStore {
rand::rngs::OsRng.fill_bytes(&mut secret); rand::rngs::OsRng.fill_bytes(&mut secret);
// Ensure parent directory exists // Ensure parent directory exists
if let Some(parent) = std::path::Path::new(REMEMBER_SECRET_FILE).parent() { if let Some(parent) = std::path::Path::new(REMEMBER_SECRET_FILE).parent() {
let _ = std::fs::create_dir_all(parent); let _ = tokio::fs::create_dir_all(parent).await;
} }
let _ = std::fs::write(REMEMBER_SECRET_FILE, &secret); let _ = tokio::fs::write(REMEMBER_SECRET_FILE, &secret).await;
secret.to_vec() secret.to_vec()
} }
} }
@ -476,6 +476,16 @@ impl LoginRateLimiter {
let entry = attempts.entry(ip).or_default(); let entry = attempts.entry(ip).or_default();
entry.push(Instant::now()); entry.push(Instant::now());
} }
/// Periodic cleanup of expired entries for IPs that are no longer active.
pub async fn cleanup(&self) {
let mut attempts = self.attempts.write().await;
let now = Instant::now();
attempts.retain(|_, timestamps| {
timestamps.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
!timestamps.is_empty()
});
}
} }
/// General-purpose rate limiter for sensitive endpoints. /// General-purpose rate limiter for sensitive endpoints.