fix(peer-files): stream large cloud downloads + surface real errors (#30, #38)

Large peer downloads (~178MB) failed with a generic 'Operation failed', and
the download path had three stacked problems:

- The FIPS reqwest client used a hard-coded 20s total timeout regardless of the
  caller's .timeout(), so a big transfer over the mesh aborted at 20s before
  the Tor fallback could help. Honor the per-request timeout (client_with_timeout).
- The peer-content proxy buffered the whole file into node memory via
  resp.bytes() before sending a byte, and capped the transfer at 60s. Stream
  the body through with hyper::Body::wrap_stream (constant memory) and raise the
  timeout to 900s; bump the nginx peer-content read timeout to match.
- Free downloads pulled the file as base64 over RPC, doubling it in node memory
  and the browser — fatal for large files. Download free files by streaming
  from /api/peer-content straight to disk, after a 1-byte Range probe that
  surfaces the real reason (peer offline on mesh and Tor) instead of a generic
  failure. Paid downloads now return the real error through the {error} channel
  the UI already displays.

Adds the reqwest 'stream' feature for bytes_stream().

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
archipelago 2026-06-17 03:10:21 -04:00
parent 3aea8c5bfa
commit e456c9701b
7 changed files with 118 additions and 29 deletions

17
core/Cargo.lock generated
View File

@ -80,7 +80,7 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
[[package]] [[package]]
name = "archipelago" name = "archipelago"
version = "1.7.97-alpha" version = "1.7.98-alpha"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"archipelago-container", "archipelago-container",
@ -2156,10 +2156,12 @@ dependencies = [
"tokio", "tokio",
"tokio-rustls 0.24.1", "tokio-rustls 0.24.1",
"tokio-socks", "tokio-socks",
"tokio-util",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"wasm-streams",
"web-sys", "web-sys",
"webpki-roots 0.25.4", "webpki-roots 0.25.4",
"winreg", "winreg",
@ -3304,6 +3306,19 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.85" version = "0.3.85"

View File

@ -64,7 +64,7 @@ serde_yaml = "0.9"
# HTTP client (for LND REST proxy, Tor SOCKS for peer messaging) # HTTP client (for LND REST proxy, Tor SOCKS for peer messaging)
# Uses rustls-tls for cross-compilation (no OpenSSL dependency) # Uses rustls-tls for cross-compilation (no OpenSSL dependency)
reqwest = { version = "0.11", default-features = false, features = ["json", "socks", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "socks", "rustls-tls", "stream"] }
# Nostr (node discovery + NIP-44 encrypted peer handshake) # Nostr (node discovery + NIP-44 encrypted peer handshake)
nostr-sdk = { version = "0.44", features = ["nip04", "nip44"] } nostr-sdk = { version = "0.44", features = ["nip04", "nip44"] }

View File

@ -227,9 +227,13 @@ impl ApiHandler {
let fips_npub = crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await; let fips_npub = crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await;
let peer_path = format!("/content/{}", content_id); let peer_path = format!("/content/{}", content_id);
// Generous overall timeout: this endpoint serves both seek/Range
// playback (small, finishes fast) and full-file downloads of large
// media (#38). 60s was too tight for a multi-hundred-MB transfer over
// Tor and aborted the download mid-stream.
let mut req = crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &peer_path) let mut req = crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &peer_path)
.service(crate::settings::transport::PeerService::PeerFiles) .service(crate::settings::transport::PeerService::PeerFiles)
.timeout(std::time::Duration::from_secs(60)); .timeout(std::time::Duration::from_secs(900));
if let Some(r) = headers.get("range").and_then(|v| v.to_str().ok()) { if let Some(r) = headers.get("range").and_then(|v| v.to_str().ok()) {
req = req.header("Range", r.to_string()); req = req.header("Range", r.to_string());
} }
@ -237,7 +241,6 @@ impl ApiHandler {
Ok((resp, _transport)) => { Ok((resp, _transport)) => {
let status = resp.status().as_u16(); let status = resp.status().as_u16();
let rh = resp.headers().clone(); let rh = resp.headers().clone();
let bytes = resp.bytes().await.unwrap_or_default();
let mut builder = Response::builder() let mut builder = Response::builder()
.status(status) .status(status)
.header("Accept-Ranges", "bytes"); .header("Accept-Ranges", "bytes");
@ -246,8 +249,13 @@ impl ApiHandler {
builder = builder.header(h, v); builder = builder.header(h, v);
} }
} }
// Stream the peer's body straight through instead of buffering
// the whole file into memory (#38). For a 178MB download the old
// `resp.bytes().await` allocated the entire file on the node
// before sending a byte; `wrap_stream` forwards chunks as they
// arrive, with constant memory.
Ok(builder Ok(builder
.body(hyper::Body::from(bytes)) .body(hyper::Body::wrap_stream(resp.bytes_stream()))
.unwrap_or_else(|_| Response::new(hyper::Body::empty()))) .unwrap_or_else(|_| Response::new(hyper::Body::empty())))
} }
Err(e) => Ok(build_response( Err(e) => Ok(build_response(

View File

@ -379,15 +379,26 @@ impl RpcHandler {
let fips_npub = crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await; let fips_npub = crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await;
let path = format!("/content/{}", content_id); let path = format!("/content/{}", content_id);
// Surface a real reason instead of the generic sanitized error (#30):
// the dial already tries FIPS/mesh then falls back to Tor, so a failure
// here means the peer is genuinely unreachable on both transports.
let (response, transport) = let (response, transport) =
crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &path) match crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), onion, &path)
.service(crate::settings::transport::PeerService::PeerFiles) .service(crate::settings::transport::PeerService::PeerFiles)
.header("X-Federation-DID", local_did) .header("X-Federation-DID", local_did)
.header("X-Payment-Token", token_str) .header("X-Payment-Token", token_str)
.timeout(std::time::Duration::from_secs(120)) .timeout(std::time::Duration::from_secs(900))
.send_get() .send_get()
.await .await
.context("Failed to connect to peer")?; {
Ok(v) => v,
Err(e) => {
tracing::warn!("paid peer download dial failed for {}: {:#}", onion, e);
return Ok(serde_json::json!({
"error": "Could not reach the peer over mesh or Tor — it may be offline. Please try again."
}));
}
};
// Record which transport actually reached the peer (B14). // Record which transport actually reached the peer (B14).
let _ = crate::federation::record_peer_transport( let _ = crate::federation::record_peer_transport(
&self.config.data_dir, &self.config.data_dir,
@ -399,13 +410,15 @@ impl RpcHandler {
if response.status() == reqwest::StatusCode::PAYMENT_REQUIRED { if response.status() == reqwest::StatusCode::PAYMENT_REQUIRED {
// Payment was rejected — token is spent but content not received // Payment was rejected — token is spent but content not received
return Err(anyhow::anyhow!( return Ok(serde_json::json!({
"Payment rejected by peer — token may have been insufficient or invalid" "error": "Payment rejected by peer — the token may have been insufficient or invalid."
)); }));
} }
if !response.status().is_success() { if !response.status().is_success() {
return Err(anyhow::anyhow!("Peer returned: {}", response.status())); return Ok(serde_json::json!({
"error": format!("Peer returned an error ({}).", response.status())
}));
} }
let bytes = response let bytes = response

View File

@ -68,7 +68,7 @@ const NGINX_LND_PROXY_BLOCK: &str = "\n # LND REST proxy — backend handles
/// and peer media won't play (B3). Forwards Cookie (session auth) + Range and /// and peer media won't play (B3). Forwards Cookie (session auth) + Range and
/// disables buffering so streaming works. Kept in sync with the canonical /// disables buffering so streaming works. Kept in sync with the canonical
/// block in image-recipe/configs/nginx-archipelago.conf. /// block in image-recipe/configs/nginx-archipelago.conf.
const NGINX_PEER_CONTENT_BLOCK: &str = "\n # Peer content streaming proxy (B3) — Range-streams a peer's media file\n location /api/peer-content/ {\n proxy_pass http://127.0.0.1:5678;\n proxy_http_version 1.1;\n proxy_set_header Host $host;\n proxy_set_header Cookie $http_cookie;\n proxy_set_header Range $http_range;\n proxy_buffering off;\n proxy_connect_timeout 10s;\n proxy_read_timeout 120s;\n error_page 502 503 = @backend_unavailable;\n error_page 504 = @backend_timeout;\n }\n"; const NGINX_PEER_CONTENT_BLOCK: &str = "\n # Peer content streaming proxy (B3) — Range-streams a peer's media file.\n # Long read timeout: this path also serves full-file downloads of large\n # media (#38), which can take minutes over Tor; 120s aborted them.\n location /api/peer-content/ {\n proxy_pass http://127.0.0.1:5678;\n proxy_http_version 1.1;\n proxy_set_header Host $host;\n proxy_set_header Cookie $http_cookie;\n proxy_set_header Range $http_range;\n proxy_buffering off;\n proxy_connect_timeout 10s;\n proxy_read_timeout 900s;\n error_page 502 503 = @backend_unavailable;\n error_page 504 = @backend_timeout;\n }\n";
/// B13 — Fedimint UI asset rewrite. Pre-fix nodes proxy /app/fedimint/ with only /// B13 — Fedimint UI asset rewrite. Pre-fix nodes proxy /app/fedimint/ with only
/// the nostr-provider injection (`sub_filter_once on`), so the UI's root-rooted /// the nostr-provider injection (`sub_filter_once on`), so the UI's root-rooted

View File

@ -99,8 +99,18 @@ pub async fn peer_base_url(npub: &str) -> Result<String> {
/// until the first packets flow), so a reachable-but-cold peer isn't abandoned /// until the first packets flow), so a reachable-but-cold peer isn't abandoned
/// to Tor prematurely. Reliability over latency — FIPS is the preferred path. /// to Tor prematurely. Reliability over latency — FIPS is the preferred path.
pub fn client() -> reqwest::Client { pub fn client() -> reqwest::Client {
client_with_timeout(Duration::from_secs(20))
}
/// FIPS client with a caller-chosen overall request timeout. The static 20s
/// `client()` budget is fine for catalog browses and short calls, but a large
/// content download (#38) needs the per-request timeout the caller asked for —
/// otherwise a 178MB transfer is aborted at 20s and the whole download fails
/// before the Tor fallback ever gets a chance. The generous `connect_timeout`
/// is preserved so a cold hole-punched path still gets time to establish.
pub fn client_with_timeout(timeout: Duration) -> reqwest::Client {
reqwest::Client::builder() reqwest::Client::builder()
.timeout(Duration::from_secs(20)) .timeout(timeout)
.connect_timeout(Duration::from_secs(8)) .connect_timeout(Duration::from_secs(8))
.user_agent("archipelago-fips/1") .user_agent("archipelago-fips/1")
.build() .build()
@ -413,7 +423,7 @@ impl<'a> PeerRequest<'a> {
} }
}; };
let url = format!("{}{}", base, self.path); let url = format!("{}{}", base, self.path);
let c = client(); let c = client_with_timeout(self.timeout);
let mut rb = c.post(&url).json(body); let mut rb = c.post(&url).json(body);
for (k, v) in &self.headers { for (k, v) in &self.headers {
rb = rb.header(*k, v); rb = rb.header(*k, v);
@ -446,7 +456,7 @@ impl<'a> PeerRequest<'a> {
} }
}; };
let url = format!("{}{}", base, self.path); let url = format!("{}{}", base, self.path);
let c = client(); let c = client_with_timeout(self.timeout);
let mut rb = c.get(&url); let mut rb = c.get(&url);
for (k, v) in &self.headers { for (k, v) in &self.headers {
rb = rb.header(*k, v); rb = rb.header(*k, v);

View File

@ -530,21 +530,19 @@ async function downloadFile(item: CatalogItem) {
purchaseError.value = `Payment failed: ${result.error}` purchaseError.value = `Payment failed: ${result.error}`
} }
} else { } else {
// Free / peers-only download // Free / peers-only download: stream straight from the Range-capable
const result = await rpcClient.call<{ data?: string; error?: string; price_sats?: number }>({ // proxy (B3) to disk instead of pulling the whole file as a base64 blob
method: 'content.download-peer', // over RPC. The old base64-over-RPC path buffered the entire file in node
params: { onion, content_id: item.id }, // memory AND the browser, which failed outright for large files (#38).
timeout: 120000, // A tiny probe first surfaces the real server error (peer unreachable on
}) // mesh and Tor) instead of a generic failure (#30).
const streamUrl = `/api/peer-content/${encodeURIComponent(onion)}/${encodeURIComponent(item.id)}`
if (result?.error === 'payment_required') { const probe = await probePeerContent(streamUrl)
purchaseError.value = `This content requires payment: ${result.price_sats ?? 0} sats` if (probe !== true) {
purchaseError.value = probe
return return
} }
streamDownload(streamUrl, item)
if (result?.data) {
triggerDownload(result.data, item)
}
} }
} catch (e: unknown) { } catch (e: unknown) {
purchaseError.value = e instanceof Error ? e.message : 'Download failed' purchaseError.value = e instanceof Error ? e.message : 'Download failed'
@ -640,6 +638,51 @@ watch(videoPlayerUrl, (url) => {
} }
}) })
/**
 * Pre-flight check for a peer-content download.
 *
 * Issues a GET for `url` with a `Range: bytes=0-0` header so the outcome can be
 * inspected before handing the transfer over to a browser-managed download,
 * which cannot report an HTTP status back to this code.
 *
 * @param url Streaming-proxy URL to probe.
 * @returns `true` when the response is OK (or 206 Partial Content); otherwise a
 *   message to show the user: the `error` field of a JSON error body when one
 *   is present, a status-based message when it is not, or the thrown error's
 *   message when the request itself fails.
 */
