diff --git a/core/Cargo.lock b/core/Cargo.lock index 4e234a84..3905cca2 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -36,9 +36,11 @@ dependencies = [ "archipelago-performance", "archipelago-security", "bcrypt", + "futures-util", "http-body 1.0.1", "http-body-util", "hyper 0.14.32", + "hyper-tungstenite", "hyper-util", "serde", "serde_json", @@ -46,7 +48,7 @@ dependencies = [ "thiserror", "tokio", "tokio-test", - "tokio-tungstenite", + "tokio-tungstenite 0.20.1", "toml", "tower", "tower-http", @@ -674,6 +676,19 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-tungstenite" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "226df6fd0aece319a325419d770aa9d947defa60463f142cd82b329121f906a3" +dependencies = [ + "hyper 0.14.32", + "pin-project", + "tokio", + "tokio-tungstenite 0.19.0", + "tungstenite 0.19.0", +] + [[package]] name = "hyper-util" version = "0.1.19" @@ -1055,6 +1070,26 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1622,6 +1657,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.19.0", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" @@ -1633,7 +1680,7 @@ dependencies = [ "native-tls", "tokio", "tokio-native-tls", - "tungstenite", + "tungstenite 0.20.1", ] [[package]] @@ -1795,6 +1842,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.20.1" diff --git a/core/archipelago/Cargo.toml b/core/archipelago/Cargo.toml index af9ce4b8..60607bd1 100644 --- a/core/archipelago/Cargo.toml +++ b/core/archipelago/Cargo.toml @@ -27,6 +27,8 @@ http-body = "1.0" tower = "0.5" tower-http = { version = "0.6", features = ["cors", "trace"] } tokio-tungstenite = { version = "0.20", features = ["native-tls"] } +hyper-tungstenite = { version = "0.10", default-features = false } +futures-util = "0.3" # Our modules archipelago-container = { path = "../container" } diff --git a/core/archipelago/src/api/handler.rs b/core/archipelago/src/api/handler.rs index 591dd09e..282f2788 100644 --- a/core/archipelago/src/api/handler.rs +++ b/core/archipelago/src/api/handler.rs @@ -1,14 +1,15 @@ use crate::api::rpc::RpcHandler; use crate::config::Config; use anyhow::Result; +use futures_util::{SinkExt, StreamExt}; use hyper::{Method, Request, Response, StatusCode}; +use hyper_tungstenite::tungstenite::Message; use std::sync::Arc; -use tracing::debug; +use tracing::{debug, error, info}; pub struct ApiHandler { _config: Config, rpc_handler: Arc, - // Add other handlers here (websocket, static files, etc.) } impl ApiHandler { @@ -25,37 +26,73 @@ impl ApiHandler { &self, req: Request, ) -> Result> { - // Extract path and method before consuming req let path = req.uri().path().to_string(); let method = req.method().clone(); - - // Convert body to bytes + + // WebSocket upgrade must be handled before consuming the body + if method == Method::GET && path == "/ws/db" { + return Self::handle_websocket(req).await; + } + + // Convert body to bytes for non-WS routes let (parts, body) = req.into_parts(); let body_bytes = hyper::body::to_bytes(body).await .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?; - - // Reconstruct request with body as Bytes for RPC handler let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes)); debug!("{} {}", method, path); - // Route requests match (method, path.as_str()) { - (Method::POST, "/rpc/v1") => { - self.rpc_handler.handle(req_with_bytes).await - } - (Method::GET, "/health") => { - Ok(Response::builder() - .status(StatusCode::OK) - .body(hyper::Body::from("OK")) - .unwrap()) - } - _ => { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(hyper::Body::from("Not Found")) - .unwrap()) - } + (Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await, + (Method::GET, "/health") => Ok(Response::builder() + .status(StatusCode::OK) + .body(hyper::Body::from("OK")) + .unwrap()), + _ => Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(hyper::Body::from("Not Found")) + .unwrap()), } } + + async fn handle_websocket( + req: Request, + ) -> Result> { + if !hyper_tungstenite::is_upgrade_request(&req) { + return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(hyper::Body::from("Expected WebSocket upgrade"))?); + } + + let (response, ws_fut) = hyper_tungstenite::upgrade(req, None) + .map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?; + + // Spawn a task to hold the connection open and avoid EPIPE when the UI connects + tokio::spawn(async move { + let mut ws_stream = match ws_fut.await { + Ok(s) => s, + Err(e) => { + error!("WebSocket handshake failed: {}", e); + return; + } + }; + info!("WebSocket /ws/db connected"); + // Keep connection open; UI may send/receive JSON patches. For now just accept and ignore. + while let Some(msg) = ws_stream.next().await { + match msg { + Ok(Message::Close(_)) => break, + Ok(Message::Ping(data)) => { + let _ = ws_stream.send(Message::Pong(data)).await; + } + Ok(_) => {} + Err(e) => { + debug!("WebSocket stream error: {}", e); + break; + } + } + } + }); + + Ok(response) + } } diff --git a/core/archipelago/src/main.rs b/core/archipelago/src/main.rs index d3dfaffd..2c0cd41a 100644 --- a/core/archipelago/src/main.rs +++ b/core/archipelago/src/main.rs @@ -11,9 +11,13 @@ mod config; mod container; mod server; +use auth::AuthManager; use config::Config; use server::Server; +/// Default dev password when auto-creating a user (matches mock-backend). +const DEV_DEFAULT_PASSWORD: &str = "password123"; + #[tokio::main] async fn main() -> Result<()> { // Initialize tracing @@ -30,6 +34,15 @@ async fn main() -> Result<()> { let config = Config::load().await?; info!("📁 Data directory: {}", config.data_dir.display()); + // In dev mode, ensure a default user exists so login works without manual setup + if config.dev_mode { + let auth = AuthManager::new(config.data_dir.clone()); + if !auth.is_setup().await? { + auth.setup_user(DEV_DEFAULT_PASSWORD).await?; + info!("👤 Created default dev user (password: {})", DEV_DEFAULT_PASSWORD); + } + } + // Create server let server = Server::new(config.clone()).await?;