Dorian df478c4a1e feat: Phase 3 Week 4 — mesh RPC endpoints for typed messages + session management
Backend (6 new RPC endpoints):
- mesh.send-invoice: create Lightning invoice, send bolt11 to mesh peer
- mesh.send-coordinate: send GPS coordinates (integer microdegrees)
- mesh.send-alert: send signed emergency alert (with optional GPS)
- mesh.outbox: list pending store-and-forward messages
- mesh.session-status: get Double Ratchet session info per peer
- mesh.rotate-prekeys: force X3DH prekey rotation

Mock backend: matching dev mode responses for all 6 new endpoints

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 02:23:30 +00:00

990 lines
45 KiB
Rust

mod analytics;
mod auth;
mod backup_rpc;
mod bitcoin;
mod container;
mod content;
mod credentials;
mod dwn;
mod federation;
mod handshake;
mod identity;
mod interfaces;
mod marketplace;
mod monitoring;
mod names;
mod lnd;
mod mesh;
mod network;
mod node;
mod nostr;
mod package;
mod peers;
mod router;
mod security;
mod tor;
mod transport;
mod totp;
mod system;
mod update;
mod vpn;
mod wallet;
mod webhooks;
use crate::auth::AuthManager;
use crate::config::Config;
use crate::container::DevContainerOrchestrator;
use crate::monitoring::MetricsStore;
use crate::port_allocator::PortAllocator;
use crate::session::{self, EndpointRateLimiter, LoginRateLimiter, SessionStore, REMEMBER_TTL};
use crate::state::StateManager;
use anyhow::{Context, Result};
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::sync::{Arc, Mutex};
use tracing::{debug, error};
use rand::Rng;
/// Incoming RPC envelope, deserialized from the JSON request body.
#[derive(Debug, Deserialize)]
struct RpcRequest {
/// Name of the RPC method to dispatch (e.g. `auth.login`).
method: String,
/// Optional method-specific parameters; handlers interpret the JSON themselves.
params: Option<serde_json::Value>,
}
/// Outgoing RPC envelope, serialized as the JSON response body.
///
/// The dispatcher in this file fills exactly one of the two fields: `result`
/// on success, `error` on failure.
#[derive(Debug, Serialize)]
struct RpcResponse {
/// Handler return value on success.
result: Option<serde_json::Value>,
/// Error details on failure.
error: Option<RpcError>,
}
/// Error payload carried inside an [`RpcResponse`].
#[derive(Debug, Serialize)]
struct RpcError {
/// Numeric error code. This file uses HTTP-like codes (401, 403, 429) for
/// rejections made before dispatch and `-1` for handler failures.
code: i32,
/// Human-readable message returned to the client.
message: String,
/// Optional structured details; always `None` in this file.
data: Option<serde_json::Value>,
}
/// Default dev password when no user is set up (matches mock-backend).
///
/// NOTE(review): this is a hard-coded credential. Its users are outside this
/// chunk — confirm it is only reachable when `dev_mode` is enabled.
pub(crate) const DEV_DEFAULT_PASSWORD: &str = "password123";
/// Sanitize error messages before returning to clients.
///
/// Keeps user-facing validation errors but strips internal system details.
///
/// A message is considered user-facing when it contains any of the known
/// marker words (e.g. `Invalid`, `Missing`, `must be`). Such messages have
/// well-known filesystem prefixes replaced by placeholders and are truncated
/// to at most 200 bytes (plus a trailing `...`). The cut is moved back to the
/// nearest UTF-8 character boundary so that multi-byte text can never cause a
/// slicing panic.
///
/// Every other message is replaced by a generic "check server logs" text.
fn sanitize_error_message(msg: &str) -> String {
    /// Maximum number of bytes of the original message that are returned.
    const MAX_LEN: usize = 200;
    /// Markers of validation errors that are safe and useful to show to users.
    const USER_FACING_MARKERS: [&str; 13] = [
        "Invalid",
        "Missing",
        "Not found",
        "Already exists",
        "Rate limit",
        "Unauthorized",
        "Forbidden",
        "Not supported",
        "requires",
        "must be",
        "cannot",
        "Password",
        "Session",
    ];

    // `contains` already covers the "starts with" case.
    let is_user_facing = USER_FACING_MARKERS
        .iter()
        .any(|marker| msg.contains(*marker));
    if !is_user_facing {
        // For all other errors, return a generic message
        return "Operation failed. Check server logs for details.".to_string();
    }

    // Strip file paths that would reveal the on-disk layout.
    let sanitized = msg
        .replace("/var/lib/archipelago/", "[data]/")
        .replace("/usr/local/bin/", "[bin]/")
        .replace("/etc/", "[config]/");

    if sanitized.len() <= MAX_LEN {
        return sanitized;
    }

    // Byte offset MAX_LEN may fall inside a multi-byte character; back up to
    // the previous boundary (offset 0 is always a boundary, so this ends).
    let mut cut = MAX_LEN;
    while !sanitized.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &sanitized[..cut])
}
/// Methods that do not require a valid session cookie.
///
/// `RpcHandler::handle` also skips the RBAC and CSRF checks for every method
/// listed here, so each entry is reachable by any client that can reach the
/// RPC endpoint. Rate limiting still applies.
const UNAUTHENTICATED_METHODS: &[&str] = &[
"auth.login",
"auth.login.totp",
"auth.login.backup",
"auth.isOnboardingComplete",
"auth.isSetup",
"health",
// Onboarding restore (before user account exists)
"backup.restore-identity",
// Inter-node RPC: called by federated peers over Tor, no session cookies
"federation.peer-joined",
"federation.peer-address-changed",
"federation.get-state",
];
/// Simple TTL cache for read-only RPC responses.
struct ResponseCache {
/// Cache key -> (time the value was stored, cached JSON value).
entries: tokio::sync::RwLock<std::collections::HashMap<String, (std::time::Instant, serde_json::Value)>>,
/// Maximum age of an entry before `get` treats it as a miss.
ttl: std::time::Duration,
}
impl ResponseCache {
    /// Create an empty cache whose entries stay fresh for `ttl_secs` seconds.
    fn new(ttl_secs: u64) -> Self {
        let ttl = std::time::Duration::from_secs(ttl_secs);
        let entries = tokio::sync::RwLock::new(std::collections::HashMap::new());
        Self { entries, ttl }
    }

