use crate::api::rpc::RpcHandler; use crate::content_server; use crate::electrs_status; use crate::monitoring::MetricsStore; use crate::network::dwn_store::DwnStore; use crate::node_message as node_msg; use crate::config::Config; use crate::session::{self, SessionStore}; use crate::state::StateManager; use anyhow::Result; use futures_util::{SinkExt, StreamExt}; use hyper::{Method, Request, Response, StatusCode}; use hyper_ws_listener::WsStream; use std::sync::Arc; use tokio::sync::broadcast; use tokio_tungstenite::tungstenite::Message; use std::time::Instant; use tracing::{debug, info}; pub struct ApiHandler { config: Config, rpc_handler: Arc, state_manager: Arc, metrics_store: Arc, session_store: SessionStore, } impl ApiHandler { pub async fn new( config: Config, state_manager: Arc, metrics_store: Arc, ) -> Result { let session_store = SessionStore::new(); let rpc_handler = Arc::new( RpcHandler::new( config.clone(), state_manager.clone(), metrics_store.clone(), session_store.clone(), ) .await?, ); Ok(Self { config, rpc_handler, state_manager, metrics_store, session_store, }) } /// Access the RPC handler (for service initialization after construction). pub fn rpc_handler(&self) -> &Arc { &self.rpc_handler } /// Check if the request has a valid session cookie. async fn is_authenticated(&self, headers: &hyper::HeaderMap) -> bool { match session::extract_session_cookie(headers) { Some(token) => self.session_store.validate(&token).await, None => false, } } /// Build a 401 Unauthorized JSON response. fn unauthorized() -> Response { let body = serde_json::json!({ "error": "Unauthorized" }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Response::builder() .status(StatusCode::UNAUTHORIZED) .header("Content-Type", "application/json") .body(hyper::Body::from(body_bytes)) .unwrap() } /// Allowed CORS origins derived from the config host IP. fn allowed_origins(&self) -> Vec { vec![ format!("http://{}", self.config.host_ip), format!("https://{}", self.config.host_ip), "http://localhost:8100".to_string(), // Vite dev server ] } /// Validate the Origin header against allowed origins. /// Returns the matched origin if valid, None if cross-origin is not allowed. fn validate_origin(&self, headers: &hyper::HeaderMap) -> Option { let origin = headers .get("origin") .and_then(|v| v.to_str().ok())?; let allowed = self.allowed_origins(); if allowed.iter().any(|a| a == origin) { Some(origin.to_string()) } else { None } } pub async fn handle_request( &self, req: Request, ) -> Result> { let path = req.uri().path().to_string(); let method = req.method().clone(); // Handle CORS preflight for all routes if method == Method::OPTIONS { let mut builder = Response::builder() .status(StatusCode::NO_CONTENT) .header("Vary", "Origin"); if let Some(origin) = self.validate_origin(req.headers()) { builder = builder .header("Access-Control-Allow-Origin", &origin) .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") .header("Access-Control-Allow-Headers", "Content-Type, X-CSRF-Token") .header("Access-Control-Allow-Credentials", "true"); } return Ok(builder.body(hyper::Body::empty()).unwrap()); } // WebSocket upgrade — validate session before upgrading if method == Method::GET && path == "/ws/db" { if !self.is_authenticated(req.headers()).await { tracing::warn!("401 WebSocket /ws/db — session invalid or missing"); return Ok(Self::unauthorized()); } return Self::handle_websocket(req, self.state_manager.clone(), self.metrics_store.clone()).await; } // Convert body to bytes for non-WS routes let headers = req.headers().clone(); let (parts, body) = req.into_parts(); let body_bytes = hyper::body::to_bytes(body).await .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?; let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes.clone())); debug!("{} {}", method, path); match (method, path.as_str()) { // RPC — auth is handled inside rpc handler per-method (Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await, // Health — unauthenticated (Method::GET, "/health") => Ok(Response::builder() .status(StatusCode::OK) .body(hyper::Body::from("OK")) .unwrap()), // Node message — P2P endpoint (authenticated by source validation, not cookie) (Method::POST, "/archipelago/node-message") => { Self::handle_node_message(body_bytes).await } // Content serving — peers access shared content over Tor (no session auth) (Method::GET, p) if p.starts_with("/content/") => { Self::handle_content_request(p, &headers, &self.config).await } // Content catalog — list available content (no session auth, for peers) (Method::GET, "/content") => { Self::handle_content_catalog(&self.config).await } // Electrs status — unauthenticated (read-only sync status) (Method::GET, "/electrs-status") => Self::handle_electrs_status().await, // LND connect info — unauthenticated (read-only, localhost only) (Method::GET, "/lnd-connect-info") => { Self::handle_lnd_connect_info(self.rpc_handler.clone()).await } // Container logs — requires session (Method::GET, path) if path.starts_with("/api/container/logs") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } let origin = self.validate_origin(&headers).unwrap_or_default(); Self::handle_container_logs_http(self.rpc_handler.clone(), path, &origin).await } // LND proxy — requires session (Method::GET, path) if path.starts_with("/proxy/lnd/") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } let origin = self.validate_origin(&headers).unwrap_or_default(); Self::handle_lnd_proxy(path, &origin).await } // DWN health — unauthenticated (Method::GET, "/dwn/health") => { Self::handle_dwn_health(&self.config).await } // DWN message processing — peers access over Tor for sync (no session auth) (Method::POST, "/dwn") => { Self::handle_dwn_message(body_bytes, &self.config).await } _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(hyper::Body::from("Not Found")) .unwrap()), } } async fn handle_container_logs_http( rpc: Arc, path: &str, cors_origin: &str, ) -> Result> { let query = path .strip_prefix("/api/container/logs") .and_then(|s| s.strip_prefix('?')) .unwrap_or(""); let params: std::collections::HashMap = query .split('&') .filter_map(|p| { let mut it = p.splitn(2, '='); let k = it.next()?.to_string(); let v = it.next()?.to_string(); Some((k, v)) }) .collect(); let app_id = params.get("app_id").map(|s| s.as_str()).unwrap_or("lnd"); // Validate app_id format if !is_valid_app_id(app_id) { let body = serde_json::json!({ "error": "Invalid app_id" }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); return Ok(Response::builder() .status(StatusCode::BAD_REQUEST) .header("Content-Type", "application/json") .body(hyper::Body::from(body_bytes)) .unwrap()); } let lines = params .get("lines") .and_then(|s| s.parse::().ok()) .unwrap_or(200); match rpc.get_container_logs_value(app_id, lines).await { Ok(value) => { let body = serde_json::json!({ "result": value }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", cors_origin) .header("Access-Control-Allow-Credentials", "true") .header("Vary", "Origin") .body(hyper::Body::from(body_bytes)) .unwrap()) } Err(e) => { let body = serde_json::json!({ "error": e.to_string() }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", cors_origin) .header("Access-Control-Allow-Credentials", "true") .header("Vary", "Origin") .body(hyper::Body::from(body_bytes)) .unwrap()) } } } async fn handle_node_message(body: hyper::body::Bytes) -> Result> { #[derive(serde::Deserialize)] struct Incoming { from_pubkey: Option, message: Option, } let incoming: Incoming = serde_json::from_slice(&body).unwrap_or(Incoming { from_pubkey: None, message: None, }); if let (Some(from), Some(msg)) = (incoming.from_pubkey, incoming.message) { // Validate from_pubkey is a valid hex ed25519 pubkey if !is_valid_pubkey_hex(&from) { return Ok(Response::builder() .status(StatusCode::BAD_REQUEST) .header("Content-Type", "application/json") .body(hyper::Body::from(r#"{"error":"Invalid pubkey format"}"#)) .unwrap()); } // Sanitize log output to prevent log injection let safe_from = sanitize_log_string(&from); let safe_msg = sanitize_log_string(&msg); tracing::info!("Received message from {}: {}", safe_from, safe_msg); // Sanitize stored message content (strip HTML entities) let clean_from = sanitize_html(&from); let clean_msg = sanitize_html(&msg); node_msg::store_received(&clean_from, &clean_msg).await; } Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from(r#"{"ok":true}"#)) .unwrap()) } async fn handle_electrs_status() -> Result> { let status = electrs_status::get_electrs_sync_status().await; let body = serde_json::to_vec(&status).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from(body)) .unwrap()) } async fn handle_lnd_connect_info( rpc: std::sync::Arc, ) -> Result> { match rpc.handle_lnd_connect_info().await { Ok(val) => { let body = serde_json::to_vec(&val).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from(body)) .unwrap()) } Err(e) => Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header("Content-Type", "application/json") .body(hyper::Body::from( serde_json::json!({"error": e.to_string()}).to_string(), )) .unwrap()), } } async fn handle_lnd_proxy(path: &str, cors_origin: &str) -> Result> { let suffix = path.strip_prefix("/proxy/lnd").unwrap_or("/"); let url = format!("http://127.0.0.1:8080{}", suffix); match reqwest::get(&url).await { Ok(resp) => { let status = resp.status().as_u16(); let headers = resp.headers().clone(); let body = resp.bytes().await.unwrap_or_default(); let mut builder = Response::builder().status(status); if let Some(ct) = headers.get("content-type") { if let Ok(s) = ct.to_str() { builder = builder.header("Content-Type", s); } } builder .header("Access-Control-Allow-Origin", cors_origin) .header("Access-Control-Allow-Credentials", "true") .header("Vary", "Origin") .body(hyper::Body::from(body)) .map_err(|e| anyhow::anyhow!("response build: {}", e)) } Err(e) => { let body = serde_json::json!({ "error": e.to_string() }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::BAD_GATEWAY) .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", cors_origin) .header("Access-Control-Allow-Credentials", "true") .header("Vary", "Origin") .body(hyper::Body::from(body_bytes)) .unwrap()) } } } async fn handle_content_catalog(config: &Config) -> Result> { match content_server::load_catalog(&config.data_dir).await { Ok(catalog) => { // Only expose public metadata for available items let items: Vec = catalog .items .iter() .filter(|i| !matches!(i.availability, content_server::Availability::Nobody)) .map(|i| { serde_json::json!({ "id": i.id, "filename": i.filename, "mime_type": i.mime_type, "size_bytes": i.size_bytes, "description": i.description, "access": i.access, }) }) .collect(); let body = serde_json::to_vec(&serde_json::json!({ "items": items })) .unwrap_or_default(); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from(body)) .unwrap()) } Err(e) => { let body = serde_json::json!({ "error": e.to_string() }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header("Content-Type", "application/json") .body(hyper::Body::from(body_bytes)) .unwrap()) } } } async fn handle_content_request( path: &str, headers: &hyper::HeaderMap, config: &Config, ) -> Result> { let content_id = path.strip_prefix("/content/").unwrap_or(""); if content_id.is_empty() || !is_valid_app_id(content_id) { return Ok(Response::builder() .status(StatusCode::BAD_REQUEST) .body(hyper::Body::from("Invalid content ID")) .unwrap()); } // Extract payment token from X-Payment-Token header let payment_token = headers .get("x-payment-token") .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()); // Extract federation peer DID from X-Federation-DID header let peer_did = headers .get("x-federation-did") .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()); // Parse Range header for streaming support let range = headers .get("range") .and_then(|v| v.to_str().ok()) .and_then(content_server::parse_range_header); match content_server::serve_content( &config.data_dir, content_id, payment_token.as_deref(), peer_did.as_deref(), range, ) .await { Ok(content_server::ServeResult::Ok(bytes, mime_type)) => { let len = bytes.len(); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", mime_type) .header("Content-Length", len.to_string()) .header("Accept-Ranges", "bytes") .body(hyper::Body::from(bytes)) .unwrap()) } Ok(content_server::ServeResult::Partial { bytes, mime_type, start, end, total, }) => { Ok(Response::builder() .status(StatusCode::PARTIAL_CONTENT) .header("Content-Type", mime_type) .header("Content-Length", bytes.len().to_string()) .header("Content-Range", format!("bytes {}-{}/{}", start, end, total)) .header("Accept-Ranges", "bytes") .body(hyper::Body::from(bytes)) .unwrap()) } Ok(content_server::ServeResult::PaymentRequired(price_sats)) => { let body = serde_json::json!({ "error": "Payment required", "price_sats": price_sats, "payment_header": "X-Payment-Token", }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::PAYMENT_REQUIRED) .header("Content-Type", "application/json") .body(hyper::Body::from(body_bytes)) .unwrap()) } Ok(content_server::ServeResult::Forbidden) => { Ok(Response::builder() .status(StatusCode::FORBIDDEN) .header("Content-Type", "application/json") .body(hyper::Body::from( r#"{"error":"Access denied — federation peer required"}"#, )) .unwrap()) } Ok(content_server::ServeResult::NotFound) | Err(_) => { Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(hyper::Body::from("Content not found")) .unwrap()) } } } async fn handle_websocket( req: Request, state_manager: Arc, metrics_store: Arc, ) -> Result> { let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req) .map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?; if let Some(ws_fut) = ws_fut_opt { tokio::spawn(async move { let ws_stream: WsStream = match ws_fut.await { Ok(Ok(s)) => s, Ok(Err(e)) => { debug!("WebSocket handshake failed (hyper): {}", e); return; } Err(e) => { debug!("WebSocket task join failed: {}", e); return; } }; metrics_store.increment_ws(); info!("WebSocket /ws/db connected"); let (mut tx, mut rx) = ws_stream.split(); let initial_msg = state_manager.get_initial_message().await; if let Ok(json_msg) = serde_json::to_string(&initial_msg) { if let Err(e) = tx.send(Message::Text(json_msg)).await { debug!("Failed to send initial data: {}", e); return; } debug!("Sent initial data dump at revision {}", initial_msg.rev); } let mut state_rx = state_manager.subscribe(); let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); tokio::pin!(ping_interval); let mut last_client_activity = Instant::now(); const INACTIVITY_TIMEOUT_SECS: u64 = 300; // 5 minutes loop { tokio::select! { _ = ping_interval.tick() => { // Check inactivity timeout if last_client_activity.elapsed().as_secs() >= INACTIVITY_TIMEOUT_SECS { info!("WebSocket client inactive for {}s, closing", INACTIVITY_TIMEOUT_SECS); let _ = tx.send(Message::Close(None)).await; break; } if tx.send(Message::Ping(vec![])).await.is_err() { debug!("Failed to send ping, connection likely closed"); break; } } update = state_rx.recv() => { match update { Ok(msg) => { if let Ok(json_msg) = serde_json::to_string(&msg) { if let Err(e) = tx.send(Message::Text(json_msg)).await { debug!("Failed to send state update: {}", e); break; } debug!("Sent state update at revision {}", msg.rev); } } Err(broadcast::error::RecvError::Lagged(skipped)) => { debug!("Client lagged behind, skipped {} messages", skipped); } Err(broadcast::error::RecvError::Closed) => { debug!("Broadcast channel closed"); break; } } } msg = rx.next() => { match msg { Some(Ok(Message::Close(_))) => break, Some(Ok(Message::Pong(_))) => { last_client_activity = Instant::now(); debug!("Received pong"); } Some(Ok(Message::Ping(data))) => { last_client_activity = Instant::now(); let _ = tx.send(Message::Pong(data)).await; } Some(Ok(Message::Text(text))) => { last_client_activity = Instant::now(); // Handle JSON ping from frontend if text.contains("\"type\":\"ping\"") || text.contains("\"type\": \"ping\"") { let _ = tx.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await; } } Some(Ok(_)) => { last_client_activity = Instant::now(); } Some(Err(e)) => { debug!("WebSocket stream error: {}", e); break; } None => break, } } } } metrics_store.decrement_ws(); info!("WebSocket /ws/db disconnected"); }); } Ok(response) } } /// Validate that an app ID matches the safe pattern: lowercase alphanumeric + hyphens. fn is_valid_app_id(id: &str) -> bool { !id.is_empty() && id.len() <= 64 && id.bytes().all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-') && id.as_bytes()[0] != b'-' } /// Validate that a pubkey is a 64-char hex string. fn is_valid_pubkey_hex(s: &str) -> bool { s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) } /// Strip newlines and ANSI escape sequences from strings before logging. fn sanitize_log_string(s: &str) -> String { s.replace('\n', "\\n") .replace('\r', "\\r") .replace('\x1b', "") } /// Strip HTML-sensitive characters to prevent XSS when stored/rendered. fn sanitize_html(s: &str) -> String { s.replace('&', "&") .replace('<', "<") .replace('>', ">") .replace('"', """) .replace('\'', "'") } impl ApiHandler { /// DWN health endpoint — returns store stats. async fn handle_dwn_health(config: &Config) -> Result> { match DwnStore::new(&config.data_dir).await { Ok(store) => { let stats = store.stats().await.unwrap_or(crate::network::dwn_store::StoreStats { message_count: 0, protocol_count: 0, total_bytes: 0, }); let body = serde_json::json!({ "status": "ok", "message_count": stats.message_count, "protocol_count": stats.protocol_count, "total_bytes": stats.total_bytes, }); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from(body.to_string())) .unwrap()) } Err(_) => Ok(Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) .header("Content-Type", "application/json") .body(hyper::Body::from(r#"{"status":"unavailable"}"#)) .unwrap()), } } /// DWN message processing endpoint — handles RecordsWrite, RecordsQuery, RecordsRead, RecordsDelete. /// Supports batch processing: all messages in the array are processed. async fn handle_dwn_message( body: hyper::body::Bytes, config: &Config, ) -> Result> { let request: serde_json::Value = match serde_json::from_slice(&body) { Ok(v) => v, Err(e) => { let err = serde_json::json!({"error": format!("Invalid JSON: {}", e)}); return Ok(Response::builder() .status(StatusCode::BAD_REQUEST) .header("Content-Type", "application/json") .body(hyper::Body::from(err.to_string())) .unwrap()); } }; // Collect all messages to process let messages: Vec = if request.get("message").is_some() { vec![request["message"].clone()] } else if let Some(msgs) = request["messages"].as_array() { msgs.clone() } else { vec![serde_json::Value::Null] }; let store = DwnStore::new(&config.data_dir).await?; let mut results = Vec::new(); for message in &messages { let interface = message["descriptor"]["interface"] .as_str() .unwrap_or(""); let method = message["descriptor"]["method"] .as_str() .unwrap_or(""); let result = match (interface, method) { ("Records", "Write") => { let author = message["author"].as_str().unwrap_or("unknown"); let protocol = message["descriptor"]["protocol"].as_str(); let schema = message["descriptor"]["schema"].as_str(); let data_format = message["descriptor"]["dataFormat"].as_str(); let data = message.get("data").cloned(); // Deduplicate: check if recordId already exists if let Some(record_id) = message["recordId"].as_str() { if store.read_message(record_id).await.ok().flatten().is_some() { serde_json::json!({"status": {"code": 200, "detail": "Already exists"}}) } else { match store .write_message(author, protocol, schema, data_format, data) .await { Ok(msg) => { serde_json::json!({"status": {"code": 202}, "entry": msg}) } Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}), } } } else { match store .write_message(author, protocol, schema, data_format, data) .await { Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}), Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}), } } } ("Records", "Query") => { let query = crate::network::dwn_store::MessageQuery { protocol: message["descriptor"]["filter"]["protocol"] .as_str() .map(|s| s.to_string()), schema: message["descriptor"]["filter"]["schema"] .as_str() .map(|s| s.to_string()), author: message["descriptor"]["filter"]["author"] .as_str() .map(|s| s.to_string()), date_from: message["descriptor"]["filter"]["dateFrom"] .as_str() .map(|s| s.to_string()), date_to: message["descriptor"]["filter"]["dateTo"] .as_str() .map(|s| s.to_string()), limit: message["descriptor"]["filter"]["limit"] .as_u64() .map(|n| n as usize), }; match store.query_messages(&query).await { Ok(messages) => { serde_json::json!({"status": {"code": 200}, "entries": messages}) } Err(e) => { serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}) } } } ("Records", "Read") => { let record_id = message["descriptor"]["recordId"] .as_str() .unwrap_or(""); match store.read_message(record_id).await { Ok(Some(msg)) => { serde_json::json!({"status": {"code": 200}, "entry": msg}) } Ok(None) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}), Err(e) => { serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}) } } } ("Records", "Delete") => { let record_id = message["descriptor"]["recordId"] .as_str() .unwrap_or(""); match store.delete_message(record_id).await { Ok(true) => serde_json::json!({"status": {"code": 200}}), Ok(false) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}), Err(e) => { serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}) } } } _ => { serde_json::json!({"status": {"code": 400, "detail": format!("Unknown method: {}.{}", interface, method)}}) } }; results.push(result); } // Return single result for single message, array for batch let (response_body, http_status) = if results.len() == 1 { let result = &results[0]; let status_code = result["status"]["code"].as_u64().unwrap_or(200); let http_status = match status_code { 202 => StatusCode::ACCEPTED, 400 => StatusCode::BAD_REQUEST, 404 => StatusCode::NOT_FOUND, 500 => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::OK, }; (result.to_string(), http_status) } else { ( serde_json::json!({"replies": results}).to_string(), StatusCode::OK, ) }; Ok(Response::builder() .status(http_status) .header("Content-Type", "application/json") .body(hyper::Body::from(response_body)) .unwrap()) } }