archipelago d9d5fa65e5 chore: retire .23 VPS mirror, promote .168 OVH to primary
The Hetzner VPS at 23.182.128.160 was decommissioned. Replace it
everywhere with the OVH VPS at 146.59.87.168, which was previously
the tertiary mirror.

  - update.rs: drop DEFAULT_TERTIARY_MIRROR_URL, promote .168 into
    the secondary slot as "Server 1 (OVH)"; tx1138 becomes Server 2.
    Default mirror list shrinks from 3 to 2.
  - container/registry.rs: default RegistryConfig drops .23, promotes
    .168 to Server 1 / priority 0, tx1138 stays Server 2 / priority 10.
  - api/rpc/package/config.rs: trusted-registry allowlist swaps .23
    for .168.
  - api/handler/mod.rs: app-catalog fallback URL uses .168.
  - neode-ui/views/marketplace/marketplaceData.ts: REGISTRY uses .168.
  - scripts/image-versions.sh: ARCHY_REGISTRY_FALLBACK uses .168.
  - image-recipe/build-auto-installer-iso.sh: installer ISO registries
    use .168 (both podman registries.conf and backend registries.json).

Tests updated to assert on the new 2-entry default lists (registry +
mirror). URL-parser fixture tests in update.rs retain .23 strings —
they exercise string-parsing logic, not mirror policy.

Git remotes: dropped `gitea-vps` and the .23 push URL on the `origin`
multi-push alias (not part of this commit — pure working-copy change).
2026-04-23 08:22:32 -04:00

516 lines
21 KiB
Rust

