344 lines
14 KiB
Rust
Raw Normal View History

mod auth;
mod bitcoin;
mod container;
mod lnd;
mod node;
mod package;
mod peers;
mod totp;
use crate::auth::AuthManager;
use crate::config::Config;
use crate::container::DevContainerOrchestrator;
use crate::port_allocator::PortAllocator;
use crate::session::{self, LoginRateLimiter, SessionStore};
use crate::state::StateManager;
use anyhow::{Context, Result};
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::sync::{Arc, Mutex};
use tracing::{debug, error};
/// Incoming RPC envelope, deserialized from the JSON body of each request.
#[derive(Debug, Deserialize)]
struct RpcRequest {
/// Name of the RPC method; `RpcHandler::handle` matches on it to pick a handler.
method: String,
/// Optional method-specific parameters, passed through to the handler as raw JSON.
params: Option<serde_json::Value>,
}
/// Outgoing RPC envelope, serialized to JSON as the response body.
///
/// `RpcHandler::handle` always populates exactly one of the two fields:
/// `result` on success, `error` on failure.
#[derive(Debug, Serialize)]
struct RpcResponse {
/// Handler return value; `None` when the call failed.
result: Option<serde_json::Value>,
/// Failure description; `None` when the call succeeded.
error: Option<RpcError>,
}
/// Error payload carried inside an [`RpcResponse`].
#[derive(Debug, Serialize)]
struct RpcError {
/// Numeric error code. `RpcHandler::handle` uses 401 for a missing/invalid
/// session, 429 for a rate-limited login, and -1 for any handler error.
code: i32,
/// Human-readable description of the failure.
message: String,
/// Optional structured details; `RpcHandler::handle` always leaves this `None`.
data: Option<serde_json::Value>,
}
/// Default dev password when no user is set up (matches mock-backend).
///
/// NOTE(review): this is a hard-coded credential and it is not referenced in
/// the visible part of this file — confirm that every user of the constant
/// only consults it in dev mode.
pub(crate) const DEV_DEFAULT_PASSWORD: &str = "password123";
/// Methods that do not require a valid session cookie.
///
/// `RpcHandler::handle` compares the request's method name against this list
/// (exact string match); every method that is not listed is rejected with a
/// 401 response unless the session store validates the caller's cookie.
///
/// NOTE(review): `"health"` is allow-listed here, but the dispatch table in
/// `RpcHandler::handle` has no arm for it, so such a request currently ends in
/// the "Unknown method" error — presumably it is served elsewhere; confirm.
const UNAUTHENTICATED_METHODS: &[&str] = &[
"auth.login",
"auth.login.totp",
"auth.login.backup",
"auth.isOnboardingComplete",
"health",
];
/// Entry point for JSON RPC requests: owns the services the individual
/// method handlers need and performs session/rate-limit checks in `handle`.
pub struct RpcHandler {
/// Application configuration passed to `new`.
config: Config,
/// Auth backend, constructed in `new` from `config.data_dir`.
auth_manager: AuthManager,
/// Dev container orchestrator; `new` only creates it when `config.dev_mode`
/// is set, otherwise this is `None`.
orchestrator: Option<Arc<DevContainerOrchestrator>>,
/// Shared state manager handed in by the caller of `new`.
state_manager: Arc<StateManager>,
/// Port allocator, constructed in `new` from `config.data_dir`.
port_allocator: Arc<Mutex<PortAllocator>>,
/// Session store used to validate, create and remove session tokens.
/// Public so code outside this type can reach it.
pub session_store: SessionStore,
/// Limiter consulted before, and updated after, each `auth.login` call.
login_rate_limiter: LoginRateLimiter,
}
impl RpcHandler {
pub async fn new(
config: Config,
state_manager: Arc<StateManager>,
session_store: SessionStore,
) -> Result<Self> {
let auth_manager = AuthManager::new(config.data_dir.clone());
let orchestrator = if config.dev_mode {
Some(Arc::new(
DevContainerOrchestrator::new(config.clone()).await?,
))
} else {
None
};
let port_allocator = Arc::new(Mutex::new(PortAllocator::new(&config.data_dir)?));
Ok(Self {
config,
auth_manager,
orchestrator,
state_manager,
port_allocator,
session_store,
login_rate_limiter: LoginRateLimiter::new(),
})
}
pub async fn handle(
&self,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
// Extract session cookie before consuming the request
let (parts, body) = req.into_parts();
let session_token = session::extract_session_cookie(&parts.headers);
let body_bytes = hyper::body::to_bytes(body).await
.context("Failed to read body")?;
let rpc_req: RpcRequest = serde_json::from_slice(&body_bytes)
.context("Invalid RPC request")?;
debug!("RPC method: {}", rpc_req.method);
// Enforce authentication for non-allowlisted methods
let is_unauthenticated = UNAUTHENTICATED_METHODS.contains(&rpc_req.method.as_str());
if !is_unauthenticated {
let authenticated = match &session_token {
Some(token) => self.session_store.validate(token).await,
None => false,
};
if !authenticated {
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 401,
message: "Unauthorized".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("Content-Type", "application/json")
.body(hyper::Body::from(resp_body))
.unwrap());
}
}
// Rate limit login attempts
if rpc_req.method == "auth.login" {
let client_ip = extract_client_ip(&parts.headers);
if !self.login_rate_limiter.check(client_ip).await {
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 429,
message: "Too many login attempts. Try again later.".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.header("Content-Type", "application/json")
.header("Retry-After", "60")
.body(hyper::Body::from(resp_body))
.unwrap());
}
}
// Extract params; clone for post-routing use (login 2FA check needs password)
let params = rpc_req.params;
let login_params: Option<serde_json::Value> = if rpc_req.method == "auth.login" {
params.clone()
} else {
None
};
// Route to handler
let result = match rpc_req.method.as_str() {
"echo" => self.handle_echo(params).await,
"server.echo" => self.handle_echo(params).await,
"auth.login" => self.handle_auth_login(params).await,
"auth.logout" => self.handle_auth_logout().await,
"auth.changePassword" => self.handle_auth_change_password(params).await,
"auth.onboardingComplete" => self.handle_auth_onboarding_complete().await,
"auth.isOnboardingComplete" => self.handle_auth_is_onboarding_complete().await,
"auth.resetOnboarding" => self.handle_auth_reset_onboarding().await,
// Container orchestration (for Archipelago-managed containers)
"container-install" => self.handle_container_install(params).await,
"container-start" => self.handle_container_start(params).await,
"container-stop" => self.handle_container_stop(params).await,
"container-remove" => self.handle_container_remove(params).await,
"container-list" => self.handle_container_list().await,
"container-status" => self.handle_container_status(params).await,
"container-logs" => self.handle_container_logs(params).await,
"container-health" => self.handle_container_health(params).await,
// Package management (for docker-compose apps)
"package.install" => self.handle_package_install(params).await,
"package.start" => self.handle_package_start(params).await,
"package.stop" => self.handle_package_stop(params).await,
"package.restart" => self.handle_package_restart(params).await,
"package.uninstall" => self.handle_package_uninstall(params).await,
// Bundled app management (for pre-loaded container images)
"bundled-app-start" => self.handle_bundled_app_start(params).await,
"bundled-app-stop" => self.handle_bundled_app_stop(params).await,
// Node identity and P2P peers
"node-add-peer" => self.handle_node_add_peer(params).await,
"node-list-peers" => self.handle_node_list_peers().await,
"node-remove-peer" => self.handle_node_remove_peer(params).await,
"node-send-message" => self.handle_node_send_message(params).await,
"node-check-peer" => self.handle_node_check_peer(params).await,
"node-messages-received" => self.handle_node_messages_received().await,
"node-nostr-discover" => self.handle_node_nostr_discover().await,
"node.did" => self.handle_node_did().await,
"node.signChallenge" => self.handle_node_sign_challenge(params).await,
"node.createBackup" => self.handle_node_create_backup(params).await,
"node.tor-address" => self.handle_node_tor_address().await,
"node.nostr-publish" => self.handle_node_nostr_publish().await,
"node.nostr-pubkey" => self.handle_node_nostr_pubkey().await,
"node-nostr-verify-revoked" => self.handle_node_nostr_verify_revoked().await,
// TOTP 2FA
"auth.totp.setup.begin" => self.handle_totp_setup_begin(params).await,
"auth.totp.setup.confirm" => self.handle_totp_setup_confirm(params).await,
"auth.totp.disable" => self.handle_totp_disable(params).await,
"auth.totp.status" => self.handle_totp_status().await,
"auth.login.totp" => self.handle_login_totp(params, &session_token).await,
"auth.login.backup" => self.handle_login_backup(params, &session_token).await,
// Bitcoin & Lightning deep data
"bitcoin.getinfo" => self.handle_bitcoin_getinfo().await,
"lnd.getinfo" => self.handle_lnd_getinfo().await,
_ => {
Err(anyhow::anyhow!("Unknown method: {}", rpc_req.method))
}
};
// Build response
let rpc_resp = match result {
Ok(data) => RpcResponse {
result: Some(data),
error: None,
},
Err(e) => {
error!("RPC error: {}", e);
RpcResponse {
result: None,
error: Some(RpcError {
code: -1,
message: e.to_string(),
data: None,
}),
}
}
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
let mut response = Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(resp_body))
.unwrap();
// Track failed login attempts for rate limiting
if rpc_req.method == "auth.login" && rpc_resp.error.is_some() {
let client_ip = extract_client_ip(&parts.headers);
self.login_rate_limiter.record_failure(client_ip).await;
}
// On successful login, check if 2FA is required
if rpc_req.method == "auth.login" && rpc_resp.error.is_none() {
let totp_enabled = self.auth_manager.is_totp_enabled().await.unwrap_or(false);
if totp_enabled {
// 2FA enabled: create a pending session with cached TOTP secret
// We need the password to decrypt the TOTP secret for step 2
let password = login_params
.as_ref()
.and_then(|p| p.get("password"))
.and_then(|v| v.as_str())
.unwrap_or("");
if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await {
if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) {
let token = self.session_store.create_pending(secret).await;
response.headers_mut().insert(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Strict; Path=/", token)
.parse()
.unwrap(),
);
// Override the response body to indicate TOTP is required
let totp_body = serde_json::json!({
"result": { "requires_totp": true },
"error": null
});
*response.body_mut() = hyper::Body::from(
serde_json::to_vec(&totp_body).unwrap_or_default(),
);
}
}
} else {
// No 2FA: create a full session immediately
let token = self.session_store.create().await;
response.headers_mut().insert(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Strict; Path=/", token)
.parse()
.unwrap(),
);
}
}
// On successful TOTP verification, the session is already upgraded to full
// (handled inside handle_login_totp/handle_login_backup)
// On logout, invalidate session and expire the cookie
if rpc_req.method == "auth.logout" {
if let Some(token) = &session_token {
self.session_store.remove(token).await;
}
response.headers_mut().insert(
"Set-Cookie",
"session=; HttpOnly; SameSite=Strict; Path=/; Max-Age=0"
.parse()
.unwrap(),
);
}
Ok(response)
}
async fn handle_echo(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
if let Some(p) = params {
if let Some(msg) = p.get("message").and_then(|v| v.as_str()) {
return Ok(serde_json::json!({ "message": msg }));
}
}
Ok(serde_json::json!({ "message": "Hello from Archipelago!" }))
}
}
/// Extract the client IP from request headers (X-Real-IP or X-Forwarded-For).
///
/// The headers are tried in that order and the first one that yields a
/// parseable address wins; for a comma-separated list only the first entry is
/// considered. A header that is present but malformed (non-UTF-8 or not an IP
/// address) is skipped rather than hiding a valid value in the next header.
///
/// Returns `127.0.0.1` when neither header provides a usable address.
///
/// NOTE(review): both headers are supplied by the client unless a trusted
/// reverse proxy overwrites them — confirm the deployment always sits behind
/// one, since the login rate limiter keys on this value.
fn extract_client_ip(headers: &hyper::HeaderMap) -> IpAddr {
    ["x-real-ip", "x-forwarded-for"]
        .iter()
        .filter_map(|name| headers.get(*name))
        .filter_map(|value| value.to_str().ok())
        .filter_map(|raw| raw.split(',').next())
        .find_map(|candidate| candidate.trim().parse::<IpAddr>().ok())
        .unwrap_or(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST))
}