Phase 3a of the install path consolidation. Two coupled changes:
1. install.rs handle_package_install: gate the legacy "container exists →
adopt + return" probe on !orchestrator_managed. Apps the orchestrator
knows about (bitcoin-knots, bitcoin-core, lnd, electrumx, fedimint,
filebrowser, btcpay-server stack apps, mempool stack apps, plus the
companion UIs that just moved to Quadlet) skip the legacy probe and
fall straight into the orchestrator branch.
The legacy adopt block was returning success on a bare `podman start`
exit-0 — even when the process inside the container crashed seconds
later. That's the .228 "running but unreachable" failure mode. The
orchestrator's ensure_running honors the manifest's health check and
pre-start hooks (e.g. re-renders bitcoin-ui's nginx.conf if the RPC
password rotated), so this is a behavioral upgrade, not just a
refactor.
2. ProdContainerOrchestrator::install: make it idempotent. Previously it
blindly called install_fresh, which would fail on `podman create` if
the container name already existed. Now it delegates to ensure_running:
- Container Running + healthy → no-op (refresh hooks, restart if
config rewritten)
- Container Stopped/Exited → start (with hook refresh)
- Container missing → install_fresh
- Container in wedged state (Created/Paused/Unknown) → force-recreate
Without this, change #1 would regress every "container already exists"
case for the 18 orchestrator-managed app IDs. With it, install becomes
the single source of truth for "make app X be in the desired state."
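A minimal sketch of how the two changes compose, assuming a simplified container-state enum. `ObservedState`, `InstallAction`, and `plan_install` are hypothetical names, not the real install.rs or prod_orchestrator.rs API. Only the routing (the `!orchestrator_managed` gate) and the state-to-action table come from the description above; health checks and hook execution are left out.

```rust
/// Simplified view of what podman reports for an app's container
/// (hypothetical; `Missing` means no container with that name exists).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObservedState {
    Running,
    Stopped,
    Exited,
    Created,
    Paused,
    Unknown,
    Missing,
}

/// What the orchestrator-managed install path does (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InstallAction {
    /// Already running: refresh pre-start hooks, restart only if they
    /// rewrote config.
    RefreshHooks,
    /// Stopped or exited: refresh hooks, then start.
    Start,
    /// No container yet: create it from scratch.
    InstallFresh,
    /// Wedged: remove and recreate.
    ForceRecreate,
}

/// Returns `None` for apps the orchestrator does not manage: they keep the
/// legacy "container exists -> adopt + return" probe (change #1). Managed
/// apps always get an idempotent decision, so an existing container no
/// longer makes install fail (change #2).
pub fn plan_install(orchestrator_managed: bool, state: ObservedState) -> Option<InstallAction> {
    if !orchestrator_managed {
        return None;
    }
    let action = match state {
        ObservedState::Running => InstallAction::RefreshHooks,
        ObservedState::Stopped | ObservedState::Exited => InstallAction::Start,
        ObservedState::Missing => InstallAction::InstallFresh,
        ObservedState::Created | ObservedState::Paused | ObservedState::Unknown => {
            InstallAction::ForceRecreate
        }
    };
    Some(action)
}
```

Because every state maps to an action, calling install twice on a managed app converges instead of erroring on the second `podman create`.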
Tests: 654 passed across the workspace (614 unit + 37 orchestration + 3
rpc), 0 failures. The 20 prod_orchestrator tests cover the install /
ensure_running / reconcile paths the new install delegates through.
Net delta: install.rs grows by ~30 lines (gating wrapper + comments),
prod_orchestrator.rs grows by ~30 lines (idempotent install body). Both
are temporary — the larger deletions (~1700 lines) come once every app
has been verified through the orchestrator path in subsequent phases.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
376 lines
14 KiB
Rust
//! BootReconciler: the long-running task that keeps the prod orchestrator's
//! desired-state view in lockstep with what podman actually has.
//!
//! Step 5 of the rust-orchestrator migration. Spawned once from `main.rs`
//! (Step 6) after the initial `adopt_existing()` pass. Every `interval` it
//! calls `ProdContainerOrchestrator::reconcile_all()`, which ensures every
//! loaded manifest has a running container, installing fresh ones as needed.
//!
//! Per answered design Q3, `interval` defaults to 30 seconds.
//!
//! Shutdown is signalled via `Arc<Notify>`. The reconciler finishes its
//! current pass (`reconcile_all` plus the companion stage) before exiting;
//! we don't interrupt an in-flight pull or build.
//!
//! See `docs/rust-orchestrator-migration.md` §269-352.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;
use tokio::time::{self, Instant};

use crate::container::prod_orchestrator::{ProdContainerOrchestrator, ReconcileReport};

/// Default reconciler cadence (answered design Q3).
pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
|
|
pub struct BootReconciler {
    orchestrator: Arc<ProdContainerOrchestrator>,
    interval: Duration,
    shutdown: Arc<Notify>,
    /// Run the companion-unit repair stage each tick. Default true.
    /// Tests disable this: companion reconcile shells out to
    /// `systemctl --user` and `podman`, which both block real time
    /// and would race the paused-clock test fixtures.
    companion_stage: bool,
}
|
|
impl BootReconciler {
    pub fn new(
        orchestrator: Arc<ProdContainerOrchestrator>,
        interval: Duration,
        shutdown: Arc<Notify>,
    ) -> Self {
        Self {
            orchestrator,
            interval,
            shutdown,
            companion_stage: true,
        }
    }

    /// Disable the companion-unit reconcile stage. Used by unit tests
    /// that exercise loop cadence without the real systemd / podman
    /// surface. Production must not call this.
    #[cfg(test)]
    pub fn without_companion_stage(mut self) -> Self {
        self.companion_stage = false;
        self
    }
|
|
    /// Run the reconcile loop until `shutdown` is notified.
    ///
    /// Does one reconcile immediately, then sleeps `interval` between
    /// subsequent passes. A `shutdown.notify_one()` that arrives during the
    /// sleep wakes the `select!`, and the task returns without running
    /// another pass. One that arrives mid-pass is stored as a permit by
    /// `Notify`, so the in-flight pass finishes and the loop exits at the
    /// next `select!`.
    ///
    /// Each pass is two stages:
    /// 1. App reconcile: `reconcile_all()` keeps every loaded manifest's
    ///    container running.
    /// 2. Companion reconcile: any expected Quadlet companion unit that
    ///    is missing or inactive is repaired (writes the unit,
    ///    daemon-reloads, starts the service). This is the safety net for
    ///    the "someone deleted my unit file" / "systemd lost the service"
    ///    failure modes.
    ///
    /// Never panics: per-app failures are absorbed into `ReconcileReport`
    /// by the orchestrator, and companion failures are logged but never
    /// propagated.
    pub async fn run_forever(self) {
        // Initial pass: no delay.
        self.tick().await;

        loop {
            let deadline = Instant::now() + self.interval;
            tokio::select! {
                _ = time::sleep_until(deadline) => {
                    self.tick().await;
                }
                _ = self.shutdown.notified() => {
                    tracing::info!("boot reconciler: shutdown requested, exiting loop");
                    break;
                }
            }
        }
    }
|
|
    /// One reconcile pass: the app stage, then (unless disabled) the
    /// companion stage.
    async fn tick(&self) {
        let report = self.orchestrator.reconcile_all().await;
        Self::log_report(&report);

        if !self.companion_stage {
            return;
        }
        let installed = self.orchestrator.manifest_ids().await;
        for (companion, err) in crate::container::companion::reconcile(&installed).await {
            tracing::warn!(
                companion = %companion,
                error = %err,
                "companion reconcile failed"
            );
        }
    }
|
|
    /// Log every action at debug and every failure at warn, then a one-line
    /// pass summary (debug when clean, warn when anything failed).
    fn log_report(report: &ReconcileReport) {
        for (app_id, action) in &report.actions {
            tracing::debug!(app_id = %app_id, action = ?action, "reconcile action");
        }
        for (app_id, err) in &report.failures {
            tracing::warn!(app_id = %app_id, error = %err, "reconcile failure");
        }
        if report.failures.is_empty() {
            tracing::debug!(count = report.actions.len(), "reconcile pass complete");
        } else {
            tracing::warn!(
                ok = report.actions.len(),
                failed = report.failures.len(),
                "reconcile pass completed with failures"
            );
        }
    }
}
|
|
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::container::prod_orchestrator::ProdContainerOrchestrator;
    use anyhow::Result;
    use archipelago_container::{
        AppManifest, BuildConfig, ContainerRuntime as ContainerRuntimeTrait, ContainerState,
        ContainerStatus,
    };
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::path::PathBuf;
    use std::sync::Mutex as StdMutex;
|
|
    /// Instrumented runtime that counts reconcile-loop side effects so tests
    /// can tell exactly how many passes have fired. Containers seeded via
    /// `new_with` are reported as already Running, so `reconcile_all` will
    /// NoOp for them; we are only measuring loop cadence, not install behavior.
    #[derive(Default)]
    struct CountingRuntime {
        /// Number of times get_container_status has been called. Each
        /// reconcile_all pass hits this once per manifest, so with one
        /// manifest this equals the number of reconcile passes.
        status_calls: StdMutex<u32>,
        /// Seeded container states, keyed by container name.
        running: StdMutex<HashMap<String, ContainerState>>,
    }
|
|
    impl CountingRuntime {
        fn new_with(names: &[&str]) -> Self {
            let me = Self::default();
            let mut m = me.running.lock().unwrap();
            for n in names {
                m.insert((*n).to_string(), ContainerState::Running);
            }
            drop(m);
            me
        }
        fn status_call_count(&self) -> u32 {
            *self.status_calls.lock().unwrap()
        }
    }
|
|
    #[async_trait]
    impl ContainerRuntimeTrait for CountingRuntime {
        async fn pull_image(&self, _: &str, _: Option<&str>) -> Result<()> {
            Ok(())
        }
        // Does not record the new container, so later status lookups for it
        // still fail.
        async fn create_container(&self, _: &AppManifest, name: &str, _: u16) -> Result<String> {
            Ok(name.to_string())
        }
        async fn start_container(&self, _: &str) -> Result<()> {
            Ok(())
        }
        async fn stop_container(&self, _: &str) -> Result<()> {
            Ok(())
        }
        async fn remove_container(&self, _: &str) -> Result<()> {
            Ok(())
        }
        // Count every lookup, then report the seeded state. Names never
        // seeded via `new_with` return an error.
        async fn get_container_status(&self, name: &str) -> Result<ContainerStatus> {
            *self.status_calls.lock().unwrap() += 1;
            let state = self
                .running
                .lock()
                .unwrap()
                .get(name)
                .cloned()
                .ok_or_else(|| anyhow::anyhow!("not found: {name}"))?;
            Ok(ContainerStatus {
                id: format!("id-{name}"),
                name: name.to_string(),
                state,
                health: None,
                exit_code: None,
                started_at: None,
                image: "test".into(),
                created: "now".into(),
                ports: vec![],
                lan_address: None,
            })
        }
        async fn get_container_logs(&self, _: &str, _: u32) -> Result<Vec<String>> {
            Ok(vec![])
        }
        async fn list_containers(&self) -> Result<Vec<ContainerStatus>> {
            Ok(vec![])
        }
        async fn image_exists(&self, _: &str) -> Result<bool> {
            Ok(true)
        }
        async fn build_image(&self, _: &BuildConfig) -> Result<()> {
            Ok(())
        }
    }
|
|
    /// Minimal manifest with just an id and a container image.
    fn pull_manifest(id: &str, image: &str) -> AppManifest {
        let yaml = format!(
            "app:\n  id: {id}\n  name: {id}\n  version: 1.0.0\n  container:\n    image: {image}\n"
        );
        AppManifest::parse(&yaml).unwrap()
    }
|
|
    async fn orch_with_one_running_manifest(
        rt: Arc<CountingRuntime>,
    ) -> Arc<ProdContainerOrchestrator> {
        let orch = Arc::new(ProdContainerOrchestrator::with_runtime(
            rt,
            PathBuf::from("/nonexistent-for-tests"),
        ));
        orch.insert_manifest_for_test(
            pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
            PathBuf::from("/tmp/bk"),
        )
        .await;
        orch
    }
|
|
    #[tokio::test(start_paused = true)]
    async fn initial_pass_fires_immediately() {
        let rt = Arc::new(CountingRuntime::new_with(&["bitcoin-knots"]));
        let orch = orch_with_one_running_manifest(rt.clone()).await;
        let shutdown = Arc::new(Notify::new());
        let reconciler =
            BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
                .without_companion_stage();
        let handle = tokio::spawn(reconciler.run_forever());

        // Yield so the spawned task gets CPU to run its initial reconcile.
        tokio::task::yield_now().await;
        tokio::task::yield_now().await;

        // We expect exactly one reconcile pass to have run by now (the initial),
        // NOT a second one (the 30s sleep hasn't elapsed in paused time).
        assert_eq!(rt.status_call_count(), 1, "initial pass should fire once");

        shutdown.notify_one();
        // Under the paused clock the select! is blocked on sleep_until; the
        // notify unblocks it. Yield once so the spawned task can observe it.
        tokio::task::yield_now().await;
        let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
    }
|
|
    #[tokio::test(start_paused = true)]
    async fn second_pass_fires_after_interval() {
        let rt = Arc::new(CountingRuntime::new_with(&["bitcoin-knots"]));
        let orch = orch_with_one_running_manifest(rt.clone()).await;
        let shutdown = Arc::new(Notify::new());
        let reconciler =
            BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
                .without_companion_stage();
        let handle = tokio::spawn(reconciler.run_forever());

        tokio::task::yield_now().await;
        tokio::task::yield_now().await;
        assert_eq!(rt.status_call_count(), 1);

        // Fast-forward past one interval; the sleep_until should fire.
        tokio::time::advance(Duration::from_secs(31)).await;
        tokio::task::yield_now().await;
        tokio::task::yield_now().await;

        assert_eq!(
            rt.status_call_count(),
            2,
            "a second reconcile pass should fire after one interval"
        );

        shutdown.notify_one();
        tokio::task::yield_now().await;
        let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
    }
|
|
    #[tokio::test(start_paused = true)]
    async fn shutdown_terminates_loop() {
        let rt = Arc::new(CountingRuntime::new_with(&["bitcoin-knots"]));
        let orch = orch_with_one_running_manifest(rt.clone()).await;
        let shutdown = Arc::new(Notify::new());
        let reconciler =
            BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
                .without_companion_stage();
        let handle = tokio::spawn(reconciler.run_forever());
        tokio::task::yield_now().await;
        tokio::task::yield_now().await;

        shutdown.notify_one();
        // The select! should wake on Notified and return. Advance the paused
        // clock slightly so the task runs, then bound the wait with a timeout
        // so a hung loop fails the test instead of stalling it.
        tokio::time::advance(Duration::from_millis(10)).await;
        let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
        assert!(result.is_ok(), "reconciler did not exit after shutdown");
    }
|
|
    #[tokio::test(start_paused = true)]
    async fn failure_in_one_pass_does_not_stop_loop() {
        // The runtime starts empty, so the manifest's container is missing:
        // get_container_status returns "not found" and reconcile_all takes
        // the install_fresh path. CountingRuntime::create_container never
        // records the container, so every later pass sees it missing again.
        // Whether a given pass lands in the report's actions or failures,
        // what we care about is that the loop keeps ticking.
        let rt = Arc::new(CountingRuntime::default());
        let orch = Arc::new(ProdContainerOrchestrator::with_runtime(
            rt.clone(),
            PathBuf::from("/nonexistent-for-tests"),
        ));
        orch.insert_manifest_for_test(
            pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
            PathBuf::from("/tmp/bk"),
        )
        .await;
        let shutdown = Arc::new(Notify::new());
        let reconciler =
            BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
                .without_companion_stage();
        let handle = tokio::spawn(reconciler.run_forever());

        tokio::task::yield_now().await;
        tokio::task::yield_now().await;
        let first = rt.status_call_count();
        assert!(first >= 1, "initial pass should have touched the runtime");

        // Advance one interval; a second pass should fire regardless of what
        // the first pass did.
        tokio::time::advance(Duration::from_secs(31)).await;
        tokio::task::yield_now().await;
        tokio::task::yield_now().await;
        let second = rt.status_call_count();
        assert!(
            second > first,
            "loop should have fired a second pass after the interval"
        );

        shutdown.notify_one();
        tokio::time::advance(Duration::from_millis(10)).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
    }
}
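The shutdown behavior documented on `run_forever` relies on `tokio::sync::Notify::notify_one` storing a permit when no task is waiting, so a request that arrives mid-pass is picked up at the next `select!` rather than lost. The sketch below reproduces that loop shape with only the standard library, swapping `tokio::select!` and `Notify` for `std::sync::mpsc::Receiver::recv_timeout`; a message buffered in the channel plays the role of the stored permit. `run_reconcile_loop` is a hypothetical helper, not part of this crate.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Std-only analog of `BootReconciler::run_forever` (hypothetical helper).
/// Runs `tick` once immediately, then waits up to `interval` for a shutdown
/// message between passes. A message sent while `tick` is running stays
/// buffered in the channel, so the loop exits at the next wait instead of
/// losing the signal. Returns the number of passes run.
pub fn run_reconcile_loop<F: FnMut()>(
    interval: Duration,
    shutdown: &Receiver<()>,
    mut tick: F,
) -> u32 {
    // Initial pass: no delay.
    tick();
    let mut passes = 1;
    loop {
        match shutdown.recv_timeout(interval) {
            // Interval elapsed with no shutdown request: run another pass.
            Err(RecvTimeoutError::Timeout) => {
                tick();
                passes += 1;
            }
            // Shutdown requested (or every sender dropped): exit without
            // starting another pass.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    passes
}
```

Unlike the tokio version, this blocks an OS thread while waiting; it is only meant to make the ordering (initial pass, then wait-or-exit, then pass) easy to check in isolation. A tick that sends the shutdown message on its third call, for example, yields exactly three passes.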