//! Electrs sync status: fetches indexed height from Electrum RPC and network height from Bitcoin Core. use anyhow::{Context, Result}; use serde::Serialize; use std::io::{BufRead, BufReader, Write}; use std::net::TcpStream; use std::time::Duration; const ELECTRS_HOST: &str = "127.0.0.1"; const ELECTRS_PORT: u16 = 50001; const BITCOIN_RPC_URL: &str = "http://127.0.0.1:8332/"; const ELECTRS_DATA_DIR: &str = "/var/lib/archipelago/mempool-electrs"; // Approximate final index size in bytes for mainnet with --lightmode (~35GB) const ESTIMATED_FULL_INDEX_BYTES: f64 = 35_000_000_000.0; /// Build Bitcoin RPC Basic auth header from env vars. /// Falls back to cookie auth file if env vars are not set. fn bitcoin_rpc_auth() -> String { let user = std::env::var("BITCOIN_RPC_USER").unwrap_or_else(|_| "archipelago".to_string()); let pass = std::env::var("BITCOIN_RPC_PASSWORD").unwrap_or_else(|_| "archipelago123".to_string()); use base64::Engine; let encoded = base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass)); format!("Basic {}", encoded) } #[derive(Debug, Serialize)] pub struct ElectrsSyncStatus { pub indexed_height: u64, pub network_height: u64, pub progress_pct: f64, pub status: String, pub error: Option, /// Index data size in human-readable format (e.g. "11.2 GB") pub index_size: Option, } /// Get the total size of a directory in bytes. fn dir_size_bytes(path: &str) -> u64 { let mut total: u64 = 0; if let Ok(entries) = std::fs::read_dir(path) { for entry in entries.flatten() { let path = entry.path(); if path.is_dir() { total += dir_size_bytes(&path.to_string_lossy()); } else if let Ok(meta) = entry.metadata() { total += meta.len(); } } } total } /// Format bytes as human-readable string. 
/// Uses decimal units (1 GB = 1_000_000_000 bytes). Values of 1 MB and above
/// are shown with one decimal place; anything smaller is shown as whole
/// kilobytes, rounded down (so values under 1000 bytes read "0 KB").
fn format_bytes(bytes: u64) -> String {
    if bytes >= 1_000_000_000 {
        format!("{:.1} GB", bytes as f64 / 1_000_000_000.0)
    } else if bytes >= 1_000_000 {
        format!("{:.1} MB", bytes as f64 / 1_000_000.0)
    } else {
        format!("{} KB", bytes / 1_000)
    }
}

/// Fetch electrs indexed height via Electrum protocol (TCP JSON-RPC).
///
/// Opens a TCP connection to `ELECTRS_HOST:ELECTRS_PORT`, sends a single
/// newline-terminated request and parses the first line of the reply.
///
/// # Errors
///
/// Fails when the connection cannot be established, on I/O timeouts, when the
/// reply is empty or not valid JSON, when the server answers with a JSON-RPC
/// `error`, or when no height can be found in the `result`.
fn electrs_indexed_height() -> Result<u64> {
    use std::net::ToSocketAddrs;

    // One bound for connect, read and write so a stuck peer cannot hold the
    // calling thread indefinitely.
    const IO_TIMEOUT: Duration = Duration::from_secs(5);

    // Every failure up to and including the connect keeps the same context
    // string: `get_electrs_sync_status` looks for "connect" in the top-level
    // message to tell "still indexing" apart from a real error.
    let addr = (ELECTRS_HOST, ELECTRS_PORT)
        .to_socket_addrs()
        .context("Failed to connect to electrs")?
        .next()
        .context("Failed to connect to electrs")?;
    let mut stream =
        TcpStream::connect_timeout(&addr, IO_TIMEOUT).context("Failed to connect to electrs")?;
    stream
        .set_read_timeout(Some(IO_TIMEOUT))
        .context("set_read_timeout")?;
    stream
        .set_write_timeout(Some(IO_TIMEOUT))
        .context("set_write_timeout")?;

    // blockchain.numblocks.subscribe returns current block height directly
    // NOTE(review): confirm the deployed electrs build still serves this
    // method; newer Electrum protocol versions expose the tip through
    // blockchain.headers.subscribe instead.
    let req = r#"{"id":1,"method":"blockchain.numblocks.subscribe","params":[]}"#;
    stream.write_all(req.as_bytes())?;
    // Requests are newline-delimited; send the terminator explicitly.
    stream.write_all(b"\n")?;
    stream.flush()?;

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line)?;
    let line = line.trim();
    if line.is_empty() {
        anyhow::bail!("Empty response from electrs");
    }
    let json: serde_json::Value = serde_json::from_str(line)?;

    // Report a server-side failure as such instead of the generic
    // "Missing height" message below.
    if let Some(err) = json.get("error").filter(|e| !e.is_null()) {
        anyhow::bail!("Electrs returned error: {}", err);
    }

    // blockchain.numblocks.subscribe returns result as number; headers.subscribe returns {block_height: N}
    let result = json.get("result");
    let height = result
        .and_then(|r| r.as_u64())
        .or_else(|| {
            result
                .and_then(|r| r.get("block_height"))
                .and_then(|h| h.as_u64())
        })
        .context("Missing height in electrs response")?;
    Ok(height)
}

