2026-04-30 16:37:54 -04:00

516 lines
22 KiB
Rust

mod blob;
mod content;
mod dwn;
mod node_message;
mod proxy;
mod remote_input;
mod remote_relay;
mod websocket;
use crate::api::rpc::RpcHandler;
use crate::blobs::BlobStore;
use crate::config::Config;
use crate::container::{ContainerOrchestrator, DevContainerOrchestrator};
use crate::monitoring::MetricsStore;
use crate::session::{self, SessionStore};
use crate::state::StateManager;
use anyhow::Result;
use hyper::{Method, Request, Response, StatusCode};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::debug;
/// Build an HTTP response without unwrap. Falls back to a plain 500 if builder fails.
// Used by handler submodules after unwrap elimination
#[allow(dead_code)]
pub(super) fn build_response(
status: StatusCode,
content_type: &str,
body: hyper::Body,
) -> Response<hyper::Body> {
Response::builder()
.status(status)
.header("Content-Type", content_type)
.body(body)
.unwrap_or_else(|_| Response::new(hyper::Body::from("Internal error")))
}
pub struct ApiHandler {
config: Config,
rpc_handler: Arc<RpcHandler>,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
session_store: SessionStore,
/// Broadcast channel for relaying companion app input to remote browsers.
input_relay_tx: broadcast::Sender<String>,
/// Content-addressed blob store for attachments shared over mesh/federation.
blob_store: Arc<BlobStore>,
/// Our own node pubkey (hex) — used to self-sign debug/test capabilities.
self_pubkey_hex: String,
}
impl ApiHandler {
pub async fn new(
config: Config,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
orchestrator: Option<Arc<dyn ContainerOrchestrator>>,
dev_orchestrator: Option<Arc<DevContainerOrchestrator>>,
) -> Result<Self> {
let session_store = SessionStore::new().await;
let rpc_handler = Arc::new(
RpcHandler::new(
config.clone(),
state_manager.clone(),
metrics_store.clone(),
session_store.clone(),
orchestrator,
dev_orchestrator,
)
.await?,
);
let (input_relay_tx, _) = broadcast::channel(64);
// Derive a blob-store capability key from the node's Ed25519 signing
// key. SHA-256 domain-separated so rotating the identity rotates
// every outstanding capability token (intentional — prevents a
// replaced node from honouring old caps).
let identity_dir = config.data_dir.join("identity");
let identity = crate::identity::NodeIdentity::load_or_create(&identity_dir).await?;
let mut hasher = Sha256::new();
hasher.update(identity.signing_key().to_bytes());
hasher.update(b"|archipelago-blob-cap-v1");
let mut cap_key = [0u8; 32];
cap_key.copy_from_slice(&hasher.finalize());
let blob_store = Arc::new(BlobStore::open(&config.data_dir, cap_key).await?);
let self_pubkey_hex = hex::encode(identity.signing_key().verifying_key().as_bytes());
// Share blob store with the RPC layer so mesh.send-content /
// mesh.fetch-content can reach the same instance (single cap_key,
// single on-disk root) without re-opening it.
rpc_handler
.set_blob_store(blob_store.clone(), self_pubkey_hex.clone())
.await;
Ok(Self {
config,
rpc_handler,
state_manager,
metrics_store,
session_store,
input_relay_tx,
blob_store,
self_pubkey_hex,
})
}
/// Access the RPC handler (for service initialization after construction).
pub fn rpc_handler(&self) -> &Arc<RpcHandler> {
&self.rpc_handler
}
/// Check if the request has a valid session cookie.
async fn is_authenticated(&self, headers: &hyper::HeaderMap) -> bool {
match session::extract_session_cookie(headers) {
Some(token) => self.session_store.validate(&token).await,
None => false,
}
}
/// Server-side fetch of the upstream app catalog so the browser can
/// load it without fighting CORS (git.tx1138.com emits no ACAO) or
/// CSP (the fallback IP-port URL isn't in `connect-src`). The upstream
/// list is derived from the operator's configured container registries
/// so switching mirrors in Settings changes the App Store source too —
/// each active registry contributes one Gitea `raw/branch/main/catalog.json`
/// URL (http or https per `tls_verify`), tried in priority order.
/// If registry config can't be loaded, falls back to the legacy
/// hardcoded pair so the App Store still renders on nodes that haven't
/// persisted a registry config yet. 15s total timeout.
async fn handle_app_catalog_proxy(&self) -> Result<Response<hyper::Body>> {
let mut upstreams: Vec<String> = Vec::new();
if let Ok(config) = crate::container::registry::load_registries(&self.config.data_dir).await
{
for reg in config.active_registries() {
let scheme = if reg.tls_verify { "https" } else { "http" };
// Gitea raw URL: <scheme>://<host>/<namespace>/app-catalog/raw/branch/main/catalog.json.
// reg.url already includes the namespace (e.g. "host/lfg2025"),
// so we just tack on the repo + raw path.
upstreams.push(format!(
"{}://{}/app-catalog/raw/branch/main/catalog.json",
scheme, reg.url
));
}
}
if upstreams.is_empty() {
upstreams.push(
"http://146.59.87.168:3000/lfg2025/app-catalog/raw/branch/main/catalog.json"
.to_string(),
);
upstreams.push(
"https://git.tx1138.com/lfg2025/app-catalog/raw/branch/main/catalog.json"
.to_string(),
);
}
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.build()
{
Ok(c) => c,
Err(e) => {
return Ok(build_response(
hyper::StatusCode::INTERNAL_SERVER_ERROR,
"text/plain",
hyper::Body::from(format!("client build failed: {}", e)),
));
}
};
for url in &upstreams {
match client.get(url).send().await {
Ok(resp) if resp.status().is_success() => {
if let Ok(bytes) = resp.bytes().await {
return Ok(Response::builder()
.status(hyper::StatusCode::OK)
.header("Content-Type", "application/json")
.header("Cache-Control", "public, max-age=3600")
.body(hyper::Body::from(bytes))
.unwrap_or_else(|_| {
Response::new(hyper::Body::from("proxy response build failed"))
}));
}
}
_ => continue,
}
}
Ok(build_response(
hyper::StatusCode::BAD_GATEWAY,
"text/plain",
hyper::Body::from("all upstream catalog URLs failed"),
))
}
/// Build a 401 Unauthorized JSON response.
fn unauthorized() -> Response<hyper::Body> {
let body = serde_json::json!({ "error": "Unauthorized" });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body_bytes))
.unwrap()
}
/// Allowed CORS origins derived from the config host IP.
fn allowed_origins(&self) -> Vec<String> {
let mut origins = vec![
format!("http://{}", self.config.host_ip),
format!("https://{}", self.config.host_ip),
];
if self.config.dev_mode {
origins.push("http://localhost:8100".to_string()); // Vite dev server
}
origins
}
/// Validate the Origin header against allowed origins.
/// Returns the matched origin if valid, None if cross-origin is not allowed.
fn validate_origin(&self, headers: &hyper::HeaderMap) -> Option<String> {
let origin = headers.get("origin").and_then(|v| v.to_str().ok())?;
let allowed = self.allowed_origins();
if allowed.iter().any(|a| a == origin) {
Some(origin.to_string())
} else {
None
}
}
/// Permissive origin check for the share-to-mesh iframe intent: any scheme
/// http(s):// followed by the configured host_ip, optionally `:port`. Apps
/// proxied under other ports (APP_PORTS) call this from within the same
/// node, so they share host_ip but not port. The session cookie still has
/// to be valid — this is a sanity check, not the primary auth.
fn validate_app_origin(&self, headers: &hyper::HeaderMap) -> Option<String> {
let origin = headers.get("origin").and_then(|v| v.to_str().ok())?;
// Allow localhost dev server too so the Vite frontend can exercise it.
if self.config.dev_mode && origin == "http://localhost:8100" {
return Some(origin.to_string());
}
let host_ip = &self.config.host_ip;
let matches = |scheme: &str| -> bool {
let prefix = format!("{}{}", scheme, host_ip);
if origin == prefix {
return true;
}
let with_port = format!("{}:", prefix);
origin.starts_with(&with_port)
&& origin[with_port.len()..]
.bytes()
.all(|b| b.is_ascii_digit())
};
if matches("http://") || matches("https://") {
Some(origin.to_string())
} else {
None
}
}
pub async fn handle_request(&self, req: Request<hyper::Body>) -> Result<Response<hyper::Body>> {
let path = req.uri().path().to_string();
let method = req.method().clone();
// Handle CORS preflight for all routes
if method == Method::OPTIONS {
let mut builder = Response::builder()
.status(StatusCode::NO_CONTENT)
.header("Vary", "Origin");
if let Some(origin) = self.validate_origin(req.headers()) {
builder = builder
.header("Access-Control-Allow-Origin", &origin)
.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
.header("Access-Control-Allow-Headers", "Content-Type, X-CSRF-Token")
.header("Access-Control-Allow-Credentials", "true");
}
return Ok(builder.body(hyper::Body::empty()).unwrap());
}
// WebSocket upgrade — validate session before upgrading
if method == Method::GET && path == "/ws/db" {
if !self.is_authenticated(req.headers()).await {
tracing::warn!("401 WebSocket /ws/db — session invalid or missing");
return Ok(Self::unauthorized());
}
return Self::handle_websocket(
req,
self.state_manager.clone(),
self.metrics_store.clone(),
)
.await;
}
// Remote input WebSocket — companion app sends keyboard/mouse events
if method == Method::GET && path == "/ws/remote-input" {
if !self.is_authenticated(req.headers()).await {
tracing::warn!("401 WebSocket /ws/remote-input — session invalid or missing");
return Ok(Self::unauthorized());
}
return Self::handle_remote_input(req, self.input_relay_tx.clone()).await;
}
// Remote relay WebSocket — browser receives companion input events
if method == Method::GET && path == "/ws/remote-relay" {
if !self.is_authenticated(req.headers()).await {
tracing::warn!("401 WebSocket /ws/remote-relay — session invalid or missing");
return Ok(Self::unauthorized());
}
return Self::handle_remote_relay(req, self.input_relay_tx.subscribe()).await;
}
// Convert body to bytes for non-WS routes
let headers = req.headers().clone();
let query_string = req.uri().query().map(|s| s.to_string()).unwrap_or_default();
let (parts, body) = req.into_parts();
let body_bytes = hyper::body::to_bytes(body)
.await
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?;
let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes.clone()));
debug!("{} {}", method, path);
match (method, path.as_str()) {
// RPC — auth is handled inside rpc handler per-method
(Method::POST, "/rpc/v1") => self.rpc_handler.clone().handle(req_with_bytes).await,
// Health — unauthenticated, returns JSON with service status
(Method::GET, "/health") => {
let recovery_complete = crate::crash_recovery::is_recovery_complete();
let uptime = crate::crash_recovery::uptime_seconds();
let health_status = if recovery_complete { "ok" } else { "degraded" };
let status = serde_json::json!({
"status": health_status,
"crash_recovery_complete": recovery_complete,
"uptime_seconds": uptime,
"version": env!("CARGO_PKG_VERSION"),
"services": {
"rpc": true,
"sessions": true,
}
});
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(
serde_json::to_vec(&status).unwrap_or_default(),
))
.unwrap())
}
// Node message — P2P endpoint (authenticated by source validation, not cookie)
(Method::POST, "/archipelago/node-message") => {
Self::handle_node_message(body_bytes).await
}
// Mesh typed envelope relay over federation — peers POST
// pre-encoded TypedEnvelope wire bytes here when the envelope is
// too large for a single LoRa frame (primarily ContentRef). No
// session auth: the body carries a pubkey + ed25519 signature
// over the wire bytes which we verify before dispatching.
(Method::POST, "/archipelago/mesh-typed") => {
Self::handle_mesh_typed_relay(self.rpc_handler.clone(), body_bytes).await
}
// Blob upload — local/session use only. Session-authenticated so
// only the node owner can push attachments into the blob store.
(Method::POST, "/api/blob") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
Self::handle_blob_upload(
&self.blob_store,
&self.self_pubkey_hex,
&self.config.data_dir,
&headers,
body_bytes,
)
.await
}
// Share-to-mesh intent — marketplace app iframes POST a file here
// to stage it as a mesh attachment. Same body format as /api/blob
// (raw bytes + X-Blob-Mime/X-Blob-Filename headers). The app is
// expected to postMessage `{type:'share-to-mesh', cid, ...}` to
// its parent window afterwards so the Mesh view can pick it up.
// Authenticated by session cookie + a relaxed Origin check (any
// port on the archipelago host is allowed, so proxied apps on
// their own ports can reach it with credentials:'include').
(Method::POST, "/api/share-to-mesh") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
let origin = match self.validate_app_origin(&headers) {
Some(o) => o,
None => {
return Ok(build_response(
StatusCode::FORBIDDEN,
"text/plain",
hyper::Body::from("origin not allowed"),
))
}
};
Self::handle_share_to_mesh(
&self.blob_store,
&self.self_pubkey_hex,
&headers,
body_bytes,
&origin,
)
.await
}
// Blob download — peer-facing. No session required; authenticated
// by HMAC capability token signed when the blob ref was shared.
(Method::GET, p) if p.starts_with("/blob/") => {
Self::handle_blob_download(&self.blob_store, p, &query_string).await
}
// Content preview — degraded previews for paid content (no auth, no payment)
(Method::GET, p) if p.starts_with("/content/") && p.ends_with("/preview") => {
Self::handle_content_preview(p, &self.config).await
}
// Content serving — peers access shared content over Tor (no session auth)
(Method::GET, p) if p.starts_with("/content/") => {
Self::handle_content_request(p, &headers, &self.config).await
}
// Content catalog — list available content (no session auth, for peers)
(Method::GET, "/content") => Self::handle_content_catalog(&self.config).await,
// Electrs status — unauthenticated (read-only sync status)
(Method::GET, "/electrs-status") => Self::handle_electrs_status().await,
(Method::GET, "/bitcoin-status") => Self::handle_bitcoin_status().await,
// App-catalog proxy — fetches catalog.json from the configured
// upstream URLs server-side so the browser doesn't hit CORS
// (git.tx1138.com has no ACAO header) or CSP (IP-port upstream
// falls outside `connect-src`). Session-authenticated so only
// the logged-in node owner can spin up fetches.
(Method::GET, "/api/app-catalog") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
self.handle_app_catalog_proxy().await
}
// LND connect info — nginx validates session cookie (presence check),
// backend is bound to 127.0.0.1 so only nginx can reach it.
// No backend auth check here because the LND UI iframe fetches this
// endpoint and the session cookie flow is validated at the nginx layer.
(Method::GET, "/lnd-connect-info") => {
Self::handle_lnd_connect_info(self.rpc_handler.clone()).await
}
// Container logs — requires session
(Method::GET, path) if path.starts_with("/api/container/logs") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
let origin = self.validate_origin(&headers).unwrap_or_default();
Self::handle_container_logs_http(self.rpc_handler.clone(), path, &origin).await
}
// LND proxy — requires session
(Method::GET, path) if path.starts_with("/proxy/lnd/") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
let origin = self.validate_origin(&headers).unwrap_or_default();
Self::handle_lnd_proxy(path, &origin).await
}
// DWN health — unauthenticated
(Method::GET, "/dwn/health") => Self::handle_dwn_health(&self.config).await,
// DWN message processing — peers access over Tor for sync (no session auth)
(Method::POST, "/dwn") => Self::handle_dwn_message(body_bytes, &self.config).await,
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
.unwrap()),
}
}
}
/// Validate that an app ID matches the safe pattern: lowercase alphanumeric + hyphens.
fn is_valid_app_id(id: &str) -> bool {
!id.is_empty()
&& id.len() <= 64
&& id
.bytes()
.all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
&& id.as_bytes()[0] != b'-'
}
/// Validate that a pubkey is a 64-char hex string.
fn is_valid_pubkey_hex(s: &str) -> bool {
s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
}
/// Strip newlines and ANSI escape sequences from strings before logging.
fn sanitize_log_string(s: &str) -> String {
s.replace('\n', "\\n")
.replace('\r', "\\r")
.replace('\x1b', "")
}
/// Strip HTML-sensitive characters to prevent XSS when stored/rendered.
fn sanitize_html(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
.replace('\'', "&#x27;")
}