feat(dht): Phase 2 — swarm-assist fetch seam, origin always wins
Lands the transport/swarm orchestration layer (the iroh engine attaches later, behind a flag). The seam is fully exercised today with the origin HTTP path; with no swarm providers registered the behaviour is byte-for-byte identical to before.

- swarm/mod.rs: BlobProvider trait + fetch_content_addressed(). Tries each provider in order, VERIFIES peer-sourced bytes against the content digest before accepting (untrusted seeds can't inject tampered bytes), falls back to the origin closure if none serve. Returns Swarm|Origin.
- Cargo: iroh-swarm feature (off by default; heavy QUIC dep tree attaches here). providers() is empty until enabled → every fetch hits origin.
- update.rs: components with a BLAKE3 digest route through the seam, using the existing resumable HTTP downloader as the origin fallback; a swarm hit is re-checked against the mandatory SHA-256 manifest gate (re-fetch from origin on any disagreement). Components without blake3 take the original path untouched.

44/44 swarm/update/content_hash/blobs tests pass (incl. swarm hit/miss, tampered-bytes-rejected→origin, fall-through ordering).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent
f0cb91ed76
commit
2523c9e3dd
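The control flow described in the commit message (providers in order, verify before accepting, origin as fallback) can be sketched without async, files, or external crates. This is a minimal sketch under stated assumptions, not the commit's API: `toy_digest` is a hypothetical 64-bit FNV-1a stand-in for BLAKE3 (not cryptographic), and `Provider`, `fetch`, `tampering_peer`, `missing_peer`, and `honest_peer` are illustrative names.

```rust
/// Which source served the bytes (mirrors the commit's Swarm|Origin result).
#[derive(Debug, PartialEq)]
enum FetchSource {
    Swarm,
    Origin,
}

/// ASSUMPTION: stand-in for the BLAKE3 content digest (64-bit FNV-1a).
/// Not cryptographic; it only keeps the sketch free of external crates.
fn toy_digest(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for &b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

/// Hypothetical synchronous provider:
/// Ok(Some(bytes)) = served, Ok(None) = does not have it, Err = transient failure.
type Provider = fn(u64) -> Result<Option<Vec<u8>>, String>;

fn tampering_peer(_want: u64) -> Result<Option<Vec<u8>>, String> {
    Ok(Some(b"TAMPERED".to_vec()))
}

fn missing_peer(_want: u64) -> Result<Option<Vec<u8>>, String> {
    Ok(None)
}

fn honest_peer(_want: u64) -> Result<Option<Vec<u8>>, String> {
    Ok(Some(b"the real bytes".to_vec()))
}

/// Try each provider in order; accept peer bytes only if they hash to `want`.
/// If no provider serves verified bytes, run the origin closure.
fn fetch<F>(want: u64, providers: &[Provider], origin: F) -> Result<(FetchSource, Vec<u8>), String>
where
    F: FnOnce() -> Result<Vec<u8>, String>,
{
    for &p in providers {
        match p(want) {
            // Verified peer bytes win immediately.
            Ok(Some(bytes)) if toy_digest(&bytes) == want => {
                return Ok((FetchSource::Swarm, bytes));
            }
            // Tampered bytes, a miss, or an error: move on to the next source.
            _ => continue,
        }
    }
    origin().map(|bytes| (FetchSource::Origin, bytes))
}
```

With `want = toy_digest(b"the real bytes")`, a provider list of `[tampering_peer]` falls through to the origin closure, while `[missing_peer, honest_peer]` returns `FetchSource::Swarm` without calling it.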
@@ -9,6 +9,16 @@ authors = ["Archipelago Team"]
name = "archipelago"
path = "src/main.rs"

[features]
default = []
# DHT Phase 2: iroh-blobs peer swarm engine. OFF by default — it pulls a heavy
# QUIC dependency tree, so it ships behind a flag for PoC/measurement on a
# scratch node before any fleet rollout. With the flag off, swarm::providers()
# is empty and every fetch goes straight to the origin HTTP path (today's
# behaviour). Attach the optional iroh / iroh-blobs deps to this feature when
# wiring the IrohProvider.
iroh-swarm = []

[dependencies]
# Core dependencies
tokio = { version = "1", features = ["full"] }

@@ -67,6 +67,7 @@ mod settings;
mod state;
mod storage_crypto;
mod streaming;
mod swarm;
mod totp;
mod transport;
mod trust;

228
core/archipelago/src/swarm/mod.rs
Normal file
@@ -0,0 +1,228 @@
//! Swarm-assist content fetch — the *transport & swarm* tier of the DHT
//! distribution plan (`docs/dht-distribution-design.md` §4).
//!
//! ## Guiding principle: swarm-assist, origin ALWAYS wins
//! The peer swarm is an optimization layered *above* a proven HTTP path, never
//! in place of it. A node asks each available [`BlobProvider`] (e.g. an
//! iroh-blobs swarm) for content by its [`ContentDigest`]; the first peer that
//! serves bytes which **verify** against the digest wins. If no provider has it
//! — or the swarm is disabled, or every peer is offline — we fall back to the
//! origin HTTP download, which is the guaranteed source of truth. Worst case is
//! exactly today's behaviour.
//!
//! Peer-sourced bytes are UNTRUSTED, so this module verifies them against the
//! content digest before accepting. Origin bytes run through the caller's
//! existing verification (e.g. the SHA-256 gate in `update.rs`).
//!
//! The actual iroh-blobs provider is gated behind the `iroh-swarm` feature
//! (heavy QUIC dep tree); with the feature off, [`providers`] is empty and
//! every fetch goes straight to origin — byte-for-byte today's path.

use std::path::Path;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use tracing::{debug, info, warn};

use crate::content_hash::ContentDigest;

/// Which source ultimately served the content.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchSource {
    /// A peer in the swarm served (and the bytes verified).
    Swarm,
    /// The origin HTTP fallback served.
    Origin,
}

/// A source that may be able to serve content addressed by its digest.
#[async_trait]
pub trait BlobProvider: Send + Sync {
    /// Short name for logging (e.g. "iroh").
    fn name(&self) -> &str;

    /// Try to fetch the content for `digest` into `dest`.
    ///
    /// * `Ok(true)` — bytes written to `dest` (caller verifies the digest).
    /// * `Ok(false)` — this provider does not have the content; try the next.
    /// * `Err(_)` — a transient failure; try the next provider.
    async fn try_fetch(&self, digest: &ContentDigest, dest: &Path) -> Result<bool>;
}

/// The ordered list of swarm providers to consult before the origin.
///
/// Empty unless the `iroh-swarm` feature is enabled and a provider has been
/// registered. Today it is always empty — the seam exists so wiring iroh is a
/// localized change rather than a surgery through the download path.
pub fn providers() -> Vec<Arc<dyn BlobProvider>> {
    Vec::new()
}

/// Fetch content-addressed bytes: swarm-assist, origin always wins.
///
/// Tries each provider in order; the first to write bytes that VERIFY against
/// `digest` wins and returns [`FetchSource::Swarm`]. If none succeed, runs
/// `origin` (the guaranteed HTTP fallback) and returns [`FetchSource::Origin`].
/// A node that obtained bytes from the swarm has, by definition, a verified
/// copy it can itself seed afterwards.
pub async fn fetch_content_addressed<F, Fut>(
    digest: &ContentDigest,
    providers: &[Arc<dyn BlobProvider>],
    dest: &Path,
    origin: F,
) -> Result<FetchSource>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    for provider in providers {
        match provider.try_fetch(digest, dest).await {
            Ok(true) => match verify_dest(digest, dest).await {
                Ok(()) => {
                    info!("swarm: {} served {} (verified)", provider.name(), digest);
                    return Ok(FetchSource::Swarm);
                }
                Err(e) => {
                    // A peer served bytes that don't match the digest — could be
                    // corruption or a malicious seed. Discard and try the next
                    // source; never let unverified peer bytes through.
                    warn!(
                        "swarm: {} served bytes failing verification for {}: {} — discarding",
                        provider.name(),
                        digest,
                        e
                    );
                    let _ = tokio::fs::remove_file(dest).await;
                }
            },
            Ok(false) => debug!("swarm: {} does not have {}", provider.name(), digest),
            Err(e) => debug!("swarm: {} failed for {}: {}", provider.name(), digest, e),
        }
    }

    debug!("swarm: no provider served {} — falling back to origin", digest);
    origin().await?;
    Ok(FetchSource::Origin)
}

/// Read `dest` and verify it hashes to `digest`.
async fn verify_dest(digest: &ContentDigest, dest: &Path) -> Result<()> {
    let bytes = tokio::fs::read(dest).await?;
    digest.verify(&bytes)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    fn digest_of(bytes: &[u8]) -> ContentDigest {
        ContentDigest::parse(&format!("blake3:{}", crate::content_hash::blake3_hex(bytes))).unwrap()
    }

    /// Provider that writes a fixed payload (which may or may not match).
    struct FixedProvider {
        name: &'static str,
        payload: Option<Vec<u8>>,
    }
    #[async_trait]
    impl BlobProvider for FixedProvider {
        fn name(&self) -> &str {
            self.name
        }
        async fn try_fetch(&self, _d: &ContentDigest, dest: &Path) -> Result<bool> {
            match &self.payload {
                Some(p) => {
                    tokio::fs::write(dest, p).await?;
                    Ok(true)
                }
                None => Ok(false),
            }
        }
    }

    fn arc(p: FixedProvider) -> Arc<dyn BlobProvider> {
        Arc::new(p)
    }

    #[tokio::test]
    async fn swarm_hit_verifies_and_skips_origin() {
        let dir = tempfile::tempdir().unwrap();
        let dest = dir.path().join("out");
        let content = b"hello swarm".to_vec();
        let digest = digest_of(&content);
        let providers = vec![arc(FixedProvider {
            name: "good",
            payload: Some(content.clone()),
        })];
        let origin_ran = AtomicBool::new(false);
        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
            origin_ran.store(true, Ordering::SeqCst);
            tokio::fs::write(&dest, b"from-origin").await?;
            Ok(())
        })
        .await
        .unwrap();
        assert_eq!(src, FetchSource::Swarm);
        assert!(!origin_ran.load(Ordering::SeqCst), "origin must not run on swarm hit");
        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
    }

    #[tokio::test]
    async fn bad_swarm_bytes_are_discarded_and_origin_wins() {
        let dir = tempfile::tempdir().unwrap();
        let dest = dir.path().join("out");
        let content = b"the real bytes".to_vec();
        let digest = digest_of(&content);
        // Provider claims a hit but serves tampered bytes.
        let providers = vec![arc(FixedProvider {
            name: "evil",
            payload: Some(b"TAMPERED".to_vec()),
        })];
        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
            tokio::fs::write(&dest, &content).await?;
            Ok(())
        })
        .await
        .unwrap();
        assert_eq!(src, FetchSource::Origin, "tampered swarm bytes must not be accepted");
        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
    }

    #[tokio::test]
    async fn no_providers_goes_straight_to_origin() {
        let dir = tempfile::tempdir().unwrap();
        let dest = dir.path().join("out");
        let content = b"x".to_vec();
        let digest = digest_of(&content);
        let providers: Vec<Arc<dyn BlobProvider>> = vec![];
        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
            tokio::fs::write(&dest, &content).await?;
            Ok(())
        })
        .await
        .unwrap();
        assert_eq!(src, FetchSource::Origin);
    }

    #[tokio::test]
    async fn falls_through_providers_in_order() {
        let dir = tempfile::tempdir().unwrap();
        let dest = dir.path().join("out");
        let content = b"second wins".to_vec();
        let digest = digest_of(&content);
        let providers = vec![
            arc(FixedProvider { name: "miss", payload: None }),
            arc(FixedProvider { name: "hit", payload: Some(content.clone()) }),
        ];
        let src = fetch_content_addressed(&digest, &providers, &dest, || async {
            tokio::fs::write(&dest, b"origin").await?;
            Ok(())
        })
        .await
        .unwrap();
        assert_eq!(src, FetchSource::Swarm);
        assert_eq!(tokio::fs::read(&dest).await.unwrap(), content);
    }
}
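`verify_dest` above reads the whole file into memory before hashing. For large components a chunked variant may be preferable; the following is a rough sketch of that idea only, not part of the commit. `ToyHasher` (64-bit FNV-1a, not cryptographic), `digest_reader`, and `verify_reader` are hypothetical names standing in for an incremental BLAKE3 hasher, and the 8 KiB buffer size is an arbitrary assumption.

```rust
use std::io::{self, Read};

/// ASSUMPTION: incremental stand-in for a real content hasher (64-bit FNV-1a).
/// Not cryptographic; it only illustrates the update/finalize shape.
struct ToyHasher(u64);

impl ToyHasher {
    fn new() -> Self {
        ToyHasher(0xcbf29ce484222325)
    }

    /// Fold one chunk into the running state.
    fn update(&mut self, chunk: &[u8]) {
        for &b in chunk {
            self.0 ^= b as u64;
            self.0 = self.0.wrapping_mul(0x100000001b3);
        }
    }

    fn finalize(&self) -> u64 {
        self.0
    }
}

/// Hash everything `reader` yields using a fixed-size buffer,
/// so memory use does not grow with the size of the content.
fn digest_reader<R: Read>(mut reader: R) -> io::Result<u64> {
    let mut hasher = ToyHasher::new();
    let mut buf = [0u8; 8192];
    loop {
        match reader.read(&mut buf) {
            Ok(0) => break, // end of input
            Ok(n) => hasher.update(&buf[..n]),
            // Retry reads interrupted by a signal.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(hasher.finalize())
}

/// Returns Ok(true) only if the streamed digest equals `want`.
fn verify_reader<R: Read>(reader: R, want: u64) -> io::Result<bool> {
    Ok(digest_reader(reader)? == want)
}
```

Because the hasher state is updated chunk by chunk, the streamed result matches a one-shot hash of the same bytes; any `Read` source (a file, a byte slice) can be passed in.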
@@ -807,7 +807,49 @@ pub async fn download_update(data_dir: &Path) -> Result<DownloadProgress> {
        }
        info!(name = %component.name, url = %component.download_url, "Downloading component");
        let dest = staging_dir.join(&component.name);
-        download_component_resumable(&client, component, &dest, downloaded).await?;

        // DHT Phase 2: when the manifest pins a BLAKE3 digest, route the fetch
        // through the swarm seam (swarm-assist, origin always wins). With no
        // providers registered (iroh-swarm feature off) this is identical to
        // calling the resumable HTTP origin directly — same bytes, now
        // content-addressed. A swarm hit is BLAKE3-verified inside the seam;
        // we still enforce the mandatory SHA-256 gate on peer bytes here and
        // re-fetch from origin if a (consistency-broken) peer slips through.
        let digest = component.blake3.as_deref().and_then(|b| {
            let s = b.trim();
            let normalized = if s.contains(':') {
                s.to_string()
            } else {
                format!("blake3:{s}")
            };
            crate::content_hash::ContentDigest::parse(&normalized).ok()
        });
        if let Some(digest) = digest {
            let client_ref = &client;
            let dest_ref = &dest;
            let source = crate::swarm::fetch_content_addressed(
                &digest,
                &crate::swarm::providers(),
                &dest,
                move || async move {
                    download_component_resumable(client_ref, component, dest_ref, downloaded).await
                },
            )
            .await?;
            if source == crate::swarm::FetchSource::Swarm {
                let bytes = tokio::fs::read(&dest).await?;
                if crate::content_hash::sha256_hex(&bytes) != component.sha256 {
                    warn!(
                        name = %component.name,
                        "swarm bytes passed BLAKE3 but failed the SHA-256 manifest gate — re-fetching from origin"
                    );
                    let _ = tokio::fs::remove_file(&dest).await;
                    download_component_resumable(&client, component, &dest, downloaded).await?;
                }
            }
        } else {
            download_component_resumable(&client, component, &dest, downloaded).await?;
        }
        downloaded += component.size_bytes;
        DOWNLOAD_BYTES.store(downloaded, Ordering::Relaxed);
        info!(
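The digest normalization in the `update.rs` hunk (trim, then prepend `blake3:` only when no algorithm prefix is present) can be isolated as a small pure function. This is a sketch of that one step; `normalize_blake3` is a hypothetical helper name, and the parsing that follows it (`ContentDigest::parse`) is not reproduced here.

```rust
/// Hypothetical helper mirroring the closure in the hunk above:
/// trim surrounding whitespace, and add the `blake3:` prefix only
/// when the manifest value carries no `algo:` prefix of its own.
fn normalize_blake3(raw: &str) -> String {
    let s = raw.trim();
    if s.contains(':') {
        // Already prefixed (with any algorithm); pass it through unchanged.
        s.to_string()
    } else {
        format!("blake3:{s}")
    }
}
```

For example, `normalize_blake3("  abc123\n")` yields `"blake3:abc123"`, while `"blake3:abc123"` and `"sha256:ff"` pass through unchanged. In the hunk, a value that then fails to parse becomes `None` via `.ok()`, so that component takes the plain origin download path.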