archy/core/archipelago/src/electrs_status.rs
2026-03-17 00:03:08 +00:00

248 lines
8.2 KiB
Rust

//! ElectrumX sync status: fetches indexed height from Electrum RPC and network height from Bitcoin Core.
use anyhow::{Context, Result};
use serde::Serialize;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::time::Duration;
const ELECTRUMX_HOST: &str = "127.0.0.1";
const ELECTRUMX_PORT: u16 = 50001;
const BITCOIN_RPC_URL: &str = "http://127.0.0.1:8332/";
const ELECTRUMX_DATA_DIR: &str = "/var/lib/archipelago/electrumx";
// Approximate final index size in bytes for mainnet (~55GB for ElectrumX full index)
const ESTIMATED_FULL_INDEX_BYTES: f64 = 55_000_000_000.0;
/// Build Bitcoin RPC Basic auth header from env vars.
/// Falls back to cookie auth file if env vars are not set.
fn bitcoin_rpc_auth() -> String {
let user = std::env::var("BITCOIN_RPC_USER").unwrap_or_else(|_| "archipelago".to_string());
let pass = std::env::var("BITCOIN_RPC_PASSWORD").unwrap_or_else(|_| "archipelago123".to_string());
use base64::Engine;
let encoded = base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass));
format!("Basic {}", encoded)
}
#[derive(Debug, Serialize)]
pub struct ElectrsSyncStatus {
pub indexed_height: u64,
pub network_height: u64,
pub progress_pct: f64,
pub status: String,
pub error: Option<String>,
/// Index data size in human-readable format (e.g. "11.2 GB")
pub index_size: Option<String>,
/// Tor onion address for ElectrumX (if available)
pub tor_onion: Option<String>,
}
/// Get the total size of a directory in bytes.
fn dir_size_bytes(path: &str) -> u64 {
let mut total: u64 = 0;
if let Ok(entries) = std::fs::read_dir(path) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
total += dir_size_bytes(&path.to_string_lossy());
} else if let Ok(meta) = entry.metadata() {
total += meta.len();
}
}
}
total
}
/// Format bytes as human-readable string.
fn format_bytes(bytes: u64) -> String {
if bytes >= 1_000_000_000 {
format!("{:.1} GB", bytes as f64 / 1_000_000_000.0)
} else if bytes >= 1_000_000 {
format!("{:.1} MB", bytes as f64 / 1_000_000.0)
} else {
format!("{} KB", bytes / 1_000)
}
}
/// Fetch ElectrumX indexed height via Electrum protocol (TCP JSON-RPC).
fn electrumx_indexed_height() -> Result<u64> {
let mut stream = TcpStream::connect((ELECTRUMX_HOST, ELECTRUMX_PORT))
.context("Failed to connect to ElectrumX")?;
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.context("set_read_timeout")?;
stream
.set_write_timeout(Some(Duration::from_secs(5)))
.context("set_write_timeout")?;
// blockchain.numblocks.subscribe returns current block height directly
let req = r#"{"id":1,"method":"blockchain.numblocks.subscribe","params":[]}
"#;
stream.write_all(req.as_bytes())?;
stream.flush()?;
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader.read_line(&mut line)?;
let line = line.trim();
if line.is_empty() {
anyhow::bail!("Empty response from ElectrumX");
}
let json: serde_json::Value = serde_json::from_str(line)?;
// blockchain.numblocks.subscribe returns result as number; headers.subscribe returns {block_height: N, hex: ...}
let height = json
.get("result")
.and_then(|r| r.as_u64())
.or_else(|| {
json.get("result")
.and_then(|r| r.get("block_height"))
.and_then(|h| h.as_u64())
})
.or_else(|| {
// ElectrumX returns {"result": {"height": N, "hex": "..."}}
json.get("result")
.and_then(|r| r.get("height"))
.and_then(|h| h.as_u64())
})
.context("Missing height in ElectrumX response")?;
Ok(height)
}
/// Fetch Bitcoin network height via JSON-RPC.
async fn bitcoin_network_height() -> Result<u64> {
let client = reqwest::Client::new();
let body = serde_json::json!({
"jsonrpc": "1.0",
"id": "electrs-status",
"method": "getblockcount",
"params": []
});
let resp = client
.post(BITCOIN_RPC_URL)
.header("Content-Type", "application/json")
.header("Authorization", bitcoin_rpc_auth())
.body(body.to_string())
.send()
.await
.context("Bitcoin RPC request failed")?;
if !resp.status().is_success() {
anyhow::bail!("Bitcoin RPC returned {}", resp.status());
}
let json: serde_json::Value = resp.json().await?;
let height = json
.get("result")
.and_then(|r| r.as_u64())
.context("Missing result in Bitcoin RPC")?;
Ok(height)
}
/// Get ElectrumX sync status. Runs blocking ElectrumX call in spawn_blocking.
pub async fn get_electrs_sync_status() -> ElectrsSyncStatus {
// Get index data size (non-blocking, fast filesystem stat)
let data_bytes = dir_size_bytes(ELECTRUMX_DATA_DIR);
let index_size = if data_bytes > 0 {
Some(format_bytes(data_bytes))
} else {
None
};
// Read Tor onion address if available
let tor_onion = tokio::fs::read_to_string(
"/var/lib/archipelago/tor/hidden_service_electrs/hostname",
)
.await
.ok()
.map(|s| s.trim().to_string());
let network_height = match bitcoin_network_height().await {
Ok(h) => h,
Err(e) => {
return ElectrsSyncStatus {
indexed_height: 0,
network_height: 0,
progress_pct: 0.0,
status: "error".to_string(),
error: Some(format!("Bitcoin RPC: {}", e)),
index_size,
tor_onion,
};
}
};
let indexed_height = match tokio::task::spawn_blocking(electrumx_indexed_height).await {
Ok(Ok(h)) => h,
Ok(Err(e)) => {
// ElectrumX may not be ready on 50001 during initial sync
let err_msg = e.to_string();
let (status, error) = if err_msg.contains("connect") || err_msg.contains("Connection refused") {
// Estimate progress from data directory size
let _est_pct = if data_bytes > 0 {
((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0)
} else {
0.0
};
let size_str = index_size.clone().unwrap_or_else(|| "0 MB".to_string());
(
"indexing".to_string(),
Some(format!(
"Building index ({} / ~55 GB estimated). Electrum RPC will be available when complete.",
size_str
)),
)
} else {
("error".to_string(), Some(format!("ElectrumX: {}", e)))
};
// Use estimated progress when indexing
let progress_pct = if status == "indexing" && data_bytes > 0 {
((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0)
} else {
0.0
};
return ElectrsSyncStatus {
indexed_height: 0,
network_height,
progress_pct,
status,
error,
index_size,
tor_onion: tor_onion.clone(),
};
}
Err(e) => {
return ElectrsSyncStatus {
indexed_height: 0,
network_height,
progress_pct: 0.0,
status: "error".to_string(),
error: Some(format!("Task: {}", e)),
index_size,
tor_onion: tor_onion.clone(),
};
}
};
let progress_pct = if network_height > 0 {
(indexed_height as f64 / network_height as f64) * 100.0
} else {
0.0
};
let status = if indexed_height >= network_height.saturating_sub(1) {
"synced"
} else {
"syncing"
};
ElectrsSyncStatus {
indexed_height,
network_height,
progress_pct,
status: status.to_string(),
error: None,
index_size,
tor_onion,
}
}