Dorian 062e1fada2 release(v1.7.18-alpha): transitive peers default Trusted + update-flow logs
Flip transitively-discovered federation peers to Trusted instead of
Observer. Hints are already only ingested from peers we trust and only
peers we trust are re-exported via build_local_state, so the chain of
trust is already vetted end-to-end — making the user promote each
newcomer by hand was friction with no security win.

Backend:
- federation/sync.rs: merge_transitive_peers now inserts TrustLevel::Trusted
  (doc comment updated to explain the transitive-trust rationale)
- update.rs: info! log at download start (version, components, total_bytes,
  staging path), cancel (staging wiped?, marker cleared?), and apply (backup
  path) so journalctl reveals where a stuck update actually is

Frontend:
- SystemUpdate What's New block gets a v1.7.18-alpha entry

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 20:20:36 -04:00

1024 lines
39 KiB
Rust

//! Update system: check for updates, download deltas, apply with rollback.
use anyhow::{Context, Result};
use chrono::Timelike;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::fs;
use tracing::{debug, info};
/// Bytes of the current update download received so far (across all
/// components). Written by the download loop as chunks land and read by
/// the update.status RPC so the UI can render real progress. Kept global
/// because only one download runs at a time.
pub static DOWNLOAD_BYTES: AtomicU64 = AtomicU64::new(0);
/// Expected total size (sum of every component's `size_bytes`) of the
/// current download; 0 when no download is active.
pub static DOWNLOAD_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Cancellation request for the in-flight download. The download loop
/// polls it (via `is_canceled`) at chunk and backoff boundaries;
/// `download_update` clears it when a new run starts and
/// `cancel_download` raises it.
pub static DOWNLOAD_CANCEL: AtomicBool = AtomicBool::new(false);
/// Wall-clock time (ms since the Unix epoch, from `now_ms`) at which
/// DOWNLOAD_BYTES last advanced. Lets `update.status` report a download
/// as stalled when nothing has arrived for a while. Note: this is
/// `SystemTime`, not a monotonic clock, so a clock step can skew it.
pub static DOWNLOAD_PROGRESS_AT: AtomicU64 = AtomicU64::new(0);
/// Milliseconds since the Unix epoch; 0 if the system clock reads
/// earlier than 1970.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// True once someone has asked the current download to stop.
fn is_canceled() -> bool {
    let requested = DOWNLOAD_CANCEL.load(Ordering::Relaxed);
    requested
}
const DEFAULT_UPDATE_MANIFEST_URL: &str =
    "https://git.tx1138.com/lfg2025/archy/raw/branch/main/releases/manifest.json";
