//! Electrs sync status: fetches indexed height from Electrum RPC and network height from Bitcoin Core. use anyhow::{Context, Result}; use serde::Serialize; use std::io::{BufRead, BufReader, Write}; use std::net::TcpStream; use std::time::Duration; const ELECTRS_HOST: &str = "127.0.0.1"; const ELECTRS_PORT: u16 = 50001; const BITCOIN_RPC_URL: &str = "http://127.0.0.1:8332/"; /// Build Bitcoin RPC Basic auth header from env vars. /// Falls back to cookie auth file if env vars are not set. fn bitcoin_rpc_auth() -> String { let user = std::env::var("BITCOIN_RPC_USER").unwrap_or_else(|_| "archipelago".to_string()); let pass = std::env::var("BITCOIN_RPC_PASSWORD").unwrap_or_else(|_| "archipelago123".to_string()); use base64::Engine; let encoded = base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass)); format!("Basic {}", encoded) } #[derive(Debug, Serialize)] pub struct ElectrsSyncStatus { pub indexed_height: u64, pub network_height: u64, pub progress_pct: f64, pub status: String, pub error: Option, } /// Fetch electrs indexed height via Electrum protocol (TCP JSON-RPC). fn electrs_indexed_height() -> Result { let mut stream = TcpStream::connect((ELECTRS_HOST, ELECTRS_PORT)) .context("Failed to connect to electrs")?; stream .set_read_timeout(Some(Duration::from_secs(5))) .context("set_read_timeout")?; stream .set_write_timeout(Some(Duration::from_secs(5))) .context("set_write_timeout")?; // blockchain.numblocks.subscribe returns current block height directly let req = r#"{"id":1,"method":"blockchain.numblocks.subscribe","params":[]} "#; stream.write_all(req.as_bytes())?; stream.flush()?; let mut reader = BufReader::new(stream); let mut line = String::new(); reader.read_line(&mut line)?; let line = line.trim(); if line.is_empty() { anyhow::bail!("Empty response from electrs"); } let json: serde_json::Value = serde_json::from_str(line)?; // blockchain.numblocks.subscribe returns result as number; headers.subscribe returns {block_height: N} let height = json .get("result") .and_then(|r| r.as_u64()) .or_else(|| { json.get("result") .and_then(|r| r.get("block_height")) .and_then(|h| h.as_u64()) }) .context("Missing height in electrs response")?; Ok(height) } /// Fetch Bitcoin network height via JSON-RPC. async fn bitcoin_network_height() -> Result { let client = reqwest::Client::new(); let body = serde_json::json!({ "jsonrpc": "1.0", "id": "electrs-status", "method": "getblockcount", "params": [] }); let resp = client .post(BITCOIN_RPC_URL) .header("Content-Type", "application/json") .header("Authorization", bitcoin_rpc_auth()) .body(body.to_string()) .send() .await .context("Bitcoin RPC request failed")?; if !resp.status().is_success() { anyhow::bail!("Bitcoin RPC returned {}", resp.status()); } let json: serde_json::Value = resp.json().await?; let height = json .get("result") .and_then(|r| r.as_u64()) .context("Missing result in Bitcoin RPC")?; Ok(height) } /// Get electrs sync status. Runs blocking electrs call in spawn_blocking. pub async fn get_electrs_sync_status() -> ElectrsSyncStatus { let network_height = match bitcoin_network_height().await { Ok(h) => h, Err(e) => { return ElectrsSyncStatus { indexed_height: 0, network_height: 0, progress_pct: 0.0, status: "error".to_string(), error: Some(format!("Bitcoin RPC: {}", e)), }; } }; let indexed_height = match tokio::task::spawn_blocking(electrs_indexed_height).await { Ok(Ok(h)) => h, Ok(Err(e)) => { // Electrs doesn't listen on 50001 until indexing completes (can take hours) let err_msg = e.to_string(); let (status, error) = if err_msg.contains("connect") || err_msg.contains("Connection refused") { ( "indexing".to_string(), Some("Electrs is building the index. Electrum RPC will be available when indexing completes (may take hours).".to_string()), ) } else { ("error".to_string(), Some(format!("Electrs: {}", e))) }; return ElectrsSyncStatus { indexed_height: 0, network_height, progress_pct: 0.0, status, error, }; } Err(e) => { return ElectrsSyncStatus { indexed_height: 0, network_height, progress_pct: 0.0, status: "error".to_string(), error: Some(format!("Task: {}", e)), }; } }; let progress_pct = if network_height > 0 { (indexed_height as f64 / network_height as f64) * 100.0 } else { 0.0 }; let status = if indexed_height >= network_height.saturating_sub(1) { "synced" } else { "syncing" }; ElectrsSyncStatus { indexed_height, network_height, progress_pct, status: status.to_string(), error: None, } }