use crate::auth::AuthManager; use crate::config::Config; use crate::container::DevContainerOrchestrator; use anyhow::{Context, Result}; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tracing::{debug, error}; #[derive(Debug, Deserialize)] struct RpcRequest { method: String, params: Option, } #[derive(Debug, Serialize)] struct RpcResponse { result: Option, error: Option, } #[derive(Debug, Serialize)] struct RpcError { code: i32, message: String, data: Option, } /// Default dev password when no user is set up (matches mock-backend). const DEV_DEFAULT_PASSWORD: &str = "password123"; pub struct RpcHandler { config: Config, auth_manager: AuthManager, orchestrator: Option>, } impl RpcHandler { pub async fn new(config: Config) -> Result { let auth_manager = AuthManager::new(config.data_dir.clone()); let orchestrator = if config.dev_mode { Some(Arc::new( DevContainerOrchestrator::new(config.clone()).await?, )) } else { None }; Ok(Self { config, auth_manager, orchestrator, }) } pub async fn handle( &self, req: Request, ) -> Result> { // Read request body let (_, body) = req.into_parts(); let body_bytes = hyper::body::to_bytes(body).await .context("Failed to read body")?; let rpc_req: RpcRequest = serde_json::from_slice(&body_bytes) .context("Invalid RPC request")?; debug!("RPC method: {}", rpc_req.method); // Route to handler let result = match rpc_req.method.as_str() { "echo" => self.handle_echo(rpc_req.params).await, "server.echo" => self.handle_echo(rpc_req.params).await, "auth.login" => self.handle_auth_login(rpc_req.params).await, "auth.logout" => self.handle_auth_logout().await, // Container orchestration (for Archipelago-managed containers) "container-install" => self.handle_container_install(rpc_req.params).await, "container-start" => self.handle_container_start(rpc_req.params).await, "container-stop" => self.handle_container_stop(rpc_req.params).await, "container-remove" => self.handle_container_remove(rpc_req.params).await, "container-list" => self.handle_container_list().await, "container-status" => self.handle_container_status(rpc_req.params).await, "container-logs" => self.handle_container_logs(rpc_req.params).await, "container-health" => self.handle_container_health(rpc_req.params).await, // Package management (for docker-compose apps) "package.start" => self.handle_package_start(rpc_req.params).await, "package.stop" => self.handle_package_stop(rpc_req.params).await, "package.restart" => self.handle_package_restart(rpc_req.params).await, _ => { Err(anyhow::anyhow!("Unknown method: {}", rpc_req.method)) } }; // Build response let rpc_resp = match result { Ok(data) => RpcResponse { result: Some(data), error: None, }, Err(e) => { error!("RPC error: {}", e); RpcResponse { result: None, error: Some(RpcError { code: -1, message: e.to_string(), data: None, }), } } }; let body = serde_json::to_vec(&rpc_resp) .context("Failed to serialize response")?; Ok(Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(hyper::Body::from(body)) .unwrap()) } async fn handle_echo(&self, params: Option) -> Result { if let Some(p) = params { if let Some(msg) = p.get("message").and_then(|v| v.as_str()) { return Ok(serde_json::json!({ "message": msg })); } } Ok(serde_json::json!({ "message": "Hello from Archipelago!" })) } async fn handle_auth_login( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let password = params .get("password") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing password"))?; let is_setup = self.auth_manager.is_setup().await?; if !is_setup { // Dev mode: allow default password so UI can log in without running setup if self.config.dev_mode && password == DEV_DEFAULT_PASSWORD { return Ok(serde_json::Value::Null); } return Err(anyhow::anyhow!( "User not set up. Please complete setup first." )); } let valid = self.auth_manager.verify_password(password).await?; if !valid { return Err(anyhow::anyhow!("Password Incorrect")); } Ok(serde_json::Value::Null) } async fn handle_auth_logout(&self) -> Result { // For now, just return success. In a full implementation, this would: // - Invalidate session tokens // - Clear cookies (if we were managing them) // - Close authenticated WebSocket connections Ok(serde_json::Value::Null) } async fn handle_container_install( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let manifest_path = params .get("manifest_path") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing manifest_path"))?; // Load manifest let manifest_content = tokio::fs::read_to_string(manifest_path) .await .context("Failed to read manifest file")?; let manifest: archipelago_container::AppManifest = serde_yaml::from_str(&manifest_content) .context("Failed to parse manifest")?; let container_name = orchestrator .install_container(&manifest, manifest_path) .await .context("Failed to install container")?; Ok(serde_json::json!(container_name)) } async fn handle_container_start( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let app_id = params .get("app_id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing app_id"))?; orchestrator .start_container(app_id) .await .context("Failed to start container")?; Ok(serde_json::json!({ "status": "started" })) } async fn handle_container_stop( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let app_id = params .get("app_id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing app_id"))?; orchestrator .stop_container(app_id) .await .context("Failed to stop container")?; Ok(serde_json::json!({ "status": "stopped" })) } async fn handle_container_remove( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let app_id = params .get("app_id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing app_id"))?; let preserve_data = params .get("preserve_data") .and_then(|v| v.as_bool()) .unwrap_or(false); orchestrator .remove_container(app_id, preserve_data) .await .context("Failed to remove container")?; Ok(serde_json::json!({ "status": "removed" })) } async fn handle_container_list(&self) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let containers = orchestrator .list_containers() .await .context("Failed to list containers")?; Ok(serde_json::to_value(containers)?) } async fn handle_container_status( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let app_id = params .get("app_id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing app_id"))?; let status = orchestrator .get_container_status(app_id) .await .context("Failed to get container status")?; Ok(serde_json::to_value(status)?) } async fn handle_container_logs( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let app_id = params .get("app_id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing app_id"))?; let lines = params .get("lines") .and_then(|v| v.as_u64()) .unwrap_or(100) as u32; let logs = orchestrator .get_container_logs(app_id, lines) .await .context("Failed to get container logs")?; Ok(serde_json::to_value(logs)?) } async fn handle_container_health( &self, params: Option, ) -> Result { let orchestrator = self .orchestrator .as_ref() .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available (dev mode required)"))?; // If app_id is provided, get health for that app if let Some(params) = params { if let Some(app_id) = params.get("app_id").and_then(|v| v.as_str()) { let health = orchestrator .get_health_status(app_id) .await .context("Failed to get container health")?; return Ok(serde_json::json!({ app_id: health })); } } // Otherwise, get health for all containers let containers = orchestrator .list_containers() .await .context("Failed to list containers")?; let mut health_map = serde_json::Map::new(); for container in containers { // Extract app_id from container name if let Some(app_id) = container.name.strip_prefix("archipelago-") { if let Some(app_id) = app_id.strip_suffix("-dev") { match orchestrator.get_health_status(app_id).await { Ok(health) => { health_map.insert(app_id.to_string(), serde_json::Value::String(health)); } Err(_) => { health_map.insert(app_id.to_string(), serde_json::Value::String("unknown".to_string())); } } } } } Ok(serde_json::Value::Object(health_map)) } // Package management methods for docker-compose containers async fn handle_package_start( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let package_id = params .get("id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing package id"))?; // Convert package ID to container name (e.g., "bitcoin" -> "archy-bitcoin") let container_name = format!("archy-{}", package_id); // Use docker CLI to start the container let output = tokio::process::Command::new("docker") .arg("start") .arg(&container_name) .output() .await .context("Failed to execute docker start")?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(anyhow::anyhow!("Failed to start container: {}", stderr)); } Ok(serde_json::Value::Null) } async fn handle_package_stop( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let package_id = params .get("id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing package id"))?; // Convert package ID to container name let container_name = format!("archy-{}", package_id); // Use docker CLI to stop the container let output = tokio::process::Command::new("docker") .arg("stop") .arg(&container_name) .output() .await .context("Failed to execute docker stop")?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(anyhow::anyhow!("Failed to stop container: {}", stderr)); } Ok(serde_json::Value::Null) } async fn handle_package_restart( &self, params: Option, ) -> Result { let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; let package_id = params .get("id") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing package id"))?; // Convert package ID to container name let container_name = format!("archy-{}", package_id); // Use docker CLI to restart the container let output = tokio::process::Command::new("docker") .arg("restart") .arg(&container_name) .output() .await .context("Failed to execute docker restart")?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(anyhow::anyhow!("Failed to restart container: {}", stderr)); } Ok(serde_json::Value::Null) } }