/// Fetch Bitcoin network height via JSON-RPC.
async fn bitcoin_network_height() -> Result { let client = reqwest::Client::new(); let body = serde_json::json!({ "jsonrpc": "1.0", "id": "electrs-status", "method": "getblockcount", "params": [] }); let resp = client .post(BITCOIN_RPC_URL) .header("Content-Type", "application/json") .header("Authorization", bitcoin_rpc_auth()) .body(body.to_string()) .send() .await .context("Bitcoin RPC request failed")?; if !resp.status().is_success() { anyhow::bail!("Bitcoin RPC returned {}", resp.status()); } let json: serde_json::Value = resp.json().await?; let height = json .get("result") .and_then(|r| r.as_u64()) .context("Missing result in Bitcoin RPC")?; Ok(height) } /// Get electrs sync status. Runs blocking electrs call in spawn_blocking. pub async fn get_electrs_sync_status() -> ElectrsSyncStatus { // Get index data size (non-blocking, fast filesystem stat) let data_bytes = dir_size_bytes(ELECTRS_DATA_DIR); let index_size = if data_bytes > 0 { Some(format_bytes(data_bytes)) } else { None }; let network_height = match bitcoin_network_height().await { Ok(h) => h, Err(e) => { return ElectrsSyncStatus { indexed_height: 0, network_height: 0, progress_pct: 0.0, status: "error".to_string(), error: Some(format!("Bitcoin RPC: {}", e)), index_size, }; } }; let indexed_height = match tokio::task::spawn_blocking(electrs_indexed_height).await { Ok(Ok(h)) => h, Ok(Err(e)) => { // Electrs doesn't listen on 50001 until indexing completes (can take hours) let err_msg = e.to_string(); let (status, error) = if err_msg.contains("connect") || err_msg.contains("Connection refused") { // Estimate progress from data directory size let est_pct = if data_bytes > 0 { ((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0) } else { 0.0 }; let size_str = index_size.clone().unwrap_or_else(|| "0 MB".to_string()); ( "indexing".to_string(), Some(format!( "Building index ({} / ~35 GB estimated). 
Electrum RPC will be available when complete.", size_str )), ) } else { ("error".to_string(), Some(format!("Electrs: {}", e))) }; // Use estimated progress when indexing let progress_pct = if status == "indexing" && data_bytes > 0 { ((data_bytes as f64 / ESTIMATED_FULL_INDEX_BYTES) * 100.0).min(99.0) } else { 0.0 }; return ElectrsSyncStatus { indexed_height: 0, network_height, progress_pct, status, error, index_size, }; } Err(e) => { return ElectrsSyncStatus { indexed_height: 0, network_height, progress_pct: 0.0, status: "error".to_string(), error: Some(format!("Task: {}", e)), index_size, }; } }; let progress_pct = if network_height > 0 { (indexed_height as f64 / network_height as f64) * 100.0 } else { 0.0 }; let status = if indexed_height >= network_height.saturating_sub(1) { "synced" } else { "syncing" }; ElectrsSyncStatus { indexed_height, network_height, progress_pct, status: status.to_string(), error: None, index_size, } }