archy/core/archipelago/src/electrs_status.rs
Dorian 94f2de4a64 refactor: centralize constants, eliminate unwraps, remove dead code, resolve TODOs
- R13+R16: Replace .expect() with .context()? in main.rs and identity.rs
- R17+R18+R19: Fix unwrap() calls in helpers and js-engine
- R20+R21: Remove #[allow(dead_code)] annotations and delete truly dead code
- R22-R26: Create constants.rs module, replace 21 hardcoded values across 12 files
- R28+R29: LND/DWN timeouts already present — verified
- R30-R33: Remove TODO comments, implement marketplace payment check

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 01:54:35 +00:00

275 lines
9.1 KiB
Rust

//! ElectrumX sync status: fetches indexed height from Electrum RPC and network height from Bitcoin Core.
use anyhow::{Context, Result};
use serde::Serialize;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
/// Host on which the local ElectrumX server's Electrum RPC endpoint is reached.
const ELECTRUMX_HOST: &str = "127.0.0.1";
/// TCP port used for the Electrum JSON-RPC connection.
const ELECTRUMX_PORT: u16 = 50001;
/// Directory whose total on-disk size is measured to report the index size.
const ELECTRUMX_DATA_DIR: &str = "/var/lib/archipelago/electrumx";
/// Approximate final index size in bytes for mainnet (~130 GB for a full ElectrumX index as of 2026).
/// Serves as the denominator when indexing progress is estimated from the data directory size.
const ESTIMATED_FULL_INDEX_BYTES: f64 = 130_000_000_000.0;
/// Produce the HTTP `Authorization` header value for Bitcoin RPC requests.
///
/// The credentials are obtained from `crate::bitcoin_rpc::bitcoin_rpc_credentials()`,
/// joined as `user:pass`, encoded with the standard base64 alphabet and prefixed
/// with `Basic `.
async fn bitcoin_rpc_auth() -> String {
    use base64::engine::general_purpose::STANDARD;
    use base64::Engine;

    let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await;
    let credentials = format!("{}:{}", rpc_user, rpc_pass);

    let mut header_value = String::from("Basic ");
    header_value.push_str(&STANDARD.encode(credentials));
    header_value
}
/// Snapshot of the ElectrumX sync state produced by `get_electrs_sync_status`.
#[derive(Debug, Serialize)]
pub struct ElectrsSyncStatus {
/// Chain height reported by ElectrumX over the Electrum protocol; 0 when it could not be fetched.
pub indexed_height: u64,
/// Block count reported by Bitcoin Core (`getblockcount`); 0 when the Bitcoin RPC call failed.
pub network_height: u64,
/// Progress as a percentage: `indexed_height / network_height` when both are known,
/// or an estimate derived from the index directory size (capped at 99) while indexing.
pub progress_pct: f64,
/// One of "synced", "syncing", "indexing" or "error".
pub status: String,
/// Human-readable detail for the "error" and "indexing" states; `None` otherwise.
pub error: Option<String>,
/// Index data size in human-readable format (e.g. "11.2 GB")
pub index_size: Option<String>,
/// Tor onion address for ElectrumX (if available)
pub tor_onion: Option<String>,
}
/// Get the total size in bytes of all entries beneath `path`, descending into subdirectories.
///
/// This is a best-effort measurement: a directory that cannot be opened (including
/// `path` itself) contributes 0, and an entry whose metadata cannot be read is skipped.
///
/// Symbolic links are never followed. The entry type comes from the async
/// `DirEntry::file_type()` rather than the blocking, link-following `Path::is_dir()`,
/// so a symlink cycle cannot cause unbounded traversal and no blocking filesystem
/// call runs on the async executor.
async fn dir_size_bytes(path: &str) -> u64 {
    let mut total: u64 = 0;
    // Explicit work list instead of boxed async recursion; sub-paths stay as
    // `PathBuf` so non-UTF-8 names are not mangled by a lossy string conversion.
    let mut pending = vec![std::path::PathBuf::from(path)];

    while let Some(dir) = pending.pop() {
        let mut entries = match tokio::fs::read_dir(&dir).await {
            Ok(entries) => entries,
            Err(_) => continue,
        };
        while let Ok(Some(entry)) = entries.next_entry().await {
            // An unreadable file type is treated as "not a directory" so the
            // entry still gets a chance to be counted via its metadata.
            let is_dir = entry
                .file_type()
                .await
                .map(|kind| kind.is_dir())
                .unwrap_or(false);
            if is_dir {
                pending.push(entry.path());
            } else if let Ok(meta) = entry.metadata().await {
                total = total.saturating_add(meta.len());
            }
        }
    }
    total
}
/// Render a byte count as a short decimal (SI, powers of 1000) size string.
///
/// Values of at least 1 GB or 1 MB are shown with one decimal place
/// (e.g. "11.2 GB", "1.5 MB"); anything smaller is shown as whole kilobytes,
/// rounded down (so counts below 1000 bytes yield "0 KB").
fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1_000;
    const MB: u64 = 1_000_000;
    const GB: u64 = 1_000_000_000;

    match bytes {
        b if b >= GB => format!("{:.1} GB", b as f64 / GB as f64),
        b if b >= MB => format!("{:.1} MB", b as f64 / MB as f64),
        b => format!("{} KB", b / KB),
    }
}
/// Fetch ElectrumX indexed height via Electrum protocol (TCP JSON-RPC).
async fn electrumx_indexed_height() -> Result<u64> {
let timeout_duration = Duration::from_secs(5);
let stream = tokio::time::timeout(
timeout_duration,
TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT)),
)
.await
.context("ElectrumX connection timed out")?
.context("Failed to connect to ElectrumX")?;
let (reader_half, mut writer_half) = tokio::io::split(stream);
// blockchain.headers.subscribe returns {"height": N, "hex": "..."}
let req = r#"{"id":1,"method":"blockchain.headers.subscribe","params":[]}
"#;
tokio::time::timeout(timeout_duration, writer_half.write_all(req.as_bytes()))
.await
.context("ElectrumX write timed out")?
.context("Failed to write to ElectrumX")?;
tokio::time::timeout(timeout_duration, writer_half.flush())
.await
.context("ElectrumX flush timed out")?
.context("Failed to flush ElectrumX stream")?;
let mut reader = BufReader::new(reader_half);
let mut line = String::new();
tokio::time::timeout(timeout_duration, reader.read_line(&mut line))
.await
.context("ElectrumX read timed out")?
.context("Failed to read from ElectrumX")?;
let line = line.trim();
if line.is_empty() {
anyhow::bail!("Empty response from ElectrumX");
}
let json: serde_json::Value = serde_json::from_str(line)?;
// blockchain.numblocks.subscribe returns result as number; headers.subscribe returns {block_height: N, hex: ...}
let height = json
.get("result")
.and_then(|r| r.as_u64())
.or_else(|| {
json.get("result")
.and_then(|r| r.get("block_height"))
.and_then(|h| h.as_u64())
})
.or_else(|| {
// ElectrumX returns {"result": {"height": N, "hex": "..."}}
json.get("result")
.and_then(|r| r.get("height"))
.and_then(|h| h.as_u64())
})
.context("Missing height in ElectrumX response")?;
Ok(height)
}
/// Fetch Bitcoin network height via JSON-RPC.
async fn bitcoin_network_height() -> Result<u64> {
let client = reqwest::Client::new();
let body = serde_json::json!({
"jsonrpc": "1.0",
"id": "electrs-status",
"method": "getblockcount",
"params": []
});
let resp = client
.post(crate::constants::BITCOIN_RPC_URL)
.header("Content-Type", "application/json")
.header("Authorization", bitcoin_rpc_auth().await)
.body(body.to_string())
.send()
.await
.context("Bitcoin RPC request failed")?;
if !resp.status().is_success() {
anyhow::bail!("Bitcoin RPC returned {}", resp.status());
}
let json: serde_json::Value = resp.json().await?;
let height = json
.get("result")
.and_then(|r| r.as_u64())
.context("Missing result in Bitcoin RPC")?;
Ok(height)
}
/// Get ElectrumX sync status.
pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
// Get index data size
let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR).await;
let index_size = if data_bytes > 0 {
Some(format_bytes(data_bytes))
} else {
None
};
// Read Tor onion address — check system Tor path first, then legacy
let tor_onion = {
let mut onion = None;
for path in &[
"/var/lib/archipelago/tor-hostnames/electrs",
"/var/lib/tor/hidden_service_electrs/hostname",
"/var/lib/archipelago/tor/hidden_service_electrs/hostname",
] {
if let Ok(addr) = tokio::fs::read_to_string(path).await {
let addr = addr.trim().to_string();
if addr.ends_with(".onion") {
onion = Some(addr);
break;
}
}
if let Ok(output) = tokio::process::Command::new("sudo")
.args(["cat", path])
.output()
.await
{
if output.status.success() {
let addr = String::from_utf8_lossy(&output.stdout).trim().to_string();
if addr.ends_with(".onion") {
onion = Some(addr);
break;
}
}
}
}
onion
};
let network_height = match bitcoin_network_height().await {
Ok(h) => h,
Err(e) => {
return ElectrsSyncStatus {
indexed_height: 0,
network_height: 0,
progress_pct: 0.0,
status: "error".to_string(),
error: Some(format!("Bitcoin RPC: {}", e)),
index_size,
tor_onion,
};
}
};
let indexed_height = match electrumx_indexed_height().await {
Ok(h) => h,
Err(e) => {
// ElectrumX may not be ready on 50001 during initial sync
let err_msg = e.to_string();
let err_lower = err_msg.to_lowercase();
let (status, error) = if err_lower.contains("connect") || err_lower.contains("reset") || err_lower.contains("refused") || err_lower.contains("timed out") {
// Estimate progress from data directory size
let _est_pct = if data_bytes > 0 {
((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0)
} else {
0.0
};
let size_str = index_size.clone().unwrap_or_else(|| "0 MB".to_string());
(
"indexing".to_string(),
Some(format!(
"Building index ({} / ~130 GB estimated). Electrum RPC will be available when complete.",
size_str
)),
)
} else {
("error".to_string(), Some(format!("ElectrumX: {}", e)))
};
// Use estimated progress when indexing
let progress_pct = if status == "indexing" && data_bytes > 0 {
((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0)
} else {
0.0
};
return ElectrsSyncStatus {
indexed_height: 0,
network_height,
progress_pct,
status,
error,
index_size,
tor_onion: tor_onion.clone(),
};
}
};
let progress_pct = if network_height > 0 {
(indexed_height as f64 / network_height as f64) * 100.0
} else {
0.0
};
let status = if indexed_height >= network_height.saturating_sub(1) {
"synced"
} else {
"syncing"
};
ElectrsSyncStatus {
indexed_height,
network_height,
progress_pct,
status: status.to_string(),
error: None,
index_size,
tor_onion,
}
}