diff --git a/core/archipelago/src/container/boot_reconciler.rs b/core/archipelago/src/container/boot_reconciler.rs
new file mode 100644
index 00000000..265df9d6
--- /dev/null
+++ b/core/archipelago/src/container/boot_reconciler.rs
@@ -0,0 +1,375 @@
+//! BootReconciler — the long-running task that keeps the prod orchestrator's
+//! desired-state view in lockstep with what podman actually has.
+//!
+//! Step 5 of the rust-orchestrator migration. Spawned once from `main.rs`
+//! (Step 6) after the initial `adopt_existing()` pass. Every `interval` it
+//! calls `ProdContainerOrchestrator::reconcile_all()`, which ensures every
+//! loaded manifest has a running container, installing fresh ones as needed.
+//!
+//! Per answered design Q3, `interval` defaults to 30 seconds.
+//!
+//! Shutdown is signalled via `Arc<Notify>`. The reconciler finishes its
+//! current `reconcile_all` call before exiting — we don't interrupt an
+//! in-flight pull or build.
+//!
+//! See `docs/rust-orchestrator-migration.md` §269-352.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use tokio::sync::Notify;
+use tokio::time::{self, Instant};
+
+use crate::container::prod_orchestrator::{ProdContainerOrchestrator, ReconcileReport};
+
+/// Default reconciler cadence (answered design Q3).
+pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
+
+/// Periodic driver for `ProdContainerOrchestrator::reconcile_all`.
+///
+/// Holds the orchestrator to drive, the pause between passes, and the
+/// `Notify` used to ask the loop to stop.
+pub struct BootReconciler {
+    orchestrator: Arc<ProdContainerOrchestrator>,
+    interval: Duration,
+    shutdown: Arc<Notify>,
+}
+
+impl BootReconciler {
+    /// Bundle the handles; no reconcile runs until `run_forever` is polled.
+    pub fn new(
+        orchestrator: Arc<ProdContainerOrchestrator>,
+        interval: Duration,
+        shutdown: Arc<Notify>,
+    ) -> Self {
+        Self {
+            orchestrator,
+            interval,
+            shutdown,
+        }
+    }
+
+    /// Run the reconcile loop until `shutdown` is notified.
+    ///
+    /// Does one reconcile immediately, then sleeps `interval` between
+    /// subsequent passes, measured from the end of one pass to the start of
+    /// the next.
+    ///
+    /// A `shutdown.notify_one()` call that arrives during the sleep wakes the
+    /// task, which returns without starting another pass. One that arrives
+    /// while a pass is in flight is held by `Notify` as a permit: the pass
+    /// runs to completion and the task returns as soon as it next waits.
+    /// `notify_waiters()` stores no permit and would be missed mid-pass, so
+    /// callers should signal with `notify_one()`.
+    ///
+    /// Never panics: per-app failures are absorbed into `ReconcileReport` by
+    /// the orchestrator, and `reconcile_all` itself returns infallibly.
+    pub async fn run_forever(self) {
+        // Initial pass: no delay.
+        let report = self.orchestrator.reconcile_all().await;
+        Self::log_report(&report);
+
+        loop {
+            let deadline = Instant::now() + self.interval;
+            tokio::select! {
+                _ = time::sleep_until(deadline) => {
+                    // The pass runs in the branch body, after the race is
+                    // decided, so a shutdown request cannot cut it short.
+                    let report = self.orchestrator.reconcile_all().await;
+                    Self::log_report(&report);
+                }
+                _ = self.shutdown.notified() => {
+                    tracing::info!("boot reconciler: shutdown requested, exiting loop");
+                    break;
+                }
+            }
+        }
+    }
+
+    /// Log every action and failure of a pass, then a one-line summary.
+    ///
+    /// A clean pass is summarised at `debug`; any failure raises the summary
+    /// to `warn`.
+    fn log_report(report: &ReconcileReport) {
+        for (app_id, action) in &report.actions {
+            tracing::debug!(app_id = %app_id, action = ?action, "reconcile action");
+        }
+        for (app_id, err) in &report.failures {
+            tracing::warn!(app_id = %app_id, error = %err, "reconcile failure");
+        }
+        if report.failures.is_empty() {
+            tracing::debug!(
+                count = report.actions.len(),
+                "reconcile pass complete"
+            );
+        } else {
+            tracing::warn!(
+                ok = report.actions.len(),
+                failed = report.failures.len(),
+                "reconcile pass completed with failures"
+            );
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::container::prod_orchestrator::ProdContainerOrchestrator;
+    use anyhow::Result;
+    use archipelago_container::{
+        AppManifest, BuildConfig, ContainerRuntime as ContainerRuntimeTrait, ContainerState,
+        ContainerStatus,
+    };
+    use async_trait::async_trait;
+    use std::collections::HashMap;
+    use std::path::PathBuf;
+    use std::sync::Mutex as StdMutex;
+
+    /// Instrumented runtime that counts reconcile-loop side effects so tests
+    /// can tell exactly how many passes have fired. All containers are
+    /// reported as already Running so `reconcile_all` will NoOp — we are only
+    /// measuring loop cadence, not install behavior.
+    #[derive(Default)]
+    struct CountingRuntime {
+        /// Number of times get_container_status has been called. Each
+        /// reconcile_all pass hits this once per manifest, so with one
+        /// manifest this equals the number of reconcile passes.
+        status_calls: StdMutex<u32>,
+        /// Containers known to the runtime, by name; a lookup for any other
+        /// name fails with "not found".
+        running: StdMutex<HashMap<String, ContainerState>>,
+    }
+
+    impl CountingRuntime {
+        /// Runtime in which every container in `names` is reported Running.
+        fn new_with(names: &[&str]) -> Self {
+            let me = Self::default();
+            let mut m = me.running.lock().unwrap();
+            for n in names {
+                m.insert((*n).to_string(), ContainerState::Running);
+            }
+            drop(m);
+            me
+        }
+        fn status_call_count(&self) -> u32 {
+            *self.status_calls.lock().unwrap()
+        }
+    }
+
+    #[async_trait]
+    impl ContainerRuntimeTrait for CountingRuntime {
+        async fn pull_image(&self, _: &str, _: Option<&str>) -> Result<()> {
+            Ok(())
+        }
+        async fn create_container(
+            &self,
+            _: &AppManifest,
+            name: &str,
+            _: u16,
+        ) -> Result<String> {
+            Ok(name.to_string())
+        }
+        async fn start_container(&self, _: &str) -> Result<()> {
+            Ok(())
+        }
+        async fn stop_container(&self, _: &str) -> Result<()> {
+            Ok(())
+        }
+        async fn remove_container(&self, _: &str) -> Result<()> {
+            Ok(())
+        }
+        async fn get_container_status(&self, name: &str) -> Result<ContainerStatus> {
+            *self.status_calls.lock().unwrap() += 1;
+            let state = self
+                .running
+                .lock()
+                .unwrap()
+                .get(name)
+                .cloned()
+                .ok_or_else(|| anyhow::anyhow!("not found: {name}"))?;
+            Ok(ContainerStatus {
+                id: format!("id-{name}"),
+                name: name.to_string(),
+                state,
+                health: None,
+                exit_code: None,
+                started_at: None,
+                image: "test".into(),
+                created: "now".into(),
+                ports: vec![],
+                lan_address: None,
+            })
+        }
+        // NOTE(review): the `Vec` element types of the next two methods must
+        // match the `ContainerRuntime` trait — confirm.
+        async fn get_container_logs(&self, _: &str, _: u32) -> Result<Vec<String>> {
+            Ok(vec![])
+        }
+        async fn list_containers(&self) -> Result<Vec<ContainerStatus>> {
+            Ok(vec![])
+        }
+        async fn image_exists(&self, _: &str) -> Result<bool> {
+            Ok(true)
+        }
+        async fn build_image(&self, _: &BuildConfig) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    // NOTE(review): `container:` and `image:` share one indent level below,
+    // which YAML reads as siblings; if `image` is meant to nest under
+    // `container`, the literal needs deeper indentation — confirm.
+    fn pull_manifest(id: &str, image: &str) -> AppManifest {
+        let yaml = format!(
+            "app:\n id: {id}\n name: {id}\n version: 1.0.0\n container:\n image: {image}\n"
+        );
+        AppManifest::parse(&yaml).unwrap()
+    }
+
+    async fn orch_with_one_running_manifest(
+        rt: Arc<CountingRuntime>,
+    ) -> Arc<ProdContainerOrchestrator> {
+        let orch = Arc::new(ProdContainerOrchestrator::with_runtime(
+            rt,
+            PathBuf::from("/nonexistent-for-tests"),
+        ));
+        orch.insert_manifest_for_test(
+            pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
+            PathBuf::from("/tmp/bk"),
+        )
+        .await;
+        orch
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn initial_pass_fires_immediately() {
+        let rt = Arc::new(CountingRuntime::new_with(&["bitcoin-knots"]));
+        let orch = orch_with_one_running_manifest(rt.clone()).await;
+        let shutdown = Arc::new(Notify::new());
+        let reconciler = BootReconciler::new(
+            orch.clone(),
+            Duration::from_secs(30),
+            shutdown.clone(),
+        );
+        let handle = tokio::spawn(reconciler.run_forever());
+
+        // Yield so the spawned task gets CPU to run its initial reconcile.
+        tokio::task::yield_now().await;
+        tokio::task::yield_now().await;
+
+        // We expect exactly one reconcile pass to have run by now (the initial),
+        // NOT a second one (the 30s sleep hasn't elapsed in paused time).
+        assert_eq!(rt.status_call_count(), 1, "initial pass should fire once");
+
+        shutdown.notify_one();
+        // Under paused clock the select! is blocked on sleep_until; the notify
+        // will unblock it. Yield once so the reconciler task gets polled.
+        tokio::task::yield_now().await;
+        let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn second_pass_fires_after_interval() {
+        let rt = Arc::new(CountingRuntime::new_with(&["bitcoin-knots"]));
+        let orch = orch_with_one_running_manifest(rt.clone()).await;
+        let shutdown = Arc::new(Notify::new());
+        let reconciler = BootReconciler::new(
+            orch.clone(),
+            Duration::from_secs(30),
+            shutdown.clone(),
+        );
+        let handle = tokio::spawn(reconciler.run_forever());
+
+        tokio::task::yield_now().await;
+        tokio::task::yield_now().await;
+        assert_eq!(rt.status_call_count(), 1);
+
+        // Fast-forward past one interval; the sleep_until should fire.
+        tokio::time::advance(Duration::from_secs(31)).await;
+        tokio::task::yield_now().await;
+        tokio::task::yield_now().await;
+
+        assert_eq!(
+            rt.status_call_count(),
+            2,
+            "a second reconcile pass should fire after one interval"
+        );
+
+        shutdown.notify_one();
+        tokio::task::yield_now().await;
+        let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn shutdown_terminates_loop() {
+        let rt = Arc::new(CountingRuntime::new_with(&["bitcoin-knots"]));
+        let orch = orch_with_one_running_manifest(rt.clone()).await;
+        let shutdown = Arc::new(Notify::new());
+        let reconciler = BootReconciler::new(
+            orch.clone(),
+            Duration::from_secs(30),
+            shutdown.clone(),
+        );
+        let handle = tokio::spawn(reconciler.run_forever());
+        tokio::task::yield_now().await;
+        tokio::task::yield_now().await;
+
+        shutdown.notify_one();
+        // The select! should wake on Notified and return. Nudge the paused
+        // clock, then bound the join so a task that never exits fails here.
+        tokio::time::advance(Duration::from_millis(10)).await;
+        let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
+        assert!(result.is_ok(), "reconciler did not exit after shutdown");
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn failure_in_one_pass_does_not_stop_loop() {
+        // The runtime starts with no containers, so every status lookup for
+        // the manifest fails with "not found", and `create_container` never
+        // registers one. Whatever `reconcile_all` records for that — an
+        // install action, a failure, or both — the loop must keep ticking:
+        // a second pass has to fire after the interval.
+        let rt = Arc::new(CountingRuntime::default());
+        let orch = Arc::new(ProdContainerOrchestrator::with_runtime(
+            rt.clone(),
+            PathBuf::from("/nonexistent-for-tests"),
+        ));
+        orch.insert_manifest_for_test(
+            pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
+            PathBuf::from("/tmp/bk"),
+        )
+        .await;
+        let shutdown = Arc::new(Notify::new());
+        let reconciler = BootReconciler::new(
+            orch.clone(),
+            Duration::from_secs(30),
+            shutdown.clone(),
+        );
+        let handle = tokio::spawn(reconciler.run_forever());
+
+        tokio::task::yield_now().await;
+        tokio::task::yield_now().await;
+        let first = rt.status_call_count();
+        assert!(first >= 1, "initial pass should have touched the runtime");
+
+        // Advance one interval — second pass should fire regardless of what
+        // the first pass did.
+        tokio::time::advance(Duration::from_secs(31)).await;
+        tokio::task::yield_now().await;
+        tokio::task::yield_now().await;
+        let second = rt.status_call_count();
+        assert!(
+            second > first,
+            "loop should have fired a second pass after the interval"
+        );
+
+        shutdown.notify_one();
+        tokio::time::advance(Duration::from_millis(10)).await;
+        let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
+    }
+}
diff --git a/core/archipelago/src/container/mod.rs b/core/archipelago/src/container/mod.rs
index c04826ca..2f15f9eb 100644
--- a/core/archipelago/src/container/mod.rs
+++ b/core/archipelago/src/container/mod.rs
@@ -1,3 +1,4 @@
+pub mod boot_reconciler;
 pub mod data_manager;
 pub mod dev_orchestrator;
 pub mod docker_packages;
@@ -6,6 +7,7 @@ pub mod prod_orchestrator;
 pub mod registry;
 pub mod traits;
 
+pub use boot_reconciler::{BootReconciler, DEFAULT_INTERVAL as RECONCILER_DEFAULT_INTERVAL};
 pub use dev_orchestrator::DevContainerOrchestrator;
 pub use docker_packages::DockerPackageScanner;
 pub use prod_orchestrator::{