From be3ebd7fe0807d83000adb98ceedebb24d1991af Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 04:47:18 -0400 Subject: [PATCH] feat(dht): Phase 3 discovery glue + paid swarm serving Phase 3 wiring (task #12): - NostrSeedDiscovery: async ProviderDiscovery that queries relays for signed seed adverts and parses endpoint ids (swarm/iroh_provider.rs, seed_advert.rs). - seed_and_advertise publish path; dep-free fetch/publish helpers reuse the node's Nostr client setup, relay list and Tor proxy (build_nostr_client/load_or_create_nostr_keys made pub(crate)): publish signs with the node's Nostr identity, fetch queries with a throwaway key so lookups are not linked to that identity. - swarm::init builds the IrohProvider once into a OnceLock runtime; providers() returns it; announce_held_blob() is called from update.rs after a release component passes both hash gates. - config swarm_enabled (ARCHIPELAGO_SWARM_ENABLED, default off); server.rs init. Paid swarm serving (Phase 4 step F): - swarm/paid.rs gates the iroh-blobs provider through streaming::gate, intercepting connect + GET (peer push hard-disabled). Free by default (content-download service disabled); denies unpaid peers when enabled; fails open on internal error so a payment fault never blocks distribution. Wired into IrohProvider::new. All iroh code behind the iroh-swarm feature; the default build is inert. Default build clean; --features iroh-swarm: 11/11 swarm tests pass.
Co-Authored-By: Claude Opus 4.8 (1M context) --- core/archipelago/src/config.rs | 14 ++ core/archipelago/src/nostr_discovery.rs | 4 +- core/archipelago/src/server.rs | 14 ++ core/archipelago/src/swarm/iroh_provider.rs | 102 +++++++++- core/archipelago/src/swarm/mod.rs | 131 ++++++++++++- core/archipelago/src/swarm/paid.rs | 194 ++++++++++++++++++++ core/archipelago/src/swarm/seed_advert.rs | 91 ++++++++- core/archipelago/src/update.rs | 4 + 8 files changed, 540 insertions(+), 14 deletions(-) create mode 100644 core/archipelago/src/swarm/paid.rs diff --git a/core/archipelago/src/config.rs b/core/archipelago/src/config.rs index 6ad480cb..1b19b1ff 100644 --- a/core/archipelago/src/config.rs +++ b/core/archipelago/src/config.rs @@ -70,6 +70,13 @@ pub struct Config { /// on .228 + .198. See `project_v1_7_52_phase3_quadlet_design`. #[serde(default)] pub use_quadlet_backends: bool, + /// DHT swarm-assist (Phase 3): when true AND the binary was built with the + /// `iroh-swarm` feature, stand up an iroh-blobs provider that fetches release + /// blobs peer-to-peer (origin always wins) and seeds them via signed Nostr + /// adverts. Off by default; with the feature absent this is inert. Reuses + /// `nostr_relays` + `nostr_tor_proxy` for discovery transport. + #[serde(default)] + pub swarm_enabled: bool, } impl Config { @@ -182,6 +189,12 @@ impl Config { config.nostr_tor_proxy = if s.is_empty() { None } else { Some(s) }; } + // DHT swarm-assist (Phase 3). Opt-in: only takes effect when the binary + // was also built with the `iroh-swarm` feature; otherwise inert. + if let Ok(v) = std::env::var("ARCHIPELAGO_SWARM_ENABLED") { + config.swarm_enabled = parse_truthy_env(&v); + } + // Phase 3.2 of v1.7.52. 
Truthy values (1, true, yes, on — case-insensitive) // route backend installs through the Quadlet path without requiring a // config.json edit + archipelago.service restart (which would trigger @@ -241,6 +254,7 @@ impl Default for Config { ], nostr_tor_proxy: Some("127.0.0.1:9050".into()), use_quadlet_backends: false, + swarm_enabled: false, } } } diff --git a/core/archipelago/src/nostr_discovery.rs b/core/archipelago/src/nostr_discovery.rs index dbd9873a..13a84753 100644 --- a/core/archipelago/src/nostr_discovery.rs +++ b/core/archipelago/src/nostr_discovery.rs @@ -27,7 +27,7 @@ const D_TAG: &str = "archipelago-node"; const LEGACY_RELAYS: &[&str] = &["wss://relay.damus.io", "wss://relay.nostr.info"]; /// Load or create Nostr keys (secp256k1) for node discovery. -async fn load_or_create_nostr_keys(identity_dir: &Path) -> Result { +pub(crate) async fn load_or_create_nostr_keys(identity_dir: &Path) -> Result { let secret_path = identity_dir.join(NOSTR_SECRET_FILE); let pub_path = identity_dir.join(NOSTR_PUB_FILE); @@ -78,7 +78,7 @@ async fn load_nostr_keys_if_exists(identity_dir: &Path) -> Result> /// Publish a replaceable event with empty content to overwrite/revoke previously published data. /// Uses NIP-33: same kind + d-tag + author = latest replaces. Sends to LEGACY_RELAYS only. /// Requires tor_proxy to avoid leaking IP to relay operators. 
-fn build_nostr_client(keys: Keys, tor_proxy: Option<&str>) -> Result { +pub(crate) fn build_nostr_client(keys: Keys, tor_proxy: Option<&str>) -> Result { let client = if let Some(proxy_str) = tor_proxy { let addr = parse_proxy_addr(proxy_str) .ok_or_else(|| anyhow::anyhow!("Invalid Nostr Tor proxy: {}", proxy_str))?; diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 113eb891..dfdc0501 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -150,6 +150,20 @@ impl Server { } } + // DHT swarm-assist (Phase 3): build the iroh provider once at startup so + // release downloads can fetch from peers (origin always wins) and seed + // what they hold. Inert unless built with `iroh-swarm` AND swarm_enabled. + if let Err(e) = crate::swarm::init( + &config.data_dir, + &config.nostr_relays, + config.nostr_tor_proxy.as_deref(), + config.swarm_enabled, + ) + .await + { + tracing::warn!("Swarm init (non-fatal, falling back to origin-only): {}", e); + } + // Revoke any previously published Nostr data (runs before publish so revocation is not overwritten) let identity_dir = config.data_dir.join("identity"); let tor_proxy_revoke = config.nostr_tor_proxy.clone(); diff --git a/core/archipelago/src/swarm/iroh_provider.rs b/core/archipelago/src/swarm/iroh_provider.rs index 60b8418b..5dbc09ab 100644 --- a/core/archipelago/src/swarm/iroh_provider.rs +++ b/core/archipelago/src/swarm/iroh_provider.rs @@ -19,6 +19,7 @@ //! so enabling the feature is safe (never worse than today). use std::path::Path; +use std::str::FromStr; use std::sync::Arc; use anyhow::Result; @@ -31,11 +32,51 @@ use crate::content_hash::{ContentDigest, HashAlg}; /// Resolves which peers are believed to hold a given content hash. /// -/// Phase 3 (signed Nostr advertisement events) provides the production impl; -/// `None` discovery means "origin-only" — a safe default. 
+/// Phase 3 (signed Nostr advertisement events) provides the production impl +/// [`NostrSeedDiscovery`]; `None` discovery means "origin-only" — a safe +/// default. The query is async (it hits relays), so the trait is async. +#[async_trait] pub trait ProviderDiscovery: Send + Sync { /// Candidate seed endpoints for `hash` (may be empty). - fn providers_for(&self, hash: &Hash) -> Vec; + async fn providers_for(&self, hash: &Hash) -> Vec; +} + +/// Production [`ProviderDiscovery`]: reads signed seed advertisements from Nostr +/// relays and parses the advertised endpoint-id strings into [`EndpointId`]s. +/// +/// Unparseable ids are skipped (an advert from an incompatible/garbage peer must +/// not abort discovery). Reuses the node's existing relay list + Tor proxy. +pub struct NostrSeedDiscovery { + relays: Vec, + tor_proxy: Option, +} + +impl NostrSeedDiscovery { + pub fn new(relays: Vec, tor_proxy: Option) -> Self { + Self { relays, tor_proxy } + } +} + +#[async_trait] +impl ProviderDiscovery for NostrSeedDiscovery { + async fn providers_for(&self, hash: &Hash) -> Vec { + let hex = hash.to_hex(); + let ids = super::seed_advert::fetch_seed_endpoint_ids( + &self.relays, + self.tor_proxy.as_deref(), + &hex, + ) + .await; + ids.into_iter() + .filter_map(|s| match EndpointId::from_str(&s) { + Ok(id) => Some(id), + Err(e) => { + tracing::debug!("swarm: skipping unparseable seed endpoint id {s}: {e}"); + None + } + }) + .collect() + } } /// Fetches content-addressed blobs from the iroh swarm, and seeds what it has. @@ -68,7 +109,11 @@ impl IrohProvider { .map_err(|e| anyhow::anyhow!("bind iroh endpoint: {e}"))?; // Serve blobs: a node that fetches a blob can then seed it to others. - let blobs = BlobsProtocol::new(&store, None); + // The event sender gates each request through the ecash `streaming` layer + // — free by default, paid only if the operator priced `content-download` + // (Networking Profits → Settings). It also hard-disables peer writes. 
+ let event_sender = super::paid::gated_event_sender(data_dir.to_path_buf(), (*store).clone()); + let blobs = BlobsProtocol::new(&store, Some(event_sender)); let router = Router::builder(endpoint.clone()) .accept(iroh_blobs::ALPN, blobs) .spawn(); @@ -85,6 +130,53 @@ impl IrohProvider { pub fn endpoint_id(&self) -> EndpointId { self.endpoint.id() } + + /// Import a held PUBLIC blob into the seed store and advertise it on Nostr so + /// other nodes can fetch it from us. Call this only for releases/catalog + /// content (the design's privacy scope) — never private user blobs. + /// + /// Importing makes us an actual seed: a node that downloaded a release from + /// the HTTP origin can now serve it to peers over iroh-blobs. The advert maps + /// `blake3_hex → this endpoint id`. Defensive check: the bytes we import must + /// hash to what we advertise, so a path/hash mismatch can never publish a lie. + pub async fn seed_and_advertise( + &self, + path: &Path, + blake3_hex: &str, + identity_dir: &Path, + relays: &[String], + tor_proxy: Option<&str>, + ) -> Result<()> { + let expected = { + let raw = hex::decode(blake3_hex).map_err(|e| anyhow::anyhow!("blake3 hex: {e}"))?; + let arr: [u8; 32] = raw + .as_slice() + .try_into() + .map_err(|_| anyhow::anyhow!("blake3 digest must be 32 bytes"))?; + Hash::from_bytes(arr) + }; + let info = self + .store + .blobs() + .add_path(path) + .await + .map_err(|e| anyhow::anyhow!("import blob into seed store: {e}"))?; + if info.hash != expected { + anyhow::bail!( + "imported blob hash {} != advertised {}", + info.hash.to_hex(), + blake3_hex + ); + } + super::seed_advert::publish_seed_advert( + identity_dir, + relays, + tor_proxy, + blake3_hex, + &self.endpoint_id().to_string(), + ) + .await + } } #[async_trait] @@ -108,7 +200,7 @@ impl BlobProvider for IrohProvider { // Who has it? Without discovery (Phase 3) this is empty → origin wins. 
let providers = match &self.discovery { - Some(d) => d.providers_for(&hash), + Some(d) => d.providers_for(&hash).await, None => Vec::new(), }; if providers.is_empty() { diff --git a/core/archipelago/src/swarm/mod.rs b/core/archipelago/src/swarm/mod.rs index 8abe1068..45f0ff83 100644 --- a/core/archipelago/src/swarm/mod.rs +++ b/core/archipelago/src/swarm/mod.rs @@ -19,7 +19,7 @@ //! every fetch goes straight to origin — byte-for-byte today's path. use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use anyhow::Result; use async_trait::async_trait; @@ -32,6 +32,9 @@ pub mod seed_advert; #[cfg(feature = "iroh-swarm")] pub mod iroh_provider; +#[cfg(feature = "iroh-swarm")] +pub mod paid; + /// Which source ultimately served the content. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FetchSource { @@ -55,13 +58,131 @@ pub trait BlobProvider: Send + Sync { async fn try_fetch(&self, digest: &ContentDigest, dest: &Path) -> Result; } +/// Process-wide swarm runtime, built once at startup by [`init`]. Holding the +/// providers here (rather than rebuilding per download) keeps the iroh endpoint +/// + blob store + protocol router alive for the life of the process, so a node +/// keeps *seeding* between downloads. Empty/inert unless the `iroh-swarm` +/// feature is built AND `swarm_enabled` is set. +struct SwarmRuntime { + providers: Vec>, + /// Context for announcing held public blobs; `None` when seeding is off. + #[cfg(feature = "iroh-swarm")] + announce: Option, +} + +#[cfg(feature = "iroh-swarm")] +struct AnnounceCtx { + iroh: Arc, + relays: Vec, + tor_proxy: Option, + identity_dir: std::path::PathBuf, +} + +static RUNTIME: OnceLock = OnceLock::new(); + +/// Build the swarm runtime once, at startup. Idempotent: a second call is a +/// no-op (the first registration wins). 
Safe to call unconditionally — when the +/// `iroh-swarm` feature is absent, or `enabled` is false, it registers an empty +/// runtime so every fetch goes straight to origin (today's path). +/// +/// `relays` / `tor_proxy` come from the node's Nostr config and double as the +/// seed-advert transport; `data_dir` hosts the persistent iroh blob store under +/// `data_dir/iroh-blobs` and the node identity under `data_dir/identity`. +pub async fn init( + data_dir: &Path, + relays: &[String], + tor_proxy: Option<&str>, + enabled: bool, +) -> Result<()> { + if RUNTIME.get().is_some() { + return Ok(()); + } + + #[cfg(not(feature = "iroh-swarm"))] + { + let _ = (data_dir, relays, tor_proxy); + if enabled { + warn!("swarm: swarm_enabled set but binary built without the `iroh-swarm` feature — staying origin-only"); + } + let _ = RUNTIME.set(SwarmRuntime { providers: Vec::new() }); + return Ok(()); + } + + #[cfg(feature = "iroh-swarm")] + { + if !enabled { + info!("swarm: disabled (swarm_enabled=false) — origin-only"); + let _ = RUNTIME.set(SwarmRuntime { + providers: Vec::new(), + announce: None, + }); + return Ok(()); + } + + let discovery: Arc = + Arc::new(iroh_provider::NostrSeedDiscovery::new( + relays.to_vec(), + tor_proxy.map(str::to_string), + )); + let provider = + Arc::new(iroh_provider::IrohProvider::new(data_dir, Some(discovery)).await?); + info!( + "swarm: iroh provider active (endpoint {}) — swarm-assist enabled, origin always wins", + provider.endpoint_id() + ); + let providers: Vec> = vec![provider.clone()]; + let _ = RUNTIME.set(SwarmRuntime { + providers, + announce: Some(AnnounceCtx { + iroh: provider, + relays: relays.to_vec(), + tor_proxy: tor_proxy.map(str::to_string), + identity_dir: data_dir.join("identity"), + }), + }); + Ok(()) + } +} + /// The ordered list of swarm providers to consult before the origin. /// -/// Empty unless the `iroh-swarm` feature is enabled and a provider has been -/// registered. 
Today it is always empty — the seam exists so wiring iroh is a -/// localized change rather than a surgery through the download path. +/// Empty until [`init`] registers a provider (needs the `iroh-swarm` feature + +/// `swarm_enabled`). While empty, [`fetch_content_addressed`] goes straight to +/// origin — byte-for-byte today's path. pub fn providers() -> Vec> { - Vec::new() + RUNTIME + .get() + .map(|r| r.providers.clone()) + .unwrap_or_default() +} + +/// Announce that this node now holds a PUBLIC release/catalog blob (addressed by +/// `blake3_hex`, bytes at `path`) so peers can fetch it from us: import it into +/// the seed store and publish a signed Nostr advert. Best-effort and inert +/// unless the iroh provider is active — a failure never affects the install. +/// +/// **Scope:** call only for releases/catalog content, never private user blobs. +pub async fn announce_held_blob(_blake3_hex: &str, _path: &Path) { + #[cfg(feature = "iroh-swarm")] + { + let Some(rt) = RUNTIME.get() else { return }; + let Some(ctx) = rt.announce.as_ref() else { + return; + }; + if let Err(e) = ctx + .iroh + .seed_and_advertise( + _path, + _blake3_hex, + &ctx.identity_dir, + &ctx.relays, + ctx.tor_proxy.as_deref(), + ) + .await + { + warn!("swarm: failed to announce held blob {_blake3_hex}: {e}"); + } + } } /// Fetch content-addressed bytes: swarm-assist, origin always wins. diff --git a/core/archipelago/src/swarm/paid.rs b/core/archipelago/src/swarm/paid.rs new file mode 100644 index 00000000..e77af81d --- /dev/null +++ b/core/archipelago/src/swarm/paid.rs @@ -0,0 +1,194 @@ +//! Paid swarm serving — gate the iroh-blobs provider through the ecash +//! `streaming` payment layer (DHT distribution plan, Phase 4 step F). +//! +//! ## Free by default +//! Serving is **free unless the node operator turns it on** in +//! *Networking Profits → Settings* (which enables the `content-download` +//! streaming service). With that service disabled — the shipped default — +//! 
[`is_authorized`] returns `true` for everyone and behaviour is byte-for-byte +//! the old open seeder. When it is enabled, a peer must hold an active paid +//! session (opened out-of-band via the `streaming.pay` RPC with a Cashu token) +//! before the swarm will serve them; otherwise the request is refused and they +//! fall back to the HTTP origin. +//! +//! ## How it hooks in +//! iroh-blobs 0.103 lets a provider authorize each request: we pass an +//! [`EventSender`] (built here) to `BlobsProtocol::new`, set the [`EventMask`] +//! to intercept connections + GET requests, and answer each one with +//! `Ok(())` (serve) or `Err(AbortReason::Permission)` (refuse). Peer-initiated +//! writes (`push`) are hard-disabled so a peer can never mutate our store. +//! +//! Scope note: today every swarm blob is a public release/app component, so the +//! gate only ever charges if the operator explicitly priced `content-download`. +//! When IndeeHub films land on the same blob layer (Phase 4), they reuse this +//! exact path. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use iroh::EndpointId; +use iroh_blobs::api::blobs::BlobStatus; +use iroh_blobs::api::Store; +use iroh_blobs::provider::events::{ + AbortReason, ConnectMode, EventMask, EventResult, EventSender, ObserveMode, ProviderMessage, + RequestMode, ThrottleMode, +}; +use iroh_blobs::Hash; + +use crate::streaming::gate::{self, GateResult}; + +/// The streaming pricing service that meters swarm blob serving. Enabling it in +/// the Settings UI is what flips swarm serving from free to paid. +const SERVICE_ID: &str = "content-download"; + +/// Build the gated [`EventSender`] for `BlobsProtocol` and spawn the task that +/// authorizes each blob GET through the ecash gate. +/// +/// `data_dir` locates the pricing/session state; `store` is cloned in to look up +/// blob sizes for metering. The spawned task lives as long as the provider keeps +/// the returned sender alive (i.e. the life of the node). 
+pub fn gated_event_sender(data_dir: PathBuf, store: Store) -> EventSender { + // Intercept connections + read requests so we can allow/deny per peer & hash. + // `push` (peer writes into our store) is hard-disabled. `throttle`/`observe` + // stay off — we meter coarsely at request time, not per 16 KiB chunk. + let mask = EventMask { + connected: ConnectMode::Intercept, + get: RequestMode::Intercept, + get_many: RequestMode::Intercept, + push: RequestMode::Disabled, + observe: ObserveMode::None, + throttle: ThrottleMode::None, + }; + let (sender, mut rx) = EventSender::channel(64, mask); + tokio::spawn(async move { + // connection_id → remote endpoint id, learned at ClientConnected and used + // to key the paying peer's streaming session on each request. + let mut peers: HashMap> = HashMap::new(); + while let Some(msg) = rx.recv().await { + match msg { + ProviderMessage::ClientConnected(m) => { + peers.insert(m.inner.connection_id, m.inner.endpoint_id); + // Accept the connection; gating happens per request. + let _ = m.tx.send(Ok(())).await; + } + ProviderMessage::ConnectionClosed(m) => { + peers.remove(&m.inner.connection_id); + } + ProviderMessage::GetRequestReceived(m) => { + let peer = peers.get(&m.inner.connection_id).copied().flatten(); + let hash = m.inner.request.hash; + let verdict = authorize(&data_dir, &store, peer, &hash).await; + let _ = m.tx.send(verdict).await; + } + ProviderMessage::GetManyRequestReceived(m) => { + let peer = peers.get(&m.inner.connection_id).copied().flatten(); + // A get-many is all-or-nothing here: authorize on the first hash. + let verdict = match m.inner.request.hashes.first().copied() { + Some(h) => authorize(&data_dir, &store, peer, &h).await, + None => Ok(()), + }; + let _ = m.tx.send(verdict).await; + } + ProviderMessage::PushRequestReceived(m) => { + // Disabled in the mask; refuse defensively if one ever arrives. 
+ let _ = m.tx.send(Err(AbortReason::Permission)).await; + } + // Notify-only variants, observe and throttle: nothing to gate. + _ => {} + } + } + }); + sender +} + +/// Authorize one blob GET, returning the iroh [`EventResult`] +/// (`Ok(())` = serve, `Err(Permission)` = refuse). +async fn authorize( + data_dir: &Path, + store: &Store, + peer: Option, + hash: &Hash, +) -> EventResult { + // Cost = full blob size (coarse, request-time metering). If we don't hold the + // complete blob there's nothing to meter — let iroh serve what it can. + let size = match store.blobs().status(*hash).await { + Ok(BlobStatus::Complete { size }) => size, + _ => 0, + }; + let peer_id = peer + .map(|e| e.to_string()) + .unwrap_or_else(|| "anonymous".to_string()); + if is_authorized(data_dir, &peer_id, size).await { + Ok(()) + } else { + Err(AbortReason::Permission) + } +} + +/// Pure allow/deny decision (no iroh types) — unit-testable without a live node. +async fn is_authorized(data_dir: &Path, peer_id: &str, size: u64) -> bool { + match gate::check_gate(data_dir, peer_id, SERVICE_ID, None, size).await { + // Service disabled (the default) → free for everyone. Or the peer holds an + // active paid session with remaining allotment. + Ok(GateResult::ServiceUnavailable) + | Ok(GateResult::Allowed { .. }) + | Ok(GateResult::PaidAndAllowed { .. }) => true, + // Metered + no/exhausted session: the peer must pay out-of-band first + // (streaming.pay) before the swarm serves them — they fall back to origin. + Ok(_) => false, + // Never let a payment-layer fault break content distribution: fail OPEN + // (serve free) and log. Availability beats revenue when something breaks. 
+ Err(e) => { + tracing::warn!("paid-gate: check errored ({e}); serving free"); + true + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::streaming::pricing::{self, Metric, PricingConfig, ServicePricing}; + + fn content_download(enabled: bool) -> PricingConfig { + PricingConfig { + services: vec![ServicePricing { + service_id: SERVICE_ID.to_string(), + name: "Content Downloads".to_string(), + metric: Metric::Bytes, + step_size: 1_048_576, + price_per_step: 1, + min_steps: 0, + enabled, + description: String::new(), + accepted_mints: vec![], + }], + } + } + + #[tokio::test] + async fn free_when_service_disabled_by_default() { + let dir = tempfile::tempdir().unwrap(); + // No pricing file → defaults → content-download disabled → free for all. + assert!(is_authorized(dir.path(), "peer-a", 1_000_000).await); + } + + #[tokio::test] + async fn free_when_service_explicitly_disabled() { + let dir = tempfile::tempdir().unwrap(); + pricing::save_pricing(dir.path(), &content_download(false)) + .await + .unwrap(); + assert!(is_authorized(dir.path(), "peer-a", 1_048_576).await); + } + + #[tokio::test] + async fn denied_when_metered_and_peer_has_not_paid() { + let dir = tempfile::tempdir().unwrap(); + pricing::save_pricing(dir.path(), &content_download(true)) + .await + .unwrap(); + // Enabled service + no session/token → the swarm refuses; peer uses origin. + assert!(!is_authorized(dir.path(), "peer-b", 1_048_576).await); + } +} diff --git a/core/archipelago/src/swarm/seed_advert.rs b/core/archipelago/src/swarm/seed_advert.rs index 0f425b7c..9f0c9d5a 100644 --- a/core/archipelago/src/swarm/seed_advert.rs +++ b/core/archipelago/src/swarm/seed_advert.rs @@ -21,9 +21,18 @@ // by unit tests — allow them to stand without a production caller. 
#![allow(dead_code)] -use nostr_sdk::{Event, EventBuilder, Filter, Kind, Tag}; +use std::path::Path; +use std::time::Duration; + +use nostr_sdk::{Event, EventBuilder, Filter, Keys, Kind, Tag}; use serde::{Deserialize, Serialize}; +/// How long to wait for relay connects / event fetches. Matches the rest of the +/// Nostr discovery path so the swarm never stalls the download longer than node +/// discovery already might. +const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +const RELAY_FETCH_TIMEOUT: Duration = Duration::from_secs(15); + /// NIP-33 addressable kind for Archipelago seed advertisements. /// Distinct from the node-discovery app-data kind (30078). pub const ARCHIPELAGO_SEED_KIND: u16 = 30081; @@ -82,10 +91,88 @@ pub fn endpoint_ids_from_events<'a>(events: impl IntoIterator) out } +/// Query `relays` for the current seed advertisements for `blake3_hex` and +/// return the de-duplicated endpoint-id strings (opaque here; the `iroh-swarm` +/// glue parses them into `iroh::EndpointId`). +/// +/// Best-effort by design: an empty relay list, a connect timeout, or a fetch +/// failure all yield an empty list — never an error. The swarm seam treats "no +/// providers" as "use origin", so discovery problems can only ever degrade to +/// today's HTTP path, never block it. +pub async fn fetch_seed_endpoint_ids( + relays: &[String], + tor_proxy: Option<&str>, + blake3_hex: &str, +) -> Vec { + if relays.is_empty() { + return Vec::new(); + } + // Query anonymously — discovery reads public adverts and must not link the + // query back to this node's seed identity. 
+ let anon = Keys::generate(); + let client = match crate::nostr_discovery::build_nostr_client(anon, tor_proxy) { + Ok(c) => c, + Err(e) => { + tracing::warn!("seed-advert: build relay client failed: {e}"); + return Vec::new(); + } + }; + for url in relays { + let _ = client.add_relay(url).await; + } + if tokio::time::timeout(RELAY_CONNECT_TIMEOUT, client.connect()) + .await + .is_err() + { + tracing::warn!("seed-advert: relay connect timed out, continuing anyway"); + } + let events = client + .fetch_events(advertisement_filter(blake3_hex), RELAY_FETCH_TIMEOUT) + .await + .map(|e| e.to_vec()) + .unwrap_or_default(); + client.disconnect().await; + endpoint_ids_from_events(events.iter()) +} + +/// Publish a signed advertisement — "this node can seed `blake3_hex` from +/// `endpoint_id`" — to `relays`, signed with the node's seed-derived Nostr key. +/// +/// **Caller must restrict this to PUBLIC releases/catalog blobs** (the design's +/// privacy scope, decided 2026-06-16) — never private user content. Best-effort: +/// relay failures are logged, not fatal, since seeding is an optimization. 
+pub async fn publish_seed_advert( + identity_dir: &Path, + relays: &[String], + tor_proxy: Option<&str>, + blake3_hex: &str, + endpoint_id: &str, +) -> anyhow::Result<()> { + if relays.is_empty() { + return Ok(()); + } + let keys = crate::nostr_discovery::load_or_create_nostr_keys(identity_dir).await?; + let client = crate::nostr_discovery::build_nostr_client(keys, tor_proxy)?; + for url in relays { + let _ = client.add_relay(url).await; + } + if tokio::time::timeout(RELAY_CONNECT_TIMEOUT, client.connect()) + .await + .is_err() + { + tracing::warn!("seed-advert: publish relay connect timed out, continuing anyway"); + } + let _ = client + .send_event_builder(advertisement_builder(blake3_hex, endpoint_id)) + .await; + client.disconnect().await; + tracing::info!("seed-advert: announced {blake3_hex} seedable from {endpoint_id}"); + Ok(()) +} + #[cfg(test)] mod tests { use super::*; - use nostr_sdk::Keys; #[test] fn build_sign_parse_roundtrip() { diff --git a/core/archipelago/src/update.rs b/core/archipelago/src/update.rs index 4b83594d..6f77618c 100644 --- a/core/archipelago/src/update.rs +++ b/core/archipelago/src/update.rs @@ -847,6 +847,10 @@ pub async fn download_update(data_dir: &Path) -> Result { download_component_resumable(&client, component, &dest, downloaded).await?; } } + // This is a PUBLIC release blob and it just passed both the BLAKE3 and + // SHA-256 gates — announce that we can now seed it to peers. Best-effort + // and inert unless the iroh swarm is active; never blocks the install. + crate::swarm::announce_held_blob(&digest.hex, &dest).await; } else { download_component_resumable(&client, component, &dest, downloaded).await?; }