mod analytics; mod auth; mod backup_rpc; mod bitcoin; mod container; mod content; mod credentials; mod dispatcher; mod dwn; mod federation; mod handshake; mod identity; mod interfaces; mod marketplace; mod middleware; mod monitoring; mod names; mod lnd; mod mesh; mod network; mod node; mod nostr; mod package; mod peers; mod response; mod router; mod security; mod tor; mod transport; mod totp; mod system; mod update; mod vpn; mod wallet; mod webhooks; use crate::auth::AuthManager; use crate::config::Config; use crate::container::DevContainerOrchestrator; use crate::monitoring::MetricsStore; use crate::port_allocator::PortAllocator; use crate::rate_limit::{EndpointRateLimiter, LoginRateLimiter}; use crate::session::{self, SessionStore, REMEMBER_TTL}; use crate::state::StateManager; use anyhow::{Context, Result}; use hyper::{Request, Response, StatusCode}; use std::sync::Arc; use tracing::{debug, error}; use middleware::{ UNAUTHENTICATED_METHODS, CACHEABLE_METHODS, derive_csrf_token, extract_client_ip, extract_cookie, sanitize_error_message, }; use response::{RpcRequest, RpcResponse, RpcError, ResponseCache, json_response, cookie_header}; /// Default dev password when no user is set up (matches mock-backend). pub(crate) const DEV_DEFAULT_PASSWORD: &str = "password123"; pub struct RpcHandler { config: Config, auth_manager: AuthManager, orchestrator: Option>, state_manager: Arc, pub(crate) metrics_store: Arc, port_allocator: Arc>, pub session_store: SessionStore, login_rate_limiter: LoginRateLimiter, endpoint_rate_limiter: EndpointRateLimiter, response_cache: ResponseCache, mesh_service: Arc>>, transport_router: Arc>>>, } impl RpcHandler { pub async fn new( config: Config, state_manager: Arc, metrics_store: Arc, session_store: SessionStore, ) -> Result { let auth_manager = AuthManager::new(config.data_dir.clone()); let orchestrator = if config.dev_mode { Some(Arc::new( DevContainerOrchestrator::new(config.clone()).await?, )) } else { None }; let port_allocator = Arc::new(tokio::sync::Mutex::new(PortAllocator::new(&config.data_dir).await?)); let login_rate_limiter = LoginRateLimiter::new(); let endpoint_rate_limiter = EndpointRateLimiter::new(); // Spawn periodic rate limiter cleanup (every 5 minutes) { let limiter = endpoint_rate_limiter.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop { interval.tick().await; limiter.cleanup().await; } }); } { let limiter = login_rate_limiter.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop { interval.tick().await; limiter.cleanup().await; } }); } Ok(Self { config, auth_manager, orchestrator, state_manager, metrics_store, port_allocator, session_store, login_rate_limiter, endpoint_rate_limiter, response_cache: ResponseCache::new(5), mesh_service: Arc::new(tokio::sync::RwLock::new(None)), transport_router: Arc::new(tokio::sync::RwLock::new(None)), }) } /// Set the mesh service (called after identity is loaded). pub async fn set_mesh_service(&self, service: crate::mesh::MeshService) { *self.mesh_service.write().await = Some(service); } /// Set the transport router (called after all transports are initialized). pub async fn set_transport_router(&self, router: Arc) { *self.transport_router.write().await = Some(router); } /// Get reference to the mesh service Arc (for MeshTransport wrapper). pub fn mesh_service_arc(&self) -> Arc>> { Arc::clone(&self.mesh_service) } fn cookie_suffix_for_request(&self, headers: &hyper::header::HeaderMap) -> &'static str { // Only set Secure flag when the original request was over HTTPS. // Nginx sends X-Forwarded-Proto: https for HTTPS connections. // On LAN HTTP, Secure flag prevents browsers from sending cookies back. if self.config.dev_mode { return ""; } if let Some(proto) = headers.get("x-forwarded-proto") { if proto.as_bytes() == b"https" { tracing::debug!("[onboarding] cookie: Secure (X-Forwarded-Proto: https)"); return "; Secure"; } } tracing::debug!("[onboarding] cookie: no Secure flag (HTTP or no X-Forwarded-Proto)"); "" } pub async fn handle( &self, req: Request, ) -> Result> { // Extract session cookie before consuming the request let (parts, body) = req.into_parts(); let session_token = session::extract_session_cookie(&parts.headers); let secure_suffix = self.cookie_suffix_for_request(&parts.headers); let body_bytes = hyper::body::to_bytes(body).await .context("Failed to read body")?; let rpc_req: RpcRequest = serde_json::from_slice(&body_bytes) .context("Invalid RPC request")?; debug!("RPC method: {}", rpc_req.method); // Enforce authentication for non-allowlisted methods let is_unauthenticated = UNAUTHENTICATED_METHODS.contains(&rpc_req.method.as_str()); let mut new_session_cookies: Option<(String, String)> = None; if !is_unauthenticated { let mut authenticated = match &session_token { Some(token) => self.session_store.validate(token).await, None => false, }; // If session invalid, try remember-me token to auto-restore session if !authenticated { if let Some(remember) = extract_cookie(&parts.headers, "remember") { if crate::session::SessionStore::validate_remember_token(&remember).await { let new_token = self.session_store.create().await; let new_csrf = derive_csrf_token(&new_token).await; tracing::info!("Auto-restored session from remember-me token"); new_session_cookies = Some((new_token, new_csrf)); authenticated = true; } } } if !authenticated { let reason = if session_token.is_none() { "no session cookie" } else { "invalid/expired token" }; tracing::warn!(method = %rpc_req.method, reason, "401 Unauthorized — rejecting RPC call"); return Ok(self.error_response(401, "Unauthorized", StatusCode::UNAUTHORIZED)); } } // RBAC: check if the user's role allows this method if !is_unauthenticated { if let Ok(Some(user)) = self.auth_manager.get_user().await { if !user.role.can_access(&rpc_req.method) { return Ok(self.error_response(403, "Forbidden: insufficient permissions", StatusCode::FORBIDDEN)); } } } // CSRF protection: validate X-CSRF-Token header via HMAC derivation from session token. // Skip CSRF check if session was just auto-restored from remember-me. if !is_unauthenticated && new_session_cookies.is_none() { let csrf_header = parts .headers .get("x-csrf-token") .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()); let csrf_valid = match (&session_token, &csrf_header) { (Some(token), Some(header)) => { use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; let secret = SessionStore::load_or_create_remember_secret().await; let mut mac = match HmacSha256::new_from_slice(&secret) { Ok(m) => m, Err(_) => { return Ok(json_response(StatusCode::INTERNAL_SERVER_ERROR, b"{}")); } }; mac.update(format!("csrf:{}", token).as_bytes()); match hex::decode(header) { Ok(header_bytes) => mac.verify_slice(&header_bytes).is_ok(), Err(_) => false, } } _ => false, }; if !csrf_valid { tracing::warn!( method = %rpc_req.method, has_session = session_token.is_some(), has_header = csrf_header.is_some(), "403 CSRF validation failed — rejecting RPC call" ); return Ok(self.error_response(403, "CSRF token missing or invalid", StatusCode::FORBIDDEN)); } } // Rate limit login attempts if rpc_req.method == "auth.login" { let client_ip = extract_client_ip(&parts.headers); if !self.login_rate_limiter.check(client_ip).await { return Ok(self.rate_limit_response()); } } // Rate limit sensitive endpoints { let client_ip = extract_client_ip(&parts.headers); if !self.endpoint_rate_limiter.check(&rpc_req.method, client_ip).await { return Ok(self.rate_limit_response()); } self.endpoint_rate_limiter.record(&rpc_req.method, client_ip).await; } // Extract params; clone for post-routing use (login 2FA check needs password) let params = rpc_req.params; let login_params: Option = if rpc_req.method == "auth.login" { params.clone() } else { None }; // Check cache for cacheable methods let is_cacheable = CACHEABLE_METHODS.contains(&rpc_req.method.as_str()); if is_cacheable { if let Some(cached) = self.response_cache.get(&rpc_req.method).await { let rpc_resp = RpcResponse { result: Some(cached), error: None, }; let body = serde_json::to_vec(&rpc_resp)?; return Ok(json_response(StatusCode::OK, &body)); } } // Route to handler (track latency for metrics) let rpc_start = std::time::Instant::now(); let result = self.dispatch(&rpc_req.method, params, &session_token).await; // Record RPC latency for monitoring let elapsed_ms = rpc_start.elapsed().as_secs_f64() * 1000.0; self.metrics_store.record_rpc_latency(elapsed_ms).await; // Build response (cache successful results for cacheable methods) let mut rpc_resp = match result { Ok(data) => { if is_cacheable { self.response_cache.set(rpc_req.method.clone(), data.clone()).await; } RpcResponse { result: Some(data), error: None, } } Err(e) => { error!("RPC error on {}: {}", rpc_req.method, e); let user_message = sanitize_error_message(&e.to_string()); RpcResponse { result: None, error: Some(RpcError { code: -1, message: user_message, data: None, }), } } }; let resp_body = serde_json::to_vec(&rpc_resp) .context("Failed to serialize response")?; let mut response = json_response(StatusCode::OK, &resp_body); // Post-dispatch: set cookies for auth-related methods let client_ip = extract_client_ip(&parts.headers); self.apply_auth_cookies( &rpc_req.method, &mut rpc_resp, &mut response, &session_token, &login_params, &new_session_cookies, client_ip, secure_suffix, ).await; Ok(response) } /// Build a JSON error response with the given RPC error code and HTTP status. fn error_response(&self, code: i32, message: &str, status: StatusCode) -> Response { let rpc_resp = RpcResponse { result: None, error: Some(RpcError { code, message: message.to_string(), data: None, }), }; let resp_body = serde_json::to_vec(&rpc_resp).unwrap_or_default(); json_response(status, &resp_body) } /// Build a 429 Too Many Requests response. fn rate_limit_response(&self) -> Response { let rpc_resp = RpcResponse { result: None, error: Some(RpcError { code: 429, message: "Rate limit exceeded. Try again later.".to_string(), data: None, }), }; let resp_body = serde_json::to_vec(&rpc_resp).unwrap_or_default(); let mut resp = json_response(StatusCode::TOO_MANY_REQUESTS, &resp_body); resp.headers_mut().insert("Retry-After", cookie_header("60")); resp } /// Apply session/CSRF/remember-me cookies after dispatch for auth-related methods. async fn apply_auth_cookies( &self, method: &str, rpc_resp: &mut RpcResponse, response: &mut Response, session_token: &Option, login_params: &Option, new_session_cookies: &Option<(String, String)>, client_ip: std::net::IpAddr, secure_suffix: &str, ) { // Track failed login attempts for rate limiting if method == "auth.login" && rpc_resp.error.is_some() { self.login_rate_limiter.record_failure(client_ip).await; } // On successful login, check if 2FA is required if method == "auth.login" && rpc_resp.error.is_none() { let totp_enabled = self.auth_manager.is_totp_enabled().await.unwrap_or(false); if totp_enabled { let password = login_params .as_ref() .and_then(|p| p.get("password")) .and_then(|v| v.as_str()) .unwrap_or(""); if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await { if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) { let token = self.session_store.create_pending(secret).await; let csrf_token = derive_csrf_token(&token).await; self.set_session_cookie(response, &token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); let totp_body = serde_json::json!({ "result": { "requires_totp": true }, "error": null }); *response.body_mut() = hyper::Body::from( serde_json::to_vec(&totp_body).unwrap_or_default(), ); } } } else { let token = self.session_store.create().await; let csrf_token = derive_csrf_token(&token).await; let remember_token = self.session_store.create_remember_token().await; self.set_session_cookie(response, &token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); self.set_remember_cookie(response, &remember_token, secure_suffix); } } // On successful TOTP verification, set the rotated session cookie if (method == "auth.login.totp" || method == "auth.login.backup") && rpc_resp.error.is_none() { let new_token_opt = rpc_resp .result .as_ref() .and_then(|r| r.get("new_session_token")) .and_then(|v| v.as_str()) .map(|s| s.to_string()); if let Some(new_token) = new_token_opt { let csrf_token = derive_csrf_token(&new_token).await; let remember_token = self.session_store.create_remember_token().await; self.set_session_cookie(response, &new_token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); self.set_remember_cookie(response, &remember_token, secure_suffix); // Strip the token from the response body if let Some(result) = rpc_resp.result.as_mut() { if let Some(obj) = result.as_object_mut() { obj.remove("new_session_token"); } } let body_bytes = serde_json::to_vec(&rpc_resp).unwrap_or_default(); *response.body_mut() = hyper::Body::from(body_bytes); } } // On password change, rotate the session token for the caller if method == "auth.changePassword" && rpc_resp.error.is_none() { if let Some(token) = session_token { let new_token = self.session_store.rotate(token).await; let csrf_token = derive_csrf_token(&new_token).await; self.set_session_cookie(response, &new_token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); } } // On logout, invalidate session and expire cookies if method == "auth.logout" { if let Some(token) = session_token { self.session_store.remove(token).await; } response.headers_mut().append( "Set-Cookie", cookie_header(&format!("session=; HttpOnly; SameSite=Lax; Path=/; Max-Age=0{}", secure_suffix)), ); response.headers_mut().append( "Set-Cookie", cookie_header(&format!("csrf_token=; SameSite=Lax; Path=/; Max-Age=0{}", secure_suffix)), ); } // If session was auto-restored from remember-me, set new cookies if let Some((new_session, new_csrf)) = new_session_cookies { self.set_session_cookie(response, new_session, secure_suffix); self.set_csrf_cookie(response, new_csrf, secure_suffix); } } fn set_session_cookie(&self, response: &mut Response, token: &str, secure_suffix: &str) { response.headers_mut().append( "Set-Cookie", cookie_header(&format!("session={}; HttpOnly; SameSite=Lax; Path=/{}", token, secure_suffix)), ); } fn set_csrf_cookie(&self, response: &mut Response, csrf_token: &str, secure_suffix: &str) { response.headers_mut().append( "Set-Cookie", cookie_header(&format!("csrf_token={}; SameSite=Lax; Path=/{}", csrf_token, secure_suffix)), ); } fn set_remember_cookie(&self, response: &mut Response, remember_token: &str, secure_suffix: &str) { response.headers_mut().append( "Set-Cookie", cookie_header(&format!("remember={}; HttpOnly; SameSite=Lax; Path=/; Max-Age={}{}", remember_token, REMEMBER_TTL, secure_suffix)), ); } }