mod blob; mod content; mod dwn; mod node_message; mod proxy; mod remote_input; mod remote_relay; mod websocket; use crate::api::rpc::RpcHandler; use crate::blobs::BlobStore; use crate::config::Config; use crate::container::{ContainerOrchestrator, DevContainerOrchestrator}; use crate::monitoring::MetricsStore; use crate::session::{self, SessionStore}; use crate::state::StateManager; use anyhow::Result; use hyper::{Method, Request, Response, StatusCode}; use sha2::{Digest, Sha256}; use std::sync::Arc; use tokio::sync::broadcast; use tracing::debug; /// Build an HTTP response without unwrap. Falls back to a plain 500 if builder fails. // Used by handler submodules after unwrap elimination #[allow(dead_code)] pub(super) fn build_response( status: StatusCode, content_type: &str, body: hyper::Body, ) -> Response { Response::builder() .status(status) .header("Content-Type", content_type) .body(body) .unwrap_or_else(|_| Response::new(hyper::Body::from("Internal error"))) } pub struct ApiHandler { config: Config, rpc_handler: Arc, state_manager: Arc, metrics_store: Arc, session_store: SessionStore, /// Broadcast channel for relaying companion app input to remote browsers. input_relay_tx: broadcast::Sender, /// Content-addressed blob store for attachments shared over mesh/federation. blob_store: Arc, /// Our own node pubkey (hex) — used to self-sign debug/test capabilities. self_pubkey_hex: String, } impl ApiHandler { pub async fn new( config: Config, state_manager: Arc, metrics_store: Arc, orchestrator: Option>, dev_orchestrator: Option>, ) -> Result { let session_store = SessionStore::new().await; let rpc_handler = Arc::new( RpcHandler::new( config.clone(), state_manager.clone(), metrics_store.clone(), session_store.clone(), orchestrator, dev_orchestrator, ) .await?, ); let (input_relay_tx, _) = broadcast::channel(64); // Derive a blob-store capability key from the node's Ed25519 signing // key. SHA-256 domain-separated so rotating the identity rotates // every outstanding capability token (intentional — prevents a // replaced node from honouring old caps). let identity_dir = config.data_dir.join("identity"); let identity = crate::identity::NodeIdentity::load_or_create(&identity_dir).await?; let mut hasher = Sha256::new(); hasher.update(identity.signing_key().to_bytes()); hasher.update(b"|archipelago-blob-cap-v1"); let mut cap_key = [0u8; 32]; cap_key.copy_from_slice(&hasher.finalize()); let blob_store = Arc::new(BlobStore::open(&config.data_dir, cap_key).await?); let self_pubkey_hex = hex::encode(identity.signing_key().verifying_key().as_bytes()); // Share blob store with the RPC layer so mesh.send-content / // mesh.fetch-content can reach the same instance (single cap_key, // single on-disk root) without re-opening it. rpc_handler .set_blob_store(blob_store.clone(), self_pubkey_hex.clone()) .await; Ok(Self { config, rpc_handler, state_manager, metrics_store, session_store, input_relay_tx, blob_store, self_pubkey_hex, }) } /// Access the RPC handler (for service initialization after construction). pub fn rpc_handler(&self) -> &Arc { &self.rpc_handler } /// Check if the request has a valid session cookie. async fn is_authenticated(&self, headers: &hyper::HeaderMap) -> bool { match session::extract_session_cookie(headers) { Some(token) => self.session_store.validate(&token).await, None => false, } } /// Server-side fetch of the upstream app catalog so the browser can /// load it without fighting CORS (git.tx1138.com emits no ACAO) or /// CSP (the fallback IP-port URL isn't in `connect-src`). The upstream /// list is derived from the operator's configured container registries /// so switching mirrors in Settings changes the App Store source too — /// each active registry contributes one Gitea `raw/branch/main/catalog.json` /// URL (http or https per `tls_verify`), tried in priority order. /// If registry config can't be loaded, falls back to the legacy /// hardcoded pair so the App Store still renders on nodes that haven't /// persisted a registry config yet. 15s total timeout. async fn handle_app_catalog_proxy(&self) -> Result> { let mut upstreams: Vec = Vec::new(); if let Ok(config) = crate::container::registry::load_registries(&self.config.data_dir).await { for reg in config.active_registries() { let scheme = if reg.tls_verify { "https" } else { "http" }; // Gitea raw URL: :////app-catalog/raw/branch/main/catalog.json. // reg.url already includes the namespace (e.g. "host/lfg2025"), // so we just tack on the repo + raw path. upstreams.push(format!( "{}://{}/app-catalog/raw/branch/main/catalog.json", scheme, reg.url )); } } if upstreams.is_empty() { upstreams.push( "http://146.59.87.168:3000/lfg2025/app-catalog/raw/branch/main/catalog.json" .to_string(), ); upstreams.push( "https://git.tx1138.com/lfg2025/app-catalog/raw/branch/main/catalog.json" .to_string(), ); } let client = match reqwest::Client::builder() .timeout(std::time::Duration::from_secs(15)) .build() { Ok(c) => c, Err(e) => { return Ok(build_response( hyper::StatusCode::INTERNAL_SERVER_ERROR, "text/plain", hyper::Body::from(format!("client build failed: {}", e)), )); } }; for url in &upstreams { match client.get(url).send().await { Ok(resp) if resp.status().is_success() => { if let Ok(bytes) = resp.bytes().await { return Ok(Response::builder() .status(hyper::StatusCode::OK) .header("Content-Type", "application/json") .header("Cache-Control", "public, max-age=3600") .body(hyper::Body::from(bytes)) .unwrap_or_else(|_| { Response::new(hyper::Body::from("proxy response build failed")) })); } } _ => continue, } } Ok(build_response( hyper::StatusCode::BAD_GATEWAY, "text/plain", hyper::Body::from("all upstream catalog URLs failed"), )) } /// Build a 401 Unauthorized JSON response. fn unauthorized() -> Response { let body = serde_json::json!({ "error": "Unauthorized" }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Response::builder() .status(StatusCode::UNAUTHORIZED) .header("Content-Type", "application/json") .body(hyper::Body::from(body_bytes)) .unwrap() } /// Allowed CORS origins derived from the config host IP. fn allowed_origins(&self) -> Vec { let mut origins = vec![ format!("http://{}", self.config.host_ip), format!("https://{}", self.config.host_ip), ]; if self.config.dev_mode { origins.push("http://localhost:8100".to_string()); // Vite dev server } origins } /// Validate the Origin header against allowed origins. /// Returns the matched origin if valid, None if cross-origin is not allowed. fn validate_origin(&self, headers: &hyper::HeaderMap) -> Option { let origin = headers.get("origin").and_then(|v| v.to_str().ok())?; let allowed = self.allowed_origins(); if allowed.iter().any(|a| a == origin) { Some(origin.to_string()) } else { None } } /// Permissive origin check for the share-to-mesh iframe intent: any scheme /// http(s):// followed by the configured host_ip, optionally `:port`. Apps /// proxied under other ports (APP_PORTS) call this from within the same /// node, so they share host_ip but not port. The session cookie still has /// to be valid — this is a sanity check, not the primary auth. fn validate_app_origin(&self, headers: &hyper::HeaderMap) -> Option { let origin = headers.get("origin").and_then(|v| v.to_str().ok())?; // Allow localhost dev server too so the Vite frontend can exercise it. if self.config.dev_mode && origin == "http://localhost:8100" { return Some(origin.to_string()); } let host_ip = &self.config.host_ip; let matches = |scheme: &str| -> bool { let prefix = format!("{}{}", scheme, host_ip); if origin == prefix { return true; } let with_port = format!("{}:", prefix); origin.starts_with(&with_port) && origin[with_port.len()..] .bytes() .all(|b| b.is_ascii_digit()) }; if matches("http://") || matches("https://") { Some(origin.to_string()) } else { None } } pub async fn handle_request(&self, req: Request) -> Result> { let path = req.uri().path().to_string(); let method = req.method().clone(); // Handle CORS preflight for all routes if method == Method::OPTIONS { let mut builder = Response::builder() .status(StatusCode::NO_CONTENT) .header("Vary", "Origin"); if let Some(origin) = self.validate_origin(req.headers()) { builder = builder .header("Access-Control-Allow-Origin", &origin) .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") .header("Access-Control-Allow-Headers", "Content-Type, X-CSRF-Token") .header("Access-Control-Allow-Credentials", "true"); } return Ok(builder.body(hyper::Body::empty()).unwrap()); } // WebSocket upgrade — validate session before upgrading if method == Method::GET && path == "/ws/db" { if !self.is_authenticated(req.headers()).await { tracing::warn!("401 WebSocket /ws/db — session invalid or missing"); return Ok(Self::unauthorized()); } return Self::handle_websocket( req, self.state_manager.clone(), self.metrics_store.clone(), ) .await; } // Remote input WebSocket — companion app sends keyboard/mouse events if method == Method::GET && path == "/ws/remote-input" { if !self.is_authenticated(req.headers()).await { tracing::warn!("401 WebSocket /ws/remote-input — session invalid or missing"); return Ok(Self::unauthorized()); } return Self::handle_remote_input(req, self.input_relay_tx.clone()).await; } // Remote relay WebSocket — browser receives companion input events if method == Method::GET && path == "/ws/remote-relay" { if !self.is_authenticated(req.headers()).await { tracing::warn!("401 WebSocket /ws/remote-relay — session invalid or missing"); return Ok(Self::unauthorized()); } return Self::handle_remote_relay(req, self.input_relay_tx.subscribe()).await; } // Convert body to bytes for non-WS routes let headers = req.headers().clone(); let query_string = req.uri().query().map(|s| s.to_string()).unwrap_or_default(); let (parts, body) = req.into_parts(); let body_bytes = hyper::body::to_bytes(body) .await .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?; let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes.clone())); debug!("{} {}", method, path); match (method, path.as_str()) { // RPC — auth is handled inside rpc handler per-method (Method::POST, "/rpc/v1") => self.rpc_handler.clone().handle(req_with_bytes).await, // Health — unauthenticated, returns JSON with service status (Method::GET, "/health") => { let recovery_complete = crate::crash_recovery::is_recovery_complete(); let uptime = crate::crash_recovery::uptime_seconds(); let health_status = if recovery_complete { "ok" } else { "degraded" }; let status = serde_json::json!({ "status": health_status, "crash_recovery_complete": recovery_complete, "uptime_seconds": uptime, "version": env!("CARGO_PKG_VERSION"), "services": { "rpc": true, "sessions": true, } }); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from( serde_json::to_vec(&status).unwrap_or_default(), )) .unwrap()) } // Node message — P2P endpoint (authenticated by source validation, not cookie) (Method::POST, "/archipelago/node-message") => { Self::handle_node_message(body_bytes).await } // Mesh typed envelope relay over federation — peers POST // pre-encoded TypedEnvelope wire bytes here when the envelope is // too large for a single LoRa frame (primarily ContentRef). No // session auth: the body carries a pubkey + ed25519 signature // over the wire bytes which we verify before dispatching. (Method::POST, "/archipelago/mesh-typed") => { Self::handle_mesh_typed_relay(self.rpc_handler.clone(), body_bytes).await } // Blob upload — local/session use only. Session-authenticated so // only the node owner can push attachments into the blob store. (Method::POST, "/api/blob") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } Self::handle_blob_upload( &self.blob_store, &self.self_pubkey_hex, &self.config.data_dir, &headers, body_bytes, ) .await } // Share-to-mesh intent — marketplace app iframes POST a file here // to stage it as a mesh attachment. Same body format as /api/blob // (raw bytes + X-Blob-Mime/X-Blob-Filename headers). The app is // expected to postMessage `{type:'share-to-mesh', cid, ...}` to // its parent window afterwards so the Mesh view can pick it up. // Authenticated by session cookie + a relaxed Origin check (any // port on the archipelago host is allowed, so proxied apps on // their own ports can reach it with credentials:'include'). (Method::POST, "/api/share-to-mesh") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } let origin = match self.validate_app_origin(&headers) { Some(o) => o, None => { return Ok(build_response( StatusCode::FORBIDDEN, "text/plain", hyper::Body::from("origin not allowed"), )) } }; Self::handle_share_to_mesh( &self.blob_store, &self.self_pubkey_hex, &headers, body_bytes, &origin, ) .await } // Blob download — peer-facing. No session required; authenticated // by HMAC capability token signed when the blob ref was shared. (Method::GET, p) if p.starts_with("/blob/") => { Self::handle_blob_download(&self.blob_store, p, &query_string).await } // Content preview — degraded previews for paid content (no auth, no payment) (Method::GET, p) if p.starts_with("/content/") && p.ends_with("/preview") => { Self::handle_content_preview(p, &self.config).await } // Content serving — peers access shared content over Tor (no session auth) (Method::GET, p) if p.starts_with("/content/") => { Self::handle_content_request(p, &headers, &self.config).await } // Content catalog — list available content (no session auth, for peers) (Method::GET, "/content") => Self::handle_content_catalog(&self.config).await, // Electrs status — unauthenticated (read-only sync status) (Method::GET, "/electrs-status") => Self::handle_electrs_status().await, (Method::GET, "/bitcoin-status") => Self::handle_bitcoin_status().await, // App-catalog proxy — fetches catalog.json from the configured // upstream URLs server-side so the browser doesn't hit CORS // (git.tx1138.com has no ACAO header) or CSP (IP-port upstream // falls outside `connect-src`). Session-authenticated so only // the logged-in node owner can spin up fetches. (Method::GET, "/api/app-catalog") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } self.handle_app_catalog_proxy().await } // LND connect info — nginx validates session cookie (presence check), // backend is bound to 127.0.0.1 so only nginx can reach it. // No backend auth check here because the LND UI iframe fetches this // endpoint and the session cookie flow is validated at the nginx layer. (Method::GET, "/lnd-connect-info") => { Self::handle_lnd_connect_info(self.rpc_handler.clone()).await } // Container logs — requires session (Method::GET, path) if path.starts_with("/api/container/logs") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } let origin = self.validate_origin(&headers).unwrap_or_default(); Self::handle_container_logs_http(self.rpc_handler.clone(), path, &origin).await } // LND proxy — requires session (Method::GET, path) if path.starts_with("/proxy/lnd/") => { if !self.is_authenticated(&headers).await { return Ok(Self::unauthorized()); } let origin = self.validate_origin(&headers).unwrap_or_default(); Self::handle_lnd_proxy(path, &origin).await } // DWN health — unauthenticated (Method::GET, "/dwn/health") => Self::handle_dwn_health(&self.config).await, // DWN message processing — peers access over Tor for sync (no session auth) (Method::POST, "/dwn") => Self::handle_dwn_message(body_bytes, &self.config).await, _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(hyper::Body::from("Not Found")) .unwrap()), } } } /// Validate that an app ID matches the safe pattern: lowercase alphanumeric + hyphens. fn is_valid_app_id(id: &str) -> bool { !id.is_empty() && id.len() <= 64 && id .bytes() .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-') && id.as_bytes()[0] != b'-' } /// Validate that a pubkey is a 64-char hex string. fn is_valid_pubkey_hex(s: &str) -> bool { s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) } /// Strip newlines and ANSI escape sequences from strings before logging. fn sanitize_log_string(s: &str) -> String { s.replace('\n', "\\n") .replace('\r', "\\r") .replace('\x1b', "") } /// Strip HTML-sensitive characters to prevent XSS when stored/rendered. fn sanitize_html(s: &str) -> String { s.replace('&', "&") .replace('<', "<") .replace('>', ">") .replace('"', """) .replace('\'', "'") }