136 lines
5.2 KiB
Rust
Raw Normal View History

2026-01-24 22:59:20 +00:00
use crate::api::rpc::RpcHandler;
use crate::config::Config;
use crate::state::StateManager;
2026-01-24 22:59:20 +00:00
use anyhow::Result;
2026-01-27 22:37:08 +00:00
use futures_util::{SinkExt, StreamExt};
2026-01-24 22:59:20 +00:00
use hyper::{Method, Request, Response, StatusCode};
use hyper_ws_listener::WsStream;
2026-01-24 22:59:20 +00:00
use std::sync::Arc;
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, info};
2026-01-24 22:59:20 +00:00
pub struct ApiHandler {
_config: Config,
2026-01-24 22:59:20 +00:00
rpc_handler: Arc<RpcHandler>,
state_manager: Arc<StateManager>,
2026-01-24 22:59:20 +00:00
}
impl ApiHandler {
pub async fn new(config: Config, state_manager: Arc<StateManager>) -> Result<Self> {
2026-01-24 22:59:20 +00:00
let rpc_handler = Arc::new(RpcHandler::new(config.clone()).await?);
Ok(Self {
_config: config,
2026-01-24 22:59:20 +00:00
rpc_handler,
state_manager,
2026-01-24 22:59:20 +00:00
})
}
2026-01-24 23:20:54 +00:00
pub async fn handle_request(
&self,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
2026-01-24 23:20:54 +00:00
let path = req.uri().path().to_string();
let method = req.method().clone();
2026-01-27 22:37:08 +00:00
// WebSocket upgrade must be handled before consuming the body
if method == Method::GET && path == "/ws/db" {
return Self::handle_websocket(req, self.state_manager.clone()).await;
2026-01-27 22:37:08 +00:00
}
// Convert body to bytes for non-WS routes
2026-01-24 22:59:20 +00:00
let (parts, body) = req.into_parts();
let body_bytes = hyper::body::to_bytes(body).await
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?;
let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes));
2026-01-24 22:59:20 +00:00
debug!("{} {}", method, path);
match (method, path.as_str()) {
2026-01-27 22:37:08 +00:00
(Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await,
(Method::GET, "/health") => Ok(Response::builder()
.status(StatusCode::OK)
.body(hyper::Body::from("OK"))
.unwrap()),
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
.unwrap()),
2026-01-24 22:59:20 +00:00
}
}
2026-01-27 22:37:08 +00:00
async fn handle_websocket(
req: Request<hyper::Body>,
state_manager: Arc<StateManager>,
2026-01-27 22:37:08 +00:00
) -> Result<Response<hyper::Body>> {
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
2026-01-27 22:37:08 +00:00
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
// Spawn a task to hold the connection open if upgrade future exists
if let Some(ws_fut) = ws_fut_opt {
tokio::spawn(async move {
let ws_stream: WsStream = match ws_fut.await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
debug!("WebSocket handshake failed (hyper): {}", e);
return;
2026-01-27 22:37:08 +00:00
}
Err(e) => {
debug!("WebSocket task join failed: {}", e);
return;
}
};
info!("WebSocket /ws/db connected");
let (mut tx, mut rx) = ws_stream.split();
// Send initial data dump
let initial_msg = state_manager.get_initial_message().await;
if let Ok(json_msg) = serde_json::to_string(&initial_msg) {
if let Err(e) = tx.send(Message::Text(json_msg)).await {
debug!("Failed to send initial data: {}", e);
return;
}
debug!("Sent initial data dump at revision {}", initial_msg.rev);
}
// Send periodic pings to keep connection alive
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
tokio::pin!(ping_interval);
// Keep connection open; UI may send/receive JSON patches. For now just accept and ignore.
loop {
tokio::select! {
_ = ping_interval.tick() => {
if tx.send(Message::Ping(vec![])).await.is_err() {
debug!("Failed to send ping, connection likely closed");
break;
}
}
msg = rx.next() => {
match msg {
Some(Ok(Message::Close(_))) => break,
Some(Ok(Message::Pong(_))) => {
debug!("Received pong");
}
Some(Ok(Message::Ping(data))) => {
let _ = tx.send(Message::Pong(data)).await;
}
Some(Ok(_)) => {}
Some(Err(e)) => {
debug!("WebSocket stream error: {}", e);
break;
}
None => break,
}
}
2026-01-27 22:37:08 +00:00
}
}
info!("WebSocket /ws/db disconnected");
});
}
2026-01-27 22:37:08 +00:00
Ok(response)
}
2026-01-24 22:59:20 +00:00
}