async function probePeerContent(url: string): Promise<true | string> {
  const aborter = new AbortController()
  try {
    const response = await fetch(url, {
      signal: aborter.signal,
      headers: { Range: 'bytes=0-0' },
    })

    const reachable = response.ok || response.status === 206
    if (!reachable) {
      // Start from the status-based message; upgrade it if the body carries
      // a more specific `error` field.
      let reason = `Peer returned an error (${response.status})`
      try {
        const payload = await response.json()
        if (payload?.error) reason = String(payload.error)
      } catch {
        // Error body was not JSON — keep the status-based message.
      }
      return reason
    }

    // Success: cancel the request right away so the body is never drained.
    aborter.abort()
    return true
  } catch (err) {
    return err instanceof Error ? err.message : 'Could not reach the peer — it may be offline'
  }
}
/**
 * Start a browser-managed download of `url`.
 *
 * Creates a temporary `<a download>` element, clicks it and removes it again.
 * The browser performs the transfer itself; this code never builds a base64
 * string or an in-memory Blob of the file, which is what makes the approach
 * suitable for large files (#38).
 *
 * @param url  Download URL. The caller passes a same-origin proxy path, so the
 *   session cookie is sent with the request.
 * @param item Catalog entry; the last `/`-separated segment of its `filename`
 *   becomes the suggested save name (the full `filename` if that segment is
 *   empty).
 */
function streamDownload(url: string, item: CatalogItem) {
  const pathSegments = item.filename.split('/')
  const saveAs = pathSegments[pathSegments.length - 1] || item.filename

  const link = document.createElement('a')
  link.href = url
  link.download = saveAs

  // The anchor must be attached to the document for the click to be honoured
  // consistently; it is removed again immediately afterwards.
  document.body.appendChild(link)
  link.click()
  link.remove()
}
function triggerDownload(base64Data: string, item: CatalogItem) { function triggerDownload(base64Data: string, item: CatalogItem) {
const blob = new Blob( const blob = new Blob(
[Uint8Array.from(atob(base64Data), c => c.charCodeAt(0))], [Uint8Array.from(atob(base64Data), c => c.charCodeAt(0))],