use crate::api::rpc::RpcHandler; use crate::config::Config; use anyhow::Result; use futures_util::{SinkExt, StreamExt}; use hyper::{Method, Request, Response, StatusCode}; use hyper_ws_listener::WsStream; use std::sync::Arc; use tokio_tungstenite::tungstenite::Message; use tracing::{debug, info}; pub struct ApiHandler { _config: Config, rpc_handler: Arc, } impl ApiHandler { pub async fn new(config: Config) -> Result { let rpc_handler = Arc::new(RpcHandler::new(config.clone()).await?); Ok(Self { _config: config, rpc_handler, }) } pub async fn handle_request( &self, req: Request, ) -> Result> { let path = req.uri().path().to_string(); let method = req.method().clone(); // WebSocket upgrade must be handled before consuming the body if method == Method::GET && path == "/ws/db" { return Self::handle_websocket(req).await; } // Convert body to bytes for non-WS routes let (parts, body) = req.into_parts(); let body_bytes = hyper::body::to_bytes(body).await .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?; let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes)); debug!("{} {}", method, path); match (method, path.as_str()) { (Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await, (Method::GET, "/health") => Ok(Response::builder() .status(StatusCode::OK) .body(hyper::Body::from("OK")) .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(hyper::Body::from("Not Found")) .unwrap()), } } async fn handle_websocket( req: Request, ) -> Result> { let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req) .map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?; // Spawn a task to hold the connection open if upgrade future exists if let Some(ws_fut) = ws_fut_opt { tokio::spawn(async move { let ws_stream: WsStream = match ws_fut.await { Ok(Ok(s)) => s, Ok(Err(e)) => { debug!("WebSocket handshake failed (hyper): {}", e); return; } Err(e) => { debug!("WebSocket task join failed: {}", e); return; } }; info!("WebSocket /ws/db connected"); let (mut tx, mut rx) = ws_stream.split(); // Send periodic pings to keep connection alive let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); tokio::pin!(ping_interval); // Keep connection open; UI may send/receive JSON patches. For now just accept and ignore. loop { tokio::select! { _ = ping_interval.tick() => { if tx.send(Message::Ping(vec![])).await.is_err() { debug!("Failed to send ping, connection likely closed"); break; } } msg = rx.next() => { match msg { Some(Ok(Message::Close(_))) => break, Some(Ok(Message::Pong(_))) => { debug!("Received pong"); } Some(Ok(Message::Ping(data))) => { let _ = tx.send(Message::Pong(data)).await; } Some(Ok(_)) => {} Some(Err(e)) => { debug!("WebSocket stream error: {}", e); break; } None => break, } } } } info!("WebSocket /ws/db disconnected"); }); } Ok(response) } }