archy/core/archipelago/src/session.rs
Dorian de6e25221c feat: add per-endpoint rate limiting for sensitive operations (PENTEST-04)
New EndpointRateLimiter in session.rs tracks requests per (method, IP)
with configurable limits and time windows:

Financial operations (5-10 req/5min):
- wallet.send, lnd.sendcoins, lnd.payinvoice, lnd.create-psbt,
  lnd.finalize-psbt, wallet.ecash-send

Channel operations (3 req/5min):
- lnd.openchannel, lnd.closechannel

Backup operations (2-3 req/10min):
- backup.create, backup.restore

Container/package installs (5 req/5min):
- container-install, package.install

System operations (2 req/5-10min):
- system.reboot, system.shutdown, update.apply

Identity/auth (3-20 req/5min):
- identity.create, identity.issue-credential, auth.changePassword

Returns HTTP 429 with Retry-After header when limits exceeded.
Verified on live server: auth.changePassword blocks at 4th request,
lnd.sendcoins blocks at 6th request.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 14:46:25 +00:00

586 lines
19 KiB
Rust

use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use zeroize::Zeroize;
/// Seconds of inactivity after which a full session is treated as expired (24 hours).
const FULL_SESSION_TTL: u64 = 24 * 60 * 60;
/// Lifetime of a pending-TOTP session in seconds, counted from its creation (5 minutes).
const PENDING_SESSION_TTL: u64 = 5 * 60;
/// How many times a pending session's TOTP secret may be fetched before the session is dropped.
const MAX_TOTP_ATTEMPTS: u8 = 5;
/// Cap on the number of full sessions kept in the store at once.
const MAX_CONCURRENT_SESSIONS: usize = 5;
/// Authentication state of a stored session.
#[derive(Clone)]
enum SessionType {
/// Fully authenticated session.
Full,
/// Password has been verified; the session is still waiting for a TOTP code.
PendingTotp {
/// Decrypted TOTP secret cached in memory for verification.
totp_secret: Vec<u8>,
/// Number of times the secret has been requested for this session.
attempts: u8,
},
}
/// Wipes the cached TOTP secret whenever a `SessionType` value is dropped.
impl Drop for SessionType {
    fn drop(&mut self) {
        match self {
            // Overwrite the secret bytes so they do not linger after the session goes away.
            SessionType::PendingTotp { totp_secret, .. } => totp_secret.zeroize(),
            // A full session carries no data to scrub.
            SessionType::Full => {}
        }
    }
}
/// One stored session: its timestamps and its authentication state.
#[derive(Clone)]
struct Session {
/// When the session was created (reset when a pending session is upgraded to full).
created_at: Instant,
/// Last time the session was created, upgraded, or successfully validated.
last_activity: Instant,
/// Whether the session is fully authenticated or still awaiting TOTP.
session_type: SessionType,
}
/// In-memory session registry keyed by the SHA-256 hash of each session token.
/// Cloning the store yields another handle to the same underlying map.
#[derive(Clone)]
pub struct SessionStore {
/// Shared, lock-protected map from token hash to session.
sessions: Arc<RwLock<HashMap<[u8; 32], Session>>>,
}
impl SessionStore {
    /// Create an empty session store.
    pub fn new() -> Self {
        Self {
            sessions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Generate a fresh random token and return it together with its map key.
    ///
    /// The plaintext token is 32 random bytes, hex-encoded; the key is the token's hash.
    fn generate_token() -> (String, [u8; 32]) {
        let token_bytes: [u8; 32] = rand::random();
        let token = hex::encode(token_bytes);
        let hash = hash_token(&token);
        (token, hash)
    }

    /// Create a full (authenticated) session. Returns the plaintext token.
    ///
    /// Enforces the concurrent-session limit by evicting the least recently
    /// active full sessions until there is room for the new one.
    pub async fn create(&self) -> String {
        let (token, hash) = Self::generate_token();
        let now = Instant::now();
        let session = Session {
            created_at: now,
            last_activity: now,
            session_type: SessionType::Full,
        };
        let mut sessions = self.sessions.write().await;
        self.evict_if_over_limit(&mut sessions);
        sessions.insert(hash, session);
        token
    }

    /// Create a pending TOTP session (password verified, awaiting TOTP).
    ///
    /// Caches the decrypted TOTP secret in memory for verification and returns
    /// the plaintext token. Pending sessions do not count toward the
    /// concurrent-session limit.
    pub async fn create_pending(&self, totp_secret: Vec<u8>) -> String {
        let (token, hash) = Self::generate_token();
        let now = Instant::now();
        let session = Session {
            created_at: now,
            last_activity: now,
            session_type: SessionType::PendingTotp {
                totp_secret,
                attempts: 0,
            },
        };
        self.sessions.write().await.insert(hash, session);
        token
    }

    /// Validate a full session token.
    ///
    /// Returns true if the token maps to a full session that has been active
    /// within `FULL_SESSION_TTL` seconds, and refreshes `last_activity` in that
    /// case (inactivity-based expiry). An expired full session is removed.
    /// Pending sessions and unknown tokens return false and are left untouched.
    pub async fn validate(&self, token: &str) -> bool {
        let hash = hash_token(token);
        let mut sessions = self.sessions.write().await;
        let expired = match sessions.get_mut(&hash) {
            Some(session) => {
                if !matches!(session.session_type, SessionType::Full) {
                    return false;
                }
                if session.last_activity.elapsed().as_secs() >= FULL_SESSION_TTL {
                    true
                } else {
                    session.last_activity = Instant::now();
                    false
                }
            }
            None => return false,
        };
        if expired {
            sessions.remove(&hash);
        }
        !expired
    }

    /// Get the TOTP secret from a pending session and count the attempt.
    ///
    /// Returns `None` if the token is unknown, belongs to a full session, the
    /// pending session is older than `PENDING_SESSION_TTL` seconds, or more than
    /// `MAX_TOTP_ATTEMPTS` lookups have been made. In the last two cases the
    /// pending session is removed, forcing a fresh login.
    pub async fn get_pending_secret(&self, token: &str) -> Option<Vec<u8>> {
        let hash = hash_token(token);
        let mut sessions = self.sessions.write().await;
        let secret = match sessions.get_mut(&hash) {
            Some(Session {
                created_at,
                session_type:
                    SessionType::PendingTotp {
                        totp_secret,
                        attempts,
                    },
                ..
            }) => {
                if created_at.elapsed().as_secs() >= PENDING_SESSION_TTL {
                    None
                } else {
                    *attempts += 1;
                    (*attempts <= MAX_TOTP_ATTEMPTS).then(|| totp_secret.clone())
                }
            }
            // Unknown token or a full session. The pending TTL must not be applied
            // to full sessions, otherwise this call would delete a valid login.
            _ => return None,
        };
        if secret.is_none() {
            // Expired or out of attempts: drop the pending session.
            sessions.remove(&hash);
        }
        secret
    }

    /// Upgrade a pending session to a full session.
    ///
    /// Does nothing unless the token maps to a pending-TOTP session. Before the
    /// upgrade, the concurrent-session limit is enforced exactly as in `create`,
    /// so the TOTP login path cannot exceed it. Both timestamps are reset to now.
    pub async fn upgrade_to_full(&self, token: &str) {
        let hash = hash_token(token);
        let mut sessions = self.sessions.write().await;
        let is_pending = matches!(
            sessions.get(&hash).map(|s| &s.session_type),
            Some(SessionType::PendingTotp { .. })
        );
        if !is_pending {
            return;
        }
        // Eviction only considers full sessions, so the pending one survives it.
        self.evict_if_over_limit(&mut sessions);
        if let Some(session) = sessions.get_mut(&hash) {
            let now = Instant::now();
            // Replacing the value drops the old variant, which zeroizes the secret.
            session.session_type = SessionType::Full;
            session.created_at = now;
            session.last_activity = now;
        }
    }

    /// Remove the session for `token`, if any.
    pub async fn remove(&self, token: &str) {
        let hash = hash_token(token);
        self.sessions.write().await.remove(&hash);
    }

    /// Invalidate all sessions except the one matching the given token.
    ///
    /// Used after sensitive operations like password change. If `keep_token`
    /// matches no stored session, every session is removed.
    pub async fn invalidate_all_except(&self, keep_token: &str) {
        let keep_hash = hash_token(keep_token);
        let mut sessions = self.sessions.write().await;
        sessions.retain(|hash, _| *hash == keep_hash);
    }

    /// Rotate a session: invalidate the old token and create a new one.
    ///
    /// Returns the new plaintext token, which always maps to a new full session.
    /// The concurrent-session limit is enforced after the old entry is removed,
    /// so rotating with a token that matched nothing cannot grow the store past
    /// the limit.
    ///
    /// NOTE(review): the old token is not checked for existence, type, or
    /// expiry; a new full session is issued regardless. Callers are presumably
    /// expected to `validate` the old token first. Confirm at the call sites.
    pub async fn rotate(&self, old_token: &str) -> String {
        let old_hash = hash_token(old_token);
        let (new_token, new_hash) = Self::generate_token();
        let now = Instant::now();
        let mut sessions = self.sessions.write().await;
        sessions.remove(&old_hash);
        self.evict_if_over_limit(&mut sessions);
        sessions.insert(
            new_hash,
            Session {
                created_at: now,
                last_activity: now,
                session_type: SessionType::Full,
            },
        );
        new_token
    }

    /// Remove all expired sessions (cleanup).
    ///
    /// Full sessions expire by inactivity; pending sessions expire by age.
    pub async fn cleanup_expired(&self) {
        let mut sessions = self.sessions.write().await;
        sessions.retain(|_, session| match &session.session_type {
            SessionType::Full => session.last_activity.elapsed().as_secs() < FULL_SESSION_TTL,
            SessionType::PendingTotp { .. } => {
                session.created_at.elapsed().as_secs() < PENDING_SESSION_TTL
            }
        });
    }

    /// Make room for one more full session.
    ///
    /// Removes the least recently active full sessions until fewer than
    /// `MAX_CONCURRENT_SESSIONS` remain. Pending sessions are never evicted here.
    fn evict_if_over_limit(&self, sessions: &mut HashMap<[u8; 32], Session>) {
        let mut full: Vec<([u8; 32], Instant)> = sessions
            .iter()
            .filter(|(_, s)| matches!(s.session_type, SessionType::Full))
            .map(|(hash, s)| (*hash, s.last_activity))
            .collect();
        // Number of sessions that must go so that one new session fits under the limit.
        let excess = (full.len() + 1).saturating_sub(MAX_CONCURRENT_SESSIONS);
        if excess == 0 {
            return;
        }
        full.sort_unstable_by_key(|&(_, last_activity)| last_activity);
        for (hash, _) in full.into_iter().take(excess) {
            sessions.remove(&hash);
        }
    }

    /// Get the number of active (non-expired) full sessions.
    pub async fn active_session_count(&self) -> usize {
        let sessions = self.sessions.read().await;
        sessions
            .values()
            .filter(|s| {
                matches!(s.session_type, SessionType::Full)
                    && s.last_activity.elapsed().as_secs() < FULL_SESSION_TTL
            })
            .count()
    }
}
/// Compute the SHA-256 digest of a token's bytes; the digest is used as the session map key.
fn hash_token(token: &str) -> [u8; 32] {
    Sha256::digest(token.as_bytes()).into()
}
/// Extract the session token from the request's Cookie header(s).
///
/// Every `cookie` header field is scanned, not just the first: HTTP/2 clients
/// may send each cookie pair as its own header field. Fields that are not
/// valid visible ASCII are skipped. The first `session=` pair found wins;
/// `None` is returned when there is no such pair or its value is empty.
pub fn extract_session_cookie(headers: &hyper::HeaderMap) -> Option<String> {
    headers
        .get_all("cookie")
        .iter()
        .filter_map(|value| value.to_str().ok())
        .flat_map(|cookies| cookies.split(';'))
        .find_map(|pair| pair.trim().strip_prefix("session="))
        .filter(|value| !value.is_empty())
        .map(str::to_owned)
}
/// Rate limiter for login attempts: max 5 failures per 60 seconds per IP.
/// Cloning the limiter yields another handle to the same shared failure log.
#[derive(Clone)]
pub struct LoginRateLimiter {
/// Timestamps of recorded login failures, grouped by client IP.
attempts: Arc<RwLock<HashMap<IpAddr, Vec<Instant>>>>,
}
/// Number of recent failures at which `LoginRateLimiter::check` starts returning false.
const MAX_ATTEMPTS: usize = 5;
/// Length of the login failure window, in seconds.
const WINDOW_SECS: u64 = 60;
impl LoginRateLimiter {
    /// Create a limiter with no recorded failures.
    pub fn new() -> Self {
        Self {
            attempts: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Report whether `ip` may attempt another login.
    ///
    /// Failures older than `WINDOW_SECS` are discarded first. Returns true
    /// while fewer than `MAX_ATTEMPTS` failures remain in the window.
    ///
    /// Checking an IP never creates a bucket for it, and a bucket that has
    /// become empty is removed, so the map only holds IPs with recent failures.
    pub async fn check(&self, ip: IpAddr) -> bool {
        let mut attempts = self.attempts.write().await;
        let now = Instant::now();
        let recent = match attempts.get_mut(&ip) {
            Some(failures) => {
                failures.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
                failures.len()
            }
            None => 0,
        };
        if recent == 0 {
            // No-op when the IP had no bucket; otherwise frees the emptied one.
            attempts.remove(&ip);
        }
        recent < MAX_ATTEMPTS
    }

    /// Record a failed login attempt for `ip`.
    ///
    /// Failures that have already left the window are pruned at the same time
    /// so a bucket never holds more than the current window's entries.
    pub async fn record_failure(&self, ip: IpAddr) {
        let mut attempts = self.attempts.write().await;
        let now = Instant::now();
        let failures = attempts.entry(ip).or_default();
        failures.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
        failures.push(now);
    }
}
/// General-purpose rate limiter for sensitive endpoints.
/// Tracks request counts per (method, IP) with configurable limits and windows.
/// Methods that have no entry in `limits` are never rate-limited.
#[derive(Clone)]
pub struct EndpointRateLimiter {
/// Map of (method, ip) -> list of request timestamps
requests: Arc<RwLock<HashMap<(String, IpAddr), Vec<Instant>>>>,
/// Per-method configuration: (max_requests, window_secs)
limits: Arc<HashMap<String, (usize, u64)>>,
}
impl EndpointRateLimiter {
    /// Build a limiter preloaded with the limits for the sensitive methods.
    ///
    /// Each table row is `(method, max_requests, window_secs)`.
    pub fn new() -> Self {
        const DEFAULT_LIMITS: &[(&str, usize, u64)] = &[
            // Financial operations: strict limits
            ("wallet.send", 5, 300),
            ("wallet.ecash-send", 10, 300),
            ("lnd.sendcoins", 5, 300),
            ("lnd.payinvoice", 10, 300),
            ("lnd.openchannel", 3, 300),
            ("lnd.closechannel", 3, 300),
            ("lnd.create-psbt", 5, 300),
            ("lnd.finalize-psbt", 5, 300),
            // Identity/credential operations
            ("identity.create", 10, 300),
            ("identity.issue-credential", 20, 300),
            // Backup operations (resource-intensive)
            ("backup.create", 3, 600),
            ("backup.restore", 2, 600),
            // Container operations
            ("container-install", 5, 300),
            ("package.install", 5, 300),
            // System operations
            ("update.apply", 2, 600),
            ("system.reboot", 2, 300),
            ("system.shutdown", 2, 300),
            // Password changes
            ("auth.changePassword", 3, 300),
        ];
        let limits: HashMap<String, (usize, u64)> = DEFAULT_LIMITS
            .iter()
            .map(|&(method, max_requests, window_secs)| {
                (method.to_string(), (max_requests, window_secs))
            })
            .collect();
        Self {
            requests: Arc::new(RwLock::new(HashMap::new())),
            limits: Arc::new(limits),
        }
    }

    /// Check if a request is allowed. Returns true if within limits.
    ///
    /// Methods without a configured limit are always allowed. For limited
    /// methods, timestamps outside the method's window are pruned before the
    /// remaining count is compared with the maximum.
    pub async fn check(&self, method: &str, ip: IpAddr) -> bool {
        if let Some(&(max_requests, window_secs)) = self.limits.get(method) {
            let key = (method.to_string(), ip);
            let mut requests = self.requests.write().await;
            let now = Instant::now();
            let timestamps = requests.entry(key).or_default();
            timestamps.retain(|seen| now.duration_since(*seen).as_secs() < window_secs);
            timestamps.len() < max_requests
        } else {
            // Not rate-limited
            true
        }
    }

    /// Record a request for rate limiting purposes.
    ///
    /// Methods without a configured limit are not tracked.
    pub async fn record(&self, method: &str, ip: IpAddr) {
        if self.limits.contains_key(method) {
            let key = (method.to_string(), ip);
            self.requests
                .write()
                .await
                .entry(key)
                .or_default()
                .push(Instant::now());
        }
    }

    /// Periodic cleanup of expired entries.
    ///
    /// Prunes timestamps that have left their method's window and drops
    /// (method, ip) buckets that end up empty. A 300-second window is used for
    /// any method that is missing from the limits table.
    pub async fn cleanup(&self) {
        let mut requests = self.requests.write().await;
        let now = Instant::now();
        requests.retain(|(method, _), timestamps| {
            let window_secs = match self.limits.get(method) {
                Some(&(_, configured)) => configured,
                None => 300,
            };
            timestamps.retain(|seen| now.duration_since(*seen).as_secs() < window_secs);
            !timestamps.is_empty()
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created full session validates.
    #[tokio::test]
    async fn test_session_create_and_validate() {
        let store = SessionStore::new();
        let token = store.create().await;
        assert!(store.validate(&token).await);
    }

    /// A token that was never issued does not validate.
    #[tokio::test]
    async fn test_session_invalid_token() {
        let store = SessionStore::new();
        assert!(!store.validate("nonexistent_token").await);
    }

    /// Removing a session invalidates its token.
    #[tokio::test]
    async fn test_session_remove() {
        let store = SessionStore::new();
        let token = store.create().await;
        assert!(store.validate(&token).await);
        store.remove(&token).await;
        assert!(!store.validate(&token).await);
    }

    /// A pending session only validates after it has been upgraded.
    #[tokio::test]
    async fn test_pending_session_upgrade() {
        let store = SessionStore::new();
        let secret = vec![1, 2, 3, 4];
        let token = store.create_pending(secret.clone()).await;
        // Pending session should not validate as full
        assert!(!store.validate(&token).await);
        // Can get the TOTP secret
        let got = store.get_pending_secret(&token).await;
        assert_eq!(got, Some(secret));
        // Upgrade to full
        store.upgrade_to_full(&token).await;
        assert!(store.validate(&token).await);
    }

    /// The secret can be fetched MAX_TOTP_ATTEMPTS times; the next fetch fails.
    #[tokio::test]
    async fn test_pending_session_max_attempts() {
        let store = SessionStore::new();
        let secret = vec![1, 2, 3];
        let token = store.create_pending(secret).await;
        for _ in 0..MAX_TOTP_ATTEMPTS {
            assert!(store.get_pending_secret(&token).await.is_some());
        }
        // One attempt past the budget fails (session removed)
        assert!(store.get_pending_secret(&token).await.is_none());
    }

    /// Asking for a pending secret with a full-session token yields nothing
    /// and leaves the full session usable.
    #[tokio::test]
    async fn test_get_pending_secret_ignores_full_session() {
        let store = SessionStore::new();
        let token = store.create().await;
        assert!(store.get_pending_secret(&token).await.is_none());
        assert!(store.validate(&token).await);
    }

    /// The session cookie is found among other cookies.
    #[tokio::test]
    async fn test_extract_session_cookie() {
        let mut headers = hyper::HeaderMap::new();
        headers.insert("cookie", "session=abc123; other=xyz".parse().unwrap());
        assert_eq!(extract_session_cookie(&headers), Some("abc123".to_string()));
    }

    /// No Cookie header means no session token.
    #[tokio::test]
    async fn test_extract_session_cookie_missing() {
        let headers = hyper::HeaderMap::new();
        assert_eq!(extract_session_cookie(&headers), None);
    }

    /// An empty session cookie value is treated as absent.
    #[tokio::test]
    async fn test_extract_session_cookie_empty_value() {
        let mut headers = hyper::HeaderMap::new();
        headers.insert("cookie", "other=xyz; session=".parse().unwrap());
        assert_eq!(extract_session_cookie(&headers), None);
    }

    /// Each attempt up to the limit is still allowed when it is checked.
    #[tokio::test]
    async fn test_rate_limiter_allows_under_limit() {
        let limiter = LoginRateLimiter::new();
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        for _ in 0..MAX_ATTEMPTS {
            assert!(limiter.check(ip).await);
            limiter.record_failure(ip).await;
        }
    }

    /// Once MAX_ATTEMPTS failures are recorded the IP is blocked.
    #[tokio::test]
    async fn test_rate_limiter_blocks_over_limit() {
        let limiter = LoginRateLimiter::new();
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        for _ in 0..MAX_ATTEMPTS {
            limiter.record_failure(ip).await;
        }
        assert!(!limiter.check(ip).await);
    }

    /// Failures are tracked independently per IP.
    #[tokio::test]
    async fn test_rate_limiter_different_ips() {
        let limiter = LoginRateLimiter::new();
        let ip1: IpAddr = "127.0.0.1".parse().unwrap();
        let ip2: IpAddr = "192.168.1.1".parse().unwrap();
        for _ in 0..MAX_ATTEMPTS {
            limiter.record_failure(ip1).await;
        }
        // ip1 should be blocked
        assert!(!limiter.check(ip1).await);
        // ip2 should still be allowed
        assert!(limiter.check(ip2).await);
    }

    /// Repeated validation keeps a session valid.
    #[tokio::test]
    async fn test_session_activity_updates_on_validate() {
        let store = SessionStore::new();
        let token = store.create().await;
        // First validation should succeed and touch last_activity
        assert!(store.validate(&token).await);
        // Still valid after another validation
        assert!(store.validate(&token).await);
    }

    /// Only the kept token survives a bulk invalidation.
    #[tokio::test]
    async fn test_invalidate_all_except() {
        let store = SessionStore::new();
        let token1 = store.create().await;
        let token2 = store.create().await;
        let token3 = store.create().await;
        // Invalidate all except token2
        store.invalidate_all_except(&token2).await;
        assert!(!store.validate(&token1).await);
        assert!(store.validate(&token2).await);
        assert!(!store.validate(&token3).await);
    }

    /// Rotation swaps the old token for a new one.
    #[tokio::test]
    async fn test_session_rotate() {
        let store = SessionStore::new();
        let old_token = store.create().await;
        assert!(store.validate(&old_token).await);
        let new_token = store.rotate(&old_token).await;
        // Old token should be invalid
        assert!(!store.validate(&old_token).await);
        // New token should be valid
        assert!(store.validate(&new_token).await);
    }

    /// Creating a session at the limit evicts the least recently active one.
    #[tokio::test]
    async fn test_max_concurrent_sessions() {
        let store = SessionStore::new();
        let mut tokens = Vec::new();
        // Create MAX_CONCURRENT_SESSIONS sessions
        for _ in 0..MAX_CONCURRENT_SESSIONS {
            tokens.push(store.create().await);
        }
        // All should be valid
        for token in &tokens {
            assert!(store.validate(token).await);
        }
        // Creating one more should evict the oldest
        let new_token = store.create().await;
        assert!(store.validate(&new_token).await);
        // The first token (oldest) should have been evicted
        assert!(!store.validate(&tokens[0]).await);
        // The rest should still be valid
        for token in &tokens[1..] {
            assert!(store.validate(token).await);
        }
    }

    /// The active count follows session creation and removal.
    #[tokio::test]
    async fn test_active_session_count() {
        let store = SessionStore::new();
        assert_eq!(store.active_session_count().await, 0);
        let token1 = store.create().await;
        assert_eq!(store.active_session_count().await, 1);
        let _token2 = store.create().await;
        assert_eq!(store.active_session_count().await, 2);
        store.remove(&token1).await;
        assert_eq!(store.active_session_count().await, 1);
    }

    /// Cleanup leaves sessions that have not expired in place.
    #[tokio::test]
    async fn test_cleanup_expired_keeps_active() {
        let store = SessionStore::new();
        let token = store.create().await;
        assert!(store.validate(&token).await);
        assert_eq!(store.active_session_count().await, 1);
        // Cleanup shouldn't remove active sessions
        store.cleanup_expired().await;
        assert_eq!(store.active_session_count().await, 1);
    }

    /// Rotation replaces a session rather than adding one.
    #[tokio::test]
    async fn test_rotate_preserves_session_count() {
        let store = SessionStore::new();
        let token = store.create().await;
        assert_eq!(store.active_session_count().await, 1);
        let new_token = store.rotate(&token).await;
        assert_eq!(store.active_session_count().await, 1);
        assert!(store.validate(&new_token).await);
    }

    /// A limited method is allowed up to its maximum and blocked afterwards.
    #[tokio::test]
    async fn test_endpoint_limiter_blocks_at_limit() {
        let limiter = EndpointRateLimiter::new();
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        // auth.changePassword is configured for 3 requests per window
        for _ in 0..3 {
            assert!(limiter.check("auth.changePassword", ip).await);
            limiter.record("auth.changePassword", ip).await;
        }
        assert!(!limiter.check("auth.changePassword", ip).await);
    }

    /// Methods without a configured limit are never blocked.
    #[tokio::test]
    async fn test_endpoint_limiter_ignores_unlisted_methods() {
        let limiter = EndpointRateLimiter::new();
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        for _ in 0..50 {
            limiter.record("not.a.limited.method", ip).await;
        }
        assert!(limiter.check("not.a.limited.method", ip).await);
    }

    /// Exhausting one (method, ip) pair affects neither other IPs nor other methods.
    #[tokio::test]
    async fn test_endpoint_limiter_is_per_method_and_ip() {
        let limiter = EndpointRateLimiter::new();
        let ip1: IpAddr = "127.0.0.1".parse().unwrap();
        let ip2: IpAddr = "192.168.1.1".parse().unwrap();
        // system.reboot is configured for 2 requests per window
        for _ in 0..2 {
            limiter.record("system.reboot", ip1).await;
        }
        assert!(!limiter.check("system.reboot", ip1).await);
        assert!(limiter.check("system.reboot", ip2).await);
        assert!(limiter.check("system.shutdown", ip1).await);
    }

    /// Cleanup does not forget requests that are still inside their window.
    #[tokio::test]
    async fn test_endpoint_limiter_cleanup_keeps_recent() {
        let limiter = EndpointRateLimiter::new();
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        // backup.restore is configured for 2 requests per window
        for _ in 0..2 {
            limiter.record("backup.restore", ip).await;
        }
        limiter.cleanup().await;
        assert!(!limiter.check("backup.restore", ip).await);
    }
}