mod analytics; mod auth; mod backup_rpc; mod bitcoin; mod container; mod content; mod credentials; mod dispatcher; mod dwn; mod federation; mod fips; mod handshake; mod identity; mod interfaces; mod lnd; mod marketplace; mod mesh; mod middleware; mod monitoring; mod names; mod network; mod node; mod nostr; mod package; mod peers; mod response; mod router; mod security; mod seed_rpc; mod streaming; mod system; mod tor; mod totp; mod transitional; mod transport; mod update; mod vpn; mod wallet; mod webhooks; use crate::auth::AuthManager; use crate::config::Config; use crate::container::{ContainerOrchestrator, DevContainerOrchestrator}; use crate::monitoring::MetricsStore; use crate::port_allocator::PortAllocator; use crate::rate_limit::{EndpointRateLimiter, LoginRateLimiter}; use crate::session::{self, SessionStore, REMEMBER_TTL}; use crate::state::StateManager; use anyhow::{Context, Result}; use hyper::{Request, Response, StatusCode}; use std::sync::Arc; use tracing::{debug, error}; use middleware::{ derive_csrf_token, extract_client_ip, extract_cookie, sanitize_error_message, CACHEABLE_METHODS, UNAUTHENTICATED_METHODS, }; use response::{cookie_header, json_response, ResponseCache, RpcError, RpcRequest, RpcResponse}; /// Default dev password when no user is set up (matches mock-backend). pub(crate) const DEV_DEFAULT_PASSWORD: &str = "password123"; pub struct RpcHandler { config: Config, auth_manager: AuthManager, /// Shared lifecycle orchestrator (Dev or Prod). Always `Some` in a normal /// build — the only reason it is `Option` is so tests that don't exercise /// container RPCs can skip constructing one. orchestrator: Option>, /// Concrete handle to the dev orchestrator, when we're in dev mode. Used by /// `container-install { manifest_path }` which takes an ad-hoc manifest /// path and is not part of the shared trait. dev_orchestrator: Option>, state_manager: Arc, pub(crate) metrics_store: Arc, port_allocator: Arc>, pub session_store: SessionStore, login_rate_limiter: LoginRateLimiter, endpoint_rate_limiter: EndpointRateLimiter, response_cache: ResponseCache, mesh_service: Arc>>, transport_router: Arc>>>, /// Shared content-addressed blob store. Set by ApiHandler after construction /// so mesh.send-content / mesh.fetch-content RPCs can reach it without a /// second instance and duplicated cap_key. pub(crate) blob_store: Arc>>>, /// Our own Ed25519 pubkey hex — needed by ContentRef senders for cap scoping /// and by ContentRef receivers to request caps scoped to themselves. pub(crate) self_pubkey_hex: Arc>>, } impl RpcHandler { pub async fn new( config: Config, state_manager: Arc, metrics_store: Arc, session_store: SessionStore, orchestrator: Option>, dev_orchestrator: Option>, ) -> Result { let auth_manager = AuthManager::new(config.data_dir.clone()); let port_allocator = Arc::new(tokio::sync::Mutex::new( PortAllocator::new(&config.data_dir).await?, )); let login_rate_limiter = LoginRateLimiter::new(); let endpoint_rate_limiter = EndpointRateLimiter::new(); // Spawn periodic rate limiter cleanup (every 5 minutes) { let limiter = endpoint_rate_limiter.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop { interval.tick().await; limiter.cleanup().await; } }); } { let limiter = login_rate_limiter.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop { interval.tick().await; limiter.cleanup().await; } }); } Ok(Self { config, auth_manager, orchestrator, dev_orchestrator, state_manager, metrics_store, port_allocator, session_store, login_rate_limiter, endpoint_rate_limiter, response_cache: ResponseCache::new(5), mesh_service: Arc::new(tokio::sync::RwLock::new(None)), transport_router: Arc::new(tokio::sync::RwLock::new(None)), blob_store: Arc::new(tokio::sync::RwLock::new(None)), self_pubkey_hex: Arc::new(tokio::sync::RwLock::new(None)), }) } /// Set the mesh service (called after identity is loaded). pub async fn set_mesh_service(&self, service: crate::mesh::MeshService) { // If the blob store is already initialised, propagate it into the // freshly-started mesh state so the listener can persist inline // attachments. Mirrors `set_blob_store`'s forward-propagation. if let Some(store) = self.blob_store.read().await.as_ref().cloned() { *service.shared_state().blob_store.write().await = Some(store); } *self.mesh_service.write().await = Some(service); } /// Set the transport router (called after all transports are initialized). pub async fn set_transport_router(&self, router: Arc) { *self.transport_router.write().await = Some(router); } /// Share the blob store + our pubkey so mesh.send-content / fetch-content /// can reach them. Called once from ApiHandler::new. pub async fn set_blob_store( &self, store: Arc, self_pubkey_hex: String, ) { *self.blob_store.write().await = Some(store.clone()); *self.self_pubkey_hex.write().await = Some(self_pubkey_hex); // Propagate into a running mesh service if one is already up — keeps // `set_blob_store` and `set_mesh_service` order-independent. if let Some(svc) = self.mesh_service.read().await.as_ref() { *svc.shared_state().blob_store.write().await = Some(store); } } /// Get reference to the mesh service Arc (for MeshTransport wrapper). pub fn mesh_service_arc(&self) -> Arc>> { Arc::clone(&self.mesh_service) } fn cookie_suffix_for_request(&self, headers: &hyper::header::HeaderMap) -> &'static str { // Only set Secure flag when the original request was over HTTPS. // Nginx sends X-Forwarded-Proto: https for HTTPS connections. // On LAN HTTP, Secure flag prevents browsers from sending cookies back. if self.config.dev_mode { return ""; } if let Some(proto) = headers.get("x-forwarded-proto") { if proto.as_bytes() == b"https" { tracing::debug!("[onboarding] cookie: Secure (X-Forwarded-Proto: https)"); return "; Secure"; } } tracing::debug!("[onboarding] cookie: no Secure flag (HTTP or no X-Forwarded-Proto)"); "" } pub async fn handle( self: Arc, req: Request, ) -> Result> { // Extract session cookie before consuming the request let (parts, body) = req.into_parts(); let session_token = session::extract_session_cookie(&parts.headers); let secure_suffix = self.cookie_suffix_for_request(&parts.headers); let body_bytes = hyper::body::to_bytes(body) .await .context("Failed to read body")?; let rpc_req: RpcRequest = serde_json::from_slice(&body_bytes).context("Invalid RPC request")?; debug!("RPC method: {}", rpc_req.method); // Enforce authentication for non-allowlisted methods let is_unauthenticated = UNAUTHENTICATED_METHODS.contains(&rpc_req.method.as_str()); let mut new_session_cookies: Option<(String, String)> = None; if !is_unauthenticated { let mut authenticated = match &session_token { Some(token) => self.session_store.validate(token).await, None => false, }; // If session invalid, try remember-me token to auto-restore session if !authenticated { if let Some(remember) = extract_cookie(&parts.headers, "remember") { if crate::session::SessionStore::validate_remember_token(&remember).await { let new_token = self.session_store.create().await; let new_csrf = derive_csrf_token(&new_token).await; tracing::info!("Auto-restored session from remember-me token"); new_session_cookies = Some((new_token, new_csrf)); authenticated = true; } } } if !authenticated { let reason = if session_token.is_none() { "no session cookie" } else { "invalid/expired token" }; tracing::warn!(method = %rpc_req.method, reason, "401 Unauthorized — rejecting RPC call"); return Ok(self.error_response(401, "Unauthorized", StatusCode::UNAUTHORIZED)); } } // RBAC: check if the user's role allows this method if !is_unauthenticated { if let Ok(Some(user)) = self.auth_manager.get_user().await { if !user.role.can_access(&rpc_req.method) { return Ok(self.error_response( 403, "Forbidden: insufficient permissions", StatusCode::FORBIDDEN, )); } } } // CSRF protection: validate X-CSRF-Token header via HMAC derivation from session token. // Skip CSRF for read-only methods (polling, status) — CSRF prevents state-changing forgery. // Skip when session was just auto-restored from remember-me (browser has stale CSRF cookie). let csrf_exempt = matches!( rpc_req.method.as_str(), "node-messages-received" | "server.echo" | "server.get-state" | "system.stats" | "tor.status" | "tor.onion-addresses" | "federation.list-nodes" | "system.get-settings" | "system.get-node-key" | "system.get-metrics" | "system.get-version" ); if !is_unauthenticated && new_session_cookies.is_none() && !csrf_exempt { let csrf_header = parts .headers .get("x-csrf-token") .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()); let csrf_valid = match (&session_token, &csrf_header) { (Some(token), Some(header)) => { use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; let secret = SessionStore::load_or_create_remember_secret().await; let mut mac = match HmacSha256::new_from_slice(&secret) { Ok(m) => m, Err(_) => { return Ok(json_response(StatusCode::INTERNAL_SERVER_ERROR, b"{}")); } }; mac.update(format!("csrf:{}", token).as_bytes()); match hex::decode(header) { Ok(header_bytes) => mac.verify_slice(&header_bytes).is_ok(), Err(_) => false, } } _ => false, }; if !csrf_valid { // Debug: log expected vs received for diagnosis if let (Some(token), Some(header)) = (&session_token, &csrf_header) { let expected = derive_csrf_token(token).await; tracing::warn!( method = %rpc_req.method, session_prefix = %&token[..8.min(token.len())], csrf_prefix = %&header[..8.min(header.len())], expected_prefix = %&expected[..8.min(expected.len())], "403 CSRF mismatch — session/csrf/expected prefixes shown" ); } else { tracing::warn!( method = %rpc_req.method, has_session = session_token.is_some(), has_header = csrf_header.is_some(), "403 CSRF validation failed — rejecting RPC call" ); } return Ok(self.error_response( 403, "CSRF token missing or invalid", StatusCode::FORBIDDEN, )); } } // Rate limit login attempts if rpc_req.method == "auth.login" { let client_ip = extract_client_ip(&parts.headers); if !self.login_rate_limiter.check(client_ip).await { return Ok(self.rate_limit_response()); } } // Rate limit sensitive endpoints { let client_ip = extract_client_ip(&parts.headers); if !self .endpoint_rate_limiter .check(&rpc_req.method, client_ip) .await { return Ok(self.rate_limit_response()); } self.endpoint_rate_limiter .record(&rpc_req.method, client_ip) .await; } // Extract params; clone for post-routing use (login 2FA check needs password) let params = rpc_req.params; let login_params: Option = if rpc_req.method == "auth.login" { params.clone() } else { None }; // Check cache for cacheable methods let is_cacheable = CACHEABLE_METHODS.contains(&rpc_req.method.as_str()); if is_cacheable { if let Some(cached) = self.response_cache.get(&rpc_req.method).await { let rpc_resp = RpcResponse { result: Some(cached), error: None, }; let body = serde_json::to_vec(&rpc_resp)?; return Ok(json_response(StatusCode::OK, &body)); } } // Route to handler (track latency for metrics) let rpc_start = std::time::Instant::now(); let result = Self::dispatch(&self, &rpc_req.method, params, &session_token).await; // Record RPC latency for monitoring let elapsed_ms = rpc_start.elapsed().as_secs_f64() * 1000.0; self.metrics_store.record_rpc_latency(elapsed_ms).await; // Build response (cache successful results for cacheable methods) let mut rpc_resp = match result { Ok(data) => { if is_cacheable { self.response_cache .set(rpc_req.method.clone(), data.clone()) .await; } RpcResponse { result: Some(data), error: None, } } Err(e) => { error!("RPC error on {}: {}", rpc_req.method, e); let user_message = sanitize_error_message(&e.to_string()); RpcResponse { result: None, error: Some(RpcError { code: -1, message: user_message, data: None, }), } } }; let resp_body = serde_json::to_vec(&rpc_resp).context("Failed to serialize response")?; let mut response = json_response(StatusCode::OK, &resp_body); // Post-dispatch: set cookies for auth-related methods let client_ip = extract_client_ip(&parts.headers); self.apply_auth_cookies( &rpc_req.method, &mut rpc_resp, &mut response, &session_token, &login_params, &new_session_cookies, client_ip, secure_suffix, ) .await; Ok(response) } /// Build a JSON error response with the given RPC error code and HTTP status. fn error_response( &self, code: i32, message: &str, status: StatusCode, ) -> Response { let rpc_resp = RpcResponse { result: None, error: Some(RpcError { code, message: message.to_string(), data: None, }), }; let resp_body = serde_json::to_vec(&rpc_resp).unwrap_or_default(); json_response(status, &resp_body) } /// Build a 429 Too Many Requests response. fn rate_limit_response(&self) -> Response { let rpc_resp = RpcResponse { result: None, error: Some(RpcError { code: 429, message: "Rate limit exceeded. Try again later.".to_string(), data: None, }), }; let resp_body = serde_json::to_vec(&rpc_resp).unwrap_or_default(); let mut resp = json_response(StatusCode::TOO_MANY_REQUESTS, &resp_body); resp.headers_mut() .insert("Retry-After", cookie_header("60")); resp } /// Apply session/CSRF/remember-me cookies after dispatch for auth-related methods. async fn apply_auth_cookies( &self, method: &str, rpc_resp: &mut RpcResponse, response: &mut Response, session_token: &Option, login_params: &Option, new_session_cookies: &Option<(String, String)>, client_ip: std::net::IpAddr, secure_suffix: &str, ) { // Track failed login attempts for rate limiting if method == "auth.login" && rpc_resp.error.is_some() { self.login_rate_limiter.record_failure(client_ip).await; } // On successful login, check if 2FA is required if method == "auth.login" && rpc_resp.error.is_none() { let totp_enabled = self.auth_manager.is_totp_enabled().await.unwrap_or(false); if totp_enabled { let password = login_params .as_ref() .and_then(|p| p.get("password")) .and_then(|v| v.as_str()) .unwrap_or(""); if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await { if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) { let token = self.session_store.create_pending(secret).await; let csrf_token = derive_csrf_token(&token).await; self.set_session_cookie(response, &token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); let totp_body = serde_json::json!({ "result": { "requires_totp": true }, "error": null }); *response.body_mut() = hyper::Body::from(serde_json::to_vec(&totp_body).unwrap_or_default()); } } } else { let token = self.session_store.create().await; let csrf_token = derive_csrf_token(&token).await; let remember_token = self.session_store.create_remember_token().await; self.set_session_cookie(response, &token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); self.set_remember_cookie(response, &remember_token, secure_suffix); } } // On successful TOTP verification, set the rotated session cookie if (method == "auth.login.totp" || method == "auth.login.backup") && rpc_resp.error.is_none() { let new_token_opt = rpc_resp .result .as_ref() .and_then(|r| r.get("new_session_token")) .and_then(|v| v.as_str()) .map(|s| s.to_string()); if let Some(new_token) = new_token_opt { let csrf_token = derive_csrf_token(&new_token).await; let remember_token = self.session_store.create_remember_token().await; self.set_session_cookie(response, &new_token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); self.set_remember_cookie(response, &remember_token, secure_suffix); // Strip the token from the response body if let Some(result) = rpc_resp.result.as_mut() { if let Some(obj) = result.as_object_mut() { obj.remove("new_session_token"); } } let body_bytes = serde_json::to_vec(&rpc_resp).unwrap_or_default(); *response.body_mut() = hyper::Body::from(body_bytes); } } // On password change, rotate the session token for the caller if method == "auth.changePassword" && rpc_resp.error.is_none() { if let Some(token) = session_token { let new_token = self.session_store.rotate(token).await; let csrf_token = derive_csrf_token(&new_token).await; self.set_session_cookie(response, &new_token, secure_suffix); self.set_csrf_cookie(response, &csrf_token, secure_suffix); } } // On logout, invalidate session and expire cookies if method == "auth.logout" { if let Some(token) = session_token { self.session_store.remove(token).await; } response.headers_mut().append( "Set-Cookie", cookie_header(&format!( "session=; HttpOnly; SameSite=Lax; Path=/; Max-Age=0{}", secure_suffix )), ); response.headers_mut().append( "Set-Cookie", cookie_header(&format!( "csrf_token=; SameSite=Lax; Path=/; Max-Age=0{}", secure_suffix )), ); } // If session was auto-restored from remember-me, set new cookies if let Some((new_session, new_csrf)) = new_session_cookies { self.set_session_cookie(response, new_session, secure_suffix); self.set_csrf_cookie(response, new_csrf, secure_suffix); } } fn set_session_cookie( &self, response: &mut Response, token: &str, secure_suffix: &str, ) { response.headers_mut().append( "Set-Cookie", cookie_header(&format!( "session={}; HttpOnly; SameSite=Lax; Path=/{}", token, secure_suffix )), ); } fn set_csrf_cookie( &self, response: &mut Response, csrf_token: &str, secure_suffix: &str, ) { response.headers_mut().append( "Set-Cookie", cookie_header(&format!( "csrf_token={}; SameSite=Lax; Path=/{}", csrf_token, secure_suffix )), ); } fn set_remember_cookie( &self, response: &mut Response, remember_token: &str, secure_suffix: &str, ) { response.headers_mut().append( "Set-Cookie", cookie_header(&format!( "remember={}; HttpOnly; SameSite=Lax; Path=/; Max-Age={}{}", remember_token, REMEMBER_TTL, secure_suffix )), ); } }