const UPDATE_STATE_FILE: &str = "update_state.json";
/// URL of the release manifest to poll.
///
/// `ARCHIPELAGO_UPDATE_URL` overrides the default. A set-but-empty (or
/// whitespace-only) value is treated as unset — e.g. an
/// `ARCHIPELAGO_UPDATE_URL=` line left in an environment file — instead
/// of being handed to the HTTP client as a URL that can never resolve.
/// Surrounding whitespace is trimmed from a non-empty override.
fn update_manifest_url() -> String {
    std::env::var("ARCHIPELAGO_UPDATE_URL")
        .ok()
        .map(|url| url.trim().to_string())
        .filter(|url| !url.is_empty())
        .unwrap_or_else(|| DEFAULT_UPDATE_MANIFEST_URL.to_string())
}
/// A release manifest as published at the update URL.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct UpdateManifest {
    /// Release version; compared against the running version to decide
    /// whether an update is available.
    pub version: String,
    /// Release date string, carried through as published.
    pub release_date: String,
    /// Human-readable changelog lines.
    pub changelog: Vec<String>,
    /// Downloadable pieces that make up the release.
    pub components: Vec<ComponentUpdate>,
}
/// One downloadable artifact of a release (e.g. the backend binary or a
/// frontend tarball). `name` doubles as the staged file name.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ComponentUpdate {
    /// File name used in the staging directory; apply_update dispatches on it.
    pub name: String,
    /// Version being replaced, as stated by the manifest.
    pub current_version: String,
    /// Version this artifact installs.
    pub new_version: String,
    /// Where the artifact is fetched from.
    pub download_url: String,
    /// Expected SHA256 of the complete file, hex-encoded.
    pub sha256: String,
    /// Expected size of the complete file in bytes.
    pub size_bytes: u64,
}
/// How the background scheduler treats updates. Serialized in
/// snake_case (`manual`, `daily_check`, `auto_apply`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum UpdateSchedule {
    /// Never check automatically.
    Manual,
    /// Check at most once per day (the default).
    #[default]
    DailyCheck,
    /// Check daily and download + apply during the 3 AM local window.
    AutoApply,
}
/// Persisted update bookkeeping, stored as JSON in the data directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateState {
    /// Version of the running binary (re-synced by `load_state`).
    pub current_version: String,
    /// RFC 3339 timestamp of the last remote check, if any.
    pub last_check: Option<String>,
    /// Manifest of a newer release, if one was found and not dismissed.
    pub available_update: Option<UpdateManifest>,
    /// Set once every component is downloaded and verified ("ready to apply").
    pub update_in_progress: bool,
    /// Whether a backup from the last apply can be rolled back to.
    pub rollback_available: bool,
    /// Scheduler mode; missing in older state files, hence the serde default.
    #[serde(default)]
    pub schedule: UpdateSchedule,
}
impl Default for UpdateState {
    /// Fresh state: the compiled-in version, nothing checked or staged,
    /// daily checks enabled.
    fn default() -> Self {
        let current_version = String::from(env!("CARGO_PKG_VERSION"));
        UpdateState {
            current_version,
            last_check: None,
            available_update: None,
            update_in_progress: false,
            rollback_available: false,
            schedule: UpdateSchedule::DailyCheck,
        }
    }
}
/// Read the persisted update state, creating (and saving) a default one
/// on first use.
///
/// The stored `current_version` is always overwritten with the version of
/// the running binary. Sideloaded binaries (copied straight into
/// /usr/local/bin) never touch the state file, so without this a node
/// would keep seeing its old version and re-offer itself as an update.
/// When the version changes, an `available_update` naming the now-running
/// version is dropped, and the corrected state is written back.
pub async fn load_state(data_dir: &Path) -> Result<UpdateState> {
    let path = data_dir.join(UPDATE_STATE_FILE);
    if !path.exists() {
        let fresh = UpdateState::default();
        save_state(data_dir, &fresh).await?;
        return Ok(fresh);
    }
    let raw = fs::read_to_string(&path)
        .await
        .context("Reading update state")?;
    let mut state: UpdateState = serde_json::from_str(&raw).context("Parsing update state")?;
    let running = env!("CARGO_PKG_VERSION");
    if state.current_version == running {
        return Ok(state);
    }
    state.current_version = running.to_string();
    // The "new" release is the one we're already running — nothing left to offer.
    let already_installed = state
        .available_update
        .as_ref()
        .map_or(false, |offered| offered.version == running);
    if already_installed {
        state.available_update = None;
    }
    save_state(data_dir, &state).await?;
    Ok(state)
}
pub async fn save_state(data_dir: &Path, state: &UpdateState) -> Result<()> {
let path = data_dir.join(UPDATE_STATE_FILE);
let data = serde_json::to_string_pretty(state)?;
fs::write(&path, data).await.context("Writing update state")
}
/// Check for available updates by fetching the release manifest.
pub async fn check_for_updates(data_dir: &Path) -> Result<UpdateState> {
let mut state = load_state(data_dir).await?;
info!("Checking for updates...");
// 45s total budget, and we retry up to 3 times so a momentary
// gitea hiccup doesn't make the node report "up to date" when an
// update actually exists. Short per-attempt timeout keeps the RPC
// responsive in the common case.
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.connect_timeout(std::time::Duration::from_secs(10))
.build()
.context("Failed to create HTTP client")?;
let manifest_url = update_manifest_url();
let mut last_err: Option<String> = None;
let mut handled = false;
for attempt in 1..=3u8 {
if attempt > 1 {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
match client.get(&manifest_url).send().await {
Ok(resp) if resp.status().is_success() => {
match resp.json::<UpdateManifest>().await {
Ok(manifest) => {
if manifest.version != state.current_version {
info!(
current = %state.current_version,
available = %manifest.version,
"Update available"
);
state.available_update = Some(manifest);
} else {
debug!("Already on latest version: {}", state.current_version);
state.available_update = None;
}
handled = true;
break;
}
Err(e) => {
last_err = Some(format!("parse: {}", e));
}
}
}
Ok(resp) => {
last_err = Some(format!("HTTP {}", resp.status()));
}
Err(e) => {
last_err = Some(e.to_string());
}
}
}
if !handled {
if let Some(e) = last_err {
debug!("Update check failed after retries: {}", e);
}
}
state.last_check = Some(chrono::Utc::now().to_rfc3339());
save_state(data_dir, &state).await?;
Ok(state)
}
/// Current update state as persisted locally; performs no network access.
pub async fn get_status(data_dir: &Path) -> Result<UpdateState> {
    let state = load_state(data_dir).await?;
    Ok(state)
}
/// Dismiss the available update notification.
pub async fn dismiss_update(data_dir: &Path) -> Result<()> {
let mut state = load_state(data_dir).await?;
state.available_update = None;
save_state(data_dir, &state).await
}
/// Download update components to a staging directory.
/// Verifies SHA256 hash for each component.
///
/// Robustness: each component download is **resumable** via HTTP Range
/// requests and retried up to 6 times with exponential backoff. When
/// gitea drops the connection mid-stream (which happens regularly at slow
/// raw-file throughput), the next attempt picks up where the previous one
/// left off instead of restarting from byte zero. SHA256 is verified over
/// the complete file at the end of each component, so a partially-corrupt
/// resume still fails cleanly.
///
/// Any earlier "downloaded, ready to apply" marker (`update_in_progress`)
/// is cleared before staging starts and set again only after every
/// component has been verified. A re-download that fails or is canceled
/// halfway therefore never leaves a half-rewritten staging directory
/// marked as applicable.
pub async fn download_update(data_dir: &Path) -> Result<DownloadProgress> {
    let mut state = load_state(data_dir).await?;
    let manifest = state
        .available_update
        .clone()
        .ok_or_else(|| anyhow::anyhow!("No update available to download"))?;
    if state.update_in_progress {
        state.update_in_progress = false;
        save_state(data_dir, &state).await?;
    }
    let staging_dir = data_dir.join("update-staging");
    fs::create_dir_all(&staging_dir)
        .await
        .context("Failed to create staging dir")?;
    let client = reqwest::Client::builder()
        // Per-request budget; each attempt gets the full hour. A retry
        // restarts the budget cleanly.
        .timeout(std::time::Duration::from_secs(3600))
        .connect_timeout(std::time::Duration::from_secs(30))
        .build()
        .context("Failed to create HTTP client")?;
    let mut downloaded = 0u64;
    let total_bytes: u64 = manifest.components.iter().map(|c| c.size_bytes).sum();
    info!(
        version = %manifest.version,
        components = manifest.components.len(),
        total_bytes,
        staging = %staging_dir.display(),
        "Starting update download"
    );
    // Clear any stale cancel flag from a prior aborted run, then seed
    // the live counters so polls during the handshake show the right
    // denominator immediately instead of 0/0 → NaN%.
    DOWNLOAD_CANCEL.store(false, Ordering::Relaxed);
    DOWNLOAD_TOTAL.store(total_bytes, Ordering::Relaxed);
    DOWNLOAD_BYTES.store(0, Ordering::Relaxed);
    DOWNLOAD_PROGRESS_AT.store(now_ms(), Ordering::Relaxed);
    for component in &manifest.components {
        if is_canceled() {
            DOWNLOAD_TOTAL.store(0, Ordering::Relaxed);
            DOWNLOAD_BYTES.store(0, Ordering::Relaxed);
            anyhow::bail!("Download canceled");
        }
        info!(name = %component.name, url = %component.download_url, "Downloading component");
        let dest = staging_dir.join(&component.name);
        download_component_resumable(&client, component, &dest, downloaded).await?;
        downloaded += component.size_bytes;
        DOWNLOAD_BYTES.store(downloaded, Ordering::Relaxed);
        info!(
            name = %component.name,
            bytes = component.size_bytes,
            "Component downloaded and verified"
        );
    }
    // Every component verified — mark the staged update as ready to apply.
    state = load_state(data_dir).await?;
    state.update_in_progress = true;
    save_state(data_dir, &state).await?;
    Ok(DownloadProgress {
        total_bytes,
        downloaded_bytes: downloaded,
        components_downloaded: manifest.components.len(),
        staging_dir: staging_dir.to_string_lossy().to_string(),
    })
}
/// Download a single component to `dest`, resuming from the end of
/// any existing partial file via a Range request. Retries up to 6
/// times with exponential backoff (5s, 15s, 30s, 60s, 120s, 180s).
/// Verifies the SHA256 over the full file at the end.
async fn download_component_resumable(
client: &reqwest::Client,
component: &ComponentUpdate,
dest: &Path,
prior_total: u64,
) -> Result<()> {
use sha2::{Digest, Sha256};
use tokio::io::AsyncWriteExt;
const MAX_ATTEMPTS: u32 = 6;
const BACKOFFS: [u64; 5] = [5, 15, 30, 60, 120];
let mut last_err: Option<anyhow::Error> = None;
for attempt in 1..=MAX_ATTEMPTS {
let existing_len = match tokio::fs::metadata(dest).await {
Ok(m) => m.len(),
Err(_) => 0,
};
if existing_len >= component.size_bytes {
// File is already complete — break out and go verify.
break;
}
if attempt > 1 {
let delay = BACKOFFS[(attempt as usize - 2).min(BACKOFFS.len() - 1)];
tracing::warn!(
name = %component.name,
attempt,
resume_at = existing_len,
"Retrying download in {}s (previous error: {})",
delay,
last_err.as_ref().map(|e| e.to_string()).unwrap_or_default()
);
// Sleep in 500ms slices so a Cancel during backoff wakes
// promptly instead of waiting out the full exponential window.
let slices = delay * 2;
for _ in 0..slices {
if is_canceled() {
anyhow::bail!("Download canceled");
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
if is_canceled() {
anyhow::bail!("Download canceled");
}
let mut req = client.get(&component.download_url);
if existing_len > 0 {
req = req.header("Range", format!("bytes={}-", existing_len));
}
let resp = match req.send().await {
Ok(r) => r,
Err(e) => {
last_err = Some(anyhow::anyhow!(e));
continue;
}
};
let status = resp.status();
// 200 OK on a fresh start, 206 Partial Content on a resume
// that the server honoured. Anything else is a problem.
let is_resume = existing_len > 0 && status == reqwest::StatusCode::PARTIAL_CONTENT;
let is_fresh = existing_len == 0 && status.is_success();
let server_ignored_range = existing_len > 0 && status == reqwest::StatusCode::OK;
if !is_resume && !is_fresh && !server_ignored_range {
last_err = Some(anyhow::anyhow!(
"HTTP {} for {} (resume offset {})",
status,
component.name,
existing_len
));
continue;
}
// If the server ignored Range (returned 200 with the full
// body), wipe the partial file and start over.
let mut file = if server_ignored_range {
let _ = tokio::fs::remove_file(dest).await;
tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(dest)
.await
.context("open staging file")?
} else if is_resume {
tokio::fs::OpenOptions::new()
.append(true)
.open(dest)
.await
.context("open staging file for append")?
} else {
tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(dest)
.await
.context("open staging file")?
};
let mut resp = resp;
let mut stream_err = false;
let mut on_disk = existing_len;
let mut canceled = false;
loop {
if is_canceled() {
canceled = true;
break;
}
match resp.chunk().await {
Ok(Some(bytes)) => {
if let Err(e) = file.write_all(&bytes).await {
last_err = Some(anyhow::anyhow!(e).context("writing chunk"));
stream_err = true;
break;
}
on_disk += bytes.len() as u64;
DOWNLOAD_BYTES.store(
prior_total + on_disk.min(component.size_bytes),
Ordering::Relaxed,
);
DOWNLOAD_PROGRESS_AT.store(now_ms(), Ordering::Relaxed);
}
Ok(None) => break, // stream ended cleanly
Err(e) => {
last_err = Some(anyhow::anyhow!(e).context("reading chunk"));
stream_err = true;
break;
}
}
}
if canceled {
let _ = file.flush().await;
drop(file);
DOWNLOAD_TOTAL.store(0, Ordering::Relaxed);
DOWNLOAD_BYTES.store(0, Ordering::Relaxed);
anyhow::bail!("Download canceled");
}
let _ = file.flush().await;
let _ = file.sync_all().await;
drop(file);
if stream_err {
continue;
}
// Stream ended cleanly. If we've got the expected size, verify
// the SHA and succeed. Otherwise loop to resume from the new
// offset on the next attempt.
let final_len = tokio::fs::metadata(dest)
.await
.map(|m| m.len())
.unwrap_or(0);
if final_len < component.size_bytes {
last_err = Some(anyhow::anyhow!(
"download truncated: got {} of {} bytes",
final_len,
component.size_bytes
));
continue;
}
// Full file — verify hash.
let bytes = tokio::fs::read(dest)
.await
.context("read staging file for hash check")?;
let hash = hex::encode(Sha256::digest(&bytes));
if hash == component.sha256 {
return Ok(());
}
// SHA mismatch — the file on disk is garbage. Nuke it and
// start over from scratch on the next attempt.
let _ = tokio::fs::remove_file(dest).await;
last_err = Some(anyhow::anyhow!(
"SHA256 mismatch for {}: expected {}, got {}",
component.name,
component.sha256,
hash
));
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("download failed without a captured error")))
}
/// Stop any in-flight download and discard what it staged.
///
/// Raises the cancel flag (the download loop notices it at its next chunk
/// or backoff slice), zeroes the live progress counters, deletes the
/// staging directory, and clears the "downloaded, ready to apply" marker
/// — a canceled download is not a staged update. Harmless when nothing is
/// downloading. Cleanup failures are reported only in the log line.
pub async fn cancel_download(data_dir: &Path) -> Result<()> {
    DOWNLOAD_CANCEL.store(true, Ordering::Relaxed);
    for counter in [&DOWNLOAD_BYTES, &DOWNLOAD_TOTAL] {
        counter.store(0, Ordering::Relaxed);
    }
    let staging = data_dir.join("update-staging");
    let wiped = staging.exists() && tokio::fs::remove_dir_all(&staging).await.is_ok();
    let cleared_marker = match load_state(data_dir).await {
        Ok(mut state) if state.update_in_progress => {
            state.update_in_progress = false;
            let _ = save_state(data_dir, &state).await;
            true
        }
        _ => false,
    };
    info!(
        staging = %staging.display(),
        wiped,
        cleared_marker,
        "Update download canceled"
    );
    Ok(())
}
/// Run `args` as root in a transient systemd unit, i.e. outside the
/// archipelago service's restricted mount namespace.
///
/// archipelago.service uses `ProtectSystem=strict`, so `/opt` and `/usr`
/// are read-only inside it. sudo inherits that namespace, which means a
/// plain `sudo mv /opt/archipelago/...` fails with EROFS despite running
/// as root. Wrapping the command in `systemd-run --wait` executes it in a
/// fresh transient unit that has systemd's default protections instead of
/// ours. Returns the command's exit status; errors only if `sudo` itself
/// could not be spawned.
async fn host_sudo(args: &[&str]) -> Result<std::process::ExitStatus> {
    const WRAPPER: [&str; 6] = ["systemd-run", "--wait", "--quiet", "--collect", "--pipe", "--"];
    let argv: Vec<&str> = WRAPPER
        .iter()
        .copied()
        .chain(args.iter().copied())
        .collect();
    tokio::process::Command::new("sudo")
        .args(&argv)
        .status()
        .await
        .context("sudo systemd-run spawn failed")
}
/// Apply a downloaded update. Backs up current binaries, replaces with staged versions.
pub async fn apply_update(data_dir: &Path) -> Result<()> {
let staging_dir = data_dir.join("update-staging");
if !staging_dir.exists() {
anyhow::bail!("No staged update found. Download first.");
}
let backup_dir = data_dir.join("update-backup");
fs::create_dir_all(&backup_dir)
.await
.context("Failed to create backup dir")?;
info!(
staging = %staging_dir.display(),
backup = %backup_dir.display(),
"Applying staged update"
);
// Back up current backend binary
let current_binary = Path::new("/usr/local/bin/archipelago");
if current_binary.exists() {
let backup_path = backup_dir.join("archipelago");
fs::copy(current_binary, &backup_path)
.await
.context("Failed to backup current binary")?;
info!("Current binary backed up");
}
// Apply staged components
let mut entries = fs::read_dir(&staging_dir)
.await
.context("Failed to read staging dir")?;
while let Some(entry) = entries.next_entry().await? {
let name = entry.file_name().to_string_lossy().to_string();
let src = entry.path();
match name.as_str() {
"archipelago" => {
// Two namespace gotchas this block works around:
// 1. We're running FROM /usr/local/bin/archipelago, so
// `install`/`cp` (O_TRUNC + write) fail with ETXTBSY.
// Use `mv`, which is atomic rename() and tolerates a
// busy destination.
// 2. archipelago.service sets ProtectSystem=strict, so
// even `sudo mv` into /usr/local/bin/ fails EROFS —
// sudo inherits the service's mount namespace. Route
// the rename through systemd-run so it runs in a
// transient unit with default protections.
let staged = src.to_string_lossy().to_string();
let _ = host_sudo(&["chmod", "0755", &staged]).await;
let _ = host_sudo(&["chown", "root:root", &staged]).await;
let status = host_sudo(&["mv", &staged, "/usr/local/bin/archipelago"])
.await
.with_context(|| format!("Failed to spawn mv for {}", name))?;
if !status.success() {
anyhow::bail!(
"mv into /usr/local/bin failed for {} (exit {:?})",
name,
status.code()
);
}
info!(name = %name, "Backend binary applied");
}
_ if name.contains("frontend") && name.ends_with(".tar.gz") => {
// Tarball contents are the *inside* of web-ui/ (root entries
// `./test-aiui.html`, `./assets/`, ...). Extract into a
// uniquely-named staging dir, then mv into place. No `rm
// -rf` pre-cleanup — that's what hit transient EROFS on
// .198 and aborted the apply mid-flight.
let ts = chrono::Utc::now().timestamp_millis();
let staging_new = format!("/opt/archipelago/web-ui.new.{}", ts);
let staging_old = format!("/opt/archipelago/web-ui.old.{}", ts);
let web_ui = "/opt/archipelago/web-ui";
let backup_path = "/opt/archipelago/web-ui.bak";
// All sudo calls that touch /opt/archipelago go through
// host_sudo so they see a normal root mount namespace.
let mk = host_sudo(&["mkdir", "-p", &staging_new])
.await
.context("Failed to create frontend staging dir")?;
if !mk.success() {
anyhow::bail!("mkdir {} failed", staging_new);
}
let extract = host_sudo(&[
"tar",
"-xzf",
&src.to_string_lossy(),
"-C",
&staging_new,
])
.await
.with_context(|| format!("Failed to extract {}", name))?;
if !extract.success() {
let _ = host_sudo(&["rm", "-rf", &staging_new]).await;
anyhow::bail!("tar extraction failed for {}", name);
}
let _ = host_sudo(&[
"chown",
"-R",
"archipelago:archipelago",
&staging_new,
])
.await;
// Preserve paths that are installed outside the Vue build
// (baked in by the ISO or sibling installers) and so
// aren't in the new tarball. Without this copy, every OTA
// wipes them — notably aiui/ (Claude Code sidebar) and
// the companion APK. `cp -a` preserves mode/ownership.
for preserved in ["aiui", "archipelago-companion.apk"] {
let src = format!("{}/{}", web_ui, preserved);
let dst = format!("{}/{}", staging_new, preserved);
// Only preserve the old copy if the new tarball
// doesn't already ship a fresher one.
if Path::new(&src).exists() && !Path::new(&dst).exists() {
let _ = host_sudo(&["cp", "-a", &src, &dst]).await;
}
}
// Swap: mv current web-ui aside, then mv new into place.
if Path::new(web_ui).exists() {
let mv_old = host_sudo(&["mv", web_ui, &staging_old])
.await
.context("Failed to rotate old web-ui")?;
if !mv_old.success() {
anyhow::bail!("failed to move old web-ui aside");
}
}
let mv_new = host_sudo(&["mv", &staging_new, web_ui])
.await
.context("Failed to swap new web-ui into place")?;
if !mv_new.success() {
if Path::new(&staging_old).exists() {
let _ = host_sudo(&["mv", &staging_old, web_ui]).await;
}
anyhow::bail!("failed to move new web-ui into place");
}
// Rotate previous rollback aside and install this apply's
// old copy as the new rollback.
if Path::new(&staging_old).exists() {
if Path::new(backup_path).exists() {
let _ = host_sudo(&[
"mv",
backup_path,
&format!("{}.{}", backup_path, ts),
])
.await;
}
let _ = host_sudo(&["mv", &staging_old, backup_path]).await;
}
info!(name = %name, "Frontend archive extracted to /opt/archipelago/web-ui");
}
_ => {
debug!(name = %name, "Unknown component, skipping");
}
}
}
// Update state
let mut state = load_state(data_dir).await?;
if let Some(manifest) = &state.available_update {
state.current_version = manifest.version.clone();
}
state.available_update = None;
state.update_in_progress = false;
state.rollback_available = true;
save_state(data_dir, &state).await?;
// Clean staging
let _ = fs::remove_dir_all(&staging_dir).await;
info!("Update applied — scheduling service restart in 2s so the RPC reply lands first");
// Restart asynchronously so the JSON-RPC response actually reaches the
// UI before systemd kills us. --no-block makes sure systemctl doesn't
// try to wait for the current service (us) to exit cleanly before
// starting the new process — it would deadlock otherwise.
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// systemctl talks to PID 1 over D-Bus — doesn't need the host
// mount namespace, but routing through host_sudo keeps the
// apply flow's sudo calls uniform.
let _ = host_sudo(&["systemctl", "--no-block", "restart", "archipelago"]).await;
});
Ok(())
}
/// Rollback to the previous version from backup.
pub async fn rollback_update(data_dir: &Path) -> Result<()> {
let backup_dir = data_dir.join("update-backup");
if !backup_dir.exists() {
anyhow::bail!("No rollback backup available");
}
let backup_binary = backup_dir.join("archipelago");
if backup_binary.exists() {
fs::copy(&backup_binary, "/usr/local/bin/archipelago")
.await
.context("Failed to restore backup binary")?;
info!("Binary rolled back to previous version");
}
let mut state = load_state(data_dir).await?;
state.rollback_available = false;
save_state(data_dir, &state).await?;
let _ = fs::remove_dir_all(&backup_dir).await;
info!("Rollback complete. Restart service to take effect.");
Ok(())
}
/// Summary returned by `download_update` once every component is staged.
#[derive(Deserialize, Serialize, Debug)]
pub struct DownloadProgress {
    /// Sum of the manifest's component sizes.
    pub total_bytes: u64,
    /// Bytes accounted as downloaded (the sum of completed component sizes).
    pub downloaded_bytes: u64,
    /// Number of components staged.
    pub components_downloaded: usize,
    /// Filesystem path of the staging directory.
    pub staging_dir: String,
}
/// Set the update schedule preference.
pub async fn set_schedule(data_dir: &Path, schedule: UpdateSchedule) -> Result<()> {
let mut state = load_state(data_dir).await?;
state.schedule = schedule;
save_state(data_dir, &state).await?;
info!(schedule = ?schedule, "Update schedule changed");
Ok(())
}
/// The scheduler mode currently stored in the update state.
pub async fn get_schedule(data_dir: &Path) -> Result<UpdateSchedule> {
    load_state(data_dir).await.map(|state| state.schedule)
}
/// Background update scheduler. Runs in a loop, checking/applying based on schedule.
/// Call this once at startup via `tokio::spawn`.
pub async fn run_update_scheduler(data_dir: std::path::PathBuf) {
use tokio::time::{interval, Duration};
// Check every hour; act based on schedule setting
let mut tick = interval(Duration::from_secs(3600));
loop {
tick.tick().await;
let state = match load_state(&data_dir).await {
Ok(s) => s,
Err(e) => {
debug!("Update scheduler: failed to load state: {}", e);
continue;
}
};
match state.schedule {
UpdateSchedule::Manual => {
debug!("Update scheduler: manual mode, skipping");
continue;
}
UpdateSchedule::DailyCheck => {
// Only check once per day
if let Some(ref last) = state.last_check {
if let Ok(last_time) = chrono::DateTime::parse_from_rfc3339(last) {
let elapsed = chrono::Utc::now() - last_time.with_timezone(&chrono::Utc);
if elapsed.num_hours() < 24 {
debug!("Update scheduler: checked recently, skipping");
continue;
}
}
}
info!("Update scheduler: running daily check");
if let Err(e) = check_for_updates(&data_dir).await {
debug!("Update scheduler: check failed: {}", e);
}
}
UpdateSchedule::AutoApply => {
// Auto-apply: check, download, and apply during 3 AM window
let hour = chrono::Local::now().hour();
if hour != 3 {
// Still do daily check outside the window
if let Some(ref last) = state.last_check {
if let Ok(last_time) = chrono::DateTime::parse_from_rfc3339(last) {
let elapsed =
chrono::Utc::now() - last_time.with_timezone(&chrono::Utc);
if elapsed.num_hours() < 24 {
continue;
}
}
}
info!("Update scheduler: auto-apply check (outside window)");
if let Err(e) = check_for_updates(&data_dir).await {
debug!("Update scheduler: check failed: {}", e);
}
continue;
}
// 3 AM — check, download, and apply
info!("Update scheduler: 3 AM auto-apply window");
match check_for_updates(&data_dir).await {
Ok(s) if s.available_update.is_some() => {
info!("Update scheduler: downloading update");
if let Err(e) = download_update(&data_dir).await {
debug!("Update scheduler: download failed: {}", e);
continue;
}
info!("Update scheduler: applying update");
if let Err(e) = apply_update(&data_dir).await {
debug!("Update scheduler: apply failed: {}", e);
continue;
}
info!("Update scheduler: update applied, restart needed");
// Signal for service restart (systemd will handle via exit code)
std::process::exit(0);
}
Ok(_) => {
debug!("Update scheduler: no update available");
}
Err(e) => {
debug!("Update scheduler: check failed: {}", e);
}
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Version of the binary under test; `load_state` always syncs
    /// `current_version` to this, whatever the file on disk says.
    const RUNNING: &str = env!("CARGO_PKG_VERSION");
    #[test]
    fn test_update_schedule_default_is_daily_check() {
        assert_eq!(UpdateSchedule::default(), UpdateSchedule::DailyCheck);
    }
    #[test]
    fn test_update_state_default_values() {
        let state = UpdateState::default();
        assert_eq!(state.current_version, RUNNING);
        assert!(state.last_check.is_none());
        assert!(state.available_update.is_none());
        assert!(!state.update_in_progress);
        assert!(!state.rollback_available);
        assert_eq!(state.schedule, UpdateSchedule::DailyCheck);
    }
    #[test]
    fn test_update_state_serialization_roundtrip() {
        let state = UpdateState {
            current_version: "0.2.0".to_string(),
            last_check: Some("2025-01-01T00:00:00Z".to_string()),
            available_update: None,
            update_in_progress: false,
            rollback_available: true,
            schedule: UpdateSchedule::AutoApply,
        };
        let json = serde_json::to_string(&state).unwrap();
        let deserialized: UpdateState = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.current_version, "0.2.0");
        assert!(deserialized.rollback_available);
        assert_eq!(deserialized.schedule, UpdateSchedule::AutoApply);
    }
    #[test]
    fn test_update_schedule_serde_rename() {
        let cases = [
            (UpdateSchedule::DailyCheck, "\"daily_check\""),
            (UpdateSchedule::Manual, "\"manual\""),
            (UpdateSchedule::AutoApply, "\"auto_apply\""),
        ];
        for (schedule, expected) in cases {
            assert_eq!(serde_json::to_string(&schedule).unwrap(), expected);
        }
    }
    #[test]
    fn test_update_state_schedule_defaults_on_missing_field() {
        // When schedule field is missing from JSON, it should default to DailyCheck
        let json = r#"{
            "current_version": "0.1.0",
            "last_check": null,
            "available_update": null,
            "update_in_progress": false,
            "rollback_available": false
        }"#;
        let state: UpdateState = serde_json::from_str(json).unwrap();
        assert_eq!(state.schedule, UpdateSchedule::DailyCheck);
    }
    #[tokio::test]
    async fn test_load_state_creates_default_when_missing() {
        let dir = tempfile::tempdir().unwrap();
        let state = load_state(dir.path()).await.unwrap();
        assert_eq!(state.current_version, RUNNING);
        assert!(!state.update_in_progress);
        // File should now exist after load created the default
        assert!(dir.path().join(UPDATE_STATE_FILE).exists());
    }
    #[tokio::test]
    async fn test_save_and_load_state_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let state = UpdateState {
            current_version: "1.0.0".to_string(),
            last_check: Some("2025-06-15T12:00:00Z".to_string()),
            available_update: Some(UpdateManifest {
                version: "1.1.0".to_string(),
                release_date: "2025-06-20".to_string(),
                changelog: vec!["Fix bugs".to_string(), "New feature".to_string()],
                components: vec![ComponentUpdate {
                    name: "archipelago".to_string(),
                    current_version: "1.0.0".to_string(),
                    new_version: "1.1.0".to_string(),
                    download_url: "https://example.com/binary".to_string(),
                    sha256: "abc123".to_string(),
                    size_bytes: 5000,
                }],
            }),
            update_in_progress: true,
            rollback_available: false,
            schedule: UpdateSchedule::Manual,
        };
        save_state(dir.path(), &state).await.unwrap();
        let loaded = load_state(dir.path()).await.unwrap();
        // load_state rewrites the stored version to the running binary's.
        assert_eq!(loaded.current_version, RUNNING);
        assert_eq!(loaded.last_check.as_deref(), Some("2025-06-15T12:00:00Z"));
        assert!(loaded.update_in_progress);
        assert_eq!(loaded.schedule, UpdateSchedule::Manual);
        let manifest = loaded.available_update.unwrap();
        assert_eq!(manifest.version, "1.1.0");
        assert_eq!(manifest.components.len(), 1);
        assert_eq!(manifest.components[0].size_bytes, 5000);
    }
    #[tokio::test]
    async fn test_load_state_syncs_version_and_clears_installed_update() {
        let dir = tempfile::tempdir().unwrap();
        let state = UpdateState {
            current_version: "0.0.1".to_string(),
            available_update: Some(UpdateManifest {
                version: RUNNING.to_string(),
                release_date: "2025-07-01".to_string(),
                changelog: vec![],
                components: vec![],
            }),
            ..UpdateState::default()
        };
        save_state(dir.path(), &state).await.unwrap();
        let loaded = load_state(dir.path()).await.unwrap();
        assert_eq!(loaded.current_version, RUNNING);
        // The offered release is the one now running, so it is dropped.
        assert!(loaded.available_update.is_none());
    }
    #[tokio::test]
    async fn test_dismiss_update_clears_available() {
        let dir = tempfile::tempdir().unwrap();
        let state = UpdateState {
            available_update: Some(UpdateManifest {
                version: "2.0.0".to_string(),
                release_date: "2025-07-01".to_string(),
                changelog: vec![],
                components: vec![],
            }),
            ..UpdateState::default()
        };
        save_state(dir.path(), &state).await.unwrap();
        dismiss_update(dir.path()).await.unwrap();
        let loaded = load_state(dir.path()).await.unwrap();
        assert!(loaded.available_update.is_none());
    }
    #[tokio::test]
    async fn test_set_and_get_schedule() {
        let dir = tempfile::tempdir().unwrap();
        // Initialize state
        let _ = load_state(dir.path()).await.unwrap();
        for schedule in [UpdateSchedule::AutoApply, UpdateSchedule::Manual] {
            set_schedule(dir.path(), schedule).await.unwrap();
            assert_eq!(get_schedule(dir.path()).await.unwrap(), schedule);
        }
    }
    #[tokio::test]
    async fn test_get_status_returns_current_state() {
        let dir = tempfile::tempdir().unwrap();
        let state = UpdateState {
            current_version: "3.0.0".to_string(),
            rollback_available: true,
            ..UpdateState::default()
        };
        save_state(dir.path(), &state).await.unwrap();
        let status = get_status(dir.path()).await.unwrap();
        // get_status goes through load_state, which syncs the version.
        assert_eq!(status.current_version, RUNNING);
        assert!(status.rollback_available);
    }
}