mod blob;
mod content;
mod dwn;
mod node_message;
mod proxy;
mod remote_input;
mod remote_relay;
mod websocket;
use crate::api::rpc::RpcHandler;
use crate::blobs::BlobStore;
use crate::config::Config;
use crate::container::{ContainerOrchestrator, DevContainerOrchestrator};
use crate::monitoring::MetricsStore;
use crate::session::{self, SessionStore};
use crate::state::StateManager;
use anyhow::Result;
use hyper::{Method, Request, Response, StatusCode};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::debug;
/// Build an HTTP response without unwrap. Falls back to a plain 500 if builder fails.
// Used by handler submodules after unwrap elimination
#[allow(dead_code)]
pub(super) fn build_response(
status: StatusCode,
content_type: &str,
body: hyper::Body,
) -> Response<hyper::Body> {
Response::builder()
.status(status)
.header("Content-Type", content_type)
.body(body)
.unwrap_or_else(|_| Response::new(hyper::Body::from("Internal error")))
}
/// HTTP entry point for the node: owns the shared services and routes every
/// incoming request in `handle_request`.
pub struct ApiHandler {
/// Node configuration; this file reads `data_dir`, `host_ip` and `dev_mode` from it.
config: Config,
/// Serves `POST /rpc/v1` and is handed to several route handlers.
rpc_handler: Arc<RpcHandler>,
/// Passed to the `/ws/db` WebSocket handler.
state_manager: Arc<StateManager>,
/// Passed to the `/ws/db` WebSocket handler alongside `state_manager`.
metrics_store: Arc<MetricsStore>,
/// Validates session cookies for the session-authenticated routes.
session_store: SessionStore,
/// Broadcast channel for relaying companion app input to remote browsers.
input_relay_tx: broadcast::Sender<String>,
/// Content-addressed blob store for attachments shared over mesh/federation.
blob_store: Arc<BlobStore>,
/// Our own node pubkey (hex) — used to self-sign debug/test capabilities.
self_pubkey_hex: String,
}
impl ApiHandler {
/// Construct the handler together with the services it owns.
///
/// Creates the session store and RPC handler, loads (or creates) the node
/// identity under `<data_dir>/identity`, opens the blob store with a
/// capability key derived from that identity, and shares the blob store with
/// the RPC layer.
///
/// # Errors
/// Propagates failures from `RpcHandler::new`, `NodeIdentity::load_or_create`
/// and `BlobStore::open`.
pub async fn new(
    config: Config,
    state_manager: Arc<StateManager>,
    metrics_store: Arc<MetricsStore>,
    orchestrator: Option<Arc<dyn ContainerOrchestrator>>,
    dev_orchestrator: Option<Arc<DevContainerOrchestrator>>,
) -> Result<Self> {
    let session_store = SessionStore::new().await;

    let rpc = RpcHandler::new(
        config.clone(),
        state_manager.clone(),
        metrics_store.clone(),
        session_store.clone(),
        orchestrator,
        dev_orchestrator,
    )
    .await?;
    let rpc_handler = Arc::new(rpc);

    let identity_path = config.data_dir.join("identity");
    let identity = crate::identity::NodeIdentity::load_or_create(&identity_path).await?;

    // Blob-store capability key = SHA-256(signing key bytes || domain tag).
    // Tying it to the Ed25519 signing key means rotating the identity also
    // rotates every outstanding capability token (intentional — a replaced
    // node must not honour old caps).
    let cap_key = {
        let mut digest = Sha256::new();
        digest.update(identity.signing_key().to_bytes());
        digest.update(b"|archipelago-blob-cap-v1");
        let mut key = [0u8; 32];
        key.copy_from_slice(&digest.finalize());
        key
    };

    let blob_store = Arc::new(BlobStore::open(&config.data_dir, cap_key).await?);
    let self_pubkey_hex = hex::encode(identity.signing_key().verifying_key().as_bytes());

    // Hand the same blob store instance to the RPC layer so
    // mesh.send-content / mesh.fetch-content use one cap_key and one on-disk
    // root instead of re-opening the store.
    rpc_handler
        .set_blob_store(Arc::clone(&blob_store), self_pubkey_hex.clone())
        .await;

    let (input_relay_tx, _) = broadcast::channel(64);

    Ok(Self {
        config,
        rpc_handler,
        state_manager,
        metrics_store,
        session_store,
        input_relay_tx,
        blob_store,
        self_pubkey_hex,
    })
}
/// Access the RPC handler (for service initialization after construction).
pub fn rpc_handler(&self) -> &Arc<RpcHandler> {
&self.rpc_handler
}
/// Report whether the request carries a session cookie that the session
/// store accepts. A missing cookie counts as unauthenticated.
async fn is_authenticated(&self, headers: &hyper::HeaderMap) -> bool {
    if let Some(token) = session::extract_session_cookie(headers) {
        self.session_store.validate(&token).await
    } else {
        false
    }
}
/// Server-side fetch of the upstream app catalog so the browser can
/// load it without fighting CORS (git.tx1138.com emits no ACAO) or
/// CSP (the fallback IP-port URL isn't in `connect-src`). The upstream
/// list is derived from the operator's configured container registries
/// so switching mirrors in Settings changes the App Store source too —
/// each active registry contributes one Gitea `raw/branch/main/catalog.json`
/// URL (http or https per `tls_verify`), tried in priority order.
/// If registry config can't be loaded, falls back to the legacy
/// hardcoded pair so the App Store still renders on nodes that haven't
/// persisted a registry config yet. 15s total timeout.
async fn handle_app_catalog_proxy(&self) -> Result<Response<hyper::Body>> {
let mut upstreams: Vec<String> = Vec::new();
if let Ok(config) =
crate::container::registry::load_registries(&self.config.data_dir).await
{
for reg in config.active_registries() {
let scheme = if reg.tls_verify { "https" } else { "http" };
// Gitea raw URL: <scheme>://<host>/<namespace>/app-catalog/raw/branch/main/catalog.json.
// reg.url already includes the namespace (e.g. "host/lfg2025"),
// so we just tack on the repo + raw path.
upstreams.push(format!(
"{}://{}/app-catalog/raw/branch/main/catalog.json",
scheme, reg.url
));
}
}
if upstreams.is_empty() {
upstreams.push(
"http://146.59.87.168:3000/lfg2025/app-catalog/raw/branch/main/catalog.json"
.to_string(),
);
upstreams.push(
"https://git.tx1138.com/lfg2025/app-catalog/raw/branch/main/catalog.json"
.to_string(),
);
}
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.build()
{
Ok(c) => c,
Err(e) => {
return Ok(build_response(
hyper::StatusCode::INTERNAL_SERVER_ERROR,
"text/plain",
hyper::Body::from(format!("client build failed: {}", e)),
));
}
};
for url in &upstreams {
match client.get(url).send().await {
Ok(resp) if resp.status().is_success() => {
if let Ok(bytes) = resp.bytes().await {
return Ok(Response::builder()
.status(hyper::StatusCode::OK)
.header("Content-Type", "application/json")
.header("Cache-Control", "public, max-age=3600")
.body(hyper::Body::from(bytes))
.unwrap_or_else(|_| {
Response::new(hyper::Body::from("proxy response build failed"))
}));
}
}
_ => continue,
}
}
Ok(build_response(
hyper::StatusCode::BAD_GATEWAY,
"text/plain",
hyper::Body::from("all upstream catalog URLs failed"),
))
}
/// Build a 401 Unauthorized JSON response.
fn unauthorized() -> Response<hyper::Body> {
let body = serde_json::json!({ "error": "Unauthorized" });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body_bytes))
.unwrap()
}
/// CORS origins accepted by this node: the configured host IP over http and
/// https, plus the local Vite dev server when `dev_mode` is on.
fn allowed_origins(&self) -> Vec<String> {
    let host = &self.config.host_ip;
    // Vite dev server
    let dev_origin = self
        .config
        .dev_mode
        .then(|| "http://localhost:8100".to_string());
    ["http", "https"]
        .iter()
        .map(|scheme| format!("{}://{}", scheme, host))
        .chain(dev_origin)
        .collect()
}
/// Check the `Origin` header against `allowed_origins`.
/// Yields the origin string on an exact match; `None` when the header is
/// absent, not valid text, or not in the allowed list.
fn validate_origin(&self, headers: &hyper::HeaderMap) -> Option<String> {
    let origin = headers.get("origin")?.to_str().ok()?;
    let permitted = self
        .allowed_origins()
        .iter()
        .any(|candidate| candidate == origin);
    permitted.then(|| origin.to_string())
}
/// Permissive origin check for the share-to-mesh iframe intent: any scheme
/// http(s):// followed by the configured host_ip, optionally `:port`. Apps
/// proxied under other ports (APP_PORTS) call this from within the same
/// node, so they share host_ip but not port. The session cookie still has
/// to be valid — this is a sanity check, not the primary auth.
///
/// A port, when present, must consist of at least one ASCII digit, so an
/// origin with a dangling colon (`http://<host>:`) is rejected.
///
/// Returns the origin string when it is accepted, `None` otherwise
/// (including when the header is missing or not valid text).
fn validate_app_origin(&self, headers: &hyper::HeaderMap) -> Option<String> {
    let origin = headers.get("origin").and_then(|v| v.to_str().ok())?;
    // Allow localhost dev server too so the Vite frontend can exercise it.
    if self.config.dev_mode && origin == "http://localhost:8100" {
        return Some(origin.to_string());
    }
    let host_ip = &self.config.host_ip;
    let matches = |scheme: &str| -> bool {
        let prefix = format!("{}{}", scheme, host_ip);
        match origin.strip_prefix(prefix.as_str()) {
            // Exactly `<scheme><host_ip>` with no port.
            Some("") => true,
            // `<scheme><host_ip>:<digits>` — `all()` alone is vacuously true
            // for an empty port, so non-emptiness is checked explicitly.
            Some(rest) => rest.strip_prefix(':').map_or(false, |port| {
                !port.is_empty() && port.bytes().all(|b| b.is_ascii_digit())
            }),
            None => false,
        }
    };
    if matches("http://") || matches("https://") {
        Some(origin.to_string())
    } else {
        None
    }
}
/// Route one HTTP request to its handler.
///
/// Order of processing:
/// 1. `OPTIONS` on any path is answered as a CORS preflight (204); CORS
///    headers are only added when the `Origin` header is in the allowed list.
/// 2. The three WebSocket endpoints (`/ws/db`, `/ws/remote-input`,
///    `/ws/remote-relay`) require a valid session and are dispatched before
///    the body is consumed.
/// 3. For everything else the body is buffered fully and the request is
///    matched on `(method, path)`. Arm order matters: the `/content/…/preview`
///    arm must stay ahead of the general `/content/` arm.
///
/// Unmatched requests get a `404 Not Found`. Response construction in this
/// function never panics.
///
/// # Errors
/// Returns an error when the request body cannot be read, or when a
/// delegated handler returns one.
pub async fn handle_request(&self, req: Request<hyper::Body>) -> Result<Response<hyper::Body>> {
    let path = req.uri().path().to_string();
    let method = req.method().clone();
    // Handle CORS preflight for all routes
    if method == Method::OPTIONS {
        let mut builder = Response::builder()
            .status(StatusCode::NO_CONTENT)
            .header("Vary", "Origin");
        if let Some(origin) = self.validate_origin(req.headers()) {
            builder = builder
                .header("Access-Control-Allow-Origin", &origin)
                .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
                .header("Access-Control-Allow-Headers", "Content-Type, X-CSRF-Token")
                .header("Access-Control-Allow-Credentials", "true");
        }
        // If the builder ever rejects a header, answer with a bare 204 that
        // carries no CORS grant instead of panicking.
        return Ok(builder.body(hyper::Body::empty()).unwrap_or_else(|_| {
            let mut fallback = Response::new(hyper::Body::empty());
            *fallback.status_mut() = StatusCode::NO_CONTENT;
            fallback
        }));
    }
    // WebSocket upgrade — validate session before upgrading
    if method == Method::GET && path == "/ws/db" {
        if !self.is_authenticated(req.headers()).await {
            tracing::warn!("401 WebSocket /ws/db — session invalid or missing");
            return Ok(Self::unauthorized());
        }
        return Self::handle_websocket(
            req,
            self.state_manager.clone(),
            self.metrics_store.clone(),
        )
        .await;
    }
    // Remote input WebSocket — companion app sends keyboard/mouse events
    if method == Method::GET && path == "/ws/remote-input" {
        if !self.is_authenticated(req.headers()).await {
            tracing::warn!("401 WebSocket /ws/remote-input — session invalid or missing");
            return Ok(Self::unauthorized());
        }
        return Self::handle_remote_input(req, self.input_relay_tx.clone()).await;
    }
    // Remote relay WebSocket — browser receives companion input events
    if method == Method::GET && path == "/ws/remote-relay" {
        if !self.is_authenticated(req.headers()).await {
            tracing::warn!("401 WebSocket /ws/remote-relay — session invalid or missing");
            return Ok(Self::unauthorized());
        }
        return Self::handle_remote_relay(req, self.input_relay_tx.subscribe()).await;
    }
    // Convert body to bytes for non-WS routes
    let headers = req.headers().clone();
    let query_string = req.uri().query().map(|s| s.to_string()).unwrap_or_default();
    let (parts, body) = req.into_parts();
    let body_bytes = hyper::body::to_bytes(body)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?;
    let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes.clone()));
    debug!("{} {}", method, path);
    match (method, path.as_str()) {
        // RPC — auth is handled inside rpc handler per-method
        (Method::POST, "/rpc/v1") => self.rpc_handler.clone().handle(req_with_bytes).await,
        // Health — unauthenticated, returns JSON with service status
        (Method::GET, "/health") => {
            let recovery_complete = crate::crash_recovery::is_recovery_complete();
            let uptime = crate::crash_recovery::uptime_seconds();
            let health_status = if recovery_complete { "ok" } else { "degraded" };
            let status = serde_json::json!({
                "status": health_status,
                "crash_recovery_complete": recovery_complete,
                "uptime_seconds": uptime,
                "version": env!("CARGO_PKG_VERSION"),
                "services": {
                    "rpc": true,
                    "sessions": true,
                }
            });
            Ok(build_response(
                StatusCode::OK,
                "application/json",
                hyper::Body::from(serde_json::to_vec(&status).unwrap_or_default()),
            ))
        }
        // Node message — P2P endpoint (authenticated by source validation, not cookie)
        (Method::POST, "/archipelago/node-message") => {
            Self::handle_node_message(body_bytes).await
        }
        // Mesh typed envelope relay over federation — peers POST
        // pre-encoded TypedEnvelope wire bytes here when the envelope is
        // too large for a single LoRa frame (primarily ContentRef). No
        // session auth: the body carries a pubkey + ed25519 signature
        // over the wire bytes which we verify before dispatching.
        (Method::POST, "/archipelago/mesh-typed") => {
            Self::handle_mesh_typed_relay(self.rpc_handler.clone(), body_bytes).await
        }
        // Blob upload — local/session use only. Session-authenticated so
        // only the node owner can push attachments into the blob store.
        (Method::POST, "/api/blob") => {
            if !self.is_authenticated(&headers).await {
                return Ok(Self::unauthorized());
            }
            Self::handle_blob_upload(
                &self.blob_store,
                &self.self_pubkey_hex,
                &self.config.data_dir,
                &headers,
                body_bytes,
            )
            .await
        }
        // Share-to-mesh intent — marketplace app iframes POST a file here
        // to stage it as a mesh attachment. Same body format as /api/blob
        // (raw bytes + X-Blob-Mime/X-Blob-Filename headers). The app is
        // expected to postMessage `{type:'share-to-mesh', cid, ...}` to
        // its parent window afterwards so the Mesh view can pick it up.
        // Authenticated by session cookie + a relaxed Origin check (any
        // port on the archipelago host is allowed, so proxied apps on
        // their own ports can reach it with credentials:'include').
        (Method::POST, "/api/share-to-mesh") => {
            if !self.is_authenticated(&headers).await {
                return Ok(Self::unauthorized());
            }
            let origin = match self.validate_app_origin(&headers) {
                Some(o) => o,
                None => {
                    return Ok(build_response(
                        StatusCode::FORBIDDEN,
                        "text/plain",
                        hyper::Body::from("origin not allowed"),
                    ))
                }
            };
            Self::handle_share_to_mesh(
                &self.blob_store,
                &self.self_pubkey_hex,
                &headers,
                body_bytes,
                &origin,
            )
            .await
        }
        // Blob download — peer-facing. No session required; authenticated
        // by HMAC capability token signed when the blob ref was shared.
        (Method::GET, p) if p.starts_with("/blob/") => {
            Self::handle_blob_download(&self.blob_store, p, &query_string).await
        }
        // Content preview — degraded previews for paid content (no auth, no payment)
        (Method::GET, p) if p.starts_with("/content/") && p.ends_with("/preview") => {
            Self::handle_content_preview(p, &self.config).await
        }
        // Content serving — peers access shared content over Tor (no session auth)
        (Method::GET, p) if p.starts_with("/content/") => {
            Self::handle_content_request(p, &headers, &self.config).await
        }
        // Content catalog — list available content (no session auth, for peers)
        (Method::GET, "/content") => Self::handle_content_catalog(&self.config).await,
        // Electrs status — unauthenticated (read-only sync status)
        (Method::GET, "/electrs-status") => Self::handle_electrs_status().await,
        // App-catalog proxy — fetches catalog.json from the configured
        // upstream URLs server-side so the browser doesn't hit CORS
        // (git.tx1138.com has no ACAO header) or CSP (IP-port upstream
        // falls outside `connect-src`). Session-authenticated so only
        // the logged-in node owner can spin up fetches.
        (Method::GET, "/api/app-catalog") => {
            if !self.is_authenticated(&headers).await {
                return Ok(Self::unauthorized());
            }
            self.handle_app_catalog_proxy().await
        }
        // LND connect info — nginx validates session cookie (presence check),
        // backend is bound to 127.0.0.1 so only nginx can reach it.
        // No backend auth check here because the LND UI iframe fetches this
        // endpoint and the session cookie flow is validated at the nginx layer.
        (Method::GET, "/lnd-connect-info") => {
            Self::handle_lnd_connect_info(self.rpc_handler.clone()).await
        }
        // Container logs — requires session
        (Method::GET, path) if path.starts_with("/api/container/logs") => {
            if !self.is_authenticated(&headers).await {
                return Ok(Self::unauthorized());
            }
            let origin = self.validate_origin(&headers).unwrap_or_default();
            Self::handle_container_logs_http(self.rpc_handler.clone(), path, &origin).await
        }
        // LND proxy — requires session
        (Method::GET, path) if path.starts_with("/proxy/lnd/") => {
            if !self.is_authenticated(&headers).await {
                return Ok(Self::unauthorized());
            }
            let origin = self.validate_origin(&headers).unwrap_or_default();
            Self::handle_lnd_proxy(path, &origin).await
        }
        // DWN health — unauthenticated
        (Method::GET, "/dwn/health") => Self::handle_dwn_health(&self.config).await,
        // DWN message processing — peers access over Tor for sync (no session auth)
        (Method::POST, "/dwn") => Self::handle_dwn_message(body_bytes, &self.config).await,
        _ => {
            // Status + body only, built infallibly (no headers to validate).
            let mut not_found = Response::new(hyper::Body::from("Not Found"));
            *not_found.status_mut() = StatusCode::NOT_FOUND;
            Ok(not_found)
        }
    }
}
}
/// Accept an app ID only if it is 1–64 bytes of lowercase ASCII letters,
/// digits, or hyphens and does not begin with a hyphen.
fn is_valid_app_id(id: &str) -> bool {
    let raw = id.as_bytes();
    match raw.first().copied() {
        // Empty IDs and IDs with a leading hyphen are rejected outright.
        None | Some(b'-') => false,
        Some(_) => {
            raw.len() <= 64
                && raw
                    .iter()
                    .all(|&b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-'))
        }
    }
}
/// Accept a pubkey only if it is exactly 64 ASCII hex digits (either case).
fn is_valid_pubkey_hex(s: &str) -> bool {
    const PUBKEY_HEX_LEN: usize = 64;
    let raw = s.as_bytes();
    raw.len() == PUBKEY_HEX_LEN && raw.iter().all(u8::is_ascii_hexdigit)
}
/// Make a string safe to write on a single log line.
///
/// Line feeds become the two characters `\n`, carriage returns become `\r`,
/// and ESC (0x1b) characters are removed. Only the ESC character itself is
/// dropped; the rest of an ANSI sequence (e.g. `[31m`) stays as plain text.
fn sanitize_log_string(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\n' => cleaned.push_str("\\n"),
            '\r' => cleaned.push_str("\\r"),
            '\x1b' => {}
            other => cleaned.push(other),
        }
    }
    cleaned
}
/// Escape the HTML-sensitive characters `&`, `<`, `>`, `"` and `'` as
/// entities so the text cannot inject markup when stored/rendered.
fn sanitize_html(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&#x27;"),
            other => escaped.push(other),
        }
    }
    escaped
}