From 2523c9e3dd160d4b36a2c2ed3aa13364ed421b86 Mon Sep 17 00:00:00 2001 From: archipelago Date: Tue, 16 Jun 2026 13:38:19 -0400 Subject: [PATCH] =?UTF-8?q?feat(dht):=20Phase=202=20=E2=80=94=20swarm-assi?= =?UTF-8?q?st=20fetch=20seam,=20origin=20always=20wins?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the transport/swarm orchestration layer (the iroh engine attaches later, behind a flag). The seam is fully exercised today with the origin HTTP path; with no swarm providers registered the behaviour is byte-for-byte identical to before. - swarm/mod.rs: BlobProvider trait + fetch_content_addressed() — tries each provider in order, VERIFIES peer-sourced bytes against the content digest before accepting (untrusted seeds can't inject tampered bytes), falls back to the origin closure if none serve. Returns Swarm|Origin. - Cargo: iroh-swarm feature (off by default; heavy QUIC dep tree attaches here). providers() is empty until enabled → every fetch hits origin. - update.rs: components with a BLAKE3 digest route through the seam, using the existing resumable HTTP downloader as the origin fallback; a swarm hit is re-checked against the mandatory SHA-256 manifest gate (re-fetch from origin on any disagreement). Components without blake3 take the original path untouched. 44/44 swarm/update/content_hash/blobs tests pass (incl. swarm hit/miss, tampered-bytes-rejected→origin, fall-through ordering). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/Cargo.toml | 10 ++ core/archipelago/src/main.rs | 1 + core/archipelago/src/swarm/mod.rs | 228 ++++++++++++++++++++++++++++++ core/archipelago/src/update.rs | 44 +++++- 4 files changed, 282 insertions(+), 1 deletion(-) create mode 100644 core/archipelago/src/swarm/mod.rs diff --git a/core/archipelago/Cargo.toml b/core/archipelago/Cargo.toml index f8613089..4bf5e0b3 100644 --- a/core/archipelago/Cargo.toml +++ b/core/archipelago/Cargo.toml @@ -9,6 +9,16 @@ authors = ["Archipelago Team"] name = "archipelago" path = "src/main.rs" +[features] +default = [] +# DHT Phase 2: iroh-blobs peer swarm engine. OFF by default — it pulls a heavy +# QUIC dependency tree, so it ships behind a flag for PoC/measurement on a +# scratch node before any fleet rollout. With the flag off, swarm::providers() +# is empty and every fetch goes straight to the origin HTTP path (today's +# behaviour). Attach the optional iroh / iroh-blobs deps to this feature when +# wiring the IrohProvider. +iroh-swarm = [] + [dependencies] # Core dependencies tokio = { version = "1", features = ["full"] } diff --git a/core/archipelago/src/main.rs b/core/archipelago/src/main.rs index cbfdfdb5..20db5609 100644 --- a/core/archipelago/src/main.rs +++ b/core/archipelago/src/main.rs @@ -67,6 +67,7 @@ mod settings; mod state; mod storage_crypto; mod streaming; +mod swarm; mod totp; mod transport; mod trust; diff --git a/core/archipelago/src/swarm/mod.rs b/core/archipelago/src/swarm/mod.rs new file mode 100644 index 00000000..77f3ae8d --- /dev/null +++ b/core/archipelago/src/swarm/mod.rs @@ -0,0 +1,228 @@ +//! Swarm-assist content fetch — the *transport & swarm* tier of the DHT +//! distribution plan (`docs/dht-distribution-design.md` §4). +//! +//! ## Guiding principle: swarm-assist, origin ALWAYS wins +//! The peer swarm is an optimization layered *above* a proven HTTP path, never +//! in place of it. A node asks each available [`BlobProvider`] (e.g. 
an
+//! iroh-blobs swarm) for content by its [`ContentDigest`]; the first peer that
+//! serves bytes which **verify** against the digest wins. If no provider has it
+//! — or the swarm is disabled, or every peer is offline — we fall back to the
+//! origin HTTP download, which is the guaranteed source of truth. Worst case is
+//! exactly today's behaviour.
+//!
+//! Peer-sourced bytes are UNTRUSTED, so this module verifies them against the
+//! content digest before accepting. Origin bytes run through the caller's
+//! existing verification (e.g. the SHA-256 gate in `update.rs`).
+//!
+//! The actual iroh-blobs provider is gated behind the `iroh-swarm` feature
+//! (heavy QUIC dep tree); with the feature off, [`providers`] is empty and
+//! every fetch goes straight to origin — byte-for-byte today's path.
+
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_trait::async_trait;
+use tracing::{debug, info, warn};
+
+use crate::content_hash::ContentDigest;
+
+/// Which source ultimately served the content.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FetchSource {
+    /// A peer in the swarm served (and the bytes verified).
+    Swarm,
+    /// The origin HTTP fallback served.
+    Origin,
+}
+
+/// A source that may be able to serve content addressed by its digest.
+#[async_trait]
+pub trait BlobProvider: Send + Sync {
+    /// Short name for logging (e.g. "iroh").
+    fn name(&self) -> &str;
+
+    /// Try to fetch the content for `digest` into `dest`.
+    ///
+    /// The boolean distinguishes "served" from "does not have it"; the error
+    /// channel is reserved for failures of the attempt itself.
+    ///
+    /// * `Ok(true)` — bytes written to `dest` (caller verifies the digest).
+    /// * `Ok(false)` — this provider does not have the content; try the next.
+    /// * `Err(_)` — a transient failure; try the next provider.
+    async fn try_fetch(&self, digest: &ContentDigest, dest: &Path) -> Result<bool>;
+}
+
+/// The ordered list of swarm providers to consult before the origin.
+///
+/// Empty unless the `iroh-swarm` feature is enabled and a provider has been
+/// registered.
Today it is always empty — the seam exists so wiring iroh is a
+/// localized change rather than a surgery through the download path.
+pub fn providers() -> Vec<Arc<dyn BlobProvider>> {
+    Vec::new()
+}
+
+/// Fetch content-addressed bytes: swarm-assist, origin always wins.
+///
+/// Tries each provider in order; the first to write bytes that VERIFY against
+/// `digest` wins and returns [`FetchSource::Swarm`]. If none succeed, runs
+/// `origin` (the guaranteed HTTP fallback) and returns [`FetchSource::Origin`].
+/// A node that obtained bytes from the swarm has, by definition, a verified
+/// copy it can itself seed afterwards.
+///
+/// Providers never write to `dest` directly. Each attempt is staged in a
+/// sibling file (`<dest>.swarm-part`) that is renamed onto `dest` only after
+/// the bytes verify, and is deleted otherwise. Whatever was at `dest` before
+/// the call is therefore left untouched unless a swarm fetch succeeds, so the
+/// origin closure never sees partial or tampered peer bytes.
+///
+/// # Errors
+/// Returns the error produced by `origin` when the fallback itself fails.
+/// Provider failures are logged and never surfaced to the caller.
+pub async fn fetch_content_addressed<F, Fut>(
+    digest: &ContentDigest,
+    providers: &[Arc<dyn BlobProvider>],
+    dest: &Path,
+    origin: F,
+) -> Result<FetchSource>
+where
+    F: FnOnce() -> Fut,
+    Fut: std::future::Future<Output = Result<()>>,
+{
+    // Stage peer bytes next to `dest` instead of in it. The origin closure may
+    // resume from whatever already sits at `dest`, so a provider that fails
+    // half-way through a write must not be able to leave its bytes there.
+    let mut staging_name = dest.as_os_str().to_owned();
+    staging_name.push(".swarm-part");
+    let staging = std::path::PathBuf::from(staging_name);
+
+    for provider in providers {
+        match provider.try_fetch(digest, &staging).await {
+            Ok(true) => match verify_dest(digest, &staging).await {
+                Ok(()) => match tokio::fs::rename(&staging, dest).await {
+                    Ok(()) => {
+                        info!("swarm: {} served {} (verified)", provider.name(), digest);
+                        return Ok(FetchSource::Swarm);
+                    }
+                    Err(e) => warn!(
+                        "swarm: {} served {} but moving it to {} failed: {}",
+                        provider.name(),
+                        digest,
+                        dest.display(),
+                        e
+                    ),
+                },
+                // A peer served bytes that don't match the digest — could be
+                // corruption or a malicious seed. Discard and try the next
+                // source; never let unverified peer bytes through.
+                Err(e) => warn!(
+                    "swarm: {} served bytes failing verification for {}: {} — discarding",
+                    provider.name(),
+                    digest,
+                    e
+                ),
+            },
+            Ok(false) => debug!("swarm: {} does not have {}", provider.name(), digest),
+            Err(e) => debug!("swarm: {} failed for {}: {}", provider.name(), digest, e),
+        }
+        // Reached only when this provider did not yield an accepted copy. The
+        // staging file may be missing, partial, or unverified; removal is
+        // best-effort, so a "not found" error is deliberately ignored.
+        let _ = tokio::fs::remove_file(&staging).await;
+    }
+
+    debug!("swarm: no provider served {} — falling back to origin", digest);
+    origin().await?;
+    Ok(FetchSource::Origin)
+}
+
+/// Read `dest` and verify it hashes to `digest`.
+async fn verify_dest(digest: &ContentDigest, dest: &Path) -> Result<()> {
+    // The whole file is loaded into memory before hashing. An `Err` from either
+    // the read or the digest check is treated by the caller as "not verified".
+    let bytes = tokio::fs::read(dest).await?;
+    digest.verify(&bytes)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::atomic::{AtomicBool, Ordering};
+
+    /// Build the BLAKE3 content digest of `bytes`.
+    fn digest_of(bytes: &[u8]) -> ContentDigest {
+        ContentDigest::parse(&format!("blake3:{}", crate::content_hash::blake3_hex(bytes))).unwrap()
+    }
+
+    /// Provider that writes a fixed payload (which may or may not match).
+    struct FixedProvider {
+        name: &'static str,
+        payload: Option<Vec<u8>>,
+    }
+    #[async_trait]
+    impl BlobProvider for FixedProvider {
+        fn name(&self) -> &str {
+            self.name
+        }
+        async fn try_fetch(&self, _d: &ContentDigest, dest: &Path) -> Result<bool> {
+            match &self.payload {
+                Some(p) => {
+                    tokio::fs::write(dest, p).await?;
+                    Ok(true)
+                }
+                None => Ok(false),
+            }
+        }
+    }
+
+    /// Provider whose every attempt ends in a transient failure.
+    struct FailingProvider;
+    #[async_trait]
+    impl BlobProvider for FailingProvider {
+        fn name(&self) -> &str {
+            "down"
+        }
+        async fn try_fetch(&self, _d: &ContentDigest, _dest: &Path) -> Result<bool> {
+            Err(anyhow::anyhow!("peer offline"))
+        }
+    }
+
+    fn arc(p: FixedProvider) -> Arc<dyn BlobProvider> {
+        Arc::new(p)
+    }
+
+    #[tokio::test]
+    async fn swarm_hit_verifies_and_skips_origin() {
+        let dir = tempfile::tempdir().unwrap();
+        let dest = dir.path().join("out");
+        let content = b"hello swarm".to_vec();
+        let digest = digest_of(&content);
+        let providers = vec![arc(FixedProvider {
+            name: "good",
+            payload: Some(content.clone()),
+        })];
+        let origin_ran = AtomicBool::new(false);
+        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
+            origin_ran.store(true, Ordering::SeqCst);
+            tokio::fs::write(&dest, b"from-origin").await?;
+            Ok(())
+        })
+        .await
+        .unwrap();
+        assert_eq!(src, FetchSource::Swarm);
+        assert!(!origin_ran.load(Ordering::SeqCst), "origin must not run on swarm hit");
+        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
+    }
+
+    #[tokio::test]
+    async fn bad_swarm_bytes_are_discarded_and_origin_wins() {
+        let dir = tempfile::tempdir().unwrap();
+        let dest = dir.path().join("out");
+        let content = b"the real bytes".to_vec();
+        let digest = digest_of(&content);
+        // Provider claims a hit but serves tampered bytes.
+        let providers = vec![arc(FixedProvider {
+            name: "evil",
+            payload: Some(b"TAMPERED".to_vec()),
+        })];
+        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
+            tokio::fs::write(&dest, &content).await?;
+            Ok(())
+        })
+        .await
+        .unwrap();
+        assert_eq!(src, FetchSource::Origin, "tampered swarm bytes must not be accepted");
+        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
+    }
+
+    #[tokio::test]
+    async fn no_providers_goes_straight_to_origin() {
+        let dir = tempfile::tempdir().unwrap();
+        let dest = dir.path().join("out");
+        let content = b"x".to_vec();
+        let digest = digest_of(&content);
+        let providers: Vec<Arc<dyn BlobProvider>> = vec![];
+        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
+            tokio::fs::write(&dest, &content).await?;
+            Ok(())
+        })
+        .await
+        .unwrap();
+        assert_eq!(src, FetchSource::Origin);
+    }
+
+    #[tokio::test]
+    async fn falls_through_providers_in_order() {
+        let dir = tempfile::tempdir().unwrap();
+        let dest = dir.path().join("out");
+        let content = b"second wins".to_vec();
+        let digest = digest_of(&content);
+        let providers = vec![
+            arc(FixedProvider { name: "miss", payload: None }),
+            arc(FixedProvider { name: "hit", payload: Some(content.clone()) }),
+        ];
+        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
+            tokio::fs::write(&dest, b"origin").await?;
+            Ok(())
+        })
+        .await
+        .unwrap();
+        assert_eq!(src, FetchSource::Swarm);
+        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
+    }
+
+    #[tokio::test]
+    async fn provider_error_falls_through_to_next_source() {
+        let dir = tempfile::tempdir().unwrap();
+        let dest = dir.path().join("out");
+        let content = b"after the failure".to_vec();
+        let digest = digest_of(&content);
+        // A transient failure on the first provider must not stop the search.
+        let providers: Vec<Arc<dyn BlobProvider>> = vec![
+            Arc::new(FailingProvider),
+            arc(FixedProvider { name: "hit", payload: Some(content.clone()) }),
+        ];
+        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
+            tokio::fs::write(&dest, b"origin").await?;
+            Ok(())
+        })
+        .await
+        .unwrap();
+        assert_eq!(src, FetchSource::Swarm);
+        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
+    }
+}
diff --git a/core/archipelago/src/update.rs b/core/archipelago/src/update.rs index 0d88b8c0..4b83594d 100644 --- a/core/archipelago/src/update.rs +++ b/core/archipelago/src/update.rs @@ -807,7 +807,49 @@ pub async fn download_update(data_dir: &Path) -> Result { } info!(name = %component.name, url = %component.download_url, "Downloading component"); let dest = staging_dir.join(&component.name); - download_component_resumable(&client, component, &dest, downloaded).await?; + + // DHT Phase 2: when 
the manifest pins a BLAKE3 digest, route the fetch + // through the swarm seam (swarm-assist, origin always wins). With no + // providers registered (iroh-swarm feature off) this is identical to + // calling the resumable HTTP origin directly — same bytes, now + // content-addressed. A swarm hit is BLAKE3-verified inside the seam; + // we still enforce the mandatory SHA-256 gate on peer bytes here and + // re-fetch from origin if a (consistency-broken) peer slips through. + let digest = component.blake3.as_deref().and_then(|b| { + let s = b.trim(); + let normalized = if s.contains(':') { + s.to_string() + } else { + format!("blake3:{s}") + }; + crate::content_hash::ContentDigest::parse(&normalized).ok() + }); + if let Some(digest) = digest { + let client_ref = &client; + let dest_ref = &dest; + let source = crate::swarm::fetch_content_addressed( + &digest, + &crate::swarm::providers(), + &dest, + move || async move { + download_component_resumable(client_ref, component, dest_ref, downloaded).await + }, + ) + .await?; + if source == crate::swarm::FetchSource::Swarm { + let bytes = tokio::fs::read(&dest).await?; + if crate::content_hash::sha256_hex(&bytes) != component.sha256 { + warn!( + name = %component.name, + "swarm bytes passed BLAKE3 but failed the SHA-256 manifest gate — re-fetching from origin" + ); + let _ = tokio::fs::remove_file(&dest).await; + download_component_resumable(&client, component, &dest, downloaded).await?; + } + } + } else { + download_component_resumable(&client, component, &dest, downloaded).await?; + } downloaded += component.size_bytes; DOWNLOAD_BYTES.store(downloaded, Ordering::Relaxed); info!(