    /// Return a clone of the value stored under `key` if it is younger than
    /// the TTL; stale or missing entries yield `None`. Stale entries are not
    /// removed, they are simply overwritten by the next `set`.
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        let guard = self.entries.read().await;
        guard
            .get(key)
            .filter(|(stored_at, _)| stored_at.elapsed() < self.ttl)
            .map(|(_, value)| value.clone())
    }

    /// Store `value` under `key`, stamping it with the current time and
    /// replacing any previous entry.
    async fn set(&self, key: String, value: serde_json::Value) {
        // The write guard is acquired first (receiver), then the timestamp is taken.
        self.entries
            .write()
            .await
            .insert(key, (std::time::Instant::now(), value));
    }
}
/// Methods whose responses can be cached for a few seconds.
///
/// The cache is keyed by method name only (params and caller are ignored), so
/// only parameterless methods whose result is the same for every user belong
/// here.
const CACHEABLE_METHODS: &[&str] = &[
"system.stats",
"federation.list-nodes",
];
/// Central RPC dispatcher: authenticates, rate-limits and routes each request
/// to the matching `handle_*` method.
pub struct RpcHandler {
/// Node configuration (`dev_mode`, `data_dir`, ...).
config: Config,
/// User account access, used here for RBAC and TOTP lookups.
auth_manager: AuthManager,
/// Container orchestrator; only constructed when `config.dev_mode` is set.
orchestrator: Option<Arc<DevContainerOrchestrator>>,
state_manager: Arc<StateManager>,
/// Metrics sink; receives the latency of every dispatched RPC call.
pub(crate) metrics_store: Arc<MetricsStore>,
port_allocator: Arc<Mutex<PortAllocator>>,
/// Session tokens (create / validate / rotate / remove).
pub session_store: SessionStore,
/// Limits `auth.login` attempts per client IP.
login_rate_limiter: LoginRateLimiter,
/// Limits calls per (method, client IP).
endpoint_rate_limiter: EndpointRateLimiter,
/// Short-lived cache for the methods in `CACHEABLE_METHODS`.
response_cache: ResponseCache,
/// Mesh service; `None` until `set_mesh_service` is called.
mesh_service: Arc<tokio::sync::RwLock<Option<crate::mesh::MeshService>>>,
/// Transport router; `None` until `set_transport_router` is called.
transport_router: Arc<tokio::sync::RwLock<Option<Arc<crate::transport::TransportRouter>>>>,
}
impl RpcHandler {
/// Build a handler from the node configuration and shared services.
///
/// The container orchestrator is only created in dev mode. The mesh service
/// and transport router start out unset and are injected later through their
/// setters. The response cache uses a 5 second TTL.
///
/// # Errors
/// Fails if the dev orchestrator or the port allocator cannot be created.
pub async fn new(
    config: Config,
    state_manager: Arc<StateManager>,
    metrics_store: Arc<MetricsStore>,
    session_store: SessionStore,
) -> Result<Self> {
    let auth_manager = AuthManager::new(config.data_dir.clone());

    let orchestrator = match config.dev_mode {
        true => {
            let dev = DevContainerOrchestrator::new(config.clone()).await?;
            Some(Arc::new(dev))
        }
        false => None,
    };

    let allocator = PortAllocator::new(&config.data_dir)?;
    let port_allocator = Arc::new(Mutex::new(allocator));

    let handler = Self {
        config,
        auth_manager,
        orchestrator,
        state_manager,
        metrics_store,
        port_allocator,
        session_store,
        login_rate_limiter: LoginRateLimiter::new(),
        endpoint_rate_limiter: EndpointRateLimiter::new(),
        response_cache: ResponseCache::new(5),
        mesh_service: Arc::new(tokio::sync::RwLock::new(None)),
        transport_router: Arc::new(tokio::sync::RwLock::new(None)),
    };
    Ok(handler)
}
/// Install the mesh service (called after identity is loaded), replacing any
/// previously installed one.
pub async fn set_mesh_service(&self, service: crate::mesh::MeshService) {
    let mut slot = self.mesh_service.write().await;
    *slot = Some(service);
}
/// Install the transport router (called after all transports are
/// initialized), replacing any previously installed one.
pub async fn set_transport_router(&self, router: Arc<crate::transport::TransportRouter>) {
    let mut slot = self.transport_router.write().await;
    *slot = Some(router);
}
/// Hand out another handle to the shared mesh-service slot (for the
/// MeshTransport wrapper). The returned `Arc` points at the same lock this
/// handler uses.
pub fn mesh_service_arc(&self) -> Arc<tokio::sync::RwLock<Option<crate::mesh::MeshService>>> {
    let shared = &self.mesh_service;
    Arc::clone(shared)
}
/// Attribute suffix appended to every cookie this handler sets: empty in dev
/// mode, `; Secure` otherwise.
fn cookie_suffix(&self) -> &'static str {
    match self.config.dev_mode {
        true => "",
        false => "; Secure",
    }
}
pub async fn handle(
&self,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
// Extract session cookie before consuming the request
let (parts, body) = req.into_parts();
let session_token = session::extract_session_cookie(&parts.headers);
let body_bytes = hyper::body::to_bytes(body).await
.context("Failed to read body")?;
let rpc_req: RpcRequest = serde_json::from_slice(&body_bytes)
.context("Invalid RPC request")?;
debug!("RPC method: {}", rpc_req.method);
// Enforce authentication for non-allowlisted methods
let is_unauthenticated = UNAUTHENTICATED_METHODS.contains(&rpc_req.method.as_str());
let mut new_session_cookies: Option<(String, String)> = None; // (session, csrf) if auto-restored
if !is_unauthenticated {
let mut authenticated = match &session_token {
Some(token) => self.session_store.validate(token).await,
None => false,
};
// If session invalid, try remember-me token to auto-restore session
if !authenticated {
if let Some(remember) = extract_cookie(&parts.headers, "remember") {
if crate::session::SessionStore::validate_remember_token(&remember) {
// Auto-create a new session from the remember-me token
let new_token = self.session_store.create().await;
let new_csrf = generate_csrf_token();
tracing::info!("Auto-restored session from remember-me token");
new_session_cookies = Some((new_token, new_csrf));
authenticated = true;
}
}
}
if !authenticated {
let reason = if session_token.is_none() { "no session cookie" } else { "invalid/expired token" };
tracing::warn!(method = %rpc_req.method, reason, "401 Unauthorized — rejecting RPC call");
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 401,
message: "Unauthorized".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("Content-Type", "application/json")
.body(hyper::Body::from(resp_body))
.unwrap());
}
}
// RBAC: check if the user's role allows this method
if !is_unauthenticated {
if let Ok(Some(user)) = self.auth_manager.get_user().await {
if !user.role.can_access(&rpc_req.method) {
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 403,
message: "Forbidden: insufficient permissions".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.header("Content-Type", "application/json")
.body(hyper::Body::from(resp_body))
.unwrap());
}
}
}
// CSRF protection: validate X-CSRF-Token header for authenticated methods
// Skip CSRF check if session was just auto-restored from remember-me (new CSRF will be set in response)
if !is_unauthenticated && new_session_cookies.is_none() {
let csrf_cookie = extract_csrf_cookie(&parts.headers);
let csrf_header = parts
.headers
.get("x-csrf-token")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
match (&csrf_cookie, &csrf_header) {
(Some(cookie), Some(header)) if cookie == header => { /* valid */ }
_ => {
tracing::warn!(
method = %rpc_req.method,
has_cookie = csrf_cookie.is_some(),
has_header = csrf_header.is_some(),
"403 CSRF mismatch — rejecting RPC call"
);
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 403,
message: "CSRF token missing or invalid".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.header("Content-Type", "application/json")
.body(hyper::Body::from(resp_body))
.unwrap());
}
}
}
// Rate limit login attempts
if rpc_req.method == "auth.login" {
let client_ip = extract_client_ip(&parts.headers);
if !self.login_rate_limiter.check(client_ip).await {
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 429,
message: "Too many login attempts. Try again later.".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.header("Content-Type", "application/json")
.header("Retry-After", "60")
.body(hyper::Body::from(resp_body))
.unwrap());
}
}
// Rate limit sensitive endpoints (wallet, identity, backup, container, etc.)
{
let client_ip = extract_client_ip(&parts.headers);
if !self.endpoint_rate_limiter.check(&rpc_req.method, client_ip).await {
let rpc_resp = RpcResponse {
result: None,
error: Some(RpcError {
code: 429,
message: "Rate limit exceeded for this operation. Try again later.".to_string(),
data: None,
}),
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
return Ok(Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.header("Content-Type", "application/json")
.header("Retry-After", "60")
.body(hyper::Body::from(resp_body))
.unwrap());
}
self.endpoint_rate_limiter.record(&rpc_req.method, client_ip).await;
}
// Extract params; clone for post-routing use (login 2FA check needs password)
let params = rpc_req.params;
let login_params: Option<serde_json::Value> = if rpc_req.method == "auth.login" {
params.clone()
} else {
None
};
// Check cache for cacheable methods
let is_cacheable = CACHEABLE_METHODS.contains(&rpc_req.method.as_str());
if is_cacheable {
if let Some(cached) = self.response_cache.get(&rpc_req.method).await {
let rpc_resp = RpcResponse {
result: Some(cached),
error: None,
};
return Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(serde_json::to_string(&rpc_resp)?))
.unwrap());
}
}
// Route to handler (track latency for metrics)
let rpc_start = std::time::Instant::now();
let result = match rpc_req.method.as_str() {
"echo" => self.handle_echo(params).await,
"server.echo" => self.handle_echo(params).await,
"auth.login" => self.handle_auth_login(params).await,
"auth.logout" => self.handle_auth_logout().await,
"auth.changePassword" => self.handle_auth_change_password(params, &session_token).await,
"auth.onboardingComplete" => self.handle_auth_onboarding_complete().await,
"auth.isOnboardingComplete" => self.handle_auth_is_onboarding_complete().await,
"auth.resetOnboarding" => self.handle_auth_reset_onboarding().await,
// Container orchestration (for Archipelago-managed containers)
"container-install" => self.handle_container_install(params).await,
"container-start" => self.handle_container_start(params).await,
"container-stop" => self.handle_container_stop(params).await,
"container-remove" => self.handle_container_remove(params).await,
"container-list" => self.handle_container_list().await,
"container-status" => self.handle_container_status(params).await,
"container-logs" => self.handle_container_logs(params).await,
"container-health" => self.handle_container_health(params).await,
// Package management (for docker-compose apps)
"package.install" => self.handle_package_install(params).await,
"package.start" => self.handle_package_start(params).await,
"package.stop" => self.handle_package_stop(params).await,
"package.restart" => self.handle_package_restart(params).await,
"package.uninstall" => self.handle_package_uninstall(params).await,
// Bundled app management (for pre-loaded container images)
"bundled-app-start" => self.handle_bundled_app_start(params).await,
"bundled-app-stop" => self.handle_bundled_app_stop(params).await,
// Node identity and P2P peers
"node-add-peer" => self.handle_node_add_peer(params).await,
"node-list-peers" => self.handle_node_list_peers().await,
"node-remove-peer" => self.handle_node_remove_peer(params).await,
"node-send-message" => self.handle_node_send_message(params).await,
"node-check-peer" => self.handle_node_check_peer(params).await,
"node-messages-received" => self.handle_node_messages_received().await,
"node-nostr-discover" => self.handle_node_nostr_discover().await,
"node.did" => self.handle_node_did().await,
"node.signChallenge" => self.handle_node_sign_challenge(params).await,
"node.createBackup" => self.handle_node_create_backup(params).await,
"node.tor-address" => self.handle_node_tor_address().await,
"node.nostr-publish" => self.handle_node_nostr_publish().await,
"node.nostr-pubkey" => self.handle_node_nostr_pubkey().await,
"node.nostr-sign" => self.handle_node_nostr_sign(params).await,
"node-nostr-verify-revoked" => self.handle_node_nostr_verify_revoked().await,
// Encrypted peer handshake (NIP-44)
"handshake.discover" => self.handle_handshake_discover().await,
"handshake.connect" => self.handle_handshake_connect(params).await,
"handshake.poll" => self.handle_handshake_poll().await,
// TOTP 2FA
"auth.totp.setup.begin" => self.handle_totp_setup_begin(params).await,
"auth.totp.setup.confirm" => self.handle_totp_setup_confirm(params).await,
"auth.totp.disable" => self.handle_totp_disable(params).await,
"auth.totp.status" => self.handle_totp_status().await,
"auth.login.totp" => self.handle_login_totp(params, &session_token).await,
"auth.login.backup" => self.handle_login_backup(params, &session_token).await,
// Bitcoin & Lightning deep data
"bitcoin.getinfo" => self.handle_bitcoin_getinfo().await,
"lnd.getinfo" => self.handle_lnd_getinfo().await,
"lnd.listchannels" => self.handle_lnd_listchannels().await,
"lnd.openchannel" => self.handle_lnd_openchannel(params).await,
"lnd.closechannel" => self.handle_lnd_closechannel(params).await,
"lnd.newaddress" => self.handle_lnd_newaddress().await,
"lnd.sendcoins" => self.handle_lnd_sendcoins(params).await,
"lnd.createinvoice" => self.handle_lnd_createinvoice(params).await,
"lnd.payinvoice" => self.handle_lnd_payinvoice(params).await,
"lnd.create-psbt" => self.handle_lnd_create_psbt(params).await,
"lnd.finalize-psbt" => self.handle_lnd_finalize_psbt(params).await,
"lnd.gettransactions" => self.handle_lnd_gettransactions().await,
"lnd.connect-info" => self.handle_lnd_connect_info().await,
// Multi-identity management
"identity.list" => self.handle_identity_list(params).await,
"identity.create" => self.handle_identity_create(params).await,
"identity.get" => self.handle_identity_get(params).await,
"identity.delete" => self.handle_identity_delete(params).await,
"identity.set-default" => self.handle_identity_set_default(params).await,
"identity.sign" => self.handle_identity_sign(params).await,
"identity.verify" => self.handle_identity_verify(params).await,
"identity.resolve-did" => self.handle_identity_resolve_did(params).await,
"identity.resolve-remote-did" => self.handle_identity_resolve_remote_did(params).await,
"identity.verify-did-document" => self.handle_identity_verify_did_document(params).await,
"identity.create-dht-did" => self.handle_identity_create_dht_did(params).await,
"identity.resolve-dht-did" => self.handle_identity_resolve_dht_did(params).await,
"identity.refresh-dht-did" => self.handle_identity_refresh_dht_did(params).await,
"identity.dht-status" => self.handle_identity_dht_status(params).await,
"identity.update-profile" => self.handle_identity_update_profile(params).await,
"identity.publish-profile" => self.handle_identity_publish_profile(params).await,
"identity.export-keys" => self.handle_identity_export_keys(params).await,
"identity.create-nostr-key" => self.handle_identity_create_nostr_key(params).await,
"identity.nostr-sign" => self.handle_identity_nostr_sign(params).await,
"identity.nostr-encrypt-nip04" => self.handle_identity_nostr_encrypt_nip04(params).await,
"identity.nostr-decrypt-nip04" => self.handle_identity_nostr_decrypt_nip04(params).await,
"identity.nostr-encrypt-nip44" => self.handle_identity_nostr_encrypt_nip44(params).await,
"identity.nostr-decrypt-nip44" => self.handle_identity_nostr_decrypt_nip44(params).await,
// Bitcoin domain names (NIP-05)
"identity.register-name" => self.handle_identity_register_name(params).await,
"identity.remove-name" => self.handle_identity_remove_name(params).await,
"identity.resolve-name" => self.handle_identity_resolve_name(params).await,
"identity.list-names" => self.handle_identity_list_names(params).await,
"identity.link-name" => self.handle_identity_link_name(params).await,
// Verifiable Credentials
"identity.issue-credential" => self.handle_identity_issue_credential(params).await,
"identity.verify-credential" => self.handle_identity_verify_credential(params).await,
"identity.list-credentials" => self.handle_identity_list_credentials(params).await,
"identity.revoke-credential" => self.handle_identity_revoke_credential(params).await,
"identity.create-presentation" => self.handle_identity_create_presentation(params).await,
"identity.verify-presentation" => self.handle_identity_verify_presentation(params).await,
// Network overlay
"network.get-visibility" => self.handle_network_get_visibility().await,
"network.set-visibility" => self.handle_network_set_visibility(params).await,
"network.request-connection" => self.handle_network_request_connection(params).await,
"network.list-requests" => self.handle_network_list_requests().await,
"network.accept-request" => self.handle_network_accept_request(params).await,
"network.reject-request" => self.handle_network_reject_request(params).await,
// Tor hidden services
"tor.list-services" => self.handle_tor_list_services().await,
"tor.create-service" => self.handle_tor_create_service(params).await,
"tor.delete-service" => self.handle_tor_delete_service(params).await,
"tor.get-onion-address" => self.handle_tor_get_onion_address(params).await,
"tor.rotate-service" => self.handle_tor_rotate_service(params).await,
"tor.cleanup-rotated" => self.handle_tor_cleanup_rotated().await,
"tor.toggle-app" => self.handle_tor_toggle_app(params).await,
// Nostr relay management
"nostr.list-relays" => self.handle_nostr_list_relays().await,
"nostr.add-relay" => self.handle_nostr_add_relay(params).await,
"nostr.remove-relay" => self.handle_nostr_remove_relay(params).await,
"nostr.toggle-relay" => self.handle_nostr_toggle_relay(params).await,
"nostr.get-stats" => self.handle_nostr_get_stats().await,
// Router / UPnP
"router.discover" => self.handle_router_discover().await,
"router.list-forwards" => self.handle_router_list_forwards().await,
"router.add-forward" => self.handle_router_add_forward(params).await,
"router.remove-forward" => self.handle_router_remove_forward(params).await,
"network.diagnostics" => self.handle_network_diagnostics().await,
"network.list-interfaces" => self.handle_network_list_interfaces().await,
"network.scan-wifi" => self.handle_network_scan_wifi().await,
"network.configure-wifi" => self.handle_network_configure_wifi(params).await,
"network.configure-ethernet" => self.handle_network_configure_ethernet(params).await,
"network.dns-status" => self.handle_network_dns_status().await,
"network.configure-dns" => self.handle_network_configure_dns(params).await,
"router.detect" => self.handle_router_detect(params).await,
"router.info" => self.handle_router_info().await,
"router.configure" => self.handle_router_configure(params).await,
// Ecash wallet
"wallet.ecash-balance" => self.handle_wallet_ecash_balance().await,
"wallet.ecash-mint" => self.handle_wallet_ecash_mint(params).await,
"wallet.ecash-melt" => self.handle_wallet_ecash_melt(params).await,
"wallet.ecash-send" => self.handle_wallet_ecash_send(params).await,
"wallet.ecash-receive" => self.handle_wallet_ecash_receive(params).await,
"wallet.ecash-history" => self.handle_wallet_ecash_history().await,
"wallet.networking-profits" => self.handle_wallet_networking_profits().await,
// Content catalog management
"content.list-mine" => self.handle_content_list_mine().await,
"content.add" => self.handle_content_add(params).await,
"content.remove" => self.handle_content_remove(params).await,
"content.set-pricing" => self.handle_content_set_pricing(params).await,
"content.set-availability" => self.handle_content_set_availability(params).await,
"content.browse-peer" => self.handle_content_browse_peer(params).await,
"content.download-peer" => self.handle_content_download_peer(params).await,
// DWN (Decentralized Web Node)
"dwn.status" => self.handle_dwn_status().await,
"dwn.sync" => self.handle_dwn_sync().await,
"dwn.register-protocol" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_register_protocol(&p).await
}
"dwn.list-protocols" => self.handle_dwn_list_protocols().await,
"dwn.remove-protocol" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_remove_protocol(&p).await
}
"dwn.query-messages" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_query_messages(&p).await
}
"dwn.write-message" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_write_message(&p).await
}
// Federation
"federation.invite" => self.handle_federation_invite().await,
"federation.join" => self.handle_federation_join(params).await,
"federation.list-nodes" => self.handle_federation_list_nodes().await,
"federation.remove-node" => self.handle_federation_remove_node(params).await,
"federation.set-trust" => self.handle_federation_set_trust(params).await,
"federation.sync-state" => self.handle_federation_sync_state().await,
"federation.get-state" => self.handle_federation_get_state().await,
"federation.peer-joined" => self.handle_federation_peer_joined(params).await,
"federation.deploy-app" => self.handle_federation_deploy_app(params).await,
"federation.peer-address-changed" => self.handle_federation_peer_address_changed(params).await,
// VPN & Remote Access
"vpn.status" => self.handle_vpn_status().await,
"vpn.configure" => self.handle_vpn_configure(params).await,
"vpn.disconnect" => self.handle_vpn_disconnect().await,
"remote.setup" => self.handle_remote_setup(params).await,
// Marketplace
"marketplace.discover" => self.handle_marketplace_discover().await,
"marketplace.publish" => self.handle_marketplace_publish(params).await,
"marketplace.get-manifest" => self.handle_marketplace_get_manifest(params).await,
"marketplace.list-published" => self.handle_marketplace_list_published().await,
"marketplace.verify" => self.handle_marketplace_verify(params).await,
"marketplace.create-invoice" => self.handle_marketplace_create_invoice(params).await,
"marketplace.check-payment" => self.handle_marketplace_check_payment(params).await,
// Mesh networking (Meshcore LoRa)
"mesh.status" => self.handle_mesh_status().await,
"mesh.peers" => self.handle_mesh_peers().await,
"mesh.messages" => self.handle_mesh_messages(params).await,
"mesh.send" => self.handle_mesh_send(params).await,
"mesh.broadcast" => self.handle_mesh_broadcast().await,
"mesh.configure" => self.handle_mesh_configure(params).await,
"mesh.send-invoice" => self.handle_mesh_send_invoice(params).await,
"mesh.send-coordinate" => self.handle_mesh_send_coordinate(params).await,
"mesh.send-alert" => self.handle_mesh_send_alert(params).await,
"mesh.outbox" => self.handle_mesh_outbox(params).await,
"mesh.session-status" => self.handle_mesh_session_status(params).await,
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,
// Transport layer (unified routing)
"transport.status" => self.handle_transport_status().await,
"transport.peers" => self.handle_transport_peers().await,
"transport.send" => self.handle_transport_send(params).await,
"transport.set-mode" => self.handle_transport_set_mode(params).await,
// Server settings
"server.set-name" => self.handle_server_set_name(params).await,
// System monitoring
"system.stats" => self.handle_system_stats().await,
"system.processes" => self.handle_system_processes().await,
"system.temperature" => self.handle_system_temperature().await,
"system.detect-usb-devices" => self.handle_system_detect_usb_devices().await,
"system.disk-status" => self.handle_system_disk_status().await,
"system.disk-cleanup" => self.handle_system_disk_cleanup().await,
"system.factory-reset" => self.handle_system_factory_reset(params).await,
// Opt-in anonymous analytics
"analytics.get-status" => self.handle_analytics_get_status().await,
"analytics.enable" => self.handle_analytics_enable().await,
"analytics.disable" => self.handle_analytics_disable().await,
"analytics.get-snapshot" => self.handle_analytics_get_snapshot().await,
// Real-time metrics monitoring
"monitoring.current" => self.handle_monitoring_current().await,
"monitoring.history" => self.handle_monitoring_history(params).await,
"monitoring.containers" => self.handle_monitoring_containers().await,
"monitoring.alerts" => self.handle_monitoring_alerts(params).await,
"monitoring.alert-rules" => self.handle_monitoring_alert_rules().await,
"monitoring.configure-alert" => self.handle_monitoring_configure_alert(params).await,
"monitoring.acknowledge-alert" => self.handle_monitoring_acknowledge_alert(params).await,
"monitoring.export" => self.handle_monitoring_export(params).await,
// System updates
"update.check" => self.handle_update_check().await,
"update.status" => self.handle_update_status().await,
"update.dismiss" => self.handle_update_dismiss().await,
"update.download" => self.handle_update_download().await,
"update.apply" => self.handle_update_apply().await,
"update.rollback" => self.handle_update_rollback().await,
"update.get-schedule" => self.handle_update_get_schedule().await,
"update.set-schedule" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_update_set_schedule(&p).await
}
// Backup & Restore
"backup.create" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_create(&p).await
}
"backup.list" => self.handle_backup_list().await,
"backup.verify" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_verify(&p).await
}
"backup.restore" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_restore(&p).await
}
"backup.restore-identity" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_restore_identity(&p).await
}
"backup.delete" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_delete(&p).await
}
"backup.list-drives" => self.handle_backup_list_drives().await,
"backup.to-usb" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_to_usb(&p).await
}
"backup.upload-s3" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_upload_s3(&p).await
}
"backup.download-s3" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_download_s3(&p).await
}
// Security / secrets
"security.rotate-secrets" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_security_rotate_secrets(&p).await
}
"security.list-expiring" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_security_list_expiring(&p).await
}
// Webhooks
"webhook.get-config" => self.handle_webhook_get_config().await,
"webhook.configure" => self.handle_webhook_configure(params).await,
"webhook.test" => self.handle_webhook_test().await,
_ => {
Err(anyhow::anyhow!("Unknown method: {}", rpc_req.method))
}
};
// Record RPC latency for monitoring
let elapsed_ms = rpc_start.elapsed().as_secs_f64() * 1000.0;
self.metrics_store.record_rpc_latency(elapsed_ms).await;
// Build response (cache successful results for cacheable methods)
let rpc_resp = match result {
Ok(data) => {
if is_cacheable {
self.response_cache.set(rpc_req.method.clone(), data.clone()).await;
}
RpcResponse {
result: Some(data),
error: None,
}
}
Err(e) => {
error!("RPC error on {}: {}", rpc_req.method, e);
// Sanitize error messages: only return user-facing text, not internal details
let user_message = sanitize_error_message(&e.to_string());
RpcResponse {
result: None,
error: Some(RpcError {
code: -1,
message: user_message,
data: None,
}),
}
}
};
let resp_body = serde_json::to_vec(&rpc_resp)
.context("Failed to serialize response")?;
let mut response = Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(resp_body))
.unwrap();
// Track failed login attempts for rate limiting
if rpc_req.method == "auth.login" && rpc_resp.error.is_some() {
let client_ip = extract_client_ip(&parts.headers);
self.login_rate_limiter.record_failure(client_ip).await;
}
// On successful login, check if 2FA is required
if rpc_req.method == "auth.login" && rpc_resp.error.is_none() {
let totp_enabled = self.auth_manager.is_totp_enabled().await.unwrap_or(false);
if totp_enabled {
// 2FA enabled: create a pending session with cached TOTP secret
// We need the password to decrypt the TOTP secret for step 2
let password = login_params
.as_ref()
.and_then(|p| p.get("password"))
.and_then(|v| v.as_str())
.unwrap_or("");
if let Ok(Some(totp_data)) = self.auth_manager.get_totp_data().await {
if let Ok(secret) = crate::totp::decrypt_secret(&totp_data, password) {
let token = self.session_store.create_pending(secret).await;
let csrf_token = generate_csrf_token();
response.headers_mut().append(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Strict; Path=/{}", token, self.cookie_suffix())
.parse()
.unwrap(),
);
response.headers_mut().append(
"Set-Cookie",
format!("csrf_token={}; SameSite=Strict; Path=/{}", csrf_token, self.cookie_suffix())
.parse()
.unwrap(),
);
// Override the response body to indicate TOTP is required
let totp_body = serde_json::json!({
"result": { "requires_totp": true },
"error": null
});
*response.body_mut() = hyper::Body::from(
serde_json::to_vec(&totp_body).unwrap_or_default(),
);
}
}
} else {
// No 2FA: create a full session immediately
let token = self.session_store.create().await;
let csrf_token = generate_csrf_token();
let remember_token = self.session_store.create_remember_token();
response.headers_mut().append(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Strict; Path=/{}", token, self.cookie_suffix())
.parse()
.unwrap(),
);
response.headers_mut().append(
"Set-Cookie",
format!("csrf_token={}; SameSite=Strict; Path=/{}", csrf_token, self.cookie_suffix())
.parse()
.unwrap(),
);
// Remember-me: HMAC-signed, survives backend restarts (30-day TTL)
response.headers_mut().append(
"Set-Cookie",
format!("remember={}; HttpOnly; SameSite=Strict; Path=/; Max-Age={}{}", remember_token, REMEMBER_TTL, self.cookie_suffix())
.parse()
.unwrap(),
);
}
}
// On successful TOTP verification, the session is already upgraded to full
// (handled inside handle_login_totp/handle_login_backup)
// On password change, rotate the session token for the caller
if rpc_req.method == "auth.changePassword" && rpc_resp.error.is_none() {
if let Some(token) = &session_token {
let new_token = self.session_store.rotate(token).await;
let csrf_token = generate_csrf_token();
response.headers_mut().append(
"Set-Cookie",
format!(
"session={}; HttpOnly; SameSite=Strict; Path=/{}",
new_token,
self.cookie_suffix()
)
.parse()
.unwrap(),
);
response.headers_mut().append(
"Set-Cookie",
format!(
"csrf_token={}; SameSite=Strict; Path=/{}",
csrf_token,
self.cookie_suffix()
)
.parse()
.unwrap(),
);
}
}
// On logout, invalidate session and expire cookies
if rpc_req.method == "auth.logout" {
if let Some(token) = &session_token {
self.session_store.remove(token).await;
}
let secure_suffix = if self.config.dev_mode { "" } else { "; Secure" };
response.headers_mut().append(
"Set-Cookie",
format!("session=; HttpOnly; SameSite=Strict; Path=/; Max-Age=0{}", secure_suffix)
.parse()
.unwrap(),
);
response.headers_mut().append(
"Set-Cookie",
format!("csrf_token=; SameSite=Strict; Path=/; Max-Age=0{}", secure_suffix)
.parse()
.unwrap(),
);
}
// If session was auto-restored from remember-me, set new cookies on the response
if let Some((new_session, new_csrf)) = new_session_cookies {
let suffix = self.cookie_suffix();
response.headers_mut().append(
"Set-Cookie",
format!("session={}; HttpOnly; SameSite=Strict; Path=/{}", new_session, suffix)
.parse()
.unwrap(),
);
response.headers_mut().append(
"Set-Cookie",
format!("csrf_token={}; SameSite=Strict; Path=/{}", new_csrf, suffix)
.parse()
.unwrap(),
);
}
Ok(response)
}
/// Echo handler: returns the string found under `params.message`, or a fixed
/// greeting when params are absent or `message` is not a string.
async fn handle_echo(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
    let message = params
        .as_ref()
        .and_then(|p| p.get("message"))
        .and_then(|v| v.as_str())
        .unwrap_or("Hello from Archipelago!");
    Ok(serde_json::json!({ "message": message }))
}
}
/// Generate a random CSRF token: 32 random bytes, hex-encoded (64 characters).
fn generate_csrf_token() -> String {
    let mut rng = rand::thread_rng();
    let mut raw = [0u8; 32];
    rng.fill(&mut raw[..]);
    hex::encode(raw)
}
/// Look up the cookie called `name` across all `Cookie` headers.
///
/// Returns the first non-empty, whitespace-trimmed value. Headers that are
/// not valid visible ASCII are ignored. Yields `None` when no such cookie is
/// present.
fn extract_cookie(headers: &hyper::HeaderMap, name: &str) -> Option<String> {
    let needle = format!("{}=", name);
    headers
        .get_all("cookie")
        .iter()
        .filter_map(|raw| raw.to_str().ok())
        .flat_map(|line| line.split(';'))
        .filter_map(|pair| pair.trim().strip_prefix(needle.as_str()))
        .map(str::trim)
        .find(|candidate| !candidate.is_empty())
        .map(str::to_string)
}
/// Extract the csrf_token cookie value from headers.
///
/// Thin wrapper around `extract_cookie`; returns `None` when the cookie is
/// absent or empty.
fn extract_csrf_cookie(headers: &hyper::HeaderMap) -> Option<String> {
extract_cookie(headers, "csrf_token")
}
/// Determine the client IP from proxy headers.
///
/// `X-Real-IP` wins when present; otherwise the first entry of
/// `X-Forwarded-For` is used. If the chosen header is missing, not valid
/// text, or does not parse as an IP address, `127.0.0.1` is returned.
///
/// NOTE(review): both headers are client-controlled unless a trusted reverse
/// proxy overwrites them — confirm the deployment guarantees that, since the
/// rate limiters key on this value.
fn extract_client_ip(headers: &hyper::HeaderMap) -> IpAddr {
    let fallback = IpAddr::V4(std::net::Ipv4Addr::LOCALHOST);

    let chosen = match headers.get("x-real-ip") {
        Some(real_ip) => Some(real_ip),
        None => headers.get("x-forwarded-for"),
    };
    let text = match chosen.map(|value| value.to_str()) {
        Some(Ok(text)) => text,
        _ => return fallback,
    };

    // `split` always yields at least one item, so the default is never used.
    let first_hop = text.split(',').next().unwrap_or(text);
    first_hop.trim().parse::<IpAddr>().unwrap_or(fallback)
}