289 lines
12 KiB
Rust
Raw Normal View History

2026-01-24 22:59:20 +00:00
use crate::api::rpc::RpcHandler;
use crate::electrs_status;
use crate::node_message as node_msg;
2026-01-24 22:59:20 +00:00
use crate::config::Config;
use crate::state::StateManager;
2026-01-24 22:59:20 +00:00
use anyhow::Result;
2026-01-27 22:37:08 +00:00
use futures_util::{SinkExt, StreamExt};
2026-01-24 22:59:20 +00:00
use hyper::{Method, Request, Response, StatusCode};
use hyper_ws_listener::WsStream;
2026-01-24 22:59:20 +00:00
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, info};
2026-01-24 22:59:20 +00:00
/// Wildcard value sent in the `Access-Control-Allow-Origin` header by the JSON
/// and proxy handlers in this file.
const CORS_ANY: &str = "*";
2026-01-24 22:59:20 +00:00
pub struct ApiHandler {
_config: Config,
2026-01-24 22:59:20 +00:00
rpc_handler: Arc<RpcHandler>,
state_manager: Arc<StateManager>,
2026-01-24 22:59:20 +00:00
}
impl ApiHandler {
pub async fn new(config: Config, state_manager: Arc<StateManager>) -> Result<Self> {
let rpc_handler = Arc::new(RpcHandler::new(config.clone(), state_manager.clone()).await?);
2026-01-24 22:59:20 +00:00
Ok(Self {
_config: config,
2026-01-24 22:59:20 +00:00
rpc_handler,
state_manager,
2026-01-24 22:59:20 +00:00
})
}
2026-01-24 23:20:54 +00:00
pub async fn handle_request(
&self,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
2026-01-24 23:20:54 +00:00
let path = req.uri().path().to_string();
let method = req.method().clone();
2026-01-27 22:37:08 +00:00
// WebSocket upgrade must be handled before consuming the body
if method == Method::GET && path == "/ws/db" {
return Self::handle_websocket(req, self.state_manager.clone()).await;
2026-01-27 22:37:08 +00:00
}
// Convert body to bytes for non-WS routes
2026-01-24 22:59:20 +00:00
let (parts, body) = req.into_parts();
let body_bytes = hyper::body::to_bytes(body).await
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?;
let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes.clone()));
2026-01-24 22:59:20 +00:00
debug!("{} {}", method, path);
match (method, path.as_str()) {
2026-01-27 22:37:08 +00:00
(Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await,
(Method::GET, "/health") => Ok(Response::builder()
.status(StatusCode::OK)
.body(hyper::Body::from("OK"))
.unwrap()),
(Method::POST, "/archipelago/node-message") => {
Self::handle_node_message(body_bytes).await
}
(Method::GET, "/electrs-status") => Self::handle_electrs_status().await,
(Method::GET, path) if path.starts_with("/api/container/logs") => {
Self::handle_container_logs_http(self.rpc_handler.clone(), path).await
}
(Method::GET, path) if path.starts_with("/proxy/lnd/") => {
Self::handle_lnd_proxy(path).await
}
2026-01-27 22:37:08 +00:00
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
.unwrap()),
2026-01-24 22:59:20 +00:00
}
}
2026-01-27 22:37:08 +00:00
async fn handle_container_logs_http(
rpc: Arc<RpcHandler>,
path: &str,
) -> Result<Response<hyper::Body>> {
let query = path
.strip_prefix("/api/container/logs")
.and_then(|s| s.strip_prefix('?'))
.unwrap_or("");
let params: std::collections::HashMap<String, String> =
query
.split('&')
.filter_map(|p| {
let mut it = p.splitn(2, '=');
let k = it.next()?.to_string();
let v = it.next()?.to_string();
Some((k, v))
})
.collect();
let app_id = params.get("app_id").map(|s| s.as_str()).unwrap_or("lnd");
let lines = params
.get("lines")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(200);
match rpc.get_container_logs_value(app_id, lines).await {
Ok(value) => {
let body = serde_json::json!({ "result": value });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", CORS_ANY)
.body(hyper::Body::from(body_bytes))
.unwrap())
}
Err(e) => {
let body = serde_json::json!({ "error": e.to_string() });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", CORS_ANY)
.body(hyper::Body::from(body_bytes))
.unwrap())
}
}
}
async fn handle_node_message(body: hyper::body::Bytes) -> Result<Response<hyper::Body>> {
#[derive(serde::Deserialize)]
struct Incoming {
from_pubkey: Option<String>,
message: Option<String>,
}
let incoming: Incoming = serde_json::from_slice(&body).unwrap_or(Incoming {
from_pubkey: None,
message: None,
});
if let (Some(from), Some(msg)) = (incoming.from_pubkey, incoming.message) {
tracing::info!("📩 Received message from {}: {}", from, msg);
node_msg::store_received(&from, &msg).await;
}
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", CORS_ANY)
.body(hyper::Body::from(r#"{"ok":true}"#))
.unwrap())
}
/// Returns the current Electrs sync status as a JSON document.
///
/// Always answers `200`; if the status cannot be serialized the body is empty.
async fn handle_electrs_status() -> Result<Response<hyper::Body>> {
    let sync_status = electrs_status::get_electrs_sync_status().await;
    let payload = serde_json::to_vec(&sync_status).unwrap_or_default();

    let response = Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .header("Access-Control-Allow-Origin", CORS_ANY)
        .body(hyper::Body::from(payload))
        .unwrap();

    Ok(response)
}
async fn handle_lnd_proxy(path: &str) -> Result<Response<hyper::Body>> {
let suffix = path.strip_prefix("/proxy/lnd").unwrap_or("/");
let url = format!("http://127.0.0.1:8080{}", suffix);
match reqwest::get(&url).await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let body = resp.bytes().await.unwrap_or_default();
let mut builder = Response::builder().status(status);
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
builder = builder.header("Content-Type", s);
}
}
builder
.header("Access-Control-Allow-Origin", CORS_ANY)
.body(hyper::Body::from(body))
.map_err(|e| anyhow::anyhow!("response build: {}", e))
}
Err(e) => {
let body = serde_json::json!({ "error": e.to_string() });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::BAD_GATEWAY)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", CORS_ANY)
.body(hyper::Body::from(body_bytes))
.unwrap())
}
}
}
2026-01-27 22:37:08 +00:00
async fn handle_websocket(
req: Request<hyper::Body>,
state_manager: Arc<StateManager>,
2026-01-27 22:37:08 +00:00
) -> Result<Response<hyper::Body>> {
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
2026-01-27 22:37:08 +00:00
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
// Spawn a task to hold the connection open if upgrade future exists
if let Some(ws_fut) = ws_fut_opt {
tokio::spawn(async move {
let ws_stream: WsStream = match ws_fut.await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
debug!("WebSocket handshake failed (hyper): {}", e);
return;
2026-01-27 22:37:08 +00:00
}
Err(e) => {
debug!("WebSocket task join failed: {}", e);
return;
}
};
info!("WebSocket /ws/db connected");
let (mut tx, mut rx) = ws_stream.split();
// Send initial data dump
let initial_msg = state_manager.get_initial_message().await;
if let Ok(json_msg) = serde_json::to_string(&initial_msg) {
if let Err(e) = tx.send(Message::Text(json_msg)).await {
debug!("Failed to send initial data: {}", e);
return;
}
debug!("Sent initial data dump at revision {}", initial_msg.rev);
}
// Subscribe to state updates
let mut state_rx = state_manager.subscribe();
// Send periodic pings to keep connection alive
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
tokio::pin!(ping_interval);
// Keep connection open and forward state updates to client
loop {
tokio::select! {
_ = ping_interval.tick() => {
if tx.send(Message::Ping(vec![])).await.is_err() {
debug!("Failed to send ping, connection likely closed");
break;
}
}
// Forward state updates from broadcast channel to WebSocket
update = state_rx.recv() => {
match update {
Ok(msg) => {
if let Ok(json_msg) = serde_json::to_string(&msg) {
if let Err(e) = tx.send(Message::Text(json_msg)).await {
debug!("Failed to send state update: {}", e);
break;
}
debug!("Sent state update at revision {}", msg.rev);
}
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
debug!("Client lagged behind, skipped {} messages", skipped);
// Continue receiving - the client will get the next update
}
Err(broadcast::error::RecvError::Closed) => {
debug!("Broadcast channel closed");
break;
}
}
}
msg = rx.next() => {
match msg {
Some(Ok(Message::Close(_))) => break,
Some(Ok(Message::Pong(_))) => {
debug!("Received pong");
}
Some(Ok(Message::Ping(data))) => {
let _ = tx.send(Message::Pong(data)).await;
}
Some(Ok(_)) => {}
Some(Err(e)) => {
debug!("WebSocket stream error: {}", e);
break;
}
None => break,
}
}
2026-01-27 22:37:08 +00:00
}
}
info!("WebSocket /ws/db disconnected");
});
}
2026-01-27 22:37:08 +00:00
Ok(response)
}
2026-01-24 22:59:20 +00:00
}