use crate::api::rpc::RpcHandler; use crate::config::Config; use crate::state::StateManager; use anyhow::Result; use futures_util::{SinkExt, StreamExt}; use hyper::{Method, Request, Response, StatusCode}; use hyper_ws_listener::WsStream; use std::sync::Arc; use tokio::sync::broadcast; use tokio_tungstenite::tungstenite::Message; use tracing::{debug, info}; const CORS_ANY: &str = "*"; pub struct ApiHandler { _config: Config, rpc_handler: Arc, state_manager: Arc, } impl ApiHandler { pub async fn new(config: Config, state_manager: Arc) -> Result { let rpc_handler = Arc::new(RpcHandler::new(config.clone()).await?); Ok(Self { _config: config, rpc_handler, state_manager, }) } pub async fn handle_request( &self, req: Request, ) -> Result> { let path = req.uri().path().to_string(); let method = req.method().clone(); // WebSocket upgrade must be handled before consuming the body if method == Method::GET && path == "/ws/db" { return Self::handle_websocket(req, self.state_manager.clone()).await; } // Convert body to bytes for non-WS routes let (parts, body) = req.into_parts(); let body_bytes = hyper::body::to_bytes(body).await .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?; let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes)); debug!("{} {}", method, path); match (method, path.as_str()) { (Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await, (Method::GET, "/health") => Ok(Response::builder() .status(StatusCode::OK) .body(hyper::Body::from("OK")) .unwrap()), (Method::GET, path) if path.starts_with("/api/container/logs") => { Self::handle_container_logs_http(self.rpc_handler.clone(), path).await } (Method::GET, path) if path.starts_with("/proxy/lnd/") => { Self::handle_lnd_proxy(path).await } _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(hyper::Body::from("Not Found")) .unwrap()), } } async fn handle_container_logs_http( rpc: Arc, path: &str, ) -> Result> { let query = path .strip_prefix("/api/container/logs") .and_then(|s| s.strip_prefix('?')) .unwrap_or(""); let params: std::collections::HashMap = query .split('&') .filter_map(|p| { let mut it = p.splitn(2, '='); let k = it.next()?.to_string(); let v = it.next()?.to_string(); Some((k, v)) }) .collect(); let app_id = params.get("app_id").map(|s| s.as_str()).unwrap_or("lnd"); let lines = params .get("lines") .and_then(|s| s.parse::().ok()) .unwrap_or(200); match rpc.get_container_logs_value(app_id, lines).await { Ok(value) => { let body = serde_json::json!({ "result": value }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", CORS_ANY) .body(hyper::Body::from(body_bytes)) .unwrap()) } Err(e) => { let body = serde_json::json!({ "error": e.to_string() }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", CORS_ANY) .body(hyper::Body::from(body_bytes)) .unwrap()) } } } async fn handle_lnd_proxy(path: &str) -> Result> { let suffix = path.strip_prefix("/proxy/lnd").unwrap_or("/"); let url = format!("http://127.0.0.1:8080{}", suffix); match reqwest::get(&url).await { Ok(resp) => { let status = resp.status().as_u16(); let headers = resp.headers().clone(); let body = resp.bytes().await.unwrap_or_default(); let mut builder = Response::builder().status(status); if let Some(ct) = headers.get("content-type") { if let Ok(s) = ct.to_str() { builder = builder.header("Content-Type", s); } } builder .header("Access-Control-Allow-Origin", CORS_ANY) .body(hyper::Body::from(body)) .map_err(|e| anyhow::anyhow!("response build: {}", e)) } Err(e) => { let body = serde_json::json!({ "error": e.to_string() }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); Ok(Response::builder() .status(StatusCode::BAD_GATEWAY) .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", CORS_ANY) .body(hyper::Body::from(body_bytes)) .unwrap()) } } } async fn handle_websocket( req: Request, state_manager: Arc, ) -> Result> { let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req) .map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?; // Spawn a task to hold the connection open if upgrade future exists if let Some(ws_fut) = ws_fut_opt { tokio::spawn(async move { let ws_stream: WsStream = match ws_fut.await { Ok(Ok(s)) => s, Ok(Err(e)) => { debug!("WebSocket handshake failed (hyper): {}", e); return; } Err(e) => { debug!("WebSocket task join failed: {}", e); return; } }; info!("WebSocket /ws/db connected"); let (mut tx, mut rx) = ws_stream.split(); // Send initial data dump let initial_msg = state_manager.get_initial_message().await; if let Ok(json_msg) = serde_json::to_string(&initial_msg) { if let Err(e) = tx.send(Message::Text(json_msg)).await { debug!("Failed to send initial data: {}", e); return; } debug!("Sent initial data dump at revision {}", initial_msg.rev); } // Subscribe to state updates let mut state_rx = state_manager.subscribe(); // Send periodic pings to keep connection alive let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); tokio::pin!(ping_interval); // Keep connection open and forward state updates to client loop { tokio::select! { _ = ping_interval.tick() => { if tx.send(Message::Ping(vec![])).await.is_err() { debug!("Failed to send ping, connection likely closed"); break; } } // Forward state updates from broadcast channel to WebSocket update = state_rx.recv() => { match update { Ok(msg) => { if let Ok(json_msg) = serde_json::to_string(&msg) { if let Err(e) = tx.send(Message::Text(json_msg)).await { debug!("Failed to send state update: {}", e); break; } debug!("Sent state update at revision {}", msg.rev); } } Err(broadcast::error::RecvError::Lagged(skipped)) => { debug!("Client lagged behind, skipped {} messages", skipped); // Continue receiving - the client will get the next update } Err(broadcast::error::RecvError::Closed) => { debug!("Broadcast channel closed"); break; } } } msg = rx.next() => { match msg { Some(Ok(Message::Close(_))) => break, Some(Ok(Message::Pong(_))) => { debug!("Received pong"); } Some(Ok(Message::Ping(data))) => { let _ = tx.send(Message::Pong(data)).await; } Some(Ok(_)) => {} Some(Err(e)) => { debug!("WebSocket stream error: {}", e); break; } None => break, } } } } info!("WebSocket /ws/db disconnected"); }); } Ok(response) } }