RestrictNamespaces and SystemCallFilter block rootless podman from creating user namespaces needed for container isolation. Removed these along with RestrictSUIDSGID (implied by NoNewPrivileges). ProtectHome set to no (rootless podman needs ~/.local/share/containers writable). Remaining active protections: NoNewPrivileges, ProtectSystem=strict, ReadWritePaths, RestrictAddressFamilies, MemoryDenyWriteExecute, RestrictRealtime, SystemCallArchitectures=native. Also reduced initial scan delay from 15s to 3s for faster container visibility after boot, and removed Ollama from auto-deploy. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
500 lines
21 KiB
Rust
500 lines
21 KiB
Rust
use crate::api::ApiHandler;
|
|
use crate::config::{Config, ContainerRuntime};
|
|
use crate::container::{docker_packages, DockerPackageScanner};
|
|
use crate::identity::{self, NodeIdentity};
|
|
use crate::monitoring::MetricsStore;
|
|
use crate::node_message;
|
|
use crate::nostr_discovery;
|
|
use crate::nostr_handshake;
|
|
use crate::peers;
|
|
use crate::state::StateManager;
|
|
use anyhow::Result;
|
|
use hyper::server::conn::Http;
|
|
use hyper::service::service_fn;
|
|
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::net::TcpListener;
|
|
use tracing::{debug, error, info, warn};
|
|
|
|
pub struct Server {
|
|
_config: Config,
|
|
_identity: Arc<NodeIdentity>,
|
|
api_handler: Arc<ApiHandler>,
|
|
_state_manager: Arc<StateManager>,
|
|
}
|
|
|
|
impl Server {
|
|
pub async fn new(config: Config) -> Result<Self> {
|
|
let state_manager = Arc::new(StateManager::new());
|
|
|
|
// Load node identity and set stable server_info
|
|
let identity_dir = config.data_dir.join("identity");
|
|
let identity = NodeIdentity::load_or_create(&identity_dir).await?;
|
|
let (mut data, _) = state_manager.get_snapshot().await;
|
|
data.server_info.id = identity.node_id();
|
|
data.server_info.pubkey = identity.pubkey_hex();
|
|
// Load persisted server name
|
|
let name_file = config.data_dir.join("server-name");
|
|
if let Ok(name) = tokio::fs::read_to_string(&name_file).await {
|
|
let name = name.trim().to_string();
|
|
if !name.is_empty() {
|
|
data.server_info.name = Some(name);
|
|
}
|
|
}
|
|
data.server_info.tor_address = docker_packages::read_tor_address("archipelago");
|
|
if let Some(ref tor) = data.server_info.tor_address {
|
|
data.server_info.node_address = Some(identity.node_address(tor));
|
|
}
|
|
state_manager.update_data(data.clone()).await;
|
|
|
|
// Auto-create default identity if none exist (fresh boot or factory reset)
|
|
{
|
|
let im = crate::identity_manager::IdentityManager::new(&config.data_dir).await;
|
|
if let Ok(mgr) = im {
|
|
if let Ok((list, _)) = mgr.list().await {
|
|
if list.is_empty() {
|
|
match mgr.create("Default".to_string(), crate::identity_manager::IdentityPurpose::Personal).await {
|
|
Ok(record) => {
|
|
let _ = mgr.create_nostr_key(&record.id).await;
|
|
tracing::info!(did = %record.did, "Auto-created default identity with Nostr key");
|
|
}
|
|
Err(e) => tracing::debug!("Auto-identity creation (non-fatal): {}", e),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Revoke any previously published Nostr data (runs before publish so revocation is not overwritten)
|
|
let identity_dir = config.data_dir.join("identity");
|
|
let tor_proxy_revoke = config.nostr_tor_proxy.clone();
|
|
if let Err(e) = nostr_discovery::revoke_if_needed(&identity_dir, tor_proxy_revoke.as_deref()).await {
|
|
tracing::debug!("Nostr revoke (non-fatal): {}", e);
|
|
}
|
|
|
|
// Publish presence-only to Nostr (DID + Nostr pubkey, NO onion address).
|
|
// Onion addresses are exchanged privately via NIP-44 encrypted DMs.
|
|
if config.nostr_discovery_enabled
|
|
&& !config.nostr_relays.is_empty()
|
|
{
|
|
let identity_dir = config.data_dir.join("identity");
|
|
let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey).unwrap_or_default();
|
|
let version = data.server_info.version.clone();
|
|
let relays = config.nostr_relays.clone();
|
|
let tor_proxy = config.nostr_tor_proxy.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = nostr_handshake::publish_presence(
|
|
&identity_dir,
|
|
&did,
|
|
&version,
|
|
&relays,
|
|
tor_proxy.as_deref(),
|
|
)
|
|
.await
|
|
{
|
|
tracing::debug!("Nostr presence publish (non-fatal): {}", e);
|
|
}
|
|
});
|
|
}
|
|
info!("🔑 Node identity: {} (pubkey: {}...)", identity.node_id(), &identity.pubkey_hex()[..16.min(identity.pubkey_hex().len())]);
|
|
|
|
let identity = Arc::new(identity);
|
|
|
|
// Create metrics store and spawn background collector
|
|
let metrics_store = Arc::new(MetricsStore::with_data_dir(config.data_dir.clone()));
|
|
crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()), Some(config.data_dir.clone()));
|
|
|
|
let api_handler = Arc::new(
|
|
ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?,
|
|
);
|
|
|
|
// Initialize mesh networking service (if config has enabled: true)
|
|
{
|
|
let data_dir = config.data_dir.clone();
|
|
let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)
|
|
.unwrap_or_default();
|
|
let pubkey_hex = identity.pubkey_hex();
|
|
let signing_key = identity.signing_key();
|
|
match crate::mesh::MeshService::new(&data_dir, signing_key, &did, &pubkey_hex).await {
|
|
Ok(mut mesh_service) => {
|
|
let mesh_config = crate::mesh::load_config(&data_dir).await.unwrap_or_default();
|
|
if mesh_config.enabled {
|
|
if let Err(e) = mesh_service.start() {
|
|
warn!("Mesh service start failed (non-fatal): {}", e);
|
|
} else {
|
|
info!("📡 Mesh networking started");
|
|
}
|
|
}
|
|
api_handler.rpc_handler().set_mesh_service(mesh_service).await;
|
|
info!("📡 Mesh service initialized");
|
|
}
|
|
Err(e) => {
|
|
warn!("Mesh service init failed (non-fatal): {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initialize transport router (unified routing: mesh > lan > tor)
|
|
{
|
|
let data_dir = config.data_dir.clone();
|
|
let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)
|
|
.unwrap_or_default();
|
|
let pubkey_hex = identity.pubkey_hex();
|
|
let mesh_config = crate::mesh::load_config(&data_dir).await.unwrap_or_default();
|
|
let mesh_only = mesh_config.mesh_only_mode.unwrap_or(false);
|
|
|
|
match crate::transport::PeerRegistry::load(&data_dir).await {
|
|
Ok(registry) => {
|
|
let registry = std::sync::Arc::new(registry);
|
|
let mut transports: Vec<Box<dyn crate::transport::NodeTransport>> = Vec::new();
|
|
|
|
// Tor transport (always register — availability checked dynamically)
|
|
transports.push(Box::new(
|
|
crate::transport::tor::TorTransport::new(&pubkey_hex),
|
|
));
|
|
|
|
// Mesh transport (wraps the mesh service)
|
|
transports.push(Box::new(
|
|
crate::transport::mesh_transport::MeshTransport::new(
|
|
api_handler.rpc_handler().mesh_service_arc(),
|
|
),
|
|
));
|
|
|
|
// LAN transport (mDNS discovery)
|
|
let mut lan = crate::transport::lan::LanTransport::new(&did, &pubkey_hex, 5678);
|
|
match lan.start(registry.clone()) {
|
|
Ok(()) => info!("📡 LAN transport (mDNS) started"),
|
|
Err(e) => debug!("LAN transport init (non-fatal): {}", e),
|
|
}
|
|
transports.push(Box::new(lan));
|
|
|
|
let router = std::sync::Arc::new(crate::transport::TransportRouter::new(
|
|
transports,
|
|
registry,
|
|
mesh_only,
|
|
));
|
|
api_handler.rpc_handler().set_transport_router(router).await;
|
|
info!("📡 Transport router initialized (mesh_only={})", mesh_only);
|
|
}
|
|
Err(e) => {
|
|
warn!("Transport router init failed (non-fatal): {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Register Archipelago DWN protocols (background, non-blocking)
|
|
{
|
|
let data_dir = config.data_dir.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = register_dwn_protocols(&data_dir).await {
|
|
debug!("DWN protocol registration (non-fatal): {}", e);
|
|
}
|
|
});
|
|
}
|
|
|
|
// Periodic Tor address refresh (runs regardless of dev_mode)
|
|
// Picks up hostname when Tor creates it after startup/rotation (30-60s delay)
|
|
{
|
|
let state = state_manager.clone();
|
|
let identity_clone = identity.clone();
|
|
tokio::spawn(async move {
|
|
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
|
loop {
|
|
interval.tick().await;
|
|
if let Err(e) = refresh_tor_address(&state, identity_clone.as_ref()).await {
|
|
debug!("Tor address refresh (non-fatal): {}", e);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// Initialize container scanner — discovers installed apps from Podman/Docker
|
|
{
|
|
let scanner = create_docker_scanner(&config).await?;
|
|
let state = state_manager.clone();
|
|
let identity_clone = identity.clone();
|
|
|
|
// Initial scan (delayed to let crash recovery finish first)
|
|
tokio::spawn(async move {
|
|
// Brief delay for containers to stabilize after boot
|
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
|
info!("🐳 Scanning containers...");
|
|
if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await {
|
|
error!("Failed to scan containers: {}", e);
|
|
}
|
|
|
|
// Periodic scan every 30 seconds (only broadcasts if state changed)
|
|
// Uses an in-flight guard to skip scans when a previous one is still running
|
|
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
|
let scanning = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
|
loop {
|
|
interval.tick().await;
|
|
if scanning.load(std::sync::atomic::Ordering::Relaxed) {
|
|
debug!("Skipping container scan — previous scan still in progress");
|
|
continue;
|
|
}
|
|
scanning.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await {
|
|
error!("Failed to update containers: {}", e);
|
|
}
|
|
scanning.store(false, std::sync::atomic::Ordering::Relaxed);
|
|
}
|
|
});
|
|
}
|
|
|
|
// Peer health monitoring — check every 5 minutes
|
|
{
|
|
let state = state_manager.clone();
|
|
let data_dir = config.data_dir.clone();
|
|
tokio::spawn(async move {
|
|
let mut interval = tokio::time::interval(Duration::from_secs(300));
|
|
loop {
|
|
interval.tick().await;
|
|
if let Err(e) = check_peer_health(&state, &data_dir).await {
|
|
debug!("Peer health check (non-fatal): {}", e);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// did:dht auto-refresh — re-publish DHT records every 2 hours
|
|
if config.nostr_discovery_enabled {
|
|
let data_dir = config.data_dir.clone();
|
|
tokio::spawn(async move {
|
|
let mut interval = tokio::time::interval(Duration::from_secs(7200));
|
|
loop {
|
|
interval.tick().await;
|
|
let identity_dir = data_dir.join("identity");
|
|
let node_key_path = identity_dir.join("node_key");
|
|
if !node_key_path.exists() {
|
|
continue;
|
|
}
|
|
match tokio::fs::read(&node_key_path).await {
|
|
Ok(key_bytes) if key_bytes.len() == 32 => {
|
|
let mut seed = [0u8; 32];
|
|
seed.copy_from_slice(&key_bytes);
|
|
let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed);
|
|
match crate::network::did_dht::create_and_publish(&signing_key, &[]).await {
|
|
Ok(did) => tracing::info!(did = %did, "did:dht record refreshed"),
|
|
Err(e) => tracing::debug!("did:dht refresh (non-fatal): {}", e),
|
|
}
|
|
}
|
|
_ => {
|
|
tracing::debug!("did:dht refresh skipped: no valid node key");
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// Container health monitoring — auto-restart unhealthy containers
|
|
// Respects webhook config: skips when disabled or ContainerCrash not subscribed
|
|
crate::health_monitor::spawn_health_monitor(state_manager.clone(), config.data_dir.clone());
|
|
|
|
Ok(Self {
|
|
_config: config,
|
|
_identity: identity,
|
|
api_handler,
|
|
_state_manager: state_manager,
|
|
})
|
|
}
|
|
|
|
/// Serve on `addr` using a shutdown future that never completes.
///
/// Delegates to `serve_with_shutdown`, so any error it reports is returned unchanged.
pub async fn serve(&self, addr: SocketAddr) -> Result<()> {
    // A future that never resolves stands in for the shutdown signal.
    let never = std::future::pending();
    self.serve_with_shutdown(addr, never).await
}
|
|
|
|
/// Serve with a graceful shutdown signal.
|
|
/// When the shutdown future completes, stop accepting new connections and drain in-flight requests.
|
|
pub async fn serve_with_shutdown(
|
|
&self,
|
|
addr: SocketAddr,
|
|
shutdown: impl std::future::Future<Output = ()>,
|
|
) -> Result<()> {
|
|
let listener = TcpListener::bind(addr).await?;
|
|
let active_connections = Arc::new(tokio::sync::Semaphore::new(1024));
|
|
|
|
tokio::pin!(shutdown);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
result = listener.accept() => {
|
|
let (stream, peer_addr) = match result {
|
|
Ok(conn) => conn,
|
|
Err(e) => {
|
|
error!("Failed to accept connection: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let handler = self.api_handler.clone();
|
|
let permit = active_connections.clone().acquire_owned().await;
|
|
|
|
tokio::spawn(async move {
|
|
let _permit = permit;
|
|
let service = service_fn(move |req| {
|
|
let handler = handler.clone();
|
|
async move {
|
|
handler.handle_request(req).await
|
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))
|
|
}
|
|
});
|
|
|
|
if let Err(e) = Http::new()
|
|
.http1_keep_alive(false)
|
|
.serve_connection(stream, service)
|
|
.with_upgrades()
|
|
.await
|
|
{
|
|
error!("Error serving connection from {}: {}", peer_addr, e);
|
|
}
|
|
});
|
|
}
|
|
_ = &mut shutdown => {
|
|
info!("Shutdown signal received, draining connections...");
|
|
// Wait up to 5 seconds for in-flight requests to complete
|
|
let drain_start = std::time::Instant::now();
|
|
let drain_timeout = std::time::Duration::from_secs(5);
|
|
while active_connections.available_permits() < 1024 {
|
|
if drain_start.elapsed() > drain_timeout {
|
|
warn!("Drain timeout reached, forcing shutdown");
|
|
break;
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
}
|
|
info!("Shutdown complete");
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn create_docker_scanner(config: &Config) -> Result<DockerPackageScanner> {
|
|
let user = std::env::var("USER").unwrap_or_else(|_| "archipelago".to_string());
|
|
|
|
let runtime: Arc<dyn archipelago_container::ContainerRuntime> = match &config.container_runtime {
|
|
ContainerRuntime::Podman => {
|
|
Arc::new(archipelago_container::PodmanRuntime::new(user.clone()))
|
|
}
|
|
ContainerRuntime::Docker => {
|
|
Arc::new(archipelago_container::DockerRuntime::new(user.clone()))
|
|
}
|
|
ContainerRuntime::Auto => {
|
|
Arc::new(
|
|
archipelago_container::AutoRuntime::new(user.clone())
|
|
.await?
|
|
)
|
|
}
|
|
};
|
|
|
|
Ok(DockerPackageScanner::new(runtime))
|
|
}
|
|
|
|
async fn refresh_tor_address(state: &StateManager, identity: &NodeIdentity) -> Result<()> {
|
|
let tor_addr = docker_packages::read_tor_address("archipelago");
|
|
let (current_data, _) = state.get_snapshot().await;
|
|
if tor_addr != current_data.server_info.tor_address {
|
|
let mut data = current_data;
|
|
data.server_info.tor_address = tor_addr.clone();
|
|
data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t));
|
|
state.update_data(data).await;
|
|
if let Some(ref addr) = tor_addr {
|
|
info!("🔒 Tor address updated: {}", addr);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn scan_and_update_packages(
|
|
scanner: &DockerPackageScanner,
|
|
state: &StateManager,
|
|
identity: &NodeIdentity,
|
|
) -> Result<()> {
|
|
let packages = scanner.scan_containers().await?;
|
|
|
|
let (current_data, _) = state.get_snapshot().await;
|
|
let packages_changed = !packages.is_empty() && current_data.package_data != packages;
|
|
let tor_addr = docker_packages::read_tor_address("archipelago");
|
|
let tor_changed = tor_addr != current_data.server_info.tor_address;
|
|
let first_scan = !current_data.server_info.status_info.containers_scanned;
|
|
|
|
if packages_changed || tor_changed || first_scan {
|
|
let mut data = current_data;
|
|
if !packages.is_empty() {
|
|
data.package_data = packages;
|
|
}
|
|
data.server_info.tor_address = tor_addr.clone();
|
|
data.server_info.node_address = tor_addr.as_ref().map(|t| identity.node_address(t));
|
|
data.server_info.status_info.containers_scanned = true;
|
|
state.update_data(data).await;
|
|
debug!("📦 State changed (packages={}, tor={}, first_scan={}), broadcasting update", packages_changed, tor_changed, first_scan);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Register Archipelago DWN protocols on startup.
|
|
async fn register_dwn_protocols(data_dir: &std::path::Path) -> Result<()> {
|
|
use crate::network::dwn_store::{DwnStore, ProtocolDefinition};
|
|
|
|
let protocols = [
|
|
("https://archipelago.dev/protocols/node-identity/v1", true),
|
|
("https://archipelago.dev/protocols/file-catalog/v1", true),
|
|
("https://archipelago.dev/protocols/federation/v1", false),
|
|
("https://archipelago.dev/protocols/app-deploy/v1", false),
|
|
];
|
|
|
|
let store = DwnStore::new(data_dir).await?;
|
|
let existing = store.list_protocols().await?;
|
|
let existing_uris: std::collections::HashSet<String> =
|
|
existing.iter().map(|p| p.protocol.clone()).collect();
|
|
|
|
let mut registered = 0;
|
|
for (uri, published) in &protocols {
|
|
if existing_uris.contains(*uri) {
|
|
continue;
|
|
}
|
|
let def = ProtocolDefinition {
|
|
protocol: uri.to_string(),
|
|
published: *published,
|
|
types: std::collections::HashMap::new(),
|
|
structure: std::collections::HashMap::new(),
|
|
date_registered: chrono::Utc::now().to_rfc3339(),
|
|
};
|
|
store.register_protocol(&def).await?;
|
|
registered += 1;
|
|
}
|
|
|
|
if registered > 0 {
|
|
info!("📋 Registered {registered} DWN protocols");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Periodically check peer reachability and broadcast status changes.
|
|
async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> Result<()> {
|
|
let known_peers = peers::load_peers(data_dir).await.unwrap_or_default();
|
|
if known_peers.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
let mut new_health = std::collections::HashMap::new();
|
|
for peer in &known_peers {
|
|
let reachable = node_message::check_peer_reachable(&peer.onion)
|
|
.await
|
|
.unwrap_or(false);
|
|
new_health.insert(peer.onion.clone(), reachable);
|
|
}
|
|
|
|
let (current_data, _) = state.get_snapshot().await;
|
|
if current_data.peer_health != new_health {
|
|
let mut data = current_data;
|
|
data.peer_health = new_health;
|
|
state.update_data(data).await;
|
|
debug!("🔗 Peer health updated, broadcasting changes");
|
|
}
|
|
|
|
Ok(())
|
|
}
|