diff --git a/core/archipelago/src/api/handler/mod.rs b/core/archipelago/src/api/handler/mod.rs index 56d6da11..17318366 100644 --- a/core/archipelago/src/api/handler/mod.rs +++ b/core/archipelago/src/api/handler/mod.rs @@ -10,6 +10,7 @@ mod websocket; use crate::api::rpc::RpcHandler; use crate::blobs::BlobStore; use crate::config::Config; +use crate::container::{ContainerOrchestrator, DevContainerOrchestrator}; use crate::monitoring::MetricsStore; use crate::session::{self, SessionStore}; use crate::state::StateManager; @@ -54,6 +55,8 @@ impl ApiHandler { config: Config, state_manager: Arc, metrics_store: Arc, + orchestrator: Option>, + dev_orchestrator: Option>, ) -> Result { let session_store = SessionStore::new().await; let rpc_handler = Arc::new( @@ -62,6 +65,8 @@ impl ApiHandler { state_manager.clone(), metrics_store.clone(), session_store.clone(), + orchestrator, + dev_orchestrator, ) .await?, ); diff --git a/core/archipelago/src/api/rpc/mod.rs b/core/archipelago/src/api/rpc/mod.rs index 5e9e7c0c..903016e9 100644 --- a/core/archipelago/src/api/rpc/mod.rs +++ b/core/archipelago/src/api/rpc/mod.rs @@ -39,7 +39,7 @@ mod webhooks; use crate::auth::AuthManager; use crate::config::Config; -use crate::container::{ContainerOrchestrator, DevContainerOrchestrator, ProdContainerOrchestrator}; +use crate::container::{ContainerOrchestrator, DevContainerOrchestrator}; use crate::monitoring::MetricsStore; use crate::port_allocator::PortAllocator; use crate::rate_limit::{EndpointRateLimiter, LoginRateLimiter}; @@ -94,25 +94,10 @@ impl RpcHandler { state_manager: Arc, metrics_store: Arc, session_store: SessionStore, + orchestrator: Option>, + dev_orchestrator: Option>, ) -> Result { let auth_manager = AuthManager::new(config.data_dir.clone()); - let (orchestrator, dev_orchestrator): ( - Option>, - Option>, - ) = if config.dev_mode { - let dev = Arc::new(DevContainerOrchestrator::new(config.clone()).await?); - let trait_obj: Arc = dev.clone(); - (Some(trait_obj), Some(dev)) - } else { - let prod = Arc::new(ProdContainerOrchestrator::new(config.clone()).await?); - // Best-effort manifest load; a missing /opt/archipelago/apps is - // logged by load_manifests and not fatal. - if let Err(e) = prod.load_manifests().await { - tracing::error!(error = %e, "prod orchestrator: load_manifests failed at startup"); - } - let trait_obj: Arc = prod; - (Some(trait_obj), None) - }; let port_allocator = Arc::new(tokio::sync::Mutex::new( PortAllocator::new(&config.data_dir).await?, )); diff --git a/core/archipelago/src/main.rs b/core/archipelago/src/main.rs index a6119b31..8abc008b 100644 --- a/core/archipelago/src/main.rs +++ b/core/archipelago/src/main.rs @@ -19,7 +19,9 @@ use anyhow::{Context, Result}; use std::net::SocketAddr; +use std::sync::Arc; use tokio::signal; +use tokio::sync::Notify; use tracing::info; mod api; @@ -69,6 +71,10 @@ mod wallet; mod webhooks; use config::Config; +use container::{ + BootReconciler, ContainerOrchestrator, DevContainerOrchestrator, ProdContainerOrchestrator, + RECONCILER_DEFAULT_INTERVAL, +}; use server::Server; #[tokio::main] @@ -168,15 +174,65 @@ async fn main() -> Result<()> { // Signal to health monitor that boot recovery is done crash_recovery::mark_recovery_complete(); - - // Boot reconciliation disabled — the reconciler creates ALL containers - // from specs, which is wrong on unbundled installs where only user-chosen - // apps should exist. The health monitor handles restarting existing - // containers. Run reconcile-containers.sh manually when needed. - // crash_recovery::run_boot_reconciliation().await; }); } + // Construct the container orchestrator once. In prod mode we load the + // on-disk app manifests, do an initial adoption pass, and spawn the + // BootReconciler loop (Step 5/6 of the rust-orchestrator migration). + // Dev mode uses the in-memory DevContainerOrchestrator and has no + // reconciler (manifests are pushed via RPC, not discovered from disk). + let shutdown_notify = Arc::new(Notify::new()); + let (orchestrator, dev_orchestrator): ( + Option>, + Option>, + ) = if config.dev_mode { + let dev = Arc::new(DevContainerOrchestrator::new(config.clone()).await?); + let trait_obj: Arc = dev.clone(); + (Some(trait_obj), Some(dev)) + } else { + let prod = Arc::new(ProdContainerOrchestrator::new(config.clone()).await?); + // Best-effort manifest load; a missing /opt/archipelago/apps is + // logged inside load_manifests and not fatal. + match prod.load_manifests().await { + Ok(n) => info!("📦 Loaded {n} app manifest(s) from disk"), + Err(e) => { + tracing::error!(error = %e, "prod orchestrator: load_manifests failed at startup"); + } + } + // Adoption pass: link existing podman containers back to their + // manifests so the reconciler doesn't recreate them. + match prod.adopt_existing().await { + Ok(report) => { + info!( + "🔗 Adopted {} existing container(s): {:?}", + report.adopted.len(), + report.adopted + ); + } + Err(e) => { + tracing::warn!(error = %e, "prod orchestrator: adopt_existing failed (non-fatal)"); + } + } + // Spawn the boot reconciler loop. Runs an initial reconcile + // immediately, then re-checks every RECONCILER_DEFAULT_INTERVAL + // until shutdown_notify fires. + { + let reconciler = BootReconciler::new( + prod.clone(), + RECONCILER_DEFAULT_INTERVAL, + shutdown_notify.clone(), + ); + tokio::spawn(reconciler.run_forever()); + info!( + "🔄 Boot reconciler started (interval: {:?})", + RECONCILER_DEFAULT_INTERVAL + ); + } + let trait_obj: Arc = prod; + (Some(trait_obj), None) + }; + // Ensure a default user exists so login works after install/onboarding. // In production, the default password is "password123" (shown during install). // In dev mode, the dev default password is used. @@ -185,7 +241,7 @@ async fn main() -> Result<()> { // "Create Password" form instead of login form. // Create server - let server = Server::new(config.clone()).await?; + let server = Server::new(config.clone(), orchestrator, dev_orchestrator).await?; // Start server let addr: SocketAddr = format!("{}:{}", config.bind_host, config.bind_port) @@ -261,6 +317,7 @@ async fn main() -> Result<()> { // Graceful shutdown: wait for SIGTERM or SIGINT let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate()) .context("Failed to register SIGTERM handler")?; + let shutdown_notify_for_signal = shutdown_notify.clone(); let shutdown = async move { tokio::select! { _ = signal::ctrl_c() => { @@ -270,6 +327,10 @@ async fn main() -> Result<()> { info!("Received SIGTERM, initiating graceful shutdown..."); } } + // Signal the boot reconciler (and any other subscribers) to stop. + // `notify_one` stores a permit if no task is currently parked on + // `notified()`, so we don't race the reconciler's reconcile_all pass. + shutdown_notify_for_signal.notify_one(); }; server.serve_with_shutdown(addr, shutdown).await?; diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index f2bbd948..2d054ea2 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -1,6 +1,8 @@ use crate::api::ApiHandler; use crate::config::{Config, ContainerRuntime}; -use crate::container::{docker_packages, DockerPackageScanner}; +use crate::container::{ + docker_packages, ContainerOrchestrator, DevContainerOrchestrator, DockerPackageScanner, +}; use crate::identity::{self, NodeIdentity}; use crate::monitoring::MetricsStore; use crate::node_message; @@ -26,7 +28,11 @@ pub struct Server { } impl Server { - pub async fn new(config: Config) -> Result { + pub async fn new( + config: Config, + orchestrator: Option>, + dev_orchestrator: Option>, + ) -> Result { let state_manager = Arc::new(StateManager::new()); // Load node identity and set stable server_info. @@ -172,8 +178,16 @@ impl Server { Some(config.data_dir.clone()), ); - let api_handler = - Arc::new(ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?); + let api_handler = Arc::new( + ApiHandler::new( + config.clone(), + state_manager.clone(), + metrics_store, + orchestrator, + dev_orchestrator, + ) + .await?, + ); // Initialize mesh networking service (if config has enabled: true) {