From e456c9701bed4daa562eff6c442b1f0c9dcf62b6 Mon Sep 17 00:00:00 2001 From: archipelago Date: Wed, 17 Jun 2026 03:10:21 -0400 Subject: [PATCH] fix(peer-files): stream large cloud downloads + surface real errors (#30, #38) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Large peer downloads (~178MB) failed with a generic 'Operation failed', and the download path had three stacked problems: - The FIPS reqwest client used a hard-coded 20s total timeout regardless of the caller's .timeout(), so a big transfer over the mesh aborted at 20s before the Tor fallback could help. Honor the per-request timeout (client_with_timeout). - The peer-content proxy buffered the whole file into node memory via resp.bytes() before sending a byte, and capped the transfer at 60s. Stream the body through with hyper::Body::wrap_stream (constant memory) and raise the timeout to 900s; bump the nginx peer-content read timeout to match. - Free downloads pulled the file as base64 over RPC, doubling it in node memory and the browser — fatal for large files. Download free files by streaming from /api/peer-content straight to disk, after a 1-byte Range probe that surfaces the real reason (peer offline on mesh and Tor) instead of a generic failure. Paid downloads now return the real error through the {error} channel the UI already displays. Adds the reqwest 'stream' feature for bytes_stream(). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- core/Cargo.lock | 17 +++++- core/archipelago/Cargo.toml | 2 +- core/archipelago/src/api/handler/proxy.rs | 14 ++++- core/archipelago/src/api/rpc/content.rs | 27 ++++++--- core/archipelago/src/bootstrap.rs | 2 +- core/archipelago/src/fips/dial.rs | 16 +++++- neode-ui/src/views/PeerFiles.vue | 69 ++++++++++++++++++----- 7 files changed, 118 insertions(+), 29 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index b1d65f8f..31557390 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -80,7 +80,7 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "archipelago" -version = "1.7.97-alpha" +version = "1.7.98-alpha" dependencies = [ "anyhow", "archipelago-container", @@ -2156,10 +2156,12 @@ dependencies = [ "tokio", "tokio-rustls 0.24.1", "tokio-socks", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.25.4", "winreg", @@ -3304,6 +3306,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.85" diff --git a/core/archipelago/Cargo.toml b/core/archipelago/Cargo.toml index 15a933c5..513ca7a8 100644 --- a/core/archipelago/Cargo.toml +++ b/core/archipelago/Cargo.toml @@ -64,7 +64,7 @@ serde_yaml = "0.9" # HTTP client (for LND REST proxy, Tor SOCKS for peer messaging) # Uses rustls-tls for cross-compilation (no OpenSSL dependency) -reqwest = { version = "0.11", default-features = false, features = ["json", "socks", "rustls-tls"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "socks", "rustls-tls", "stream"] } # Nostr (node discovery 
+ NIP-44 encrypted peer handshake) nostr-sdk = { version = "0.44", features = ["nip04", "nip44"] } diff --git a/core/archipelago/src/api/handler/proxy.rs b/core/archipelago/src/api/handler/proxy.rs index a3e9d1cb..466d9af7 100644 --- a/core/archipelago/src/api/handler/proxy.rs +++ b/core/archipelago/src/api/handler/proxy.rs @@ -227,9 +227,13 @@ impl ApiHandler { let fips_npub = crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await; let peer_path = format!("/content/{}", content_id); + // Generous overall timeout: this endpoint serves both seek/Range + // playback (small, finishes fast) and full-file downloads of large + // media (#38). 60s was too tight for a multi-hundred-MB transfer over + // Tor and aborted the download mid-stream. let mut req = crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &peer_path) .service(crate::settings::transport::PeerService::PeerFiles) - .timeout(std::time::Duration::from_secs(60)); + .timeout(std::time::Duration::from_secs(900)); if let Some(r) = headers.get("range").and_then(|v| v.to_str().ok()) { req = req.header("Range", r.to_string()); } @@ -237,7 +241,6 @@ impl ApiHandler { Ok((resp, _transport)) => { let status = resp.status().as_u16(); let rh = resp.headers().clone(); - let bytes = resp.bytes().await.unwrap_or_default(); let mut builder = Response::builder() .status(status) .header("Accept-Ranges", "bytes"); @@ -246,8 +249,13 @@ impl ApiHandler { builder = builder.header(h, v); } } + // Stream the peer's body straight through instead of buffering + // the whole file into memory (#38). For a 178MB download the old + // `resp.bytes().await` allocated the entire file on the node + // before sending a byte; `wrap_stream` forwards chunks as they + // arrive, with constant memory. 
Ok(builder - .body(hyper::Body::from(bytes)) + .body(hyper::Body::wrap_stream(resp.bytes_stream())) .unwrap_or_else(|_| Response::new(hyper::Body::empty()))) } Err(e) => Ok(build_response( diff --git a/core/archipelago/src/api/rpc/content.rs b/core/archipelago/src/api/rpc/content.rs index 315bff7f..8b77fd44 100644 --- a/core/archipelago/src/api/rpc/content.rs +++ b/core/archipelago/src/api/rpc/content.rs @@ -379,15 +379,26 @@ impl RpcHandler { let fips_npub = crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await; let path = format!("/content/{}", content_id); + // Surface a real reason instead of the generic sanitized error (#30): + // the dial already tries FIPS/mesh then falls back to Tor, so a failure + // here means the peer is genuinely unreachable on both transports. let (response, transport) = - crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &path) + match crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &path) .service(crate::settings::transport::PeerService::PeerFiles) .header("X-Federation-DID", local_did) .header("X-Payment-Token", token_str) - .timeout(std::time::Duration::from_secs(120)) + .timeout(std::time::Duration::from_secs(900)) .send_get() .await - .context("Failed to connect to peer")?; + { + Ok(v) => v, + Err(e) => { + tracing::warn!("paid peer download dial failed for {}: {:#}", onion, e); + return Ok(serde_json::json!({ + "error": "Could not reach the peer over mesh or Tor — it may be offline. Please try again." + })); + } + }; // Record which transport actually reached the peer (B14). 
let _ = crate::federation::record_peer_transport( &self.config.data_dir, @@ -399,13 +410,15 @@ impl RpcHandler { if response.status() == reqwest::StatusCode::PAYMENT_REQUIRED { // Payment was rejected — token is spent but content not received - return Err(anyhow::anyhow!( - "Payment rejected by peer — token may have been insufficient or invalid" - )); + return Ok(serde_json::json!({ + "error": "Payment rejected by peer — the token may have been insufficient or invalid." + })); } if !response.status().is_success() { - return Err(anyhow::anyhow!("Peer returned: {}", response.status())); + return Ok(serde_json::json!({ + "error": format!("Peer returned an error ({}).", response.status()) + })); } let bytes = response diff --git a/core/archipelago/src/bootstrap.rs b/core/archipelago/src/bootstrap.rs index a39e8254..e42076d3 100644 --- a/core/archipelago/src/bootstrap.rs +++ b/core/archipelago/src/bootstrap.rs @@ -68,7 +68,7 @@ const NGINX_LND_PROXY_BLOCK: &str = "\n # LND REST proxy — backend handles /// and peer media won't play (B3). Forwards Cookie (session auth) + Range and /// disables buffering so streaming works. Kept in sync with the canonical /// block in image-recipe/configs/nginx-archipelago.conf. 
-const NGINX_PEER_CONTENT_BLOCK: &str = "\n # Peer content streaming proxy (B3) — Range-streams a peer's media file\n location /api/peer-content/ {\n proxy_pass http://127.0.0.1:5678;\n proxy_http_version 1.1;\n proxy_set_header Host $host;\n proxy_set_header Cookie $http_cookie;\n proxy_set_header Range $http_range;\n proxy_buffering off;\n proxy_connect_timeout 10s;\n proxy_read_timeout 120s;\n error_page 502 503 = @backend_unavailable;\n error_page 504 = @backend_timeout;\n }\n"; +const NGINX_PEER_CONTENT_BLOCK: &str = "\n # Peer content streaming proxy (B3) — Range-streams a peer's media file.\n # Long read timeout: this path also serves full-file downloads of large\n # media (#38), which can take minutes over Tor; 120s aborted them.\n location /api/peer-content/ {\n proxy_pass http://127.0.0.1:5678;\n proxy_http_version 1.1;\n proxy_set_header Host $host;\n proxy_set_header Cookie $http_cookie;\n proxy_set_header Range $http_range;\n proxy_buffering off;\n proxy_connect_timeout 10s;\n proxy_read_timeout 900s;\n error_page 502 503 = @backend_unavailable;\n error_page 504 = @backend_timeout;\n }\n"; /// B13 — Fedimint UI asset rewrite. Pre-fix nodes proxy /app/fedimint/ with only /// the nostr-provider injection (`sub_filter_once on`), so the UI's root-rooted diff --git a/core/archipelago/src/fips/dial.rs b/core/archipelago/src/fips/dial.rs index 3749e457..441e0632 100644 --- a/core/archipelago/src/fips/dial.rs +++ b/core/archipelago/src/fips/dial.rs @@ -99,8 +99,18 @@ pub async fn peer_base_url(npub: &str) -> Result { /// until the first packets flow), so a reachable-but-cold peer isn't abandoned /// to Tor prematurely. Reliability over latency — FIPS is the preferred path. pub fn client() -> reqwest::Client { + client_with_timeout(Duration::from_secs(20)) +} + +/// FIPS client with a caller-chosen overall request timeout. 
The static 20s +/// `client()` budget is fine for catalog browses and short calls, but a large +/// content download (#38) needs the per-request timeout the caller asked for — +/// otherwise a 178MB transfer is aborted at 20s and the whole download fails +/// before the Tor fallback ever gets a chance. The generous `connect_timeout` +/// is preserved so a cold hole-punched path still gets time to establish. +pub fn client_with_timeout(timeout: Duration) -> reqwest::Client { reqwest::Client::builder() - .timeout(Duration::from_secs(20)) + .timeout(timeout) .connect_timeout(Duration::from_secs(8)) .user_agent("archipelago-fips/1") .build() @@ -413,7 +423,7 @@ impl<'a> PeerRequest<'a> { } }; let url = format!("{}{}", base, self.path); - let c = client(); + let c = client_with_timeout(self.timeout); let mut rb = c.post(&url).json(body); for (k, v) in &self.headers { rb = rb.header(*k, v); @@ -446,7 +456,7 @@ impl<'a> PeerRequest<'a> { } }; let url = format!("{}{}", base, self.path); - let c = client(); + let c = client_with_timeout(self.timeout); let mut rb = c.get(&url); for (k, v) in &self.headers { rb = rb.header(*k, v); diff --git a/neode-ui/src/views/PeerFiles.vue b/neode-ui/src/views/PeerFiles.vue index 08c34242..5aad1ac2 100644 --- a/neode-ui/src/views/PeerFiles.vue +++ b/neode-ui/src/views/PeerFiles.vue @@ -530,21 +530,19 @@ async function downloadFile(item: CatalogItem) { purchaseError.value = `Payment failed: ${result.error}` } } else { - // Free / peers-only download - const result = await rpcClient.call<{ data?: string; error?: string; price_sats?: number }>({ - method: 'content.download-peer', - params: { onion, content_id: item.id }, - timeout: 120000, - }) - - if (result?.error === 'payment_required') { - purchaseError.value = `This content requires payment: ${result.price_sats ?? 0} sats` + // Free / peers-only download: stream straight from the Range-capable + // proxy (B3) to disk instead of pulling the whole file as a base64 blob + // over RPC. 
The old base64-over-RPC path buffered the entire file in node + // memory AND the browser, which failed outright for large files (#38). + // A tiny probe first surfaces the real server error (peer unreachable on + // mesh and Tor) instead of a generic failure (#30). + const streamUrl = `/api/peer-content/${encodeURIComponent(onion)}/${encodeURIComponent(item.id)}` + const probe = await probePeerContent(streamUrl) + if (probe !== true) { + purchaseError.value = probe return } - - if (result?.data) { - triggerDownload(result.data, item) - } + streamDownload(streamUrl, item) } } catch (e: unknown) { purchaseError.value = e instanceof Error ? e.message : 'Download failed' @@ -640,6 +638,51 @@ watch(videoPlayerUrl, (url) => { } }) +/** + * Probe the streaming proxy with a 1-byte Range request so we can report the + * real failure reason (peer offline on both mesh and Tor) before kicking off a + * browser-managed download that can't surface HTTP status. Returns `true` on + * success, or a human-readable error string. Aborts immediately on success so + * a large body is never drained. + */ +async function probePeerContent(url: string): Promise<true | string> { + const controller = new AbortController() + try { + const res = await fetch(url, { + headers: { Range: 'bytes=0-0' }, + signal: controller.signal, + }) + if (res.ok || res.status === 206) { + controller.abort() + return true + } + let msg = `Peer returned an error (${res.status})` + try { + const body = await res.json() + if (body?.error) msg = String(body.error) + } catch { + // non-JSON error body — keep the status-based message + } + return msg + } catch (e) { + return e instanceof Error ? e.message : 'Could not reach the peer — it may be offline' + } +} + +/** + * Stream a free file to disk via the Range-capable proxy. The browser manages + * the transfer with constant memory (no base64, no in-memory Blob), so large + * files download reliably (#38). Same-origin, so the session cookie rides along. 
+ */ +function streamDownload(url: string, item: CatalogItem) { + const a = document.createElement('a') + a.href = url + a.download = item.filename.split('/').pop() || item.filename + document.body.appendChild(a) + a.click() + a.remove() +} + function triggerDownload(base64Data: string, item: CatalogItem) { const blob = new Blob( [Uint8Array.from(atob(base64Data), c => c.charCodeAt(0))],