refactor: replace blocking std::fs and TCP I/O with async tokio equivalents

- R6: Convert 6 std::fs calls in session.rs to tokio::fs async
- R7: Convert std::fs::read_to_string in docker_packages.rs to async
- R8: Convert 3 std::fs calls in port_allocator.rs to async, switch to tokio::sync::Mutex
- R9+R10+R11: Fix blocking I/O in node_message.rs and nostr_discovery.rs
- R12: Convert electrs_status.rs from sync TCP to async tokio::net with 5s timeouts
- R4+R5: Spawn periodic cleanup tasks for endpoint and login rate limiters

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian 2026-03-21 01:21:08 +00:00
parent 1d98de24d0
commit 2443ae6bba
12 changed files with 161 additions and 117 deletions

View File

@ -31,7 +31,7 @@ impl ApiHandler {
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
) -> Result<Self> {
let session_store = SessionStore::new();
let session_store = SessionStore::new().await;
let rpc_handler = Arc::new(
RpcHandler::new(
config.clone(),

View File

@ -42,7 +42,7 @@ use anyhow::{Context, Result};
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tracing::{debug, error};
#[derive(Debug, Deserialize)]
@ -164,7 +164,7 @@ pub struct RpcHandler {
orchestrator: Option<Arc<DevContainerOrchestrator>>,
state_manager: Arc<StateManager>,
pub(crate) metrics_store: Arc<MetricsStore>,
port_allocator: Arc<Mutex<PortAllocator>>,
port_allocator: Arc<tokio::sync::Mutex<PortAllocator>>,
pub session_store: SessionStore,
login_rate_limiter: LoginRateLimiter,
endpoint_rate_limiter: EndpointRateLimiter,
@ -188,7 +188,32 @@ impl RpcHandler {
} else {
None
};
let port_allocator = Arc::new(Mutex::new(PortAllocator::new(&config.data_dir)?));
let port_allocator = Arc::new(tokio::sync::Mutex::new(PortAllocator::new(&config.data_dir).await?));
let login_rate_limiter = LoginRateLimiter::new();
let endpoint_rate_limiter = EndpointRateLimiter::new();
// Spawn periodic rate limiter cleanup (every 5 minutes)
{
let limiter = endpoint_rate_limiter.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
interval.tick().await;
limiter.cleanup().await;
}
});
}
{
let limiter = login_rate_limiter.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
interval.tick().await;
limiter.cleanup().await;
}
});
}
Ok(Self {
config,
@ -198,8 +223,8 @@ impl RpcHandler {
metrics_store,
port_allocator,
session_store,
login_rate_limiter: LoginRateLimiter::new(),
endpoint_rate_limiter: EndpointRateLimiter::new(),
login_rate_limiter,
endpoint_rate_limiter,
response_cache: ResponseCache::new(5),
mesh_service: Arc::new(tokio::sync::RwLock::new(None)),
transport_router: Arc::new(tokio::sync::RwLock::new(None)),
@ -253,10 +278,10 @@ impl RpcHandler {
// If session invalid, try remember-me token to auto-restore session
if !authenticated {
if let Some(remember) = extract_cookie(&parts.headers, "remember") {
if crate::session::SessionStore::validate_remember_token(&remember) {
if crate::session::SessionStore::validate_remember_token(&remember).await {
// Auto-create a new session from the remember-me token
let new_token = self.session_store.create().await;
let new_csrf = derive_csrf_token(&new_token);
let new_csrf = derive_csrf_token(&new_token).await;
tracing::info!("Auto-restored session from remember-me token");
new_session_cookies = Some((new_token, new_csrf));
authenticated = true;
@ -325,7 +350,7 @@ impl RpcHandler {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let secret = SessionStore::load_or_create_remember_secret();
let secret = SessionStore::load_or_create_remember_secret().await;
let mut mac = match HmacSha256::new_from_slice(&secret) {
Ok(m) => m,
Err(_) => { return Ok(Response::builder().status(500).body(hyper::Body::empty()).unwrap()); }
@ -858,7 +883,7 @@ impl RpcHandler {
if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await {
if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) {
let token = self.session_store.create_pending(secret).await;
let csrf_token = derive_csrf_token(&token);
let csrf_token = derive_csrf_token(&token).await;
response.headers_mut().append(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix())
@ -884,8 +909,8 @@ impl RpcHandler {
} else {
// No 2FA: create a full session immediately
let token = self.session_store.create().await;
let csrf_token = derive_csrf_token(&token);
let remember_token = self.session_store.create_remember_token();
let csrf_token = derive_csrf_token(&token).await;
let remember_token = self.session_store.create_remember_token().await;
response.headers_mut().append(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, self.cookie_suffix())
@ -921,8 +946,8 @@ impl RpcHandler {
.map(|s| s.to_string());
if let Some(new_token) = new_token_opt {
let csrf_token = derive_csrf_token(&new_token);
let remember_token = self.session_store.create_remember_token();
let csrf_token = derive_csrf_token(&new_token).await;
let remember_token = self.session_store.create_remember_token().await;
response.headers_mut().append(
"Set-Cookie",
format!(
@ -969,7 +994,7 @@ impl RpcHandler {
if rpc_req.method == "auth.changePassword" && rpc_resp.error.is_none() {
if let Some(token) = &session_token {
let new_token = self.session_store.rotate(token).await;
let csrf_token = derive_csrf_token(&new_token);
let csrf_token = derive_csrf_token(&new_token).await;
response.headers_mut().append(
"Set-Cookie",
format!(
@ -1059,11 +1084,11 @@ impl RpcHandler {
/// Deterministic: same session token always produces the same CSRF token.
/// Survives backend restarts because it depends only on the session token
/// and the on-disk remember secret (not ephemeral state).
fn derive_csrf_token(session_token: &str) -> String {
async fn derive_csrf_token(session_token: &str) -> String {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let secret = SessionStore::load_or_create_remember_secret();
let secret = SessionStore::load_or_create_remember_secret().await;
let mut mac = HmacSha256::new_from_slice(&secret).expect("HMAC key");
mac.update(format!("csrf:{}", session_token).as_bytes());
hex::encode(mac.finalize().into_bytes())

View File

@ -51,7 +51,7 @@ impl RpcHandler {
/// Get the current node visibility setting.
pub(super) async fn handle_network_get_visibility(&self) -> Result<serde_json::Value> {
let vis = self.load_visibility().await;
let tor_address = docker_packages::read_tor_address("archipelago");
let tor_address = docker_packages::read_tor_address("archipelago").await;
Ok(serde_json::json!({
"visibility": vis.as_str(),
"tor_address": tor_address,
@ -106,7 +106,7 @@ impl RpcHandler {
let (data, _) = self.state_manager.get_snapshot().await;
let my_pubkey = &data.server_info.pubkey;
let my_did = identity::did_key_from_pubkey_hex(my_pubkey)?;
let my_onion = docker_packages::read_tor_address("archipelago")
let my_onion = docker_packages::read_tor_address("archipelago").await
.unwrap_or_default();
let req_msg = serde_json::json!({

View File

@ -71,7 +71,7 @@ impl RpcHandler {
}
pub(super) async fn handle_node_tor_address(&self) -> Result<serde_json::Value> {
let tor_address = docker_packages::read_tor_address("archipelago");
let tor_address = docker_packages::read_tor_address("archipelago").await;
Ok(serde_json::json!({ "tor_address": tor_address }))
}

View File

@ -198,10 +198,8 @@ impl RpcHandler {
// App-specific configuration (should come from manifest)
let (mut ports, mut volumes, env_vars, custom_command, mut custom_args) = {
let mut allocator = self.port_allocator.lock().map_err(|e| {
anyhow::anyhow!("Port allocator lock poisoned: {}", e)
})?;
get_app_config(package_id, &self.config.host_ip, &mut allocator, &rpc_user, &rpc_pass)
let mut allocator = self.port_allocator.lock().await;
get_app_config(package_id, &self.config.host_ip, &mut allocator, &rpc_user, &rpc_pass).await
};
// Fedimint Gateway: auto-detect LND and switch to lnd mode
@ -846,8 +844,9 @@ printtoconsole=1\n", rpcauth_line);
}
// Release port allocation
if let Ok(mut allocator) = self.port_allocator.lock() {
let _ = allocator.release(package_id);
{
let mut allocator = self.port_allocator.lock().await;
let _ = allocator.release(package_id).await;
}
// Clean data directories unless preserve_data
@ -1487,7 +1486,7 @@ fn get_memory_limit(app_id: &str) -> &'static str {
/// Get app-specific configuration
/// Returns: (ports, volumes, env_vars, custom_command, custom_args)
fn get_app_config(
async fn get_app_config(
app_id: &str,
host_ip: &str,
allocator: &mut PortAllocator,
@ -1625,7 +1624,7 @@ fn get_app_config(
),
"nextcloud" => {
let host_port = allocator
.allocate_or_get(app_id, 8085, 80)
.allocate_or_get(app_id, 8085, 80).await
.unwrap_or(8085);
(
vec![format!("{}:80", host_port)],
@ -1637,7 +1636,7 @@ fn get_app_config(
}
"vaultwarden" => {
let host_port = allocator
.allocate_or_get(app_id, 8082, 80)
.allocate_or_get(app_id, 8082, 80).await
.unwrap_or(8082);
(
vec![format!("{}:80", host_port)],
@ -1677,7 +1676,7 @@ fn get_app_config(
),
"filebrowser" => {
let host_port = allocator
.allocate_or_get(app_id, 8083, 80)
.allocate_or_get(app_id, 8083, 80).await
.unwrap_or(8083);
(
vec![format!("{}:80", host_port)],

View File

@ -141,7 +141,7 @@ impl DockerPackageScanner {
// Convert container state to package/service state
let (package_state, service_status) = convert_state(&container.state);
let tor_address = read_tor_address(&app_id);
let tor_address = read_tor_address(&app_id).await;
let package = PackageDataEntry {
state: package_state.clone(),
@ -548,7 +548,7 @@ fn is_real_onion_address(s: &str) -> bool {
/// Read real .onion address from Tor hidden service hostname file.
/// Service name "archipelago" is for the main web UI (nginx port 80).
/// Uses TOR_DATA_DIR env var if set, else /var/lib/archipelago/tor.
pub fn read_tor_address(app_id: &str) -> Option<String> {
pub async fn read_tor_address(app_id: &str) -> Option<String> {
let service = tor_service_name(app_id)?;
let base = std::env::var("TOR_DATA_DIR").unwrap_or_else(|_| "/var/lib/archipelago/tor".to_string());
@ -558,7 +558,8 @@ pub fn read_tor_address(app_id: &str) -> Option<String> {
.unwrap_or(std::path::Path::new("/var/lib/archipelago"))
.join("tor-hostnames")
.join(service);
if let Some(addr) = std::fs::read_to_string(&hostnames_path)
if let Some(addr) = tokio::fs::read_to_string(&hostnames_path)
.await
.ok()
.map(|s| s.trim().to_string())
.filter(|s| s.ends_with(".onion") && !s.is_empty())
@ -570,7 +571,8 @@ pub fn read_tor_address(app_id: &str) -> Option<String> {
let path = std::path::Path::new(&base)
.join(format!("hidden_service_{}", service))
.join("hostname");
std::fs::read_to_string(&path)
tokio::fs::read_to_string(&path)
.await
.ok()
.map(|s| s.trim().to_string())
.filter(|s| s.ends_with(".onion") && !s.is_empty())

View File

@ -2,9 +2,9 @@
use anyhow::{Context, Result};
use serde::Serialize;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
const ELECTRUMX_HOST: &str = "127.0.0.1";
const ELECTRUMX_PORT: u16 = 50001;
@ -35,16 +35,18 @@ pub struct ElectrsSyncStatus {
}
/// Get the total size of a directory in bytes.
fn dir_size_bytes(path: &str) -> u64 {
async fn dir_size_bytes(path: &str) -> u64 {
let mut total: u64 = 0;
if let Ok(entries) = std::fs::read_dir(path) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
total += dir_size_bytes(&path.to_string_lossy());
} else if let Ok(meta) = entry.metadata() {
total += meta.len();
}
let mut entries = match tokio::fs::read_dir(path).await {
Ok(entries) => entries,
Err(_) => return 0,
};
while let Ok(Some(entry)) = entries.next_entry().await {
let entry_path = entry.path();
if entry_path.is_dir() {
total += Box::pin(dir_size_bytes(&entry_path.to_string_lossy())).await;
} else if let Ok(meta) = entry.metadata().await {
total += meta.len();
}
}
total
@ -62,25 +64,39 @@ fn format_bytes(bytes: u64) -> String {
}
/// Fetch ElectrumX indexed height via Electrum protocol (TCP JSON-RPC).
fn electrumx_indexed_height() -> Result<u64> {
let mut stream = TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT))
.context("Failed to connect to ElectrumX")?;
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.context("set_read_timeout")?;
stream
.set_write_timeout(Some(Duration::from_secs(5)))
.context("set_write_timeout")?;
async fn electrumx_indexed_height() -> Result<u64> {
let timeout_duration = Duration::from_secs(5);
let stream = tokio::time::timeout(
timeout_duration,
TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT)),
)
.await
.context("ElectrumX connection timed out")?
.context("Failed to connect to ElectrumX")?;
let (reader_half, mut writer_half) = tokio::io::split(stream);
// blockchain.headers.subscribe returns {"height": N, "hex": "..."}
let req = r#"{"id":1,"method":"blockchain.headers.subscribe","params":[]}
"#;
stream.write_all(req.as_bytes())?;
stream.flush()?;
tokio::time::timeout(timeout_duration, writer_half.write_all(req.as_bytes()))
.await
.context("ElectrumX write timed out")?
.context("Failed to write to ElectrumX")?;
let mut reader = BufReader::new(stream);
tokio::time::timeout(timeout_duration, writer_half.flush())
.await
.context("ElectrumX flush timed out")?
.context("Failed to flush ElectrumX stream")?;
let mut reader = BufReader::new(reader_half);
let mut line = String::new();
reader.read_line(&mut line)?;
tokio::time::timeout(timeout_duration, reader.read_line(&mut line))
.await
.context("ElectrumX read timed out")?
.context("Failed to read from ElectrumX")?;
let line = line.trim();
if line.is_empty() {
anyhow::bail!("Empty response from ElectrumX");
@ -136,10 +152,10 @@ async fn bitcoin_network_height() -> Result<u64> {
Ok(height)
}
/// Get ElectrumX sync status. Runs blocking ElectrumX call in spawn_blocking.
/// Get ElectrumX sync status.
pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
// Get index data size (non-blocking, fast filesystem stat)
let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR);
// Get index data size
let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR).await;
let index_size = if data_bytes > 0 {
Some(format_bytes(data_bytes))
} else {
@ -193,13 +209,13 @@ pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
}
};
let indexed_height = match tokio::task::spawn_blocking(electrumx_indexed_height).await {
Ok(Ok(h)) => h,
Ok(Err(e)) => {
let indexed_height = match electrumx_indexed_height().await {
Ok(h) => h,
Err(e) => {
// ElectrumX may not be ready on 50001 during initial sync
let err_msg = e.to_string();
let err_lower = err_msg.to_lowercase();
let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") {
let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") || err_lower.contains("timed out") {
// Estimate progress from data directory size
let _est_pct = if data_bytes > 0 {
((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0)
@ -233,17 +249,6 @@ pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
tor_onion: tor_onion.clone(),
};
}
Err(e) => {
return ElectrsSyncStatus {
indexed_height: 0,
network_height,
progress_pct: 0.0,
status: "error".to_string(),
error: Some(format!("Task: {}", e)),
index_size,
tor_onion: tor_onion.clone(),
};
}
};
let progress_pct = if network_height > 0 {

View File

@ -56,13 +56,19 @@ pub async fn init(data_dir: &Path) {
}
/// Persist current messages to disk.
/// Serializes under the lock, then writes asynchronously via spawn_blocking
/// to avoid blocking the tokio runtime.
fn persist() {
let guard = store().lock().unwrap_or_else(|e| e.into_inner());
let path_guard = data_path().lock().unwrap_or_else(|e| e.into_inner());
if let Some(ref path) = *path_guard {
if let Ok(content) = serde_json::to_string(&*guard) {
// Use sync write — called from store functions already holding the lock
let _ = std::fs::write(path, content);
let path = path.clone();
drop(path_guard);
drop(guard);
tokio::task::spawn(async move {
let _ = tokio::fs::write(&path, content).await;
});
}
}
}

View File

@ -51,12 +51,9 @@ async fn load_or_create_nostr_keys(identity_dir: &Path) -> Result<Keys> {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
tokio::task::spawn_blocking(move || {
std::fs::set_permissions(secret_path, std::fs::Permissions::from_mode(0o600))
})
.await
.context("spawn_blocking")?
.context("Failed to set Nostr key permissions")?;
fs::set_permissions(&secret_path, std::fs::Permissions::from_mode(0o600))
.await
.context("Failed to set Nostr key permissions")?;
}
fs::write(&pub_path, keys.public_key().to_hex())
.await

View File

@ -52,11 +52,11 @@ pub struct PortAllocator {
}
impl PortAllocator {
pub fn new(data_dir: impl AsRef<Path>) -> Result<Self> {
pub async fn new(data_dir: impl AsRef<Path>) -> Result<Self> {
let data_dir = data_dir.as_ref().to_path_buf();
let path = data_dir.join("port_allocations.json");
let allocations = if path.exists() {
let s = std::fs::read_to_string(&path)
let s = tokio::fs::read_to_string(&path).await
.context("Failed to read port allocations")?;
serde_json::from_str(&s).unwrap_or_default()
} else {
@ -68,13 +68,13 @@ impl PortAllocator {
})
}
fn save(&self) -> Result<()> {
async fn save(&self) -> Result<()> {
let path = self.data_dir.join("port_allocations.json");
std::fs::create_dir_all(&self.data_dir)
tokio::fs::create_dir_all(&self.data_dir).await
.context("Failed to create data dir for port allocations")?;
let s = serde_json::to_string_pretty(&self.allocations)
.context("Failed to serialize port allocations")?;
std::fs::write(&path, s).context("Failed to write port allocations")?;
tokio::fs::write(&path, s).await.context("Failed to write port allocations")?;
Ok(())
}
@ -94,7 +94,7 @@ impl PortAllocator {
}
/// Allocate a host port for an app. Uses preferred_port if available, else finds next free.
pub fn allocate(
pub async fn allocate(
&mut self,
app_id: &str,
preferred_host_port: u16,
@ -115,7 +115,7 @@ impl PortAllocator {
container_port,
},
);
self.save()?;
self.save().await?;
Ok(host_port)
}
@ -127,7 +127,7 @@ impl PortAllocator {
}
/// Allocate or return existing. Use when installing/starting an app.
pub fn allocate_or_get(
pub async fn allocate_or_get(
&mut self,
app_id: &str,
preferred_host_port: u16,
@ -136,13 +136,13 @@ impl PortAllocator {
if let Some((host, _)) = self.get(app_id) {
return Ok(host);
}
self.allocate(app_id, preferred_host_port, container_port)
self.allocate(app_id, preferred_host_port, container_port).await
}
/// Release port when app is uninstalled.
pub fn release(&mut self, app_id: &str) -> Result<()> {
pub async fn release(&mut self, app_id: &str) -> Result<()> {
self.allocations.allocations.remove(app_id);
self.save()?;
self.save().await?;
Ok(())
}
}

View File

@ -42,7 +42,7 @@ impl Server {
data.server_info.name = Some(name);
}
}
data.server_info.tor_address = docker_packages::read_tor_address("archipelago");
data.server_info.tor_address = docker_packages::read_tor_address("archipelago").await;
if let Some(ref tor) = data.server_info.tor_address {
data.server_info.node_address = Some(identity.node_address(tor));
}
@ -403,7 +403,7 @@ async fn create_docker_scanner(config: &Config) -> Result<DockerPackageScanner>
}
async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> {
let tor_addr = docker_packages::read_tor_address("archipelago");
let tor_addr = docker_packages::read_tor_address("archipelago").await;
let (current_data, _) = state.get_snapshot().await;
if tor_addr != current_data.server_info.tor_address {
let mut data = current_data;
@ -426,7 +426,7 @@ async fn scan_and_update_packages(
let (current_data, _) = state.get_snapshot().await;
let packages_changed = !packages.is_empty() && current_data.package_data != packages;
let tor_addr = docker_packages::read_tor_address("archipelago");
let tor_addr = docker_packages::read_tor_address("archipelago").await;
let tor_changed = tor_addr != current_data.server_info.tor_address;
let first_scan = !current_data.server_info.status_info.containers_scanned;

View File

@ -58,9 +58,9 @@ struct PersistedSession {
}
impl SessionStore {
pub fn new() -> Self {
pub async fn new() -> Self {
let persist_path = PathBuf::from(SESSIONS_FILE);
let sessions = Self::load_from_disk(&persist_path);
let sessions = Self::load_from_disk(&persist_path).await;
let count = sessions.len();
if count > 0 {
tracing::info!("Restored {} sessions from disk", count);
@ -72,9 +72,9 @@ impl SessionStore {
}
/// Load persisted sessions from disk (only Full sessions).
fn load_from_disk(path: &Path) -> HashMap<[u8; 32], Session> {
async fn load_from_disk(path: &Path) -> HashMap<[u8; 32], Session> {
let mut map = HashMap::new();
let data = match std::fs::read_to_string(path) {
let data = match tokio::fs::read_to_string(path).await {
Ok(d) => d,
Err(_) => return map,
};
@ -114,7 +114,7 @@ impl SessionStore {
}
/// Save all Full sessions to disk. Called after mutations.
fn save_to_disk_sync(sessions: &HashMap<[u8; 32], Session>, path: &Path) {
async fn save_to_disk(sessions: &HashMap<[u8; 32], Session>, path: &Path) {
let persisted: Vec<PersistedSession> = sessions
.iter()
.filter(|(_, s)| matches!(s.session_type, SessionType::Full))
@ -125,7 +125,7 @@ impl SessionStore {
})
.collect();
if let Ok(json) = serde_json::to_string(&persisted) {
let _ = std::fs::write(path, json);
let _ = tokio::fs::write(path, json).await;
}
}
@ -166,7 +166,7 @@ impl SessionStore {
sessions.insert(hash, session);
// Sync save — must complete before returning the token to the client.
// Async save risks losing the session if the process is killed (e.g., deploy restart).
Self::save_to_disk_sync(&sessions, &self.persist_path);
Self::save_to_disk(&sessions, &self.persist_path).await;
token
}
@ -256,7 +256,7 @@ impl SessionStore {
session_type: SessionType::Full,
},
);
Self::save_to_disk_sync(&sessions, &self.persist_path);
Self::save_to_disk(&sessions, &self.persist_path).await;
Some(new_token)
} else {
None
@ -267,7 +267,7 @@ impl SessionStore {
let hash = hash_token(token);
let mut sessions = self.sessions.write().await;
sessions.remove(&hash);
Self::save_to_disk_sync(&sessions, &self.persist_path);
Self::save_to_disk(&sessions, &self.persist_path).await;
}
/// Invalidate all sessions except the one matching the given token.
@ -276,7 +276,7 @@ impl SessionStore {
let keep_hash = hash_token(keep_token);
let mut sessions = self.sessions.write().await;
sessions.retain(|hash, _| *hash == keep_hash);
Self::save_to_disk_sync(&sessions, &self.persist_path);
Self::save_to_disk(&sessions, &self.persist_path).await;
}
/// Rotate a session: invalidate the old token and create a new one.
@ -298,7 +298,7 @@ impl SessionStore {
session_type: SessionType::Full,
},
);
Self::save_to_disk_sync(&sessions, &self.persist_path);
Self::save_to_disk(&sessions, &self.persist_path).await;
new_token
}
@ -352,8 +352,8 @@ impl SessionStore {
// Format: "timestamp_hex:hmac_hex"
/// Create a remember-me token. Returns the cookie value.
pub fn create_remember_token(&self) -> String {
let secret = Self::load_or_create_remember_secret();
pub async fn create_remember_token(&self) -> String {
let secret = Self::load_or_create_remember_secret().await;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
@ -366,8 +366,8 @@ impl SessionStore {
}
/// Validate a remember-me token. Returns true if valid and not expired.
pub fn validate_remember_token(token: &str) -> bool {
let secret = match std::fs::read(REMEMBER_SECRET_FILE) {
pub async fn validate_remember_token(token: &str) -> bool {
let secret = match tokio::fs::read(REMEMBER_SECRET_FILE).await {
Ok(s) if s.len() == 32 => s,
_ => return false,
};
@ -408,9 +408,9 @@ impl SessionStore {
now.saturating_sub(ts_bytes) < REMEMBER_TTL
}
pub fn load_or_create_remember_secret() -> Vec<u8> {
pub async fn load_or_create_remember_secret() -> Vec<u8> {
// Try existing secret file first
if let Ok(secret) = std::fs::read(REMEMBER_SECRET_FILE) {
if let Ok(secret) = tokio::fs::read(REMEMBER_SECRET_FILE).await {
if secret.len() == 32 {
return secret;
}
@ -420,9 +420,9 @@ impl SessionStore {
rand::rngs::OsRng.fill_bytes(&mut secret);
// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(REMEMBER_SECRET_FILE).parent() {
let _ = std::fs::create_dir_all(parent);
let _ = tokio::fs::create_dir_all(parent).await;
}
let _ = std::fs::write(REMEMBER_SECRET_FILE, &secret);
let _ = tokio::fs::write(REMEMBER_SECRET_FILE, &secret).await;
secret.to_vec()
}
}
@ -476,6 +476,16 @@ impl LoginRateLimiter {
let entry = attempts.entry(ip).or_default();
entry.push(Instant::now());
}
/// Periodic cleanup of expired entries for IPs that are no longer active.
pub async fn cleanup(&self) {
let mut attempts = self.attempts.write().await;
let now = Instant::now();
attempts.retain(|_, timestamps| {
timestamps.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
!timestamps.is_empty()
});
}
}
/// General-purpose rate limiter for sensitive endpoints.