//! BootReconciler — the long-running task that keeps the prod orchestrator's //! desired-state view in lockstep with what podman actually has. //! //! Step 5 of the rust-orchestrator migration. Spawned once from `main.rs` //! (Step 6) after the initial `adopt_existing()` pass. Every `interval` it //! calls `ProdContainerOrchestrator::reconcile_existing()`, which repairs //! containers that already exist without installing every catalog manifest. //! //! Per answered design Q3, `interval` defaults to 30 seconds. //! //! Shutdown is signalled via `Arc`. The reconciler finishes its //! current `reconcile_all` call before exiting — we don't interrupt an //! in-flight pull or build. //! //! See `docs/rust-orchestrator-migration.md` §269-352. use std::sync::Arc; use std::time::Duration; use tokio::sync::Notify; use tokio::time::{self, Instant}; use crate::container::prod_orchestrator::{ProdContainerOrchestrator, ReconcileReport}; /// Default reconciler cadence (answered design Q3). pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(30); pub struct BootReconciler { orchestrator: Arc, interval: Duration, shutdown: Arc, /// Run the companion-unit repair stage each tick. Default true. /// Tests disable this — companion reconcile shells out to /// `systemctl --user` and `podman`, which both block real time /// and would race the paused-clock test fixtures. companion_stage: bool, wait_for_recovery: bool, } impl BootReconciler { pub fn new( orchestrator: Arc, interval: Duration, shutdown: Arc, ) -> Self { Self { orchestrator, interval, shutdown, companion_stage: true, wait_for_recovery: true, } } /// Disable the companion-unit reconcile stage. Used by unit tests /// that exercise loop cadence without the real systemd / podman /// surface. Production must not call this. #[cfg(test)] pub fn without_companion_stage(mut self) -> Self { self.companion_stage = false; self.wait_for_recovery = false; self } /// Run the reconcile loop until `shutdown` is notified. /// /// Does one reconcile immediately, then sleeps `interval` between /// subsequent passes. A `shutdown.notify_one()` call unblocks the sleep /// and the task returns after the *next* pass completes. /// /// Each pass is two stages: /// 1. App reconcile: `reconcile_all()` keeps every loaded manifest's /// container running. /// 2. Companion reconcile: any expected Quadlet companion unit that /// is missing or inactive is repaired (writes the unit, daemon- /// reloads, starts the service). This is the safety net for the /// "someone deleted my unit file" / "systemd lost the service" /// failure modes. /// /// Never panics: per-app failures are absorbed into `ReconcileReport` /// by the orchestrator, and companion failures are logged but never /// propagated. pub async fn run_forever(self) { let wait_start = Instant::now(); while self.wait_for_recovery && !crate::crash_recovery::is_recovery_complete() { if wait_start.elapsed() > Duration::from_secs(1800) { tracing::warn!("boot reconciler: boot recovery did not complete within 30 minutes, starting anyway"); break; } tokio::select! { _ = time::sleep(Duration::from_secs(5)) => {} _ = self.shutdown.notified() => { tracing::info!("boot reconciler: shutdown requested before recovery completed"); return; } } } // Initial pass: no delay. self.tick().await; loop { let deadline = Instant::now() + self.interval; tokio::select! { _ = time::sleep_until(deadline) => { self.tick().await; } _ = self.shutdown.notified() => { tracing::info!("boot reconciler: shutdown requested, exiting loop"); break; } } } } async fn tick(&self) { let report = self.orchestrator.reconcile_existing().await; Self::log_report(&report); if !self.companion_stage { return; } let installed = self.orchestrator.manifest_ids().await; for (companion, err) in crate::container::companion::reconcile(&installed).await { tracing::warn!( companion = %companion, error = %err, "companion reconcile failed" ); } } fn log_report(report: &ReconcileReport) { for (app_id, action) in &report.actions { tracing::debug!(app_id = %app_id, action = ?action, "reconcile action"); } for (app_id, err) in &report.failures { tracing::warn!(app_id = %app_id, error = %err, "reconcile failure"); } if report.failures.is_empty() { tracing::debug!(count = report.actions.len(), "reconcile pass complete"); } else { tracing::warn!( ok = report.actions.len(), failed = report.failures.len(), "reconcile pass completed with failures" ); } } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; use crate::container::prod_orchestrator::ProdContainerOrchestrator; use anyhow::Result; use archipelago_container::{ AppManifest, BuildConfig, ContainerRuntime as ContainerRuntimeTrait, ContainerState, ContainerStatus, }; use async_trait::async_trait; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Mutex as StdMutex; /// Instrumented runtime that counts reconcile-loop side effects so tests /// can tell exactly how many passes have fired. All containers are /// reported as already Running so `reconcile_all` will NoOp — we are only /// measuring loop cadence, not install behavior. #[derive(Default)] struct CountingRuntime { /// Number of times get_container_status has been called. Each /// reconcile_all pass hits this once per manifest, so with one /// manifest this equals the number of reconcile passes. status_calls: StdMutex, running: StdMutex>, } impl CountingRuntime { fn new_with(names: &[&str]) -> Self { let me = Self::default(); let mut m = me.running.lock().unwrap(); for n in names { m.insert((*n).to_string(), ContainerState::Running); } drop(m); me } fn status_call_count(&self) -> u32 { *self.status_calls.lock().unwrap() } } #[async_trait] impl ContainerRuntimeTrait for CountingRuntime { async fn pull_image(&self, _: &str, _: Option<&str>) -> Result<()> { Ok(()) } async fn create_container(&self, _: &AppManifest, name: &str, _: u16) -> Result { Ok(name.to_string()) } async fn start_container(&self, _: &str) -> Result<()> { Ok(()) } async fn stop_container(&self, _: &str) -> Result<()> { Ok(()) } async fn remove_container(&self, _: &str) -> Result<()> { Ok(()) } async fn get_container_status(&self, name: &str) -> Result { *self.status_calls.lock().unwrap() += 1; let state = self .running .lock() .unwrap() .get(name) .cloned() .ok_or_else(|| anyhow::anyhow!("not found: {name}"))?; Ok(ContainerStatus { id: format!("id-{name}"), name: name.to_string(), state, health: None, exit_code: None, started_at: None, image: "test".into(), created: "now".into(), ports: vec![], lan_address: None, }) } async fn get_container_logs(&self, _: &str, _: u32) -> Result> { Ok(vec![]) } async fn list_containers(&self) -> Result> { Ok(vec![]) } async fn image_exists(&self, _: &str) -> Result { Ok(true) } async fn build_image(&self, _: &BuildConfig) -> Result<()> { Ok(()) } } fn pull_manifest(id: &str, image: &str) -> AppManifest { let yaml = format!( "app:\n id: {id}\n name: {id}\n version: 1.0.0\n container:\n image: {image}\n" ); AppManifest::parse(&yaml).unwrap() } async fn orch_with_one_running_manifest( rt: Arc, ) -> Arc { let mut orch = ProdContainerOrchestrator::with_runtime(rt, PathBuf::from("/nonexistent-for-tests")); let tmp = tempfile::tempdir().unwrap().keep(); orch.set_data_dir(tmp); orch.set_disk_gb_for_test(2_000); let orch = Arc::new(orch); orch.insert_manifest_for_test( pull_manifest("test-app", "docker.io/example/test-app:1"), PathBuf::from("/tmp/test-app"), ) .await; orch } async fn wait_for_status_calls(rt: &CountingRuntime, expected: u32) -> u32 { for _ in 0..100 { let count = rt.status_call_count(); if count >= expected { return count; } tokio::task::yield_now().await; tokio::time::sleep(Duration::from_millis(1)).await; } rt.status_call_count() } #[tokio::test] async fn initial_pass_fires_immediately() { let rt = Arc::new(CountingRuntime::new_with(&["test-app"])); let orch = orch_with_one_running_manifest(rt.clone()).await; let shutdown = Arc::new(Notify::new()); let reconciler = BootReconciler::new(orch.clone(), Duration::from_millis(50), shutdown.clone()) .without_companion_stage(); let handle = tokio::spawn(reconciler.run_forever()); // We expect exactly one reconcile pass to have run by now (the initial), // NOT a second one (the 30s sleep hasn't elapsed in paused time). assert_eq!( wait_for_status_calls(&rt, 1).await, 1, "initial pass should fire once" ); shutdown.notify_one(); tokio::task::yield_now().await; let _ = tokio::time::timeout(Duration::from_secs(1), handle).await; } #[tokio::test] async fn second_pass_fires_after_interval() { let rt = Arc::new(CountingRuntime::new_with(&["test-app"])); let orch = orch_with_one_running_manifest(rt.clone()).await; let shutdown = Arc::new(Notify::new()); let reconciler = BootReconciler::new(orch.clone(), Duration::from_millis(10), shutdown.clone()) .without_companion_stage(); let handle = tokio::spawn(reconciler.run_forever()); assert_eq!(wait_for_status_calls(&rt, 1).await, 1); tokio::time::sleep(Duration::from_millis(20)).await; wait_for_status_calls(&rt, 2).await; assert_eq!( rt.status_call_count(), 2, "a second reconcile pass should fire after one interval" ); shutdown.notify_one(); tokio::task::yield_now().await; let _ = tokio::time::timeout(Duration::from_secs(1), handle).await; } #[tokio::test] async fn shutdown_terminates_loop() { let rt = Arc::new(CountingRuntime::new_with(&["test-app"])); let orch = orch_with_one_running_manifest(rt.clone()).await; let shutdown = Arc::new(Notify::new()); let reconciler = BootReconciler::new(orch.clone(), Duration::from_millis(50), shutdown.clone()) .without_companion_stage(); let handle = tokio::spawn(reconciler.run_forever()); wait_for_status_calls(&rt, 1).await; shutdown.notify_one(); let result = tokio::time::timeout(Duration::from_secs(5), handle).await; assert!(result.is_ok(), "reconciler did not exit after shutdown"); } #[tokio::test] async fn failure_in_one_pass_does_not_stop_loop() { // Manifest references a container the runtime does not have AND // cannot create (no install path — install_fresh will also fail to // pull, since CountingRuntime::pull_image returns Ok but the // manifest's referenced container stays uncreated). In practice // reconcile_all will observe the missing container, install_fresh // will run, and the next pass will see a new state. We care about // "loop keeps ticking even when the report has actions". let rt = Arc::new(CountingRuntime::default()); let mut orch = ProdContainerOrchestrator::with_runtime( rt.clone(), PathBuf::from("/nonexistent-for-tests"), ); let tmp = tempfile::tempdir().unwrap().keep(); orch.set_data_dir(tmp); orch.set_disk_gb_for_test(2_000); let orch = Arc::new(orch); orch.insert_manifest_for_test( pull_manifest("test-app", "docker.io/example/test-app:1"), PathBuf::from("/tmp/test-app"), ) .await; let shutdown = Arc::new(Notify::new()); let reconciler = BootReconciler::new(orch.clone(), Duration::from_millis(10), shutdown.clone()) .without_companion_stage(); let handle = tokio::spawn(reconciler.run_forever()); let first = wait_for_status_calls(&rt, 1).await; assert!(first >= 1, "initial pass should have touched the runtime"); tokio::time::sleep(Duration::from_millis(20)).await; tokio::task::yield_now().await; tokio::task::yield_now().await; let second = rt.status_call_count(); assert!( second > first, "loop should have fired a second pass after the interval" ); shutdown.notify_one(); let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; } }