310 lines
12 KiB
Rust
Raw Normal View History

2026-01-24 22:59:20 +00:00
use crate::api::ApiHandler;
use crate::config::{Config, ContainerRuntime};
use crate::container::{docker_packages, DockerPackageScanner};
use crate::identity::{self, NodeIdentity};
use crate::monitoring::MetricsStore;
use crate::node_message;
use crate::nostr_discovery;
use crate::peers;
use crate::state::StateManager;
2026-01-24 22:59:20 +00:00
use anyhow::Result;
use hyper::server::conn::Http;
use hyper::service::service_fn;
2026-01-24 22:59:20 +00:00
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
2026-01-24 22:59:20 +00:00
use tokio::net::TcpListener;
use tracing::{debug, error, info, warn};
2026-01-24 22:59:20 +00:00
pub struct Server {
_config: Config,
_identity: Arc<NodeIdentity>,
2026-01-24 22:59:20 +00:00
api_handler: Arc<ApiHandler>,
_state_manager: Arc<StateManager>,
2026-01-24 22:59:20 +00:00
}
impl Server {
pub async fn new(config: Config) -> Result<Self> {
let state_manager = Arc::new(StateManager::new());
// Load node identity and set stable server_info
let identity_dir = config.data_dir.join("identity");
let identity = NodeIdentity::load_or_create(&identity_dir).await?;
let (mut data, _) = state_manager.get_snapshot().await;
data.server_info.id = identity.node_id();
data.server_info.pubkey = identity.pubkey_hex();
data.server_info.tor_address = docker_packages::read_tor_address("archipelago");
if let Some(ref tor) = data.server_info.tor_address {
data.server_info.node_address = Some(identity.node_address(tor));
}
state_manager.update_data(data.clone()).await;
// Revoke any previously published Nostr data (runs before publish so revocation is not overwritten)
let identity_dir = config.data_dir.join("identity");
let tor_proxy_revoke = config.nostr_tor_proxy.clone();
if let Err(e) = nostr_discovery::revoke_if_needed(&identity_dir, tor_proxy_revoke.as_deref()).await {
tracing::debug!("Nostr revoke (non-fatal): {}", e);
}
// Publish node identity to Nostr only when opt-in (nostr_discovery_enabled + relays)
if config.nostr_discovery_enabled
&& !config.nostr_relays.is_empty()
&& data.server_info.node_address.is_some()
{
let identity_dir = config.data_dir.join("identity");
let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default();
let node_addr = data.server_info.node_address.clone().unwrap_or_default();
let version = data.server_info.version.clone();
let relays = config.nostr_relays.clone();
let tor_proxy = config.nostr_tor_proxy.clone();
tokio::spawn(async move {
if let Err(e) = nostr_discovery::publish_node_identity(
&identity_dir,
&did,
&node_addr,
&version,
&relays,
tor_proxy.as_deref(),
)
.await
{
tracing::debug!("Nostr publish (non-fatal): {}", e);
}
});
}
info!("🔑 Node identity: {} (pubkey: {}...)", identity.node_id(), &identity.pubkey_hex()[..16.min(identity.pubkey_hex().len())]);
let identity = Arc::new(identity);
// Create metrics store and spawn background collector
let metrics_store = Arc::new(MetricsStore::new());
crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()));
let api_handler = Arc::new(
ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?,
);
// Periodic Tor address refresh (runs regardless of dev_mode)
// Picks up hostname when Tor creates it after startup/rotation (30-60s delay)
{
let state = state_manager.clone();
let identity_clone = identity.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = refresh_tor_address(&state, identity_clone.as_ref()).await {
debug!("Tor address refresh (non-fatal): {}", e);
}
}
});
}
// Initialize Docker scanner if in dev mode
if config.dev_mode {
let scanner = create_docker_scanner(&config).await?;
let state = state_manager.clone();
let identity_clone = identity.clone();
// Initial scan
tokio::spawn(async move {
info!("🐳 Scanning Docker containers...");
if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await {
error!("Failed to scan Docker containers: {}", e);
}
// Periodic scan every 10 seconds (only broadcasts if state changed)
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await {
error!("Failed to update Docker containers: {}", e);
}
}
});
}
2026-01-24 22:59:20 +00:00
// Peer health monitoring — check every 5 minutes
{
let state = state_manager.clone();
let data_dir = config.data_dir.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(300));
loop {
interval.tick().await;
if let Err(e) = check_peer_health(&state, &data_dir).await {
debug!("Peer health check (non-fatal): {}", e);
}
}
});
}
// Container health monitoring — auto-restart unhealthy containers
crate::health_monitor::spawn_health_monitor(state_manager.clone());
2026-01-24 22:59:20 +00:00
Ok(Self {
_config: config,
_identity: identity,
2026-01-24 22:59:20 +00:00
api_handler,
_state_manager: state_manager,
2026-01-24 22:59:20 +00:00
})
}
pub async fn serve(&self, addr: SocketAddr) -> Result<()> {
self.serve_with_shutdown(addr, std::future::pending()).await
}
/// Serve with a graceful shutdown signal.
/// When the shutdown future completes, stop accepting new connections and drain in-flight requests.
pub async fn serve_with_shutdown(
&self,
addr: SocketAddr,
shutdown: impl std::future::Future<Output = ()>,
) -> Result<()> {
2026-01-24 22:59:20 +00:00
let listener = TcpListener::bind(addr).await?;
let active_connections = Arc::new(tokio::sync::Semaphore::new(1024));
tokio::pin!(shutdown);
2026-01-24 22:59:20 +00:00
loop {
tokio::select! {
result = listener.accept() => {
let (stream, peer_addr) = match result {
Ok(conn) => conn,
Err(e) => {
error!("Failed to accept connection: {}", e);
continue;
}
};
let handler = self.api_handler.clone();
let permit = active_connections.clone().acquire_owned().await;
tokio::spawn(async move {
let _permit = permit;
let service = service_fn(move |req| {
let handler = handler.clone();
async move {
handler.handle_request(req).await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))
}
});
if let Err(e) = Http::new()
.http1_keep_alive(false)
.serve_connection(stream, service)
.with_upgrades()
.await
{
error!("Error serving connection from {}: {}", peer_addr, e);
}
});
2026-01-24 22:59:20 +00:00
}
_ = &mut shutdown => {
info!("Shutdown signal received, draining connections...");
// Wait up to 5 seconds for in-flight requests to complete
let drain_start = std::time::Instant::now();
let drain_timeout = std::time::Duration::from_secs(5);
while active_connections.available_permits() < 1024 {
if drain_start.elapsed() > drain_timeout {
warn!("Drain timeout reached, forcing shutdown");
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2026-01-24 22:59:20 +00:00
}
info!("Shutdown complete");
return Ok(());
2026-01-24 22:59:20 +00:00
}
}
2026-01-24 22:59:20 +00:00
}
}
}
/// Builds a [`DockerPackageScanner`] backed by the container runtime selected in
/// `config.container_runtime`.
///
/// The runtime is created for the user named by `$USER`. If that variable is
/// unset or not valid Unicode, it falls back to `"archipelago"`.
///
/// # Errors
/// Only the `Auto` variant can fail: it propagates the error returned by
/// `AutoRuntime::new`.
async fn create_docker_scanner(config: &Config) -> Result<DockerPackageScanner> {
    let owner = match std::env::var("USER") {
        Ok(name) => name,
        Err(_) => String::from("archipelago"),
    };

    // `owner` is moved into exactly one arm, so no clone is needed.
    let runtime: Arc<dyn archipelago_container::ContainerRuntime> = match &config.container_runtime {
        ContainerRuntime::Docker => Arc::new(archipelago_container::DockerRuntime::new(owner)),
        ContainerRuntime::Podman => Arc::new(archipelago_container::PodmanRuntime::new(owner)),
        ContainerRuntime::Auto => {
            let detected = archipelago_container::AutoRuntime::new(owner).await?;
            Arc::new(detected)
        }
    };

    Ok(DockerPackageScanner::new(runtime))
}
/// Re-reads the node's Tor address. If it differs from the one in shared state,
/// this stores it together with the `node_address` derived from it via
/// `identity`, and pushes the updated snapshot to `state`.
///
/// When the new value is `Some`, the new address is logged. When the address
/// disappears (becomes `None`), state is still updated but nothing is logged.
/// As written, this always returns `Ok(())`.
async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> {
    let latest = docker_packages::read_tor_address("archipelago");
    let (mut snapshot, _) = state.get_snapshot().await;

    // Nothing to do if the address is unchanged; avoids a needless state update.
    if latest == snapshot.server_info.tor_address {
        return Ok(());
    }

    snapshot.server_info.node_address = latest.as_ref().map(|onion| identity.node_address(onion));
    snapshot.server_info.tor_address = latest.clone();
    state.update_data(snapshot).await;

    if let Some(addr) = latest {
        info!("🔒 Tor address updated: {}", addr);
    }
    Ok(())
}
/// Scans containers and merges the result, together with a freshly read Tor
/// address, into shared state.
///
/// `state` is written (and an update broadcast) only when the package list
/// or the Tor address actually changed. Whenever a write happens, `tor_address`
/// and the derived `node_address` are refreshed as well.
///
/// An empty scan result never replaces `package_data`.
/// NOTE(review): presumably this guards against transient scanner hiccups, but
/// it also means removing every container leaves stale entries; confirm this
/// is intended.
///
/// # Errors
/// Propagates failures from `scanner.scan_containers()`.
async fn scan_and_update_packages(
    scanner: &DockerPackageScanner,
    state: &StateManager,
    identity: &NodeIdentity,
) -> Result<()> {
    let scanned = scanner.scan_containers().await?;
    let (mut snapshot, _) = state.get_snapshot().await;

    let has_packages = !scanned.is_empty();
    let packages_changed = has_packages && snapshot.package_data != scanned;

    let latest_tor = docker_packages::read_tor_address("archipelago");
    let tor_changed = latest_tor != snapshot.server_info.tor_address;

    if !packages_changed && !tor_changed {
        return Ok(());
    }

    if has_packages {
        snapshot.package_data = scanned;
    }
    snapshot.server_info.node_address = latest_tor.as_ref().map(|onion| identity.node_address(onion));
    snapshot.server_info.tor_address = latest_tor;
    state.update_data(snapshot).await;

    debug!(
        "📦 State changed (packages={}, tor={}), broadcasting update",
        packages_changed, tor_changed
    );
    Ok(())
}
/// Check every known peer's reachability and broadcast status changes.
///
/// Each known peer is probed via `node_message::check_peer_reachable`. A probe
/// that returns an error is recorded as unreachable. The resulting map replaces
/// `peer_health` in shared state wholesale, and state is written only when the
/// map changed.
///
/// Because the map is replaced rather than merged, peers that are no longer
/// known drop out of it. This includes the case where the last peer is
/// removed.
///
/// A failure to load the peer list is treated the same as "no peers"
/// (`unwrap_or_default`). NOTE(review): a transient load error therefore
/// clears `peer_health` until the next successful run; confirm this is
/// acceptable.
///
/// Peers are probed sequentially, so one pass takes roughly the sum of the
/// individual probe times. As written, this always returns `Ok(())`.
async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> Result<()> {
    let known_peers = peers::load_peers(data_dir).await.unwrap_or_default();

    // Deliberately no early return on an empty list: reconciling against an
    // empty map is what clears the entry left behind when the last peer is
    // removed.
    let mut new_health = std::collections::HashMap::new();
    for peer in &known_peers {
        let reachable = node_message::check_peer_reachable(&peer.onion)
            .await
            .unwrap_or(false);
        new_health.insert(peer.onion.clone(), reachable);
    }

    let (current_data, _) = state.get_snapshot().await;
    if current_data.peer_health != new_health {
        let mut data = current_data;
        data.peer_health = new_health;
        state.update_data(data).await;
        debug!("🔗 Peer health updated, broadcasting changes");
    }
    Ok(())
}