feat(container): wire ProdContainerOrchestrator + BootReconciler into main

Step 6 of the rust-orchestrator migration. Construct the container
orchestrator once in main.rs, call load_manifests + adopt_existing
immediately after Config::load, log the adoption report, and spawn
BootReconciler::run_forever with the 30s default interval. Thread the
orchestrator through Server::new -> ApiHandler::new -> RpcHandler::new
so the reconciler and RPC layer share one instance.

Wire a tokio::sync::Notify through the SIGTERM/SIGINT shutdown path so
the reconciler exits cleanly alongside the server drain. The signal
handler uses notify_one, which stores a permit when no task is waiting,
so the shutdown request is not lost if the reconciler is in the middle
of reconcile_all when the signal fires.

Delete the commented-out run_boot_reconciliation block in main.rs, which
documented why the prior bash-script approach was unsafe on unbundled
installs. The new reconciler is manifest-driven and only touches apps
present in /opt/archipelago/apps, which resolves that concern.

`cargo check -p archipelago` passes with no new warnings; the 6
pre-existing dead-code warnings are on trait methods that will not be
exercised until the Step 9 hot-swap. The container test suite passes
43/44; the one failure
(container::image_versions::test_parse_image_versions) is pre-existing
and unrelated.
This commit is contained in:
archipelago 2026-04-22 19:20:13 -04:00
parent 81c1613040
commit 6a0809d386
4 changed files with 94 additions and 29 deletions

View File

@ -10,6 +10,7 @@ mod websocket;
use crate::api::rpc::RpcHandler;
use crate::blobs::BlobStore;
use crate::config::Config;
use crate::container::{ContainerOrchestrator, DevContainerOrchestrator};
use crate::monitoring::MetricsStore;
use crate::session::{self, SessionStore};
use crate::state::StateManager;
@ -54,6 +55,8 @@ impl ApiHandler {
config: Config,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
orchestrator: Option<Arc<dyn ContainerOrchestrator>>,
dev_orchestrator: Option<Arc<DevContainerOrchestrator>>,
) -> Result<Self> {
let session_store = SessionStore::new().await;
let rpc_handler = Arc::new(
@ -62,6 +65,8 @@ impl ApiHandler {
state_manager.clone(),
metrics_store.clone(),
session_store.clone(),
orchestrator,
dev_orchestrator,
)
.await?,
);

View File

@ -39,7 +39,7 @@ mod webhooks;
use crate::auth::AuthManager;
use crate::config::Config;
use crate::container::{ContainerOrchestrator, DevContainerOrchestrator, ProdContainerOrchestrator};
use crate::container::{ContainerOrchestrator, DevContainerOrchestrator};
use crate::monitoring::MetricsStore;
use crate::port_allocator::PortAllocator;
use crate::rate_limit::{EndpointRateLimiter, LoginRateLimiter};
@ -94,25 +94,10 @@ impl RpcHandler {
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
session_store: SessionStore,
orchestrator: Option<Arc<dyn ContainerOrchestrator>>,
dev_orchestrator: Option<Arc<DevContainerOrchestrator>>,
) -> Result<Self> {
let auth_manager = AuthManager::new(config.data_dir.clone());
let (orchestrator, dev_orchestrator): (
Option<Arc<dyn ContainerOrchestrator>>,
Option<Arc<DevContainerOrchestrator>>,
) = if config.dev_mode {
let dev = Arc::new(DevContainerOrchestrator::new(config.clone()).await?);
let trait_obj: Arc<dyn ContainerOrchestrator> = dev.clone();
(Some(trait_obj), Some(dev))
} else {
let prod = Arc::new(ProdContainerOrchestrator::new(config.clone()).await?);
// Best-effort manifest load; a missing /opt/archipelago/apps is
// logged by load_manifests and not fatal.
if let Err(e) = prod.load_manifests().await {
tracing::error!(error = %e, "prod orchestrator: load_manifests failed at startup");
}
let trait_obj: Arc<dyn ContainerOrchestrator> = prod;
(Some(trait_obj), None)
};
let port_allocator = Arc::new(tokio::sync::Mutex::new(
PortAllocator::new(&config.data_dir).await?,
));

View File

@ -19,7 +19,9 @@
use anyhow::{Context, Result};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::Notify;
use tracing::info;
mod api;
@ -69,6 +71,10 @@ mod wallet;
mod webhooks;
use config::Config;
use container::{
BootReconciler, ContainerOrchestrator, DevContainerOrchestrator, ProdContainerOrchestrator,
RECONCILER_DEFAULT_INTERVAL,
};
use server::Server;
#[tokio::main]
@ -168,15 +174,65 @@ async fn main() -> Result<()> {
// Signal to health monitor that boot recovery is done
crash_recovery::mark_recovery_complete();
// Boot reconciliation disabled — the reconciler creates ALL containers
// from specs, which is wrong on unbundled installs where only user-chosen
// apps should exist. The health monitor handles restarting existing
// containers. Run reconcile-containers.sh manually when needed.
// crash_recovery::run_boot_reconciliation().await;
});
}
// Construct the container orchestrator once. In prod mode we load the
// on-disk app manifests, do an initial adoption pass, and spawn the
// BootReconciler loop (Step 5/6 of the rust-orchestrator migration).
// Dev mode uses the in-memory DevContainerOrchestrator and has no
// reconciler (manifests are pushed via RPC, not discovered from disk).
let shutdown_notify = Arc::new(Notify::new());
let (orchestrator, dev_orchestrator): (
Option<Arc<dyn ContainerOrchestrator>>,
Option<Arc<DevContainerOrchestrator>>,
) = if config.dev_mode {
let dev = Arc::new(DevContainerOrchestrator::new(config.clone()).await?);
let trait_obj: Arc<dyn ContainerOrchestrator> = dev.clone();
(Some(trait_obj), Some(dev))
} else {
let prod = Arc::new(ProdContainerOrchestrator::new(config.clone()).await?);
// Best-effort manifest load; a missing /opt/archipelago/apps is
// logged inside load_manifests and not fatal.
match prod.load_manifests().await {
Ok(n) => info!("📦 Loaded {n} app manifest(s) from disk"),
Err(e) => {
tracing::error!(error = %e, "prod orchestrator: load_manifests failed at startup");
}
}
// Adoption pass: link existing podman containers back to their
// manifests so the reconciler doesn't recreate them.
match prod.adopt_existing().await {
Ok(report) => {
info!(
"🔗 Adopted {} existing container(s): {:?}",
report.adopted.len(),
report.adopted
);
}
Err(e) => {
tracing::warn!(error = %e, "prod orchestrator: adopt_existing failed (non-fatal)");
}
}
// Spawn the boot reconciler loop. Runs an initial reconcile
// immediately, then re-checks every RECONCILER_DEFAULT_INTERVAL
// until shutdown_notify fires.
{
let reconciler = BootReconciler::new(
prod.clone(),
RECONCILER_DEFAULT_INTERVAL,
shutdown_notify.clone(),
);
tokio::spawn(reconciler.run_forever());
info!(
"🔄 Boot reconciler started (interval: {:?})",
RECONCILER_DEFAULT_INTERVAL
);
}
let trait_obj: Arc<dyn ContainerOrchestrator> = prod;
(Some(trait_obj), None)
};
// Ensure a default user exists so login works after install/onboarding.
// In production, the default password is "password123" (shown during install).
// In dev mode, the dev default password is used.
@ -185,7 +241,7 @@ async fn main() -> Result<()> {
// "Create Password" form instead of login form.
// Create server
let server = Server::new(config.clone()).await?;
let server = Server::new(config.clone(), orchestrator, dev_orchestrator).await?;
// Start server
let addr: SocketAddr = format!("{}:{}", config.bind_host, config.bind_port)
@ -261,6 +317,7 @@ async fn main() -> Result<()> {
// Graceful shutdown: wait for SIGTERM or SIGINT
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
.context("Failed to register SIGTERM handler")?;
let shutdown_notify_for_signal = shutdown_notify.clone();
let shutdown = async move {
tokio::select! {
_ = signal::ctrl_c() => {
@ -270,6 +327,10 @@ async fn main() -> Result<()> {
info!("Received SIGTERM, initiating graceful shutdown...");
}
}
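// Signal the boot reconciler to stop. `notify_one` wakes at most one
// waiter; if no task is currently parked on `notified()` it stores a
// single permit instead, so we don't race the reconciler's
// reconcile_all pass. Any additional subscriber would need its own
// signal (`notify_waiters` wakes all current waiters but stores no permit).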
shutdown_notify_for_signal.notify_one();
};
server.serve_with_shutdown(addr, shutdown).await?;

View File

@ -1,6 +1,8 @@
use crate::api::ApiHandler;
use crate::config::{Config, ContainerRuntime};
use crate::container::{docker_packages, DockerPackageScanner};
use crate::container::{
docker_packages, ContainerOrchestrator, DevContainerOrchestrator, DockerPackageScanner,
};
use crate::identity::{self, NodeIdentity};
use crate::monitoring::MetricsStore;
use crate::node_message;
@ -26,7 +28,11 @@ pub struct Server {
}
impl Server {
pub async fn new(config: Config) -> Result<Self> {
pub async fn new(
config: Config,
orchestrator: Option<Arc<dyn ContainerOrchestrator>>,
dev_orchestrator: Option<Arc<DevContainerOrchestrator>>,
) -> Result<Self> {
let state_manager = Arc::new(StateManager::new());
// Load node identity and set stable server_info.
@ -172,8 +178,16 @@ impl Server {
Some(config.data_dir.clone()),
);
let api_handler =
Arc::new(ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?);
let api_handler = Arc::new(
ApiHandler::new(
config.clone(),
state_manager.clone(),
metrics_store,
orchestrator,
dev_orchestrator,
)
.await?,
);
// Initialize mesh networking service (if config has enabled: true)
{